IO-Async

 view release on metacpan or  search on metacpan

lib/IO/Async/Function.pm  view on Meta::CPAN

exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.

=head2 setup => ARRAY

Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.

=cut

sub _init
{
   my $self = shift;
   $self->SUPER::_init( @_ );

   $self->{min_workers} = 1;
   $self->{max_workers} = 8;

   $self->{workers} = {}; # {$id} => IaFunction:Worker

   $self->{pending_queue} = [];
}

sub configure
{
   my $self = shift;
   my %params = @_;

   my %worker_params;
   foreach (qw( model exit_on_die max_worker_calls )) {
      $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
   }

   if( keys %worker_params ) {
      foreach my $worker ( $self->_worker_objects ) {
         $worker->configure( %worker_params );
      }
   }

   if( exists $params{idle_timeout} ) {
      my $timeout = delete $params{idle_timeout};
      if( !$timeout ) {
         $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
      }
      elsif( my $idle_timer = $self->{idle_timer} ) {
         $idle_timer->configure( delay => $timeout );
      }
      else {
         $self->{idle_timer} = IO::Async::Timer::Countdown->new(
            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down atmost one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};

                  $workers->{$id}->stop;
                  last;
               }

               # Still more?
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );
      }
   }

   foreach (qw( min_workers max_workers )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
      # TODO: something about retuning
   }

   my $need_restart;

   foreach (qw( init_code code module init_func func setup )) {
      $need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   defined $self->{code} and defined $self->{func} and
      croak "Cannot ->configure both 'code' and 'func'";
   defined $self->{func} and !defined $self->{module} and
      croak "'func' parameter requires a 'module' as well";

   $self->SUPER::configure( %params );

   if( $need_restart and $self->loop ) {
      $self->stop;
      $self->start;
   }
}

sub _add_to_loop
{
   my $self = shift;
   $self->SUPER::_add_to_loop( @_ );

   $self->start;
}

sub _remove_from_loop
{
   my $self = shift;

   $self->stop;

   $self->SUPER::_remove_from_loop( @_ );
}

=head1 METHODS

The following methods documented in C<await> expressions return L<Future>
instances.

=cut

lib/IO/Async/Function.pm  view on Meta::CPAN

sub start
{
   my $self = shift;

   $self->_new_worker for 1 .. $self->{min_workers};
}

=head2 stop

   $function->stop;

Stop the worker processes

   $f = $function->stop;

I<Since version 0.75.>

If called in non-void context, returns a L<IO::Async::Future> instance that
will complete once every worker process has stopped and exited. This may be
useful for waiting until all of the processes are waited on, or other
edge-cases, but is not otherwise particularly useful.

=cut

sub stop
{
   my $self = shift;

   $self->{stopping} = 1;

   my @f;

   foreach my $worker ( $self->_worker_objects ) {
      defined wantarray ? push @f, $worker->stop : $worker->stop;
   }

   return Future->needs_all( @f ) if defined wantarray;
}

=head2 restart

   $function->restart;

Gracefully stop and restart all the worker processes. 

=cut

sub restart
{
   my $self = shift;

   $self->stop;
   $self->start;
}

=head2 call

   @result = await $function->call( %params );

Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.

The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.

The C<%params> hash takes the following keys:

=over 8

=item args => ARRAY

A reference to the array of arguments to pass to the code.

=item priority => NUM

Optional. Defines the sorting order when no workers are available and calls
must be queued for later. A default of zero will apply if not provided.

Higher values cause the call to be considered more important, and will be
placed earlier in the queue than calls with a smaller value. Calls of equal
priority are still handled in FIFO order.

=back

If the function body returns normally the list of results are provided as the
(successful) result of returned future. If the function throws an exception
this results in a failed future. In the special case that the exception is in
fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
for the C<fail> result. If the exception is not such a reference, it is used
as the first argument to C<fail>, in the category of C<error>.

   $f->done( @result );

   $f->fail( @{ $exception } );
   $f->fail( $exception, error => );

=head2 call (void)

   $function->call( %params );

When not returning a future, the C<on_result>, C<on_return> and C<on_error>
arguments give continuations to handle successful results or failure.

=over 8

=item on_result => CODE

A continuation that is invoked when the code has been executed. If the code
returned normally, it is called as:

   $on_result->( 'return', @values )

If the code threw an exception, or some other error occurred such as a closed
connection or the process died, it is called as:

   $on_result->( 'error', $exception_name )

=item on_return => CODE and on_error => CODE

lib/IO/Async/Function.pm  view on Meta::CPAN

      my $queue = $self->{pending_queue};

      my $next = Pending(
         my $priority = $params{priority} || 0,
         my $wait_f = $self->loop->new_future,
      );

      if( $priority ) {
         my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
         splice @$queue, $idx // $#$queue+1, 0, ( $next );
      }
      else {
         push @$queue, $next;
      }

      $future = $wait_f->then( sub {
         my ( $self, $worker ) = @_;
         $self->_call_worker( $worker, $request );
      });
   }

   $future->on_done( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_result( @_ );
   }));
   $future->on_fail( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_failure( @_ );
   }));

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( $on_fail ) if $on_fail;

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $self->adopt_future( $future->else( sub { Future->done } ) );
}

sub _worker_objects
{
   my $self = shift;
   return values %{ $self->{workers} };
}

=head2 workers

   $count = $function->workers;

