AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-01 18:56 (EDT)
# Function: distribute data to other peers
#
# $Id$

package AC::Yenta::Store::Distrib;
use AC::Yenta::Kibitz::Store::Client;
use AC::Yenta::Debug 'distrib';
use AC::Yenta::Config;
use AC::Yenta::Protocol;
use AC::Yenta::Stats;
use AC::Yenta::MySelf;
use AC::Misc;
use AC::DC::Sched;
use strict;

# Tuning knobs for the distribution algorithm.
my $MAXHOP      = 10;   # new() refuses requests whose hop count has reached this
# NOTE(review): the next four are not referenced in the code visible here;
# presumably consumed by the strategy code (_init_strategy/_start_next) - confirm
my $MAXFARSEE   = 2;
my $MAXNEARSEE  = 3;
my $FARSENDS    = 1;
my $NEARSENDS   = 2;
# default for the 'distrib_max' config value (used when it is unset or false);
# caps how many queued jobs new() starts immediately and throttles periodic()
my $MAXUNDERWAY = 64;

# seeded with the process id; NOTE(review): not used in the visible code,
# presumably a per-process message id counter - confirm
my $msgid = $$;
# distribution jobs still in progress: new() appends, periodic() prunes
my @DIST;

# Register periodic() with the scheduler at module load time so queued jobs
# are restarted or expired every 5 (presumably seconds - confirm AC::DC::Sched units).
AC::DC::Sched->new(
    info	=> 'distribution',
    freq	=> 5,
    func	=> \&AC::Yenta::Store::Distrib::periodic,
   );

# Create a distribution job for an incoming store request and queue it.
#   $req  - the request; reads hop, expire, sender and datum{map,key,version}
#   $cont - the content that accompanies the request
# Returns nothing when the request has already made $MAXHOP hops or its
# expire time is before $^T. Otherwise returns the blessed job, which is
# appended to @DIST and started immediately if fewer than distrib_max
# (default $MAXUNDERWAY) jobs are already queued.
sub new {
    my( $class, $req, $cont ) = @_;

    # too many hops, or already stale: do not pass it on
    return if $req->{hop} >= $MAXHOP || $req->{expire} < $^T;

    my $from = $req->{sender};
    my $peer = AC::Yenta::Status->peer($from);

    my $job = bless {
        info       => join('/', $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}),
        map        => $req->{datum}{map},
        req        => $req,
        content    => $cont,
        # the distribution strategy is tuned by where the request came from
        faraway    => (my_datacenter() ne $peer->{datacenter}),

        farseen    => 0,
        nearseen   => 0,
        farsend    => [],
        nearsend   => [],
        ordershift => 4,
    }, $class;

    debug("distributing $job->{info}");
    inc_stat( 'dist_requests' );
    inc_stat( 'dist_requests_faraway' ) if $job->{faraway};

    $job->_init_strategy($from);

    # RSN - check load
    my $limit = conf_value('distrib_max') || $MAXUNDERWAY;
    # start right away only while under the limit; otherwise periodic() picks it up
    $job->_start_next() if @DIST < $limit;
    push @DIST, $job;

    return $job;
}

# periodically, go through and restart or expire
# Invoked by the scheduler: restart or expire queued distribution jobs.
# Jobs whose request has expired (expire < $^T) are dropped. When more jobs
# are queued than distrib_max (default $MAXUNDERWAY), each job only gets a
# proportional random chance to advance this round. Advancing is also skipped
# while more than twice that many AC::DC::IO operations are underway.
# A job that is advanced stays queued only if _start_next() returns true;
# a job that is skipped always stays queued.
sub periodic {

    my $limit   = conf_value('distrib_max') || $MAXUNDERWAY;
    my $pending = scalar @DIST;
    my $odds    = $pending > $limit ? $limit / $pending : 1;

    my @survivors;
    for my $job (@DIST){
        # debug("periodic $job->{info}");
        next if $job->{req}{expire} < $^T;

        my $advance = rand() <= $odds && AC::DC::IO->underway() <= 2 * $limit;
        unless( $advance ){
            # not this round; keep it for a later pass
            push @survivors, $job;
            next;
        }

        push @survivors, $job if $job->_start_next();
    }

    @DIST = @survivors;
}

################################################################
# determine the distribution strategy:
#   - if we received it from faraway, we send it to other datacenters, and randomly within the same datacenter
#   - otherwise we send it within the same datacenter, in an orderly fashion
# RSN - find a strategy with faster convergence and less duplication



( run in 2.488 seconds using v1.01-cache-2.11-cpan-0d23b851a93 )