AnyEvent-ForkManager

 view release on metacpan or  search on metacpan

lib/AnyEvent/ForkManager.pm  view on Meta::CPAN

package AnyEvent::ForkManager;
use 5.008_001;
use strict;
use warnings;

our $VERSION = '0.07';

use AnyEvent;
use Scalar::Util qw/weaken/;
use POSIX qw/WNOHANG/;
use Time::HiRes ();

# Accessor declarations for the manager object.
#
# Read-only:
#   max_workers    - concurrency limit compared against num_workers
#   manager_pid    - pid recorded by new(); is_child() compares it with $$
#
# Read/write:
#   on_start, on_finish, on_error, on_enqueue, on_dequeue, on_working_max
#                  - optional user callbacks, dispatched through _run_cb()
#   process_queue  - array ref of start() argument hashes waiting for a slot
#   running_worker - hash ref: pid => AnyEvent child watcher
#   process_cb     - hash ref: pid => completion callback for that worker
#   wait_async     - flag set while a non-blocking wait_all_children() is pending
use Class::Accessor::Lite 0.04 (
    ro  => [
        qw/max_workers manager_pid/,
    ],
    rw  => [
        qw/on_start on_finish on_error on_enqueue on_dequeue on_working_max/,
        qw/process_queue running_worker process_cb wait_async/,
    ],
);

# Fallback worker limit; new() applies it when max_workers is missing or false.
sub default_max_workers {
    return 10;
}

# Constructor.
#
# Accepts either a single hash reference or a flat key/value list of options.
# A missing or false max_workers is replaced by $class->default_max_workers.
# The pid of the constructing process is stored as manager_pid.
#
# Returns whatever init() returns (the freshly blessed object).
sub new {
    my $class = shift;

    # Work on a private copy: the default for max_workers must not be written
    # back into a hash reference owned by the caller.
    my %arg = (@_ == 1) ? %{ $_[0] || +{} } : @_;
    $arg{max_workers} ||= $class->default_max_workers;

    my $self = bless +{
        %arg,
        manager_pid => $$,
    } => $class;

    return $self->init;
}

# Reset the internal bookkeeping containers and return the object:
#   running_worker / process_cb -> empty hashes (keyed by pid in start())
#   process_queue               -> empty array of pending tasks
sub init {
    my ($self) = @_;

    $self->running_worker({});
    $self->process_cb({});
    $self->process_queue([]);

    return $self;
}

# True when the current process ($$) is not the one recorded as manager_pid.
sub is_child {
    my ($self) = @_;
    return $self->manager_pid != $$;
}
# True when the number of running workers has reached (or exceeded) max_workers.
sub is_working_max {
    my ($self) = @_;

    my $running = $self->num_workers;
    my $limit   = $self->max_workers;

    return $running >= $limit;
}

# Number of workers currently tracked in running_worker (one key per pid).
sub num_workers {
    my ($self) = @_;

    my $watcher_of = $self->running_worker;
    return scalar keys %$watcher_of;
}

# Number of tasks waiting in process_queue for a free worker slot.
sub num_queues {
    my ($self) = @_;

    my $pending = $self->process_queue;
    return scalar @$pending;
}

