AnyEvent-ProcessPool
view release on metacpan or search on metacpan
lib/AnyEvent/ProcessPool.pm view on Meta::CPAN
package AnyEvent::ProcessPool;
# ABSTRACT: Asynchronously runs code concurrently in a pool of perl processes
$AnyEvent::ProcessPool::VERSION = '0.07';
use common::sense;
use Carp;
use AnyEvent;
use AnyEvent::ProcessPool::Process;
use AnyEvent::ProcessPool::Task;
use AnyEvent::ProcessPool::Util qw(next_id cpu_count);
sub new {
my ($class, %param) = @_;
my $self = bless {
workers => $param{workers} || cpu_count,
limit => $param{limit},
include => $param{include},
pool => [], # array of AE::PP::Process objects
queue => [], # array of [id, code] tasks
complete => {}, # task_id => condvar: signals result to caller
pending => {}, # task_id => condvar: signals result internally
}, $class;
# Initialize workers but do not yet wait for them to be started
foreach (1 .. $self->{workers}) {
my $worker = AnyEvent::ProcessPool::Process->new(
limit => $self->{limit},
include => $self->{include},
);
push @{$self->{pool}}, $worker;
}
return $self;
}
sub join {
my $self = shift;
foreach my $task_id (keys %{$self->{complete}}) {
if (my $cv = $self->{complete}{$task_id}) {
$cv->recv;
}
}
}
sub DESTROY {
my ($self, $global) = @_;
if ($self) {
# Unblock watchers for any remaining pending tasks
if (ref $self->{pending}) {
foreach my $cv (values %{$self->{pending}}) {
$cv->croak('AnyEvent::ProcessPool destroyed with pending tasks remaining');
}
}
# Terminate any workers still alive
if (ref $self->{pool}) {
foreach my $worker (@{$self->{pool}}) {
$worker->stop if $worker;
}
}
}
}
sub async {
my ($self, $code, @args) = @_;
my $id = next_id;
my $task = AnyEvent::ProcessPool::Task->new($code, \@args);
$self->{complete}{$id} = AE::cv;
push @{$self->{queue}}, [$id, $task];
$self->process_queue;
return $self->{complete}{$id};
}
sub process_queue {
my $self = shift;
my $queue = $self->{queue};
my $pool = $self->{pool};
while (@$queue && @$pool) {
my ($id, $task) = @{shift @$queue};
my $worker = shift @$pool;
$self->{pending}{$id} = $worker->run($task);
$self->{pending}{$id}->cb(sub{
my $task = shift->recv;
if ($task->failed) {
$self->{complete}{$id}->croak($task->result);
} else {
$self->{complete}{$id}->send($task->result);
}
delete $self->{pending}{$id};
delete $self->{complete}{$id};
push @$pool, $worker;
$self->process_queue;
});
}
}
1;
__END__
( run in 1.030 second using v1.01-cache-2.11-cpan-5b529ec07f3 )