Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN


=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 code => CODE

The body of the function to execute.

 @result = $code->( @args )

=head2 init_code => CODE

Optional. If defined, this is invoked exactly once in every child process or
thread, after it is created, but before the first invocation of the function
body itself.

 $init_code->()

=head2 model => "fork" | "thread"

Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
leaves the default choice up to Routine.

=head2 min_workers => INT

=head2 max_workers => INT

The lower and upper bounds of worker processes to try to keep running. The
actual number running at any time will be kept somewhere between these bounds
according to load.

=head2 max_worker_calls => INT

Optional. If provided, stop a worker process after it has processed this
number of calls. (New workers may be started to replace stopped ones, within
the bounds given above).

=head2 idle_timeout => NUM

Optional. If provided, idle worker processes will be shut down after this
amount of time, if there are more than C<min_workers> of them.

=head2 exit_on_die => BOOL

Optional boolean, controls what happens after the C<code> throws an
exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.

=head2 setup => ARRAY

Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.

=cut

# Sets up the default state of a new Function object, after letting the
# superclass initialise itself with the same arguments.
sub _init
{
   my $self = shift;

   $self->SUPER::_init( @_ );

   # Default pool bounds; configure may replace either of them.
   @{$self}{qw( min_workers max_workers )} = ( 1, 8 );

   # Live workers, keyed by worker ID => IO::Async::Function::Worker
   $self->{workers} = {};

   # Requests waiting for a worker, consumed by _dispatch_pending
   $self->{pending_queue} = [];
}

# Applies named configuration parameters to this Function.
#
# Keys handled here (anything left over is passed to SUPER::configure):
#   model, exit_on_die, max_worker_calls
#       stored on the Function and pushed to every existing worker
#   idle_timeout
#       a false value removes the idle timer; otherwise the timer is created,
#       or its delay is updated if it already exists
#   min_workers, max_workers
#       stored; the running pool is not resized here
#   init_code, code, setup
#       stored; if the Function is currently in a loop its workers are
#       stopped and started again
sub configure
{
   my $self = shift;
   my %params = @_;

   # Function-level key => the key the Worker's configure method consumes.
   # max_worker_calls has to be renamed: the Worker stores and counts down
   # this limit as max_calls and does not consume max_worker_calls, so passing
   # the Function-level name through would hand it on to the Worker's
   # superclass instead of updating the limit on running workers.
   my %worker_key_for = (
      model            => "model",
      exit_on_die      => "exit_on_die",
      max_worker_calls => "max_calls",
   );

   my %worker_params;
   foreach my $key (qw( model exit_on_die max_worker_calls )) {
      next unless exists $params{$key};

      my $value = delete $params{$key};
      $self->{$key} = $value;
      $worker_params{ $worker_key_for{$key} } = $value;
   }

   if( keys %worker_params ) {
      foreach my $worker ( $self->_worker_objects ) {
         $worker->configure( %worker_params );
      }
   }

   if( exists $params{idle_timeout} ) {
      my $timeout = delete $params{idle_timeout};
      if( !$timeout ) {
         # A false timeout disables idle shutdown altogether
         $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
      }
      elsif( my $idle_timer = $self->{idle_timer} ) {
         $idle_timer->configure( delay => $timeout );
      }
      else {
         $self->{idle_timer} = IO::Async::Timer::Countdown->new(
            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down at most one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               #
               # NOTE(review): this is a string sort, so with numeric IDs "10"
               # orders before "9". Presumably it mirrors the order used when
               # assigning work - confirm before changing it.
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};

                  $workers->{$id}->stop;
                  last;
               }

               # Still more?
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );
      }
   }

   foreach (qw( min_workers max_workers )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
      # TODO: something about retuning
   }

   # Changing what the workers run only takes effect in fresh workers
   my $need_restart;

   foreach my $key (qw( init_code code setup )) {
      next unless exists $params{$key};

      $self->{$key} = delete $params{$key};
      $need_restart = 1;
   }

   $self->SUPER::configure( %params );

   if( $need_restart and $self->loop ) {
      $self->stop;
      $self->start;
   }
}

# Runs the superclass's _add_to_loop handling with the same arguments, then
# starts the worker pool.
sub _add_to_loop
{
   my $self = shift;

   $self->SUPER::_add_to_loop( @_ );

   return $self->start;
}

# Stops the worker pool, then runs the superclass's _remove_from_loop
# handling with the same arguments.
sub _remove_from_loop
{
   my $self = shift;

   # Workers are asked to stop before the superclass does its own handling
   $self->stop;

   return $self->SUPER::_remove_from_loop( @_ );
}

=head1 METHODS

The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.

=cut

=head2 start

   $function->start

Start the worker processes

=cut

sub start
{
   my ( $self ) = @_;

   # Bring the pool up to its configured lower bound, one worker per pass
   my $wanted = $self->{min_workers};

   foreach my $n ( 1 .. $wanted ) {
      $self->_new_worker;
   }
}

=head2 stop

   $function->stop

Stop the worker processes

=cut

sub stop
{
   my ( $self ) = @_;

   # Flag that a shutdown is in progress before touching any worker.
   # NOTE(review): nothing in the code visible here clears this flag again -
   # confirm how it interacts with a later start.
   $self->{stopping} = 1;

   $_->stop for $self->_worker_objects;
}

