AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
    return unless defined $vmx;

    if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
        debug("skipping expired $lev/$ver - $vmx");
        return 1;
    }

    return;
}

sub _check_result_nodes {
    my $me  = shift;
    my $lev = shift;
    my $chk = shift;

    # determine all of the base versions of the recvd data
    my %ver;
    for my $d (@$chk){
        my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
        $ver{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

    # get all of our merkle data for these versions
    my %merkle;
    my $t_new = timet_to_yenta_version($^T - $TOONEW);

    for my $d (values %ver){
        next if $d->{ver} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $d->{ver});
        # RSN - skip unwanted shards
        my $ms = AC::Yenta::Store::store_get_merkle($me->{map}, $d->{shard}, $d->{ver}, $lev - 1);
        for my $m (@$ms){
            # debug("my hash $me->{map} $m->{level}/$m->{shard}/$m->{version} => $m->{hash}");
            $merkle{"$m->{version} $m->{shard}"} = $m->{hash};
        }
    }

    # compare (don't bother with things that are too new (the data may still be en route))
    for my $d (@$chk){
        next if $d->{version} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $d->{version});
        # RSN - skip unwanted shards
        my $hash = $merkle{"$d->{version} $d->{shard}"};

        if( $d->{hash} eq $hash ){
            debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} => match");
            next;
        }else{
            debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} != $hash");
        }

        # stick them at the front
        unshift @{$me->{badnode}}, { version => $d->{version}, shard => $d->{shard}, level => $lev };
    }
}

################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)

sub _next_get_kv {
    my $me = shift;

    return $me->_start_get_kv_orig() if @{$me->{kvneedorig}};
    return $me->_start_get_kv_any()  if @{$me->{kvneed}};
}

sub _start_get_kv_any {
    my $me = shift;

    my @get = splice @{$me->{kvneed}}, 0, $me->{maxget}, ();

    # pick a peer
    my $peer = $me->_pick_peer();
    debug("getting kv data from peer $peer->{id}");
    $me->_start_get_kv( $peer, 1, \@get);
}

sub _start_get_kv_orig {
    my $me = shift;

    my @get = splice @{$me->{kvneedorig}}, 0, $me->{maxget}, ();

    debug("getting kv data from current peer");
    $me->_start_get_kv( $me->{peer}, 0, \@get);
}

sub _start_get_kv {
    my $me    = shift;
    my $peer  = shift;
    my $retry = shift;
    my $get   = shift;

    # insert map into request
    $_->{map} = $me->{map} for @$get;

    # for (@$get){ debug("requesting $_->{key}/$_->{version}") }

    my $enc   = use_encryption($peer);
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_get',
        msgidno		  => $msgid++,
        want_reply	  => 1,
        data_encrypted	  => $enc,
    }, {
        data		  => $get,
    } );

    # connect + send
    debug("sending to $peer->{id}");
    my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
                                                  info => "AE getkv from $peer->{id}" );

    if( $io ){
        $me->{kvfetching} ++;



( run in 1.211 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )