AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

sub new {
    # start a new AE session: choose a map, choose a peer, run the first step
    # returns the session object, or nothing if no map or no peer was found

    my( $class ) = @_;

    # initial per-session state
    my %state = (
        badnode    => [ { version => 0, shard => 0, level => 0 } ],
        cache      => {},
        kvneed     => [],
        kvneedorig => [],
        kvfetching => 0,
        missing    => 0,
    );
    my $me = bless \%state, $class;

    debug("new ae");
    return unless $me->_pick_map();

    # stamp the map before looking for a peer: the stamp is recorded even if
    # no peer turns up, and _pick_map reads it back to rank maps by staleness
    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
    return unless $me->_init_peer();

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');
    $me->_next_step();

    # register the session in @AE, the list periodic() scans and expires
    push @AE, $me;
    return $me;
}

sub periodic {
    # expire stale sessions, then start a new one if none are left

    # a session is kept only while timestamp + $EXPIRE is still ahead of $^T
    @AE = grep { $_->{timestamp} + $EXPIRE > $^T } @AE;

    # something is still running - nothing more to do
    return if @AE;

    # do not start a session while the load is above the limit
    # (configured 'ae_maxload', falling back to $MAXLOAD)
    my $load  = loadave();
    my $limit = conf_value('ae_maxload') || $MAXLOAD;
    return if $load > $limit;

    __PACKAGE__->new();
}

# we are up to date if we have AE'ed every map at least once since starting
sub up_to_date {
    # class method
    # returns 1 if every configured map is flagged in %DONE, otherwise 0

    my( $class ) = @_;

    my $conf = conf_value('map');

    # configured maps that have no true entry in %DONE yet
    my @pending = grep { !$DONE{$_} } keys %$conf;

    return @pending ? 0 : 1;
}

################################################################

# find most stale map
sub _pick_map {
    # choose the map with the smallest 'ae_last_start' value (ties are
    # broken at random) and set the per-map session parameters
    # sets: map, has_files (file maps only), maxget, expire
    # returns 1 on success, nothing if no maps are configured

    my( $me ) = @_;

    my $conf = conf_value('map');
    my @stalest;        # every map sharing the smallest start value seen
    my $oldest;         # that smallest value

    for my $name (keys %$conf){
        my $started = AC::Yenta::Store::store_get_internal($name, 'ae_last_start');
        # NOTE(review): presumably a map that was never started yields undef
        # here, which compares numerically as 0 - confirm against the store

        if( @stalest && !($started < $oldest) ){
            # not a new minimum; keep it only if it ties the current one
            push @stalest, $name if $started == $oldest;
            next;
        }

        # first map seen, or strictly older than anything so far
        @stalest = ($name);
        $oldest  = $started;
    }

    return unless @stalest;

    my $chosen = $stalest[ int(rand(scalar @stalest)) ];
    $me->{map} = $chosen;

    # a map with a basedir configured is a file map, otherwise a data map
    # the fetch limit differs between the two
    my $mapcf = conf_map( $chosen );
    $me->{expire} = $mapcf->{expire};
    if( $mapcf->{basedir} ){
        $me->{has_files} = 1;
    }

    if( $me->{has_files} ){
        $me->{maxget} = $MAXFILES;
    }else{
        $me->{maxget} = $MAXGET;
    }

    return 1;
}

sub _init_peer {
    # sort the usable peers of our map into near/far/ood lists,
    # then pick the peer this session will talk to
    # sets: peers_near, peers_far, peers_ood (each only if non-empty), peer
    # returns 1 on success, nothing if _pick_peer came back empty

    my( $me ) = @_;

    my $here  = my_datacenter();
    my @ids   = AC::Yenta::Status->mappeers( $me->{map} );
    my $env   = conf_value('environment');

    my @kinds  = qw(peers_near peers_far peers_ood);
    my %bucket = map { ($_ => []) } @kinds;

    for my $id (@ids){
        my $info = AC::Yenta::Status->peer($id);

        # usable peers are in our environment and report status 200
        next if $info->{environment} ne $env;
        next if $info->{status}      != 200;

        # ood:  uptodate flag is false
        # near: uptodate, and in the same datacenter as us
        # far:  uptodate, in some other datacenter
        my $kind = !$info->{uptodate}           ? 'peers_ood'
                 : $info->{datacenter} eq $here ? 'peers_near'
                 :                                'peers_far';

        push @{ $bucket{$kind} }, $info;
    }

    # leave a key unset when its list is empty -
    # _pick_peer checks these keys for truth, not for length
    for my $kind (@kinds){
        $me->{$kind} = $bucket{$kind} if @{ $bucket{$kind} };
    }

    my $chosen = $me->_pick_peer()
        or return;

    $me->{peer} = $chosen;
    return 1;
}

sub _pick_peer {
    my $me = shift;

    my @peer;
    if( $me->{peers_near} && $me->{peers_far} ){
        # prefer close peers, usually
        if( int(rand(8)) ){
            @peer = @{$me->{peers_near}};
        }else{
            @peer = @{$me->{peers_far}};
        }
    }elsif( $me->{peers_near} ){



( run in 1.055 second using v1.01-cache-2.11-cpan-140bd7fdf52 )