DynGig-Util

 view release on metacpan or  search on metacpan

lib/DynGig/Util/MultiPhase.pm  view on Meta::CPAN

=head1 DESCRIPTION

=head2 run()

Launch task.

=cut
sub run
{
    my ( $this, %param ) = @_;
    my ( %busy, %retry, %error, %thread );
    my $retry = $this->{retry};
    my $thread = $this->{thread};
    my %dst = %{ $this->{dst} };
    my %src = %{ $this->{src} };
    my $queue = Thread::Queue->new();
    my $handle = $param{log} || *STDERR;

    while ( %dst || threads->list() || $queue->pending() )
    {
        while ( $queue->pending() )
        {
            my ( $status, $src, $dst, $result ) = $queue->dequeue( 4 );

            if ( $status == ERROR )
            {
                $retry{$dst} ||= 0;

                if ( $retry{$dst} < $retry )
                {
                    $dst{$dst} = $busy{$dst};
                    $retry{$dst} ++;
                }
                else
                {
                    $error{$dst} = $result;
                }
            }
            else
            {
                $src{$dst} = $busy{$dst};
            }

            $src{$src} = $busy{$src};

            delete $busy{$src};
            delete $busy{$dst};

            $thread{$src}{$dst}->join();

            print $handle "$src => $dst $result";
        }

        for my $i ( 1 .. $thread - threads->list() )
        {
            last unless keys %src && keys %dst;

            my ( $src, $dst ) = $this->_select( \%src, \%dst );

            $busy{$src} = $src{$src};
            $busy{$dst} = $dst{$dst};

            delete $src{$src};
            delete $dst{$dst};

            $thread{$src}{$dst} = threads::async
            { 
                my ( $status, $result ) = $this->_eval( $src, $dst );
                $queue->enqueue( $status, $src, $dst, $result );
            };



( run in 0.287 second using v1.01-cache-2.11-cpan-87723dcf8b7 )