MYDan
view release on metacpan or search on metacpan
lib/MYDan/Agent/Load.pm view on Meta::CPAN
$hint = "FromProxy";
my %rquery = (
code => 'proxy',
proxyload => 1,
argv => [ $node, +{ query => $query, map{ $_ => $run{$_} }grep{ $run{$_} }qw( timeout max port ) } ],
map{ $_ => $run{$_} }grep{ $run{$_} }qw( user sudo env )
);
$rquery{node} = [ $rnode ] if $isc;
$query = MYDan::Agent::Query->dump(\%rquery);
eval{ $query = MYDan::API::Agent->new()->encryption( $query ) if $isc };
die "encryption fail:$@" if $@;
$node = $rnode;
}
}
$hint .= ".$node";
my ( $cv, $len, %keepalive )
= ( AE::cv, $position, cont => '', save => 0 );
printf "position: %d\n", $position if $run{verbose};
my $percent = MYDan::Util::Percent->new(undef, undef, $run{pcb} )->add( $position );
my %hosts = MYDan::Util::Hosts->new()->match( $node );
my ( $size, $filemd5, $own, $mode, $ok );
tcp_connect $hosts{$node}, $run{port}, sub {
my ( $fh ) = @_ or die "tcp_connect: $!";
my $hdl; $hdl = new AnyEvent::Handle(
fh => $fh,
on_read => sub {
my $self = shift;
$self->unshift_read (
chunk => length $self->{rbuf},
sub {
if( $keepalive{save} )
{
$percent->add( length $_[1] );
print $TEMP $_[1];
}
else
{
$keepalive{cont} .= $_[1];
$keepalive{cont} =~ s/^\*+//g;
if( length $keepalive{cont} > 1024000 )
{
undef $hdl;
$cv->send;
}
if( $keepalive{cont} =~ s/\**#\*MYDan_\d+\*#(\d+):([a-z0-9]+):(\w+):(\d+):// )
{
( $size, $filemd5, $own, $mode ) = ( $1, $2, $3, $4 );
if( $run{cc} )
{
$run{chown} ||= $own;
$run{chmod} ||= $mode;
}
if( $filecache->check( $filemd5 ) && ! -e $dp )
{
print "get data from filecache\n";
eval{ $filecache->get( $dp, $filemd5) };
warn "get filecache fail: $@" if $@;
}
if( -f $dp )
{
my $x = eval{ MYDan::Util::FastMD5->hexdigest( $dp ) };
if( $x && $filemd5 && $x eq $filemd5 )
{
die "chmod fail\n" if $run{chmod} && ! chmod oct($run{chmod}), $dp;
if( $run{chown} )
{
die "get $run{chown} uid fail\n" unless my @pw = getpwnam $run{chown};
die "chown fail\n" unless chown @pw[2,3], $dp;
}
undef $hdl; $cv->send; $ok = $size;
}
}
$percent->renew( $size )->add( length $keepalive{cont} );
syswrite( $TEMP, delete $keepalive{cont} );
$keepalive{save} = 1;
}
}
$percent->print('Load'.$hint.' ..') if $run{verbose};
}
);
},
on_error => sub{
undef $hdl;
$cv->send;
},
on_eof => sub{
undef $hdl;
$cv->send;
}
);
$hdl->push_write($query);
$hdl->push_shutdown;
};
$cv->recv;
if( defined $ok )
{
$percent->add( $ok );
$percent->print( 'Load' . $hint . ' ..' ) if $run{verbose};
unlink $temp;
return;
}
seek $TEMP, -6, SEEK_END;
sysread $TEMP, my $end, 6;
unless( $end =~ /^--- 0\n$/ )
{
unlink $temp;
my $err = $keepalive{cont} || '';
$err =~ s/\**#\*MYDan_\d+\*#//;
die "status error $err $end\n";
}
truncate $TEMP, $size;
unless( $filemd5 eq MYDan::Util::FastMD5->hexdigest( $temp ) )
{
unlink $temp;
die "md5 nomatch\n";
}
die "chmod fail\n" if$run{chmod} && ! chmod oct($run{chmod}), $temp;
if( $run{chown} )
{
die "get $run{chown} uid fail\n" unless my @pw = getpwnam $run{chown};
die "chown fail\n" unless chown @pw[2,3], $temp;
}
my $dir = File::Basename::dirname( $dp );
unless( -d $dir )
{
die "mkdir dir fail\n" if system "mkdir -p '$dir'";
}
die "dst path error\n" if -e $dp && ! -f $dp;
die "rename temp file\n" if system "mv '$temp' '$dp'";
eval{ $filecache->save( $dp ); };
warn "save filecache fail: $@" if $@;
}
1;
( run in 0.871 second using v1.01-cache-2.11-cpan-71847e10f99 )