AC-Yenta

 view release on metacpan or  search on metacpan

eg/yenta.conf  view on Meta::CPAN

# example yenta config
#
# file will be reloaded automagically if it changes. no need to hup or restart.


port            3503

environment	prod

# save peer status in a file?
savestatus      /var/tmp/yenta.status           yenta

allow		127.0.0.1
allow           10.200.2.0/23

# seed peers to locate the network at startup
seedpeer        10.200.2.4:3503
seedpeer        10.200.2.5:3503


# enable debugging?
#debug           ae
#debug           map
#debug           merkle
# ...

lib/AC/Yenta.pm  view on Meta::CPAN


=item allow

specify networks allowed to connect.

    allow 127.0.0.1
    allow 192.168.10.0/24

=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.

    seedpeer 192.168.10.11:3503
    seedpeer 192.168.10.12:3503

=item secret

specify a secret key used to encrypt data transferred between
yentas in different datacenters.

lib/AC/Yenta/D.pm  view on Meta::CPAN

    $AC::Yenta::CONF = AC::Yenta::Config->new(
        $cfile, onreload => sub {
            AC::Yenta::Store::configure();
        });


    initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );

    AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
    daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");


    $SIG{CHLD} = $SIG{PIPE} = sub{};        				# ignore
    $SIG{INT}  = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit;  # abort

    # initialize subsystems
    my $port = $opt->{port} || conf_value('port');

    AC::Yenta::MySelf->init( $port, $opt->{persistent_id} );
    AC::Yenta::Store::configure();
    AC::Yenta::Status::init( $port );
    AC::Yenta::Monitor::init();
    AC::Yenta::NetMon::init();
    AC::DC::IO::TCP::Server->new( $port, 'AC::Yenta::Server' );
    verbose("server started on tcp/$port");


    # start "cronjobs"
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::Yenta::CONF->check() },
       );

    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

# create a kibitz status client
# all arguments are handed through to the parent class constructor
# returns the new client, or nothing if the parent constructor failed
sub new {
    my( $class, @args ) = @_;

    debug('starting kibitz status client');

    my $me = $class->SUPER::new( @args )
        or return;

    # hook up our event handlers
    my @handlers = (
        [ 'timeout',  \&timeout  ],
        [ 'read',     \&read     ],
        [ 'shutdown', \&shutdown ],
    );
    $me->set_callback( @$_ ) for @handlers;

    return $me;
}

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

    $prefer ||= $public unless int rand(20);
    # prefer private addr if available (cheaper)
    $prefer ||= $private || $public || $down;
    return unless $prefer;

    #print STDERR "using ", inet_itoa($prefer->{ipv4}), "\n";
    return ( inet_itoa($prefer->{ipv4}), ($prefer->{port} || $port) );
}


