AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
}elsif( $me->{peers_near} ){
@peer = @{$me->{peers_near}};
}elsif( $me->{peers_far} ){
@peer = @{$me->{peers_far}};
}elsif( $me->{peers_ood} ){
# only use out-of-date peers as a last resort
# NB: if we never used ood peers, we'd have a bootstrap deadlock
@peer = @{$me->{peers_ood}};
}
return unless @peer;
my $peer = $peer[ rand(@peer) ];
return $peer;
}
################################################################
sub _finished {
my $me = shift;
debug("finished $me->{map}");
$DONE{$me->{map}} = $^T if $me->{missing} < $MAXMISSING;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
@AE = grep{ $_ != $me } @AE;
}
sub _next_step {
my $me = shift;
$me->{timestamp} = $^T;
if( $me->{kvfetching} < $MAXFETCH ){
# any missing data?
if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
debug("starting nextgetkv ($me->{kvfetching})");
$me->_next_get_kv();
}
}
# check nodes?
if( @{$me->{badnode}} ){
$me->_start_check();
return;
}
$me->_finished();
}
################################################################
sub _start_check {
my $me = shift;
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
map => $me->{map},
level => $node->{level},
version => $node->{version},
shard => $node->{shard},
} );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
$request,
info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );
if( $io ){
$io->set_callback('load', \&_check_load, $me);
$io->set_callback('error', \&_check_error, $me);
$io->start();
}
}
sub _check_load {
my $io = shift;
my $evt = shift;
my $me = shift;
debug("check results");
$evt->{data} ||= {};
# determine highest level returned
my @keydata;
my @nodedata;
my $maxlev = 0;
for my $d ( @{ $evt->{data}{check} }){
debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");
if( $d->{key} ){
push @keydata, $d;
next;
}
next if $d->{level} < $maxlev;
if( $d->{level} > $maxlev ){
@nodedata = ();
$maxlev = $d->{level};
}
push @nodedata, $d;
}
if( @keydata ){
$me->_check_result_keys( \@keydata );
}elsif( @nodedata ){
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
next if $me->_is_expired($me->{map}, $lev, $d->{version});
# RSN - skip unwanted shards
my $hash = $merkle{"$d->{version} $d->{shard}"};
if( $d->{hash} eq $hash ){
debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} => match");
next;
}else{
debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} != $hash");
}
# stick them at the front
unshift @{$me->{badnode}}, { version => $d->{version}, shard => $d->{shard}, level => $lev };
}
}
################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)
sub _next_get_kv {
my $me = shift;
return $me->_start_get_kv_orig() if @{$me->{kvneedorig}};
return $me->_start_get_kv_any() if @{$me->{kvneed}};
}
sub _start_get_kv_any {
my $me = shift;
my @get = splice @{$me->{kvneed}}, 0, $me->{maxget}, ();
# pick a peer
my $peer = $me->_pick_peer();
debug("getting kv data from peer $peer->{id}");
$me->_start_get_kv( $peer, 1, \@get);
}
sub _start_get_kv_orig {
my $me = shift;
my @get = splice @{$me->{kvneedorig}}, 0, $me->{maxget}, ();
debug("getting kv data from current peer");
$me->_start_get_kv( $me->{peer}, 0, \@get);
}
sub _start_get_kv {
my $me = shift;
my $peer = shift;
my $retry = shift;
my $get = shift;
# insert map into request
$_->{map} = $me->{map} for @$get;
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
my $enc = use_encryption($peer);
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# build request
my $request = $proto->encode_request( {
type => 'yenta_get',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
data => $get,
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
$io->set_callback('error', \&_getkv_error, $me, $retry, $get);
$io->start();
}
}
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
$evt->{data} ||= {};
debug("got kv data results");
my %need = map {
( "$_->{key}/$_->{version}" => $_ )
} @$get;
for my $d ( @{$evt->{data}{data}}){
debug("got $d->{map}/$d->{key}/$d->{version}");
next unless $d->{key} && $d->{version}; # not found
delete $need{ "$d->{key}/$d->{version}" };
my $file = $evt->{content};
$file = \ $d->{file} if $d->{file};
AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
$d->{value}, $file, $d->{meta} );
}
push @{$me->{kvneedorig}}, (values %need) if $retry;
$me->_next_get_kv();
}
sub _getkv_error {
( run in 2.447 seconds using v1.01-cache-2.11-cpan-98e64b0badf )