AC-Yenta

 view release on metacpan or  search on metacpan

eg/myself.pm  view on Meta::CPAN

# example myself

# $Id$

package Local::Yenta::MySelf;
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish the persistent id of this yenta instance.
#   $class - invoking class
#   $port  - our tcp port (not used here)
#   $id    - id given on the command line; when false, an id is derived
#            from the hostname
sub init {
    my $class = shift;
    my $port  = shift;	# our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        # remove domain. the dots are escaped and the match is anchored so
        # that only a literal trailing ".example.com" is stripped
        (my $h = hostname()) =~ s/\.example\.com\z//;
        $SERVERID = "yenta/$h";
    }
    verbose("system persistent-id: $SERVERID");
}

# accessor: the persistent id that init() settled on
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}

1;

eg/yenta_get  view on Meta::CPAN

    debug 	=> \&debug,
   );

# ask $y for the entry stored under $key in $map
my $res = $y->get($map, $key);

# show whatever came back, one dump per run
print dumper($res), "\n";


# done; nothing below this line runs as part of the main script
exit;

# debug callback: write the message parts to stderr as one line
sub debug {
    my @msg = @_;
    print STDERR @msg, "\n";
}

lib/AC/Yenta/AC/MySelf.pm  view on Meta::CPAN


package AC::Yenta::AC::MySelf;
use AC::Yenta::Config;
use AC::Yenta::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish the persistent id of this yenta instance.
#   $class - invoking class
#   $port  - tcp port (not used)
#   $id    - explicit id; when false, one is built from the configured
#            environment and the hostname
sub init {
    my($class, $port, $id) = @_;

    $SERVERID = $id;
    if( !$SERVERID ){
        my $host = hostname();
        $host =~ s/\.adcopy.*//;	# drop everything from '.adcopy' onward

        my $env = conf_value('environment');
        my @id  = ('yenta');
        push @id, '/' . $env unless $env eq 'prod';	# prod stays unqualified
        push @id, '@' . $host;
        $SERVERID = join '', @id;
    }
    verbose("system persistent-id: $SERVERID");
}

# accessor: the persistent id that init() settled on
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}

1;

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    my $r = $MSGTYPE{$name};
    AC::DC::Protocol->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}


# one or more of:
#   new( host, port )
#   new( servers => [ { host, port }, ... ] )
#   new( server_file )

# Construct a client. Options are key => value pairs laid over the defaults
# (debug, host, proto, copies). 'altfile' is accepted as an older name for
# 'server_file'. Dies unless 'servers' or 'server_file' ends up set.
# NOTE(review): the header comment lists new( host, port ), but host + port
# alone does not satisfy the check below - confirm which is intended.
sub new {
    my($class, @opt) = @_;

    my %self = (
        debug	=> sub{ },
        host	=> 'localhost',
        proto	=> AC::DC::Protocol->new(),
        copies	=> 1,
        @opt,
    );
    my $me = bless \%self, $class;

    # compat
    $me->{server_file} = $me->{altfile} unless $me->{server_file};

    unless( $me->{servers} || $me->{server_file} ){
        die "servers or server_file?\n";
    }

    return $me;
}

# Build a 'yenta_get' request for one key and hand it to _send_request.
#   $map - map name       $key - key to fetch
#   $ver - version, passed through in the request
# Returns whatever _send_request returns.
sub get {
    my($me, $map, $key, $ver) = @_;

    my %header = (
        type		=> 'yenta_get',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    );
    my %datum = (
        map		=> $map,
        key		=> $key,
        version		=> $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, { data => [ \%datum ] } );

    return $me->_send_request($map, $req);
}

# Map a key to a number: the first 8 bytes of its SHA-1, read as two
# big-endian 32 bit words and combined high word first.
# NOTE(review): the shift assumes perl integers are 64 bits wide - confirm.
sub _shard {
    my $key = shift;

    my($hi, $lo) = unpack('NN', sha1($key));
    return( ($hi << 32) | $lo );
}


sub distribute {
    my $me   = shift;
    my $map  = shift;
    my $key  = shift;
    my $ver  = shift;
    my $val  = shift;
    my $file = shift;	# reference
    my $meta = shift;

    return unless $key && $ver;
    $me->{retries} = 25 unless $me->{retries};

lib/AC/Yenta/Client.pm  view on Meta::CPAN

            shard	=> _shard($key),	# NYI
            value	=> $val,
            meta	=> $meta,
        } ]
    }, $file );

    return $me->_send_request($map, $req, $file);
    # return undef | result
}

# Build a 'yenta_check' request and hand it to _send_request.
#   $map - map name   $ver - version   $lev - level
# Returns whatever _send_request returns.
sub check {
    my($me, $map, $ver, $lev) = @_;

    my %header = (
        type		=> 'yenta_check',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    );
    my %body = (
        map		=> $map,
        level		=> $lev,
        version		=> $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, \%body );

    return $me->_send_request($map, $req);
}

################################################################

sub _send_request {
    my $me   = shift;
    my $map  = shift;
    my $req  = shift;
    my $file = shift;	# reference

    my $tries = $me->{retries} + 1;
    my $copy  = $me->{copies} || 1;
    my $delay = 0.25;

    $me->_init_hostlist($map);

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    for (1 .. $tries){
        return unless $addr;
        my $res = $me->_try_server($addr, $port, $req, $file);
        return $res if $res && !--$copy;
        ($addr, $port) = $me->_next_host($map);
        sleep $delay;
        $delay *= 1.414;
    }
}

sub _try_server {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;
    my $file = shift;	# reference

    my $ipn = inet_aton($addr);
    $req .= $$file if $file;

    $me->{debug}->("trying to contact yenta server $addr:$port");

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    if(my $e = $@){
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}


################################################################

# Take the next candidate server off the internal list.
# Returns (addr, port), or nothing when the list is exhausted.
sub _next_host {
    my($me, $map) = @_;

    # no list yet: try the server file
    $me->_read_serverfile($map) if !$me->{_server};

    my $queue = $me->{_server};
    return unless $queue && @$queue;

    my $host = shift @$queue;
    return( $host->{addr}, $host->{port} );
}

# Reset the candidate server list: the configured host/port (if both are
# set) first, then any configured 'servers', then let _read_serverfile run.
sub _init_hostlist {
    my($me, $map) = @_;

    my @server;
    if( $me->{host} && $me->{port} ){
        push @server, { addr => $me->{host}, port => $me->{port} };
    }
    if( my $more = $me->{servers} ){
        push @server, @$more;
    }
    $me->{_server} = \@server;

    $me->_read_serverfile($map);
}

# yentad saves a list of alternate peers to try in case it dies
sub _read_serverfile {
    my $me  = shift;
    my $map = shift;

    my $f;
    my @server;
    my @faraway;
    open($f, $me->{server_file});
    local $/ = "\n";
    while(<$f>){
        chop;

lib/AC/Yenta/Conf.pm  view on Meta::CPAN

use AC::Yenta::SixtyFour;
use AC::Import;
use strict;

our @EXPORT = qw(timet_to_yenta_version_factor timet_to_yenta_version);

my $TTVF = x64_one_million();


# deprecated
# return the multiplier stored in $TTVF
sub timet_to_yenta_version_factor {
    my $factor = $TTVF;
    return $factor;
}

sub timet_to_yenta_version {
    my $t = shift;

    return unless defined $t;
    return $t * $TTVF unless ref $TTVF;

    # math::bigint does not like to multiply floats
    my $ti = int $t;
    my $tf = $t - $ti;

    return $ti * $TTVF + int($TTVF->numify() * $tf);

lib/AC/Yenta/Config.pm  view on Meta::CPAN

    monitor	=> \&parse_monitor,
    map		=> \&parse_map,

);

my @MAP = qw(dbfile basedir keepold history expire backend sharded);


################################################################

# Dispatch one config directive through %CONFIG.
# Returns 1 when a parser exists for $key (after running it), nothing otherwise.
sub handle_config {
    my($me, $key, $rest) = @_;

    my $parser = $CONFIG{$key} or return;
    $parser->($me, $key, $rest);
    return 1;
}

################################################################

sub parse_map {
    my $me    = shift;
    my $key   = shift;
    my $value = shift;

    my($map) = $value =~ /(\S+)\s+\{\s*/;
    die "invalid map spec\n" unless $map;

    my $md = {};
    problem("map '$map' redefined") if $me->{_pending}{map}{$map};

lib/AC/Yenta/Config.pm  view on Meta::CPAN

            $v = cvt_timespec($v) if $k eq 'expire';
            $md->{$k} = $v;
        }else{
            problem("unknown map option '$k'");
        }
    }

    $me->{_pending}{map}{$map} = $md;
}

# Record a 'monitor' directive of the form ip:port in the pending config.
# The address is stored as given (ipa) and as converted by inet_aton (ipn)
# and inet_atoi (ipi).
sub parse_monitor {
    my($me, $key, $mon) = @_;

    my($ip, $port) = split /:/, $mon;
    my %entry = (
        monitor	=> $mon,
        ipa	=> $ip,
        ipn	=> inet_aton($ip),
        ipi	=> inet_atoi($ip),
        port	=> $port,
    );
    push @{$me->{_pending}{monitor}}, \%entry;
}

# Record a save-status directive: the first whitespace separated word is
# the file name, the remaining words are the types.
sub parse_savefile {
    my($me, $key, $save) = @_;

    my @type = split /\s+/, $save;
    my $file = shift @type;

    push @{$me->{_pending}{savestatus}}, { file => $file, type => \@type };
}

# Convert a time spec such as '30', '5m', '2h' or '1d' to seconds.
# The first run of digits is the count; the character right after it picks
# the unit (m, h, d). Anything else counts as seconds.
sub cvt_timespec {
    my $t = shift;

    my($count, $unit) = $t =~ /(\d+)(\D?)/;
    my $mult = { m => 60, h => 3600, d => 86400 }->{$unit} || 1;

    return $count * $mult;
}


################################################################

# look up one top-level setting in the active configuration
sub conf_value {
    my($key) = @_;

    return $AC::Yenta::CONF->{config}{$key};
}

# look up the settings of one map in the active configuration
sub conf_map {
    my($map) = @_;

    return $AC::Yenta::CONF->{config}{map}{$map};
}


1;

lib/AC/Yenta/Crypto.pm  view on Meta::CPAN

use AC::Misc;
use Time::HiRes 'time';
use Crypt::Rijndael;
use Digest::SHA  qw(sha256 hmac_sha256_base64);
use strict;

require 'AC/protobuf/auth.pl';

my $ALGORITHM = 'x-acy-aes-1';

# construct a crypto object that remembers the shared secret
sub new {
    my($class, $secret) = @_;

    my %self = ( secret => $secret );
    return bless \%self, $class;
}

sub encrypt {
    my $me  = shift;
    my $buf = shift;

    my $seqno  = int( time() * 1_000_000 );
    my $nonce  = random_text(48);
    my $key    = $me->_key($seqno, $nonce);
    my $iv     = $me->_iv($key, $seqno, $nonce);

    # pad
    my $pbuf = $buf;

lib/AC/Yenta/Crypto.pm  view on Meta::CPAN

        hmac		=> $hmac,
        length		=> length($buf),
        ciphertext	=> $ct,
    } );

    debug("encrypted <$seqno,$nonce,$hmac>");

    return $eb;
}

