DynGig-Util
view release on metacpan or search on metacpan
lib/DynGig/Util/MultiPhase.pm view on Meta::CPAN
$mp->run( log => $handle );
=cut
# Constructor.
#
# Parameters (all optional, passed as a key/value list):
#   src     - source node(s): an ARRAY reference or a single scalar
#   dst     - destination node(s): an ARRAY reference or a single scalar
#   retry   - non-negative integer, retries allowed per destination
#   thread  - non-negative integer, capped at MAX_THR (0 means MAX_THR)
#   timeout - non-negative integer, seconds
#   weight  - CODE reference, called with a node name, returns its weight
#   code    - CODE reference, the work to perform per src/dst pair
#   param   - HASH reference of extra parameters handed to 'code'
#
# Croaks with "Invalid <name> definition." when a parameter has the wrong
# reference type or a numeric parameter is not a non-negative integer.
#
# Returns the blessed object; 'src' and 'dst' are stored as node => weight
# hashes.
sub new
{
    my ( $class, %config ) = @_;

    $config{src}     ||= [];
    $config{dst}     ||= [];
    $config{retry}   ||= 0;
    $config{thread}  ||= MAX_THR;
    $config{timeout} ||= 0;
    $config{weight}  ||= sub { 0 };
    $config{code}    ||= sub { };
    $config{param}   ||= +{};

    # A lone node may be given as a plain scalar. Each list is checked on
    # its own (the dst test used to look at src by mistake).
    for my $key ( qw( src dst ) )
    {
        $config{$key} = [ $config{$key} ] unless ref $config{$key};
    }

    my %ref =
    (
        retry   => '',
        thread  => '',
        timeout => '',
        dst     => 'ARRAY',
        src     => 'ARRAY',
        code    => 'CODE',
        param   => 'HASH',
        weight  => 'CODE',
    );

    for my $key ( sort keys %ref )
    {
        croak "Invalid $key definition.\n"
            if ref $config{$key} ne $ref{$key};
    }

    for my $key ( qw( retry thread timeout ) )
    {
        croak "Invalid $key definition.\n" if $config{$key} !~ /^\d+$/;
    }

    my $weight = $config{weight};

    # Read the normalized lists from %config so that a scalar src/dst is
    # not dereferenced as if it were an array.
    my %src = map { $_ => $weight->( $_ ) } @{ $config{src} };

    # A node that is already a source needs no delivery. Membership is
    # tested with exists because a weight of 0 (the default) is false.
    my %dst = map { $_ => $weight->( $_ ) }
        grep { ! exists $src{$_} } @{ $config{dst} };

    $config{thread} = MAX_THR if $config{thread} > MAX_THR;
    $config{src} = \%src;
    $config{dst} = \%dst;

    return bless \%config, ref $class || $class;
}
=head1 DESCRIPTION
=head2 run()
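Launches the task. Takes an optional I<log> parameter, a file handle that receives one progress line per completed transfer (defaults to STDERR).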
=cut
# Launch the task: repeatedly pair an idle source with a pending
# destination and run the pair in its own thread, until no destination
# is left and every worker has been reaped.
#
# Parameters (key/value list):
#   log - file handle for progress lines; defaults to STDERR
#
# A destination that succeeds is added to the source pool. One that
# fails is re-queued until its retry budget is used up, after which its
# result is recorded as an error.
#
# Returns (and stores for error()) a HASH reference of
# destination => error, or undef when there were no errors.
sub run
{
    my ( $this, %param ) = @_;
    my ( %busy, %retry, %error, %thread );

    my $retry  = $this->{retry};
    my $thread = $this->{thread};
    my %dst    = %{ $this->{dst} };
    my %src    = %{ $this->{src} };
    my $queue  = Thread::Queue->new();
    my $handle = $param{log} || *STDERR;

    while ( %dst || threads->list() || $queue->pending() )
    {
        # Reap finished workers; each one enqueued exactly four items.
        while ( $queue->pending() )
        {
            my ( $status, $src, $dst, $result ) = $queue->dequeue( 4 );

            if ( $status == ERROR )
            {
                $retry{$dst} ||= 0;

                if ( $retry{$dst} < $retry )
                {
                    # Put the destination back for another attempt.
                    $dst{$dst} = $busy{$dst};
                    $retry{$dst} ++;
                }
                else
                {
                    $error{$dst} = $result;
                }
            }
            else
            {
                # A completed destination becomes a usable source.
                $src{$dst} = $busy{$dst};
            }

            # The source is idle again whatever the outcome.
            $src{$src} = $busy{$src};
            delete $busy{$src};
            delete $busy{$dst};

            # Drop the thread object once joined so it is not retained.
            my $worker = delete $thread{$src}{$dst};
            $worker->join();

            print {$handle} "$src => $dst $result";
        }

        # Stall guard: with no idle source and nothing in flight, no
        # source can ever become available, so the remaining
        # destinations are unreachable. Record them and stop instead of
        # spinning forever.
        if ( %dst && ! %src && ! %busy )
        {
            $error{$_} = "no source available\n" for keys %dst;
            last;
        }

        # Fill the free worker slots.
        for my $i ( 1 .. $thread - threads->list() )
        {
            last unless keys %src && keys %dst;

            my ( $src, $dst ) = $this->_select( \%src, \%dst );

            # Move both nodes to %busy so they are not selected again
            # while the transfer is running.
            $busy{$src} = delete $src{$src};
            $busy{$dst} = delete $dst{$dst};

            $thread{$src}{$dst} = threads::async
            {
                my ( $status, $result ) = $this->_eval( $src, $dst );
                $queue->enqueue( $status, $src, $dst, $result );
            };
        }

        sleep 1;
    }

    return $this->{error} = %error ? \%error : undef;
}
=head2 error()
Returns the errors recorded by the last run() as a HASH reference, or undef if there were none.
=cut
# Accessor: hands back whatever is stored under the object's 'error' key
# (run() stores a HASH reference of errors there, or undef).
sub error
{
    my ( $self ) = @_;

    return $self->{error};
}
# Pick the next source/destination pair.
#
# Parameters:
#   $src - HASH reference of idle sources, node => weight
#   $dst - HASH reference of pending destinations, node => weight
#
# Takes one source from $src, then the destination whose weight is
# numerically closest to that source's weight; ties are broken by node
# name so the choice is repeatable.
#
# Returns the list ( source, destination ), or an empty list when either
# pool is empty.
sub _select
{
    my ( $this, $src, $dst ) = @_;

    return unless %$src && %$dst;

    # keys() in void context resets the hash iterator. Without this, a
    # second call on the same hash continues a previous each() walk and
    # can yield no source at all.
    keys %$src;
    my ( $s, $w ) = each %$src;
    keys %$src;    # leave no half-finished iteration on the caller's hash

    my %gap = map { $_ => abs( $dst->{$_} - $w ) } keys %$dst;
    my ( $d ) = sort { $gap{$a} <=> $gap{$b} || $a cmp $b } keys %gap;

    return ( $s, $d );
}
sub _eval
{
my ( $this, $src, $dst ) = @_;
my ( $status, @result ) = OK;
eval
{
my $code = $this->{code};
my $param = $this->{param};
my $timeout = $this->{timeout};
local $SIG{ALRM} = sub { die "timeout after $timeout seconds\n" };
alarm $timeout;
@result = &$code( %$param, src => $src, dst => $dst );
alarm 0;
};
if ( $@ )
( run in 1.219 second using v1.01-cache-2.11-cpan-39bf76dae61 )