view release on metacpan or search on metacpan
eg/yenta.conf view on Meta::CPAN
# example yenta config
#
# file will be reloaded automagically if it changes. no need to hup or restart.
port 3503
environment prod
# save peer status in a file?
savestatus /var/tmp/yenta.status yenta
allow 127.0.0.1
allow 10.200.2.0/23
# seed peers to locate the network at startup
seedpeer 10.200.2.4:3503
seedpeer 10.200.2.5:3503
# enable debugging?
#debug ae
#debug map
#debug merkle
# ...
lib/AC/Yenta.pm view on Meta::CPAN
=item allow
specify networks allowed to connect.
allow 127.0.0.1
allow 192.168.10.0/24
=item seedpeer
specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.
seedpeer 192.168.10.11:3503
seedpeer 192.168.10.12:3503
=item secret
specify a secret key used to encrypt data transfered between
yentas in different datacenters.
lib/AC/Yenta/D.pm view on Meta::CPAN
$AC::Yenta::CONF = AC::Yenta::Config->new(
$cfile, onreload => sub {
AC::Yenta::Store::configure();
});
initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );
AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
verbose("starting.");
$SIG{CHLD} = $SIG{PIPE} = sub{}; # ignore
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit; # abort
# initialize subsystems
my $port = $opt->{port} || conf_value('port');
AC::Yenta::MySelf->init( $port, $opt->{persistent_id} );
AC::Yenta::Store::configure();
AC::Yenta::Status::init( $port );
AC::Yenta::Monitor::init();
AC::Yenta::NetMon::init();
AC::DC::IO::TCP::Server->new( $port, 'AC::Yenta::Server' );
verbose("server started on tcp/$port");
# start "cronjobs"
AC::DC::Sched->new(
info => 'check config files',
freq => 30,
func => sub { $AC::Yenta::CONF->check() },
);
run_and_watch(
($opt->{foreground} || $opt->{debugall}),
\&AC::DC::IO::mainloop,
);
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid = $$;
sub new {
my $class = shift;
debug('starting kibitz status client');
my $me = $class->SUPER::new( @_ );
return unless $me;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
return $me;
}
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
$prefer ||= $public unless int rand(20);
# prefer private addr if available (cheaper)
$prefer ||= $private || $public || $down;
return unless $prefer;
#print STDERR "using ", inet_itoa($prefer->{ipv4}), "\n";
return ( inet_itoa($prefer->{ipv4}), ($prefer->{port} || $port) );
}
sub start {
my $me = shift;
$me->SUPER::start();
# build request
my $yp = AC::Yenta::Protocol->new();
my $pb = AC::Yenta::Kibitz::Status::myself();
my $hdr = $yp->encode_header(
type => 'yenta_status',
data_length => length($pb),
content_length => 0,
want_reply => 1,
msgid => $msgid++,
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $TIMEOUT = 5;
sub new {
my $class = shift;
my $addr = shift;
my $port = shift;
my $req = shift;
debug('starting kibitz store client');
my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @_ );
return unless $me;
$me->{_req} = $req;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
return $me;
}
sub start {
my $me = shift;
$me->SUPER::start();
$me->write( $me->{_req} );
$me->timeout_rel($TIMEOUT);
return $me;
}
sub shutdown {
my $me = shift;
# maybe call error handler
$me->run_callback('error', undef) unless $me->{_store_ok};
lib/AC/Yenta/Monitor.pm view on Meta::CPAN
sub periodic {
my $mon = conf_value('monitor');
# clean up old data
for my $id (keys %MON){
isdown($id, 0) if $MON{$id}{lastup} < $^T - $OLD_DOWN;
}
# start monitoring (send heartbeat request)
for my $m (@$mon){
my $ip = $m->{ipa};
my $port = $m->{port};
my $id = "$ip:$port";
debug("start monitoring $id");
my $ok = AC::Yenta::Monitor::Client->new( $ip, $port,
info => "monitor client: $id",
monitor_peer => $id,
);
isdown($id, 0) unless $ok;
}
}
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid = $$;
sub new {
my $class = shift;
debug('starting monitor status client');
my $me = $class->SUPER::new( @_ );
unless($me){
return;
}
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
# build request
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $hdr = $yp->encode_header(
type => 'heartbeat_request',
data_length => 0,
content_length => 0,
want_reply => 1,
msgid => $msgid++,
);
lib/AC/Yenta/MySelf.pm view on Meta::CPAN
return [
# use this IP for communication with servers this datacenter (same natdom)
{ ip => $privat_ip, natdom => my_datacenter() },
# otherwise use this IP
{ ip => $public_ip },
]
}
=head2 init
inialization function called at startup. typically used to lookup hostanmes, IP addresses,
and such and store them in variables to make the above functions faster.
my $HOSTNAME;
my $DOMAIN;
sub init {
$HOSTNAME = hostname();
($DOMAIN) = $HOSTNAME =~ /^[\.]+\.(.*)/;
}
=head1 BUGS
lib/AC/Yenta/Status.pm view on Meta::CPAN
freq => (conf_value('time_status_kibitz') || 5),
func => \&periodic,
);
AC::DC::Sched->new(
info => 'save status',
freq => (conf_value('time_status_save') || 5),
func => \&save_status,
);
}
# start up a client every so often
sub periodic {
# clean up down or lost peers
for my $id ( keys %{$DATA->{allpeer}} ){
my $p = $DATA->{allpeer}{$id};
next unless $p;
next if $p->{status} == 200 && $p->{timestamp} > $^T - $KEEPLOST;
_maybe_remove( $id );
}
# randomly pick a peer
my($id, $ip, $port) = _random_peer();
return unless $id;
# start a client
debug("starting status kibitz client to $id");
my $c = AC::Yenta::Kibitz::Status::Client->new( $ip, $port,
info => "status client: $id",
status_peer => $id,
);
return __PACKAGE__->isdown($id) unless $c;
$c->start();
}
sub _random_peer {
my $here = my_datacenter();
# sceptical
my @scept = values %{$DATA->{sceptical}};
my @all = map { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
cache => {},
kvneed => [],
kvneedorig => [],
kvfetching => 0,
missing => 0,
}, $class;
debug("new ae");
$me->_pick_map() || return;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
$me->_init_peer() || return;
debug("checking $me->{map} with $me->{peer}{id}");
inc_stat('ae_runs');
$me->_next_step();
push @AE, $me;
return $me;
}
sub periodic {
# kill dead sessions, start new ones
my @keep;
for my $ae (@AE){
if( $ae->{timestamp} + $EXPIRE > $^T ){
push @keep, $ae;
}
}
@AE = @keep;
return if @AE;
return if loadave() > (conf_value('ae_maxload') || $MAXLOAD);
__PACKAGE__->new();
}
# we are up to date if we have AE'ed every map at least once since starting
sub up_to_date {
my $class = shift;
my $maps = conf_value('map');
for my $m (keys %$maps){
return 0 unless $DONE{$m};
}
return 1;
}
################################################################
# find most stale map
sub _pick_map {
my $me = shift;
my $maps = conf_value('map');
my(@best, $bestv);
for my $m (keys %$maps){
my $lt = AC::Yenta::Store::store_get_internal($m, 'ae_last_start');
if( !@best || $lt < $bestv ){
@best = $m;
$bestv = $lt;
}elsif( $lt == $bestv ){
push @best, $m;
}
}
return unless @best;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
sub _next_step {
my $me = shift;
$me->{timestamp} = $^T;
if( $me->{kvfetching} < $MAXFETCH ){
# any missing data?
if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
debug("starting nextgetkv ($me->{kvfetching})");
$me->_next_get_kv();
}
}
# check nodes?
if( @{$me->{badnode}} ){
$me->_start_check();
return;
}
$me->_finished();
}
################################################################
sub _start_check {
my $me = shift;
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
$request,
info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );
if( $io ){
$io->set_callback('load', \&_check_load, $me);
$io->set_callback('error', \&_check_error, $me);
$io->start();
}
}
sub _check_load {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)
sub _next_get_kv {
my $me = shift;
return $me->_start_get_kv_orig() if @{$me->{kvneedorig}};
return $me->_start_get_kv_any() if @{$me->{kvneed}};
}
sub _start_get_kv_any {
my $me = shift;
my @get = splice @{$me->{kvneed}}, 0, $me->{maxget}, ();
# pick a peer
my $peer = $me->_pick_peer();
debug("getting kv data from peer $peer->{id}");
$me->_start_get_kv( $peer, 1, \@get);
}
sub _start_get_kv_orig {
my $me = shift;
my @get = splice @{$me->{kvneedorig}}, 0, $me->{maxget}, ();
debug("getting kv data from current peer");
$me->_start_get_kv( $me->{peer}, 0, \@get);
}
sub _start_get_kv {
my $me = shift;
my $peer = shift;
my $retry = shift;
my $get = shift;
# insert map into request
$_->{map} = $me->{map} for @$get;
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
$io->set_callback('error', \&_getkv_error, $me, $retry, $get);
$io->start();
}
}
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
sub get {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $v;
debug("get $map/$sub/$key");
$me->_start();
my $r = $me->{db}->db_get( _key($map,$sub,$key), $v );
$me->_finish();
return if $r; # not found
if( wantarray ){
return ($v, 1);
}
return $v;
}
sub put {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
$me->_start();
my $r = $me->{db}->db_put( _key($map,$sub,$key), $val);
$me->_finish();
return !$r;
}
sub del {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
$me->_start();
$me->{db}->db_del( _key($map,$sub,$key));
$me->_finish();
}
sub sync {
my $me = shift;
$me->{db}->db_sync();
}
sub range {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $end = shift; # undef => to end of map
my ($k, $v, @k);
$me->_start();
my $cursor = $me->{db}->db_cursor();
$k = _key($map,$sub,$key);
my $e = _key($map,$sub,$end);
$cursor->c_get($k, $v, DB_SET_RANGE);
my $MAX = 100;
my $max = $MAX;
while( !$end || ($k lt $e) ){
debug("range $k");
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
push @k, { k => $k, v => $v };
my $r = $cursor->c_get($k, $v, DB_NEXT);
last if $r; # error
# cursor locks the db
# close+recreate so other processes can proceed
unless( $max -- ){
$cursor->c_close();
$me->_finish();
sleep 0;
$me->_start();
$cursor = $me->{db}->db_cursor();
$cursor->c_get($k, $v, DB_SET);
$max = $MAX;
}
}
$cursor->c_close();
$me->_finish();
return @k;
}
################################################################
sub _sig {
print STDERR "bdbi signal @_\n", AC::Error::stack_trace(), "\n";
exit(-1);
}
sub _start {
my $me = shift;
$me->{alarmold} = alarm($TIMEOUT);
return unless $me->{hasenv};
# as long as perl handles the signals, everything gets cleaned up
# well enough for the locks to be removed
for my $sig (qw(INT QUIT KILL TERM ALRM)){
$SIG{$sig} ||= \&_sig;
}
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
debug("distributing $me->{info}");
inc_stat( 'dist_requests' );
inc_stat( 'dist_requests_faraway' ) if $me->{faraway};
$me->_init_strategy($sender);
# RSN - check load
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
if( @DIST < $max ){
$me->_start_next();
}
push @DIST, $me;
return $me;
}
# periodically, go through and restart or expire
sub periodic {
my @keep;
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
my $chance = (@DIST > $max) ? ($max / @DIST) : 1;
for my $r (@DIST){
# debug("periodic $r->{info}");
next if $^T > $r->{req}{expire};
if( (rand() <= $chance) && (AC::DC::IO->underway() <= 2 * $max) ){
my $keep = $r->_start_next();
push @keep, $r if $keep;
}else{
push @keep, $r;
}
}
@DIST = @keep;
}
################################################################
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $pd = AC::Yenta::Status->peer($id);
next unless $pd->{subsystem} eq 'yenta';
next unless $pd->{environment} eq $env;
next unless grep {$map eq $_} @{ $pd->{map} };
push @id, $id;
}
return @id;
}
sub _start_far {
my $me = shift;
my $d = shift @{ $me->{farsend} };
return unless $d;
# randomly pick one server in chosen dc
my @id = grep {
my $x = AC::Yenta::Status->peer($_);
($x->{status} == 200) ? 1 : 0;
} @{$d->{id}};
return unless @id;
my $id = $id[ rand(@id) ];
debug("sending $me->{info} to far site $id in $d->{dc}");
$me->_start_peer( $id, 1 );
inc_stat('dist_send_far');
inc_stat('dist_send_total');
return 1;
}
sub _start_near {
my $me = shift;
my $id = shift @{ $me->{nearsend} };
return unless $id;
debug("sending $me->{info} to nearby site $id");
$me->_start_peer( $id, 0 );
inc_stat('dist_send_near');
inc_stat('dist_send_total');
return 1;
}
sub _start_next {
my $me = shift;
my $sent;
# pick next peers
# start clients
if( $me->{faraway} ){
if( $me->{farseen} < $MAXFARSEE ){
for (1 .. $FARSENDS){
$sent ++ if $me->_start_far();
}
}
if( $me->{nearseen} < $MAXNEARSEE ){
for (1 .. $NEARSENDS){
$sent ++ if $me->_start_near();
}
}
}else{
$sent ++ if $me->_start_near();
}
return $sent;
}
sub _start_one {
my $me = shift;
my $far = shift;
if( $far ){
return if $me->{farseen} >= $MAXFARSEE;
$me->_start_far();
}else{
return if $me->{nearseen} >= $MAXNEARSEE;
$me->_start_near();
}
}
sub _start_peer {
my $me = shift;
my $id = shift;
my $far = shift;
my $pd = AC::Yenta::Status->peer($id);
my $addr = $pd->{ip}; # array of nat ip info
my $enc = use_encryption($pd);
my $ect = '';
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
$request . $ect,
info => "distrib $me->{info} to $id",
);
if( $io ){
$io->set_callback('load', \&_onload, $me, $id, $far);
$io->set_callback('error', \&_onerror, $me, $id, $far);
$io->start();
}else{
debug("start client failed");
}
}
sub _onload {
my $io = shift;
my $evt = shift;
my $me = shift;
my $id = shift;
my $far = shift;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
if( $evt->{data}{haveit} ){
shift @{$me->{nearsend}};
}else{
my $n = $me->{ordershift};
$n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
shift @{$me->{nearsend}} for (1 .. $n);
$me->{ordershift} *= 2;
}
}
$me->_start_one($far);
}
sub _onerror {
my $io = shift;
my $evt = shift;
my $me = shift;
my $id = shift;
my $far = shift;
verbose("error distributing $me->{info} to $id");
# don't need to track anything
$me->_start_one($far);
}
sub _orderly {
my $peers = shift;
my $myself = AC::Yenta::Status->my_server_id();
my @p = sort {$a cmp $b} @$peers;
my @left = grep { $_ lt $myself } @p;
my @right = grep { $_ gt $myself } @p;
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
$db->del($me->{name}, 'data', $vk);
$db->del($me->{name}, 'meta', $vk);
return $cshard;
}
################################################################
sub range {
my $me = shift;
my $start = shift;
my $end = shift;
my $db = $me->{db};
return $db->range($me->{name}, 'vers', $start, $end);
}
################################################################
sub get_internal {
my $me = shift;
my $key = shift;
my($d, $found) = $me->{db}->get($me->{name}, 'internal', $key);
return $d;