AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
return unless defined $vmx;
if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
debug("skipping expired $lev/$ver - $vmx");
return 1;
}
return;
}
# compare a batch of merkle nodes received from a peer against our own data
#   $lev - level of the recvd nodes; used for the expiry check and recorded
#          on every node we flag
#   $chk - arrayref of recvd nodes; map, shard, version + hash are read here
#          (level is only used in the debug output)
# every node whose hash differs from ours is put on the front of $me->{badnode}
sub _check_result_nodes {
    my( $me, $lev, $chk ) = @_;

    my $lev_less1 = $lev - 1;

    # reduce the recvd nodes to the distinct base version/shard pairs they normalize to
    my %wanted;
    for my $node (@$chk){
        my( $shard, $ver ) = AC::Yenta::Store::store_normalize_version(
            $node->{map}, $node->{shard}, $node->{version}, $lev_less1 );
        $wanted{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

    # load our own hashes for those bases, indexed by "version shard"
    my $t_new = timet_to_yenta_version($^T - $TOONEW);
    my %mine;
    for my $w (values %wanted){
        # too new, or already expired => nothing worth loading
        next if $w->{ver} > $t_new || $me->_is_expired($me->{map}, $lev, $w->{ver});
        # RSN - skip unwanted shards

        my $ours = AC::Yenta::Store::store_get_merkle($me->{map}, $w->{shard}, $w->{ver}, $lev_less1);
        for my $o (@$ours){
            # debug("my hash $me->{map} $o->{level}/$o->{shard}/$o->{version} => $o->{hash}");
            $mine{"$o->{version} $o->{shard}"} = $o->{hash};
        }
    }

    # compare each recvd node with ours
    # (things that are too new are left alone - the data may still be en route)
    for my $node (@$chk){
        next if $node->{version} > $t_new || $me->_is_expired($me->{map}, $lev, $node->{version});
        # RSN - skip unwanted shards

        # a node we hold no entry for gives an undef hash here, which can
        # only compare equal to an empty recvd hash
        my $hash = $mine{"$node->{version} $node->{shard}"};
        my $what = "check $node->{level}/$node->{shard}/$node->{version}: $node->{hash}";

        if( $node->{hash} eq $hash ){
            debug("$what => match");
            next;
        }
        debug("$what != $hash");

        # mismatch: stick it at the front of the list
        unshift @{$me->{badnode}}, { version => $node->{version}, shard => $node->{shard}, level => $lev };
    }
}
################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)
# start the next key/value fetch, if anything is waiting
# items queued for the original peer (kvneedorig) are served before
# items that may be fetched from any peer (kvneed)
sub _next_get_kv {
    my($me) = @_;

    if( @{$me->{kvneedorig}} ){
        return $me->_start_get_kv_orig();
    }

    # left as the final statement so the value handed back when both
    # queues are empty stays exactly what it was
    return $me->_start_get_kv_any() if @{$me->{kvneed}};
}
# fetch the next batch of needed key/values from a peer chosen by _pick_peer
sub _start_get_kv_any {
    my($me) = @_;

    # take up to maxget entries off the front of the queue
    my $batch = [ splice @{$me->{kvneed}}, 0, $me->{maxget} ];

    # pick a peer
    my $target = $me->_pick_peer();
    debug("getting kv data from peer $target->{id}");

    # 2nd arg is the retry flag (set here, cleared for the original peer)
    return $me->_start_get_kv( $target, 1, $batch );
}
# fetch the next batch of needed key/values from the current peer ($me->{peer})
sub _start_get_kv_orig {
    my($me) = @_;

    # take up to maxget entries off the front of the queue
    my $batch = [ splice @{$me->{kvneedorig}}, 0, $me->{maxget} ];

    debug("getting kv data from current peer");

    # 2nd arg is the retry flag - cleared when asking the original peer
    return $me->_start_get_kv( $me->{peer}, 0, $batch );
}
sub _start_get_kv {
my $me = shift;
my $peer = shift;
my $retry = shift;
my $get = shift;
# insert map into request
$_->{map} = $me->{map} for @$get;
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
my $enc = use_encryption($peer);
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# build request
my $request = $proto->encode_request( {
type => 'yenta_get',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
data => $get,
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
( run in 1.211 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )