Async-Queue

 view release on metacpan or  search on metacpan

t/10-sync.t  view on Meta::CPAN

    my $another_worker = sub { print "hoge" };
    my %handlers = map { $_ => sub { print $_ } } qw(saturated empty drain);
    is($q->worker($another_worker), $another_worker, "set another_worker");
    is($q->$_($handlers{$_}), $handlers{$_}, "set $_ hander") foreach keys %handlers;
    is($q->worker, $another_worker, "get another_worker");
    is($q->$_(), $handlers{$_}, "get $_ handler") foreach keys %handlers;
    ok(!defined($q->$_(undef)), "set $_ handler to undef") foreach keys %handlers;
}

{
    note('--- event callbacks receive the object as the first argument');
    my @results = ();
    my $q; $q = new_ok('Async::Queue', [
        concurrency => 1, worker => sub {
            my ($task, $cb) = @_;
            push(@results, $task);
            $cb->(uc($task));
        },
        map { my $event = $_; $event => sub {
            my ($aq) = @_;
            is($aq, $q, "\"$event\" event handler receives the object.");

t/15-syncwrap.t  view on Meta::CPAN

    });
    $q->empty(sub {
        $q->check();
        push(@results, "E");
    });
    $q->drain(sub {
        $q->check();
        push(@results, "D");
    });

    note('--- event callbacks: local specs');
    @results = ();
    $q->clearCounter();
    $q->push('a', sub { $q->finish });
    is_deeply(\@results, [qw(S E a D)], '"saturated" before "empty".');
    @results = ();
    $q->push('b', sub { $q->finish });
    is_deeply(\@results, [qw(S E b D)], 'every task goes into the queue first. "empty" fires even if the task is served immediately.');
    $q->check(0, 0, 2, 2);


    note('--- event callbacks (concurrency 1)');
    @results = ();
    $q->clearCounter();
    $q->push($_, sub { $q->check(); push(@results, @_); $q->finish }) foreach qw(a b c);
    is_deeply(\@results, [qw(S E a A D S E b B D S E c C D)], "results OK.") or
        showResults(@results);
    $q->check(0, 0, 3, 3);
    

    note('--- event callbacks (concurrency 3)');
    @results = ();
    $q->clearCounter();
    $q->concurrency(3);
    $q->push($_, sub { $q->check(); push(@results, @_); $q->finish }) foreach qw(w x y z);
    is_deeply(\@results, [qw(E w W D E x X D E y Y D E z Z D)], "results OK") or
        showResults(@results);
    $q->check(0, 0, 4, 4);

    note('--- push inside finish callbacks (concurrency 1)');
    @results = ();
    $q->clearCounter();
    $q->concurrency(1);
    $q->push('a', sub {
        $q->check(0, 1, 0, 1);
        $q->push('b', sub {
            $q->check(1, 1, 1, 3);
            $q->push('d', sub {
                $q->check(0, 1, 3, 4);
                $q->finish;

t/15-syncwrap.t  view on Meta::CPAN

        $q->push('c', sub {
            $q->check(1, 1, 2, 4);
            $q->finish;
        });
        $q->finish;
    });
    is_deeply(\@results, [qw(S E a b c E d D)], '"saturated" fires only when "running" increases to the max.') or
        showResults(@results);
    $q->check(0, 0, 4, 4);

    note('--- push inside finish callbacks (concurrency 3)');
    @results = ();
    $q->clearCounter();
    $q->concurrency(3);
    $q->push('a', sub {
        $q->check(0, 1, 0, 1);
        $q->push('b', sub {
            $q->check(0, 2, 0, 2);
            $q->push('c', sub {
                $q->check(0, 3, 0, 3);
                $q->push('d', sub {

t/17-syncerrors.t  view on Meta::CPAN

            push(@results, $task);
            $cb->();
        }
    );
    throws_ok { $q->push() } qr/you must specify something to push/i, "push() without argument is NOT allowed.";
    lives_ok { $q->push(undef) } 'pushing undef is allowed.';
    is(int(@results), 1, "got result...");
    ok(!defined($results[0]), "... and it's undef.");

    @results = ();
    note('--- invalid finish callbacks');
    lives_ok { $q->push(undef, undef) } 'undef as a finish callback is just ignored.';
    my $somescalar = 11;
    foreach my $junk (10, "hoge", [], {}, \$somescalar) {
        throws_ok { $q->push(undef, $junk) } qr/must be a coderef/i, 'finish callback must be a coderef';
    }
}

{
    note('--- worker and event handlers must not throw exceptions');
    my $q; $q = Async::Queue->new(



( run in 0.455 second using v1.01-cache-2.11-cpan-9b1e4054eb1 )