Async-Queue
view release on metacpan or search on metacpan
t/10-sync.t view on Meta::CPAN
note('--- worker arguments');
my $got_task = '';
my $q; $q = new_ok('Async::Queue', [ concurrency => 1, worker => sub {
my ($task, $cb, $queue) = @_;
$got_task = $task;
is(ref($cb), "CODE", 'cb is a coderef');
is($queue, $q, 'queue is the object q');
} ]);
$q->push('a');
is($got_task, 'a', 'worker executed');
}
{
note('--- basic push() behavior');
my @results = ();
my $q = new_ok('Async::Queue', [ concurrency => 1, worker => sub {
my ($task, $cb) = @_;
push(@results, $task);
$cb->(lc($task), uc($task));
}]);
checkQueue $q, 0, 0, 1;
is($q->push("a"), $q, 'push() method returns the object.');
checkQueue $q, 0, 0, 1;
is_deeply(\@results, ["a"], "results ok");
@results = ();
foreach my $letter (qw(b c d)) {
$q->push($letter);
checkQueue $q, 0, 0, 1;
}
is_deeply(\@results, [qw(b c d)], "results OK");
note('--- callback to push()');
@results = ();
is($q->push("E", sub {
my @args = @_;
push(@results, @args);
checkQueue $q, 0, 1, 1;
}), $q, 'push() method returns the object.');
is_deeply(\@results, [qw(E e E)], "results OK. push() callback is called with arguments");
}
{
note('--- accessors');
my $worker = sub { };
my $q = new_ok('Async::Queue', [concurrency => 12, worker => $worker]);
checkQueue $q, 0, 0, 12;
is($q->concurrency(5), 5, "set concurrency to 5");
checkQueue $q, 0, 0, 5;
is($q->worker, $worker, "worker() returns coderef");
ok(!defined($q->$_), "$_() returns undef now.") foreach qw(saturated empty drain);
my $another_worker = sub { print "hoge" };
my %handlers = map { $_ => sub { print $_ } } qw(saturated empty drain);
is($q->worker($another_worker), $another_worker, "set another_worker");
is($q->$_($handlers{$_}), $handlers{$_}, "set $_ hander") foreach keys %handlers;
is($q->worker, $another_worker, "get another_worker");
is($q->$_(), $handlers{$_}, "get $_ handler") foreach keys %handlers;
ok(!defined($q->$_(undef)), "set $_ handler to undef") foreach keys %handlers;
}
{
note('--- event callbacks receive the object as the first argument');
my @results = ();
my $q; $q = new_ok('Async::Queue', [
concurrency => 1, worker => sub {
my ($task, $cb) = @_;
push(@results, $task);
$cb->(uc($task));
},
map { my $event = $_; $event => sub {
my ($aq) = @_;
is($aq, $q, "\"$event\" event handler receives the object.");
push(@results, $event);
} } qw(saturated empty drain)
]);
$q->push("task", sub {
my ($ret) = @_;
push(@results, $ret, "finish");
});
is_deeply(\@results, [qw(saturated empty task TASK finish drain)], "results OK. all events are fired.");
}
{
note('--- default concurrency');
my $q = new_ok('Async::Queue', [worker => sub {}]);
is($q->concurrency, 1, "concurrency is 1 by default.");
$q = new_ok('Async::Queue', [ worker => sub {}, concurrency => undef ]);
is($q->concurrency, 1, "concurrency is 1 by default.");
$q->concurrency(3);
is($q->concurrency, 3, "concurrency changed to 3.");
is($q->concurrency(undef), 1, "concurrency changed to the default, which is 1.");
is($q->concurrency, 1, "concurrency is 1.");
}
done_testing();
( run in 2.513 seconds using v1.01-cache-2.11-cpan-cdf2f3d4e48 )