AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-03 10:05 (EDT)
# Function: Anti-Entropy (find missing/stale data, and sync up)
#
# $Id$

package AC::Yenta::Store::AE;
use AC::Yenta::Store;
use AC::Yenta::Config;
use AC::Yenta::Debug 'ae';
use AC::Yenta::Stats;
use AC::Yenta::Conf;
use AC::Yenta::MySelf;
use AC::Yenta::Protocol;
use AC::DC::Sched;
use AC::Misc;
use Socket;
use strict;

my $MAXGET     = 32;	# maximum number of records per fetch
my $MAXFILES   = 4;	# maximum number of files per fetch
my $MAXFETCH   = 32;	# maximum number of simultaneous fetches
my $MAXMISSING = 10;	# maximum number of missing records to be considered up to date
my $MAXLOAD    = 0.5;	# do not run if load average is too high
my $EXPIRE     = 300;	# expire hung job after this long
my $TOONEW     = 60;	# don't consider things missing if they are less than this old

my $msgid      = $$;
my %DONE;		# maps which have finished
my @AE;			# normally, just one

AC::DC::Sched->new(
    info	=> 'anti-entropy',
    freq	=> 60,
    func	=> \&AC::Yenta::Store::AE::periodic,
   );

sub new {
    my $class = shift;

    my $me = bless {
        badnode		=> [ {version => 0, shard => 0, level => 0} ],
        cache		=> {},
        kvneed		=> [],
        kvneedorig	=> [],
        kvfetching	=> 0,
        missing		=> 0,
    }, $class;

    debug("new ae");
    $me->_pick_map()  || return;

    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
    $me->_init_peer() || return;

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');
    $me->_next_step();

    push @AE, $me;
    return $me;
}

sub periodic {
    # kill dead sessions, start new ones

    my @keep;
    for my $ae (@AE){
        if( $ae->{timestamp} + $EXPIRE > $^T ){
            push @keep, $ae;
        }
    }
    @AE = @keep;

    return if @AE;
    return if loadave() > (conf_value('ae_maxload') || $MAXLOAD);
    __PACKAGE__->new();
}

# we are up to date if we have AE'ed every map at least once since starting
sub up_to_date {
    my $class = shift;

    my $maps = conf_value('map');
    for my $m (keys %$maps){
        return 0 unless $DONE{$m};
    }
    return 1;
}

################################################################

# find most stale map
sub _pick_map {
    my $me = shift;

    my $maps = conf_value('map');
    my(@best, $bestv);
    for my $m (keys %$maps){
        my $lt = AC::Yenta::Store::store_get_internal($m, 'ae_last_start');
        if( !@best || $lt < $bestv ){



( run in 0.798 second using v1.01-cache-2.11-cpan-5735350b133 )