Async-Queue
view release on metacpan or search on metacpan
t/15-syncwrap.t view on Meta::CPAN
use strict;
use warnings;
use FindBin;
use lib ("$FindBin::RealBin/lib");
use Test::More;
BEGIN {
use_ok('Test::AQWrapper');
}
sub showResults {
diag("results: ". join(" ", @_));
}
{
my @results = ();
my $q; $q = new_ok('Test::AQWrapper', [concurrency => 1, worker => sub {
my ($task, $cb) = @_;
$q->check();
push(@results, $task);
$cb->(uc($task));
}]);
$q->saturated(sub {
$q->check();
push(@results, "S");
});
$q->empty(sub {
$q->check();
push(@results, "E");
});
$q->drain(sub {
$q->check();
push(@results, "D");
});
note('--- event callbacks: local specs');
@results = ();
$q->clearCounter();
$q->push('a', sub { $q->finish });
is_deeply(\@results, [qw(S E a D)], '"saturated" before "empty".');
@results = ();
$q->push('b', sub { $q->finish });
is_deeply(\@results, [qw(S E b D)], 'every task goes into the queue first. "empty" fires even if the task is served immediately.');
$q->check(0, 0, 2, 2);
note('--- event callbacks (concurrency 1)');
@results = ();
$q->clearCounter();
$q->push($_, sub { $q->check(); push(@results, @_); $q->finish }) foreach qw(a b c);
is_deeply(\@results, [qw(S E a A D S E b B D S E c C D)], "results OK.") or
showResults(@results);
$q->check(0, 0, 3, 3);
note('--- event callbacks (concurrency 3)');
@results = ();
$q->clearCounter();
$q->concurrency(3);
$q->push($_, sub { $q->check(); push(@results, @_); $q->finish }) foreach qw(w x y z);
is_deeply(\@results, [qw(E w W D E x X D E y Y D E z Z D)], "results OK") or
showResults(@results);
$q->check(0, 0, 4, 4);
note('--- push inside finish callbacks (concurrency 1)');
@results = ();
$q->clearCounter();
$q->concurrency(1);
$q->push('a', sub {
$q->check(0, 1, 0, 1);
$q->push('b', sub {
$q->check(1, 1, 1, 3);
$q->push('d', sub {
$q->check(0, 1, 3, 4);
$q->finish;
});
$q->finish;
});
$q->push('c', sub {
$q->check(1, 1, 2, 4);
$q->finish;
});
$q->finish;
});
is_deeply(\@results, [qw(S E a b c E d D)], '"saturated" fires only when "running" increases to the max.') or
showResults(@results);
$q->check(0, 0, 4, 4);
note('--- push inside finish callbacks (concurrency 3)');
@results = ();
$q->clearCounter();
$q->concurrency(3);
$q->push('a', sub {
$q->check(0, 1, 0, 1);
$q->push('b', sub {
$q->check(0, 2, 0, 2);
$q->push('c', sub {
$q->check(0, 3, 0, 3);
$q->push('d', sub {
$q->check(2, 3, 1, 6);
$q->finish;
});
$q->push('e', sub {
$q->check(1, 3, 2, 6);
$q->finish;
});
$q->push("f", sub {
$q->check(0, 3, 3, 6);
$q->finish;
});
$q->finish;
});
$q->push('g', sub {
$q->check(0, 3, 4, 7);
$q->push('h', sub {
$q->check(0, 3, 5, 8);
$q->push('i', sub {
$q->check(0, 3, 6, 9);
$q->finish;
});
$q->finish;
});
$q->finish;
});
$q->finish;
});
$q->finish;
});
is_deeply(\@results, [qw(E a E b S E c d e E f S E g E h E i D)]) or
showResults(@results);
$q->check(0, 0, 9, 9);
note('--- "empty" event keeps firing until its "saturated"');
$q->concurrency(3);
$q->clearCounter;
@results = ();
$q->push("a", sub {
$q->push("b", sub {
$q->push("c", sub {
$q->push($_, sub { $q->finish }) foreach qw(d e f g);
$q->finish;
});
$q->finish;
});
$q->finish;
});
is_deeply(\@results, [qw(E a E b S E c d e f E g D)],
'results OK. "empty" event keeps firing until "saturated"') or
showResults(@results);
( run in 0.661 second using v1.01-cache-2.11-cpan-39bf76dae61 )