sub decrypt {
    my $me  = shift;
    my $buf = shift;

    my $ed     = ACPEncrypt->decode( $buf );
    die "cannot decrypt: unknown alg\n" unless $ed->{algorithm} eq $ALGORITHM;

    my $seqno  = $ed->{seqno},
    my $nonce  = $ed->{nonce};
    my $key    = $me->_key($seqno, $nonce);
    my $iv     = $me->_iv($key, $seqno, $nonce);

lib/AC/Yenta/Crypto.pm  view on Meta::CPAN

    my $aes    = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );
    my $pt     = substr($aes->decrypt( $ed->{ciphertext} ), 0, $ed->{length});

    debug("decrypted <$seqno,$nonce,$hmac>");

    return $pt;
}


# derive the per-message key: sha256 over the secret, sequence number and
# nonce, wrapped in the fixed 'key1' / '1yek' markers
sub _key {
    my($me, $seqno, $nonce) = @_;

    return sha256( join '', 'key1', $me->{secret}, $seqno, $nonce, '1yek' );
}

# derive the iv: the first 16 bytes of sha256 over 'iv', the key and the
# sequence number. $nonce is accepted but does not enter the digest.
sub _iv {
    my($me, $key, $seqno, $nonce) = @_;

    my $digest = sha256( join '', 'iv', $key, $seqno );
    return substr($digest, 0, 16);
}


1;

lib/AC/Yenta/Customize.pm  view on Meta::CPAN

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 10:37 (EST)
# Function: connect user provided implementation
#
# $Id$

package AC::Yenta::Customize;
use strict;

sub customize {
    my $class  = shift;
    my $implby = shift;

    (my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;

    # load user's implementation + default
    for my $p ($implby, $default){
        eval "require $p" if $p;
        die $@ if $@;
    }

lib/AC/Yenta/D.pm  view on Meta::CPAN

use AC::Yenta::Status;
use AC::Yenta::Store;
use AC::Yenta::MySelf;
use AC::Yenta::NetMon;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
# use AC::Yenta::Store::Tokyo;

use strict;

# Construct the daemon object. Options are key => value pairs;
# 'class_myself' is handed to AC::Yenta::MySelf->customize.
# The object itself is a blessed reference to the class name.
sub new {
    my($class, %p) = @_;

    AC::Yenta::MySelf->customize( $p{class_myself} );
    # ...

    return bless \$class, $class;
}

sub daemon {
    my $me    = shift;
    my $cfile = shift;
    my $opt   = shift;	# foreground, debugall, persistent_id, argv

    die "no config file specified\n" unless $cfile;

    # configure
    $AC::Yenta::CONF = AC::Yenta::Config->new(
        $cfile, onreload => sub {
            AC::Yenta::Store::configure();
        });


    initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );

    AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
    daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");

lib/AC/Yenta/D.pm  view on Meta::CPAN

    AC::Yenta::Monitor::init();
    AC::Yenta::NetMon::init();
    AC::DC::IO::TCP::Server->new( $port, 'AC::Yenta::Server' );
    verbose("server started on tcp/$port");


    # start "cronjobs"
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::Yenta::CONF->check() },
       );

    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );
}


1;

lib/AC/Yenta/Default/MySelf.pm  view on Meta::CPAN

use AC::Yenta::Debug;
use Sys::Hostname;
use Socket;
use strict;


my $SERVERID;

# resolve our own address once, at load time.
# gethostbyname returns nothing when the lookup fails and inet_ntoa croaks
# on such input, so check the lookup first - otherwise the hint below
# would never be shown
my $MYADDR = gethostbyname(hostname());
my $MYIP   = $MYADDR ? inet_ntoa($MYADDR) : undef;
die "cannot determine my IP addr.\nsee 'class_myself' in the documentation\n" unless $MYIP;

# Establish the persistent id of this yenta instance.
#   $class - invoking class
#   $port  - tcp port (not used)
#   $id    - explicit id; when false, the id becomes
#            yenta/<environment>@<hostname>
sub init {
    my($class, $port, $id) = @_;

    $SERVERID = $id;
    if( !$SERVERID ){
        $SERVERID = join '', 'yenta/', conf_value('environment'), '@', hostname();
    }
    verbose("system persistent-id: $SERVERID");
}

# accessor: the persistent id that init() settled on
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}

# describe our networks: a single entry carrying the address in $MYIP
sub my_network_info {
    my %net = ( ipa => $MYIP );
    return [ \%net ];
}

# the default implementation puts every server in one datacenter
sub my_datacenter {
    my $name = 'default';
    return $name;
}

1;

lib/AC/Yenta/Direct.pm  view on Meta::CPAN

use AC::Yenta::Store::Map;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
use strict;

our @EXPORT = 'yenta_direct_get';

# One-shot lookup: open $file as map $map and fetch $key.
# Returns nothing when the object cannot be constructed.
sub yenta_direct_get {
    my($file, $map, $key) = @_;

    my $direct = __PACKAGE__->new($map, $file) or return;
    return $direct->get( $key );
}


# Construct a direct-access object around an AC::Yenta::Store::Map created
# with dbfile => $file and readonly => 1.
# Returns nothing when the map object cannot be created.
sub new {
    my($class, $map, $file) = @_;

    my %opt = ( dbfile => $file, readonly => 1 );
    my $db  = AC::Yenta::Store::Map->new( $map, undef, \%opt ) or return;

    return bless { db => $db }, $class;
}

# fetch one key straight from the underlying db object
sub get {
    my($me, $key) = @_;

    return $me->{db}->get($key);
}

# every key: a range query with both bounds empty
sub allkeys {
    my($me) = @_;

    my @unbounded = ('', '');
    return $me->getrange( @unbounded );
}

# Keys between $lo and $hi, as reported by the db object's range().
# A false bound is passed on as the empty string.
sub getrange {
    my($me, $lo, $hi) = @_;

    $lo ||= '';
    $hi ||= '';

    my @found = $me->{db}->range( $lo, $hi );
    return map { $_->{k} } @found;
}

1;

lib/AC/Yenta/IO/TCP/Client.pm  view on Meta::CPAN

package AC::Yenta::IO::TCP::Client;
our @ISA = 'AC::DC::IO::TCP::Client';

use AC::Yenta::MySelf;
use AC::Misc;
use strict;

my $inited;
my $natdom;

sub use_addr_port {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;

    # is addr + port => return
    return ($addr, $port) unless ref $addr;

    # addr is array of nat ip info (ACPIPPort)

    _init() unless $inited;

lib/AC/Yenta/IO/TCP/Client.pm  view on Meta::CPAN

        $private = $i if $i->{natdom} eq $natdom;
    }

    # prefer private addr if available (cheaper)
    my $prefer = $private || $public;
    return unless $prefer;

    return ( inet_itoa($prefer->{ipv4}), ($prefer->{port} || $port) );
}