=head2 restart

   $function->restart

Gracefully stop and restart all the worker processes. 

=cut

sub restart
{
   my ( $self ) = @_;

   # Ask the current workers to stop, then bring up a fresh set
   $self->stop;

   return $self->start;
}

=head2 call

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

   return $future;
}

# Hands the oldest live request in the pending queue to a worker, if one can
# be obtained. At most one request is dispatched per invocation.
#
# Each queue entry is an object that answers is_cancelled and is completed
# with done( $self, $worker ) once a worker has been obtained for it.
#
# A request is only removed from the queue once a worker is actually in hand,
# so that a request is never lost when _get_worker returns nothing; it simply
# stays at the head of the queue for a later invocation. Cancelled requests
# are discarded without asking for a worker at all.
#
# If the queue is drained without dispatching anything, the idle timer (when
# configured and not already running) is started while there are more idle
# workers than min_workers.
sub _dispatch_pending
{
   my $self = shift;

   my $queue = $self->{pending_queue};

   while( @$queue ) {
      # Nobody is waiting on a cancelled request any more
      if( $queue->[0]->is_cancelled ) {
         shift @$queue;
         next;
      }

      # Leave the request queued if no worker can be had right now
      my $worker = $self->_get_worker or return;

      my $next = shift @$queue;

      $self->debug_printf( "UNQUEUE" );
      $next->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}

package # hide from indexer
   IO::Async::Function::Worker;

use base qw( IO::Async::Routine );

use IO::Async::Channel;

# Constructs a worker. The caller's init_code and code parameters are taken
# out of the parameter list and wrapped in a single routine body that serves
# calls over a pair of channels; all other parameters go to the superclass
# constructor unchanged.
#
# Replies sent on the return channel are ARRAY refs whose first element is a
# type tag: "r" followed by the returned values, or "e" followed by the
# failure details.
sub new
{
   my ( $class, %params ) = @_;

   my $arg_channel = IO::Async::Channel->new;
   my $ret_channel = IO::Async::Channel->new;

   my ( $init, $code ) = delete @params{qw( init_code code )};

   # Routine body: optional one-off initialisation, then serve calls for as
   # long as recv on the argument channel keeps returning a true value.
   $params{code} = sub {
      defined $init and $init->();

      while( my $args = $arg_channel->recv ) {
         my @ret;
         my @reply;

         if( eval { @ret = $code->( @$args ); 1 } ) {
            @reply = ( r => @ret );
         }
         elsif( ref $@ ) {
            # Presume that $@ is an ARRAYref of error results
            # NOTE(review): any other kind of reference would make this
            # dereference die inside the loop - confirm that is acceptable.
            @reply = ( e => @{ $@ } );
         }
         else {
            # Plain string exception: strip the trailing newline and tag it
            chomp( my $e = "$@" );
            @reply = ( e => $e, "error" );
         }

         $ret_channel->send( \@reply );
      }
   };

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   # Keep both channel ends on the object for the call and stop methods
   @{$worker}{qw( arg_channel ret_channel )} = ( $arg_channel, $ret_channel );

   return $worker;
}

# Stores the worker-specific settings (exit_on_die, max_calls) on the object
# and passes every remaining parameter to the superclass's configure.
sub configure
{
   my ( $self, %params ) = @_;

   foreach my $key (qw( exit_on_die max_calls )) {
      next unless exists $params{$key};

      $self->{$key} = delete $params{$key};
   }

   $self->SUPER::configure( %params );
}

# Shuts this worker down: closes its argument channel and, if it still has a
# parent Function, unregisters it from that Function's worker table.
sub stop
{
   my ( $worker ) = @_;

   # Presumably this ends the receive loop in the routine body - the loop
   # there runs only while recv on this channel returns a true value.
   $worker->{arg_channel}->close;

   my $function = $worker->parent;

   if( $function ) {
      my $id = $worker->id;
      delete $function->{workers}{$id};

      if( !$worker->{busy} ) {
         # Idle: the worker can be detached from its parent immediately
         $function->remove_child( $worker );
      }
      else {
         # Busy: only mark it; the removal itself happens outside this method
         $worker->{remove_on_idle}++;
      }
   }
}

sub call
{
   my $worker = shift;
   my ( $args ) = @_;

   $worker->{arg_channel}->send_encoded( $args );

   $worker->{busy} = 1;
   $worker->{max_calls}--;

   return $worker->{ret_channel}->recv->then(
      # on recv
      $worker->_capture_weakself( sub {
         my ( $worker, $result ) = @_;
         my ( $type, @values ) = @$result;

         $worker->stop if !$worker->{max_calls} or
                          $worker->{exit_on_die} && $type eq "e";

         if( $type eq "r" ) {
            return Future->done( @values );
         }
         elsif( $type eq "e" ) {
            return Future->fail( @values );
         }
         else {
            die "Unrecognised type from worker - $type\n";
         }
      } ),
      # on EOF
      $worker->_capture_weakself( sub {
         my ( $worker ) = @_;

         $worker->stop;

         return Future->fail( "closed", "closed" );
      } )
   )->on_ready( $worker->_capture_weakself( sub {
      my ( $worker, $f ) = @_;
      $worker->{busy} = 0;



( run in 1.793 second using v1.01-cache-2.11-cpan-5a3173703d6 )