MYDan

 view release on metacpan or  search on metacpan

lib/MYDan/Agent/Load.pm  view on Meta::CPAN

            $hint = "FromProxy";
            my %rquery = ( 
                code => 'proxy', 
                proxyload => 1,
                argv => [ $node, +{ query => $query, map{ $_ => $run{$_} }grep{ $run{$_} }qw( timeout max port ) } ],
	        map{ $_ => $run{$_} }grep{ $run{$_} }qw( user sudo env ) 
            );

            $rquery{node} = [ $rnode ] if $isc;

            $query = MYDan::Agent::Query->dump(\%rquery);
    
            eval{ $query = MYDan::API::Agent->new()->encryption( $query ) if $isc };
	    die "encryption fail:$@" if $@;

            $node = $rnode;
        }
    }

    $hint .= ".$node";
    my ( $cv, $len, %keepalive )
        = ( AE::cv, $position,  cont => '', save => 0 );
    
    printf "position: %d\n", $position if $run{verbose};

    my $percent =  MYDan::Util::Percent->new(undef, undef, $run{pcb} )->add( $position );
    
    my %hosts = MYDan::Util::Hosts->new()->match( $node );

    my ( $size, $filemd5, $own, $mode, $ok );
    tcp_connect $hosts{$node}, $run{port}, sub {
        my ( $fh ) = @_  or die "tcp_connect: $!";
        my $hdl; $hdl = new AnyEvent::Handle(
           fh => $fh,
           on_read => sub {
               my $self = shift;
               $self->unshift_read (
                   chunk => length $self->{rbuf},
                   sub {
                       if( $keepalive{save} )
                       {
                           $percent->add( length $_[1] );
                           print $TEMP $_[1];
                       }
                       else
                       {
                           $keepalive{cont} .= $_[1];
                           $keepalive{cont} =~ s/^\*+//g;

			   if( length $keepalive{cont} > 1024000 )
			   {
				   undef $hdl;
				   $cv->send;
			   }

                           if( $keepalive{cont} =~ s/\**#\*MYDan_\d+\*#(\d+):([a-z0-9]+):(\w+):(\d+):// )
                           {
                               ( $size, $filemd5, $own, $mode ) = ( $1, $2, $3, $4 );
			       if( $run{cc} )
			       {
                                   $run{chown} ||= $own;
				   $run{chmod} ||= $mode;
			       }

			       if( $filecache->check( $filemd5 ) && ! -e $dp  )
			       {
				       print "get data from filecache\n";
				       eval{ $filecache->get( $dp, $filemd5) };
				       warn "get filecache fail: $@" if $@;
			       }

			       if( -f $dp )
			       {
                       my $x = eval{ MYDan::Util::FastMD5->hexdigest( $dp ) };
                       if( $x && $filemd5 && $x eq $filemd5 )
                       {
                           die "chmod fail\n" if $run{chmod} && ! chmod oct($run{chmod}), $dp;
                           if( $run{chown} )
                           {
                               die "get $run{chown} uid fail\n" unless my @pw = getpwnam $run{chown};
                               die "chown fail\n" unless chown @pw[2,3], $dp;
                           }
                           undef $hdl; $cv->send; $ok = $size;
                       }
			       }

			       $percent->renew( $size )->add( length $keepalive{cont}  );
                               syswrite( $TEMP, delete $keepalive{cont} );
                               $keepalive{save} = 1;
                           }
                       }

                       $percent->print('Load'.$hint.' ..') if $run{verbose};
                   }
               );
            },
            on_error => sub{
                undef $hdl;
                 $cv->send;
            },
            on_eof => sub{
                undef $hdl;
                 $cv->send;
             }
        );
        $hdl->push_write($query);
        $hdl->push_shutdown;
    };

    $cv->recv;

    if( defined $ok )
    {

	$percent->add( $ok );
	$percent->print( 'Load' . $hint . ' ..' ) if $run{verbose};
        unlink $temp;
        return;
    }

    seek $TEMP, -6, SEEK_END;
    sysread $TEMP, my $end, 6;

    unless( $end =~ /^--- 0\n$/  )
    {
        unlink $temp;
	my $err = $keepalive{cont} || '';
	$err =~ s/\**#\*MYDan_\d+\*#//;
        die "status error $err $end\n";
    }
    truncate $TEMP, $size;

    unless( $filemd5 eq MYDan::Util::FastMD5->hexdigest( $temp ) )
    {
        unlink $temp;
        die "md5 nomatch\n";
    }

   
    die "chmod fail\n" if$run{chmod} && ! chmod oct($run{chmod}), $temp;
    if( $run{chown} )
    {
        die "get $run{chown} uid fail\n" unless my @pw = getpwnam $run{chown};
	die "chown fail\n" unless chown @pw[2,3], $temp;
    }

    my $dir = File::Basename::dirname( $dp );
    unless( -d $dir )
    {
        die "mkdir dir fail\n" if system "mkdir -p '$dir'";
    }
    
    die "dst path error\n"  if -e $dp && ! -f $dp;
 
    die "rename temp file\n" if system "mv '$temp' '$dp'";
    eval{ $filecache->save( $dp ); };
    warn "save filecache fail: $@" if $@;
}

1;



( run in 0.871 second using v1.01-cache-2.11-cpan-71847e10f99 )