Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
if( defined $params{on_result} ) {
my $on_result = delete $params{on_result};
ref $on_result or croak "Expected 'on_result' to be a reference";
$on_done = $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf( "CONT on_result return" );
$on_result->( return => @_ );
} );
$on_fail = $self->_capture_weakself( sub {
my $self = shift or return;
my ( $err, @values ) = @_;
$self->debug_printf( "CONT on_result error" );
$on_result->( error => @values );
} );
}
elsif( defined $params{on_return} and defined $params{on_error} ) {
my $on_return = delete $params{on_return};
ref $on_return or croak "Expected 'on_return' to be a reference";
my $on_error = delete $params{on_error};
ref $on_error or croak "Expected 'on_error' to be a reference";
$on_done = $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf( "CONT on_return" );
$on_return->( @_ );
} );
$on_fail = $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf( "CONT on_error" );
$on_error->( @_ );
} );
}
elsif( !defined wantarray ) {
croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
}
my $request = IO::Async::Channel->encode( $args );
my $future;
if( my $worker = $self->_get_worker ) {
$self->debug_printf( "CALL" );
$future = $self->_call_worker( $worker, $request );
}
else {
$self->debug_printf( "QUEUE" );
push @{ $self->{pending_queue} }, my $wait_f = $self->loop->new_future;
$future = $wait_f->then( sub {
my ( $self, $worker ) = @_;
$self->_call_worker( $worker, $request );
});
}
$future->on_done( $on_done ) if $on_done;
$future->on_fail( $on_fail ) if $on_fail;
return $future if defined wantarray;
# Caller is not going to keep hold of the Future, so we have to ensure it
# stays alive somehow
$self->adopt_future( $future->else( sub { Future->done } ) );
}
sub _worker_objects
{
my $self = shift;
return values %{ $self->{workers} };
}
=head2 workers
$count = $function->workers
Returns the total number of worker processes available
=cut
sub workers
{
my $self = shift;
return scalar keys %{ $self->{workers} };
}
=head2 workers_busy
$count = $function->workers_busy
Returns the number of worker processes that are currently busy
=cut
sub workers_busy
{
my $self = shift;
return scalar grep { $_->{busy} } $self->_worker_objects;
}
=head2 workers_idle
$count = $function->workers_idle
Returns the number of worker processes that are currently idle
=cut
sub workers_idle
{
my $self = shift;
return scalar grep { !$_->{busy} } $self->_worker_objects;
}
sub _new_worker
{
my $self = shift;
my $worker = IO::Async::Function::Worker->new(
( map { $_ => $self->{$_} } qw( model init_code code setup exit_on_die ) ),
max_calls => $self->{max_worker_calls},
on_finish => $self->_capture_weakself( sub {
( run in 0.734 second using v1.01-cache-2.11-cpan-df04353d9ac )