Async-Simple-Pool
view release on metacpan or search on metacpan
lib/Async/Simple/Pool.pm view on Meta::CPAN
my $break_on_busy = $self->break_on eq 'busy';
my $break_on_run = $self->break_on eq 'run';
while( 1 ) {
$self->log( 'PROCESS', 'internal cycle unless exit condition' ) if $self->logger;
$self->read_tasks() if grep $_->has_id, @{ $self->tasks };
$self->write_tasks();
if ( $break_on_busy ) {
$self->log( 'PROCESS', 'internal cycle exit: all threads are busy' ) if $self->logger;
last;
}
# Some queued data has not been started yet: keep cycling
next if scalar @{ $self->queue_keys };
if ( $break_on_run ) {
$self->log( 'PROCESS', 'internal cycle exit: all tasks are started' ) if $self->logger;
last;
}
lib/Async/Simple/Task/Fork.pm view on Meta::CPAN
Forks a child process and returns the child's pid to the parent process, or 0 to the child process
=cut
sub fork_child {
my ( $self ) = @_;
# This check lives here rather than in a BEGIN block, because this package is also used as a parent class (via "extends") in Async::Simple::Task::ForkTmpFile
# TODO: Consider moving this code (function) into a separate package
# if ( $^O =~ /^(dos|os2|MSWin32|NetWare)$/ ) {
# die 'Your OS does not support threads... Use Async::Simple::Task::ForkTmpFile instead.';
# };
# Pipes: parent -> child and child -> parent
pipe my( $parent_reader, $child_writer ) or die 'Child to Parent pipe open error';
pipe my( $child_reader, $parent_writer ) or die 'Parent to Child pipe open error';
my $pid = fork() // die "fork() failed: $!";
# child
unless ( $pid ) {
t/05-task-fork.t view on Meta::CPAN
# use lib '../lib';
use Async::Simple::Task::Fork;
# On the platforms matched below the fork-based tests are not run at all:
# a single always-passing placeholder spec is planned and executed, and the
# script exits before the full plan is declared.
if ( $^O =~ /^(dos|os2|MSWin32|NetWare)$/ ) {
    plan( tests => 1 );

    describe( 'All' => sub {
        it( 'fallback' => sub {
            ok( 1, 'Your OS does not support threads... fallback to tmpfiles' );
        } );
    } );

    # Run the specs only when this file is executed directly.
    runtests() unless caller;
    exit;
}

# Every other platform gets the full plan.
plan( tests => 18 );
t/09-pool-partial-results-collect.t view on Meta::CPAN
it 'check for results' => sub {
    # Number of true entries in an array reference returned by $pool->process.
    my $ready_count = sub { scalar( grep $_, @{ $_[0] } ) };

    # Measure how long it takes to construct the pool with break_on => 'busy'.
    my $started_at = time;
    my $pool       = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy' );
    my $elapsed    = time - $started_at;

    ok( $elapsed < $worker_delay, sprintf 'async done work time = %.2f sec', $elapsed );

    # First poll: nothing is expected to be ready yet.
    my $batch = $pool->process;
    is( $ready_count->( $batch ), 0, 'all threads are busy, no waiting for results' );

    # 10 tasks and 20 jobs, so exactly 2 passes of work are expected
    # for the threads (+ 1 in the worst case).
    sleep $full_cycle_worst_time;
    $batch = $pool->process;
    is( $ready_count->( $batch ), 10, 'all threads are busy, got some results' );

    sleep $full_cycle_worst_time;
    $batch = $pool->process;
    is( $ready_count->( $batch ), 20, 'all threads are busy, got some results' );
};
};
};
};
runtests unless caller;
t/10-pool-partial-results-flush.t view on Meta::CPAN
it 'check for results with a flush data fulllist result' => sub {
    # NOTE(review): the description says "fulllist" but the pool is created
    # with result_type => 'list' - confirm which one was intended.
    my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );

    # Most likely there are no results yet...
    my $result = $pool->process;

    # ...but a heavily overloaded system may already have produced some by this point.
    my $results_count = scalar( grep $_, @$result );
    ok( $results_count <= 10, 'all threads are busy, no waiting for results' );

    # Wait until the first pack (1..10) of results is ready.
    sleep $full_cycle_worst_time;

    $result = $pool->process;
    $results_count += scalar( grep $_, @$result );

    # The first pack holds 10 results (20 tasks divided by 10 streams).
    ok( $results_count < 20 , 'all threads are busy, got some results' );

    sleep $full_cycle_worst_time;

    $result = $pool->process;
    $results_count += scalar( grep $_, @$result );

    # Another pack of results: the previous 10 were flushed and a new 10 gathered.
    is( $results_count, 20, 'all threads are busy, got some results' );

    $result = $pool->process;
    $results_count += scalar( grep $_, @$result );

    # All results were flushed, so the pool must be completely empty.
    is( scalar( grep $_, @$result ), 0, 'all results are read, nothing left' );
    is( $results_count, 20, 'all results are read. We have exactly @data results' );

    is( scalar @{ $pool->all_keys },   0, 'all_keys list is empty' );
    is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
    is( scalar keys %{ $pool->data },  0, 'data is empty' );
};
it 'check for results with a flush data, hash result' => sub {
    my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'hash' );

    my $result = $pool->process;

    # Count entries by their VALUE, exactly like every later check in this
    # spec does. Counting true keys instead would silently skip a result
    # stored under the key "0", because the string "0" is false in Perl.
    my $first_results_count = scalar( grep $result->{$_}, keys %$result );
    ok( $first_results_count < 10, 'all threads are busy, no waiting for results' );

    # Results that are ready this early are unexpected, but they are
    # accounted for in the next check rather than treated as a failure.
    warn 'Too lazy system for something goes wrong! Results count = ' . $first_results_count . ' immediately after tasks start!' if $first_results_count;

    sleep $full_cycle_worst_time;

    # First pack: whatever was already collected above plus this batch must total 10.
    $result = $pool->process;
    is( scalar( grep $result->{$_}, keys %$result ) + $first_results_count, 10, 'all threads are busy, got some results' );

    sleep $full_cycle_worst_time;

    # Second pack of 10 results.
    $result = $pool->process;
    is( scalar( grep $result->{$_}, keys %$result ), 10, 'all threads are busy, got some results' );

    # Everything has been flushed: nothing more is expected.
    $result = $pool->process;
    is( scalar( grep $result->{$_}, keys %$result ), 0, 'all threads are busy, got some results' );

    is( scalar @{ $pool->all_keys },   0, 'all_keys list is empty' );
    is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
    is( scalar keys %{ $pool->data },  0, 'data is empty' );
};
it 'check for results with a flush data, list result' => sub {
    my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );

    # Immediately after start no results are expected.
    my $result = $pool->process;
    is( scalar( grep $_, @$result ), 0, 'all threads are busy, no waiting for results' );

    sleep $full_cycle_worst_time;

    # First pack of 10 results.
    $result = $pool->process;
    is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );

    sleep $full_cycle_worst_time;

    # With flush_data the earlier results are not returned again,
    # so only the next 10 are expected here.
    $result = $pool->process;
    is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );
};
};
};
};
runtests unless caller;
( run in 0.657 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )