Async-Simple-Pool

 view release on metacpan or  search on metacpan

lib/Async/Simple/Pool.pm  view on Meta::CPAN

    my $break_on_busy = $self->break_on eq 'busy';
    my $break_on_run  = $self->break_on eq 'run';

    while( 1 ) {
        $self->log( 'PROCESS', 'internal cycle unless exit condition' )  if $self->logger;

        $self->read_tasks()  if grep $_->has_id, @{ $self->tasks };
        $self->write_tasks();

        if ( $break_on_busy ) {
            $self->log( 'PROCESS', 'internal cycle exit: all threads are busy' )  if $self->logger;
            last;
        }

        # Some queued data has not been started yet
        next if scalar @{ $self->queue_keys };

        if ( $break_on_run ) {
            $self->log( 'PROCESS', 'internal cycle exit: all tasks are started' )  if $self->logger;
            last;
        }

lib/Async/Simple/Task/Fork.pm  view on Meta::CPAN

Forks a child process and returns the child's pid to the parent process, or 0 to the child process.

=cut

sub fork_child {
    my ( $self ) = @_;

    # This check lives here rather than in a BEGIN block, because this package is used via "extends" in Async::Simple::Task::ForkTmpFile
    # TODO: Maybe it would be better to move this code (function) to a separate package
    # if ( $^O =~ /^(dos|os2|MSWin32|NetWare)$/ ) {
    #     die 'Your OS does not support threads... Use Async::Simple::Task::ForkTmpFile instead.';
    # };

    # Pipes: parent -> child and child -> parent
    pipe my( $parent_reader, $child_writer  )  or die 'Child  to Parent pipe open error';
    pipe my( $child_reader,  $parent_writer )  or die 'Parent to Child  pipe open error';

    my $pid = fork() // die "fork() failed: $!";

    # child
    unless ( $pid ) {

t/05-task-fork.t  view on Meta::CPAN


# use lib '../lib';

use Async::Simple::Task::Fork;

# On the operating systems matched here the script declares a single
# always-passing placeholder test and exits before the real plan below.
my $use_fallback_plan = $^O =~ /^(dos|os2|MSWin32|NetWare)$/;

if ( $use_fallback_plan ) {
    plan( tests => 1 );

    describe( 'All', sub {
        it( 'fallback', sub {
            ok( 1, 'Your OS does not support threads... fallback to tmpfiles' );
        } );
    } );

    # Run the examples only when this file is executed directly.
    runtests() unless caller;

    exit;
}


# Test count declared for every other operating system.
plan( tests => 18 );

t/09-pool-partial-results-collect.t  view on Meta::CPAN

            it 'check for results' => sub {
                my $time = time;

                my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy' );

                my $work_time = time - $time;
                ok( $work_time < $worker_delay, sprintf 'async done work time = %.2f sec', $work_time );

                my $result = $pool->process;

                is( scalar( grep $_, @$result ), 0, 'all threads are busy, no waiting for results' );

                # 10 tasks and 20 jobs, so we should expect exactly 2 passes of work for threads + 1 if worst case.
                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $_, @$result ), 20, 'all threads are busy, got some results' );
            };
        };
    };
};

runtests unless caller;

t/10-pool-partial-results-flush.t  view on Meta::CPAN

            it 'check for results with a flush data fulllist result' => sub {
                my $time = time;

                my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );

                # Most likely we haven't results yet
                my $result = $pool->process;

                # But a very overloaded systems can calculate results up here
                my $results_count = scalar( grep $_, @$result );
                ok( $results_count <= 10, 'all threads are busy, no waiting for results' );

                # Wait, untill first pack (1..10) of results will be ready
                sleep $full_cycle_worst_time;
                $result = $pool->process;
                $results_count += scalar( grep $_, @$result );

                # Total amount of first pack is 10 ( 20 tasks divided by 10 streams )
                ok( $results_count < 20 , 'all threads are busy, got some results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                $results_count += scalar( grep $_, @$result );

                # Another pack of results: previous 10 was flushed and new 10 gathered.
                is( $results_count, 20, 'all threads are busy, got some results' );

                $result = $pool->process;
                $results_count += scalar( grep $_, @$result );

                # All results were flushed. Pool completely clear.
                is( scalar( grep $_, @$result ), 0, 'all results are read, nothing left' );
                is( $results_count, 20, 'all results are read. We have exactly @data results' );

                is( scalar @{ $pool->all_keys   }, 0, 'all_keys list is empty' );
                is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
                is( scalar keys %{ $pool->data  }, 0, 'data is empty' );
            };

            it 'check for results with a flush data, hash result' => sub {
                my $time = time;

                my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'hash' );

                my $result = $pool->process;
                my $first_results_count = scalar( grep $_, keys %$result );
                ok( $first_results_count < 10, 'all threads are busy, no waiting for results' );
                warn 'Too lazy system for something goes wrong! Results count = ' . $first_results_count . ' immediately after tasks start!'  if $first_results_count;

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $result->{$_}, keys %$result ) + $first_results_count, 10, 'all threads are busy, got some results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep  $result->{$_}, keys %$result ), 10, 'all threads are busy, got some results' );

                $result = $pool->process;
                is( scalar( grep  $result->{$_}, keys %$result ), 0, 'all threads are busy, got some results' );

                is( scalar @{ $pool->all_keys   }, 0, 'all_keys list is empty' );
                is( scalar @{ $pool->queue_keys }, 0, 'queue_keys list is empty' );
                is( scalar keys %{ $pool->data  }, 0, 'data is empty' );
            };

            it 'check for results with a flush data, list result' => sub {
                my $time = time;

                my $pool = Async::Simple::Pool->new( $slow_task, \@data, break_on => 'busy', flush_data => 1, result_type => 'list' );

                my $result = $pool->process;
                is( scalar( grep $_, @$result ), 0, 'all threads are busy, no waiting for results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );

                sleep $full_cycle_worst_time;
                $result = $pool->process;
                is( scalar( grep $_, @$result ), 10, 'all threads are busy, got some results' );
            };

        };
    };
};

runtests unless caller;



( run in 0.657 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )