Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
actual number running at any time will be kept somewhere between these bounds
according to load.
=head2 max_worker_calls => INT
Optional. If provided, stop a worker process after it has processed this
number of calls. (New workers may be started to replace stopped ones, within
the bounds given above).
=head2 idle_timeout => NUM
Optional. If provided, idle worker processes will be shut down after this
amount of time, if there are more than C<min_workers> of them.
=head2 exit_on_die => BOOL
Optional boolean, controls what happens after the C<code> throws an
exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.
=head2 setup => ARRAY
Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.
=cut
# Initialisation hook. Chains to the parent class's _init with the same
# arguments, then installs the default pool bounds and empty bookkeeping
# structures on the object.
sub _init
{
   my ( $self, @args ) = @_;

   $self->SUPER::_init( @args );

   # Default lower and upper bounds on the number of workers
   @{$self}{qw( min_workers max_workers )} = ( 1, 8 );

   # Worker objects, keyed by the worker's id
   $self->{workers} = {};

   # Entries waiting to be handed a worker; drained by _dispatch_pending
   $self->{pending_queue} = [];
}
# Applies configuration parameters, given as a key/value list.
#
# Keys consumed here:
#   model, exit_on_die, max_worker_calls
#       Stored on the object and forwarded to every existing worker.
#   idle_timeout
#       False removes the idle timer; otherwise the existing timer's delay is
#       updated, or a new countdown timer is created and added as a child.
#   min_workers, max_workers
#       Stored only; the current pool is not resized.
#   init_code, code, setup
#       Stored; afterwards stop and start are called if $self->loop is true.
#
# Any remaining keys are passed on to the parent class's configure.
sub configure
{
   my $self = shift;
   my %params = @_;

   my %worker_params;
   foreach my $key (qw( model exit_on_die max_worker_calls )) {
      next unless exists $params{$key};
      $self->{$key} = $worker_params{$key} = delete $params{$key};
   }

   if( keys %worker_params ) {
      # Workers know the call limit as 'max_calls' - that is the key
      # _new_worker passes at construction and the one the worker's own
      # configure picks up - so translate it before forwarding.
      $worker_params{max_calls} = delete $worker_params{max_worker_calls}
         if exists $worker_params{max_worker_calls};

      foreach my $worker ( $self->_worker_objects ) {
         $worker->configure( %worker_params );
      }
   }

   if( exists $params{idle_timeout} ) {
      my $timeout = delete $params{idle_timeout};
      if( !$timeout ) {
         # A false timeout disables idle shutdown; drop any existing timer
         $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
      }
      elsif( my $idle_timer = $self->{idle_timer} ) {
         # Timer already exists; just change its delay
         $idle_timer->configure( delay => $timeout );
      }
      else {
         $self->{idle_timer} = IO::Async::Timer::Countdown->new(
            delay => $timeout,
            on_expire => $self->_capture_weakself( sub {
               my $self = shift or return;
               my $workers = $self->{workers};

               # Shut down at most one idle worker, starting from the highest
               # ID. Since we search from lowest to assign work, this tries
               # to ensure we'll shut down the least useful ones first,
               # keeping more useful ones in memory (page/cache warmth, etc..)
               # The sort is the same plain string sort that _get_worker
               # uses, so the two orders stay exact mirrors of each other.
               foreach my $id ( reverse sort keys %$workers ) {
                  next if $workers->{$id}{busy};
                  $workers->{$id}->stop;
                  last;
               }

               # Still more?
               $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
            } ),
         );
         $self->add_child( $self->{idle_timer} );
      }
   }

   foreach my $key (qw( min_workers max_workers )) {
      $self->{$key} = delete $params{$key} if exists $params{$key};
      # TODO: something about retuning
   }

   # Changing any of these requires fresh workers to take effect
   my $need_restart;
   foreach my $key (qw( init_code code setup )) {
      next unless exists $params{$key};
      $self->{$key} = delete $params{$key};
      $need_restart = 1;
   }

   $self->SUPER::configure( %params );

   if( $need_restart and $self->loop ) {
      $self->stop;
      $self->start;
   }
}
# Hook invoked when the object is added to a loop. Chains to the parent
# implementation with the same arguments, then calls start and returns
# whatever that returns.
sub _add_to_loop
{
   my ( $self, @args ) = @_;

   $self->SUPER::_add_to_loop( @args );

   return $self->start;
}
# Hook invoked when the object is removed from a loop. Calls stop first,
# then chains to the parent implementation with the same arguments and
# returns its result.
sub _remove_from_loop
{
   my ( $self, @args ) = @_;

   $self->stop;

   return $self->SUPER::_remove_from_loop( @args );
}
=head1 METHODS
The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.
=cut
=head2 start
$function->start
Start the worker processes
=cut
# Creates min_workers new workers, one _new_worker call per worker.
sub start
{
   my ( $self ) = @_;

   foreach my $n ( 1 .. $self->{min_workers} ) {
      $self->_new_worker;
   }
}
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
$count = $function->workers_idle
Returns the number of worker processes that are currently idle
=cut
# Counts the workers whose 'busy' flag is false. Always returns the count
# as a single number, whatever context it is called in.
sub workers_idle
{
   my ( $self ) = @_;

   my @idle = grep { not $_->{busy} } $self->_worker_objects;

   return scalar @idle;
}
# Builds one IO::Async::Function::Worker from this object's settings, adds it
# as a child, records it in {workers} under its id, and returns it.
sub _new_worker
{
   my ( $self ) = @_;

   # Settings copied straight across under the same names
   my @settings = map { ( $_ => $self->{$_} ) } qw( model init_code code setup exit_on_die );

   # Invoked with the function object (if it still exists) when the worker
   # finishes: tops the pool back up to the minimum and retries the queue,
   # unless the 'stopping' flag is set.
   my $on_finish = $self->_capture_weakself( sub {
      my ( $function ) = @_;
      $function or return;

      return if $function->{stopping};

      if( $function->workers < $function->{min_workers} ) {
         $function->_new_worker;
      }

      $function->_dispatch_pending;
   } );

   my $worker = IO::Async::Function::Worker->new(
      @settings,
      # The worker calls this limit 'max_calls'
      max_calls => $self->{max_worker_calls},
      on_finish => $on_finish,
   );

   $self->add_child( $worker );
   $self->{workers}{ $worker->id } = $worker;

   return $worker;
}
# Picks a worker to run a call on. Returns the first non-busy worker in
# string-sorted id order; failing that, a freshly created worker if the pool
# is below max_workers; failing that, undef.
sub _get_worker
{
   my ( $self ) = @_;

   my $pool = $self->{workers};

   for my $id ( sort keys %$pool ) {
      my $candidate = $pool->{$id};
      return $candidate unless $candidate->{busy};
   }

   # Everyone is busy: grow the pool if allowed
   return $self->workers < $self->{max_workers}
      ? $self->_new_worker
      : undef;
}
# Hands one call to the given worker and returns whatever the worker's call
# method returns. If that leaves no idle workers, any idle timer is stopped.
sub _call_worker
{
   my ( $self, $worker, $type, $args ) = @_;

   my $future = $worker->call( $type, $args );

   my $none_idle = ( $self->workers_idle == 0 );
   my $timer = $self->{idle_timer};

   # Nothing is idle any more, so there is nothing for the timer to reap
   $timer->stop if $none_idle and $timer;

   return $future;
}
# Hands at most one queued entry to a worker.
#
# Entries are taken from the front of {pending_queue}. Cancelled entries are
# discarded. The first live entry is completed via ->done( $self, $worker )
# with a worker obtained from _get_worker. If no worker can be obtained the
# entry is put back at the front of the queue, so it is retried on the next
# call rather than lost.
#
# If the queue drains without dispatching anything and more than min_workers
# workers are idle, the idle timer (if any, and if not already running) is
# started.
sub _dispatch_pending
{
   my $self = shift;

   while( my $next = shift @{ $self->{pending_queue} } ) {
      # Check cancellation before asking for a worker, since _get_worker may
      # create a new one and a cancelled entry would never use it
      next if $next->is_cancelled;

      my $worker = $self->_get_worker;
      if( !$worker ) {
         # Nothing free right now: keep the entry, preserving queue order
         unshift @{ $self->{pending_queue} }, $next;
         return;
      }

      $self->debug_printf( "UNQUEUE" );
      $next->done( $self, $worker );
      return;
   }

   if( $self->workers_idle > $self->{min_workers} ) {
      $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running;
   }
}
package # hide from indexer
IO::Async::Function::Worker;
use base qw( IO::Async::Routine );
use IO::Async::Channel;
# Constructs a worker.
#
# Takes the same key/value parameters as the parent constructor, except:
#   init_code - optional CODE ref, run once before the first call is processed
#   code      - CODE ref invoked with each received argument list
# Both are removed from the parameters and replaced by a single wrapping
# 'code' sub, which is passed to the parent constructor together with a pair
# of freshly created IO::Async::Channel objects (channels_in/channels_out).
#
# For every array ref received on the argument channel the wrapper sends one
# array ref back on the return channel:
#   [ r => @results ]            the code returned normally
#   [ e => @$exception ]         the code died with an ARRAY-based reference
#   [ e => $message, 'error' ]   the code died with anything else
#
# Returns the new worker, with the channels stored in {arg_channel} and
# {ret_channel}.
sub new
{
   my $class = shift;
   my %params = @_;

   # Used by the wrapper below to classify exceptions. Loaded here, at
   # construction time, rather than inside the wrapper itself.
   require Scalar::Util;

   my $arg_channel = IO::Async::Channel->new;
   my $ret_channel = IO::Async::Channel->new;

   my $init = delete $params{init_code};
   my $code = delete $params{code};

   $params{code} = sub {
      $init->() if defined $init;

      # Loop until recv yields a false value
      while( my $args = $arg_channel->recv ) {
         my @ret;
         my $ok = eval { @ret = $code->( @$args ); 1 };
         # Take a copy straight away; $@ is a global and easily clobbered
         my $err = $@;

         if( $ok ) {
            $ret_channel->send( [ r => @ret ] );
         }
         elsif( ref $err and Scalar::Util::reftype( $err ) eq 'ARRAY' ) {
            # An ARRAY-based exception carries the list of error results
            $ret_channel->send( [ e => @$err ] );
         }
         else {
            # Plain strings, and references that cannot be unpacked as a
            # list (hash-based exception objects and the like), are reported
            # in string form. Dereferencing those as an array would throw
            # again here and no reply would be sent for this call.
            chomp( my $e = "$err" );
            $ret_channel->send( [ e => $e, 'error' ] );
         }
      }
   };

   my $worker = $class->SUPER::new(
      %params,
      channels_in  => [ $arg_channel ],
      channels_out => [ $ret_channel ],
   );

   $worker->{arg_channel} = $arg_channel;
   $worker->{ret_channel} = $ret_channel;

   return $worker;
}
sub configure
{
my $self = shift;
my %params = @_;
exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls );
( run in 0.660 second using v1.01-cache-2.11-cpan-39bf76dae61 )