Returns the total number of worker processes available

=cut

sub workers
{
   my $self = shift;
   return scalar keys %{ $self->{workers} };
}

=head2 workers_busy

   $count = $function->workers_busy;

Returns the number of worker processes that are currently busy

=cut

sub workers_busy
{
   my $self = shift;
   return scalar grep { $_->{busy} } $self->_worker_objects;
}

=head2 workers_idle

   $count = $function->workers_idle;

Returns the number of worker processes that are currently idle

=cut

sub workers_idle
{
   my $self = shift;
   return scalar grep { !$_->{busy} } $self->_worker_objects;
}

sub _new_worker
{
   my $self = shift;

   my $worker = IO::Async::Function::Worker->new(
      ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
      max_calls => $self->{max_worker_calls},

      on_finish => $self->_capture_weakself( sub {
         my $self = shift or return;
         my ( $worker ) = @_;

         return if $self->{stopping};

         $self->_new_worker if $self->workers < $self->{min_workers};

         $self->_dispatch_pending;
      } ),
   );

   $self->add_child( $worker );

   return $self->{workers}{$worker->id} = $worker;
}

sub _get_worker
{
   my $self = shift;

   foreach ( sort keys %{ $self->{workers} } ) {
      return $self->{workers}{$_} if !$self->{workers}{$_}{busy};
   }

   if( $self->workers < $self->{max_workers} ) {
      return $self->_new_worker;
   }

   return undef;
}

sub _call_worker
{
   my $self = shift;
   my ( $worker, $type, $args ) = @_;

   my $future = $worker->call( $type, $args );

   if( $self->workers_idle == 0 ) {
      $self->{idle_timer}->stop if $self->{idle_timer};
   }

   return $future;
}

sub _dispatch_pending
{
   my $self = shift;

   while( my $next = shift @{ $self->{pending_queue} } ) {
      my $worker = $self->_get_worker or return;

      my $f = $next->f;

      next if $f->is_cancelled;

      $self->debug_printf( "UNQUEUE" );
      $f->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}

package # hide from indexer
   IO::Async::Function::Worker;

use base qw( IO::Async::Routine );

use Carp;

use IO::Async::Channel;

use IO::Async::Internals::FunctionWorker;

sub new
{
   my $class = shift;
   my %params = @_;

lib/IO/Async/Function.pm  view on Meta::CPAN

      };
   }
   elsif( defined( my $func = $params{func} ) ) {
      my $module    = $params{module};
      my $init_func = $params{init_func};
      my @init_args;

      $params{module} = "IO::Async::Internals::FunctionWorker";
      $params{func}   = "run_worker";

      ( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";

      $send_initial = [ $module, $func, $init_func, @init_args ];
   }

   delete @params{qw( init_code init_func )};

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   $worker->{arg_channel} = $arg_channel;
   $worker->{ret_channel} = $ret_channel;

   $worker->{send_initial} = $send_initial if $send_initial;

   return $worker;
}

sub _add_to_loop
{
   my $self = shift;
   $self->SUPER::_add_to_loop( @_ );

   $self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial};
}

sub configure
{
   my $self = shift;
   my %params = @_;

   exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );

   $self->SUPER::configure( %params );
}

sub stop
{
   my $worker = shift;
   $worker->{arg_channel}->close;

   my $ret;
   $ret = $worker->result_future if defined wantarray;

   if( my $function = $worker->parent ) {
      delete $function->{workers}{$worker->id};

      if( $worker->{busy} ) {
         $worker->{remove_on_idle}++;
      }
      else {
         $function->remove_child( $worker );
      }
   }

   return $ret;
}

sub call
{
   my $worker = shift;
   my ( $args ) = @_;

   $worker->{arg_channel}->send_encoded( $args );

   $worker->{busy} = 1;
   $worker->{max_calls}--;

   return $worker->{ret_channel}->recv->then(
      # on recv
      $worker->_capture_weakself( sub {
         my ( $worker, $result ) = @_;
         my ( $type, @values ) = @$result;

         $worker->stop if !$worker->{max_calls} or
                          $worker->{exit_on_die} && $type eq "e";

         if( $type eq "r" ) {
            return Future->done( @values );
         }
         elsif( $type eq "e" ) {
            return Future->fail( @values );
         }
         else {
            die "Unrecognised type from worker - $type\n";
         }
      } ),
      # on EOF
      $worker->_capture_weakself( sub {
         my ( $worker ) = @_;

         $worker->stop;

         return Future->fail( "closed", "closed" );
      } )
   )->on_ready( $worker->_capture_weakself( sub {
      my ( $worker, $f ) = @_;
      $worker->{busy} = 0;

      my $function = $worker->parent;
      $function->_dispatch_pending if $function;

      $function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
   }));
}

=head1 EXAMPLES

=head2 Extended Error Information on Failure

The array-unpacking form of exception indiciation allows the function body to
more precicely control the resulting failure from the C<call> future.

   my $divider = IO::Async::Function->new(
      code => sub {
         my ( $numerator, $divisor ) = @_;
         $divisor == 0 and
            die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];

         return $numerator / $divisor;
      }
   );

=head1 NOTES

For the record, 123454321 is 11111 * 11111, a square number, and therefore not
prime.

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;



( run in 0.880 second using v1.01-cache-2.11-cpan-39bf76dae61 )