AC-MrGamoo

 view release on metacpan or  search on metacpan

lib/AC/MrGamoo/Xfer.pm  view on Meta::CPAN

sub timeout {
    my $me = shift;
    debug("xfer timeout");
    $me->shut();
}

sub shutdown {
    my $me = shift;

    delete $REGISTRY{$me->{request}{copyid}};

    debug("exitval = $me->{exitval}");
    if( !$me->{exitval} ){
        $me->run_callback('on_success');
    }else{
        $me->run_callback('on_failure');
    }

    periodic(1);	# try to start another xfer
}

sub _send_status_update {
    my $req = shift;

    debug('send xfer status');
    AC::MrGamoo::API::Xfer::tell_master( $req, 100, 'Working...' );
}


sub _get_file {
    my $oreq    = shift;
    my $loc     = shift;
    my $srcname = shift;	# on remote system
    my $tmpfile = shift;


    my($addr, $port) = get_peer_addr_from_id( $loc );
    unless( $addr ){
        debug("cannot find addr for $loc");
        return;
    }

    debug("connecting to $addr:$port");

    my $req = AC::MrGamoo::Protocol->encode_request( {
        type		=> 'scribl_get',
        msgidno		=> $$,
        want_reply	=> 1,
    }, { filename => $srcname } );

    my $p;
    eval {
        # connect
        my $s = AC::MrGamoo::Protocol->connect_to_server( inet_aton($addr), $port );
        return unless $s;

        # send req
        AC::MrGamoo::Protocol->write_request($s, $req);

        # get response
        my $buf = AC::MrGamoo::Protocol->read_data($s, AC::MrGamoo::Protocol->header_size(), 30);
        $p      = AC::MrGamoo::Protocol->decode_header($buf);
        $p->{data} = AC::MrGamoo::Protocol->read_data($s, $p->{data_length}, 1);
        $p->{data} = AC::MrGamoo::Protocol->decode_reply($p);

        debug("recvd response $p->{data}{status_code}");
        return unless $p->{data}{status_code} == 200;

        # stream file to disk
        my $size = $p->{content_length};
        debug("recving file ($size B)");

        my $fd;
        unless( open( $fd, "> $tmpfile" ) ){
            verbose("cannot open output file '$tmpfile': $!");
            return;
        }

        my $chk  = _sendfile($oreq, $fd, $s, $size);
        my $sha1 = $p->{data}{hash_sha1};
        die "SHA1 check failed\n" if $sha1 && $sha1 ne $chk;
    };
    if(my $e=$@){
        debug("error: $e");
        return;
    }

    return $p;
}

sub _sendfile {
    my $req   = shift;
    my $out   = shift;
    my $in    = shift;
    my $size  = shift;

    my $t;

    my $sha1 = Digest::SHA1->new();

    while($size){
        my $buf;
        my $len = $size > $BUFSIZ ? $BUFSIZ : $size;
        alarm( 1 );
        my $i = sysread($in, $buf, $len);
        die "read failed: $!\n" unless $i > 0;
        $size -= $i;
        $i = syswrite($out, $buf);
        die "write failed: $!\n" unless $i > 0;
        $sha1->add($buf);

        # periodically tell master we are still  copying
        if( time() - $t > $STATUSTIME ){
            _send_status_update( $req );
            $t = time();
        }
    }
    alarm(0);

    return $sha1->b64digest();
}

sub report {

    my $txt;

    for my $t (values %REGISTRY){
        my $status = $t->{fd} ? 'running' : 'queued';
        $txt .= "$t->{request}{copyid} $status\n";
    }

    return $txt;
}

sub periodic {
    my $quick = shift;

    # how many xfers are running?
    my $nrun = 0;
    for my $t (values %REGISTRY){



( run in 0.831 second using v1.01-cache-2.11-cpan-39bf76dae61 )