GearmanX-Worker
view release on metacpan or search on metacpan
t/01-server.t view on Meta::CPAN
is ($work_done, '3+4', 'async echo roundtrip: list -> scalar');
__END__
warn Dumper $c;
#my $result_ref = $c->do_task('xxxx', \ "1+2");
#warn Dumper $result_ref;
__END__
my $result_ref = $c->do_task('xxxx', \ "1+2");
#use Storable qw(thaw);
#my ($result) = thaw $$result_ref;
#print "1 + 2 = $result\n". Dumper $result;
print "1 + 2 = $$result_ref\n";
##use Storable qw(freeze);
#my $ff = freeze [ 3, 4, 5];
#my $result_ref = $c->do_task('xxxx', \ $ff);
#my ($result) = thaw $$result_ref;
#print "1 + 2 = $result\n". Dumper $result;
__END__
use threads;
use threads::shared;
my @workers = map { threads->new(\&MyWorker::new) } (1..1);
map { $_->detach } @workers;
sleep 4;
__END__
use constant WORKER_TIME => 5;
use_ok ('REST::Depend::Regex::Gearman');
my @urls : shared = (); # workers report back here
my @labels : shared = (); # workers report task labels here
my $work_cnt : shared = 0;
my $work_drop : shared = 0;
my $alives : shared = 0;
sub run_experiment {
my $deps = shift;
my $d = REST::Depend::Regex::Gearman->new ($deps);
my %o = @_;
$o{worker} ||= 'perfect';
@urls = ();
@labels = ();
$work_cnt = 0;
$work_drop = 0;
warn "== MAIN: $o{nr_workers} =====================================================================================";
my @workers = map { threads->new(\&Worker::run, "W$_", $o{worker}) } (1.. $o{nr_workers});
map { $_->detach } @workers;
$alives = scalar @workers;
$d->evolve ($o{kick});
my $start_time = time;
my $end_time = $start_time + $o{max_time}; # we plan to work at most some secs on that
my @gearmandized;
LOOP: {
do {
RESPONSE:
while (my $url = shift @urls) {
# warn "MAIN: popped $url";
my $label = shift @labels; # there MUST be one
warn "MAIN: task label $label found to be finished";
if (my ($t) = $d->things ($label)) {
if ($t->{gearmandized} && $t->{gearmandized}->[0]) { # we asked for it
$t->{gearmandized} = [ undef, time, undef ];
} else {
warn "MAIN: got back UNSOLICITED, ignored";
next RESPONSE;
}
} else {
# warn "MAIN: STRANGE got $label in queue which is not known in Petri";
next RESPONSE;
}
if (my ($place) = $d->things ($url)) {
next RESPONSE unless $place; # it can happen that we get reported a very old worker result
# warn "MAIN: already got a place at $url: ".Dumper $place;
$place->touch;
} else {
warn "MAIN: STRANGE got $url in queue which is not known in Petri";
next RESPONSE;
}
$d->evolve ($url);
}
warn "MAIN: sleeping a bit with alive $alives";
sleep 1;
# $d->evolve ('xxx') if time > $end_time - $loop_time + 2; # 2 secs after start we pretend something else happened
@gearmandized = grep { $_->{gearmandized}->[0] } map { $d->things ($_) } $d->transitions;
warn "still gearmandized: " . scalar @gearmandized;
# warn "MAIN: now is ".time;
my @lates = grep { $_->{gearmandized}->[1] + $_->{gearmandized}->[2] < time } @gearmandized;
warn "MAIN: lates are now ".Dumper [ map { $_->{label} } @lates ];
$d->ignite (map { $_->{label} }
map { $_->{gearmandized}->[0] = undef; $_ } # make sure we take it away from the worker
@lates); # fire them again
# last LOOP if time > $end_time; # we end this game at worst-case time (sequential)
} until scalar @gearmandized == 0 and scalar @urls == 0;
}
# warn "MAIN: leftovers: ".Dumper \@urls;
is (scalar @urls, 0, "nr $o{nr_workers} emptied all reported URLs (in ".(time - $start_time)." secs)");
ok ($work_cnt >= $o{must_work}, "nr $o{nr_workers} worker executed at least as many times (works: $work_cnt, droppings: $work_drop)");
is (scalar $d->ignitables, 0, "nr $o{nr_workers} no more ignitions");
warn "MAIN: killing all running workers";
map { $_->kill('KILL') }
grep { $_->is_running() } @workers;
}
my $deps = {
q{aaa} => {
q{bbb} => [ 'aaa2bbb' => { param1 => 'ppp' } ],
q{ccc} => [ 'aaa2ccc' => { param1 => 'qqq' } ],
q{ddd} => [ 'aaa2ccc' => { param1 => 'qqq' } ],
q{eee} => [ 'aaa2ccc' => { param1 => 'qqq' } ]
},
q{eee} => {
q{fff} => [ 'aaa2bbb' => { param1 => 'ppp' } ],
},
};
#run_experiment ($deps, kick => 'eee', must_work => 1, nr_workers => 1, max_time => 1 * WORKER_TIME * 2);
foreach my $kind (
'suicidal',
'perfect',
'slow',
'sloppy'
) {
foreach my $nr (reverse (1..6)) { # varying the number of workers
foreach (1..10) { # just stress-testing
run_experiment ($deps, kick => 'aaa', must_work => 5, worker => $kind, nr_workers => $nr, max_time => 10 * WORKER_TIME );
}
sleep WORKER_TIME * 3; # just wait everything out
}
}
__END__
my $deps = {
q{/(?<map>.+)/} => {
( run in 1.400 second using v1.01-cache-2.11-cpan-5a3173703d6 )