IO-Async
view release on metacpan or search on metacpan
lib/IO/Async/Function.pm view on Meta::CPAN
exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.
=head2 setup => ARRAY
Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.
=cut
sub _init
{
my $self = shift;
$self->SUPER::_init( @_ );
$self->{min_workers} = 1;
$self->{max_workers} = 8;
$self->{workers} = {}; # {$id} => IaFunction:Worker
$self->{pending_queue} = [];
}
sub configure
{
my $self = shift;
my %params = @_;
my %worker_params;
foreach (qw( model exit_on_die max_worker_calls )) {
$self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
}
if( keys %worker_params ) {
foreach my $worker ( $self->_worker_objects ) {
$worker->configure( %worker_params );
}
}
if( exists $params{idle_timeout} ) {
my $timeout = delete $params{idle_timeout};
if( !$timeout ) {
$self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
}
elsif( my $idle_timer = $self->{idle_timer} ) {
$idle_timer->configure( delay => $timeout );
}
else {
$self->{idle_timer} = IO::Async::Timer::Countdown->new(
delay => $timeout,
on_expire => $self->_capture_weakself( sub {
my $self = shift or return;
my $workers = $self->{workers};
# Shut down atmost one idle worker, starting from the highest
# ID. Since we search from lowest to assign work, this tries
# to ensure we'll shut down the least useful ones first,
# keeping more useful ones in memory (page/cache warmth, etc..)
foreach my $id ( reverse sort keys %$workers ) {
next if $workers->{$id}{busy};
$workers->{$id}->stop;
last;
}
# Still more?
$self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
} ),
);
$self->add_child( $self->{idle_timer} );
}
}
foreach (qw( min_workers max_workers )) {
$self->{$_} = delete $params{$_} if exists $params{$_};
# TODO: something about retuning
}
my $need_restart;
foreach (qw( init_code code module init_func func setup )) {
$need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_};
}
defined $self->{code} and defined $self->{func} and
croak "Cannot ->configure both 'code' and 'func'";
defined $self->{func} and !defined $self->{module} and
croak "'func' parameter requires a 'module' as well";
$self->SUPER::configure( %params );
if( $need_restart and $self->loop ) {
$self->stop;
$self->start;
}
}
sub _add_to_loop
{
my $self = shift;
$self->SUPER::_add_to_loop( @_ );
$self->start;
}
sub _remove_from_loop
{
my $self = shift;
$self->stop;
$self->SUPER::_remove_from_loop( @_ );
}
=head1 METHODS
The following methods documented in C<await> expressions return L<Future>
instances.
=cut
lib/IO/Async/Function.pm view on Meta::CPAN
sub start
{
my $self = shift;
$self->_new_worker for 1 .. $self->{min_workers};
}
=head2 stop
$function->stop;
Stop the worker processes
$f = $function->stop;
I<Since version 0.75.>
If called in non-void context, returns a L<IO::Async::Future> instance that
will complete once every worker process has stopped and exited. This may be
useful for waiting until all of the processes are waited on, or other
edge-cases, but is not otherwise particularly useful.
=cut
sub stop
{
my $self = shift;
$self->{stopping} = 1;
my @f;
foreach my $worker ( $self->_worker_objects ) {
defined wantarray ? push @f, $worker->stop : $worker->stop;
}
return Future->needs_all( @f ) if defined wantarray;
}
=head2 restart
$function->restart;
Gracefully stop and restart all the worker processes.
=cut
sub restart
{
my $self = shift;
$self->stop;
$self->start;
}
=head2 call
@result = await $function->call( %params );
Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.
The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.
The C<%params> hash takes the following keys:
=over 8
=item args => ARRAY
A reference to the array of arguments to pass to the code.
=item priority => NUM
Optional. Defines the sorting order when no workers are available and calls
must be queued for later. A default of zero will apply if not provided.
Higher values cause the call to be considered more important, and will be
placed earlier in the queue than calls with a smaller value. Calls of equal
priority are still handled in FIFO order.
=back
If the function body returns normally the list of results are provided as the
(successful) result of returned future. If the function throws an exception
this results in a failed future. In the special case that the exception is in
fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
for the C<fail> result. If the exception is not such a reference, it is used
as the first argument to C<fail>, in the category of C<error>.
$f->done( @result );
$f->fail( @{ $exception } );
$f->fail( $exception, error => );
=head2 call (void)
$function->call( %params );
When not returning a future, the C<on_result>, C<on_return> and C<on_error>
arguments give continuations to handle successful results or failure.
=over 8
=item on_result => CODE
A continuation that is invoked when the code has been executed. If the code
returned normally, it is called as:
$on_result->( 'return', @values )
If the code threw an exception, or some other error occurred such as a closed
connection or the process died, it is called as:
$on_result->( 'error', $exception_name )
=item on_return => CODE and on_error => CODE
lib/IO/Async/Function.pm view on Meta::CPAN
my $queue = $self->{pending_queue};
my $next = Pending(
my $priority = $params{priority} || 0,
my $wait_f = $self->loop->new_future,
);
if( $priority ) {
my $idx = first { $queue->[$_]->priority < $priority } 0 .. $#$queue;
splice @$queue, $idx // $#$queue+1, 0, ( $next );
}
else {
push @$queue, $next;
}
$future = $wait_f->then( sub {
my ( $self, $worker ) = @_;
$self->_call_worker( $worker, $request );
});
}
$future->on_done( $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf_result( @_ );
}));
$future->on_fail( $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf_failure( @_ );
}));
$future->on_done( $on_done ) if $on_done;
$future->on_fail( $on_fail ) if $on_fail;
return $future if defined wantarray;
# Caller is not going to keep hold of the Future, so we have to ensure it
# stays alive somehow
$self->adopt_future( $future->else( sub { Future->done } ) );
}
sub _worker_objects
{
my $self = shift;
return values %{ $self->{workers} };
}
=head2 workers
$count = $function->workers;
Returns the total number of worker processes available
=cut
sub workers
{
my $self = shift;
return scalar keys %{ $self->{workers} };
}
=head2 workers_busy
$count = $function->workers_busy;
Returns the number of worker processes that are currently busy
=cut
sub workers_busy
{
my $self = shift;
return scalar grep { $_->{busy} } $self->_worker_objects;
}
=head2 workers_idle
$count = $function->workers_idle;
Returns the number of worker processes that are currently idle
=cut
sub workers_idle
{
my $self = shift;
return scalar grep { !$_->{busy} } $self->_worker_objects;
}
sub _new_worker
{
my $self = shift;
my $worker = IO::Async::Function::Worker->new(
( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
max_calls => $self->{max_worker_calls},
on_finish => $self->_capture_weakself( sub {
my $self = shift or return;
my ( $worker ) = @_;
return if $self->{stopping};
$self->_new_worker if $self->workers < $self->{min_workers};
$self->_dispatch_pending;
} ),
);
$self->add_child( $worker );
return $self->{workers}{$worker->id} = $worker;
}
sub _get_worker
{
my $self = shift;
foreach ( sort keys %{ $self->{workers} } ) {
return $self->{workers}{$_} if !$self->{workers}{$_}{busy};
}
if( $self->workers < $self->{max_workers} ) {
return $self->_new_worker;
}
return undef;
}
sub _call_worker
{
my $self = shift;
my ( $worker, $type, $args ) = @_;
my $future = $worker->call( $type, $args );
if( $self->workers_idle == 0 ) {
$self->{idle_timer}->stop if $self->{idle_timer};
}
return $future;
}
sub _dispatch_pending
{
my $self = shift;
while( my $next = shift @{ $self->{pending_queue} } ) {
my $worker = $self->_get_worker or return;
my $f = $next->f;
next if $f->is_cancelled;
$self->debug_printf( "UNQUEUE" );
$f->done( $self, $worker );
return;
}
if( $self->workers_idle > $self->{min_workers} ) {
$self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
}
}
package # hide from indexer
IO::Async::Function::Worker;
use base qw( IO::Async::Routine );
use Carp;
use IO::Async::Channel;
use IO::Async::Internals::FunctionWorker;
sub new
{
my $class = shift;
my %params = @_;
lib/IO/Async/Function.pm view on Meta::CPAN
};
}
elsif( defined( my $func = $params{func} ) ) {
my $module = $params{module};
my $init_func = $params{init_func};
my @init_args;
$params{module} = "IO::Async::Internals::FunctionWorker";
$params{func} = "run_worker";
( $init_func, @init_args ) = @$init_func if ref( $init_func ) eq "ARRAY";
$send_initial = [ $module, $func, $init_func, @init_args ];
}
delete @params{qw( init_code init_func )};
my $worker = $class->SUPER::new(
%params,
channels_in => [ $arg_channel ],
channels_out => [ $ret_channel ],
);
$worker->{arg_channel} = $arg_channel;
$worker->{ret_channel} = $ret_channel;
$worker->{send_initial} = $send_initial if $send_initial;
return $worker;
}
sub _add_to_loop
{
my $self = shift;
$self->SUPER::_add_to_loop( @_ );
$self->{arg_channel}->send( delete $self->{send_initial} ) if $self->{send_initial};
}
sub configure
{
my $self = shift;
my %params = @_;
exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );
$self->SUPER::configure( %params );
}
sub stop
{
my $worker = shift;
$worker->{arg_channel}->close;
my $ret;
$ret = $worker->result_future if defined wantarray;
if( my $function = $worker->parent ) {
delete $function->{workers}{$worker->id};
if( $worker->{busy} ) {
$worker->{remove_on_idle}++;
}
else {
$function->remove_child( $worker );
}
}
return $ret;
}
sub call
{
my $worker = shift;
my ( $args ) = @_;
$worker->{arg_channel}->send_encoded( $args );
$worker->{busy} = 1;
$worker->{max_calls}--;
return $worker->{ret_channel}->recv->then(
# on recv
$worker->_capture_weakself( sub {
my ( $worker, $result ) = @_;
my ( $type, @values ) = @$result;
$worker->stop if !$worker->{max_calls} or
$worker->{exit_on_die} && $type eq "e";
if( $type eq "r" ) {
return Future->done( @values );
}
elsif( $type eq "e" ) {
return Future->fail( @values );
}
else {
die "Unrecognised type from worker - $type\n";
}
} ),
# on EOF
$worker->_capture_weakself( sub {
my ( $worker ) = @_;
$worker->stop;
return Future->fail( "closed", "closed" );
} )
)->on_ready( $worker->_capture_weakself( sub {
my ( $worker, $f ) = @_;
$worker->{busy} = 0;
my $function = $worker->parent;
$function->_dispatch_pending if $function;
$function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
}));
}
=head1 EXAMPLES
=head2 Extended Error Information on Failure
The array-unpacking form of exception indiciation allows the function body to
more precicely control the resulting failure from the C<call> future.
my $divider = IO::Async::Function->new(
code => sub {
my ( $numerator, $divisor ) = @_;
$divisor == 0 and
die [ "Cannot divide by zero", div_zero => $numerator, $divisor ];
return $numerator / $divisor;
}
);
=head1 NOTES
For the record, 123454321 is 11111 * 11111, a square number, and therefore not
prime.
=head1 AUTHOR
Paul Evans <leonerd@leonerd.org.uk>
=cut
0x55AA;
( run in 0.880 second using v1.01-cache-2.11-cpan-39bf76dae61 )