AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

        if( $me->{nearseen} < $MAXNEARSEE ){
            for (1 .. $NEARSENDS){
                $sent ++ if $me->_start_near();
            }
        }
    }else{
        $sent ++ if $me->_start_near();
    }

    return $sent;
}

# Launch one more transfer of the requested kind, unless enough peers of
# that kind have already reported having the data.
#
#   $far - true:  start a far transfer  (limited by $MAXFARSEE)
#          false: start a near transfer (limited by $MAXNEARSEE)
#
# Returns nothing when the limit has been reached, otherwise whatever
# _start_far / _start_near returns.
sub _start_one {
    my ($me, $far) = @_;

    if( $far ){
        return if $me->{farseen} >= $MAXFARSEE;
        return $me->_start_far();
    }

    return if $me->{nearseen} >= $MAXNEARSEE;
    return $me->_start_near();
}

# Send this distribution request to a single peer.
#
#   $id  - id of the peer to send to (looked up via AC::Yenta::Status->peer)
#   $far - passed through to the load/error callbacks so they know which
#          kind of transfer finished
#
# Builds a 'yenta_distrib' request (hop count incremented by one), appends
# the content (encrypted when use_encryption() says so for this peer) and
# starts a store client for it. If the client cannot be created, this is
# only logged via debug().
sub _start_peer {
    my ($me, $id, $far) = @_;

    my $pd   = AC::Yenta::Status->peer($id);
    my $addr = $pd->{ip};       # array of nat ip info
    my $enc  = use_encryption($pd);

    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # content to append after the request; stays empty when there is none
    my $ect = '';
    if( $me->{content} ){
        $ect = $enc ? $proto->encrypt(undef, ${ $me->{content} })
                    : ${ $me->{content} };
    }

    # build request
    my %header = (
        type              => 'yenta_distrib',
        msgidno           => $msgid++,
        want_reply        => 1,
        data_encrypted    => $enc,
        content_encrypted => $enc,
    );
    my %body = (
        sender => AC::Yenta::Status->my_server_id(),
        hop    => $me->{req}{hop} + 1,
        expire => $me->{req}{expire},
        datum  => $me->{req}{datum},
    );
    my $request = $proto->encode_request( \%header, \%body, \$ect );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $addr, undef, $request . $ect,
        info => "distrib $me->{info} to $id",
    );

    return debug("start client failed") unless $io;

    # both callbacks receive the same trailing arguments
    my @cbargs = ($me, $id, $far);
    $io->set_callback('load',  \&_onload,  @cbargs);
    $io->set_callback('error', \&_onerror, @cbargs);
    return $io->start();
}

# 'load' callback for a client started by _start_peer.
#
#   $io  - the client object
#   $evt - event; $evt->{data}{haveit} is true when the peer reports
#          having the data
#   $me, $id, $far - the extra arguments registered with the callback
#
# Updates the near/far "seen" counters, advances the orderly near-send
# list when applicable, then tries to start another transfer of the same
# kind.
sub _onload {
    my ($io, $evt, $me, $id, $far) = @_;

    debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");

    my $haveit = $evt->{data}{haveit};

    if( $haveit ){
        # count the confirmation against the matching kind of transfer
        my ($counter, $stat) = $far
            ? ('farseen',  'dist_send_far_seen')
            : ('nearseen', 'dist_send_near_seen');
        $me->{$counter} ++;
        inc_stat($stat);
    }

    unless( $me->{faraway} || $far ){
        # orderly distribution. hop away.
        if( $haveit ){
            # step past a single entry
            shift @{ $me->{nearsend} };
        }else{
            # skip ahead by ordershift entries, but never more than half
            # of what is left; the skip distance doubles each time
            my $half = @{ $me->{nearsend} } / 2;
            my $skip = $me->{ordershift};
            $skip = $half if $skip > $half;
            shift @{ $me->{nearsend} } for (1 .. $skip);
            $me->{ordershift} *= 2;
        }
    }

    return $me->_start_one($far);
}

# 'error' callback for a client started by _start_peer.
#
# Receives the client object, the event, and the ($me, $id, $far)
# arguments registered with the callback. The failure is logged and
# another transfer of the same kind is attempted; no counters change.
sub _onerror {
    my ($io, $evt, $me, $id, $far) = @_;

    # don't need to track anything, just report it
    verbose("error distributing $me->{info} to $id");

    return $me->_start_one($far);
}

# Order a list of peer ids relative to this server's own id.
#
#   $peers - array ref of peer ids
#
# Returns a new array ref: the ids sorted as strings and rotated so that
# those greater than our own id come first, followed by those less than
# it. An id equal to our own is left out.
sub _orderly {
    my ($peers) = @_;

    my $self_id = AC::Yenta::Status->my_server_id();
    my @sorted  = sort { $a cmp $b } @$peers;

    my (@before, @after);
    for my $peer (@sorted){
        if( $peer lt $self_id ){
            push @before, $peer;
        }elsif( $peer gt $self_id ){
            push @after, $peer;
        }
    }

    return [ @after, @before ];
}

1;



( run in 1.131 second using v1.01-cache-2.11-cpan-437f7b0c052 )