AnyEvent-ForkManager
view release on metacpan or search on metacpan
lib/AnyEvent/ForkManager.pm view on Meta::CPAN
package AnyEvent::ForkManager;
use 5.008_001;
use strict;
use warnings;
our $VERSION = '0.07';
use AnyEvent;
use Scalar::Util qw/weaken/;
use POSIX qw/WNOHANG/;
use Time::HiRes ();
use Class::Accessor::Lite 0.04 (
ro => [
qw/max_workers manager_pid/,
],
rw => [
qw/on_start on_finish on_error on_enqueue on_dequeue on_working_max/,
qw/process_queue running_worker process_cb wait_async/,
],
);
sub default_max_workers { 10 }
sub new {
my $class = shift;
my $arg = (@_ == 1) ? +shift : +{ @_ };
$arg->{max_workers} ||= $class->default_max_workers;
bless(+{
%$arg,
manager_pid => $$,
} => $class)->init;
}
sub init {
my $self = shift;
$self->process_queue([]);
$self->running_worker(+{});
$self->process_cb(+{});
return $self;
}
sub is_child { shift->manager_pid != $$ }
sub is_working_max {
my $self = shift;
$self->num_workers >= $self->max_workers;
}
sub num_workers {
my $self = shift;
return scalar keys %{ $self->running_worker };
}
sub num_queues {
my $self = shift;
return scalar @{ $self->process_queue };
}
sub start {
my $self = shift;
my $arg = (@_ == 1) ? +shift : +{ @_ };
die "\$fork_manager->start() should be called within the manager process\n"
if $self->is_child;
if ($self->is_working_max) {## child working max
$self->_run_cb('on_working_max' => @{ $arg->{args} });
$self->enqueue($arg);
return;
}
else {## create child process
my $pid = fork;
if (not(defined $pid)) {
$self->_run_cb('on_error' => @{ $arg->{args} });
return;
}
elsif ($pid) {
# parent
$self->_run_cb('on_start' => $pid, @{ $arg->{args} });
$self->process_cb->{$pid} = $self->_create_callback(@{ $arg->{args} });
$self->running_worker->{$pid} = AnyEvent->child(
pid => $pid,
cb => $self->process_cb->{$pid},
);
# delete worker watcher if already finished child process.
delete $self->running_worker->{$pid} unless exists $self->process_cb->{$pid};
return $pid;
}
else {
# child
$arg->{cb}->($self, @{ $arg->{args} });
$self->finish;
}
}
}
sub _create_callback {
my($self, @args) = @_;
weaken($self);
return sub {
my ($pid, $status) = @_;
delete $self->running_worker->{$pid};
delete $self->process_cb->{$pid};
$self->_run_cb('on_finish' => $pid, $status, @args);
if ($self->num_queues) {
## dequeue
$self->dequeue;
}
};
}
sub finish {
my ($self, $exit_code) = @_;
die "\$fork_manager->finish() shouln't be called within the manager process\n"
unless $self->is_child;
exit($exit_code || 0);
}
sub enqueue {
my($self, $arg) = @_;
$self->_run_cb('on_enqueue' => @{ $arg->{args} });
push @{ $self->process_queue } => $arg;
}
sub dequeue {
my $self = shift;
until ($self->is_working_max) {
last unless @{ $self->process_queue };
# dequeue
if (my $arg = shift @{ $self->process_queue }) {
$self->_run_cb('on_dequeue' => @{ $arg->{args} });
$self->start($arg);
}
}
}
sub signal_all_children {
my ($self, $sig) = @_;
foreach my $pid (sort keys %{ $self->running_worker }) {
kill $sig, $pid;
}
}
sub wait_all_children {
my $self = shift;
my $arg = (@_ == 1) ? +shift : +{ @_ };
my $cb = $arg->{cb};
if ($arg->{blocking}) {
$self->_wait_all_children_with_blocking;
$self->$cb;
}
else {
die 'cannot call.' if $self->wait_async;
my $super = $self->on_finish;
weaken($self);
$self->on_finish(
sub {
$super->(@_);
if ($self->num_workers == 0 and $self->num_queues == 0) {
$self->$cb;
$self->on_finish($super);
$self->wait_async(0);
}
}
);
$self->wait_async(1);
}
}
sub _run_cb {
my $self = shift;
my $name = shift;
my $cb = $self->$name();
if ($cb) {
$self->$cb(@_);
}
}
our $WAIT_INTERVAL = 0.1 * 1000 * 1000;
sub _wait_all_children_with_blocking {
my $self = shift;
until ($self->num_workers == 0 and $self->num_queues == 0) {
my($pid, $status) = _wait_with_status(-1, WNOHANG);
if ($pid and exists $self->running_worker->{$pid}) {
$self->process_cb->{$pid}->($pid, $status);
}
}
continue {
# retry interval
Time::HiRes::usleep( $WAIT_INTERVAL );
}
}
# function
sub _wait_with_status {## blocking
my($waitpid, $option) = @_;
use vmsish 'status';
local $?;
my $pid = waitpid($waitpid, $option);
return ($pid, $?);
}
1;
__END__
=for stopwords cb
=head1 NAME
AnyEvent::ForkManager - A simple parallel processing fork manager with AnyEvent
( run in 0.972 second using v1.01-cache-2.11-cpan-39bf76dae61 )