Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
=item 2.
When a blocking OS syscall or library-level function needs to be called, and
no nonblocking or asynchronous version is supplied. This is used by
L<IO::Async::Resolver>.
=back
This object is ideal for representing "pure" functions; that is, blocks of
code which have no stateful effect on the process, and whose result depends
only on the arguments passed in. For a more general co-routine ability, see
also L<IO::Async::Routine>.
=cut
=head1 PARAMETERS
The following named parameters may be passed to C<new> or C<configure>:
=head2 code => CODE
The body of the function to execute.
@result = $code->( @args )
=head2 init_code => CODE
Optional. If defined, this is invoked exactly once in every child process or
thread, after it is created, but before the first invocation of the function
body itself.
$init_code->()
=head2 model => "fork" | "thread"
Optional. Requests a specific L<IO::Async::Routine> model. If not supplied,
leaves the default choice up to Routine.
=head2 min_workers => INT
=head2 max_workers => INT
The lower and upper bounds of worker processes to try to keep running. The
actual number running at any time will be kept somewhere between these bounds
according to load.
=head2 max_worker_calls => INT
Optional. If provided, stop a worker process after it has processed this
number of calls. (New workers may be started to replace stopped ones, within
the bounds given above).
=head2 idle_timeout => NUM
Optional. If provided, idle worker processes will be shut down after this
amount of time, if there are more than C<min_workers> of them.
=head2 exit_on_die => BOOL
Optional boolean, controls what happens after the C<code> throws an
exception. If missing or false, the worker will continue running to process
more requests. If true, the worker will be shut down. A new worker might be
constructed by the C<call> method to replace it, if necessary.
=head2 setup => ARRAY
Optional array reference. Specifies the C<setup> key to pass to the underlying
L<IO::Async::Process> when setting up new worker processes.
=cut
sub _init
{
my $self = shift;
$self->SUPER::_init( @_ );
$self->{min_workers} = 1;
$self->{max_workers} = 8;
$self->{workers} = {}; # {$id} => IaFunction:Worker
$self->{pending_queue} = [];
}
sub configure
{
my $self = shift;
my %params = @_;
my %worker_params;
foreach (qw( model exit_on_die max_worker_calls )) {
$self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_};
}
if( keys %worker_params ) {
foreach my $worker ( $self->_worker_objects ) {
$worker->configure( %worker_params );
}
}
if( exists $params{idle_timeout} ) {
my $timeout = delete $params{idle_timeout};
if( !$timeout ) {
$self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer};
}
elsif( my $idle_timer = $self->{idle_timer} ) {
$idle_timer->configure( delay => $timeout );
}
else {
$self->{idle_timer} = IO::Async::Timer::Countdown->new(
delay => $timeout,
on_expire => $self->_capture_weakself( sub {
my $self = shift or return;
my $workers = $self->{workers};
# Shut down atmost one idle worker, starting from the highest
# ID. Since we search from lowest to assign work, this tries
# to ensure we'll shut down the least useful ones first,
# keeping more useful ones in memory (page/cache warmth, etc..)
foreach my $id ( reverse sort keys %$workers ) {
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
}
=head2 stop
$function->stop
Stop the worker processes
=cut
sub stop
{
my $self = shift;
$self->{stopping} = 1;
foreach my $worker ( $self->_worker_objects ) {
$worker->stop;
}
}
=head2 restar
$function->restart
Gracefully stop and restart all the worker processes.
=cut
sub restart
{
my $self = shift;
$self->stop;
$self->start;
}
=head2 call
@result = $function->call( %params )->get
Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.
The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.
The C<%params> hash takes the following keys:
=over 8
=item args => ARRAY
A reference to the array of arguments to pass to the code.
=back
If the function body returns normally the list of results are provided as the
(successful) result of returned future. If the function throws an exception
this results in a failed future. In the special case that the exception is in
fact an unblessed C<ARRAY> reference, this array is unpacked and used as-is
for the C<fail> result. If the exception is not such a reference, it is used
as the first argument to C<fail>, in the category of C<error>.
$f->done( @result )
$f->fail( @{ $exception } )
$f->fail( $exception, error => )
=head2 call (void)
$function->call( %params )
When not returning a future, the C<on_result>, C<on_return> and C<on_error>
arguments give continuations to handle successful results or failure.
=over 8
=item on_result => CODE
A continuation that is invoked when the code has been executed. If the code
returned normally, it is called as:
$on_result->( 'return', @values )
If the code threw an exception, or some other error occured such as a closed
connection or the process died, it is called as:
$on_result->( 'error', $exception_name )
=item on_return => CODE and on_error => CODE
An alternative to C<on_result>. Two continuations to use in either of the
circumstances given above. They will be called directly, without the leading
'return' or 'error' value.
=back
=cut
sub call
{
my $self = shift;
my %params = @_;
# TODO: possibly just queue this?
$self->loop or croak "Cannot ->call on a Function not yet in a Loop";
my $args = delete $params{args};
ref $args eq "ARRAY" or croak "Expected 'args' to be an array";
my ( $on_done, $on_fail );
if( defined $params{on_result} ) {
my $on_result = delete $params{on_result};
ref $on_result or croak "Expected 'on_result' to be a reference";
$on_done = $self->_capture_weakself( sub {
my $self = shift or return;
$self->debug_printf( "CONT on_result return" );
( run in 0.819 second using v1.01-cache-2.11-cpan-39bf76dae61 )