Async-Simple-Pool
lib/Async/Simple/Pool.pm
You can use these %pool_params:
data - ArrayRef/HashRef. The data for the tasks, as described above.
tasks_count - Integer number of workers. 10 by default.
flush_data - 1 = remove or 0 = don't remove results from the pool once they have been read by $pool->process().
result_type - list (list of ready results) / fulllist (list of all results) / hash (hash of ready results)
break_on - busy (when all workers are busy) / run (all data has started executing) / done (all results are ready)
task_class - see the explanation below. For example 'Your::Task::Class'.
task_params - Any params you wish to pass to each task object via $task->new( %$here ).
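For example, a construction sketch using several of these parameters (the values below are purely illustrative):

my $pool = Async::Simple::Pool->new(
    task        => $task,       # code ref executed by every worker
    tasks_count => 4,           # number of parallel workers
    flush_data  => 1,           # forget results once process() has returned them
    result_type => 'hash',      # key => result pairs for ready results only
    break_on    => 'busy',      # process() returns as soon as all workers are busy
);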
The last way to use the pool is to write your own task class.
This class MUST contain at least the following code:
package Your::Task::Class;
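The rest of the required code is truncated in this excerpt. As an illustration only, a minimal custom task class might look like the sketch below, assuming it extends the default task class Async::Simple::Task::Fork and inherits the required interface from it:

package Your::Task::Class;

use Moose;

# Assumption: reuse the default fork-based task and only add or override behaviour.
extends 'Async::Simple::Task::Fork';

# A hypothetical extra attribute, filled from task_params.
has my_option => (
    is      => 'rw',
    isa     => 'Str',
    default => 'example',
);

1;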
lib/Async/Simple/Pool.pm
is => 'rw',
isa => 'Str',
default => 'fulllist',
);
=head2 break_on
Condition for when $self->process stops waiting for results and returns, so that you can do something else before the next check.
'busy' = $self->process will return right after all free workers have been given tasks, without any waiting
'run' = $self->process will return as soon as the last task has started
'done' = $self->process will wait until all the tasks have finished their work
Default is 'done'
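For example, a hedged usage sketch of the non-blocking 'busy' mode, using the positional new( $task, \@data, %params ) form shown elsewhere in this documentation:

my $pool = Async::Simple::Pool->new( $task, \@data, break_on => 'busy' );

# process() returns as soon as all workers are busy, so poll for results.
# With the default result_type 'fulllist' there is one slot per input,
# holding undef until that result is ready.
my $results;
do {
    sleep 1;
    $results = $pool->process;
} until scalar( grep defined, @$results ) == scalar @data;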
=cut
has break_on => (
lib/Async/Simple/Pool.pm
my $results = Async::Simple::Pool->new( $task, \@data )->results; # Just do everything and give me my results!
my $pool = Async::Simple::Pool->new( task => $task ); # Minimal init with hash of params, all by default; process should be started manually below
Full list of params for the default task type (Async::Simple::Task::Fork) with default values:
my $pp = Async::Simple::Pool->new(
tasks_count => 10,
break_on => 'done', # [ 'busy', 'run', 'done' ]
data => \@data,
task_class => 'Async::Simple::Task::Fork',
task_params => { # Can be placed into pool params directly
task => $task,
timeout => 0.01,
},
);
It is a good idea to call new() before gathering all of this huge amount of data, so that the workers are forked while the parent process is still small, and to run $pool->process separately:
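For example (a sketch; gather_huge_data() is a hypothetical stand-in for your data source, and passing new data directly to process() is assumed from the $new_data parameter visible in the source below):

# Fork the workers before the huge data set exists in the parent process.
my $pool = Async::Simple::Pool->new( task => $task );

my @data = gather_huge_data();

# Feed the data and wait for all results (break_on defaults to 'done').
my $results = $pool->process( \@data );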
lib/Async/Simple/Pool.pm
my ( $data, $keys ) = _conv_data_to_internal( $self->data, $new_data );
$self->log( 'PROCESS: new data parsed', $data ) if $self->logger;
$self->data( $data );
push @{ $self->queue_keys }, @$keys;
push @{ $self->all_keys }, @$keys;
};
my $break_on_busy = $self->break_on eq 'busy';
my $break_on_run = $self->break_on eq 'run';
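# Main loop: collect answers from busy workers, hand queued data to free
# workers, then check the configured break_on condition.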
while( 1 ) {
$self->log( 'PROCESS', 'internal cycle unless exit condition' ) if $self->logger;
$self->read_tasks() if grep $_->has_id, @{ $self->tasks };
$self->write_tasks();
if ( $break_on_busy ) {
$self->log( 'PROCESS', 'internal cycle exit: all threads are busy' ) if $self->logger;
last;
}
# Some data has not been started yet
next if scalar @{ $self->queue_keys };
if ( $break_on_run ) {
$self->log( 'PROCESS', 'internal cycle exit: all tasks are started' ) if $self->logger;
last;
}
lib/Async/Simple/Pool.pm
$self->log( 'NEW THREAD ADDED', { ref $task => {%$task} } ) if $self->logger;
};
return \@tasks;
};
=head2 read_tasks
Internal.
Reads answers from busy tasks.
=cut
sub read_tasks {
my ( $self ) = @_;
my @busy_tasks = grep $_->has_id, @{ $self->tasks } or return;
$self->log( 'READ TASKS', { busy_tasks_found => scalar @busy_tasks } ) if $self->logger;
my $data = $self->data;
for my $task ( @busy_tasks ) {
$task->clear_answer;
$task->get();
unless ( $task->has_answer ) {
$self->log( 'READ TASKS NO ANSWER', { id => $task->id } ) if $self->logger;
next;
};
$self->log( 'READ TASKS GOT ANSWER', { id => $task->id, answer => $task->answer } ) if $self->logger;
t/07-pool-inits.t
};
describe 'full params init' => sub {
my $pool;
my $results;
it 'full passed params init' => sub {
$pool = Async::Simple::Pool->new(
tasks_count => 5,
break_on => 'done', # [ 'busy', 'run', 'done' ]
data => \@data, # [ any type you wish ]
result_type => 'hash',
task_class => $is_win ? 'Async::Simple::Task::ForkTmpFile' : 'Async::Simple::Task::Fork',
task_params => { # Can be placed into pool params directly
task => $task,
timeout => $timeout,
},
);
};
t/09-pool-partial-results-collect.t
before each => sub {
$task = sub {
my( $data ) = @_;
$data->{ok} = 1;
return $data;
};
@data = map { \%{{ i => $_ }} } 1..20;
};
# break_on => busy/run/done
# flush_data => 0/1
describe 'partial results' => sub {
my $slow_task = sub {
my( $data ) = @_;
sleep $worker_delay;
$data->{ok} = 1;
return $data;
};
it 'check for results with break_on = "done"' => sub {
t/09-pool-partial-results-collect.t
my $time = time;
my $pool = Async::Simple::Pool->new( $slow_task, \@data );
my $result = $pool->process;
is( scalar( grep $_, @$result ), 20, 'waiting for all tasks by default' );
my $work_time = time - $time;
ok( $work_time < $worker_delay * 3.5, sprintf 'async done work time = %.2f sec', $work_time );
};
it 'check for results with break_on = "busy"' => sub {
my $time = time;
my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy' );
my $result = $pool->process;
is( scalar( grep $_, @$result ), 0, 'busy: do not wait for anything, just run the jobs' );
my $work_time = time - $time;
ok( $work_time < $worker_delay, sprintf 'async done work time = %.2f sec', $work_time );
};
it 'check for results with break_on = "run"' => sub {
my $time = time;
my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'run' );
t/09-pool-partial-results-collect.t
ok( scalar( grep $_, @$result ) < 20, 'run: do not wait for anything, just run the jobs' );
ok( scalar( grep $_, @$result ) > 9, 'run: do not wait for anything, just run the jobs' );
my $work_time = time - $time;
ok( $work_time < $worker_delay * 3, sprintf 'async done work time = %.2f sec', $work_time );
};
it 'check for results' => sub {
my $time = time;
my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy' );
my $work_time = time - $time;
ok( $work_time < $worker_delay, sprintf 'async done work time = %.2f sec', $work_time );
my $result = $pool->process;
is( scalar( grep $_, @$result ), 0, 'all threads are busy, no waiting for results' );
# 10 tasks (workers) and 20 jobs, so we expect exactly 2 passes of work, plus 1 more in the worst case.
sleep $full_cycle_worst_time;
$result = $pool->process;
is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );
sleep $full_cycle_worst_time;
$result = $pool->process;
is( scalar( grep $_, @$result ), 20, 'all threads are busy, got some results' );
};
};
};
};
runtests unless caller;
t/10-pool-partial-results-flush.t
before each => sub {
$task = sub {
my( $data ) = @_;
$data->{ok} = 1;
return $data;
};
@data = map { \%{{ i => $_ }} } 1..20;
};
# break_on => busy/run/done
# flush_data => 0/1
describe 'partial results' => sub {
my $slow_task = sub {
my( $data ) = @_;
sleep $worker_delay;
$data->{ok} = 1;
return $data;
};
it 'check for results with a flush data fulllist result' => sub {
my $time = time;
my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );
# Most likely we don't have any results yet
my $result = $pool->process;
# But a heavily loaded system could already have computed some results by this point
my $results_count = scalar( grep $_, @$result );
ok( $results_count <= 10, 'all threads are busy, no waiting for results' );
# Wait until the first batch (1..10) of results is ready
sleep $full_cycle_worst_time;
$result = $pool->process;
$results_count += scalar( grep $_, @$result );
# The first batch totals 10 results ( 20 jobs divided between 10 workers )
ok( $results_count < 20 , 'all threads are busy, got some results' );
sleep $full_cycle_worst_time;
$result = $pool->process;
$results_count += scalar( grep $_, @$result );
# Another batch of results: the previous 10 were flushed and a new 10 gathered.
is( $results_count, 20, 'all threads are busy, got some results' );
$result = $pool->process;
$results_count += scalar( grep $_, @$result );
# All results have been flushed. The pool is completely clear.
is( scalar( grep $_, @$result ), 0, 'all results are read, nothing left' );
is( $results_count, 20, 'all results are read. We have exactly @data results' );
is( scalar @{ $pool->all_keys }, 0, 'all_keys list is empty' );
is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
is( scalar keys %{ $pool->data }, 0, 'data is empty' );
};
it 'check for results with a flush data, hash result' => sub {
my $time = time;
my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'hash' );
my $result = $pool->process;
my $first_results_count = scalar( grep $_, keys %$result );
ok( $first_results_count < 10, 'all threads are busy, no waiting for results' );
warn 'System too slow or something went wrong! Results count = ' . $first_results_count . ' immediately after tasks start!' if $first_results_count;
sleep $full_cycle_worst_time;
$result = $pool->process;
is( scalar( grep $result->{$_}, keys %$result ) + $first_results_count, 10, 'all threads are busy, got some results' );
sleep $full_cycle_worst_time;
$result = $pool->process;
is( scalar( grep $result->{$_}, keys %$result ), 10, 'all threads are busy, got some results' );
$result = $pool->process;
is( scalar( grep $result->{$_}, keys %$result ), 0, 'all results have already been read, nothing left' );
is( scalar @{ $pool->all_keys }, 0, 'all_keys list is empty' );
is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
is( scalar keys %{ $pool->data }, 0, 'data is empty' );
};
it 'check for results with a flush data, list result' => sub {
my $time = time;
my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );
my $result = $pool->process;
is( scalar( grep $_, @$result ), 0, 'all threads are busy, no waiting for results' );
sleep $full_cycle_worst_time;
$result = $pool->process;
is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );
sleep $full_cycle_worst_time;
$result = $pool->process;
is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );
};
};
};
};
runtests unless caller;