Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

actual number running at any time will be kept somewhere between these bounds
according to load.

=head2 max_worker_calls => INT

Optional. If provided, stop a worker process after it has processed this
number of calls. (New workers may be started to replace stopped ones, within
the bounds given above).

=head2 idle_timeout => NUM

Optional. If provided, idle worker processes will be shut down after this
amount of time, if there are more than C<min_workers> of them.

=head2 exit_on_die => BOOL

Optional boolean, controls what happens after the C<code> throws an
exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.

=head2 setup => ARRAY

Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.

=cut

# Initialiser hook: lets the base class initialise first, then installs the
# default worker bounds and the empty bookkeeping containers.
sub _init
{
   my ( $self, @args ) = @_;

   $self->SUPER::_init( @args );

   # Default pool bounds; configure() may override either of them
   @{$self}{qw( min_workers max_workers )} = ( 1, 8 );

   # Maps worker ID => worker object
   $self->{workers} = {};

   # Requests waiting for a worker to become available
   $self->{pending_queue} = [];
}

# Applies named configuration parameters to the function object.
#
# Handled here:
#   model, exit_on_die, max_worker_calls
#                     - stored, and also pushed to every existing worker
#   idle_timeout      - creates, re-arms or removes the idle countdown timer
#   min_workers, max_workers
#                     - stored
#   init_code, code, setup
#                     - stored; if the object is in a loop the workers are
#                       stopped and started again so they pick up the change
#
# Anything left over is passed to the base class's configure.
sub configure
{
   my $self = shift;
   my %params = @_;

   # The worker knows the call limit as 'max_calls' (the name _new_worker
   # passes to its constructor), so the key has to be translated before it is
   # handed to workers that already exist.
   my %worker_key_for = ( max_worker_calls => "max_calls" );

   my %worker_params;
   foreach my $key (qw( model exit_on_die max_worker_calls )) {
      next unless exists $params{$key};

      my $value = $self->{$key} = delete $params{$key};
      $worker_params{ $worker_key_for{$key} || $key } = $value;
   }

   if( keys %worker_params ) {
      foreach my $worker ( $self->_worker_objects ) {
         $worker->configure( %worker_params );
      }
   }

   if( exists $params{idle_timeout} ) {
      my $timeout = delete $params{idle_timeout};
      if( !$timeout ) {
         # A false timeout disables idle shutdown altogether
         $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
      }
      elsif( my $idle_timer = $self->{idle_timer} ) {
         $idle_timer->configure( delay => $timeout );
      }
      else {
         $self->{idle_timer} = IO::Async::Timer::Countdown->new(
            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down at most one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               # NOTE(review): keys are compared as strings here; confirm that
               # is the intended ordering if worker IDs are numeric.
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};

                  $workers->{$id}->stop;
                  last;
               }

               # Still more?
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );
      }
   }

   foreach (qw( min_workers max_workers )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
      # TODO: something about retuning
   }

   my $need_restart;

   foreach (qw( init_code code setup )) {
      next unless exists $params{$_};

      $self->{$_} = delete $params{$_};
      $need_restart++;
   }

   $self->SUPER::configure( %params );

   # Workers are built from these values, so running ones must be replaced
   if( $need_restart and $self->loop ) {
      $self->stop;
      $self->start;
   }
}

# Loop-attachment hook: once the base class has handled the attachment, bring
# up the initial set of workers.
sub _add_to_loop
{
   my ( $self, @loop_args ) = @_;

   $self->SUPER::_add_to_loop( @loop_args );

   $self->start;
}

# Loop-detachment hook: stop the workers first, then let the base class
# handle the detachment itself.
sub _remove_from_loop
{
   my ( $self, @loop_args ) = @_;

   $self->stop;

   return $self->SUPER::_remove_from_loop( @loop_args );
}

=head1 METHODS

The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.

=cut

=head2 start

   $function->start

Starts the worker processes.

=cut

# Creates one worker for each slot up to the configured min_workers.
sub start
{
   my ( $self ) = @_;

   foreach my $slot ( 1 .. $self->{min_workers} ) {
      $self->_new_worker;
   }
}

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

   $count = $function->workers_idle

Returns the number of worker processes that are currently idle.

=cut

# Counts the worker objects whose 'busy' flag is not set.
sub workers_idle
{
   my ( $self ) = @_;

   my $idle = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $idle++ unless $worker->{busy};
   }

   return $idle;
}

# Builds a new worker from the current settings, adds it as a child and
# records it in the workers table under its ID. Returns the worker.
sub _new_worker
{
   my ( $self ) = @_;

   # Settings handed to the worker under the same names they are stored by
   my @worker_args;
   push @worker_args, $_ => $self->{$_}
      for qw( model init_code code setup exit_on_die );

   # The worker calls its limit 'max_calls'
   push @worker_args, max_calls => $self->{max_worker_calls};

   my $on_finish = $self->_capture_weakself( sub {
      my $self = shift or return;

      # Nothing to replace or dispatch while shutting down
      return if $self->{stopping};

      # Top the pool back up if it has dropped under the minimum
      if( $self->workers < $self->{min_workers} ) {
         $self->_new_worker;
      }

      $self->_dispatch_pending;
   } );

   my $worker = IO::Async::Function::Worker->new(
      @worker_args,
      on_finish => $on_finish,
   );

   $self->add_child( $worker );

   $self->{workers}{ $worker->id } = $worker;
   return $worker;
}

# Picks a worker to run a call on: the first non-busy worker in sorted ID
# order, otherwise a newly created one while the pool is under max_workers.
# Returns undef when neither is possible.
sub _get_worker
{
   my ( $self ) = @_;

   my @ids = sort keys %{ $self->{workers} };
   foreach my $id ( @ids ) {
      my $candidate = $self->{workers}{$id};
      next if $candidate->{busy};

      return $candidate;
   }

   # Everybody is busy; grow the pool if there is still room
   return $self->workers < $self->{max_workers}
      ? $self->_new_worker
      : undef;
}

# Hands a call of the given type and arguments to the worker and returns the
# future the worker gives back. Stops the idle timer, if there is one, when
# no idle workers remain afterwards.
sub _call_worker
{
   my ( $self, $worker, $type, $args ) = @_;

   my $future = $worker->call( $type, $args );

   my $none_idle = ( $self->workers_idle == 0 );
   if( $none_idle and my $timer = $self->{idle_timer} ) {
      $timer->stop;
   }

   return $future;
}

# Gives the oldest live request in the pending queue a worker.
#
# Cancelled requests at the head of the queue are discarded. The first
# non-cancelled request is completed with ( $self, $worker ) if a worker can
# be obtained; at most one request is dispatched per invocation. If no worker
# is available the request is left at the head of the queue for a later
# attempt.
#
# When the queue drains without dispatching anything, the idle timer (if one
# is configured and not already running) is started while more than
# min_workers workers are idle.
sub _dispatch_pending
{
   my $self = shift;
   my $queue = $self->{pending_queue};

   while( @$queue ) {
      # Discard cancelled requests before asking for a worker, so that none
      # is obtained on behalf of a request nobody is waiting on
      if( $queue->[0]->is_cancelled ) {
         shift @$queue;
         next;
      }

      # Only dequeue once a worker is in hand; otherwise the request would
      # be dropped and its future would never complete
      my $worker = $self->_get_worker or return;
      my $next = shift @$queue;

      $self->debug_printf( "UNQUEUE" );
      $next->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}

package # hide from indexer
   IO::Async::Function::Worker;

use base qw( IO::Async::Routine );

use IO::Async::Channel;

# Constructs a worker whose routine body is a request loop around the
# caller's code.
#
# Named parameters consumed here (everything else goes to the base class
# constructor unchanged):
#   init_code => CODE   optional; invoked once before the first request
#   code      => CODE   invoked once per request with the received arguments
#
# For every request received on the argument channel, one record is sent on
# the return channel:
#   [ r => @results ]            the code returned normally
#   [ e => @items ]              the code died with an array reference
#   [ e => $message, "error" ]   the code died with anything else
#
# The two channels are kept on the object as arg_channel and ret_channel.
sub new
{
   my $class = shift;
   my %params = @_;

   my $arg_channel = IO::Async::Channel->new;
   my $ret_channel = IO::Async::Channel->new;

   my $init = delete $params{init_code};
   my $code = delete $params{code};
   $params{code} = sub {
      $init->() if defined $init;

      while( my $args = $arg_channel->recv ) {
         my @ret;
         my $ok = eval { @ret = $code->( @$args ); 1 };

         if( $ok ) {
            $ret_channel->send( [ r => @ret ] );
            next;
         }

         # Copy immediately; the probing eval below would clobber $@
         my $err = $@;

         # An array reference (plain, blessed, or via overloading) carries a
         # ready-made list of error results. Any other kind of reference,
         # such as a hash-based exception object, cannot be dereferenced
         # that way; probing inside an eval keeps such an exception from
         # escaping the request loop.
         my $items = ref $err ? eval { [ @$err ] } : undef;

         if( $items ) {
            $ret_channel->send( [ e => @$items ] );
         }
         else {
            chomp( my $e = "$err" );
            $ret_channel->send( [ e => $e, error => ] );
         }
      }
   };

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   $worker->{arg_channel} = $arg_channel;
   $worker->{ret_channel} = $ret_channel;

   return $worker;
}

sub configure
{
   my $self = shift;
   my %params = @_;

   exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );



( run in 0.660 second using v1.01-cache-2.11-cpan-39bf76dae61 )