# Run one task in a forked worker, or queue it when the pool is full.
#
# Arguments (single hash reference or flat key/value list):
#   cb   - code reference executed in the child as $cb->($manager, @args)
#   args - array reference of extra arguments passed to cb and to the
#          on_* callbacks
#
# Returns the child's pid in the parent when a worker was forked, and nothing
# when the task was queued (pool full) or fork failed (on_error is run).
# In the child, finish() is called after cb.
# Dies when called from a process other than the manager.
sub start {
    my $self = shift;
    my $arg  = (@_ == 1) ? +shift : +{ @_ };

    die "\$fork_manager->start() should be called within the manager process\n"
        if $self->is_child;

    # @{ $arg->{args} } is deliberately expanded only inside call argument
    # lists: there a missing "args" autovivifies to an empty list, whereas
    # copying it into a variable first would die under strict refs.

    ## pool is full: notify, park the task, done
    if ($self->is_working_max) {
        $self->_run_cb('on_working_max' => @{ $arg->{args} });
        $self->enqueue($arg);
        return;
    }

    my $pid = fork;

    ## fork failed
    unless (defined $pid) {
        $self->_run_cb('on_error' => @{ $arg->{args} });
        return;
    }

    ## child: run the task, then leave through finish()
    unless ($pid) {
        $arg->{cb}->($self, @{ $arg->{args} });
        return $self->finish;
    }

    ## parent
    $self->_run_cb('on_start' => $pid, @{ $arg->{args} });

    # The completion callback is registered in process_cb before the watcher
    # is created, so the check below can tell whether it has already run.
    my $on_exit = $self->_create_callback(@{ $arg->{args} });
    $self->process_cb->{$pid}     = $on_exit;
    $self->running_worker->{$pid} = AnyEvent->child(
        pid => $pid,
        cb  => $on_exit,
    );

    # The callback removes its own process_cb entry; if that entry is already
    # gone the child has finished, so drop the watcher stored just above.
    unless (exists $self->process_cb->{$pid}) {
        delete $self->running_worker->{$pid};
    }

    return $pid;
}

# Build the completion callback for one worker.
#
# @args are the task arguments; they are forwarded to on_finish after the
# pid and exit status. The returned code reference expects ($pid, $status),
# removes the worker from running_worker / process_cb, runs on_finish and
# then starts queued tasks if any are waiting.
#
# The closure only keeps a weak reference to the manager. If the manager has
# already been destroyed when the callback fires, the callback is a no-op.
sub _create_callback {
    my($self, @args) = @_;

    weaken($self);
    return sub {
        my ($pid, $status) = @_;

        # Take a strong reference for the duration of the call: the weak one
        # may already be undef, or on_finish could drop the last outside
        # reference while we are still using the object.
        my $manager = $self or return;

        delete $manager->running_worker->{$pid};
        delete $manager->process_cb->{$pid};
        $manager->_run_cb('on_finish' => $pid, $status, @args);

        if ($manager->num_queues) {
            ## dequeue
            $manager->dequeue;
        }
    };
}

# Terminate the current worker process.
#
# $exit_code is the process exit status; a missing or false value means 0.
# Dies when called from the manager process (message text kept verbatim).
sub finish {
    my $self      = shift;
    my $exit_code = shift;

    if (!$self->is_child) {
        die "\$fork_manager->finish() shouln't be called within the manager process\n";
    }

    exit($exit_code ? $exit_code : 0);
}

# Append a task (the argument hash given to start()) to process_queue,
# running on_enqueue with the task's args first.
# Returns the new queue length (the value of push).
sub enqueue {
    my $self = shift;
    my $arg  = shift;

    # Expanded inside the argument list on purpose, so that a task without
    # "args" yields an empty list instead of a strict-refs error.
    $self->_run_cb('on_enqueue' => @{ $arg->{args} });

    my $queue = $self->process_queue;
    return push @$queue, $arg;
}

# Move queued tasks into free worker slots.
#
# While the pool is not full and process_queue is non-empty, take the oldest
# task, run on_dequeue with its args and hand it to start(). False entries
# in the queue are discarded without being started.
sub dequeue {
    my ($self) = @_;

    while (not $self->is_working_max) {
        my $queue = $self->process_queue;
        last if @$queue == 0;

        # dequeue
        my $arg = shift @$queue;
        next unless $arg;

        $self->_run_cb('on_dequeue' => @{ $arg->{args} });
        $self->start($arg);
    }
}

# Send signal $sig to every pid currently tracked in running_worker,
# one kill() per pid, in sorted key order.
sub signal_all_children {
    my ($self, $sig) = @_;

    my @pids = sort keys %{ $self->running_worker };
    kill $sig, $_ for @pids;
}

