AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    my $me  = shift;

    my $id = shift @{ $me->{nearsend} };
    return unless $id;
    debug("sending $me->{info} to nearby site $id");
    $me->_start_peer( $id, 0 );
    inc_stat('dist_send_near');
    inc_stat('dist_send_total');
    return 1;
}

sub _start_next {
    my $me  = shift;

    my $sent;

    # pick next peers
    # start clients

    if( $me->{faraway} ){
        if( $me->{farseen} < $MAXFARSEE ){
            for (1 .. $FARSENDS){
                $sent ++ if $me->_start_far();
            }
        }
        if( $me->{nearseen} < $MAXNEARSEE ){
            for (1 .. $NEARSENDS){
                $sent ++ if $me->_start_near();
            }
        }
    }else{
        $sent ++ if $me->_start_near();
    }

    return $sent;
}

sub _start_one {
    my $me  = shift;
    my $far = shift;

    if( $far ){
        return if $me->{farseen}  >= $MAXFARSEE;
        $me->_start_far();
    }else{
        return if $me->{nearseen} >= $MAXNEARSEE;
        $me->_start_near();
    }
}

sub _start_peer {
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    my $pd   = AC::Yenta::Status->peer($id);
    my $addr = $pd->{ip};	# array of nat ip info

    my $enc = use_encryption($pd);
    my $ect = '';
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $enc ? $proto->encrypt(undef, ${$me->{content}}) : ${$me->{content}} if $me->{content};

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_distrib',
        msgidno		  => $msgid++,
        want_reply	  => 1,
        data_encrypted	  => $enc,
        content_encrypted => $enc,
    }, {
        sender		  => AC::Yenta::Status->my_server_id(),
        hop		  => $me->{req}{hop} + 1,
        expire		  => $me->{req}{expire},
        datum		  => $me->{req}{datum},
    }, \$ect );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
                                                   $request . $ect,
                                                   info => "distrib $me->{info} to $id",
                                                  );

    if( $io ){
        $io->set_callback('load',  \&_onload,  $me, $id, $far);
        $io->set_callback('error', \&_onerror, $me, $id, $far);
        $io->start();
    }else{
        debug("start client failed");
    }
}

sub _onload {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");

    if( $evt->{data}{haveit} ){
        if( $far ){
            $me->{farseen}  ++;
            inc_stat('dist_send_far_seen');
        }else{
            $me->{nearseen} ++;
            inc_stat('dist_send_near_seen');
        }
    }

    if( !$me->{faraway} && !$far ){
        # orderly distribution. hop away.
        if( $evt->{data}{haveit} ){
            shift @{$me->{nearsend}};
        }else{
            my $n = $me->{ordershift};
            $n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
            shift @{$me->{nearsend}} for (1 .. $n);
            $me->{ordershift} *= 2;
        }



( run in 1.868 second using v1.01-cache-2.11-cpan-5837b0d9d2c )