MYDan

 view release on metacpan or  search on metacpan

lib/MYDan/Agent/Mrsync.pm  view on Meta::CPAN

use File::Basename;

use base qw( MYDan::Util::Phasic );

our %RUN = ( retry => 2, opt => '-aq' );

use MYDan::Agent::Query;
use Time::HiRes qw(time);
use AnyEvent;
use AnyEvent::Impl::Perl;
use AnyEvent::Socket;
use AnyEvent::Handle;
use MYDan::Util::OptConf;
use MYDan::API::Agent;
use MYDan::Agent::Client;

our ( %agent, $queryx ); 
BEGIN{ %agent = MYDan::Util::OptConf->load()->dump( 'agent' ); };

sub new
{
    my ( $class, %param ) = splice @_;
    my ( $sp, $dp ) = delete @param{ qw( sp dp ) };
    my %src = map { $_ => 1 } @{ $param{src} };

    $queryx = $param{queryxdata} if $param{queryxdata};
    $queryx = queryx( node => [ map{ @{ $param{$_} }}qw( src dst )], map{ $_ => $param{$_} } qw( user sudo ) ) if $param{2};

    map{ delete $param{$_} }qw( 1 2 user sudo queryxdata );

    $sp = $dp unless $sp;
    $dp = $sp unless $dp;

    croak "path not defined" unless $sp;

    $param{dst} = [ grep { ! $src{$_} } @{ $param{dst} } ] if $sp eq $dp;

    if ( $sp =~ /\/$/ ) { $dp .= '/' if $dp !~ /\/$/ }
    elsif ( $dp =~ /\/$/ ) { $dp .= File::Basename::basename( $sp ) }

    my $w8 = sub 
    {
        my @addr = gethostbyname shift;
        return @addr ? unpack N => $addr[-1] : 0;
    };

    my $rsync = sub
    {
        my ( $src, $dst, %param, %result ) = splice @_;
        my $sp = $src{$src} ? $sp : $dp;

        eval{
	    if( $queryx )
	    {
                my $load = [ $sp ];
                $load = Compress::Zlib::compress( YAML::XS::Dump $load );
                $load = 'data:' . length( $load ) . ':'. $load;
                $load .= $queryx->{load};


                my $dl =  +{ load => $load, src => $src, port => $agent{port}, sp => $sp, dp => $dp, map{ $_ => $param{$_} }qw( chown chmod cc ) };
                my $download = Compress::Zlib::compress( YAML::XS::Dump $dl );
                $download = 'data:' . length( $download ) . ':'. $download;
                $download .= $queryx->{download};

                my %query = ( 
                    argv => $dl,
                    code => 'download', map{ $_ => $param{$_} }qw( user sudo env )
                );

	        %result = MYDan::Agent::Client->new(
	                $dst
	            )->run( %agent, query => \%query, queryx => $download, timeout => $param{timeout} );
	    }
	    else
	    {
                my $isc = $agent{role} && $agent{role} eq 'client' ? 1 : 0;

                my %load = ( 
                    argv => [ $sp ],
                    code => 'load', map{ $_ => $param{$_} }qw( user sudo )
                );
                $load{node} = [ $src ] if $isc;

                my $load = MYDan::Agent::Query->dump(\%load);
                eval{ $load = MYDan::API::Agent->new()->encryption( $load ) if $isc };
                die "encryption fail:$@" if $@;


                my %query = ( 
                    argv => +{ load => $load, src => $src, port => $agent{port}, sp => $sp, dp => $dp, map{ $_ => $param{$_} }qw( chown chmod cc ) },
	            code => 'download', map{ $_ => $param{$_} }qw( user sudo env )
                );

	        %result = MYDan::Agent::Client->new(
	                $dst
	            )->run( %agent, query => \%query, timeout => $param{timeout} );

            }

            my $result = $result{$dst}||'';
	    die $result unless $result =~ /--- 0\n$/;
        };

        return $@ ? die "ERR: rsync $@" : 'OK';

    };

    bless $class->SUPER::new( %param, weight => $w8, code => $rsync ),
        ref $class || $class;
}

sub queryx
{
    my ( %param, %queryx ) = @_; #user sudo node

    my $isc = $agent{role} && $agent{role} eq 'client' ? 1 : 0;

    for my $type ( qw( load download ) )
    {
        my %query = (
            code => $type, map{ $_ => $param{$_} }qw( user sudo env )
        );

        $query{node} = $param{node} if $isc;
        my $query = MYDan::Agent::Query->dump(\%query);
        eval{ $query = MYDan::API::Agent->new()->encryption( $query ) if $isc };
        die "encryption fail:$@" if $@;
        $queryx{$type} = $query;
    }
    return \%queryx;
}

sub run
{
    my ( $self, %run ) = splice @_;
    $MYDan::Util::Phasic::MAX = delete $run{max} if $run{max};
    $self->SUPER::run( %RUN, %run );
    return $self;
}

1;



( run in 0.750 second using v1.01-cache-2.11-cpan-71847e10f99 )