# Wait until every running worker has finished and the queue is empty.
#
# Arguments (single hash reference or flat key/value list):
#   cb       - optional code reference, invoked as $manager->$cb once all
#              work is done
#   blocking - when true, poll for children synchronously and call cb before
#              returning; otherwise install a temporary on_finish hook and
#              return immediately
#
# Non-blocking mode dies with 'cannot call.' while a previous non-blocking
# wait is still pending. The hook is only triggered by a worker finishing,
# so if nothing is running or queued at call time cb is not invoked.
sub wait_all_children {
    my $self = shift;
    my $arg  = (@_ == 1) ? +shift : +{ @_ };

    my $cb = $arg->{cb};
    if ($arg->{blocking}) {
        $self->_wait_all_children_with_blocking;
        # cb is optional: waiting without a completion callback is valid.
        $self->$cb if $cb;
    }
    else {
        die 'cannot call.' if $self->wait_async;

        # The user's own on_finish (possibly undef) is wrapped for the
        # duration of the wait and restored afterwards.
        my $super = $self->on_finish;

        weaken($self);
        $self->on_finish(
            sub {
                # No on_finish may have been configured; calling undef as a
                # code reference would die inside the child watcher.
                $super->(@_) if $super;

                # The manager is only weakly referenced from this hook.
                return unless $self;

                if ($self->num_workers == 0 and $self->num_queues == 0) {
                    $self->$cb if $cb;
                    $self->on_finish($super);
                    $self->wait_async(0);
                }
            }
        );

        $self->wait_async(1);
    }
}

# Dispatch an optional callback.
#
# $name is the accessor holding the callback (e.g. 'on_start'); the remaining
# arguments are passed through. When the accessor returns a true value it is
# invoked as a method on the manager ($self->$cb(@rest)), so the callback
# receives the manager as its first argument. Otherwise nothing is called.
sub _run_cb {
    my ($self, $name) = splice @_, 0, 2;

    my $handler = $self->$name();
    if ($handler) {
        $self->$handler(@_);
    }
}

# Pause between polls when no child was ready, in microseconds (0.1 second).
our $WAIT_INTERVAL = 0.1 * 1000 * 1000;

# Synchronously reap children until no worker is running and the queue is
# empty.
#
# Each round polls waitpid(-1, WNOHANG). A reaped pid that belongs to one of
# our workers gets its completion callback from process_cb, which may in turn
# start queued tasks. The loop sleeps only when nothing was reaped; after a
# successful reap it polls again at once, so children that have already
# exited are collected back to back.
#
# NOTE(review): if waitpid reports no children at all (-1) while
# running_worker still has entries, this keeps polling indefinitely, as the
# previous implementation did - confirm whether that can happen in practice.
sub _wait_all_children_with_blocking {
    my $self = shift;

    until ($self->num_workers == 0 and $self->num_queues == 0) {
        my($pid, $status) = _wait_with_status(-1, WNOHANG);

        if ($pid > 0) {
            # Something was reaped; it may be a child we do not manage.
            $self->process_cb->{$pid}->($pid, $status)
                if exists $self->running_worker->{$pid};
        }
        else {
            # retry interval
            Time::HiRes::usleep( $WAIT_INTERVAL );
        }
    }
}

# Plain function (not a method): thin wrapper around waitpid().
#
# $waitpid and $option are handed to waitpid() unchanged, so the call only
# blocks when the caller does not pass WNOHANG. Returns the list
# (pid reported by waitpid, the resulting $?). $? is localized, so the
# caller's own $? is left untouched.
sub _wait_with_status {
    my ($target, $flags) = @_;

    # Per the vmsish pragma docs this requests the native exit status on VMS;
    # presumably a no-op on other platforms.
    use vmsish 'status';
    local $?;

    my $reaped = waitpid($target, $flags);
    return ($reaped, $?);
}

1;
__END__

=for stopwords cb

=head1 NAME

AnyEvent::ForkManager - A simple parallel processing fork manager with AnyEvent



( run in 0.972 second using v1.01-cache-2.11-cpan-39bf76dae61 )