AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
if( $me->{nearseen} < $MAXNEARSEE ){
for (1 .. $NEARSENDS){
$sent ++ if $me->_start_near();
}
}
}else{
$sent ++ if $me->_start_near();
}
return $sent;
}
# Start one more transfer of the requested kind, unless the matching
# "seen" counter has already reached its limit.
#
#   $me  - distribution object (reads {farseen} / {nearseen})
#   $far - true to start a far transfer, false for a near one
#
# Returns nothing when the limit ($MAXFARSEE or $MAXNEARSEE) has been
# reached; otherwise returns whatever _start_far() / _start_near() returns.
sub _start_one {
    my ($me, $far) = @_;

    if( $far ){
        # enough far peers already reported having it
        return if $me->{farseen} >= $MAXFARSEE;
        return $me->_start_far();
    }

    # enough near peers already reported having it
    return if $me->{nearseen} >= $MAXNEARSEE;
    return $me->_start_near();
}
# Build a 'yenta_distrib' request for one peer and hand it to a
# Kibitz::Store::Client for delivery.
#
#   $me  - distribution object (reads {content}, {req}, {info})
#   $id  - id of the peer to send to
#   $far - passed through to the load/error callbacks so they know which
#          kind of transfer finished
#
# The content (if any) is encrypted when use_encryption() says so for this
# peer. If the client object cannot be created, this only logs a debug
# message.
sub _start_peer {
    my ($me, $id, $far) = @_;

    my $pd   = AC::Yenta::Status->peer($id);
    my $addr = $pd->{ip};       # array of nat ip info
    my $enc  = use_encryption($pd);

    my $payload = '';
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # only touch the content when there is some to send
    if( $me->{content} ){
        $payload = $enc ? $proto->encrypt(undef, ${ $me->{content} })
                        : ${ $me->{content} };
    }

    # build request
    my $header = {
        type              => 'yenta_distrib',
        msgidno           => $msgid++,
        want_reply        => 1,
        data_encrypted    => $enc,
        content_encrypted => $enc,
    };
    my $body = {
        sender => AC::Yenta::Status->my_server_id(),
        hop    => $me->{req}{hop} + 1,
        expire => $me->{req}{expire},
        datum  => $me->{req}{datum},
    };
    my $request = $proto->encode_request( $header, $body, \$payload );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $addr, undef,
        $request . $payload,
        info => "distrib $me->{info} to $id",
    );

    unless( $io ){
        debug("start client failed");
        return;
    }

    # both callbacks receive ($me, $id, $far) after the io object and event
    $io->set_callback('load',  \&_onload,  $me, $id, $far);
    $io->set_callback('error', \&_onerror, $me, $id, $far);
    $io->start();
}
# 'load' callback: a peer answered a distribution request.
#
#   $io  - the client io object (unused here)
#   $evt - event; $evt->{data}{haveit} is true when the peer reported
#          having the data
#   $me  - distribution object
#   $id  - id of the peer that answered
#   $far - true if this was a far transfer
#
# Bumps the far/near "seen" counter and stat when the peer has the data,
# advances the {nearsend} list for orderly (non-faraway, near) distribution,
# then tries to start the next transfer of the same kind.
sub _onload {
    my ($io, $evt, $me, $id, $far) = @_;

    debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");
    my $haveit = $evt->{data}{haveit};

    if( $haveit ){
        my ($counter, $stat) = $far ? ('farseen',  'dist_send_far_seen')
                                    : ('nearseen', 'dist_send_near_seen');
        $me->{$counter} ++;
        inc_stat($stat);
    }

    my $orderly = !$me->{faraway} && !$far;
    if( $orderly ){
        # orderly distribution. hop away.
        if( $haveit ){
            # peer already had it: drop just one entry
            shift @{ $me->{nearsend} };
        }else{
            # drop {ordershift} entries, but never more than half of what
            # is left, and double the shift for next time
            my $hop  = $me->{ordershift};
            my $half = @{ $me->{nearsend} } / 2;
            $hop = $half if $hop > $half;
            for my $i (1 .. $hop){
                shift @{ $me->{nearsend} };
            }
            $me->{ordershift} *= 2;
        }
    }

    return $me->_start_one($far);
}
# 'error' callback: sending to a peer failed.
#
# Receives the io object and event (both unused), followed by the
# distribution object, the peer id and the far flag. Logs the failure and
# moves on to the next transfer of the same kind.
sub _onerror {
    my (undef, undef, $me, $id, $far) = @_;

    verbose("error distributing $me->{info} to $id");

    # don't need to track anything
    return $me->_start_one($far);
}
# Return the peers in string-sorted order, rotated so the list starts with
# the ids that sort after this server's own id and wraps around to the ids
# that sort before it. An entry equal to this server's id is left out.
#
#   $peers - array ref of peer ids (not modified)
#
# Returns a new array ref.
sub _orderly {
    my ($peers) = @_;

    my $self_id = AC::Yenta::Status->my_server_id();

    my (@before, @after);
    for my $peer (sort @$peers){
        if( $peer lt $self_id ){
            push @before, $peer;
        }elsif( $peer gt $self_id ){
            push @after, $peer;
        }
    }

    return [ @after, @before ];
}
1;
( run in 1.131 second using v1.01-cache-2.11-cpan-437f7b0c052 )