Async-Queue
view release on metacpan or search on metacpan
lib/Async/Queue.pm view on Meta::CPAN
package Async::Queue;
use 5.006;
use strict;
use warnings;
use Carp;
use Scalar::Util qw(looks_like_number);
# Constructor: Async::Queue->new(%options).
#
# Recognised option keys are concurrency, worker, drain, empty and
# saturated; any other key in %options is ignored. Each recognised option
# is routed through the accessor of the same name, so the accessors'
# validation applies at construction time. The setters are always invoked,
# even when the option was not supplied (the value is then undef); as a
# consequence an accessor that refuses undef makes new() croak.
#
# Returns the newly blessed queue object.
sub new {
    my ($class, %options) = @_;

    # Initial state: one task at a time, no hooks, nothing queued or running.
    my %state = (
        concurrency => 1,
        worker      => undef,
        drain       => undef,
        empty       => undef,
        saturated   => undef,
        task_queue  => [],
        running     => 0,
    );
    my $self = bless {%state}, $class;

    # Apply the options in a fixed order through the validating setters.
    for my $setter (qw(concurrency worker drain empty saturated)) {
        $self->$setter($options{$setter});
    }

    return $self;
}
# Install a combined getter/setter method called $name into this package.
#
# Arguments:
#   $name    - name of the method to generate; also the key under which the
#              value is stored in the object hash.
#   %options - allow_undef => true lets the setter accept undef (clearing the
#              hook); otherwise undef is rejected.
#
# Generated method, $obj->$name([$coderef]):
#   With no argument it returns the stored value. With an argument it
#   validates and stores it, then returns the stored value. It croaks when
#   the value is undef and undef is not allowed, when a defined value is not
#   a code reference, or when $obj->running reports a running task.
sub _define_hook_accessors {
    my ($name, %options) = @_;
    my $fullname = __PACKAGE__ . "::$name";

    my $accessor = sub {
        my ($self, $v) = @_;
        if (@_ > 1) {
            if (defined $v) {
                # reftype() looks through bless(), so a blessed code
                # reference (which is perfectly callable) is accepted too.
                # Scalar::Util is already loaded by this file; the call is
                # fully qualified because only looks_like_number is imported.
                my $type = Scalar::Util::reftype($v);
                croak "$name must be a coderef"
                    if !defined($type) || $type ne 'CODE';
            }
            elsif (!$options{allow_undef}) {
                croak "$name must not be undef.";
            }
            croak "You cannot set $name while there is a running task."
                if $self->running > 0;
            $self->{$name} = $v;
        }
        return $self->{$name};
    };

    # Only the symbol-table assignment needs symbolic references; the
    # accessor body above is compiled under full strictures.
    no strict 'refs';
    *{$fullname} = $accessor;
}
# Read-only accessor: returns the value stored in the object's "running"
# slot (the counter the rest of this file compares against concurrency).
sub running {
    my $self = shift;
    return $self->{running};
}
# Get or set the concurrency level of the queue.
#
#   $queue->concurrency          returns the current level.
#   $queue->concurrency($conc)   sets it and returns the new level.
#
# When setting: undef means 1, anything else must look like a number and is
# truncated with int(). Croaks if a task is currently running or if the
# value is not numeric. (_shift_run only enforces the limit when the stored
# value is greater than zero.)
sub concurrency {
    my ($self, $conc) = @_;

    # Getter form: nothing was passed besides the invocant.
    return $self->{concurrency} if @_ < 2;

    if ($self->running > 0) {
        croak "You cannot set concurrency while there is a running task";
    }

    $conc = 1 unless defined $conc;
    looks_like_number($conc)
        or croak "concurrency must be a number";

    $self->{concurrency} = int($conc);
    return $self->{concurrency};
}
# Returns the number of entries currently held in the object's task_queue
# array, i.e. tasks that were pushed but not yet shifted off for running.
sub length {
    my $self = shift;
    my $pending = $self->{task_queue};
    return scalar @$pending;
}
# "waiting" is an alias for length(): both names refer to the same code.
*waiting = \&length;

# Generate the hook accessors. "worker" is created without allow_undef, so
# its setter croaks when given undef; the three event hooks may be cleared
# by setting them to undef.
_define_hook_accessors('worker');
for my $hook (qw(drain empty saturated)) {
    _define_hook_accessors($hook, allow_undef => 1);
}
# Append a task to the queue and try to start it.
#
#   $queue->push($task)
#   $queue->push($task, $cb)
#
# Arguments:
#   $task - the value to enqueue; it only has to be present in the argument
#           list (an explicit undef is accepted).
#   $cb   - optional code reference stored alongside the task.
#
# Croaks when no task argument is given or when $cb is defined but is not a
# code reference. Returns the queue object itself, so calls can be chained.
sub push {
    my ($self, $task, $cb) = @_;

    croak("You must specify something to push.") if @_ < 2;

    if (defined $cb) {
        # reftype() looks through bless(), so a blessed code reference is
        # accepted as a callback. Scalar::Util is already loaded by this
        # file; the call is fully qualified because only
        # looks_like_number is imported.
        my $type = Scalar::Util::reftype($cb);
        croak("callback for a task must be a coderef")
            if !defined($type) || $type ne 'CODE';
    }

    # CORE:: makes it explicit that this is the builtin, not this method.
    CORE::push(@{ $self->{task_queue} }, [$task, $cb]);

    # The true argument tells _shift_run that it was triggered by a push.
    $self->_shift_run(1);
    return $self;
}
sub _shift_run {
my ($self, $from_push) = @_;
return if $self->concurrency > 0 && $self->running >= $self->concurrency;
my $args_ref = shift(@{$self->{task_queue}});
return if !defined($args_ref);
my ($task, $cb) = @$args_ref;
$self->{running} += 1;
if($self->running == $self->concurrency && $from_push && defined($self->saturated)) {
$self->saturated->($self);
}
if(@{$self->{task_queue}} == 0 && defined($self->empty)) {
$self->empty->($self);
}
my $sync = 1;
my $sync_completed = 0;
$self->worker->($task, sub {
my (@worker_results) = @_;
$cb->(@worker_results) if defined($cb);
$self->{running} -= 1;
if(@{$self->{task_queue}} == 0 && $self->running == 0 && defined($self->drain)) {
$self->drain->($self);
}
if($sync) {
$sync_completed = 1;
}else {
@_ = ($self);
goto &_shift_run;
}
}, $self);
$sync = 0;
if($sync_completed) {
@_ = ($self);
( run in 1.418 second using v1.01-cache-2.11-cpan-39bf76dae61 )