IO-Async

 view release on metacpan or  search on metacpan

lib/IO/Async/Function.pm  view on Meta::CPAN

   }
   elsif( defined $params{on_return} and defined $params{on_error} ) {
      my $on_return = delete $params{on_return};
      ref $on_return or croak "Expected 'on_return' to be a reference";
      my $on_error  = delete $params{on_error};
      ref $on_error or croak "Expected 'on_error' to be a reference";

      $on_done = $on_return;
      $on_fail = $on_error;
   }
   elsif( !defined wantarray ) {
      croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
   }

   $self->debug_printf_call( @$args );

   my $request = IO::Async::Channel->encode( $args );

   my $future;
   if( my $worker = $self->_get_worker ) {
      $future = $self->_call_worker( $worker, $request );
   }
   else {
      $self->debug_printf( "QUEUE" );
      my $queue = $self->{pending_queue};

      my $next = Pending(
         my $priority = $params{priority} || 0,
         my $wait_f = $self->loop->new_future,
      );

      if( $priority ) {
         my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
         splice @$queue, $idx // $#$queue+1, 0, ( $next );
      }
      else {
         push @$queue, $next;
      }

      $future = $wait_f->then( sub {
         my ( $self, $worker ) = @_;
         $self->_call_worker( $worker, $request );
      });
   }

   $future->on_done( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_result( @_ );
   }));
   $future->on_fail( $self->_capture_weakself( sub {
      my $self = shift or return;
      $self->debug_printf_failure( @_ );
   }));

   $future->on_done( $on_done ) if $on_done;
   $future->on_fail( $on_fail ) if $on_fail;

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $self->adopt_future( $future->else( sub { Future->done } ) );
}

sub _worker_objects
{
   my $self = shift;
   return values %{ $self->{workers} };
}

=head2 workers

   $count = $function->workers;

Returns the total number of worker processes available

=cut

sub workers
{
   my $self = shift;
   return scalar keys %{ $self->{workers} };
}

=head2 workers_busy

   $count = $function->workers_busy;

Returns the number of worker processes that are currently busy

=cut

sub workers_busy
{
   my $self = shift;
   return scalar grep { $_->{busy} } $self->_worker_objects;
}

=head2 workers_idle

   $count = $function->workers_idle;

Returns the number of worker processes that are currently idle

=cut

sub workers_idle
{
   my $self = shift;
   return scalar grep { !$_->{busy} } $self->_worker_objects;
}

sub _new_worker
{
   my $self = shift;

   my $worker = IO::Async::Function::Worker->new(
      ( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
      max_calls => $self->{max_worker_calls},

      on_finish => $self->_capture_weakself( sub {



( run in 0.812 second using v1.01-cache-2.11-cpan-39bf76dae61 )