sub start {
    my $me = shift;

    $me->SUPER::start();

    # build request
    my $yp  = AC::Yenta::Protocol->new();
    my $pb  = AC::Yenta::Kibitz::Status::myself();
    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($pb),
        content_length	=> 0,
        want_reply	=> 1,
        msgid		=> $msgid++,

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

our @ISA = 'AC::Yenta::IO::TCP::Client';

my $TIMEOUT = 5;

# create a kibitz store client
#   addr, port  - passed to the parent class constructor
#   req         - the request; saved on the object and written out by start()
#   ...         - any further options are passed to the parent constructor
# returns the new client, or nothing if the parent constructor failed
sub new {
    my( $class, $addr, $port, $req, @opts ) = @_;

    debug('starting kibitz store client');

    my $me = $class->SUPER::new(
        $addr, $port,
        info => "kibitz store client $addr:$port",
        @opts,
    );
    return unless $me;

    # remember the request until start() sends it
    $me->{_req} = $req;

    # hook up our event handlers
    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    return $me;
}

# start the client: run the parent class start, write the saved
# request, and arm the timeout
# returns the client object
sub start {
    my( $me ) = @_;

    $me->SUPER::start();

    # send the request that new() saved for us
    my $request = $me->{_req};
    $me->write( $request );

    # give up if nothing happens within $TIMEOUT
    $me->timeout_rel( $TIMEOUT );

    return $me;
}

sub shutdown {
    my $me = shift;

    # maybe call error handler
    $me->run_callback('error', undef) unless $me->{_store_ok};

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN


# periodic monitor pass:
#   - mark as down any entry in %MON whose lastup is older than $OLD_DOWN
#   - start a monitor client for every configured 'monitor' entry,
#     marking the entry down if the client could not be created
sub periodic {

    my $mon = conf_value('monitor');

    # clean up old data
    for my $old (keys %MON){
        next unless $MON{$old}{lastup} < $^T - $OLD_DOWN;
        isdown($old, 0);
    }

    # start monitoring (send heartbeat request)
    for my $m (@$mon){
        my( $ip, $port ) = @{$m}{qw(ipa port)};
        my $id = "$ip:$port";
        debug("start monitoring $id");

        my $client = AC::Yenta::Monitor::Client->new(
            $ip, $port,
            info         => "monitor client: $id",
            monitor_peer => $id,
        );

        # could not even start the client
        next if $client;
        isdown($id, 0);
    }
}

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

sub new {
    my $class = shift;

    debug('starting monitor status client');
    my $me = $class->SUPER::new( @_ );
    unless($me){
        return;
    }

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

    # build request
    my $yp  = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $hdr = $yp->encode_header(
        type		=> 'heartbeat_request',
        data_length	=> 0,
        content_length	=> 0,
        want_reply	=> 1,
        msgid		=> $msgid++,
       );

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN

        return [
            # use this IP for communication with servers in this datacenter (same natdom)
            { ip => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ip => $public_ip },
        ]
    }

=head2 init

initialization function called at startup. typically used to lookup hostnames, IP addresses,
and such and store them in variables to make the above functions faster.

    my $HOSTNAME;
    my $DOMAIN;
    sub init {
        $HOSTNAME = hostname();
        ($DOMAIN) = $HOSTNAME =~ /^[^\.]+\.(.*)/;
    }

=head1 BUGS

lib/AC/Yenta/Status.pm  view on Meta::CPAN

        freq	=> (conf_value('time_status_kibitz') || 5),
        func	=> \&periodic,
       );
    AC::DC::Sched->new(
        info	=> 'save status',
        freq	=> (conf_value('time_status_save') || 5),
        func	=> \&save_status,
       );
}

# periodic status pass:
#   - hand peers that are not status 200, or whose timestamp is older
#     than $KEEPLOST, to _maybe_remove
#   - pick a peer with _random_peer and start a status kibitz client to it
# if the client cannot be created, the peer is reported via isdown
sub periodic {

    # clean up down or lost peers
    for my $pid ( keys %{$DATA->{allpeer}} ){
        my $peer = $DATA->{allpeer}{$pid}
            or next;

        my $fresh = ($peer->{status} == 200)
                 && ($peer->{timestamp} > $^T - $KEEPLOST);
        _maybe_remove( $pid ) unless $fresh;
    }

    # randomly pick a peer
    my($id, $ip, $port) = _random_peer();
    return unless $id;

    # start a client
    debug("starting status kibitz client to $id");

    my $client = AC::Yenta::Kibitz::Status::Client->new(
        $ip, $port,
        info        => "status client: $id",
        status_peer => $id,
    );
    unless( $client ){
        return __PACKAGE__->isdown($id);
    }

    $client->start();
}

sub _random_peer {

    my $here  = my_datacenter();

    # sceptical
    my @scept = values %{$DATA->{sceptical}};

    my @all   = map  { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

        cache		=> {},
        kvneed		=> [],
        kvneedorig	=> [],
        kvfetching	=> 0,
        missing		=> 0,
    }, $class;

    debug("new ae");
    $me->_pick_map()  || return;

    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
    $me->_init_peer() || return;

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');
    $me->_next_step();

    push @AE, $me;
    return $me;
}

# periodic AE pass: kill dead sessions, start new ones
# a session is kept while its timestamp + $EXPIRE is still in the future
# a new session is started only if none remain and the load average
# does not exceed the configured 'ae_maxload' (or $MAXLOAD)
sub periodic {

    # drop expired sessions
    @AE = grep { $_->{timestamp} + $EXPIRE > $^T } @AE;

    # still have a session underway
    return if @AE;

    # too busy right now
    my $load    = loadave();
    my $maxload = conf_value('ae_maxload') || $MAXLOAD;
    return if $load > $maxload;

    __PACKAGE__->new();
}

