GearmanX-Worker

 view release on metacpan or  search on metacpan

t/01-server.t  view on Meta::CPAN


# $work_done must be exactly the string '3+4'; `is` compares it as a string.
# NOTE(review): where $work_done is filled in is not visible in this excerpt.
is ($work_done, '3+4', 'async echo roundtrip: list -> scalar');

__END__



# Leftover debugging aid: dump the $c object to STDERR.
# NOTE(review): $c is created outside this excerpt — presumably the client
# object used for do_task below; confirm.
warn Dumper $c;


# Disabled experiment: do_task call on 'xxxx' with a scalar-ref payload,
# followed by a dump of whatever comes back.
#my $result_ref = $c->do_task('xxxx', \ "1+2");
#warn Dumper $result_ref;

__END__







# Scratch code: pass the string "1+2" by reference to the 'xxxx' function
# and print the dereferenced reply.
# NOTE(review): $$result_ref assumes do_task handed back a scalar
# reference — confirm it cannot return undef on failure.
my $result_ref = $c->do_task('xxxx', \ "1+2");
# Disabled variant: treat the reply as a Storable-frozen structure.
#use Storable qw(thaw);
#my ($result) = thaw $$result_ref;
#print "1 + 2 = $result\n". Dumper $result;
print "1 + 2 = $$result_ref\n";

# Disabled variant: send a frozen array [3, 4, 5] and thaw the reply.
##use Storable qw(freeze);
#my $ff = freeze [ 3, 4, 5];
#my $result_ref = $c->do_task('xxxx', \ $ff);
#my ($result) = thaw $$result_ref;
#print "1 + 2 = $result\n". Dumper $result;


__END__

use threads;
use threads::shared;

# Start the worker thread(s); each one runs MyWorker::new with no arguments.
my @workers = map { threads->new(\&MyWorker::new) } (1..1);

# Detach every worker. A plain loop is used because only the side effect
# matters; nothing is collected from the calls.
$_->detach for @workers;



# Pause the main thread for four seconds.
sleep 4;


__END__



# Base time unit in seconds: the stress driver passes 10 * WORKER_TIME as
# max_time and sleeps WORKER_TIME * 3 between worker-count rounds.
use constant WORKER_TIME => 5;

# Load check for the class that run_experiment instantiates.
use_ok ('REST::Depend::Regex::Gearman');


# State shared between the main thread and the worker threads.
# run_experiment empties/zeroes the first four at the start of every run.
my @urls      : shared = ();         # workers report back here; the main loop shifts entries off the front
my @labels    : shared = ();        # workers report task labels here; one is shifted for every URL
# Compared against must_work and printed as "works" in the test message.
my $work_cnt  : shared = 0;
# Printed as "droppings" in the same test message.
my $work_drop : shared = 0;
# Set by run_experiment to the number of workers it spawned.
my $alives    : shared = 0;



