AnyEvent-ProcessPool

 view release on metacpan or  search on metacpan

lib/AnyEvent/ProcessPool.pm  view on Meta::CPAN

package AnyEvent::ProcessPool;
# ABSTRACT: Asynchronously runs code concurrently in a pool of perl processes
$AnyEvent::ProcessPool::VERSION = '0.07';
use common::sense;
use Carp;
use AnyEvent;
use AnyEvent::ProcessPool::Process;
use AnyEvent::ProcessPool::Task;
use AnyEvent::ProcessPool::Util qw(next_id cpu_count);

sub new {
  my ($class, %param) = @_;

  my $self = bless {
    workers  => $param{workers} || cpu_count,
    limit    => $param{limit},
    include  => $param{include},
    pool     => [], # array of AE::PP::Process objects
    queue    => [], # array of [id, code] tasks
    complete => {}, # task_id => condvar: signals result to caller
    pending  => {}, # task_id => condvar: signals result internally
  }, $class;

  # Initialize workers but do not yet wait for them to be started
  foreach (1 .. $self->{workers}) {
    my $worker = AnyEvent::ProcessPool::Process->new(
      limit   => $self->{limit},
      include => $self->{include},
    );
    push @{$self->{pool}}, $worker;
  }

  return $self;
}

sub join {
  my $self = shift;
  foreach my $task_id (keys %{$self->{complete}}) {
    if (my $cv = $self->{complete}{$task_id}) {
      $cv->recv;
    }
  }
}

sub DESTROY {
  my ($self, $global) = @_;

  if ($self) {
    # Unblock watchers for any remaining pending tasks
    if (ref $self->{pending}) {
      foreach my $cv (values %{$self->{pending}}) {
        $cv->croak('AnyEvent::ProcessPool destroyed with pending tasks remaining');
      }
    }

    # Terminate any workers still alive
    if (ref $self->{pool}) {
      foreach my $worker (@{$self->{pool}}) {
        $worker->stop if $worker;
      }
    }
  }
}

sub async {
  my ($self, $code, @args) = @_;
  my $id = next_id;
  my $task = AnyEvent::ProcessPool::Task->new($code, \@args);
  $self->{complete}{$id} = AE::cv;
  push @{$self->{queue}}, [$id, $task];
  $self->process_queue;
  return $self->{complete}{$id};
}

sub process_queue {
  my $self  = shift;
  my $queue = $self->{queue};
  my $pool  = $self->{pool};

  while (@$queue && @$pool) {
    my ($id, $task) = @{shift @$queue};
    my $worker = shift @$pool;

    $self->{pending}{$id} = $worker->run($task);

    $self->{pending}{$id}->cb(sub{
      my $task = shift->recv;

      if ($task->failed) {
        $self->{complete}{$id}->croak($task->result);
      } else {
        $self->{complete}{$id}->send($task->result);
      }

      delete $self->{pending}{$id};
      delete $self->{complete}{$id};

      push @$pool, $worker;
      $self->process_queue;
    });
  }
}

1;

__END__



( run in 1.030 second using v1.01-cache-2.11-cpan-5b529ec07f3 )