# are we up to date?
# returns 1 if every configured map has an entry in %DONE, 0 otherwise
sub up_to_date {
    my( $class ) = @_;

    my $maps = conf_value('map');
    for my $name (keys %$maps){
        next if $DONE{$name};
        # found a map that has not been done yet
        return 0;
    }
    return 1;
}

################################################################

# find most stale map
sub _pick_map {
    my $me = shift;

    my $maps = conf_value('map');
    my(@best, $bestv);
    for my $m (keys %$maps){
        my $lt = AC::Yenta::Store::store_get_internal($m, 'ae_last_start');
        if( !@best || $lt < $bestv ){
            @best = $m;
            $bestv = $lt;
        }elsif( $lt == $bestv ){
            push @best, $m;
        }
    }

    return unless @best;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

}

# advance the session one step:
#   - refresh the session timestamp
#   - if fewer than $MAXFETCH fetches are underway and there is data
#     still needed, start another fetch
#   - if there are nodes left to check, start checking the next one
#   - otherwise, the session is finished
sub _next_step {
    my( $me ) = @_;

    $me->{timestamp} = $^T;

    # any missing data? (and room to fetch it)
    if( $me->{kvfetching} < $MAXFETCH
        && ( @{$me->{kvneedorig}} || @{$me->{kvneed}} ) ){
        debug("starting nextgetkv ($me->{kvfetching})");
        $me->_next_get_kv();
    }

    # nothing left to check => done
    unless( @{$me->{badnode}} ){
        return $me->_finished();
    }

    # check nodes
    $me->_start_check();
    return;
}

################################################################

sub _start_check {
    my $me = shift;

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $me->{peer}{ip}, undef,
        $request,
        info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );

    if( $io ){
        $io->set_callback('load',  \&_check_load,  $me);
        $io->set_callback('error', \&_check_error, $me);
        $io->start();
    }

}

sub _check_load {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;


lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

}

################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)

# start the next fetch
# items queued for the original peer (kvneedorig) go first,
# then items that may be fetched from any peer (kvneed)
sub _next_get_kv {
    my( $me ) = @_;

    # retries against the original peer take priority
    if( @{$me->{kvneedorig}} ){
        return $me->_start_get_kv_orig();
    }

    return $me->_start_get_kv_any()  if @{$me->{kvneed}};
}

# fetch the next batch (up to maxget items) of needed data
# from a peer chosen by _pick_peer
sub _start_get_kv_any {
    my( $me ) = @_;

    # take the next batch off the front of the queue
    my $batch = [ splice @{$me->{kvneed}}, 0, $me->{maxget} ];

    # pick a peer
    my $peer = $me->_pick_peer();
    debug("getting kv data from peer $peer->{id}");

    $me->_start_get_kv( $peer, 1, $batch );
}

# fetch the next batch (up to maxget items) of needed data
# from the session's current peer
sub _start_get_kv_orig {
    my( $me ) = @_;

    # take the next batch off the front of the queue
    my $batch = [ splice @{$me->{kvneedorig}}, 0, $me->{maxget} ];

    debug("getting kv data from current peer");

    $me->_start_get_kv( $me->{peer}, 0, $batch );
}

sub _start_get_kv {
    my $me    = shift;
    my $peer  = shift;
    my $retry = shift;
    my $get   = shift;

    # insert map into request
    $_->{map} = $me->{map} for @$get;

    # for (@$get){ debug("requesting $_->{key}/$_->{version}") }

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    # connect + send
    debug("sending to $peer->{id}");
    my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
                                                  info => "AE getkv from $peer->{id}" );

    if( $io ){
        $me->{kvfetching} ++;
        $io->set_callback('load',  \&_getkv_load,  $me, $retry, $get);
        $io->set_callback('error', \&_getkv_error, $me, $retry, $get);
        $io->start();
    }
}

