Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
=head1 PARAMETERS
The following named parameters may be passed to C<new> or C<configure>:
=head2 code => CODE
The body of the function to execute.
@result = $code->( @args )
=head2 init_code => CODE
Optional. If defined, this is invoked exactly once in every child process or
thread, after it is created, but before the first invocation of the function
body itself.
$init_code->()
=head2 model => "fork" | "thread"
Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
leaves the default choice up to Routine.
=head2 min_workers => INT
=head2 max_workers => INT
The lower and upper bounds of worker processes to try to keep running. The
actual number running at any time will be kept somewhere between these bounds
according to load.
=head2 max_worker_calls => INT
Optional. If provided, stop a worker process after it has processed this
number of calls. (New workers may be started to replace stopped ones, within
the bounds given above).
=head2 idle_timeout => NUM
Optional. If provided, idle worker processes will be shut down after this
amount of time, if there are more than C<min_workers> of them.
=head2 exit_on_die => BOOL
Optional boolean, controls what happens after the C<code> throws an
exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.
=head2 setup => ARRAY
Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.
=cut
# Initialiser hook: lets the parent class initialise first, then installs the
# default pool bounds and the empty bookkeeping containers.
sub _init
{
   my $self = shift;

   $self->SUPER::_init( @_ );

   # Default lower/upper bounds on the number of workers
   @{$self}{qw( min_workers max_workers )} = ( 1, 8 );

   # Worker table, keyed by worker id => IO::Async::Function::Worker
   $self->{workers} = {};

   # Requests waiting for a worker
   $self->{pending_queue} = [];
}
# configure( %params )
#
# Applies the named parameters described in the PARAMETERS section. Keys that
# are not consumed here are passed on to the parent class's configure.
#
#  * model, exit_on_die, max_worker_calls are stored on the function and also
#    pushed to every existing worker.
#  * idle_timeout creates, retunes or removes the idle countdown timer.
#  * min_workers, max_workers are stored only.
#  * init_code, code, setup are stored and, if the function is already in a
#    loop, cause all workers to be stopped and started again.
sub configure
{
   my $self = shift;
   my %params = @_;

   # Function-level parameter name => name understood by the worker's own
   # configure method. The worker keeps its call budget under 'max_calls',
   # so 'max_worker_calls' must be translated before being forwarded;
   # otherwise the worker would pass the unknown key up to its base class.
   my %worker_key_for = (
      model            => "model",
      exit_on_die      => "exit_on_die",
      max_worker_calls => "max_calls",
   );

   my %worker_params;
   foreach my $key (qw( model exit_on_die max_worker_calls )) {
      next unless exists $params{$key};
      $self->{$key} = $worker_params{ $worker_key_for{$key} } = delete $params{$key};
   }

   if( keys %worker_params ) {
      foreach my $worker ( $self->_worker_objects ) {
         $worker->configure( %worker_params );
      }
   }

   if( exists $params{idle_timeout} ) {
      my $timeout = delete $params{idle_timeout};

      if( !$timeout ) {
         # A false timeout disables idle shutdown altogether
         $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
      }
      elsif( my $idle_timer = $self->{idle_timer} ) {
         # Timer already exists; just change its delay
         $idle_timer->configure( delay => $timeout );
      }
      else {
         $self->{idle_timer} = IO::Async::Timer::Countdown->new(
            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down at most one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};

                  $workers->{$id}->stop;
                  last;
               }

               # Still more?
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );
      }
   }

   foreach my $key (qw( min_workers max_workers )) {
      $self->{$key} = delete $params{$key} if exists $params{$key};
      # TODO: something about retuning
   }

   # Changing any of these means running workers hold stale code/settings
   my $need_restart;
   foreach my $key (qw( init_code code setup )) {
      next unless exists $params{$key};

      $self->{$key} = delete $params{$key};
      $need_restart++;
   }

   $self->SUPER::configure( %params );

   if( $need_restart and $self->loop ) {
      $self->stop;
      $self->start;
   }
}
# Loop-attachment hook: the parent class attaches first, then the worker
# pool is started.
sub _add_to_loop
{
   my $self = shift;

   $self->SUPER::_add_to_loop( @_ );

   return $self->start;
}
# Loop-detachment hook: the worker pool is stopped before the parent class
# detaches from the loop.
sub _remove_from_loop
{
   my $self = shift;

   $self->stop;

   return $self->SUPER::_remove_from_loop( @_ );
}
=head1 METHODS
The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.
=cut
=head2 start
$function->start
Start the worker processes
=cut
sub start
{
   my ( $self ) = @_;

   # Bring the pool up to its configured lower bound
   foreach ( 1 .. $self->{min_workers} ) {
      $self->_new_worker;
   }
}
=head2 stop
$function->stop
Stop the worker processes
=cut
sub stop
{
   my ( $self ) = @_;

   # Flag the function as stopping before asking each worker to stop
   $self->{stopping} = 1;

   $_->stop for $self->_worker_objects;
}
=head2 restart
$function->restart
Gracefully stop and restart all the worker processes.
=cut
sub restart
{
   my ( $self ) = @_;

   # Stop every current worker, then bring the pool back up
   $self->stop;

   return $self->start;
}
=head2 call
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
return $future;
}
# _dispatch_pending()
#
# Hands at most one queued request to a worker.
#
# Cancelled requests at the head of the queue are discarded. The first live
# request is completed with ( $self, $worker ) if _get_worker yields a worker.
# If no worker is available the request is put back at the head of the queue,
# so that it is not lost and keeps its position for the next attempt.
#
# When the queue is empty, the idle timer (if one is configured and not
# already running) is started once more workers are idle than min_workers.
sub _dispatch_pending
{
   my $self = shift;

   while( my $next = shift @{ $self->{pending_queue} } ) {
      # A cancelled request needs no worker; drop it and look at the next one
      next if $next->is_cancelled;

      my $worker = $self->_get_worker;
      if( !$worker ) {
         # Nothing free right now - keep the request queued rather than
         # silently discarding it
         unshift @{ $self->{pending_queue} }, $next;
         return;
      }

      $self->debug_printf( "UNQUEUE" );
      $next->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}
package # hide from indexer
IO::Async::Function::Worker;
use base qw( IO::Async::Routine );
use IO::Async::Channel;
# new( %params )
#
# Constructs a worker. Takes 'init_code' and 'code' out of %params and replaces
# them with a single wrapper 'code' that:
#   - runs init_code once, if defined
#   - then loops receiving argument lists from the argument channel, invoking
#     the function body on each and sending back on the return channel either
#       [ r => @results ]            on success
#       [ e => @failure_details ]    if it died with an ARRAY-based reference
#       [ e => $message, "error" ]   if it died with anything else
#
# All remaining parameters, plus the two channels, go to the parent class
# constructor. The channels are also kept on the worker object itself.
sub new
{
   my $class = shift;
   my %params = @_;

   # Needed by the wrapper below to tell ARRAY-based exceptions from others
   require Scalar::Util;

   my $arg_channel = IO::Async::Channel->new;
   my $ret_channel = IO::Async::Channel->new;

   my $init = delete $params{init_code};
   my $code = delete $params{code};

   $params{code} = sub {
      $init->() if defined $init;

      while( my $args = $arg_channel->recv ) {
         my @ret;
         my $ok = eval { @ret = $code->( @$args ); 1 };

         if( $ok ) {
            $ret_channel->send( [ r => @ret ] );
         }
         elsif( ref $@ and Scalar::Util::reftype( $@ ) eq "ARRAY" ) {
            # $@ is an ARRAYref of error results
            $ret_channel->send( [ e => @{ $@ } ] );
         }
         else {
            # Plain string, or a reference that cannot be unpacked as a list
            # (e.g. a HASH-based exception object). Dereferencing the latter
            # as an array would die outside the eval and end this loop, so
            # report its stringified form instead.
            chomp( my $e = "$@" );
            $ret_channel->send( [ e => $e, "error" ] );
         }
      }
   };

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   $worker->{arg_channel} = $arg_channel;
   $worker->{ret_channel} = $ret_channel;

   return $worker;
}
# configure( %params )
#
# Stores 'exit_on_die' and 'max_calls' directly on the worker; every other
# key is handed on to the parent class's configure.
sub configure
{
   my ( $self, %params ) = @_;

   foreach my $key (qw( exit_on_die max_calls )) {
      next unless exists $params{$key};

      $self->{$key} = delete $params{$key};
   }

   $self->SUPER::configure( %params );
}
# stop()
#
# Closes the argument channel and, if the worker still has a parent function,
# removes itself from that function's worker table. An idle worker is removed
# from the parent immediately; a busy one is only flagged 'remove_on_idle'.
sub stop
{
   my ( $self ) = @_;

   $self->{arg_channel}->close;

   if( my $owner = $self->parent ) {
      delete $owner->{workers}{ $self->id };

      $self->{busy}
         ? $self->{remove_on_idle}++
         : $owner->remove_child( $self );
   }
}
sub call
{
my $worker = shift;
my ( $args ) = @_;
$worker->{arg_channel}->send_encoded( $args );
$worker->{busy} = 1;
$worker->{max_calls}--;
return $worker->{ret_channel}->recv->then(
# on recv
$worker->_capture_weakself( sub {
my ( $worker, $result ) = @_;
my ( $type, @values ) = @$result;
$worker->stop if !$worker->{max_calls} or
$worker->{exit_on_die} && $type eq "e";
if( $type eq "r" ) {
return Future->done( @values );
}
elsif( $type eq "e" ) {
return Future->fail( @values );
}
else {
die "Unrecognised type from worker - $type\n";
}
} ),
# on EOF
$worker->_capture_weakself( sub {
my ( $worker ) = @_;
$worker->stop;
return Future->fail( "closed", "closed" );
} )
)->on_ready( $worker->_capture_weakself( sub {
my ( $worker, $f ) = @_;
$worker->{busy} = 0;
( run in 1.793 second using v1.01-cache-2.11-cpan-5a3173703d6 )