AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
missing => 0,
}, $class;
debug("new ae");
$me->_pick_map() || return;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
$me->_init_peer() || return;
debug("checking $me->{map} with $me->{peer}{id}");
inc_stat('ae_runs');
$me->_next_step();
push @AE, $me;
return $me;
}
sub periodic {
# kill dead sessions, start new ones
my @keep;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
$me->_finished();
}
################################################################
sub _start_check {
my $me = shift;
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
map => $me->{map},
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
sub _check_result_keys {
my $me = shift;
my $chk = shift;
my %vscnt;
my %vsadd;
for my $d (@$chk){
inc_stat('ae_check_key');
my $vsk = "$d->{version} $d->{shard}";
$vscnt{ $vsk } ++;
next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );
debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
inc_stat('ae_key_missing');
$me->{missing} ++;
$vsadd{ $vsk } ++;
}
}
sub _is_expired {
my $me = shift;
my $map = shift;
my $lev = shift;
my $ver = shift;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
faraway => (my_datacenter() ne $sendat->{datacenter}),
farseen => 0,
nearseen => 0,
farsend => [],
nearsend => [],
ordershift => 4,
}, $class;
debug("distributing $me->{info}");
inc_stat( 'dist_requests' );
inc_stat( 'dist_requests_faraway' ) if $me->{faraway};
$me->_init_strategy($sender);
# RSN - check load
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
if( @DIST < $max ){
$me->_start_next();
}
push @DIST, $me;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
# randomly pick one server in chosen dc
my @id = grep {
my $x = AC::Yenta::Status->peer($_);
($x->{status} == 200) ? 1 : 0;
} @{$d->{id}};
return unless @id;
my $id = $id[ rand(@id) ];
debug("sending $me->{info} to far site $id in $d->{dc}");
$me->_start_peer( $id, 1 );
inc_stat('dist_send_far');
inc_stat('dist_send_total');
return 1;
}
sub _start_near {
my $me = shift;
my $id = shift @{ $me->{nearsend} };
return unless $id;
debug("sending $me->{info} to nearby site $id");
$me->_start_peer( $id, 0 );
inc_stat('dist_send_near');
inc_stat('dist_send_total');
return 1;
}
sub _start_next {
my $me = shift;
my $sent;
# pick next peers
# start clients
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $evt = shift;
my $me = shift;
my $id = shift;
my $far = shift;
debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");
if( $evt->{data}{haveit} ){
if( $far ){
$me->{farseen} ++;
inc_stat('dist_send_far_seen');
}else{
$me->{nearseen} ++;
inc_stat('dist_send_near_seen');
}
}
if( !$me->{faraway} && !$far ){
# orderly distribution. hop away.
if( $evt->{data}{haveit} ){
shift @{$me->{nearsend}};
}else{
my $n = $me->{ordershift};
$n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
( run in 0.740 second using v1.01-cache-2.11-cpan-49f99fa48dc )