sub _getkv_load {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN


# fetch one value from the db
#   map, sub, key  - combined by _key() into the db key
# returns nothing if db_get reports a non-zero status (not found)
# otherwise returns the value; in list context returns (value, 1)
sub get {
    my( $me, $map, $sub, $key ) = @_;

    debug("get $map/$sub/$key");

    my $value;
    $me->_start();
    my $status = $me->{db}->db_get( _key($map,$sub,$key), $value );
    $me->_finish();

    # not found
    return if $status;

    return wantarray ? ($value, 1) : $value;
}

# store one value in the db
#   map, sub, key  - combined by _key() into the db key
#   val            - the value to store
# returns true if db_put returned a false (zero) status
sub put {
    my( $me, $map, $sub, $key, $val ) = @_;

    debug("put $map/$sub/$key");

    $me->_start();
    my $status = $me->{db}->db_put( _key($map,$sub,$key), $val );
    $me->_finish();

    # invert the db status into a success flag
    return !$status;
}

# remove one entry from the db
#   map, sub, key  - combined by _key() into the db key
# the result of db_del is not examined
sub del {
    my( $me, $map, $sub, $key ) = @_;

    $me->_start();
    $me->{db}->db_del( _key($map,$sub,$key) );
    $me->_finish();
}

# ask the underlying db handle to sync
# returns whatever db_sync returns
sub sync {
    my( $me ) = @_;

    my $db = $me->{db};
    return $db->db_sync();
}

sub range {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $end = shift;	# undef => to end of map

    my ($k, $v, @k);
    $me->_start();
    my $cursor = $me->{db}->db_cursor();
    $k = _key($map,$sub,$key);
    my $e = _key($map,$sub,$end);
    $cursor->c_get($k, $v, DB_SET_RANGE);

    my $MAX = 100;
    my $max = $MAX;

    while( !$end || ($k lt $e) ){
        debug("range $k");

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

        push @k, { k => $k, v => $v };
        my $r = $cursor->c_get($k, $v, DB_NEXT);
        last if $r;	# error

        # cursor locks the db
        # close+recreate so other processes can proceed
        unless( $max -- ){
            $cursor->c_close();
            $me->_finish();
            sleep 0;
            $me->_start();
            $cursor = $me->{db}->db_cursor();
            $cursor->c_get($k, $v, DB_SET);
            $max = $MAX;
        }
    }
    $cursor->c_close();
    $me->_finish();

    return @k;
}

################################################################

# signal handler: report the signal and a stack trace on STDERR,
# then exit with status -1
sub _sig {
    my @trace = AC::Error::stack_trace();

    print STDERR "bdbi signal @_\n", @trace, "\n";
    exit(-1);
}

sub _start {
    my $me = shift;

    $me->{alarmold} = alarm($TIMEOUT);
    return unless $me->{hasenv};

    # as long as perl handles the signals, everything gets cleaned up
    # well enough for the locks to be removed
    for my $sig (qw(INT QUIT KILL TERM ALRM)){
        $SIG{$sig} ||= \&_sig;
    }

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    debug("distributing $me->{info}");
    inc_stat( 'dist_requests' );
    inc_stat( 'dist_requests_faraway' ) if $me->{faraway};


    $me->_init_strategy($sender);

    # RSN - check load
    my $max = conf_value('distrib_max') || $MAXUNDERWAY;
    if( @DIST < $max ){
        $me->_start_next();
    }
    push @DIST, $me;

    return $me;
}

# periodically, go through the pending requests and restart or expire them
#   - requests past their expire time are dropped
#   - when more than the max are pending, each one is only tried
#     with probability max/pending
#   - a request that was tried is kept only if _start_next sent something
#   - a request that was not tried is kept as is
sub periodic {

    my $max = conf_value('distrib_max') || $MAXUNDERWAY;

    # thin things out if we are over the limit
    my $chance = 1;
    if( @DIST > $max ){
        $chance = $max / @DIST;
    }

    my @survivors;
    for my $req (@DIST){
        # expired
        next if $^T > $req->{req}{expire};

        my $try = (rand() <= $chance) && (AC::DC::IO->underway() <= 2 * $max);

        # tried, and there was nothing more to send => done with it
        next if $try && !$req->_start_next();

        push @survivors, $req;
    }

    @DIST = @survivors;
}

################################################################

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

        my $pd = AC::Yenta::Status->peer($id);

        next unless $pd->{subsystem}   eq 'yenta';
        next unless $pd->{environment} eq $env;
        next unless grep {$map eq $_} @{ $pd->{map} };
        push @id, $id;
    }
    return @id;
}