# one-time setup: remember the first non-empty natdom among our networks
sub _init {

    # determine my local NAT domain
    my $nets = my_network_info();

    foreach my $net (@$nets){
        $natdom = $net->{natdom} unless $natdom;
    }
    $inited = 1;
}

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

use Sys::Hostname;
use Socket;
require 'AC/protobuf/yenta_status.pl';
use strict;


my $HOSTNAME = hostname();

################################################################

sub _myself {

    my $maps = conf_value('map');

    my @ipinfo;
    my $natinfo = my_network_info();
    my $status  = 200;

    for my $i ( @$natinfo ){
        my $st = AC::Yenta::NetMon::status_dom( $i->{natdom} );

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

        lastup		=> $^T,
        ip		=> \@ipinfo,
        map		=> [ keys %$maps ],
        sort_metric	=> loadave() * 1000,
    };
}


################################################################

# encoded status request describing this server
sub myself {

    # tell server about ourself
    my %req = ( myself => _myself() );
    return ACPYentaStatusRequest->encode( \%req );
}

# encoded status reply: all known peers, then ourself, then the items we
# monitor
sub response {

    # send client everything we know
    my @status = (
        AC::Yenta::Status->allpeers(),
        _myself(),
        AC::Yenta::Monitor::export(),
    );

    return ACPYentaStatusReply->encode({
        status	=> \@status,
    });
}

################################################################

# do not believe a client that says it is up
# put it on the sceptical queue, and check for ourself
sub update_sceptical {
    my $gpb = shift;
    my $io  = shift;

    return unless $gpb;
    my $c;
    eval {
        $c = ACPYentaStatusRequest->decode( $gpb );
        $c = $c->{myself};
    };
    if(my $e = $@){

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

    }

    my $id = $c->{server_id};

    # don't track myself
    return if AC::Yenta::Status->my_server_id() eq $id;

    AC::Yenta::Status->update_sceptical($id, $c, $io);
}

sub update {
    my $gpb = shift;

    return unless $gpb;
    my $c;
    eval {
        $c = ACPYentaStatusReply->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode status data: $e");
        return;

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN


    for my $up (@{$c->{status}}){
        my $id = $up->{server_id};
        next if $up->{via} eq $myself;
        next if $id eq $myself;
        AC::Yenta::Status->update($id, $up);
    }
}

# unable to connect to server. mark it down
# unable to connect to a server: mark it down. a false id is ignored
sub isdown {
    my($id) = @_;

    return if !$id;

    return AC::Yenta::Status->isdown( $id );
}
1;

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

use AC::Dumper;
use AC::Misc;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

# Construct a status client: all arguments go to the parent constructor,
# then the timeout, read and shutdown callbacks are installed.
# Returns nothing when the parent constructor fails.
sub new {
    my($class, @arg) = @_;

    debug('starting kibitz status client');
    my $me = $class->SUPER::new( @arg ) or return;

    for my $cb ( [ 'timeout', \&timeout ], [ 'read', \&read ], [ 'shutdown', \&shutdown ] ){
        $me->set_callback( @$cb );
    }

    return $me;
}

sub use_addr_port {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;

    # is addr + port => return
    return ($addr, $port) unless ref $addr;

    # addr is array of nat ip info (ACPIPPort)

    my $down;

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

    $prefer ||= $public unless int rand(20);
    # prefer private addr if available (cheaper)
    $prefer ||= $private || $public || $down;
    return unless $prefer;

    #print STDERR "using ", inet_itoa($prefer->{ipv4}), "\n";
    return ( inet_itoa($prefer->{ipv4}), ($prefer->{port} || $port) );
}


sub start {
    my $me = shift;

    $me->SUPER::start();

    # build request
    my $yp  = AC::Yenta::Protocol->new();
    my $pb  = AC::Yenta::Kibitz::Status::myself();
    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($pb),

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

        msgid		=> $msgid++,
       );

    # write request
    $me->write( $hdr . $pb );
    $me->timeout_rel($TIMEOUT);
    return $me;
}


# timeout callback: give up on the connection
sub timeout {
    my($me) = @_;
    return $me->shut();
}

# shutdown callback: a connection that never reached status_ok means the
# peer is reported as down
sub shutdown {
    my($me) = @_;

    AC::Yenta::Kibitz::Status::isdown( $me->{status_peer} ) unless $me->{status_ok};
}

sub read {
    my $me  = shift;
    my $evt = shift;

    #debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    $me->{status_ok} = 1;

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

# Function: 
#
# $Id$

package AC::Yenta::Kibitz::Status::Server;
use AC::Dumper;
use AC::Yenta::Debug 'status_server';
use strict;


sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;

    if( $gpb ){
        AC::Yenta::Kibitz::Status::update_sceptical( $gpb, $io );
    }

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

use AC::Yenta::Protocol;
use AC::Yenta::IO::TCP::Client;
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

our @ISA = 'AC::Yenta::IO::TCP::Client';

my $TIMEOUT = 5;

# Construct a store client for $addr:$port that will send $req.
# Remaining arguments are passed on to the parent constructor.
# Returns nothing when the parent constructor fails.
sub new {
    my($class, $addr, $port, $req, @opt) = @_;

    debug('starting kibitz store client');
    my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @opt );
    return unless $me;

    $me->{_req} = $req;
    for my $cb ( [ 'timeout', \&timeout ], [ 'read', \&read ], [ 'shutdown', \&shutdown ] ){
        $me->set_callback( @$cb );
    }

    return $me;
}

# start the connection, queue the stored request and arm the timeout
sub start {
    my($me) = @_;

    $me->SUPER::start();

    my $req = $me->{_req};
    $me->write( $req );
    $me->timeout_rel($TIMEOUT);

    return $me;
}

# shutdown callback
sub shutdown {
    my($me) = @_;

    # maybe call error handler
    unless( $me->{_store_ok} ){
        $me->run_callback('error', undef);
    }
}

# timeout callback: give up on the connection
sub timeout {
    my($me) = @_;

    return $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    $me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
    return unless $proto;
    $proto->{data}    = $data;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

use AC::Yenta::Config;
use AC::Dumper;
use JSON;
use Digest::SHA 'sha1_base64';
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

my $TIMEOUT = 1;

sub api_get {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        content_encrypted => $proto->{data_encrypted},
    }, { data => \@res }, \$ect );

    debug("sending get reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response . $ect );

}

sub api_check {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        }, { check => \@res } );

    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

}

# get + process merkle data
# get + process merkle data: every result from store_get_merkle is tagged
# with the map name. Returns the results as a list, or nothing when
# store_get_merkle returns a false value.
sub _get_check {
    my($map, $shard, $ver, $lev) = @_;

    my $res = store_get_merkle($map, $shard, $ver, $lev) or return;

    $_->{map} = $map for @$res;

    return @$res;
}

sub api_distrib {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# reference

    # decode request
    my $req;
    eval {
        $req = ACPYentaDistRequest->decode( $gpb );
        die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        is_reply	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, { status_code => 200, status_message => 'OK', haveit => !$want } );

    debug("sending distrib reply");
    $io->timeout_rel($TIMEOUT);
    $io->write_and_shut( $response );

}

sub _check_content {
    my $meta = shift;
    my $cont = shift;

    return 1 unless $meta && $meta =~ /^\{/;

    eval {
        $meta = decode_json($meta);
    };
    return 1 if $@;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        return unless $chk eq $meta->{sha1};
    }
    if( $meta->{size} ){
        my $len = length($$cont);
        return unless $len == $meta->{size};
    }

    return 1;
}

sub _reply_error {
    my $io    = shift;
    my $proto = shift;
    my $code  = shift;
    my $msg   = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN


require 'AC/protobuf/yenta_status.pl';
require 'AC/protobuf/heartbeat.pl';

my $FREQ	= 2;
my $OLD_DOWN	= 30;
my $OLD_KEEP	= 1800;

my %MON;	# by 'id' (from config file)

# schedule periodic() to run every $FREQ
sub init {

    my @job = (
        info	=> 'monitor',
        freq	=> $FREQ,
        func	=> \&periodic,
    );
    AC::DC::Sched->new( @job );
}

sub periodic {

    my $mon = conf_value('monitor');

    # clean up old data
    for my $id (keys %MON){
        isdown($id, 0) if $MON{$id}{lastup} < $^T - $OLD_DOWN;
    }

    # start monitoring (send heartbeat request)
    for my $m (@$mon){

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

                                                info 		=> "monitor client: $id",
                                                monitor_peer	=> $id,
                                               );

        isdown($id, 0) unless $ok;
    }
}

# data for kibitzing around
# array of ACPYentaStatus
sub export {

    my @d;
    my $here = my_datacenter();
    my $self = my_server_id();

    for my $v (values %MON){

        push @d, {
            id			=> $v->{id},	# from config, typically localhost:port
            datacenter		=> $here,

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

            capacity_metric	=> $v->{capacity_metric},
            server_id		=> $v->{server_id},
            instance_id 	=> $v->{server_id},
            ip			=> $v->{ip},
            path		=> '.',
        };
    }
    return @d;
}

