IO-Async
view release on metacpan or search on metacpan
and the tests here were reporting false negatives.
0.21 CHANGES:
* Added "use warnings" to all modules
* Created Notifier->configure method to allow changing properties of
a Notifier or subclass after construction
* New 'examples' dir with some small example scripts
BUGFIXES:
* More robust timing tests to avoid some spurious test failures due
to busy testing servers or other non-issues
0.20 CHANGES:
* Major reworking of underlying Loop implementation:
+ Unified low-level IO, timer and signal watches as callbacks
+ Split IO handle parts of Notifier into new IO::Async::Handle
class
+ Created Timer and Signal subclasses of Notifier
These changes will require a compatible upgrade to the underlying
Loop implementation.
lib/IO/Async/Function.pm view on Meta::CPAN
delay => $timeout,
on_expire => $self->_capture_weakself( sub {
my $self = shift or return;
my $workers = $self->{workers};
# Shut down at most one idle worker, starting from the highest
# ID. Since we search from lowest to assign work, this tries
# to ensure we'll shut down the least useful ones first,
# keeping more useful ones in memory (page/cache warmth, etc.)
foreach my $id ( reverse sort keys %$workers ) {
next if $workers->{$id}{busy};
$workers->{$id}->stop;
last;
}
# Still more?
$self->{idle_timer}->start if $self->workers_idle > $self->{min_workers};
} ),
);
$self->add_child( $self->{idle_timer} );
lib/IO/Async/Function.pm view on Meta::CPAN
$self->stop;
$self->start;
}
=head2 call
@result = await $function->call( %params );
Schedules an invocation of the contained function to be executed on one of the
worker processes. If a non-busy worker is available now, it will be called
immediately. If not, it will be queued and sent to the next free worker that
becomes available.
The request will already have been serialised by the marshaller, so it will be
safe to modify any referenced data structures in the arguments after this call
returns.
The C<%params> hash takes the following keys:
=over 8
lib/IO/Async/Function.pm view on Meta::CPAN
Returns the total number of worker processes available
=cut
# Count every worker currently registered in the {workers} hash,
# regardless of whether it is busy or idle.
sub workers
{
   my ( $self ) = @_;

   # Assigning keys() to a scalar yields the number of keys, so the caller
   # always receives a single count whatever context it called us in.
   my $pool_size = keys %{ $self->{workers} };
   return $pool_size;
}
=head2 workers_busy
$count = $function->workers_busy;
Returns the number of worker processes that are currently busy
=cut
# Count the worker objects whose {busy} field is currently true.
sub workers_busy
{
   my ( $self ) = @_;

   # grep in scalar (assignment) context returns the number of matches
   # rather than the matching elements themselves.
   my $busy_count = grep { $_->{busy} } $self->_worker_objects;
   return $busy_count;
}
=head2 workers_idle
$count = $function->workers_idle;
Returns the number of worker processes that are currently idle
=cut
# Count the worker objects whose {busy} field is currently false
# (including workers that have never had the field set).
sub workers_idle
{
   my ( $self ) = @_;

   my $idle_count = 0;
   foreach my $worker ( $self->_worker_objects ) {
      $idle_count++ unless $worker->{busy};
   }

   return $idle_count;
}
sub _new_worker
{
my $self = shift;
my $worker = IO::Async::Function::Worker->new(
( map { $_ => $self->{$_} } qw( model init_code code module init_func func setup exit_on_die ) ),
max_calls => $self->{max_worker_calls},
lib/IO/Async/Function.pm view on Meta::CPAN
$self->add_child( $worker );
return $self->{workers}{$worker->id} = $worker;
}
# Pick a worker to hand the next call to.
#
# Existing workers are examined in (string-)sorted key order and the first one
# that is not busy is returned. If all of them are busy, a new worker is
# created provided the pool is still below {max_workers}. Otherwise undef is
# returned to signal that nothing is available right now.
sub _get_worker
{
   my ( $self ) = @_;

   # Prefer re-using an idle worker, lowest key first
   foreach my $id ( sort keys %{ $self->{workers} } ) {
      my $candidate = $self->{workers}{$id};
      next if $candidate->{busy};

      return $candidate;
   }

   # Everyone is busy; grow the pool only if there is still room
   return $self->_new_worker
      if $self->workers < $self->{max_workers};

   # Pool is full and saturated
   return undef;
}
sub _call_worker
lib/IO/Async/Function.pm view on Meta::CPAN
{
my $worker = shift;
$worker->{arg_channel}->close;
my $ret;
$ret = $worker->result_future if defined wantarray;
if( my $function = $worker->parent ) {
delete $function->{workers}{$worker->id};
if( $worker->{busy} ) {
$worker->{remove_on_idle}++;
}
else {
$function->remove_child( $worker );
}
}
return $ret;
}
sub call
{
my $worker = shift;
my ( $args ) = @_;
$worker->{arg_channel}->send_encoded( $args );
$worker->{busy} = 1;
$worker->{max_calls}--;
return $worker->{ret_channel}->recv->then(
# on recv
$worker->_capture_weakself( sub {
my ( $worker, $result ) = @_;
my ( $type, @values ) = @$result;
$worker->stop if !$worker->{max_calls} or
$worker->{exit_on_die} && $type eq "e";
lib/IO/Async/Function.pm view on Meta::CPAN
# on EOF
$worker->_capture_weakself( sub {
my ( $worker ) = @_;
$worker->stop;
return Future->fail( "closed", "closed" );
} )
)->on_ready( $worker->_capture_weakself( sub {
my ( $worker, $f ) = @_;
$worker->{busy} = 0;
my $function = $worker->parent;
$function->_dispatch_pending if $function;
$function->remove_child( $worker ) if $function and $worker->{remove_on_idle};
}));
}
=head1 EXAMPLES
lib/IO/Async/LoopTests.pm view on Meta::CPAN
{
my ( $code, $lower, $upper, $name ) = @_;
my $start = time;
$code->();
my $took = ( time - $start ) / AUT;
cmp_ok( $took, '>=', $lower, "$name took at least $lower seconds" ) if defined $lower;
cmp_ok( $took, '<=', $upper * 3, "$name took no more than $upper seconds" ) if defined $upper;
if( $took > $upper and $took <= $upper * 3 ) {
diag( "$name took longer than $upper seconds - this may just be an indication of a busy testing machine rather than a bug" );
}
}
=head1 TEST SUITES
The following test suite names exist, to be passed as a name in the C<@tests>
argument to C<run_tests>:
=cut
t/18loop-select-legacy.t view on Meta::CPAN
my ( $now, $took );
$now = time;
select( $rvec, $wvec, $evec, $timeout );
$took = (time - $now) / AUT;
cmp_ok( $took, '>', 1.7, 'loop_once(5) while waiting for timer takes at least 1.7 seconds' );
cmp_ok( $took, '<', 10, 'loop_once(5) while waiting for timer no more than 10 seconds' );
if( $took > 2.5 ) {
diag( "took more than 2.5 seconds to select(2).\n" .
"This is not itself a bug, and may just be an indication of a busy testing machine" );
}
$loop->post_select( $rvec, $evec, $wvec );
# select might have returned just a little early, such that the TimerQueue
# doesn't think anything is ready yet. We need to handle that case.
while( !$done ) {
die "It should have been ready by now" if( time - $now > 5 * AUT );
$timeout = 0.1 * AUT;
t/42function.t view on Meta::CPAN
ok( defined $function, '$function defined' );
isa_ok( $function, [ "IO::Async::Function" ], '$function isa IO::Async::Function' );
is_oneref( $function, '$function has refcount 1' );
$loop->add( $function );
is_refcount( $function, 2, '$function has refcount 2 after $loop->add' );
is( $function->workers, 1, '$function has 1 worker' );
is( $function->workers_busy, 0, '$function has 0 workers busy' );
is( $function->workers_idle, 1, '$function has 1 workers idle' );
my $future = $function->call(
args => [ 10, 20 ],
);
isa_ok( $future, [ "Future" ], '$future isa Future' );
is_refcount( $function, 2, '$function has refcount 2 after ->call' );
is( $function->workers_busy, 1, '$function has 1 worker busy after ->call' );
is( $function->workers_idle, 0, '$function has 0 worker idle after ->call' );
wait_for { $future->is_ready };
my ( $result ) = $future->get;
is( $result, 30, '$result after call returns by future' );
is( $function->workers_busy, 0, '$function has 0 workers busy after call returns' );
is( $function->workers_idle, 1, '$function has 1 workers idle after call returns' );
# ->stop future
wait_for_future my $stop_f = $function->stop;
ok( !$stop_f->failure, '$stop_f succeeds' );
$loop->remove( $function );
}
# by callback
t/42function.t view on Meta::CPAN
my $serial = 0;
my $function = IO::Async::Function->new(
# Keep exactly 1 process so captured lexical works for testing
min_workers => 1,
max_workers => 1,
code => sub { return $serial++ },
);
$loop->add( $function );
# Push something just to make the function busy first
$function->call( args => [], on_return => sub {}, on_error => sub {} );
my $f = Future->needs_all(
$function->call( args => [] ), # no priority
$function->call( args => [], priority => 1 ),
$function->call( args => [], priority => 1 ),
$function->call( args => [], priority => 2 ),
);
is( [ ( wait_for_future $f )->get ],
t/TimeAbout.pm view on Meta::CPAN
my $lower = $target*0.75;
my $upper = $target*1.5 + 1;
my $now = time;
$code->();
my $took = (time - $now) / AUT;
cmp_ok( $took, '>', $lower, "$name took at least $lower" );
cmp_ok( $took, '<', $upper * 3, "$name took no more than $upper" );
if( $took > $upper and $took <= $upper * 3 ) {
diag( "$name took longer than $upper - this may just be an indication of a busy testing machine rather than a bug" );
}
}
0x55AA;
( run in 0.265 second using v1.01-cache-2.11-cpan-87723dcf8b7 )