Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down atmost one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};

                  $workers->{$id}->stop;
                  last;
               }

               # Still more?
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN


   $self->stop;
   $self->start;
}

=head2 call

   @result = $function->call( %params )->get

Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.

The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.

The C<%params> hash takes the following keys:

=over 8

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

Returns the total number of worker processes available

=cut

sub workers
{
   my $self = shift;
   return scalar keys %{ $self->{workers} };
}

=head2 workers_busy

   $count = $function->workers_busy

Returns the number of worker processes that are currently busy

=cut

sub workers_busy
{
   my $self = shift;
   return scalar grep { $_->{busy} } $self->_worker_objects;
}

=head2 workers_idle

   $count = $function->workers_idle

Returns the number of worker processes that are currently idle

=cut

sub workers_idle
{
   my $self = shift;
   return scalar grep { !$_->{busy} } $self->_worker_objects;
}

sub _new_worker
{
   my $self = shift;

   my $worker = IO::Async::Function::Worker->new(
      ( map { $_ => $self->{$_} } qw( model init_code code setup exit_on_die ) ),
      max_calls => $self->{max_worker_calls},

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

   $self->add_child( $worker );

   return $self->{workers}{$worker->id} = $worker;
}

sub _get_worker
{
   my $self = shift;

   foreach ( sort keys %{ $self->{workers} } ) {
      return $self->{workers}{$_} if !$self->{workers}{$_}{busy};
   }

   if( $self->workers < $self->{max_workers} ) {
      return $self->_new_worker;
   }

   return undef;
}

sub _call_worker

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

}

sub stop
{
   my $worker = shift;
   $worker->{arg_channel}->close;

   if( my $function = $worker->parent ) {
      delete $function->{workers}{$worker->id};

      if( $worker->{busy} ) {
         $worker->{remove_on_idle}++;
      }
      else {
         $function->remove_child( $worker );
      }
   }
}

sub call
{
   my $worker = shift;
   my ( $args ) = @_;

   $worker->{arg_channel}->send_encoded( $args );

   $worker->{busy} = 1;
   $worker->{max_calls}--;

   return $worker->{ret_channel}->recv->then(
      # on recv
      $worker->_capture_weakself( sub {
         my ( $worker, $result ) = @_;
         my ( $type, @values ) = @$result;

         $worker->stop if !$worker->{max_calls} or
                          $worker->{exit_on_die} && $type eq "e";

local/lib/perl5/IO/Async/Function.pm  view on Meta::CPAN

      # on EOF
      $worker->_capture_weakself( sub {
         my ( $worker ) = @_;

         $worker->stop;

         return Future->fail( "closed", "closed" );
      } )
   )->on_ready( $worker->_capture_weakself( sub {
      my ( $worker, $f ) = @_;
      $worker->{busy} = 0;

      my $function = $worker->parent;
      $function->_dispatch_pending if $function;

      $function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
   }));
}

=head1 EXAMPLES

local/lib/perl5/IO/Async/LoopTests.pm  view on Meta::CPAN

{
   my ( $code, $lower, $upper, $name ) = @_;

   my $start = time;
   $code->();
   my $took = ( time - $start ) / AUT;

   cmp_ok( $took, '>=', $lower, "$name took at least $lower seconds" ) if defined $lower;
   cmp_ok( $took, '<=', $upper * 3, "$name took no more than $upper seconds" ) if defined $upper;
   if( $took > $upper and $took <= $upper * 3 ) {
      diag( "$name took longer than $upper seconds - this may just be an indication of a busy testing machine rather than a bug" );
   }
}

=head1 TEST SUITES

The following test suite names exist, to be passed as a name in the C<@tests>
argument to C<run_tests>:

=cut



( run in 0.328 second using v1.01-cache-2.11-cpan-87723dcf8b7 )