Async-Queue
lib/Async/Queue.pm view on Meta::CPAN
return int(@{$self->{task_queue}});
}
*waiting = \&length;
_define_hook_accessors 'worker';
_define_hook_accessors $_, allow_undef => 1 foreach qw(drain empty saturated);
sub push {
    my ($self, $task, $cb) = @_;
    if(@_ < 2) {
        croak("You must specify something to push.");
    }
    if(defined($cb) && ref($cb) ne 'CODE') {
        croak("callback for a task must be a coderef");
    }
    push(@{$self->{task_queue}}, [$task, $cb]);
    $self->_shift_run(1);
    return $self;
}
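sub _shift_run {
    my ($self, $from_push) = @_;
    # Do nothing if the concurrency limit is already reached.
    # A concurrency of 0 or less disables the limit.
    return if $self->concurrency > 0 && $self->running >= $self->concurrency;
    my $args_ref = shift(@{$self->{task_queue}});
    return if !defined($args_ref);
    my ($task, $cb) = @$args_ref;
    $self->{running} += 1;
    # "saturated" fires only when a push() fills the last free slot.
    if($self->running == $self->concurrency && $from_push && defined($self->saturated)) {
        $self->saturated->($self);
    }
    # "empty" fires when the last waiting task is handed to the worker.
    if(@{$self->{task_queue}} == 0 && defined($self->empty)) {
        $self->empty->($self);
    }
    # $sync is true while the worker is still on the call stack.
    # If the worker completes synchronously, the next task is started
    # after the worker returns instead of from inside the callback.
    my $sync = 1;
    my $sync_completed = 0;
    $self->worker->($task, sub {
        my (@worker_results) = @_;
        $cb->(@worker_results) if defined($cb);
        $self->{running} -= 1;
        # "drain" fires when nothing is waiting and nothing is running.
        if(@{$self->{task_queue}} == 0 && $self->running == 0 && defined($self->drain)) {
            $self->drain->($self);
        }
        if($sync) {
            $sync_completed = 1;
        }else {
            # Asynchronous completion: start the next waiting task.
            @_ = ($self);
            goto &_shift_run;
        }
    }, $self);
    $sync = 0;
    if($sync_completed) {
        # Synchronous completion: start the next waiting task now.
        @_ = ($self);
        goto &_shift_run;
    }
}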
=head1 NAME
Async::Queue - control concurrency of asynchronous tasks
=head1 VERSION
Version 0.021
=cut
our $VERSION = '0.021';
=head1 SYNOPSIS
    use Async::Queue;

    ## create a queue object with concurrency 2
    my $q = Async::Queue->new(
        concurrency => 2, worker => sub {
            my ($task, $callback) = @_;
            print "hello $task->{name}\n";
            $callback->();
        }
    );

    ## assign a callback
    $q->drain(sub {
        print "all items have been processed\n";
    });

    ## add some items to the queue
    $q->push({name => 'foo'}, sub {
        print "finished processing foo\n";
    });
    $q->push({name => 'bar'}, sub {
        print "finished processing bar\n";
    });
=head1 DESCRIPTION
L<Async::Queue> processes tasks with a specified concurrency level.
Tasks given to an L<Async::Queue> object are processed in parallel by its worker routine, up to the concurrency level.
If more tasks arrive while that many are already running, the new tasks wait until running tasks finish.
When a task finishes, the oldest waiting task starts, so waiting tasks are processed in first-in-first-out (FIFO) order.
In short, L<Async::Queue> is a Perl port of the C<queue> object of async.js (L<https://github.com/caolan/async#queue>).
The basic usage of L<Async::Queue> is as follows:
=over
=item 1.
Create an L<Async::Queue> object with a C<worker> attribute and an optional C<concurrency> attribute.
C<worker> is a subroutine reference that processes tasks. C<concurrency> is the concurrency level.
=item 2.
Push tasks to the L<Async::Queue> object via the C<push()> method, optionally with callback functions.
    use strict;
    use warnings;
    use AnyEvent;
    use AnyEvent::HTTP;
    use Async::Queue;

    my $q = Async::Queue->new(concurrency => 3, worker => sub {
        my ($url, $callback) = @_;
        print STDERR "Start $url\n";
        http_get $url, sub {
            my ($data, $headers) = @_;
            print STDERR "End $url\n";
            $callback->($data);
        };
    });

    my @urls = (
        'http://www.debian.org/',
        'http://www.ubuntu.com/',
        'http://fedoraproject.org/',
        'http://www.opensuse.org/',
        'http://www.centos.org/',
        'http://www.slackware.com/',
        'http://www.gentoo.org/',
        'http://www.archlinux.org/',
        'http://trisquel.info/',
    );
    my %results = ();
    my $cv = AnyEvent->condvar;
    foreach my $url (@urls) {
        $cv->begin();
        $q->push($url, sub {
            my ($data) = @_;
            $results{$url} = $data;
            $cv->end();
        });
    }
    $cv->recv;
    foreach my $key (keys %results) {
        print STDERR "$key: " . length($results{$key}) . " bytes\n";
    }
This example uses L<AnyEvent::HTTP> to send HTTP GET requests to multiple URLs concurrently.
Although concurrent requests can greatly improve throughput, too many at once may overload the client host
and/or the network.
This is where L<Async::Queue> comes in handy: it limits the number of HTTP sessions in progress
at any time (in this case, to three).
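The concurrency limit can also be observed without any network access. The following is a minimal sketch, not part of the module's documentation: it assumes L<Async::Queue> is installed, and the names C<@pending>, C<@finished>, C<$max_running> and C<$drain_count> are hypothetical helpers introduced only for illustration. The worker does not finish a task immediately. It stores a completion closure instead, so the script decides when each task ends and can check how many tasks were running at once.

```perl
use strict;
use warnings;
use Async::Queue;

my @pending;          # completion closures of tasks that are "in flight"
my @finished;         # task results, in completion order
my $max_running = 0;  # highest number of simultaneously running tasks seen
my $drain_count = 0;  # how many times the drain hook fired

my $q = Async::Queue->new(concurrency => 3, worker => sub {
    # The queue object itself is passed to the worker as the third argument.
    my ($task, $callback, $queue) = @_;
    $max_running = $queue->running if $queue->running > $max_running;
    # Do not finish now; remember how to finish this task later.
    push @pending, sub { $callback->("done $task") };
});
$q->drain(sub { $drain_count++ });

# Nine tasks are pushed, but only three are handed to the worker at first.
$q->push("task$_", sub { push @finished, $_[0] }) for 1 .. 9;
printf "after push: %d running, %d waiting\n", $q->running, $q->waiting;

# Finish in-flight tasks one at a time. Each completion frees a slot,
# so the queue starts the oldest waiting task (FIFO order).
while (my $complete = shift @pending) {
    $complete->();
}
printf "max running: %d, finished: %d, drain calls: %d\n",
    $max_running, scalar(@finished), $drain_count;
```

Because every completion is triggered by hand, the run is deterministic, which makes this pattern convenient for unit-testing code built on top of the queue.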
=head1 SEE ALSO
=over
=item L<AnyEvent::FIFO>
The goal of L<AnyEvent::FIFO> is the same as that of L<Async::Queue>: to control the concurrency level of asynchronous tasks.
The big difference is that L<AnyEvent::FIFO> is a queue of subroutines, while L<Async::Queue> is a queue of tasks (data).
In L<Async::Queue>, the worker subroutine is registered with the object in advance.
In L<AnyEvent::FIFO>, it is the workers themselves that are pushed to the queue.
You can emulate L<AnyEvent::FIFO> with L<Async::Queue> by pushing subroutine references to it as tasks.
=back
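The emulation mentioned above can be sketched as follows. This is only an illustration under stated assumptions: L<Async::Queue> is installed, each pushed subroutine accepts a completion callback (called C<$done> here, a hypothetical name) as its only argument, and no attempt is made to reproduce the actual interface of L<AnyEvent::FIFO>. The registered worker does nothing but invoke whatever subroutine it receives as the task.

```perl
use strict;
use warnings;
use Async::Queue;

# The worker treats every task as a subroutine reference and simply runs it,
# handing over the completion callback supplied by the queue.
my $q = Async::Queue->new(concurrency => 1, worker => sub {
    my ($job, $done) = @_;
    $job->($done);
});

my @log;
for my $name (qw(first second third)) {
    $q->push(
        sub {                       # the "task" is itself a subroutine
            my ($done) = @_;
            push @log, "run $name";
            $done->($name);         # report completion to the queue
        },
        sub {                       # per-task callback gets the job's result
            my ($result) = @_;
            push @log, "finished $result";
        },
    );
}

print "$_\n" for @log;
```

With a concurrency of 1 the jobs run strictly one after another. Raising the concurrency only matters when the jobs complete asynchronously, for example from an event loop callback.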
=head1 AUTHOR
Toshio Ito, C<< <debug.ito at gmail.com> >>
=head1 REPOSITORY
L<https://github.com/debug-ito/Async-Queue>
=head1 BUGS
Please report any bugs or feature requests to C<bug-async-queue at rt.cpan.org>, or through
the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=Async-Queue>. I will be notified, and then you'll
automatically be notified of progress on your bug as I make changes.
=head1 SUPPORT
You can find documentation for this module with the perldoc command.
    perldoc Async::Queue
You can also look for information at:
=over 4
=item * RT: CPAN's request tracker (report bugs here)
L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=Async-Queue>
=item * AnnoCPAN: Annotated CPAN documentation
L<http://annocpan.org/dist/Async-Queue>
=item * CPAN Ratings
L<http://cpanratings.perl.org/d/Async-Queue>
=item * Search CPAN
L<http://search.cpan.org/dist/Async-Queue/>
=back
=head1 LICENSE AND COPYRIGHT
Copyright 2012 Toshio Ito.
This program is free software; you can redistribute it and/or modify it