Async-Queue
view release on metacpan or search on metacpan
t/10-sync.t view on Meta::CPAN
# Stand-in callbacks. In the assertions below they are only stored through the
# accessors and compared by reference.
my $another_worker = sub { print "hoge" };
my %handlers = map { $_ => sub { print $_ } } qw(saturated empty drain);

# Calling an accessor with an argument is expected to return the value just set.
is($q->worker($another_worker), $another_worker, "set another_worker");
is($q->$_($handlers{$_}), $handlers{$_}, "set $_ handler") foreach keys %handlers;

# Calling the same accessor without arguments is expected to return the stored value.
is($q->worker, $another_worker, "get another_worker");
is($q->$_(), $handlers{$_}, "get $_ handler") foreach keys %handlers;

# Passing undef to an event accessor is expected to return undef.
ok(!defined($q->$_(undef)), "set $_ handler to undef") foreach keys %handlers;
}
{
note('--- event callbacks receive the object as the first argument');
my @results = ();
my $q; $q = new_ok('Async::Queue', [
concurrency => 1, worker => sub {
my ($task, $cb) = @_;
push(@results, $task);
$cb->(uc($task));
},
map { my $event = $_; $event => sub {
my ($aq) = @_;
is($aq, $q, "\"$event\" event handler receives the object.");
t/15-syncwrap.t view on Meta::CPAN
});
# Install the "empty" and "drain" handlers, in that order. Each handler calls
# $q->check() with no arguments and then appends its one-letter marker to
# @results, so later assertions can read the firing order from @results.
foreach my $pair ([empty => "E"], [drain => "D"]) {
    my ($event, $marker) = @{$pair};
    $q->$event(sub {
        $q->check();
        push(@results, $marker);
    });
}
# Scenario: push a single task at a time and verify the order in which the
# event markers and the task name land in @results.
note('--- event callbacks: local specs');
# Start from an empty event log and call clearCounter() on the queue.
# NOTE(review): clearCounter() and check() are defined outside this excerpt;
# presumably clearCounter() resets whatever check() inspects -- confirm.
@results = ();
$q->clearCounter();
# The finish callback only calls $q->finish; it records nothing itself.
$q->push('a', sub { $q->finish });
# "E" and "D" are appended by the "empty" and "drain" handlers. "S" and the
# lowercase task name are presumably appended by the "saturated" handler and
# the worker, which are set up outside this excerpt.
is_deeply(\@results, [qw(S E a D)], '"saturated" before "empty".');
# Repeat with a second task; only the event log is cleared, not the counters.
@results = ();
$q->push('b', sub { $q->finish });
is_deeply(\@results, [qw(S E b D)], 'every task goes into the queue first. "empty" fires even if the task is served immediately.');
# Final state check after the two pushes above.
# NOTE(review): the meaning of the four arguments is defined by check(), outside this excerpt.
$q->check(0, 0, 2, 2);
# Scenario: three tasks pushed back to back, with the finish callback also
# logging whatever arguments it receives.
note('--- event callbacks (concurrency 1)');
@results = ();
$q->clearCounter();
for my $task (qw(a b c)) {
    $q->push($task, sub {
        $q->check();
        # Record the arguments handed to the finish callback.
        push(@results, @_);
        $q->finish;
    });
}
# Each task is expected to contribute the run "S E <task> <TASK> D";
# on mismatch, showResults() is called with the recorded log.
is_deeply(\@results, [qw(S E a A D S E b B D S E c C D)], "results OK.")
    or showResults(@results);
# NOTE(review): argument meaning is defined by check(), outside this excerpt.
$q->check(0, 0, 3, 3);
# Scenario: same as the previous run, but with concurrency raised to 3 and
# four tasks pushed.
note('--- event callbacks (concurrency 3)');
@results = ();
$q->clearCounter();
$q->concurrency(3);
for my $task (qw(w x y z)) {
    $q->push($task, sub {
        $q->check();
        # Record the arguments handed to the finish callback.
        push(@results, @_);
        $q->finish;
    });
}
# No "S" marker is expected in this run: each task contributes "E <task> <TASK> D".
# On mismatch, showResults() is called with the recorded log.
is_deeply(\@results, [qw(E w W D E x X D E y Y D E z Z D)], "results OK")
    or showResults(@results);
# NOTE(review): argument meaning is defined by check(), outside this excerpt.
$q->check(0, 0, 4, 4);
note('--- push inside finish callbacks (concurrency 1)');
@results = ();
$q->clearCounter();
$q->concurrency(1);
$q->push('a', sub {
$q->check(0, 1, 0, 1);
$q->push('b', sub {
$q->check(1, 1, 1, 3);
$q->push('d', sub {
$q->check(0, 1, 3, 4);
$q->finish;
t/15-syncwrap.t view on Meta::CPAN
$q->push('c', sub {
$q->check(1, 1, 2, 4);
$q->finish;
});
$q->finish;
});
is_deeply(\@results, [qw(S E a b c E d D)], '"saturated" fires only when "running" increases to the max.') or
showResults(@results);
$q->check(0, 0, 4, 4);
note('--- push inside finish callbacks (concurrency 3)');
@results = ();
$q->clearCounter();
$q->concurrency(3);
$q->push('a', sub {
$q->check(0, 1, 0, 1);
$q->push('b', sub {
$q->check(0, 2, 0, 2);
$q->push('c', sub {
$q->check(0, 3, 0, 3);
$q->push('d', sub {
t/17-syncerrors.t view on Meta::CPAN
push(@results, $task);
$cb->();
}
);
# push() with no arguments must die with a message matching the pattern below,
# whereas an explicit undef task is accepted.
throws_ok { $q->push() } qr/you must specify something to push/i, "push() without argument is NOT allowed.";
lives_ok { $q->push(undef) } 'pushing undef is allowed.';

# Exactly one entry should have been recorded in @results, and it must be undef.
is(scalar(@results), 1, "got result...");
ok(!defined($results[0]), "... and it's undef.");
@results = ();

note('--- invalid finish callbacks');
# An undef finish callback is tolerated ...
lives_ok { $q->push(undef, undef) } 'undef as a finish callback is just ignored.';

# ... but any other non-coderef value (number, string, array ref, hash ref,
# scalar ref) must be rejected.
my $somescalar = 11;
for my $not_a_coderef (10, "hoge", [], {}, \$somescalar) {
    throws_ok { $q->push(undef, $not_a_coderef) } qr/must be a coderef/i, 'finish callback must be a coderef';
}
}
{
note('--- worker and event handlers must not throw exceptions');
my $q; $q = Async::Queue->new(
( run in 0.455 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )