Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
return $future if defined wantarray;
# Caller is not going to keep hold of the Future, so we have to ensure it
# stays alive somehow
$self->adopt_future( $future->else( sub { Future->done } ) );
}
sub _worker_objects
{
   # Internal accessor for the worker objects themselves, i.e. the values
   # stored in the {workers} hash, in whatever order the hash yields them.
   # values() is evaluated in the caller's context, so a scalar-context call
   # gets the number of workers rather than a list.
   my ( $self ) = @_;

   return values %{ $self->{workers} };
}
=head2 workers
$count = $function->workers
Returns the total number of worker processes available
=cut
sub workers
{
   my ( $self ) = @_;

   # Assigning keys() to a scalar gives the number of entries in the
   # {workers} hash, so the caller receives a single count whatever context
   # it called us in.
   my $total = keys %{ $self->{workers} };

   return $total;
}
=head2 workers_busy
$count = $function->workers_busy
Returns the number of worker processes that are currently busy
=cut
sub workers_busy
{
   my ( $self ) = @_;

   # Tally the worker objects whose {busy} flag is currently true.
   my $busy_count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $busy_count++ if $worker->{busy};
   }

   return $busy_count;
}
=head2 workers_idle
$count = $function->workers_idle
Returns the number of worker processes that are currently idle
=cut
sub workers_idle
{
   my ( $self ) = @_;

   # Tally the worker objects whose {busy} flag is not set.
   my $idle_count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $idle_count++ unless $worker->{busy};
   }

   return $idle_count;
}
sub _new_worker
{
   my ( $self ) = @_;

   # Constructor arguments for the worker: the settings copied verbatim from
   # this object's own fields, the per-worker call limit, and a callback to
   # run when the worker finishes.
   my @worker_args = (
      ( map { $_ => $self->{$_} } qw( model init_code code setup exit_on_die ) ),
      max_calls => $self->{max_worker_calls},
      on_finish => $self->_capture_weakself( sub {
         # First argument is the owning object; bail out if it is false
         # (presumably it has already been destroyed - the callback is built
         # via _capture_weakself). Any further argument is not needed here.
         my ( $function ) = @_;
         return unless $function;

         # No replacements and no further dispatching while stopping.
         return if $function->{stopping};

         # Top the pool back up if it is now below the configured minimum.
         # NOTE(review): this relies on the finished worker no longer being
         # counted in {workers}; that removal is not visible here - confirm.
         if( $function->workers < $function->{min_workers} ) {
            $function->_new_worker;
         }

         $function->_dispatch_pending;
      } ),
   );

   my $new_worker = IO::Async::Function::Worker->new( @worker_args );

   # Attach as a child first, then record it in the pool keyed by its id.
   $self->add_child( $new_worker );
   $self->{workers}{ $new_worker->id } = $new_worker;

   return $new_worker;
}
sub _get_worker
{
   my ( $self ) = @_;

   # Prefer an existing worker that is not busy, scanning the pool in
   # string-sorted id order so the choice is deterministic.
   foreach my $id ( sort keys %{ $self->{workers} } ) {
      next if $self->{workers}{$id}{busy};
      return $self->{workers}{$id};
   }

   # Every existing worker is busy: grow the pool if the limit allows it.
   return $self->_new_worker if $self->workers < $self->{max_workers};

   # Pool is full and fully occupied; signal "no worker" with an explicit
   # undef so the result is a single value in any calling context.
   return undef;
}
sub _call_worker
{
   my ( $self, $worker, $type, $args ) = @_;

   # Hand the call to the chosen worker; its result is what we return.
   my $call_f = $worker->call( $type, $args );

   # While some worker is still idle the idle timer is left alone.
   return $call_f unless $self->workers_idle == 0;

   # No idle workers remain, so stop the idle timer if one exists.
   my $timer = $self->{idle_timer};
   $timer->stop if $timer;

   return $call_f;
}
sub _dispatch_pending
{
my $self = shift;
while( my $next = shift @{ $self->{pending_queue} } ) {
my $worker = $self->_get_worker or return;
next if $next->is_cancelled;
$self->debug_printf( "UNQUEUE" );
$next->done( $self, $worker );
return;
( run in 1.030 second using v1.01-cache-2.11-cpan-39bf76dae61 )