sub isdown {
    my $id   = shift;
    my $code = shift;

    my $d = $MON{$id};
    return unless $d;

    # require 2 polls to fail
    return unless $^T - $d->{lastup} >= 2 * $FREQ;

    $code = 0 if $code == 200;

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

    $d->{timestamp}   = $^T;

    debug("monitor $id is down");

    if( $d->{lastup} < $^T - $OLD_KEEP ){
        debug("monitor $id down too long. removing from report");
        delete $MON{$id};
    }
}

sub update {
    my $id = shift;
    my $gb = shift;	# ACPHeartbeat

    my $up;
    eval {
        $up = ACPHeartBeat->decode( $gb );
        $up->{id} = $id;
    };
    if(my $e = $@){
        problem("cannot decode hb data: $e");

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

    return isdown($id, 0) unless $^T - $up->{timestamp} < $OLD_DOWN;

    debug("monitor $id is up");
    $up->{lastup} = $^T;
    $up->{downcount} = 0;

    _hb_ip_info( $up, $MON{$id} );
    $MON{$id} = $up;
}

sub _hb_ip_info {
    my $up  = shift;
    my $old = shift;

    my $ip;

    $ip = $old->{ip} if ($old->{process_id} == $up->{process_id}) && ($old->{server_id} eq $up->{server_id});

    unless( $ip ){
        my $port = $up->{port};
        unless( $port ){

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

use AC::Yenta::Debug 'monitor_client';
use AC::Yenta::IO::TCP::Client;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

sub new {
    my $class = shift;

    debug('starting monitor status client');
    my $me = $class->SUPER::new( @_ );
    unless($me){
        return;
    }

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

       );

    # write request
    $me->write( $hdr );

    $me->timeout_rel($TIMEOUT);

    return $me;
}

# timeout callback: give up on the connection
sub timeout {
    my($me) = @_;
    return $me->shut();
}

# shutdown callback: a connection that never reached status_ok means the
# monitored item is reported as down
sub shutdown {
    my($me) = @_;

    AC::Yenta::Monitor::isdown( $me->{monitor_peer} ) unless $me->{status_ok};
}

sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new();
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    $me->{status_ok} = 1;

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN

=head1 DESCRIPTION

provide functions to override default behavior. you may define
any or all of the following functions.

=head2 my_server_id

return a unique identity for this yenta instance. typically,
something similar to the server hostname.

    sub my_server_id {
        return 'yenta@' . hostname();
    }

=head2 my_datacenter

return the name of the local datacenter. yenta will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.

    sub my_datacenter {
        my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
        return $domain;
    }

=head2 my_network_info

return information about the various networks this server has.

    sub my_network_info {
        my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
        my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));


        return [
            # use this IP for communication with servers in this datacenter (same natdom)
            { ipa => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ipa => $public_ip },
        ]
    }

=head2 init

initialization function called at startup. typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.

    my $HOSTNAME;
    my $DOMAIN;
    sub init {
        $HOSTNAME = hostname();
        ($DOMAIN) = $HOSTNAME =~ /^[^\.]+\.(.*)/;
    }

=head1 BUGS

none. you write this yourself.

=head1 SEE ALSO

lib/AC/Yenta/NetMon.pm  view on Meta::CPAN

use Socket;

use strict;

my $STALE	= 120;

my %lastok;	# natdom => T
my %natdom;	# ip => natdom


# Seed the tables from our own network info: every local address maps to
# its natdom ('public' when none is given), and every such domain starts
# out marked as working as of now.
sub init {

    my $nets = my_network_info();

    foreach my $net (@$nets){
        my $dom = $net->{natdom} ? $net->{natdom} : 'public';
        $lastok{ $dom }          = $^T;	# assume everything is working
        $natdom{ $net->{ipa} }   = $dom;
    }
}

# Note that traffic arrived on a connection: the nat domain of the local
# address the connection uses is marked as working as of now.
#   $io - connection object; its {fd} member is the socket
sub update {
    my $io = shift;

    # getsockname fails on a socket that is no longer valid, and sockaddr_in
    # croaks when handed such a result - there is nothing to record then
    my $sockname = getsockname($io->{fd});
    return unless defined $sockname;

    my $ip  = inet_ntoa( (sockaddr_in($sockname))[1] );
    my $dom = $natdom{ $ip } || 'public';

    $lastok{$dom} = $^T;
}

# Status of a nat domain ('public' when none is given):
#   nothing - the domain is not one of ours
#   0       - no traffic seen for more than $STALE
#   200     - otherwise
sub status_dom {
    my $dom = shift(@_) || 'public';

    return unless exists $lastok{$dom};	# not local

    my $stale = $lastok{$dom} + $STALE < $^T;
    return $stale ? 0 : 200;
}


1;

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

  yenta_check		=> { num => 9, reqc => 'ACPYentaCheckRequest',  resc => 'ACPYentaCheckReply' },
 );


# register every message type listed in %MSGTYPE
foreach my $name (keys %MSGTYPE){
    my($num, $reqc, $resc) = @{ $MSGTYPE{$name} }{qw(num reqc resc)};
    __PACKAGE__->add_msg( $name, $num, $reqc, $resc );
}


sub read_protocol {
    my $me  = shift;
    my $io  = shift;
    my $evt = shift;

    $io->{rbuffer} .= $evt->{data};

    return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

        $content = $me->_decrypt_data( $io, $auth, $content );
        return unless $content;
    }

    # content is passed as reference
    return ($p, $data, ($content ? \$content : undef));
}

# for simple status queries, argus, debugging
# this is not an RFC compliant http server
# Minimal http request reader. Once the buffer holds a blank line, returns
# ( { type => 'http', method => <first word> }, <second word> ); returns
# nothing while the request is still incomplete.
sub _read_http {
    my($io, $evt) = @_;

    my $buf = $io->{rbuffer};
    return if $buf !~ /\r?\n\r?\n/s;

    my($method, $url) = split /\s+/, $buf;

    return ( { type => 'http', method => $method }, $url );
}

################################################################

sub _decrypt_data {
    my $me   = shift;
    my $io   = shift;
    my $auth = shift;
    my $data = shift;

    eval {
        $data = $me->decrypt( $auth, $data );
    };
    if(my $e=$@){
        verbose("cannot decrypt protocol data: $e");

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

            cause	=> 'read',
            error	=> "cannot decrypt protocol: $e",
        });
        $io->shut();
        return;
    }

    return $data;
}

# Decide whether traffic to $peer gets encrypted: never without a
# configured secret, otherwise only when the peer is in another datacenter.
sub use_encryption {
    my($peer) = @_;

    conf_value('secret') or return;

    # only encrypt far-away traffic, not local
    my $faraway = $peer->{datacenter} ne my_datacenter();
    return $faraway;
}

# Encrypt a buffer with the shared secret.
#   $auth - not currently used
#   $buf  - data to encrypt
# Without a secret the buffer is returned unchanged. An undefined or empty
# buffer yields nothing. Otherwise returns the result of
# AC::Yenta::Crypto->encrypt.
sub encrypt {
    my $me    = shift;
    my $auth  = shift;	# not currently used
    my $buf   = shift;

    my $secret = $me->{secret};
    return $buf unless $secret;
    # test the length, not the truth value: a payload of "0" is real data
    return unless defined($buf) && length($buf);
    my $crypto = AC::Yenta::Crypto->new( $secret );
    return $crypto->encrypt( $buf );
}

# Decrypt a buffer with the shared secret.
#   $abuf - not currently used
#   $buf  - data to decrypt
# Without a secret the buffer is returned unchanged; a false buffer yields
# nothing.
sub decrypt {
    my($me, $abuf, $buf) = @_;

    my $secret = $me->{secret} or return $buf;
    return if !$buf;

    return AC::Yenta::Crypto->new( $secret )->decrypt( $buf );
}

lib/AC/Yenta/Server.pm  view on Meta::CPAN

my $TIMEOUT = 2;

# dispatch table, keyed by protocol message type.
# a value is either the name of a handler class or a reference to an
# api function
my %HANDLER = (
    yenta_status	=> 'AC::Yenta::Kibitz::Status::Server',
    yenta_get		=> \&AC::Yenta::Kibitz::Store::Server::api_get,
    yenta_distrib	=> \&AC::Yenta::Kibitz::Store::Server::api_distrib,
    yenta_check		=> \&AC::Yenta::Kibitz::Store::Server::api_check,
    http		=> 'AC::Yenta::Stats',
   );

# Accept a new incoming connection.
#   $class - server class
#   $fd    - the accepted socket
#   $ip    - address of the peer
# A peer that fails the configured acl check is rejected and nothing is
# returned. Otherwise the connection is set up for reading, with a timeout,
# and the new object is returned.
sub new {
    my $class = shift;
    my $fd    = shift;
    my $ip    = shift;

    unless( $AC::Yenta::CONF->check_acl( $ip ) ){
        verbose("rejecting connection from $ip");
        return;
    }

    my $me = $class->SUPER::new( peerip => $ip, info => "tcp yenta server (from: $ip)" );
    return unless $me;

    $me->init($fd);
    $me->wantread(1);
    $me->timeout_rel($TIMEOUT);
    $me->set_callback('read',    \&read);
    $me->set_callback('timeout', \&timeout);

    # return the object explicitly, like the other constructors do, and not
    # whatever the last set_callback happened to return
    return $me;
}

