AnyEvent-ProcessPool

 view release on metacpan or  search on metacpan

lib/AnyEvent/ProcessPool.pm  view on Meta::CPAN

package AnyEvent::ProcessPool;
# ABSTRACT: Asynchronously runs code concurrently in a pool of perl processes
$AnyEvent::ProcessPool::VERSION = '0.07';
use common::sense;
use Carp;
use AnyEvent;
use AnyEvent::ProcessPool::Process;
use AnyEvent::ProcessPool::Task;
use AnyEvent::ProcessPool::Util qw(next_id cpu_count);

# Constructor. Recognized parameters:
#   workers - number of worker processes; when missing or false the value
#             returned by cpu_count is used instead.
#   limit   - passed through to every AnyEvent::ProcessPool::Process.
#   include - passed through to every AnyEvent::ProcessPool::Process.
# Returns the blessed pool object. Worker objects are constructed here, but
# the constructor does not wait for them to be started.
sub new {
  my ($class, %param) = @_;

  my %state = (
    workers  => $param{workers} || cpu_count,
    limit    => $param{limit},
    include  => $param{include},
    pool     => [], # idle AnyEvent::ProcessPool::Process objects
    queue    => [], # [id, task] pairs waiting for a free worker
    complete => {}, # task id => condvar handed back to the caller
    pending  => {}, # task id => condvar returned by the worker's run
  );
  my $self = bless \%state, $class;

  my $pool = $self->{pool};
  foreach my $n (1 .. $self->{workers}) {
    my $proc = AnyEvent::ProcessPool::Process->new(
      limit   => $self->{limit},
      include => $self->{include},
    );
    push @$pool, $proc;
  }

  return $self;
}

# Blocks until every caller condvar tracked when join is entered has been
# resolved. Entries removed while waiting (completed tasks delete their own
# entry) are skipped. recv rethrows the error of a croaked task, so join dies
# on the first failed task it waits for.
sub join {
  my $self     = shift;
  my $complete = $self->{complete};

  for my $id (keys %$complete) {
    my $cv = $complete->{$id} or next;
    $cv->recv;
  }
}

# Destructor. Fails every outstanding task and stops the idle workers.
#
# Queued tasks are discarded, the internal condvars of in-flight tasks are
# croaked, and any caller condvar that has not been resolved yet is croaked
# with the same message, so code blocked in recv is released. Each croak is
# wrapped in eval. Croaking a condvar runs its callback, which may die (the
# callback installed by process_queue calls recv, and recv rethrows the
# croak). Without the eval, one such failure would abort the rest of the
# cleanup.
sub DESTROY {
  my ($self, $global) = @_;
  return unless $self;

  # Do not let the evals below clobber $@ for code that is unwinding while
  # this object is destroyed.
  local $@;

  my $msg = 'AnyEvent::ProcessPool destroyed with pending tasks remaining';

  # Drop tasks that never reached a worker first, so completion callbacks
  # fired below cannot hand them to a worker during teardown. Their callers
  # are still notified by the loop over complete condvars.
  @{$self->{queue}} = () if ref $self->{queue};

  # Fail in-flight tasks. Errors raised by their callbacks are ignored.
  if (ref $self->{pending}) {
    foreach my $cv (values %{$self->{pending}}) {
      next unless $cv;
      eval { $cv->croak($msg) };
    }
  }

  # Unblock callers. The hash is re-read here because the callbacks above
  # may already have resolved and removed some entries.
  if (ref $self->{complete}) {
    foreach my $cv (values %{$self->{complete}}) {
      next unless $cv;
      eval { $cv->croak($msg) unless $cv->ready };
    }
  }

  # Terminate any workers still alive
  if (ref $self->{pool}) {
    foreach my $worker (@{$self->{pool}}) {
      $worker->stop if $worker;
    }
  }
}

