Async-Simple-Pool

 view release on metacpan or  search on metacpan

t/10-pool-partial-results-flush.t  view on Meta::CPAN

#!perl -T

use Modern::Perl;
use POSIX;
use Test::Spec;
use Test::Exception;
use Time::HiRes qw/ sleep time /;

plan tests => 18;

my $dev_mode = 0;

# use lib '../lib';

use Data::Dumper;


use Async::Simple::Pool;

warn 'DEVELOPER MODE' if $dev_mode;

my $timeout = $dev_mode ? 0.03 : 0.5;

my $full_cycle_worst_time = 3 * $timeout * 10 * 1.5;
my $worker_delay = $timeout * 10;

my $is_win = $^O =~ /^(dos|os2|MSWin32|NetWare)$/;

describe 'All' => sub {

    describe 'process' => sub {

        my( $task, @data );

        before each => sub {
            $task = sub {
                my( $data ) = @_;
                $data->{ok} = 1;
                return $data;
            };

            @data = map { \%{{ i => $_ }} } 1..20;
        };

        # break_on     => busy/run/done
        # flush_data   => 0/1
        describe 'partial results' => sub {
            my $slow_task = sub {
                my( $data ) = @_;
                sleep $worker_delay;
                $data->{ok} = 1;
                return $data;
            };

            it 'check for results with a flush data fulllist result' => sub {
                my $time = time;

                my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );

                # Most likely we haven't results yet
                my $result = $pool->process;

                # But a very overloaded systems can calculate results up here
                my $results_count = scalar( grep $_, @$result );
                ok( $results_count <= 10, 'all threads are busy, no waiting for results' );

                # Wait, untill first pack (1..10) of results will be ready
                sleep $full_cycle_worst_time;
                $result = $pool->process;
                $results_count += scalar( grep $_, @$result );

                # Total amount of first pack is 10 ( 20 tasks divided by 10 streams )
                ok( $results_count < 20 , 'all threads are busy, got some results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                $results_count += scalar( grep $_, @$result );

                # Another pack of results: previous 10 was flushed and new 10 gathered.
                is( $results_count, 20, 'all threads are busy, got some results' );

                $result = $pool->process;
                $results_count += scalar( grep $_, @$result );

                # All results were flushed. Pool completely clear.
                is( scalar( grep $_, @$result ), 0, 'all results are read, nothing left' );
                is( $results_count, 20, 'all results are read. We have exactly @data results' );

                is( scalar @{ $pool->all_keys   }, 0, 'all_keys list is empty' );
                is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
                is( scalar keys %{ $pool->data  }, 0, 'data is empty' );
            };

            it 'check for results with a flush data, hash result' => sub {
                my $time = time;

                my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'hash' );

                my $result = $pool->process;
                my $first_results_count = scalar( grep $_, keys %$result );
                ok( $first_results_count < 10, 'all threads are busy, no waiting for results' );
                warn 'Too lazy system for something goes wrong! Results count = ' . $first_results_count . ' immediately after tasks start!'  if $first_results_count;

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $result->{$_}, keys %$result ) + $first_results_count, 10, 'all threads are busy, got some results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep  $result->{$_}, keys %$result ), 10, 'all threads are busy, got some results' );

                $result = $pool->process;
                is( scalar( grep  $result->{$_}, keys %$result ), 0, 'all threads are busy, got some results' );

                is( scalar @{ $pool->all_keys   }, 0, 'all_keys list is empty' );
                is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
                is( scalar keys %{ $pool->data  }, 0, 'data is empty' );
            };

            it 'check for results with a flush data, list result' => sub {
                my $time = time;

                my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );

                my $result = $pool->process;
                is( scalar( grep $_, @$result ), 0, 'all threads are busy, no waiting for results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );
            };

        };
    };
};

runtests unless caller;



( run in 2.085 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )