AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-03 10:05 (EDT)
# Function: Anti-Entropy (find missing/stale data, and sync up)
#
# $Id$
package AC::Yenta::Store::AE;
use AC::Yenta::Store;
use AC::Yenta::Config;
use AC::Yenta::Debug 'ae';
use AC::Yenta::Stats;
use AC::Yenta::Conf;
use AC::Yenta::MySelf;
use AC::Yenta::Protocol;
use AC::DC::Sched;
use AC::Misc;
use Socket;
use strict;
# Tunables. Of these, only $MAXLOAD and $EXPIRE are used in periodic() below;
# the remaining limits are consumed by code further down the file.
my $MAXGET = 32; # maximum number of records per fetch
my $MAXFILES = 4; # maximum number of files per fetch
my $MAXFETCH = 32; # maximum number of simultaneous fetches
my $MAXMISSING = 10; # maximum number of missing records to be considered up to date
my $MAXLOAD = 0.5; # do not run if load average is too high (fallback when conf 'ae_maxload' is unset/false)
my $EXPIRE = 300; # expire hung job after this long (compared against $^T - timestamp in periodic)
my $TOONEW = 60; # don't consider things missing if they are less than this old
# Initialised to this process's PID.
# NOTE(review): presumably a message-id counter for outgoing requests - usage is not visible here, confirm.
my $msgid = $$;
my %DONE; # maps which have finished (read by up_to_date)
my @AE; # active sessions: new() appends, periodic() prunes; normally, just one
# Register periodic() with the scheduler at load time.
# NOTE(review): freq is presumably in seconds - confirm against AC::DC::Sched.
AC::DC::Sched->new(
info => 'anti-entropy',
freq => 60,
func => \&AC::Yenta::Store::AE::periodic,
);
# Construct an anti-entropy session and start it.
#
# Picks a map (_pick_map), records $^T as that map's 'ae_last_start',
# picks a peer (_init_peer), bumps the 'ae_runs' stat, runs the first step
# and finally adds the session to @AE.
#
# Returns the new object, or nothing if no map or no peer could be chosen.
sub new {
    my $class = shift;

    # initial session state
    my %state = (
        badnode    => [ { version => 0, shard => 0, level => 0 } ],
        cache      => {},
        kvneed     => [],
        kvneedorig => [],
        kvfetching => 0,
        missing    => 0,
    );
    my $me = bless \%state, $class;

    debug("new ae");

    # nothing to do unless a map can be selected
    return unless $me->_pick_map();

    # the start time is recorded before a peer is chosen, so it is updated
    # even when _init_peer fails below
    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);

    return unless $me->_init_peer();

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');

    # run the first step, then make the session visible to periodic()
    $me->_next_step();
    push @AE, $me;

    return $me;
}
# Scheduler callback: discard expired sessions and, if none remain and the
# load average permits, start a new anti-entropy session.
sub periodic {

    # keep only sessions whose timestamp + $EXPIRE is still later than $^T
    @AE = grep { $_->{timestamp} + $EXPIRE > $^T } @AE;

    # a session is still running - do not start another
    return if @AE;

    # stay idle while the load average exceeds the configured limit
    # (conf 'ae_maxload', falling back to $MAXLOAD)
    my $load  = loadave();
    my $limit = conf_value('ae_maxload') || $MAXLOAD;
    return if $load > $limit;

    __PACKAGE__->new();
}
# Report whether anti-entropy has finished for every configured map.
#
# Returns 1 if every key of the 'map' configuration has a true entry in
# %DONE, and 0 as soon as any one of them does not.
sub up_to_date {
    my $class = shift;    # invocant; not consulted

    my $configured = conf_value('map');

    # maps which have not yet finished
    my @pending = grep { !$DONE{$_} } keys %$configured;

    return @pending ? 0 : 1;
}
################################################################
# find most stale map
sub _pick_map {
my $me = shift;
my $maps = conf_value('map');
my(@best, $bestv);
for my $m (keys %$maps){
my $lt = AC::Yenta::Store::store_get_internal($m, 'ae_last_start');
if( !@best || $lt < $bestv ){
( run in 0.798 second using v1.01-cache-2.11-cpan-5735350b133 )