Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

   if( defined $params{on_result} ) {
      my $on_result = delete $params{on_result};
      ref $on_result or croak "Expected 'on_result' to be a reference";

      $on_done = $self->_capture_weakself( sub {
         my $self = shift or return;
         $self->debug_printf( "CONT on_result return" );
         $on_result->( return => @_ );
      } );
      $on_fail = $self->_capture_weakself( sub {
         my $self = shift or return;
         my ( $err, @values ) = @_;
         $self->debug_printf( "CONT on_result error" );
         $on_result->( error => @values );
      } );
   }
   elsif( defined $params{on_return} and defined $params{on_error} ) {
      my $on_return = delete $params{on_return};
      ref $on_return or croak "Expected 'on_return' to be a reference";
      my $on_error  = delete $params{on_error};
      ref $on_error or croak "Expected 'on_error' to be a reference";

      $on_done = $self->_capture_weakself( sub {
         my $self = shift or return;
         $self->debug_printf( "CONT on_return" );
         $on_return->( @_ );
      } );
      $on_fail = $self->_capture_weakself( sub {
         my $self = shift or return;
         $self->debug_printf( "CONT on_error" );
         $on_error->( @_ );
      } );
   }
   elsif( !defined wantarray ) {
      croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
   }

   my $request = IO::Async::Channel->encode( $args );

   my $future;
   if( my $worker = $self->_get_worker ) {
      $self->debug_printf( "CALL" );
      $future = $self->_call_worker( $worker, $request );
   }
   else {
      $self->debug_printf( "QUEUE" );
      push @{ $self->{pending_queue} }, my $wait_f = $self->loop->new_future;

      $future = $wait_f->then( sub {
         my ( $self, $worker ) = @_;
         $self->_call_worker( $worker, $request );
      });
   }

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( $on_fail ) if $on_fail;

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $self->adopt_future( $future->else( sub { Future->done } ) );
}

# Internal accessor: hands back the values of the $self->{workers} hash,
# i.e. the worker objects themselves rather than the keys they are stored
# under. Evaluated in scalar context this yields how many there are.
sub _worker_objects
{
   my ( $self ) = @_;

   return values %{ $self->{workers} };
}

=head2 workers

   $count = $function->workers

Returns the total number of workers this function object currently holds,
counting both busy and idle ones.

=cut

sub workers
{
   my ( $self ) = @_;

   # Scalar assignment from keys() gives the number of entries in the hash
   my $count = keys %{ $self->{workers} };
   return $count;
}

=head2 workers_busy

   $count = $function->workers_busy

Returns the number of workers whose C<busy> flag is currently true.

=cut

sub workers_busy
{
   my ( $self ) = @_;

   # Tally every worker object that is marked as busy
   my $count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $count++ if $worker->{busy};
   }

   return $count;
}

=head2 workers_idle

   $count = $function->workers_idle

Returns the number of workers whose C<busy> flag is currently false.

=cut

sub workers_idle
{
   my ( $self ) = @_;

   # Tally every worker object that is not marked as busy
   my $count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $count++ unless $worker->{busy};
   }

   return $count;
}

sub _new_worker
{
   my $self = shift;

   my $worker = IO::Async::Function::Worker->new(
      ( map { $_ => $self->{$_} } qw( model init_code code setup exit_on_die ) ),
      max_calls => $self->{max_worker_calls},

      on_finish => $self->_capture_weakself( sub {



( run in 0.734 second using v1.01-cache-2.11-cpan-df04353d9ac )