AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    debug("finished $me->{map}");
    $DONE{$me->{map}} = $^T if $me->{missing} < $MAXMISSING;
    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
    @AE = grep{ $_ != $me } @AE;
}

sub _next_step {
    my $me = shift;

    $me->{timestamp} = $^T;

    if( $me->{kvfetching} < $MAXFETCH ){
        # any missing data?
        if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
            debug("starting nextgetkv ($me->{kvfetching})");
            $me->_next_get_kv();
        }
    }

    # check nodes?
    if( @{$me->{badnode}} ){
        $me->_start_check();
        return;
    }

    $me->_finished();
}

################################################################

sub _start_check {
    my $me = shift;

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> $msgid++,
        want_reply	=> 1,
        data_encrypted	=> $enc,
    }, {
        map		=> $me->{map},
        level		=> $node->{level},
        version		=> $node->{version},
        shard		=> $node->{shard},
    } );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $me->{peer}{ip}, undef,
        $request,
        info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );

    if( $io ){
        $io->set_callback('load',  \&_check_load,  $me);
        $io->set_callback('error', \&_check_error, $me);
        $io->start();
    }

}

sub _check_load {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;


    debug("check results");
    $evt->{data} ||= {};

    # determine highest level returned

    my @keydata;
    my @nodedata;
    my $maxlev = 0;

    for my $d ( @{ $evt->{data}{check} }){
        debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");

        if( $d->{key} ){
            push @keydata, $d;
            next;
        }
        next if $d->{level} < $maxlev;

        if( $d->{level} > $maxlev ){
            @nodedata = ();
            $maxlev   = $d->{level};
        }
        push @nodedata, $d;
    }

    if( @keydata ){
        $me->_check_result_keys( \@keydata );
    }elsif( @nodedata ){
        $me->_check_result_nodes( $maxlev, \@nodedata );
    }

    $me->_next_step();
}

sub _check_error {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    verbose("AE check error with $me->{peer}{id} map $me->{map} ($io->{info})");
    $me->_next_step();
}

sub _check_result_keys {
    my $me  = shift;
    my $chk = shift;

    my %vscnt;
    my %vsadd;

    for my $d (@$chk){
        inc_stat('ae_check_key');
        my $vsk = "$d->{version} $d->{shard}";
        $vscnt{ $vsk } ++;

        next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );

        debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
        push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
        inc_stat('ae_key_missing');
        $me->{missing} ++;
        $vsadd{ $vsk } ++;
    }
}

sub _is_expired {
    my $me  = shift;
    my $map = shift;
    my $lev = shift;
    my $ver = shift;

    return unless $me->{expire};

    my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
    return unless defined $vmx;

    if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
        debug("skipping expired $lev/$ver - $vmx");
        return 1;
    }

    return;
}

sub _check_result_nodes {
    my $me  = shift;
    my $lev = shift;
    my $chk = shift;

    # determine all of the base versions of the recvd data
    my %ver;
    for my $d (@$chk){
        my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
        $ver{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

    # get all of our merkle data for these versions
    my %merkle;
    my $t_new = timet_to_yenta_version($^T - $TOONEW);

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


sub _next_get_kv {
    my $me = shift;

    return $me->_start_get_kv_orig() if @{$me->{kvneedorig}};
    return $me->_start_get_kv_any()  if @{$me->{kvneed}};
}

sub _start_get_kv_any {
    my $me = shift;

    my @get = splice @{$me->{kvneed}}, 0, $me->{maxget}, ();

    # pick a peer
    my $peer = $me->_pick_peer();
    debug("getting kv data from peer $peer->{id}");
    $me->_start_get_kv( $peer, 1, \@get);
}

sub _start_get_kv_orig {
    my $me = shift;

    my @get = splice @{$me->{kvneedorig}}, 0, $me->{maxget}, ();

    debug("getting kv data from current peer");
    $me->_start_get_kv( $me->{peer}, 0, \@get);
}

sub _start_get_kv {
    my $me    = shift;
    my $peer  = shift;
    my $retry = shift;
    my $get   = shift;

    # insert map into request
    $_->{map} = $me->{map} for @$get;

    # for (@$get){ debug("requesting $_->{key}/$_->{version}") }

    my $enc   = use_encryption($peer);
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_get',
        msgidno		  => $msgid++,
        want_reply	  => 1,
        data_encrypted	  => $enc,
    }, {
        data		  => $get,
    } );

    # connect + send
    debug("sending to $peer->{id}");
    my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
                                                  info => "AE getkv from $peer->{id}" );

    if( $io ){
        $me->{kvfetching} ++;
        $io->set_callback('load',  \&_getkv_load,  $me, $retry, $get);
        $io->set_callback('error', \&_getkv_error, $me, $retry, $get);
        $io->start();
    }
}

sub _getkv_load {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;
    $evt->{data} ||= {};

    debug("got kv data results");

    my %need = map {
        ( "$_->{key}/$_->{version}" => $_ )
    } @$get;

    for my $d ( @{$evt->{data}{data}}){
        debug("got $d->{map}/$d->{key}/$d->{version}");
        next unless $d->{key} && $d->{version};		# not found

        delete $need{ "$d->{key}/$d->{version}" };
        my $file = $evt->{content};
        $file = \ $d->{file} if $d->{file};

        AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
                                     $d->{value}, $file, $d->{meta} );

    }

    push @{$me->{kvneedorig}}, (values %need) if $retry;

    $me->_next_get_kv();

}

sub _getkv_error {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;

    if( $retry ){
        push @{$me->{kvneedorig}}, @$get;
    }

    $me->_next_get_kv();
}

1;



( run in 1.253 second using v1.01-cache-2.11-cpan-75ffa21a3d4 )