MYDan
view release on metacpan or search on metacpan
lib/MYDan/Util/Phasic.pm view on Meta::CPAN
{
my $self = shift;
my %run = ( retry => 0, timeout => 0, log => \*STDERR, @_ );
my ( $retry, $log, $gave ) = delete @run{ qw( retry log gave ) };
my $MULTI = ( $gave && $gave > 0 ) ? $gave - 1 : 2;
my $timeout = $run{timeout};
my ( $w8, $code ) = @$self{ 'weight', 'code' };
$log = MYDan::Util::Say->new( $log );
$run{log} = sub { $log->say( @_ ) };
my ( @w8queue, %w8 ) = map { Thread::Queue->new() } 0, 1;
my $thrc = ( sort { $a <=> $b } 0 + @{ $self->{dst}}, $MAX )[0];
map{ $w8queue[0]->enqueue( $_ ) }@{ $self->{src} }, @{ $self->{dst} };
my @queue = map { Thread::Queue->new() } 0, 1;
$thrc ||= 1;
for my $i ( 1 .. $thrc )
{
threads::async
{
while ( $w8queue[0]->pending() )
{
my $hostname = $w8queue[0]->dequeue_nb( 1 );
next unless $hostname;
my $ipw8 = &$w8( $hostname );
$w8queue[1]->enqueue( $hostname, $ipw8 );
}
while ( 1 )
{
my ( $ok, $src, $dst, $info ) = 1;
eval
{
# local $SIG{ALRM} = sub { die "timeout\n" if $src };
( $src, $dst ) = $queue[0]->dequeue( 2 );
$info = &$code( $src, $dst, %run );
};
if ( $@ ) { $ok = 0; $info = $@ }
$queue[1]->enqueue( $src, $dst, $ok, $info );
}
}->detach;
}
my $count = @{ $self->{src} } + @{ $self->{dst} };
my $split = int ( $SPLIT * $count );
while( 1 )
{
my ( $h, $i ) = $w8queue[1]->dequeue( 2 );
$w8{$h} = $i; $count --;
last unless $count;
}
my %src = map { $_ => $w8{$_} } @{ $self->{src} };
my %dst = map { $_ => $w8{$_} } @{ $self->{dst} };
my %quiesce = map { $_ => 1 } @{ $self->{quiesce} };
my ( %multi, %busy, %err ) = map{ $_ => $MULTI }keys %src;
for ( my $now = time; %dst || %busy; sleep $POLL )
{
while ( $queue[1]->pending() )
{
my ( $src, $dst, $ok, $info ) = $queue[1]->dequeue_nb( 4 );
delete @busy{ $src, $dst };
$multi{$src} ++ if defined $src{$src};
$src{$src} = $w8{$src} unless $quiesce{$src};
if ( $ok )
{
unless( $quiesce{$dst} )
{
$src{$dst} = $w8{$dst};
$multi{$dst} = $MULTI;
}
}
elsif ( $err{$dst} ++ < $retry )
{
$dst{$dst} = $w8{$dst};
}
else
{
delete $dst{$dst};
}
$log->say( "$dst <= $src: $info" );
}
if ( $timeout && time - $now > $timeout )
{
map{
unless( defined $multi{$_} )
{
$err{$_} = $retry +1;
$log->say( "$_: timeout" );
}
}keys %busy, keys %dst; last;
}
elsif ( %src && %dst )
{
my $dst = ( keys %dst )[ int( rand time ) % 2 ? -1 : 0 ];
my $w8 = $busy{$dst} = delete $dst{$dst};
my %dist = map { $_ => abs( $w8{$_} - $w8 ) }keys %src;
my $src = ( sort { $dist{$a} <=> $dist{$b} } keys %dist )[0];
$busy{$src} = delete $src{$src};
$log->say( "$src => $dst: RSYNC" );
$queue[0]->enqueue( $src, $dst );
}
elsif( $MULTI && %dst && ( scalar keys %dst ) > $split )
{
next unless my @multi = grep{ $multi{$_} > 0 }keys %multi;
my $dst = ( keys %dst )[ int( rand time ) % 2 ? -1 : 0 ];
my $w8 = $busy{$dst} = delete $dst{$dst};
my %dist = map { $_ => abs( $w8{$_} - $w8 ) }@multi;
my $src = ( sort { $dist{$a} <=> $dist{$b} } keys %dist )[0];
$multi{$src} --;
$log->say( "$src => $dst: MULTI" );
$queue[0]->enqueue( $src, $dst );
}
}
$self->{failed} = [ grep { $err{$_} > $retry } keys %err ];
return $self;
}
# failed() - report the entries stored in $self->{failed}.
#
# List context  : returns the entries as a flat list; returns an empty list
#                 when nothing has been stored yet.
# Scalar context: returns the stored value itself (the ARRAY reference, not a
#                 copy), or undef when nothing has been stored yet.
sub failed
{
    my $self = shift;
    my $failed = $self->{failed};

    # Scalar (and void) context: hand back the stored value untouched.
    return $failed unless wantarray;

    # Dereferencing undef as an array is fatal under "use strict 'refs'",
    # so treat a missing value as "no failures" instead of dying.
    return defined $failed ? @$failed : ();
}
1;
( run in 2.062 seconds using v1.01-cache-2.11-cpan-5a3173703d6 )