IO-Async
view release on metacpan or search on metacpan
lib/IO/Async/Function.pm view on Meta::CPAN
}
elsif( defined $params{on_return} and defined $params{on_error} ) {
my $on_return = delete $params{on_return};
ref $on_return or croak "Expected 'on_return' to be a reference";
my $on_error = delete $params{on_error};
ref $on_error or croak "Expected 'on_error' to be a reference";
$on_done = $on_return;
$on_fail = $on_error;
}
elsif( !defined wantarray ) {
croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future";
}
$self->debug_printf_call( @$args );
my $request = IO::Async::Channel->encode( $args );
my $future;
if( my $worker = $self->_get_worker ) {
$future = $self->_call_worker( $worker, $request );
}
else {
$self->debug_printf( "QUEUE" );
my $queue = $self->{pending_queue};
my $next = Pending(
my $priority = $params{priority} || 0,
my $wait_f = $self->loop->new_future,
);
if( $priority ) {
my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
splice @$queue, $idx // $#$queue+1, 0, ( $next );
}
else {
push @$queue, $next;
}
$future = $wait_f->then( sub {
my ( $self, $worker ) = @_;
$self->_call_worker( $worker, $request );
});
}
$future->on_done( $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf_result( @_ );
}));
$future->on_fail( $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf_failure( @_ );
}));
$future->on_done( $on_done ) if $on_done;
$future->on_fail( $on_fail ) if $on_fail;
return $future if defined wantarray;
# Caller is not going to keep hold of the Future, so we have to ensure it
# stays alive somehow
$self->adopt_future( $future->else( sub { Future->done } ) );
}
sub _worker_objects
{
   my ( $self ) = @_;

   # Internal helper: hands back whatever is stored as the values of the
   # 'workers' hash on this object. In list context that is the stored
   # entries themselves; in scalar context perl's values() yields how many
   # there are.
   return values %{ $self->{workers} };
}
=head2 workers

   $count = $function->workers;

Returns the total number of worker processes available.

=cut

sub workers
{
   my ( $self ) = @_;

   # Assigning keys() to a scalar gives the number of entries in the
   # 'workers' hash, which is what callers receive in any context.
   my $total = keys %{ $self->{workers} };

   return $total;
}
=head2 workers_busy

   $count = $function->workers_busy;

Returns the number of worker processes that are currently busy.

=cut

sub workers_busy
{
   my ( $self ) = @_;

   # Tally every worker object whose 'busy' field is true.
   my $busy_count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $busy_count++ if $worker->{busy};
   }

   return $busy_count;
}
=head2 workers_idle

   $count = $function->workers_idle;

Returns the number of worker processes that are currently idle.

=cut

sub workers_idle
{
   my ( $self ) = @_;

   # Tally every worker object whose 'busy' field is false or absent.
   my $idle_count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $idle_count++ unless $worker->{busy};
   }

   return $idle_count;
}
sub _new_worker
{
my $self = shift;
my $worker = IO::Async::Function::Worker->new(
( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
max_calls => $self->{max_worker_calls},
on_finish => $self->_capture_weakself( sub {
( run in 0.812 second using v1.01-cache-2.11-cpan-39bf76dae61 )