AnyEvent-Fork-Pool

 view release on metacpan or  search on metacpan

Pool.pm  view on Meta::CPAN


sub run {
   my ($template, $function, %arg) = @_;

   # Pool tuning options. Every option falls back to its default when the
   # caller passes nothing or a false value (|| is used, not //).
   # Each declaration is its own statement; they were previously chained
   # with the comma operator, which only worked by accident of precedence.
   my $max        = $arg{max}        || 4;       # upper bound for the number of workers in @pool
   my $idle       = $arg{idle}       || 0;       # number of idle workers the pool tries to keep around
   my $load       = $arg{load}       || 2;       # a worker accepts jobs while its load is below this
   my $start      = $arg{start}      || 0.1;     # delay/interval of the worker start timer
   my $stop       = $arg{stop}       || 10;      # delay/interval of the worker stop timer
   my $on_event   = $arg{on_event}   || sub { }; # receives worker events that are not pool-internal
   my $on_destroy = $arg{on_destroy};            # optional; invoked by the destroy guard

   # Options that are handed on to AnyEvent::Fork::RPC::run for every worker.
   # Note that serialiser is removed from %arg, while the other three are
   # only copied.
   my @rpc = (
      async      =>        $arg{async},
      init       =>        $arg{init},
      serialiser => delete $arg{serialiser},
      on_error   =>        $arg{on_error},
   );

   # Shared pool state, closed over by the helper closures below:
   #   @pool    - worker records [load, heap index, rpc], kept as a heap
   #   @queue   - jobs waiting for a worker
   #   $nidle   - number of workers whose load is zero
   #   $start_w - timer that starts additional workers
   #   $stop_w  - timer that stops surplus idle workers
   # The closures are declared up front because they refer to each other.
   my (@pool, @queue, $nidle, $start_w, $stop_w, $shutdown);
   my ($start_worker, $stop_worker, $want_start, $want_stop, $scheduler);

   # Calls the optional on_destroy callback. The per-worker event handlers
   # reference this guard to keep it alive; presumably it fires once the
   # last of them is gone (see the Guard module for the exact semantics).
   my $destroy_guard = Guard::guard {
      $on_destroy->()
         if $on_destroy;
   };

   # Prepare the template process: load the RPC backend matching the async
   # option and define AnyEvent::Fork::Pool::retire there. retire sends a
   # "quit" event framed by the two magic values, which the on_event handler
   # installed for each worker recognises as a pool-internal request.
   # $magic0/$magic1 are defined outside of this sub.
   $template
      ->require ("AnyEvent::Fork::RPC::" . ($arg{async} ? "Async" : "Sync"))
      ->eval ('
           my ($magic0, $magic1) = @_;
           sub AnyEvent::Fork::Pool::retire() {
              AnyEvent::Fork::RPC::event $magic0, "quit", $magic1;
           }
        ', $magic0, $magic1)
   ;

   # Fork one more worker off the template, hook it up to the RPC layer and
   # register it in the pool as an idle worker.
   $start_worker = sub {
      # worker record slots: [0] load, [1] heap index, [2] rpc
      my $proc = [0, 0, undef];

      # Event handler for this worker: pool-internal events are framed by
      # the two magic values, everything else belongs to the user callback.
      my $event_cb = sub {
         $destroy_guard if 0; # keep it alive

         my $internal = @_ == 3 && $_[0] eq $magic0 && $_[2] eq $magic1;

         # not one of ours: pass the unchanged @_ on to the user callback
         return &$on_event
            unless $internal;

         $stop_worker->($proc)
            if $_[1] eq "quit";

         return;
      };

      $proc->[2] = $template
         ->fork
         ->AnyEvent::Fork::RPC::run ($function, @rpc, on_event => $event_cb);

      Array::Heap::push_heap_idx @pool, $proc;
      ++$nidle;

      # @pool owns the record now; the variable shared with the event
      # handler only keeps a weak reference to it.
      Scalar::Util::weaken $proc;
   };

   # Take a worker out of the pool and reset its record.
   #
   # Argument: the worker record ([load, heap index, rpc]).
   #
   # This is called from the stop timer as well as for every "quit" event a
   # worker sends, so it can be invoked more than once for the same worker,
   # or with a record whose weak reference has already become undef. Such
   # calls must be no-ops, otherwise $nidle would be decremented again for
   # a worker that is no longer counted.
   $stop_worker = sub {
      my $proc = shift;

      # only records that are still in the heap carry an index
      return
         unless $proc && defined $proc->[1];

      # a worker without load was counted as idle
      $proc->[0]
         or --$nidle;

      Array::Heap::splice_heap_idx @pool, $proc->[1];

      @$proc = 0; # tell others to leave it be
   };

   # Request more workers: cancels the stop timer and, unless it is already
   # running, arms the start timer. The timer starts one worker per tick
   # and disarms itself once no further worker is needed or allowed.
   $want_start = sub {
      undef $stop_w;

      # timer already armed, nothing to do
      return $start_w
         if $start_w;

      $start_w = AE::timer $start, $start, sub {
         my $wanted  = $nidle < $idle || @queue;
         my $allowed = @pool < $max;

         unless ($wanted && $allowed) {
            undef $start_w;
            return;
         }

         $start_worker->();
         $scheduler->();
      };
   };

   # Request a pool shrink: arms the stop timer unless it is already
   # running. Each tick stops the worker at the top of the heap if any
   # worker is idle, and the timer disarms itself once the number of idle
   # workers is no longer above the configured idle count.
   $want_stop = sub {
      # timer already armed, nothing to do
      return $stop_w
         if $stop_w;

      $stop_w = AE::timer $stop, $stop, sub {
         if ($nidle) {
            $stop_worker->($pool[0]);
         }

         unless ($nidle > $idle) {
            undef $stop_w;
         }
      };
   };

   $scheduler = sub {
      if (@queue) {
         while (@queue) {
            @pool or $start_worker->();

            my $proc = $pool[0];

            if ($proc->[0] < $load) {
               # found free worker, increase load
               unless ($proc->[0]++) {
                  # worker became busy
                  --$nidle
                     or undef $stop_w;

                  $want_start->()
                     if $nidle < $idle && @pool < $max;
               }

               Array::Heap::adjust_heap_idx @pool, 0;

               my $job = shift @queue;
               my $ocb = pop @$job;



( run in 1.761 second using v1.01-cache-2.11-cpan-39bf76dae61 )