# send to one server in the next far datacenter
# takes the next entry off the farsend list, picks at random one of its
# servers that has status 200, and starts sending to it
# returns 1 if a send was started, nothing otherwise
sub _start_far {
    my( $me ) = @_;

    # next datacenter on the list
    my $d = shift @{ $me->{farsend} }
        or return;

    # servers in the chosen dc that are up
    my @up = grep {
        my $peer = AC::Yenta::Status->peer($_);
        $peer->{status} == 200;
    } @{$d->{id}};
    return unless @up;

    # randomly pick one
    my $id = $up[ rand(scalar @up) ];
    debug("sending $me->{info} to far site $id in $d->{dc}");
    $me->_start_peer( $id, 1 );

    for my $stat (qw(dist_send_far dist_send_total)){
        inc_stat($stat);
    }
    return 1;
}

# send to the next server on the nearsend list
# returns 1 if a send was started, nothing if the list is used up
sub _start_near {
    my( $me ) = @_;

    # next nearby server on the list
    my $id = shift @{ $me->{nearsend} }
        or return;

    debug("sending $me->{info} to nearby site $id");
    $me->_start_peer( $id, 0 );

    for my $stat (qw(dist_send_near dist_send_total)){
        inc_stat($stat);
    }
    return 1;
}

# pick the next peers and start clients for them
#   - not faraway: one near send
#   - faraway: up to $FARSENDS far sends (while farseen < $MAXFARSEE)
#     and up to $NEARSENDS near sends (while nearseen < $MAXNEARSEE)
# returns the number of sends started (undef if none)
sub _start_next {
    my( $me ) = @_;

    my $sent;

    # simple case: just one nearby send
    unless( $me->{faraway} ){
        $sent ++ if $me->_start_near();
        return $sent;
    }

    if( $me->{farseen} < $MAXFARSEE ){
        for my $n (1 .. $FARSENDS){
            next unless $me->_start_far();
            $sent ++;
        }
    }

    if( $me->{nearseen} < $MAXNEARSEE ){
        for my $n (1 .. $NEARSENDS){
            next unless $me->_start_near();
            $sent ++;
        }
    }

    return $sent;
}

# start one more send, far or near as requested
# does nothing once the matching seen-count has reached its limit
# ($MAXFARSEE or $MAXNEARSEE)
sub _start_one {
    my( $me, $far ) = @_;

    if( $far ){
        return if $me->{farseen}  >= $MAXFARSEE;
        return $me->_start_far();
    }

    return if $me->{nearseen} >= $MAXNEARSEE;
    return $me->_start_near();
}

sub _start_peer {
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    my $pd   = AC::Yenta::Status->peer($id);
    my $addr = $pd->{ip};	# array of nat ip info

    my $enc = use_encryption($pd);
    my $ect = '';
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
                                                   $request . $ect,
                                                   info => "distrib $me->{info} to $id",
                                                  );

    if( $io ){
        $io->set_callback('load',  \&_onload,  $me, $id, $far);
        $io->set_callback('error', \&_onerror, $me, $id, $far);
        $io->start();
    }else{
        debug("start client failed");
    }
}

sub _onload {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

        if( $evt->{data}{haveit} ){
            shift @{$me->{nearsend}};
        }else{
            my $n = $me->{ordershift};
            $n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
            shift @{$me->{nearsend}} for (1 .. $n);
            $me->{ordershift} *= 2;
        }
    }

    $me->_start_one($far);
}

# 'error' callback for a distribution client
# args: io object, event, then the distrib object, peer id and far flag
# that were registered with the callback
# logs the failure and moves on to the next send
sub _onerror {
    my( undef, undef, $me, $id, $far ) = @_;

    verbose("error distributing $me->{info} to $id");

    # don't need to track anything, just keep going
    $me->_start_one($far);
}

sub _orderly {
    my $peers = shift;

    my $myself = AC::Yenta::Status->my_server_id();
    my @p = sort {$a cmp $b} @$peers;

    my @left  = grep { $_ lt $myself } @p;
    my @right = grep { $_ gt $myself } @p;

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

    $db->del($me->{name}, 'data', $vk);
    $db->del($me->{name}, 'meta', $vk);

    return $cshard;
}

################################################################

# look up a range in this map's 'vers' data
#   start, end  - passed through unchanged to the db's range method
# returns whatever the db's range method returns
sub range {
    my( $me, $start, $end ) = @_;

    return $me->{db}->range( $me->{name}, 'vers', $start, $end );
}

################################################################

sub get_internal {
    my $me  = shift;
    my $key = shift;

    my($d, $found) = $me->{db}->get($me->{name}, 'internal', $key);
    return $d;



( run in 0.873 second using v1.01-cache-2.11-cpan-fd5d4e115d8 )