AC-Yenta
view release on metacpan or search on metacpan
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-01 18:56 (EDT)
# Function: distribute data to other peers
#
# $Id$
package AC::Yenta::Store::Distrib;
use AC::Yenta::Kibitz::Store::Client;
use AC::Yenta::Debug 'distrib';
use AC::Yenta::Config;
use AC::Yenta::Protocol;
use AC::Yenta::Stats;
use AC::Yenta::MySelf;
use AC::Misc;
use AC::DC::Sched;
use strict;
my $MAXHOP = 10;
my $MAXFARSEE = 2;
my $MAXNEARSEE = 3;
my $FARSENDS = 1;
my $NEARSENDS = 2;
my $MAXUNDERWAY = 64;
my $msgid = $$;
my @DIST;
AC::DC::Sched->new(
info => 'distribution',
freq => 5,
func => \&AC::Yenta::Store::Distrib::periodic,
);
sub new {
my $class = shift;
my $req = shift;
my $cont = shift;
return if $req->{hop} >= $MAXHOP;
return if $req->{expire} < $^T;
my $sender = $req->{sender};
my $sendat = AC::Yenta::Status->peer($sender);
my $me = bless {
info => "$req->{datum}{map}/$req->{datum}{key}/$req->{datum}{version}",
map => $req->{datum}{map},
req => $req,
content => $cont,
# we tune the distribution algorithm based on where it came from:
faraway => (my_datacenter() ne $sendat->{datacenter}),
farseen => 0,
nearseen => 0,
farsend => [],
nearsend => [],
ordershift => 4,
}, $class;
debug("distributing $me->{info}");
inc_stat( 'dist_requests' );
inc_stat( 'dist_requests_faraway' ) if $me->{faraway};
$me->_init_strategy($sender);
# RSN - check load
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
if( @DIST < $max ){
$me->_start_next();
}
push @DIST, $me;
return $me;
}
# periodically, go through and restart or expire
sub periodic {
my @keep;
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
my $chance = (@DIST > $max) ? ($max / @DIST) : 1;
for my $r (@DIST){
# debug("periodic $r->{info}");
next if $^T > $r->{req}{expire};
if( (rand() <= $chance) && (AC::DC::IO->underway() <= 2 * $max) ){
my $keep = $r->_start_next();
push @keep, $r if $keep;
}else{
push @keep, $r;
}
}
@DIST = @keep;
}
################################################################
# determine distribution strategy
# - if we recvd it from faraway, we will send it to other datacenters, and randomly in the same datacenter
# - otherwise we send it in the same datacenter, in an orderly fashion
# RSN - find an strategy with faster convergence + less duplication
( run in 2.488 seconds using v1.01-cache-2.11-cpan-0d23b851a93 )