AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

        missing		=> 0,
    }, $class;

    debug("new ae");
    $me->_pick_map()  || return;

    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
    $me->_init_peer() || return;

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');
    $me->_next_step();

    push @AE, $me;
    return $me;
}

sub periodic {
    # kill dead sessions, start new ones

    my @keep;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    $me->_finished();
}

################################################################

sub _start_check {
    my $me = shift;

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> $msgid++,
        want_reply	=> 1,
        data_encrypted	=> $enc,
    }, {
        map		=> $me->{map},

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

}

sub _check_result_keys {
    my $me  = shift;
    my $chk = shift;

    my %vscnt;
    my %vsadd;

    for my $d (@$chk){
        inc_stat('ae_check_key');
        my $vsk = "$d->{version} $d->{shard}";
        $vscnt{ $vsk } ++;

        next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );

        debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
        push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
        inc_stat('ae_key_missing');
        $me->{missing} ++;
        $vsadd{ $vsk } ++;
    }
}

sub _is_expired {
    my $me  = shift;
    my $map = shift;
    my $lev = shift;
    my $ver = shift;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

        faraway 	=> (my_datacenter() ne $sendat->{datacenter}),

        farseen		=> 0,
        nearseen	=> 0,
        farsend		=> [],
        nearsend	=> [],
        ordershift	=> 4,
    }, $class;

    debug("distributing $me->{info}");
    inc_stat( 'dist_requests' );
    inc_stat( 'dist_requests_faraway' ) if $me->{faraway};


    $me->_init_strategy($sender);

    # RSN - check load
    my $max = conf_value('distrib_max') || $MAXUNDERWAY;
    if( @DIST < $max ){
        $me->_start_next();
    }
    push @DIST, $me;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    # randomly pick one server in chosen dc
    my @id = grep {
        my $x = AC::Yenta::Status->peer($_);
        ($x->{status} == 200) ? 1 : 0;
    } @{$d->{id}};
    return unless @id;

    my $id = $id[ rand(@id) ];
    debug("sending $me->{info} to far site $id in $d->{dc}");
    $me->_start_peer( $id, 1 );
    inc_stat('dist_send_far');
    inc_stat('dist_send_total');
    return 1;
}

sub _start_near {
    my $me  = shift;

    my $id = shift @{ $me->{nearsend} };
    return unless $id;
    debug("sending $me->{info} to nearby site $id");
    $me->_start_peer( $id, 0 );
    inc_stat('dist_send_near');
    inc_stat('dist_send_total');
    return 1;
}

sub _start_next {
    my $me  = shift;

    my $sent;

    # pick next peers
    # start clients

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    my $evt = shift;
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");

    if( $evt->{data}{haveit} ){
        if( $far ){
            $me->{farseen}  ++;
            inc_stat('dist_send_far_seen');
        }else{
            $me->{nearseen} ++;
            inc_stat('dist_send_near_seen');
        }
    }

    if( !$me->{faraway} && !$far ){
        # orderly distribution. hop away.
        if( $evt->{data}{haveit} ){
            shift @{$me->{nearsend}};
        }else{
            my $n = $me->{ordershift};
            $n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;



( run in 0.929 second using v1.01-cache-2.11-cpan-49f99fa48dc )