Async-Queue

 view release on metacpan or  search on metacpan

lib/Async/Queue.pm  view on Meta::CPAN

    return int(@{$self->{task_queue}});
}

*waiting = \&length;

_define_hook_accessors 'worker';
_define_hook_accessors $_, allow_undef => 1 foreach qw(drain empty saturated);

sub push {
    my ($self, $task, $cb) = @_;
    if(@_ < 2) {
        croak("You must specify something to push.");
    }
    if(defined($cb) && ref($cb) ne 'CODE') {
        croak("callback for a task must be a coderef");
    }
    push(@{$self->{task_queue}}, [$task, $cb]);
    $self->_shift_run(1);
    return $self;
}

sub _shift_run {
    my ($self, $from_push) = @_;
    return if $self->concurrency > 0 && $self->running >= $self->concurrency;
    my $args_ref = shift(@{$self->{task_queue}});
    return if !defined($args_ref);
    my ($task, $cb) = @$args_ref;
    $self->{running} += 1;
    if($self->running == $self->concurrency && $from_push && defined($self->saturated)) {
        $self->saturated->($self);
    }
    if(@{$self->{task_queue}} == 0 && defined($self->empty)) {
        $self->empty->($self);
    }
    my $sync = 1;
    my $sync_completed = 0;
    $self->worker->($task, sub {
        my (@worker_results) = @_;
        $cb->(@worker_results) if defined($cb);
        $self->{running} -= 1;
        if(@{$self->{task_queue}} == 0 && $self->running == 0 && defined($self->drain)) {
            $self->drain->($self);
        }
        if($sync) {
            $sync_completed = 1;
        }else {
            @_ = ($self);
            goto &_shift_run;
        }
    }, $self);
    $sync = 0;
    if($sync_completed) {
        @_ = ($self);
        goto &_shift_run;
    }
}


=head1 NAME

Async::Queue - control concurrency of asynchronous tasks

=head1 VERSION

Version 0.021

=cut

our $VERSION = '0.021';


=head1 SYNOPSIS


    use Async::Queue;
    
    ## create a queue object with concurrency 2
    my $q = Async::Queue->new(
        concurrency => 2, worker => sub {
            my ($task, $callback) = @_;
            print "hello $task->{name}\n";
            $callback->();
        }
    );
    
    ## assign a callback
    $q->drain(sub {
        print "all items have been processed\n";
    });
    
    ## add some items to the queue
    $q->push({name => 'foo'}, sub {
        print "finished processing foo\n";
    });
    $q->push({name => 'bar'}, sub {
        print "finished processing bar\n";
    });


=head1 DESCRIPTION

L<Async::Queue> is used to process tasks with the specified concurrency.
The tasks given to L<Async::Queue> are processed in parallel with its worker routine up to the concurrency level.
If more tasks arrive at the L<Async::Queue> object, those tasks will wait for currently running tasks to finish.
When a task is finished, one of the waiting tasks starts to be processed in first-in-first-out (FIFO) order.

In short, L<Async::Queue> is a Perl port of the C<queue> object of async.js (L<https://github.com/caolan/async#queue>).

The basic usage of L<Async::Queue> is as follows:

=over

=item 1.

Create L<Async::Queue> object with C<worker> attribute and optional C<concurrency> attribute.
C<worker> is a subroutine reference that processes tasks. C<concurrency> is the concurrency level.

=item 2.

Push tasks to the L<Async::Queue> object via C<push()> method with optional callback functions.

lib/Async/Queue.pm  view on Meta::CPAN


    use strict;
    use warnings;
    use AnyEvent;
    use AnyEvent::HTTP;
    use Async::Queue;
    
    my $q = Async::Queue->new(concurrency => 3, worker => sub {
        my ($url, $callback) = @_;
        print STDERR "Start $url\n";
        http_get $url, sub {
            my ($data, $headers) = @_;
            print STDERR "End $url\n";
            $callback->($data);
        };
    });
    
    my @urls = (
        'http://www.debian.org/',
        'http://www.ubuntu.com/',
        'http://fedoraproject.org/',
        'http://www.opensuse.org/',
        'http://www.centos.org/',
        'http://www.slackware.com/',
        'http://www.gentoo.org/',
        'http://www.archlinux.org/',
        'http://trisquel.info/',
    );
    
    my %results = ();
    my $cv = AnyEvent->condvar;
    foreach my $url (@urls) {
        $cv->begin();
        $q->push($url, sub {
            my ($data) = @_;
            $results{$url} = $data;
            $cv->end();
        });
    }
    $cv->recv;
    
    foreach my $key (keys %results) {
        print STDERR "$key: " . length($results{$key}) . "bytes\n";
    }

This example uses L<AnyEvent::HTTP> to send HTTP GET requests for multiple URLs simultaneously.
While simultaneous requests dramatically improve efficiency, it may overload the client host
and/or the network.

This is where L<Async::Queue> comes in handy. With L<Async::Queue> you can control the concurrency level
of the HTTP sessions (in this case, up to three).



=head1 SEE ALSO

=over

=item L<AnyEvent::FIFO>

The goal of L<AnyEvent::FIFO> is the same as that of L<Async::Queue>: to control concurrency level of asynchronous tasks.
The big difference is that L<AnyEvent::FIFO> is a queue of subroutines while L<Async::Queue> is a queue of tasks (data).
In L<Async::Queue>, worker subroutine is registered with the object in advance.
In L<AnyEvent::FIFO>, it is workers that are pushed to the queue.

You can emulate L<AnyEvent::FIFO> with L<Async::Queue> by pushing subroutine references to it as tasks.

=back


=head1 AUTHOR

Toshio Ito, C<< <debug.ito at gmail.com> >>

=head1 REPOSITORY

L<https://github.com/debug-ito/Async-Queue>

=head1 BUGS

Please report any bugs or feature requests to C<bug-async-queue at rt.cpan.org>, or through
the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Async-Queue>.  I will be notified, and then you'll
automatically be notified of progress on your bug as I make changes.



=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc Async::Queue


You can also look for information at:

=over 4

=item * RT: CPAN's request tracker (report bugs here)

L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Queue>

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/Async-Queue>

=item * CPAN Ratings

L<http://cpanratings.perl.org/d/Async-Queue>

=item * Search CPAN

L<http://search.cpan.org/dist/Async-Queue/>

=back


=head1 LICENSE AND COPYRIGHT

Copyright 2012 Toshio Ito.

This program is free software; you can redistribute it and/or modify it



( run in 0.334 second using v1.01-cache-2.11-cpan-df04353d9ac )