Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

   $function->stop

Stop the worker processes

=cut

sub stop
{
   my $self = shift;

   $self->{stopping} = 1;
   foreach my $worker ( $self->_worker_objects ) {
      $worker->stop;
   }
}

=head2 restar

   $function->restart

Gracefully stop and restart all the worker processes. 

=cut

sub restart
{
   my $self = shift;

   $self->stop;
   $self->start;
}

=head2 call

   @result = $function->call( %params )->get

Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.

The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.

The C<%params> hash takes the following keys:

=over 8

=item args => ARRAY

A reference to the array of arguments to pass to the code.

=back

If the function body returns normally the list of results are provided as the
(successful) result of returned future. If the function throws an exception
this results in a failed future. In the special case that the exception is in
fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
for the C<fail> result. If the exception is not such a reference, it is used
as the first argument to C<fail>, in the category of C<error>.

   $f->done( @result )

   $f->fail( @{ $exception } )
   $f->fail( $exception, error => )

=head2 call (void)

   $function->call( %params )

When not returning a future, the C<on_result>, C<on_return> and C<on_error>
arguments give continuations to handle successful results or failure.

=over 8

=item on_result => CODE

A continuation that is invoked when the code has been executed. If the code
returned normally, it is called as:

 $on_result->( 'return', @values )

If the code threw an exception, or some other error occured such as a closed
connection or the process died, it is called as:

 $on_result->( 'error', $exception_name )

=item on_return => CODE and on_error => CODE

An alternative to C<on_result>. Two continuations to use in either of the
circumstances given above. They will be called directly, without the leading
'return' or 'error' value.

=back

=cut

sub call
{
   my $self = shift;
   my %params = @_;

   # TODO: possibly just queue this?
   $self->loop or croak "Cannot ->call on a Function not yet in a Loop";

   my $args = delete $params{args};
   ref $args eq "ARRAY" or croak "Expected 'args' to be an array";

   my ( $on_done, $on_fail );
   if( defined $params{on_result} ) {
      my $on_result = delete $params{on_result};
      ref $on_result or croak "Expected 'on_result' to be a reference";

      $on_done = $self->_capture_weakself( sub {
         my $self = shift or return;
         $self->debug_printf( "CONT on_result return" );
         $on_result->( return => @_ );
      } );
      $on_fail = $self->_capture_weakself( sub {
         my $self = shift or return;
         my ( $err, @values ) = @_;
         $self->debug_printf( "CONT on_result error" );
         $on_result->( error => @values );
      } );
   }
   elsif( defined $params{on_return} and defined $params{on_error} ) {
      my $on_return = delete $params{on_return};
      ref $on_return or croak "Expected 'on_return' to be a reference";
      my $on_error  = delete $params{on_error};
      ref $on_error or croak "Expected 'on_error' to be a reference";

      $on_done = $self->_capture_weakself( sub {
         my $self = shift or return;
         $self->debug_printf( "CONT on_return" );
         $on_return->( @_ );
      } );
      $on_fail = $self->_capture_weakself( sub {
         my $self = shift or return;
         $self->debug_printf( "CONT on_error" );
         $on_error->( @_ );
      } );
   }
   elsif( !defined wantarray ) {
      croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
   }

   my $request = IO::Async::Channel->encode( $args );

   my $future;
   if( my $worker = $self->_get_worker ) {
      $self->debug_printf( "CALL" );
      $future = $self->_call_worker( $worker, $request );
   }
   else {
      $self->debug_printf( "QUEUE" );
      push @{ $self->{pending_queue} }, my $wait_f = $self->loop->new_future;

      $future = $wait_f->then( sub {
         my ( $self, $worker ) = @_;
         $self->_call_worker( $worker, $request );
      });
   }

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( $on_fail ) if $on_fail;

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $self->adopt_future( $future->else( sub { Future->done } ) );
}

sub _worker_objects
{
   my $self = shift;
   return values %{ $self->{workers} };
}

=head2 workers

   $count = $function->workers

Returns the total number of worker processes available

=cut

sub workers
{
   my $self = shift;
   return scalar keys %{ $self->{workers} };
}

=head2 workers_busy

   $count = $function->workers_busy

Returns the number of worker processes that are currently busy

=cut

sub workers_busy
{
   my $self = shift;

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

{
   my $self = shift;
   my ( $worker, $type, $args ) = @_;

   my $future = $worker->call( $type, $args );

   if( $self->workers_idle == 0 ) {
      $self->{idle_timer}->stop if $self->{idle_timer};
   }

   return $future;
}

sub _dispatch_pending
{
   my $self = shift;

   while( my $next = shift @{ $self->{pending_queue} } ) {
      my $worker = $self->_get_worker or return;

      next if $next->is_cancelled;

      $self->debug_printf( "UNQUEUE" );
      $next->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}

package # hide from indexer
   IO::Async::Function::Worker;

use base qw( IO::Async::Routine );

use IO::Async::Channel;

sub new
{
   my $class = shift;
   my %params = @_;

   my $arg_channel = IO::Async::Channel->new;
   my $ret_channel = IO::Async::Channel->new;

   my $init = delete $params{init_code};
   my $code = delete $params{code};
   $params{code} = sub {
      $init->() if defined $init;

      while( my $args = $arg_channel->recv ) {
         my @ret;
         my $ok = eval { @ret = $code->( @$args ); 1 };

         if( $ok ) {
            $ret_channel->send( [ r => @ret ] );
         }
         elsif( ref $@ ) {
            # Presume that $@ is an ARRAYref of error results
            $ret_channel->send( [ e => @{ $@ } ] );
         }
         else {
            chomp( my $e = "$@" );
            $ret_channel->send( [ e => $e, error => ] );
         }
      }
   };

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   $worker->{arg_channel} = $arg_channel;
   $worker->{ret_channel} = $ret_channel;

   return $worker;
}

sub configure
{
   my $self = shift;
   my %params = @_;

   exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );

   $self->SUPER::configure( %params );
}

sub stop
{
   my $worker = shift;
   $worker->{arg_channel}->close;

   if( my $function = $worker->parent ) {
      delete $function->{workers}{$worker->id};

      if( $worker->{busy} ) {
         $worker->{remove_on_idle}++;
      }
      else {
         $function->remove_child( $worker );
      }
   }
}

sub call
{
   my $worker = shift;
   my ( $args ) = @_;

   $worker->{arg_channel}->send_encoded( $args );

   $worker->{busy} = 1;
   $worker->{max_calls}--;

   return $worker->{ret_channel}->recv->then(
      # on recv
      $worker->_capture_weakself( sub {
         my ( $worker, $result ) = @_;
         my ( $type, @values ) = @$result;

         $worker->stop if !$worker->{max_calls} or



( run in 0.609 second using v1.01-cache-2.11-cpan-140bd7fdf52 )