Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

   return $future if defined wantarray;

   # Caller is not going to keep hold of the Future, so we have to ensure it
   # stays alive somehow
   $self->adopt_future( $future->else( sub { Future->done } ) );
}

# Internal accessor: hands back every worker object stored in the 'workers'
# hash, in whatever order the hash yields them. Because it simply returns
# values(), calling it in scalar context gives the number of workers instead.
sub _worker_objects
{
   my ( $self ) = @_;

   return values %{ $self->{workers} };
}

=head2 workers

   $count = $function->workers

Returns the total number of worker processes available

=cut

sub workers
{
   my ( $self ) = @_;

   # keys() in scalar assignment yields the number of entries in the hash
   my $count = keys %{ $self->{workers} };

   return $count;
}

=head2 workers_busy

   $count = $function->workers_busy

Returns the number of worker processes that are currently busy

=cut

sub workers_busy
{
   my ( $self ) = @_;

   # Tally the workers whose 'busy' field is currently true
   my $busy = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $busy++ if $worker->{busy};
   }

   return $busy;
}

=head2 workers_idle

   $count = $function->workers_idle

Returns the number of worker processes that are currently idle

=cut

sub workers_idle
{
   my ( $self ) = @_;

   # Tally the workers whose 'busy' field is currently false
   my $idle = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $idle++ unless $worker->{busy};
   }

   return $idle;
}

# Internal: builds one more IO::Async::Function::Worker, attaches it to this
# object as a child, records it in the 'workers' hash under its id, and
# returns it.
sub _new_worker
{
   my ( $self ) = @_;

   # Settings copied straight across from this object to the worker
   my @inherited = map { $_ => $self->{$_} } qw( model init_code code setup exit_on_die );

   my $max_calls = $self->{max_worker_calls};

   # Invoked when the worker finishes. The first argument is this object as
   # supplied by _capture_weakself and may be false (presumably once this
   # object has gone away - confirm against _capture_weakself); the worker
   # argument that follows is not needed here.
   my $on_finish = $self->_capture_weakself( sub {
      my ( $self ) = @_;
      return unless $self;

      # Nothing to top up or dispatch once we are shutting down
      return if $self->{stopping};

      # Keep the pool from dropping below its configured minimum
      if( $self->workers < $self->{min_workers} ) {
         $self->_new_worker;
      }

      $self->_dispatch_pending;
   } );

   my $worker = IO::Async::Function::Worker->new(
      @inherited,
      max_calls => $max_calls,
      on_finish => $on_finish,
   );

   $self->add_child( $worker );

   $self->{workers}{ $worker->id } = $worker;

   return $worker;
}

# Internal: picks a worker to run the next call on.
#
# Returns the first non-busy worker, scanning worker ids in string-sorted
# order. If every worker is busy, a new one is created provided the pool is
# still below 'max_workers'; otherwise undef is returned.
sub _get_worker
{
   my ( $self ) = @_;

   for my $id ( sort keys %{ $self->{workers} } ) {
      next if $self->{workers}{$id}{busy};
      return $self->{workers}{$id};
   }

   # No idle worker and no room to grow: the caller gets undef
   return undef unless $self->workers < $self->{max_workers};

   return $self->_new_worker;
}

# Internal: sends one call of the given type and arguments to $worker and
# returns the future produced by the worker's call() method.
sub _call_worker
{
   my ( $self, $worker, $type, $args ) = @_;

   my $future = $worker->call( $type, $args );

   # With no idle workers left, stop the idle timer if one exists
   if( $self->workers_idle == 0 ) {
      my $timer = $self->{idle_timer};
      $timer->stop if $timer;
   }

   return $future;
}

sub _dispatch_pending
{
   my $self = shift;

   while( my $next = shift @{ $self->{pending_queue} } ) {
      my $worker = $self->_get_worker or return;

      next if $next->is_cancelled;

      $self->debug_printf( "UNQUEUE" );
      $next->done( $self, $worker );
      return;



( run in 1.030 second using v1.01-cache-2.11-cpan-39bf76dae61 )