Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
delay => $timeout,
on_expire => $self->_capture_weakself( sub {
my $self = shift or return;
my $workers = $self->{workers};
# Shut down at most one idle worker, starting from the highest
# ID. Since we search from lowest to assign work, this tries
# to ensure we'll shut down the least useful ones first,
# keeping more useful ones in memory (page/cache warmth, etc..)
foreach my $id ( reverse sort keys %$workers ) {
next if $workers->{$id}{busy};
$workers->{$id}->stop;
last;
}
# Still more?
$self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
} ),
);
$self->add_child( $self->{idle_timer} );
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
$self->stop;
$self->start;
}
=head2 call
@result = $function->call( %params )->get
Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.
The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.
The C<%params> hash takes the following keys:
=over 8
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
Returns the total number of worker processes available
=cut
sub workers
{
    my ( $self ) = @_;

    # Assigning keys() to a scalar yields the number of entries in the
    # worker table, so a plain count is returned in any calling context.
    my $total = keys %{ $self->{workers} };
    return $total;
}
=head2 workers_busy
$count = $function->workers_busy
Returns the number of worker processes that are currently busy
=cut
sub workers_busy
{
    my ( $self ) = @_;

    # Count the worker objects whose "busy" flag is currently true.
    my @pool       = $self->_worker_objects;
    my $busy_count = grep { $_->{busy} } @pool;
    return $busy_count;
}
=head2 workers_idle
$count = $function->workers_idle
Returns the number of worker processes that are currently idle
=cut
sub workers_idle
{
    my ( $self ) = @_;

    # Tally every worker object whose "busy" flag is false or unset.
    my $idle_count = 0;
    foreach my $candidate ( $self->_worker_objects ) {
        $idle_count++ unless $candidate->{busy};
    }
    return $idle_count;
}
sub _new_worker
{
my $self = shift;
my $worker = IO::Async::Function::Worker->new(
( map { $_ => $self->{$_} } qw( model init_code code setup exit_on_die ) ),
max_calls => $self->{max_worker_calls},
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
$self->add_child( $worker );
return $self->{workers}{$worker->id} = $worker;
}
# Pick a worker to run the next call on.
#
# Returns the first non-busy worker found when walking the worker IDs in
# (string) sorted order. If every worker is busy and the pool holds fewer
# than max_workers entries, a new worker is created and returned. Otherwise
# returns undef.
sub _get_worker
{
    my ( $self ) = @_;

    # Walk IDs in ascending sort order so the lowest-sorting idle worker is
    # always preferred.
    for my $id ( sort keys %{ $self->{workers} } ) {
        my $candidate = $self->{workers}{$id};
        next if $candidate->{busy};
        return $candidate;
    }

    # Nobody is free: grow the pool if there is still room, else give up.
    return $self->workers < $self->{max_workers}
        ? $self->_new_worker
        : undef;
}
sub _call_worker
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
}
# Stop this worker: close its argument channel and detach it from the
# owning function object, if it still has one.
sub stop
{
    my ( $self ) = @_;

    $self->{arg_channel}->close;

    my $function = $self->parent;
    if( $function ) {
        # Drop the entry from the parent's worker table straight away.
        my $id = $self->id;
        delete $function->{workers}{$id};

        if( !$self->{busy} ) {
            $function->remove_child( $self );
        }
        else {
            # Still mid-call: only flag it here, so the child is removed
            # later, once the worker is no longer busy.
            $self->{remove_on_idle}++;
        }
    }
}
sub call
{
my $worker = shift;
my ( $args ) = @_;
$worker->{arg_channel}->send_encoded( $args );
$worker->{busy} = 1;
$worker->{max_calls}--;
return $worker->{ret_channel}->recv->then(
# on recv
$worker->_capture_weakself( sub {
my ( $worker, $result ) = @_;
my ( $type, @values ) = @$result;
$worker->stop if !$worker->{max_calls} or
$worker->{exit_on_die} && $type eq "e";
local/lib/perl5/IO/Async/Function.pm view on Meta::CPAN
# on EOF
$worker->_capture_weakself( sub {
my ( $worker ) = @_;
$worker->stop;
return Future->fail( "closed", "closed" );
} )
)->on_ready( $worker->_capture_weakself( sub {
my ( $worker, $f ) = @_;
$worker->{busy} = 0;
my $function = $worker->parent;
$function->_dispatch_pending if $function;
$function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
}));
}
=head1 EXAMPLES
local/lib/perl5/IO/Async/LoopTests.pm view on Meta::CPAN
{
my ( $code, $lower, $upper, $name ) = @_;
my $start = time;
$code->();
my $took = ( time - $start ) / AUT;
cmp_ok( $took, '>=', $lower, "$name took at least $lower seconds" ) if defined $lower;
cmp_ok( $took, '<=', $upper * 3, "$name took no more than $upper seconds" ) if defined $upper;
if( $took > $upper and $took <= $upper * 3 ) {
diag( "$name took longer than $upper seconds - this may just be an indication of a busy testing machine rather than a bug" );
}
}
=head1 TEST SUITES
The following test suite names exist, to be passed as a name in the C<@tests>
argument to C<run_tests>:
=cut
( run in 0.328 second using v1.01-cache-2.11-cpan-87723dcf8b7 )