# Run one scheduling experiment against a fresh dependency network and
# report the outcome through three test assertions.
#
# Arguments:
#   $deps - dependency description handed to REST::Depend::Regex::Gearman->new
#   %o    - named options:
#             kick       => label passed to the initial $d->evolve
#             worker     => worker kind given to Worker::run (default 'perfect')
#             nr_workers => number of worker threads to spawn
#             must_work  => minimum value $work_cnt has to reach
#             max_time   => time budget in seconds; it only feeds $end_time,
#                           and the check that would read it is disabled
#
# Side effects: resets the file-level shared variables @urls, @labels,
# $work_cnt and $work_drop, sets $alives, spawns and detaches the workers,
# and finally sends KILL to every worker that is still running.
#
# Returns nothing.
sub run_experiment {
    my $deps = shift;
    my $d = REST::Depend::Regex::Gearman->new ($deps);
    my %o = @_;
    $o{worker} ||= 'perfect';

    # Start every experiment from a clean slate.
    @urls = ();
    @labels = ();
    $work_cnt = 0;
    $work_drop = 0;

    warn "== MAIN: $o{nr_workers} =====================================================================================";
    my @workers = map { threads->new(\&Worker::run, "W$_", $o{worker}) } (1 .. $o{nr_workers});
    $_->detach for @workers;
    $alives = scalar @workers;

    $d->evolve ($o{kick});

    my $start_time = time;
    # NOTE(review): $end_time is computed but never read, because the
    # `last LOOP` check below is commented out. The loop therefore only
    # ends once nothing is in flight and @urls is empty.
    my $end_time   = $start_time + $o{max_time}; # we plan to work at most some secs on that
    my @gearmandized;

  LOOP: {
    do {
      RESPONSE:
        while (my $url = shift @urls) {
#           warn "MAIN: popped $url";

            # NOTE(review): @urls and @labels are shifted as a pair without
            # a lock; this relies on the workers having pushed both entries
            # already — confirm against Worker::run.
            my $label = shift @labels; # there MUST be one
            warn "MAIN: task label $label found to be finished";
            if (my ($t) = $d->things ($label)) {
                if ($t->{gearmandized} && $t->{gearmandized}->[0]) {               # we asked for it
                    # Slot 0 is the "in flight" marker; clear it and stamp
                    # the completion time into slot 1.
                    $t->{gearmandized} = [ undef, time, undef ];
                } else {
                    warn "MAIN: got back UNSOLICITED, ignored";
                    next RESPONSE;
                }
            } else {
#               warn "MAIN: STRANGE got $label in queue which is not known in Petri";
                next RESPONSE;
            }

            if (my ($place) = $d->things ($url)) {
                next RESPONSE unless $place;                               # it can happen that we get reported a very old worker result
#               warn "MAIN: already got a place at $url: ".Dumper $place;
                $place->touch;
            } else {
                warn "MAIN: STRANGE got $url in queue which is not known in Petri";
                next RESPONSE;
            }
            $d->evolve ($url);
        }
        warn "MAIN: sleeping a bit with alive $alives";
        sleep 1;

#       $d->evolve ('xxx') if time > $end_time - $loop_time + 2; # 2 secs after start we pretend something else happened

        # Everything whose slot 0 is still set is considered in flight.
        @gearmandized = grep { $_->{gearmandized}->[0] } map { $d->things ($_) } $d->transitions;
        warn "still gearmandized: " . scalar @gearmandized;

#       warn "MAIN: now is ".time;
        # Judge all in-flight transitions against one and the same instant.
        # Slot 1 + slot 2 is presumably start time + allowed duration —
        # TODO confirm where these slots are filled in.
        my $now   = time;
        my @lates = grep { $_->{gearmandized}->[1] + $_->{gearmandized}->[2] < $now } @gearmandized;
        warn "MAIN: lates are now ".Dumper ([ map { $_->{label} } @lates ]);

        # Take the late ones away from their worker first, then fire them
        # again. ignite is called even when there are no late transitions.
        $_->{gearmandized}->[0] = undef for @lates;
        $d->ignite (map { $_->{label} } @lates);

#       last LOOP if time > $end_time;                                            # we end this game at worst-case time (sequential)
    } until (scalar (@gearmandized) == 0 and scalar (@urls) == 0);
    }
#    warn "MAIN: leftovers: ".Dumper \@urls;
    is (scalar @urls, 0,            "nr $o{nr_workers} emptied all reported URLs (in ".(time - $start_time)." secs)");
    ok ($work_cnt >= $o{must_work}, "nr $o{nr_workers} worker executed at least as many times (works: $work_cnt, droppings: $work_drop)");
    is (scalar ($d->ignitables), 0, "nr $o{nr_workers} no more ignitions");

    warn "MAIN: killing all running workers";
    for my $worker (grep { $_->is_running() } @workers) {
        $worker->kill('KILL');
    }

    return;
}

# Dependency description handed to every run_experiment call below.
# Outer keys are 'aaa' and 'eee'; each inner key maps to a
# [ name, { parameters } ] pair.
# NOTE(review): which side of each pair depends on which is defined by
# REST::Depend::Regex::Gearman and cannot be told from this file alone.
my $deps = {
    'aaa' => {
        'bbb' => [ 'aaa2bbb', { 'param1' => 'ppp' } ],
        'ccc' => [ 'aaa2ccc', { 'param1' => 'qqq' } ],
        'ddd' => [ 'aaa2ccc', { 'param1' => 'qqq' } ],
        'eee' => [ 'aaa2ccc', { 'param1' => 'qqq' } ],
    },
    'eee' => {
        'fff' => [ 'aaa2bbb', { 'param1' => 'ppp' } ],
    },
};

#run_experiment ($deps, kick => 'eee', must_work => 1, nr_workers => 1, max_time => 1 * WORKER_TIME * 2);

# Stress matrix: every worker kind, with 6 down to 1 workers, ten runs each.
my @worker_kinds = qw(suicidal perfect slow sloppy);

for my $kind (@worker_kinds) {
    for my $nr (reverse 1 .. 6) {       # varying the number of workers
        for my $round (1 .. 10) {       # just stress-testing
            run_experiment(
                $deps,
                kick       => 'aaa',
                must_work  => 5,
                worker     => $kind,
                nr_workers => $nr,
                max_time   => 10 * WORKER_TIME,
            );
        }
        sleep(3 * WORKER_TIME);         # just wait everything out
    }
}

__END__

my $deps = {
    q{/(?<map>.+)/} => {



( run in 1.400 second using v1.01-cache-2.11-cpan-5a3173703d6 )