# Schedules $code to run in a worker process with @args.
#
# Returns the AnyEvent condvar for the task. It is sent the task's result on
# success, or croaked with the task's result when the task reports failure.
#
# The condvar is kept in a lexical and returned from there. Reading it back
# from $self->{complete} after process_queue is unsafe: process_queue's
# completion callback deletes that entry, and it can run before
# process_queue returns if the worker's condvar is already ready. In that
# case the old code returned undef.
sub async {
  my ($self, $code, @args) = @_;

  my $id   = next_id;
  my $task = AnyEvent::ProcessPool::Task->new($code, \@args);

  my $cv = AE::cv;
  $self->{complete}{$id} = $cv;

  push @{$self->{queue}}, [$id, $task];
  $self->process_queue;

  return $cv;
}

# Hands queued tasks to idle workers until the queue or the idle pool is
# empty.
#
# For each dispatched task, the worker's run condvar is stored in pending.
# A callback is attached to it that:
#   - removes the task's bookkeeping,
#   - returns the worker to the pool and dispatches more work,
#   - then resolves the caller's condvar: it is croaked if the worker's
#     condvar was croaked or the task failed, and sent the result otherwise.
#
# The recv is trapped because recv rethrows when the worker's condvar was
# croaked (DESTROY croaks pending condvars). Untrapped, that exception would
# escape this callback, leave the caller's condvar unresolved forever, and
# lose the worker. Bookkeeping happens before the caller is notified, so a
# caller callback that dies cannot strand the worker or stall the queue.
sub process_queue {
  my $self  = shift;
  my $queue = $self->{queue};
  my $pool  = $self->{pool};

  while (@$queue && @$pool) {
    my ($id, $task) = @{shift @$queue};
    my $worker = shift @$pool;

    my $run_cv = $worker->run($task);
    $self->{pending}{$id} = $run_cv;

    # The callback takes the condvar as its argument instead of closing over
    # $run_cv, to avoid a condvar -> callback -> condvar reference cycle.
    $run_cv->cb(sub{
      my $finished_cv = shift;

      my $finished;
      my $ok    = eval { $finished = $finished_cv->recv; 1 };
      my $error = $@;

      my $complete = delete $self->{complete}{$id};
      delete $self->{pending}{$id};

      # NOTE(review): the worker is reused even after its condvar was croaked.
      # This assumes AnyEvent::ProcessPool::Process recovers from failures
      # (the POD says workers are restarted if they fail unexpectedly) --
      # confirm.
      push @$pool, $worker;
      $self->process_queue;

      return unless $complete;

      if (!$ok) {
        $complete->croak($error);
      } elsif ($finished->failed) {
        $complete->croak($finished->result);
      } else {
        $complete->send($finished->result);
      }
    });
  }
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

AnyEvent::ProcessPool - Asynchronously runs code concurrently in a pool of perl processes

=head1 VERSION

version 0.07

=head1 SYNOPSIS

  use AnyEvent::ProcessPool;

  my $pool = AnyEvent::ProcessPool->new(
    workers => 8,
    limit   => 10,
    include => ['lib', 'some/lib/path'],
  );

  my $condvar = $pool->async(sub{
    # do task type stuff...
  });

  # Block until result is ready
  my $result = $condvar->recv;

=head1 DESCRIPTION

Executes code using a pool of forked Perl subprocesses. Supports a configurable
pool size, automatically restarting processes after a configurable number of
requests, and closures (with the caveat that changes are not propagated back to
the parent process).

=head1 CONSTRUCTOR

=head2 workers

Optional attribute specifying the number of worker processes to launch.
Defaults to the number of CPUs.

=head2 limit

Optional attribute that causes a worker process to be restarted after
performing C<limit> tasks. This can be useful when calling code which may be
leaky. When unspecified or set to zero, worker processes will only be restarted
if they fail unexpectedly.

=head2 include

An optional array ref of paths to add to the perl command string used to start
the sub-process worker.

=head1 METHODS

=head2 async

Executes the supplied code ref in a worker sub-process. Remaining (optional)
arguments are passed unchanged to the code ref in the worker process. Returns a
L<condvar|AnyEvent/CONDITION VARIABLES> that will block and return the task
result when C<recv> is called on it.

Alternatively, the name of a task class may be supplied. The class must implement
the methods 'new' (as a constructor) and 'run'. When using a task class, the
arguments will be passed to the constructor (new) and the result of 'run' will



( run in 1.397 second using v1.01-cache-2.11-cpan-97f6503c9c8 )