DynGig-Util
view release on metacpan or search on metacpan
lib/DynGig/Util/MultiPhase.pm view on Meta::CPAN
=head1 DESCRIPTION
=head2 run()
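Launch the task. Source/destination pairs are selected and each pair is
processed in its own thread, with the number of concurrent threads capped by
the object's I<thread> setting. A destination whose processing fails is put
back in the pending set until it has been retried I<retry> times, after which
its result is recorded as an error; a destination that succeeds becomes
available as a source. An optional I<log> parameter supplies the file handle
that per-pair results are printed to (default: STDERR).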
=cut
sub run
{
my ( $this, %param ) = @_;
my ( %busy, %retry, %error, %thread );
my $retry = $this->{retry};
my $thread = $this->{thread};
my %dst = %{ $this->{dst} };
my %src = %{ $this->{src} };
my $queue = Thread::Queue->new();
my $handle = $param{log} || *STDERR;
while ( %dst || threads->list() || $queue->pending() )
{
while ( $queue->pending() )
{
my ( $status, $src, $dst, $result ) = $queue->dequeue( 4 );
if ( $status == ERROR )
{
$retry{$dst} ||= 0;
if ( $retry{$dst} < $retry )
{
$dst{$dst} = $busy{$dst};
$retry{$dst} ++;
}
else
{
$error{$dst} = $result;
}
}
else
{
$src{$dst} = $busy{$dst};
}
$src{$src} = $busy{$src};
delete $busy{$src};
delete $busy{$dst};
$thread{$src}{$dst}->join();
print $handle "$src => $dst $result";
}
for my $i ( 1 .. $thread - threads->list() )
{
last unless keys %src && keys %dst;
my ( $src, $dst ) = $this->_select( \%src, \%dst );
$busy{$src} = $src{$src};
$busy{$dst} = $dst{$dst};
delete $src{$src};
delete $dst{$dst};
$thread{$src}{$dst} = threads::async
{
my ( $status, $result ) = $this->_eval( $src, $dst );
$queue->enqueue( $status, $src, $dst, $result );
};
( run in 0.287 second using v1.01-cache-2.11-cpan-87723dcf8b7 )