# timeout callback: log it and drop the connection
sub timeout {
    my($me) = @_;

    debug("connection timed out");
    return $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    # dispatch request
    my $h = $HANDLER{ $proto->{type} };

lib/AC/Yenta/SixtyFour.pm  view on Meta::CPAN

# Author: Jeff Weisberg <jaw @ tcp4me.com>
# Created: 2010-Dec-23 11:39 (EST)
# Function: 64 bit numbers as native integers or math::bigints?
#
# $Id$

package AC::Yenta::SixtyFour;
use strict;

# export one set of functions as x64_<name>
sub import {
    my $class  = shift;
    my $caller = caller;

    my $l = length(sprintf '%x', -1);
    my $prefix;
    if( $l >= 16 ){
        $prefix = 'native_';
    }else{
        $prefix = 'bigint_';
        require Math::BigInt;

lib/AC/Yenta/SixtyFour.pm  view on Meta::CPAN


    no strict;
    no warnings;
    for my $f qw(number_to_hex hex_to_number sixty_four_ones one_million){
        *{$caller . '::' . 'x64_' . $f} = \&{ $prefix . $f };
    }
}

################################################################

# native integer => 16 digit, zero padded, upper-case hex string
sub native_number_to_hex {
    my( $num ) = @_;

    return sprintf( '%016X', $num );
}

# hex string => native integer
sub native_hex_to_number {
    my( $hexstr ) = @_;

    return hex $hexstr;
}

# 64 one-bits. kept as a string, and converted when needed,
# to prevent an overflow warning on a 32 bit system
my $sfo = '0xFFFFFFFF_FFFFFFFF';
sub native_sixty_four_ones {
    my $ones = hex($sfo);
    return $ones;
}

# the constant 1,000,000 as a native integer
sub native_one_million {
    return 1000000;
}

################################################################


# number => 16 digit, zero padded, upper-case hex string
#   $v - either a Math::BigInt object, or a plain perl number
# the result has the same form as native_number_to_hex ('%016X'),
# so the hex form of a value does not depend on the platform
# or on how the number happens to be represented.
# values wider than 16 digits are returned unpadded
sub bigint_number_to_hex {
    my $v = shift;

    # plain perl number (QQQ? - can this happen?): format it directly
    return sprintf('%016X', $v) unless ref $v;

    # as_hex returns lower-case digits with a leading '0x'
    (my $h = $v->as_hex()) =~ s/^0x//;
    $h = uc $h;

    # never pass a negative repeat count to x
    my $pad = 16 - length($h);
    $pad = 0 if $pad < 0;

    return ('0' x $pad) . $h;
}

# hex string (without the '0x') => Math::BigInt
sub bigint_hex_to_number {
    my( $hexstr ) = @_;

    return Math::BigInt->new( "0x$hexstr" );
}

# 64 one-bits, as a Math::BigInt
sub bigint_sixty_four_ones {
    my $ones = Math::BigInt->new( $sfo );
    return $ones;
}

# the constant 1,000,000, as a Math::BigInt
sub bigint_one_million {
    my $million = Math::BigInt->new( '1_000_000' );
    return $million;
}


################################################################

1;

lib/AC/Yenta/Stats.pm  view on Meta::CPAN

# counters, by name. incremented by inc_stat(), listed by http_stats()
my %STATS;

# url (with the leading '/' removed) => function; looked up in handler()
my %HANDLER = (
    loadave		=> \&http_load,
    stats		=> \&http_stats,
    status		=> \&http_status,
    peers		=> \&AC::Yenta::Status::report,
    dumppeers		=> \&AC::Yenta::Status::report_long,
   );

# fold one measurement into the load average
#   $idle  - time spent idle
#   $total - total time measured. nothing happens if this is zero
# the busy fraction is blended in as a decaying average; the
# measurement period is capped at 60 when computing the decay
sub add_idle {
    my( $idle, $total ) = @_;

    return if !$total;

    my $busy   = 1 - $idle / $total;
    my $period = $total > 60 ? 60 : $total;
    my $decay  = exp( - $period / 60 );

    $loadave = $loadave * $decay + $busy * (1 - $decay);
}

# the current decaying load average (see add_idle)
sub loadave { $loadave }

# add one to the named counter
sub inc_stat {
    my( $name ) = @_;

    $STATS{$name}++;
}

sub handler {
    my $class = shift;
    my $io    = shift;
    my $proto = shift;
    my $url   = shift;

    debug("http request $url");
    $url =~ s|^/||;
    $url =~ s/%(..)/chr(hex($1))/eg;

    my $f = $HANDLER{$url};

lib/AC/Yenta/Stats.pm  view on Meta::CPAN

      . "Connection: close\r\n"
      . "Content-Type: text/plain; charset=UTF-8\r\n"
      . "Content-Length: " . length($content) . "\r\n"
      . "\r\n"
      . $content ;

    $io->write($res);
    $io->set_callback('write_buffer_empty', \&_done );
}

# 'write_buffer_empty' callback: the response is out, shut the connection
sub _done {
    my( $conn ) = @_;

    $conn->shut();
}

################################################################

# build the 'no such page' response
# returns (content, status code, status text)
sub http_notfound {
    my( $url ) = @_;

    my $body = "404 NOT FOUND\nThe requested url /$url was not found on this server.\nSo sorry.\n\n";

    return ( $body, 404, "Not Found" );
}

# page: the current load average, to 4 decimal places
sub http_load {
    my $load = loadave();

    return sprintf("loadave:    %0.4f\n\n", $load);
}

# page: overall status of the 'public' domain, as reported by NetMon
# returns the content alone when all is well (200),
# otherwise (content, 500, status text)
sub http_status {
    my $code = AC::Yenta::NetMon::status_dom('public');

    if( $code == 200 ){
        return "status: OK\n\n";
    }
    return("status: PROBLEM\n\n", 500, "Problem");
}

# page: every counter, sorted by name, one per line,
# followed by the number of peers currently known
sub http_stats {

    my $fmt = "%-24s%s\n";
    my $out;

    foreach my $name (sort keys %STATS){
        $out .= sprintf($fmt, "$name:", $STATS{$name});
    }

    my $npeer = () = AC::Yenta::Status->allpeers();
    $out .= sprintf($fmt, "peers:", $npeer);
    $out .= "\n";

    return $out;
}

# page: the stored data for a url of the form <prefix>/map/key/version
# falls back to the not-found page if the store returns no version
sub http_data {
    my( $url ) = @_;

    # the first path component is not used
    my(undef, $map, $key, $ver) = split m|/|, $url;
    my( $data, $version ) = store_get($map, $key, $ver);

    return $version ? $data : http_notfound($url);
}

# page: the stored file content for a url of the form <prefix>/map/key/version
# falls back to the not-found page if there is no version, or no file
sub http_file {
    my( $url ) = @_;

    # the first path component is not used
    my(undef, $map, $key, $ver) = split m|/|, $url;
    my( $data, $version, $file ) = store_get($map, $key, $ver);

    if( $version && $file ){
        # the file comes back as a reference
        return $$file;
    }
    return http_notfound($url);
}

1;

lib/AC/Yenta/Status.pm  view on Meta::CPAN


# everything this process knows about its peers.
# allpeer holds the status data by server id, the other tables index into it
our $DATA = bless {
    allpeer	=> {},		# yenta_status
    sceptical	=> {},		# {id}  => status data, while the peer is not in allpeer
    mappeer	=> {},		# {map} => { id => id }
    peermap	=> {},		# {id}  => @map
    datacenter  => {},		# {dc}  => { id => id }
    peertype	=> {},		# {ss}  => { id => id }
};

# remember our port, and schedule the two recurring jobs:
# kibitzing status with peers, and saving the status file.
# each frequency comes from the config, defaulting to 5
sub init {
    my( $port ) = @_;

    $PORT = $port;

    my $kibitz_freq = conf_value('time_status_kibitz') || 5;
    AC::DC::Sched->new( info => 'kibitz status', freq => $kibitz_freq, func => \&periodic );

    my $save_freq = conf_value('time_status_save') || 5;
    AC::DC::Sched->new( info => 'save status', freq => $save_freq, func => \&save_status );
}

# start up a client every so often
sub periodic {

    # clean up down or lost peers
    for my $id ( keys %{$DATA->{allpeer}} ){
        my $p = $DATA->{allpeer}{$id};
        next unless $p;

        next if $p->{status} == 200 && $p->{timestamp} > $^T - $KEEPLOST;
        _maybe_remove( $id );
    }

lib/AC/Yenta/Status.pm  view on Meta::CPAN


    my $c = AC::Yenta::Kibitz::Status::Client->new( $ip, $port,
                                            info 	=> "status client: $id",
                                            status_peer	=> $id,
                                           );
    return __PACKAGE__->isdown($id) unless $c;

    $c->start();
}

sub _random_peer {

    my $here  = my_datacenter();

    # sceptical
    my @scept = values %{$DATA->{sceptical}};

    my @all   = map  { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
    my @old   = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
    my @local = grep { $_->{datacenter} eq $here } @all;	# this datacenter
    my @away  = grep { $_->{datacenter} ne $here } @all;	# not this datacenter

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    # don't talk to self. any of my addrs.
    my $ipinfo = my_network_info();
    for my $i (@$ipinfo){
        return if $ip eq $i->{ipa} && $port == $PORT;
    }

    return("seed/$ip:$port", $ip, $port);
}

# server list for save file
#   $type - subsystem, optionally followed by '/where'
# returns the status data of every peer of that subsystem which
# was last up within $SAVEMAX. returns nothing if there are none
sub server_list {
    my( $spec ) = @_;

    my( $type, $where ) = split m|/|, $spec;
    # where - no longer used
    $where ||= my_datacenter();

    my $known = $DATA->{allpeer};

    # nothing too old
    my @fresh = grep { $known->{$_}{lastup} > $^T - $SAVEMAX }
                keys %{ $DATA->{peertype}{$type} };
    return unless @fresh;

    return map { $known->{$_} } @fresh;
}

# save a list of peers, in case I crash, and for others to use
sub save_status {

    my $save = conf_value('savestatus');
    my $here = my_datacenter();

    # also save locally running services
    my @mon  = AC::Yenta::Monitor::export();

    for my $s ( @$save ){
        my $file  = $s->{file};
        my $types = $s->{type};

lib/AC/Yenta/Status.pm  view on Meta::CPAN

        close FILE;
        unless( rename("$file.tmp", $file) ){
            problem("cannot rename save file '$file': $!");
        }

    }
}

################################################################
# diagnostic reports
# one tab separated line each for: ourself, the locally monitored
# services, and all known peers
# columns: id+hostname, datacenter, subsystem, environment, status, metric
sub report {

    my $out;

    foreach my $peer (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){
        my $id     = sprintf '%-28s', $peer->{server_id};
        my $metric = int( $peer->{sort_metric} );

        $out .= join( "\t",
                      "$id $peer->{hostname}",
                      $peer->{datacenter},
                      $peer->{subsystem},
                      $peer->{environment},
                      $peer->{status},
                      $metric ) . "\n";
    }

    return $out;
}

# as report, but with a full dump of each entry
sub report_long {

    my $out;

    $out .= dumper( $_ ) . "\n\n"
      for (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} );

    return $out;
}
################################################################

# the port passed to init
sub my_port {
    return $PORT;
}


# server id + '/' + our process id, as at least 4 hex digits
sub my_instance_id {
    my( $class ) = @_;

    my $server = my_server_id();
    my $pid    = sprintf('/%04x', $$);

    return $server . $pid;
}

# the status data for one peer, by server id
sub peer {
    my( $class, $id ) = @_;

    return $DATA->{allpeer}{$id};
}

# the status data of every peer which has a true status
sub allpeers {
    my( $class ) = @_;

    my $known = $DATA->{allpeer};

    # isdown sets status to 0 (below), skip such
    return grep { $_->{status} } values %$known;
}

# the ids of the peers which have the specified map
sub mappeers {
    my( $class, $map ) = @_;

    return keys %{ $DATA->{mappeer}{$map} };
}

# the datacenter table: {dc} => { id => id }
# NB: this is the table itself, not a copy
sub datacenters {
    my( $class ) = @_;

    return $DATA->{datacenter};
}
################################################################

sub _remove {
    my $id = shift;

    my $ss = $DATA->{allpeer}{$id}{subsystem};
    delete $DATA->{peertype}{$ss}{$id} if $ss;

    my $dc = $DATA->{allpeer}{$id}{datacenter};
    delete $DATA->{datacenter}{$dc}{$id} if $dc;

    verbose("deleting peer: $id");
    delete $DATA->{allpeer}{$id};

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    }
    delete $DATA->{peermap}{$id};

    # delete its monitored items
    for my $p (keys %{$DATA->{allpeer}}){
        next unless $DATA->{allpeer}{$p}{via} eq $id;
        _remove($p);
    }
}

# remove the peer, if it has been down longer than $KEEPDOWN,
# or if we have had no news of it for longer than $KEEPLOST
sub _maybe_remove {
    my( $id ) = @_;

    my $peer = $DATA->{allpeer}{$id};

    _remove($id)
      if ($^T - $peer->{lastup} > $KEEPDOWN) || ($^T - $peer->{timestamp} > $KEEPLOST);
}

sub isdown {
    my $class = shift;
    my $id    = shift;

    debug("marking peer '$id' as down");

    if( ! $DATA->{allpeer}{$id} ){
        return unless $DATA->{sceptical}{$id};
        # we know it is down, and want to kibbitz this fact
        $DATA->{allpeer}{$id} = $DATA->{sceptical}{$id};
    }

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    if( $DATA->{allpeer}{$id} ){
        $DATA->{allpeer}{$id}{timestamp} = $^T;
        $DATA->{allpeer}{$id}{status}    = 0;
        $DATA->{allpeer}{$id}{path}      = my_server_id();
    }
    _maybe_remove( $id );
}

################################################################

# should we accept an update from this peer?
# the environment check is disabled: everything is accepted
sub _env_ok {
    my( $class, $id, $up ) = @_;

    # if( $up->{environment} ne conf_value('environment') ){
    #     verbose("ignoring update from $id - wrong env: $up->{environment}");
    #     return;
    # }

    return 1;
}

sub update_sceptical {
    my $class = shift;
    my $id    = shift;	# ->server_id
    my $up    = shift;
    my $io    = shift;

    return unless $class->_env_ok($id, $up);

    if( $DATA->{allpeer}{$id} ){
        # already known
        delete $DATA->{sceptical}{$id};

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    # only accept updates from the server itself
    # no 3rd party updates. no misconfigured servers.
    problem("server misconfigured $id != $io->{peerip}")
      unless grep { inet_atoi($io->{peerip}) == $_->{ipv4}  } @{$up->{ip}};

    $up->{id} = $id;
    delete $up->{lastup};
    $DATA->{sceptical}{$id} = $up;
}

sub update {
    my $class = shift;
    my $id    = shift;	# -> server_id
    my $up    = shift;

    return unless $class->_env_ok($id, $up);

    # only keep it if it is relatively fresh, and valid
    return unless $up->{timestamp} > $^T - $KEEPLOST;
    return unless $up->{status};

lib/AC/Yenta/Store.pm  view on Meta::CPAN

use AC::Yenta::Store::AE;
use AC::Yenta::Store::Expire;
use strict;

# the function names offered for export.
# NB: store_normalize_version, store_version_max, store_merkle_scrub
# are not listed here
our @EXPORT = qw(store_get store_put store_want store_get_merkle store_get_internal store_set_internal store_expire store_remove);

# map name => store object. maintained by configure
my %STORE;


# create maps from config
sub configure {

    my $maps = $AC::Yenta::CONF->{config}{map};

    my %remove = %STORE;
    for my $map (keys %{$maps}){
        debug("configuring map $map");

        my $conf = $maps->{$map};
        my $sharded = $conf->{sharded};
        my $c  = $sharded ? 'AC::Yenta::Store::Sharded' : 'AC::Yenta::Store::Map';

lib/AC/Yenta/Store.pm  view on Meta::CPAN

        $STORE{$map} = $m;
        delete $remove{$map};
    }

    for my $map (keys %remove){
        debug("removing unused map '$map'");
        delete $STORE{$map};
    }
}

# fetch from the named map. returns nothing if there is no such map
sub store_get {
    my( $map, $key, $ver ) = @_;

    my $store = $STORE{$map} or return;

    return $store->get($key, $ver);
}

# ask the named map whether it wants this shard/key/version
# returns nothing if there is no such map
sub store_want {
    my( $map, $shard, $key, $ver ) = @_;

    my $store = $STORE{$map} or return;

    return $store->want($shard, $key, $ver);
}

# save data into the named map. returns nothing if there is no such map
#   $file - a reference
sub store_put {
    my( $map, $shard, $key, $ver, $data, $file, $meta ) = @_;

    my $store = $STORE{$map} or return;

    debug("storing $map/$key/$ver");
    return $store->put($shard, $key, $ver, $data, $file, $meta);
}

# remove a key/version from the named map
# NB: only removes local copy temporarily. will be replaced at next AE run
sub store_remove {
    my( $map, $key, $ver ) = @_;

    my $store = $STORE{$map} or return;

    return $store->remove($key, $ver);
}


# fetch merkle tree data from the named map
# returns nothing if there is no such map
sub store_get_merkle {
    my( $map, $shard, $ver, $lev ) = @_;

    my $store = $STORE{$map} or return;

    return $store->get_merkle($shard, $ver, $lev);
}

# fetch an internal (bookkeeping) value from the named map
# returns nothing if there is no such map
sub store_get_internal {
    my( $map, $key ) = @_;

    my $store = $STORE{$map} or return;

    return $store->get_internal($key);
}

# save an internal (bookkeeping) value in the named map
# returns nothing if there is no such map
sub store_set_internal {
    my( $map, $key, $val ) = @_;

    my $store = $STORE{$map} or return;

    return $store->set_internal($key, $val);
}

# expire old data from the named map
#   $max - all versions before this
# returns nothing if there is no such map
sub store_expire {
    my( $map, $max ) = @_;

    my $store = $STORE{$map} or return;

    return $store->expire($max);
}

# pass the remaining args to normalize_version of the named map
# returns nothing if there is no such map
sub store_normalize_version {
    my $store = $STORE{ shift() } or return;

    return $store->normalize_version(@_);
}

# pass the remaining args to version_max of the named map
# returns nothing if there is no such map
sub store_version_max {
    my $store = $STORE{ shift() } or return;

    return $store->version_max(@_);
}

# pass the remaining args to merkle_scrub of the named map
# returns nothing if there is no such map
sub store_merkle_scrub {
    my $store = $STORE{ shift() } or return;

    return $store->merkle_scrub(@_);
}

1;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

my $msgid      = $$;	# starts out as our process id
my %DONE;		# maps which have finished
my @AE;			# normally, just one

# have periodic (below) run on a schedule
AC::DC::Sched->new(
    info	=> 'anti-entropy',
    freq	=> 60,
    func	=> \&AC::Yenta::Store::AE::periodic,
   );

sub new {
    my $class = shift;

    my $me = bless {
        badnode		=> [ {version => 0, shard => 0, level => 0} ],
        cache		=> {},
        kvneed		=> [],
        kvneedorig	=> [],
        kvfetching	=> 0,
        missing		=> 0,
    }, $class;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    $me->_init_peer() || return;

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');
    $me->_next_step();

    push @AE, $me;
    return $me;
}

# scheduled job: kill dead sessions, start new ones
sub periodic {

    # a session is dead if its timestamp is $EXPIRE or more old
    @AE = grep { $_->{timestamp} + $EXPIRE > $^T } @AE;

    # one at a time
    return if @AE;

    # and not if the system is too busy
    my $load = loadave();
    return if $load > (conf_value('ae_maxload') || $MAXLOAD);

    __PACKAGE__->new();
}

# we are up to date if we have AE'ed every map at least once since starting
# returns 1 if so, 0 if any configured map has yet to finish
sub up_to_date {
    my( $class ) = @_;

    my $maps = conf_value('map');

    return 0 if grep { !$DONE{$_} } keys %$maps;
    return 1;
}

################################################################

# find most stale map
sub _pick_map {
    my $me = shift;

    my $maps = conf_value('map');
    my(@best, $bestv);
    for my $m (keys %$maps){
        my $lt = AC::Yenta::Store::store_get_internal($m, 'ae_last_start');
        if( !@best || $lt < $bestv ){
            @best = $m;
            $bestv = $lt;
        }elsif( $lt == $bestv ){

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    # is this a data or file map?
    # adjust accordingly
    my $cf = conf_map( $map );
    $me->{has_files} = 1 if $cf->{basedir};
    $me->{maxget} = $me->{has_files} ? $MAXFILES : $MAXGET;
    $me->{expire} = $cf->{expire};

    return 1;
}

sub _init_peer {
    my $me = shift;

    my $here = my_datacenter();
    my @peer = AC::Yenta::Status->mappeers( $me->{map} );
    my $env  = conf_value('environment');

    my(@near, @far, @ood);

    for my $p (@peer){
        my $d = AC::Yenta::Status->peer($p);

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    $me->{peers_far}  = \@far  if @far;
    $me->{peers_ood}  = \@ood  if @ood;

    my $peer = $me->_pick_peer();
    return unless $peer;
    $me->{peer} = $peer;
    return 1;

}

sub _pick_peer {
    my $me = shift;

    my @peer;
    if( $me->{peers_near} && $me->{peers_far} ){
        # prefer close peers, usually
        if( int(rand(8)) ){
            @peer = @{$me->{peers_near}};
        }else{
            @peer = @{$me->{peers_far}};
        }

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    }

    return unless @peer;
    my $peer = $peer[ rand(@peer) ];

    return $peer;
}

################################################################

# this session is complete:
# mark the map as done (unless too much was missing),
# record the finish time in the map, and drop the session from the list
sub _finished {
    my( $me ) = @_;

    my $map = $me->{map};
    debug("finished $map");

    if( $me->{missing} < $MAXMISSING ){
        $DONE{$map} = $^T;
    }

    AC::Yenta::Store::store_set_internal($map, 'ae_last_finish', $^T);
    @AE = grep{ $_ != $me } @AE;
}

sub _next_step {
    my $me = shift;

    $me->{timestamp} = $^T;

    if( $me->{kvfetching} < $MAXFETCH ){
        # any missing data?
        if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
            debug("starting nextgetkv ($me->{kvfetching})");
            $me->_next_get_kv();
        }

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    if( @{$me->{badnode}} ){
        $me->_start_check();
        return;
    }

    $me->_finished();
}

################################################################

sub _start_check {
    my $me = shift;

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

        info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );

    if( $io ){
        $io->set_callback('load',  \&_check_load,  $me);
        $io->set_callback('error', \&_check_error, $me);
        $io->start();
    }

}

sub _check_load {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;


    debug("check results");
    $evt->{data} ||= {};

    # determine highest level returned

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    if( @keydata ){
        $me->_check_result_keys( \@keydata );
    }elsif( @nodedata ){
        $me->_check_result_nodes( $maxlev, \@nodedata );
    }

    $me->_next_step();
}

# 'error' callback of a check request: log it, and move along
sub _check_error {
    my( $io, $evt, $me ) = @_;

    my $peerid = $me->{peer}{id};
    verbose("AE check error with $peerid map $me->{map} ($io->{info})");

    $me->_next_step();
}

sub _check_result_keys {
    my $me  = shift;
    my $chk = shift;

    my %vscnt;
    my %vsadd;

    for my $d (@$chk){
        inc_stat('ae_check_key');
        my $vsk = "$d->{version} $d->{shard}";
        $vscnt{ $vsk } ++;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

        next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );

        debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
        push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
        inc_stat('ae_key_missing');
        $me->{missing} ++;
        $vsadd{ $vsk } ++;
    }
}

# is everything under this node too old to bother with?
# returns 1 if the newest version under the node is older than
# the map's expire time (less $TOONEW); otherwise returns nothing.
# maps with no expire configured never expire
sub _is_expired {
    my( $me, $map, $lev, $ver ) = @_;

    my $ttl = $me->{expire} or return;

    my $newest = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
    defined $newest or return;

    my $cutoff = timet_to_yenta_version($^T - $ttl + $TOONEW);
    return unless $newest < $cutoff;

    debug("skipping expired $lev/$ver - $newest");
    return 1;
}

sub _check_result_nodes {
    my $me  = shift;
    my $lev = shift;
    my $chk = shift;

    # determine all of the base versions of the recvd data
    my %ver;
    for my $d (@$chk){
        my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
        $ver{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

        # stick them at the front
        unshift @{$me->{badnode}}, { version => $d->{version}, shard => $d->{shard}, level => $lev };
    }
}

################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)

# start the next fetch. retries from the original peer go first
sub _next_get_kv {
    my( $me ) = @_;

    if( @{$me->{kvneedorig}} ){
        return $me->_start_get_kv_orig();
    }
    if( @{$me->{kvneed}} ){
        return $me->_start_get_kv_any();
    }
}

# fetch the next batch (at most maxget) of needed items from
# a freshly picked peer. retry is enabled for these
sub _start_get_kv_any {
    my( $me ) = @_;

    my @batch = splice @{$me->{kvneed}}, 0, $me->{maxget};

    # pick a peer
    my $from = $me->_pick_peer();
    debug("getting kv data from peer $from->{id}");

    $me->_start_get_kv( $from, 1, \@batch );
}

# fetch the next batch (at most maxget) of retry items from
# the peer this session is checking against. no further retry
sub _start_get_kv_orig {
    my( $me ) = @_;

    my @batch = splice @{$me->{kvneedorig}}, 0, $me->{maxget};

    debug("getting kv data from current peer");

    $me->_start_get_kv( $me->{peer}, 0, \@batch );
}

sub _start_get_kv {
    my $me    = shift;
    my $peer  = shift;
    my $retry = shift;
    my $get   = shift;

    # insert map into request
    $_->{map} = $me->{map} for @$get;

    # for (@$get){ debug("requesting $_->{key}/$_->{version}") }

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

                                                  info => "AE getkv from $peer->{id}" );

    if( $io ){
        $me->{kvfetching} ++;
        $io->set_callback('load',  \&_getkv_load,  $me, $retry, $get);
        $io->set_callback('error', \&_getkv_error, $me, $retry, $get);
        $io->start();
    }
}

sub _getkv_load {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;
    $evt->{data} ||= {};

    debug("got kv data results");

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

                                     $d->{value}, $file, $d->{meta} );

    }

    push @{$me->{kvneedorig}}, (values %need) if $retry;

    $me->_next_get_kv();

}

sub _getkv_error {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;

    if( $retry ){
        push @{$me->{kvneedorig}}, @$get;

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

use Sys::SigAction 'set_sig_handler';
use strict;

# alarm() is set to this around each db operation (see _start)
my $TIMEOUT = 30;

# make this class available as a backend, under either name
AC::Yenta::Store::Map->add_backend( bdb 	=> 'AC::Yenta::Store::BDBI' );
AC::Yenta::Store::Map->add_backend( berkeley 	=> 'AC::Yenta::Store::BDBI' );

my %recovered;

sub new {
    my $class = shift;
    my $name  = shift;
    my $conf  = shift;

    my $file  = $conf->{dbfile};
    unless( $file ){
        problem("no dbfile specified for '$name'");
        return;
    }

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

    chmod 0666, $file;

    return bless {
        dir	=> $dir,
        file	=> $file,
        db	=> $db,
        hasenv  => ($env ? 1 : 0),
    }, $class;
}

# fetch the value stored under map/sub/key
# returns nothing if db_get reports failure (not found)
# in list context returns (value, 1), otherwise just the value
sub get {
    my( $me, $map, $sub, $key ) = @_;

    my $val;
    debug("get $map/$sub/$key");

    $me->_start();
    my $rc = $me->{db}->db_get( _key($map,$sub,$key), $val );
    $me->_finish();

    return if $rc; # not found

    return wantarray ? ($val, 1) : $val;
}

# store a value under map/sub/key
# returns true if db_put returned a false (success) status
sub put {
    my( $me, $map, $sub, $key, $val ) = @_;

    debug("put $map/$sub/$key");

    $me->_start();
    my $rc = $me->{db}->db_put( _key($map,$sub,$key), $val );
    $me->_finish();

    return !$rc;
}

# delete the entry stored under map/sub/key
# the status from db_del is not checked
sub del {
    my( $me, $map, $sub, $key ) = @_;

    my $dbkey = _key($map,$sub,$key);

    $me->_start();
    $me->{db}->db_del( $dbkey );
    $me->_finish();
}

# have the database sync itself
sub sync {
    my( $me ) = @_;

    return $me->{db}->db_sync();
}

sub range {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $end = shift;	# undef => to end of map

    my ($k, $v, @k);
    $me->_start();
    my $cursor = $me->{db}->db_cursor();
    $k = _key($map,$sub,$key);
    my $e = _key($map,$sub,$end);
    $cursor->c_get($k, $v, DB_SET_RANGE);

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

        }
    }
    $cursor->c_close();
    $me->_finish();

    return @k;
}

################################################################

# signal handler (installed by _start):
# report the signal with a stack trace, and exit
sub _sig {
    my $trace = join '', AC::Error::stack_trace();

    print STDERR "bdbi signal @_\n", $trace, "\n";
    exit(-1);
}

# call before each db operation:
# sets the alarm, remembering what was left of any previous one,
# and, if we have a db environment, installs our signal handler
# on any of the listed signals which do not already have one
sub _start {
    my( $me ) = @_;

    $me->{alarmold} = alarm($TIMEOUT);
    return unless $me->{hasenv};

    # as long as perl handles the signals, everything gets cleaned up
    # well enough for the locks to be removed
    $SIG{$_} ||= \&_sig for qw(INT QUIT KILL TERM ALRM);
}

# call after each db operation:
# put back the alarm saved by _start (or cancel it, if there was none)
sub _finish {
    my( $me ) = @_;

    my $prev = $me->{alarmold} || 0;
    alarm($prev);
    $me->{alarmold} = 0;
}


# the database key: map/sub/key
sub _key {
    my( $map, $sub, $key ) = @_;

    return join '/', $map, $sub, $key;
}

1;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


my $msgid = $$;		# starts out as our process id
my @DIST;		# distributions underway. new adds, periodic prunes

# have periodic (below) run on a schedule
AC::DC::Sched->new(
    info	=> 'distribution',
    freq	=> 5,
    func	=> \&AC::Yenta::Store::Distrib::periodic,
   );

sub new {
    my $class = shift;
    my $req   = shift;
    my $cont  = shift;

    return if $req->{hop} >= $MAXHOP;
    return if $req->{expire} < $^T;

    my $sender = $req->{sender};
    my $sendat = AC::Yenta::Status->peer($sender);

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    my $max = conf_value('distrib_max') || $MAXUNDERWAY;
    if( @DIST < $max ){
        $me->_start_next();
    }
    push @DIST, $me;

    return $me;
}

# periodically, go through and restart or expire
sub periodic {

    my @keep;
    my $max = conf_value('distrib_max') || $MAXUNDERWAY;

    my $chance = (@DIST > $max) ? ($max / @DIST) : 1;

    for my $r (@DIST){
        # debug("periodic $r->{info}");
        next if $^T > $r->{req}{expire};

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


    @DIST = @keep;
}

################################################################
# determine distribution strategy
#   - if we recvd it from faraway, we will send it to other datacenters, and randomly in the same datacenter
#   - otherwise we send it in the same datacenter, in an orderly fashion
# RSN - find a strategy with faster convergence + less duplication

sub _init_strategy {
    my $me     = shift;
    my $sender = shift;

    my $here   = my_datacenter();
    my $dcs    = AC::Yenta::Status->datacenters();
    my $sendat = AC::Yenta::Status->peer($sender);
    my(@far, @near);

    for my $dc (keys %$dcs){
        if( $dc eq $here ){

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


    if( $me->{faraway} ){
        $me->{nearsend} = shuffle(\@near);
        $me->{farsend}  = shuffle(\@far);
    }else{
        $me->{nearsend} = _orderly(\@near);
    }
}

# which yentas can do something with the update?
# returns the ids of the peers in the specified datacenter which are
# yentas, in our environment, and have our map
sub _compat_peers_in_dc {
    my( $me, $dc ) = @_;

    my $env = conf_value('environment');
    my $dcs = AC::Yenta::Status->datacenters();
    my $map = $me->{map};

    return grep {
        my $pd = AC::Yenta::Status->peer($_);

        $pd->{subsystem}      eq 'yenta'
          && $pd->{environment} eq $env
          && scalar grep { $map eq $_ } @{ $pd->{map} };
    } keys %{$dcs->{$dc}};
}

# send to the next faraway datacenter on the list
# returns 1 if a send was started, nothing if the list is
# empty or no server there is up
sub _start_far {
    my( $me ) = @_;

    my $site = shift @{ $me->{farsend} } or return;

    # only servers which are up (status 200)
    my @up;
    for my $cand (@{$site->{id}}){
        my $pd = AC::Yenta::Status->peer($cand);
        push @up, $cand if $pd->{status} == 200;
    }
    return unless @up;

    # randomly pick one server in chosen dc
    my $id = $up[ rand(@up) ];
    debug("sending $me->{info} to far site $id in $site->{dc}");
    $me->_start_peer( $id, 1 );
    inc_stat('dist_send_far');
    inc_stat('dist_send_total');
    return 1;
}

# send to the next nearby server on the list
# returns 1 if a send was started, nothing if the list is empty
sub _start_near {
    my( $me ) = @_;

    my $id = shift @{ $me->{nearsend} } or return;

    debug("sending $me->{info} to nearby site $id");
    $me->_start_peer( $id, 0 );
    inc_stat('dist_send_near');
    inc_stat('dist_send_total');
    return 1;
}

sub _start_next {
    my $me  = shift;

    my $sent;

    # pick next peers
    # start clients

    if( $me->{faraway} ){
        if( $me->{farseen} < $MAXFARSEE ){
            for (1 .. $FARSENDS){

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

                $sent ++ if $me->_start_near();
            }
        }
    }else{
        $sent ++ if $me->_start_near();
    }

    return $sent;
}

# start one more send, far or near as specified,
# unless enough servers on that side have already been seen
sub _start_one {
    my( $me, $far ) = @_;

    my( $seen, $limit, $start ) = $far
      ? ( $me->{farseen},  $MAXFARSEE,  '_start_far'  )
      : ( $me->{nearseen}, $MAXNEARSEE, '_start_near' );

    return if $seen >= $limit;
    return $me->$start();
}

sub _start_peer {
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    my $pd   = AC::Yenta::Status->peer($id);
    my $addr = $pd->{ip};	# array of nat ip info

    my $enc = use_encryption($pd);
    my $ect = '';
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


    if( $io ){
        $io->set_callback('load',  \&_onload,  $me, $id, $far);
        $io->set_callback('error', \&_onerror, $me, $id, $far);
        $io->start();
    }else{
        debug("start client failed");
    }
}

sub _onload {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");

    if( $evt->{data}{haveit} ){
        if( $far ){

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

            my $n = $me->{ordershift};
            $n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
            shift @{$me->{nearsend}} for (1 .. $n);
            $me->{ordershift} *= 2;
        }
    }

    $me->_start_one($far);
}

# 'error' callback of a send: log it, and try the next server
sub _onerror {
    my( $io, $evt, $me, $id, $far ) = @_;

    # don't need to track anything
    verbose("error distributing $me->{info} to $id");

    $me->_start_one($far);
}

sub _orderly {
    my $peers = shift;

    my $myself = AC::Yenta::Status->my_server_id();
    my @p = sort {$a cmp $b} @$peers;

    my @left  = grep { $_ lt $myself } @p;
    my @right = grep { $_ gt $myself } @p;

    @p = (@right, @left);
    return \@p;



( run in 0.461 second using v1.01-cache-2.11-cpan-a5abf4f5562 )