AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
debug("finished $me->{map}");
$DONE{$me->{map}} = $^T if $me->{missing} < $MAXMISSING;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
@AE = grep{ $_ != $me } @AE;
}
sub _next_step {
my $me = shift;
$me->{timestamp} = $^T;
if( $me->{kvfetching} < $MAXFETCH ){
# any missing data?
if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
debug("starting nextgetkv ($me->{kvfetching})");
$me->_next_get_kv();
}
}
# check nodes?
if( @{$me->{badnode}} ){
$me->_start_check();
return;
}
$me->_finished();
}
################################################################
sub _start_check {
my $me = shift;
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
map => $me->{map},
level => $node->{level},
version => $node->{version},
shard => $node->{shard},
} );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
$request,
info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );
if( $io ){
$io->set_callback('load', \&_check_load, $me);
$io->set_callback('error', \&_check_error, $me);
$io->start();
}
}
sub _check_load {
my $io = shift;
my $evt = shift;
my $me = shift;
debug("check results");
$evt->{data} ||= {};
# determine highest level returned
my @keydata;
my @nodedata;
my $maxlev = 0;
for my $d ( @{ $evt->{data}{check} }){
debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");
if( $d->{key} ){
push @keydata, $d;
next;
}
next if $d->{level} < $maxlev;
if( $d->{level} > $maxlev ){
@nodedata = ();
$maxlev = $d->{level};
}
push @nodedata, $d;
}
if( @keydata ){
$me->_check_result_keys( \@keydata );
}elsif( @nodedata ){
$me->_check_result_nodes( $maxlev, \@nodedata );
}
$me->_next_step();
}
sub _check_error {
my $io = shift;
my $evt = shift;
my $me = shift;
verbose("AE check error with $me->{peer}{id} map $me->{map} ($io->{info})");
$me->_next_step();
}
sub _check_result_keys {
my $me = shift;
my $chk = shift;
my %vscnt;
my %vsadd;
for my $d (@$chk){
inc_stat('ae_check_key');
my $vsk = "$d->{version} $d->{shard}";
$vscnt{ $vsk } ++;
next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );
debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
inc_stat('ae_key_missing');
$me->{missing} ++;
$vsadd{ $vsk } ++;
}
}
sub _is_expired {
my $me = shift;
my $map = shift;
my $lev = shift;
my $ver = shift;
return unless $me->{expire};
my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
return unless defined $vmx;
if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
debug("skipping expired $lev/$ver - $vmx");
return 1;
}
return;
}
sub _check_result_nodes {
my $me = shift;
my $lev = shift;
my $chk = shift;
# determine all of the base versions of the recvd data
my %ver;
for my $d (@$chk){
my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
$ver{"$ver $shard"} = { ver => $ver, shard => $shard };
}
# get all of our merkle data for these versions
my %merkle;
my $t_new = timet_to_yenta_version($^T - $TOONEW);
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
sub _next_get_kv {
my $me = shift;
return $me->_start_get_kv_orig() if @{$me->{kvneedorig}};
return $me->_start_get_kv_any() if @{$me->{kvneed}};
}
sub _start_get_kv_any {
my $me = shift;
my @get = splice @{$me->{kvneed}}, 0, $me->{maxget}, ();
# pick a peer
my $peer = $me->_pick_peer();
debug("getting kv data from peer $peer->{id}");
$me->_start_get_kv( $peer, 1, \@get);
}
sub _start_get_kv_orig {
my $me = shift;
my @get = splice @{$me->{kvneedorig}}, 0, $me->{maxget}, ();
debug("getting kv data from current peer");
$me->_start_get_kv( $me->{peer}, 0, \@get);
}
sub _start_get_kv {
my $me = shift;
my $peer = shift;
my $retry = shift;
my $get = shift;
# insert map into request
$_->{map} = $me->{map} for @$get;
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
my $enc = use_encryption($peer);
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# build request
my $request = $proto->encode_request( {
type => 'yenta_get',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
data => $get,
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
$io->set_callback('error', \&_getkv_error, $me, $retry, $get);
$io->start();
}
}
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
$evt->{data} ||= {};
debug("got kv data results");
my %need = map {
( "$_->{key}/$_->{version}" => $_ )
} @$get;
for my $d ( @{$evt->{data}{data}}){
debug("got $d->{map}/$d->{key}/$d->{version}");
next unless $d->{key} && $d->{version}; # not found
delete $need{ "$d->{key}/$d->{version}" };
my $file = $evt->{content};
$file = \ $d->{file} if $d->{file};
AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
$d->{value}, $file, $d->{meta} );
}
push @{$me->{kvneedorig}}, (values %need) if $retry;
$me->_next_get_kv();
}
sub _getkv_error {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
if( $retry ){
push @{$me->{kvneedorig}}, @$get;
}
$me->_next_get_kv();
}
1;
( run in 1.253 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )