AC-Yenta

 view release on metacpan or  search on metacpan

LICENSE  view on Meta::CPAN

    This software may be copied and distributed under the terms
    found in the Perl "Artistic License".

    A copy of the "Artistic License" may be found in the standard
    Perl distribution.

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-Yenta
version:            1.1
abstract:           eventually-consistent distributed key/value data store. et al.
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    BerkeleyDB:           0
    Crypt::Rijndael:      0
    Digest::SHA:          0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0
    Sys::Hostname:        0
    Time::HiRes:          0
no_index:
    directory:
        - t
        - inc
generated_by:       ExtUtils::MakeMaker version 6.48
meta-spec:
    url:      http://module-build.sourceforge.net/META-spec-v1.4.html
    version:  1.4

Makefile.PL  view on Meta::CPAN


use ExtUtils::MakeMaker;
WriteMakefile(
              NAME            => 'AC::Yenta',
              VERSION_FROM    => 'lib/AC/Yenta.pm',
              ABSTRACT_FROM   => 'lib/AC/Yenta.pm',
              AUTHOR          => 'AdCopy <http://www.adcopy.com>',
              LICENSE         => 'perl',
              PREREQ_PM       => {
                  'POSIX'                       => 0,
                  'Sys::Hostname'	        => 0,
                  'JSON'		        => 0,
                  'Digest::SHA'		        => 0,
                  'Crypt::Rijndael'		=> 0,
                  'BerkeleyDB'		        => 0,
                  'Time::HiRes'			=> 0,
                  'Google::ProtocolBuffers'	=> 0,
		  'AC::DC'			=> 0,
              }
);

eg/myself.pm  view on Meta::CPAN


# $Id$

package Local::Yenta::MySelf;
use Sys::Hostname;
use strict;

my $SERVERID;

sub init {
    my $class = shift;
    my $port  = shift;	# our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        (my $h = hostname()) =~ s/\.example.com//;	# remove domain
        $SERVERID = "yenta/$h";
    }
    verbose("system persistent-id: $SERVERID");
}

sub my_server_id {
    return $SERVERID;
}

1;

eg/yenta.conf  view on Meta::CPAN


# enable debugging?
#debug           ae
#debug           map
#debug           merkle
# ...


# maps
map testyfoo {
    # name of the data file
    dbfile      /home/data/testyfoo.ydb
    # how much history to keep 
    history     4
}

eg/yenta_get  view on Meta::CPAN


use AC::Yenta::Client;
use AC::Dumper;
use strict;

my $map = shift @ARGV;
my $key = shift @ARGV;
die "usage: get [-h host] map key\n" unless $map && $key;

my $y   = AC::Yenta::Client->new(
    # server_file, servers[], or host + port
    server_file	=> '/var/tmp/yenta.status',
    debug 	=> \&debug,
   );

my $res = $y->get($map, $key);

print dumper($res), "\n";


exit;

sub debug {
    print STDERR @_, "\n";
}

eg/yenta_put  view on Meta::CPAN

use JSON;
use strict;


my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });


my $key = 'YX3jSXD3CBRUDABm';

my $res = $ys->distribute(
    # map, key, version, data
    'mymap', $key, timet_to_yenta_version(time()),
    encode_json( {
        url_id	=> $key,
        url	=> 'http://www.example.com',
        acc_id	=> 'C9TdSgbUCBRUCABG',
        format	=> 'html',
    }),
   );

eg/yentad  view on Meta::CPAN

# example yenta daemon

use Getopt::Std;
use AC::Yenta::D;
use strict;

my @saved_argv = @ARGV;
my %OPT;

getopts('c:dfp:', \%OPT) || die "usage...\n";
    # -c config file
    # -d    enable all debugging
    # -f    foreground
    # -p port


my $y = AC::Yenta::D->new(
    class_myself	=> 'My::Code::MySelf',
   );


$y->daemon( $OPT{c}, {
    argv	=> \@saved_argv,
    foreground	=> $OPT{f},
    debugall	=> $OPT{d},
    port	=> $OPT{p},
} );

lib/AC/Yenta.pm  view on Meta::CPAN

use strict;

our $VERSION = 1.1;

=head1 NAME

AC::Yenta - eventually-consistent distributed key/value data store. et al.

=head1 SYNOPSIS

    use AC::Yenta::D;
    use strict;

    my $y = AC::Yenta::D->new( );

    $y->daemon( $configfile, {
      argv		=> \@ARGV,
      foreground	=> $OPT{f},
      debugall		=> $OPT{d},
      port		=> $OPT{p},
    } );

    exit;


=head1 USAGE

    Copy + Paste from the example code into your own code.
    Copy + Paste from the example config into your own config.
    Send in bug report.

=head1 YIDDISH-ENGLISH GLOSSARY

	Kibitz - Gossip. Casual information exchange with ones peers.

	Yenta - 1. An old woman who kibitzes with other yentas.
		2. Software which kibitzes with other yentas.


=head1 DESCRIPTION

=head2 Peers

All of the running yentas are peers. There is no master server.
New nodes can be added or removed on the fly with no configuration.

=head2 Kibitzing

lib/AC/Yenta/Client.pm  view on Meta::CPAN

use strict;

require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';

our @EXPORT = 'timet_to_yenta_version';	# imported from Y/Conf

my $HOSTNAME = hostname();

my %MSGTYPE =
 (
  yenta_get		=> { num => 7, reqc => 'ACPYentaGetSet',        resc => 'ACPYentaGetSet' },
  yenta_distrib		=> { num => 8, reqc => 'ACPYentaDistRequest',   resc => 'ACPYentaDistReply' },
  yenta_check		=> { num => 9, reqc => 'ACPYentaCheckRequest',  resc => 'ACPYentaCheckReply' },
 );

for my $name (keys %MSGTYPE){
    my $r = $MSGTYPE{$name};
    AC::DC::Protocol->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}


# one or more of:
#   new( host, port )
#   new( servers => [ { host, port }, ... ] )
#   new( server_file )

sub new {
    my $class = shift;

    my $me = bless {
        debug	=> sub{ },
        host	=> 'localhost',
        proto	=> AC::DC::Protocol->new(),
        copies	=> 1,
        @_,
    }, $class;

    $me->{server_file} ||= $me->{altfile};	# compat

    die "servers or server_file?\n" unless $me->{servers} || $me->{server_file};

    return $me;
}

sub get {
    my $me  = shift;
    my $map = shift;
    my $key = shift;
    my $ver = shift;

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_get',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        data	=> [ {
            map		=> $map,
            key		=> $key,
            version	=> $ver,
        } ]
    } );

    return $me->_send_request($map, $req);
}

sub _shard {
    my $key = shift;

    my $sh = sha1($key);
    my($a, $b) = unpack('NN', $sh);
    return $a<<32 | $b;
}


sub distribute {
    my $me   = shift;
    my $map  = shift;
    my $key  = shift;
    my $ver  = shift;
    my $val  = shift;
    my $file = shift;	# reference
    my $meta = shift;

    return unless $key && $ver;
    $me->{retries} = 25 unless $me->{retries};

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_distrib',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        sender		=> "$HOSTNAME/$$",
        hop		=> 0,
        expire		=> time() + 120,
        datum	=> [ {
            map		=> $map,
            key		=> $key,
            version	=> $ver,
            shard	=> _shard($key),	# NYI
            value	=> $val,
            meta	=> $meta,
        } ]
    }, $file );

    return $me->_send_request($map, $req, $file);
    # return undef | result
}

sub check {
    my $me  = shift;
    my $map = shift;
    my $ver = shift;
    my $lev = shift;

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        map		=> $map,
        level		=> $lev,
        version		=> $ver,
    } );

    return $me->_send_request($map, $req);
}

################################################################

sub _send_request {
    my $me   = shift;
    my $map  = shift;
    my $req  = shift;
    my $file = shift;	# reference

    my $tries = $me->{retries} + 1;
    my $copy  = $me->{copies} || 1;
    my $delay = 0.25;

    $me->_init_hostlist($map);
    my ($addr, $port) = $me->_next_host($map);

    for (1 .. $tries){
        return unless $addr;
        my $res = $me->_try_server($addr, $port, $req, $file);
        return $res if $res && !--$copy;
        ($addr, $port) = $me->_next_host($map);
        sleep $delay;
        $delay *= 1.414;
    }
}

sub _try_server {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;
    my $file = shift;	# reference

    my $ipn = inet_aton($addr);
    $req .= $$file if $file;

    $me->{debug}->("trying to contact yenta server $addr:$port");
    my $res;
    eval {
        $res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
        $res->{data} = $me->{proto}->decode_reply( $res ) if $res;
    };
    if(my $e = $@){
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}


################################################################

sub _next_host {
    my $me  = shift;
    my $map = shift;

    $me->_read_serverfile($map) unless $me->{_server};
    return unless $me->{_server} && @{$me->{_server}};
    my $next = shift @{$me->{_server}};
    return( $next->{addr}, $next->{port} );
}

sub _init_hostlist {
    my $me  = shift;
    my $map = shift;

    my @server;
    push @server, {
        addr	=> $me->{host},
        port	=> $me->{port},
    } if $me->{host} && $me->{port};

    push @server, @{$me->{servers}} if $me->{servers};
    $me->{_server} = \@server;

    $me->_read_serverfile($map);
}

# yentad saves a list of alternate peers to try in case it dies
sub _read_serverfile {
    my $me  = shift;
    my $map = shift;

    my $f;
    my @server;
    my @faraway;
    open($f, $me->{server_file});
    local $/ = "\n";
    while(<$f>){
        chop;
        my $data = decode_json( $_ );
        next unless grep { $_ eq $map } @{ $data->{map} };
        if( $data->{is_local} ){
            push @server, { addr => $data->{addr}, port => $data->{port} };
        }else{
            push @faraway, { addr => $data->{addr}, port => $data->{port} };
        }
    }

    # prefer local
    @server = @faraway unless @server;

    shuffle( \@server );
    push @{$me->{_server}}, @server;
}

################################################################

1;

lib/AC/Yenta/Conf.pm  view on Meta::CPAN

use AC::Import;
use strict;

our @EXPORT = qw(timet_to_yenta_version_factor timet_to_yenta_version);

my $TTVF = x64_one_million();


# deprecated
sub timet_to_yenta_version_factor {
    return $TTVF;
}

sub timet_to_yenta_version {
    my $t = shift;

    return unless defined $t;
    return $t * $TTVF unless ref $TTVF;

    # math::bigint does not like to multiply floats
    my $ti = int $t;
    my $tf = $t - $ti;

    return $ti * $TTVF + int($TTVF->numify() * $tf);
}

1;

lib/AC/Yenta/Config.pm  view on Meta::CPAN

use AC::ConfigFile::Simple;
use Socket;
use strict;

our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value conf_map);


my %CONFIG = (

    include	=> \&AC::ConfigFile::Simple::include_file,
    debug	=> \&AC::ConfigFile::Simple::parse_debug,
    allow	=> \&AC::ConfigFile::Simple::parse_allow,
    port	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    environment => \&AC::ConfigFile::Simple::parse_keyvalue,
    secret 	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    seedpeer	=> \&AC::ConfigFile::Simple::parse_keyarray,

    ae_maxload	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    distrib_max	=> \&AC::ConfigFile::Simple::parse_keyvalue,

    savestatus	=> \&parse_savefile,
    monitor	=> \&parse_monitor,
    map		=> \&parse_map,

);

my @MAP = qw(dbfile basedir keepold history expire backend sharded);


################################################################

sub handle_config {
    my $me   = shift;
    my $key  = shift;
    my $rest = shift;

    my $fnc = $CONFIG{$key};
    return unless $fnc;
    $fnc->($me, $key, $rest);
    return 1;
}

################################################################

sub parse_map {
    my $me    = shift;
    my $key   = shift;
    my $value = shift;

    my($map) = $value =~ /(\S+)\s+\{\s*/;
    die "invalid map spec\n" unless $map;

    my $md = {};
    problem("map '$map' redefined") if $me->{_pending}{map}{$map};

    while( defined(my $l = $me->_nextline()) ){
        last if $l eq '}';
        my($k, $v) = split /\s+/, $l, 2;

        if( grep {$_ eq $k} @MAP ){
            $v = cvt_timespec($v) if $k eq 'expire';
            $md->{$k} = $v;
        }else{
            problem("unknown map option '$k'");
        }
    }

    $me->{_pending}{map}{$map} = $md;
}

sub parse_monitor {
    my $me  = shift;
    my $key = shift;
    my $mon = shift;

    my($ip, $port) = split /:/, $mon;
    push @{$me->{_pending}{monitor}}, {
        monitor	=> $mon,
        ipa	=> $ip,
        ipn	=> inet_aton($ip),
        ipi	=> inet_atoi($ip),
        port	=> $port,
    };
}

sub parse_savefile {
    my $me   = shift;
    my $key  = shift;
    my $save = shift;

    my($file, @type) = split /\s+/, $save;
    push @{$me->{_pending}{savestatus}}, {
        type	=> \@type,
        file	=> $file,
    };
}

sub cvt_timespec {
    my $t = shift;

    my %f = ( m => 60, h => 3600, d => 86400 );
    my($n, $f) = $t =~ /(\d+)(\D?)/;

    $f = $f{$f} || 1;
    return $n * $f;
}


################################################################

sub conf_value {
    my $key = shift;

    return $AC::Yenta::CONF->{config}{$key};
}

sub conf_map {
    my $map = shift;

    return $AC::Yenta::CONF->{config}{map}{$map};
}


1;

lib/AC/Yenta/Crypto.pm  view on Meta::CPAN

use Time::HiRes 'time';
use Crypt::Rijndael;
use Digest::SHA  qw(sha256 hmac_sha256_base64);
use strict;

require 'AC/protobuf/auth.pl';

my $ALGORITHM = 'x-acy-aes-1';

sub new {
    my $class  = shift;
    my $secret = shift;

    return bless {
        secret	=> $secret,
    }, $class;
}

sub encrypt {
    my $me  = shift;
    my $buf = shift;

    my $seqno  = int( time() * 1_000_000 );
    my $nonce  = random_text(48);
    my $key    = $me->_key($seqno, $nonce);
    my $iv     = $me->_iv($key, $seqno, $nonce);

    # pad
    my $pbuf = $buf;
    $pbuf .= "\0" x (16 - length($pbuf) & 0xF) if length($pbuf) & 0xF;

    my $aes    = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );
    my $ct     = $aes->encrypt( $pbuf );
    my $hmac   = hmac_sha256_base64($ct, $key);

    my $eb     = ACPEncrypt->encode( {
        algorithm	=> $ALGORITHM,
        seqno		=> $seqno,
        nonce		=> $nonce,
        hmac		=> $hmac,
        length		=> length($buf),
        ciphertext	=> $ct,
    } );

    debug("encrypted <$seqno,$nonce,$hmac>");

    return $eb;
}

sub decrypt {
    my $me  = shift;
    my $buf = shift;

    my $ed     = ACPEncrypt->decode( $buf );
    die "cannot decrypt: unknown alg\n" unless $ed->{algorithm} eq $ALGORITHM;

    my $seqno  = $ed->{seqno},
    my $nonce  = $ed->{nonce};
    my $key    = $me->_key($seqno, $nonce);
    my $iv     = $me->_iv($key, $seqno, $nonce);

    my $hmac   = hmac_sha256_base64($ed->{ciphertext}, $key);
    die "cannot decrypt: hmac mismatch\n" unless $hmac eq $ed->{hmac};

    my $aes    = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );
    my $pt     = substr($aes->decrypt( $ed->{ciphertext} ), 0, $ed->{length});

    debug("decrypted <$seqno,$nonce,$hmac>");

    return $pt;
}


sub _key {
    my $me    = shift;
    my $seqno = shift;
    my $nonce = shift;

    return sha256( 'key1' . $me->{secret} . $seqno . $nonce . '1yek' );
}

sub _iv {
    my $me    = shift;
    my $key   = shift;
    my $seqno = shift;
    my $nonce = shift;

    return substr(sha256( 'iv'   . $key . $seqno ), 0, 16);
}


1;

lib/AC/Yenta/Customize.pm  view on Meta::CPAN

# Author: Jeff Weisberg
# Created: 2010-Jan-26 10:37 (EST)
# Function: connect user provided implementation
#
# $Id$

package AC::Yenta::Customize;
use strict;

sub customize {
    my $class  = shift;
    my $implby = shift;

    (my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;

    # load user's implemantation + default
    for my $p ($implby, $default){
        eval "require $p" if $p;
        die $@ if $@;
    }

    # import/export
    no strict;
    no warnings;
    for my $f ( @{$class . '::CUSTOM'} ){
        *{$class . '::' . $f} = ($implby && $implby->can($f)) || $default->can($f);
    }
}

1;

lib/AC/Yenta/D.pm  view on Meta::CPAN

use AC::Yenta::Store;
use AC::Yenta::MySelf;
use AC::Yenta::NetMon;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
# use AC::Yenta::Store::Tokyo;

use strict;

sub new {
    my $class = shift;
    my %p = @_;

    AC::Yenta::MySelf->customize( $p{class_myself} );
    # ...

    return bless \$class, $class;
}

sub daemon {
    my $me    = shift;
    my $cfile = shift;
    my $opt   = shift;	# foreground, debugall, persistent_id, argv

    die "no config file specified\n" unless $cfile;

    # configure
    $AC::Yenta::CONF = AC::Yenta::Config->new(
        $cfile, onreload => sub {
            AC::Yenta::Store::configure();
        });


    initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );

    AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
    daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");


    $SIG{CHLD} = $SIG{PIPE} = sub{};        				# ignore
    $SIG{INT}  = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit;  # abort

    # initialize subsystems
    my $port = $opt->{port} || conf_value('port');

    AC::Yenta::MySelf->init( $port, $opt->{persistent_id} );
    AC::Yenta::Store::configure();
    AC::Yenta::Status::init( $port );
    AC::Yenta::Monitor::init();
    AC::Yenta::NetMon::init();
    AC::DC::IO::TCP::Server->new( $port, 'AC::Yenta::Server' );
    verbose("server started on tcp/$port");


    # start "cronjobs"
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::Yenta::CONF->check() },
       );

    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );
}


1;

lib/AC/Yenta/Default/MySelf.pm  view on Meta::CPAN

use Sys::Hostname;
use Socket;
use strict;


my $SERVERID;
my $MYIP = inet_ntoa(scalar gethostbyname(hostname()));
die "cannot determine my IP addr.\nsee 'class_myself' in the documentation\n" unless $MYIP;

sub init {
    my $class = shift;
    my $port  = shift;	# not used
    my $id    = shift;

    $SERVERID = $id;
    unless( $SERVERID ){
        $SERVERID = 'yenta/' . conf_value('environment') . '@' . hostname();
    }
    verbose("system persistent-id: $SERVERID");
}

sub my_server_id {
    return $SERVERID;
}

sub my_network_info {
    return [ { ipa => $MYIP } ];
}

sub my_datacenter {
    return 'default';
}

1;

lib/AC/Yenta/Direct.pm  view on Meta::CPAN

use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
use strict;

our @EXPORT = 'yenta_direct_get';

sub yenta_direct_get {
    my $file = shift;
    my $map  = shift;
    my $key  = shift;

    my $me = __PACKAGE__->new($map, $file);
    return unless $me;
    return $me->get( $key );
}


sub new {
    my $class = shift;
    my $map   = shift;
    my $file  = shift;

    my $db = AC::Yenta::Store::Map->new( $map, undef, { dbfile => $file, readonly => 1 } );
    return unless $db;

    return bless { db => $db }, $class;
}

sub get {
    my $me  = shift;
    my $key = shift;

    my $db = $me->{db};
    return $db->get($key);
}

sub allkeys {
    my $me  = shift;

    return $me->getrange( '', '' );
}

sub getrange {
    my $me  = shift;
    my $lo  = shift;
    my $hi  = shift;

    my $db = $me->{db};

    return map { $_->{k} } $db->range( ($lo||''), ($hi||'') );
}

1;

lib/AC/Yenta/IO/TCP/Client.pm  view on Meta::CPAN

our @ISA = 'AC::DC::IO::TCP::Client';

use AC::Yenta::MySelf;
use AC::Misc;
use strict;

my $inited;
my $natdom;

sub use_addr_port {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;

    # is addr + port => return
    return ($addr, $port) unless ref $addr;

    # addr is array of nat ip info (ACPIPPort)

    _init() unless $inited;

    my $public;
    my $private;

    for my $i ( @$addr ){
        # skip unreachable networks
        my $ok = AC::Yenta::NetMon::status_dom( $i->{natdom} );
        next unless $ok == 200;
        $public  = $i unless $i->{natdom};
        $private = $i if $i->{natdom} eq $natdom;
    }

    # prefer private addr if available (cheaper)
    my $prefer = $private || $public;
    return unless $prefer;

    return ( inet_itoa($prefer->{ipv4}), ($prefer->{port} || $port) );
}

sub _init {

    # determine my local NAT domain
    my $nat = my_network_info();

    for my $i (@$nat){
        $natdom ||= $i->{natdom};
    }
    $inited = 1;
}

1;

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

require 'AC/protobuf/yenta_status.pl';
use strict;


my $HOSTNAME = hostname();

################################################################

sub _myself {

    my $maps = conf_value('map');

    my @ipinfo;
    my $natinfo = my_network_info();
    my $status  = 200;

    for my $i ( @$natinfo ){
        my $st = AC::Yenta::NetMon::status_dom( $i->{natdom} );

        # if a private network is down, announce the network + 500
        # if a private network is down, stop announcing it

        if( $i->{natdom} ){
            push @ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => AC::Yenta::Status->my_port(), natdom => $i->{natdom} }
              if $st == 200;
        }else{
            push @ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => AC::Yenta::Status->my_port(), natdom => $i->{natdom} };
            $status = 500 unless $st == 200;
        }
    }

    return {
        hostname	=> $HOSTNAME,
        datacenter	=> my_datacenter(),
        subsystem	=> 'yenta',
        environment	=> conf_value('environment'),
        via		=> AC::Yenta::Status->my_server_id(),
        server_id	=> AC::Yenta::Status->my_server_id(),
        instance_id	=> AC::Yenta::Status->my_instance_id(),
        path		=> '.',
        status		=> $status,
        uptodate	=> AC::Yenta::Store::AE->up_to_date(),
        timestamp   	=> $^T,
        lastup		=> $^T,
        ip		=> \@ipinfo,
        map		=> [ keys %$maps ],
        sort_metric	=> loadave() * 1000,
    };
}


################################################################

sub myself {

    # tell server about ourself
    return ACPYentaStatusRequest->encode({
        myself => _myself(),
    });
}

sub response {

    # send client everything we know
    my @peer = AC::Yenta::Status->allpeers();
    push @peer, _myself();
    # add the items we monitor
    push @peer, AC::Yenta::Monitor::export();

    return ACPYentaStatusReply->encode({
        status	=> \@peer,
    });
}

################################################################

# do not believe a client that says it is up
# put it on the sceptical queue, and check for ourself
sub update_sceptical {
    my $gpb = shift;
    my $io  = shift;

    return unless $gpb;
    my $c;
    eval {
        $c = ACPYentaStatusRequest->decode( $gpb );
        $c = $c->{myself};
    };
    if(my $e = $@){
        problem("cannot decode status data: $e");
        return;
    }

    my $id = $c->{server_id};

    # don't track myself
    return if AC::Yenta::Status->my_server_id() eq $id;

    AC::Yenta::Status->update_sceptical($id, $c, $io);
}

sub update {
    my $gpb = shift;

    return unless $gpb;
    my $c;
    eval {
        $c = ACPYentaStatusReply->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode status data: $e");
        return;
    }

    debug("rcvd update");
    my $myself = AC::Yenta::Status->my_server_id();

    for my $up (@{$c->{status}}){
        my $id = $up->{server_id};
        next if $up->{via} eq $myself;
        next if $id eq $myself;
        AC::Yenta::Status->update($id, $up);
    }
}

# unable to connect to server. mark it down
sub isdown {
    my $id = shift;

    return unless $id;

    AC::Yenta::Status->isdown( $id );
}
1;

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

use AC::Misc;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

sub new {
    my $class = shift;

    debug('starting kibitz status client');
    my $me = $class->SUPER::new( @_ );
    return unless $me;

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    return $me;
}

sub use_addr_port {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;

    # is addr + port => return
    return ($addr, $port) unless ref $addr;

    # addr is array of nat ip info (ACPIPPort)

    my $down;
    my $public;
    my $private;

    for my $i ( @$addr ){
        # usually, skip unreachable networks
        my $ok = AC::Yenta::NetMon::status_dom( $i->{natdom} );
        next unless defined $ok;	# remote private network

        if( $ok == 200 ){
            if( $i->{natdom} ){
                $private = $i;
            }else{
                $public = $i;
            }
        }else{
            $down = $i;
        }
    }

    my $prefer;
    # make sure we use all networks once in a while
    $prefer ||= $down   unless int rand(20);
    $prefer ||= $public unless int rand(20);
    # prefer private addr if available (cheaper)
    $prefer ||= $private || $public || $down;
    return unless $prefer;

    #print STDERR "using ", inet_itoa($prefer->{ipv4}), "\n";
    return ( inet_itoa($prefer->{ipv4}), ($prefer->{port} || $port) );
}


sub start {
    my $me = shift;

    $me->SUPER::start();

    # build request
    my $yp  = AC::Yenta::Protocol->new();
    my $pb  = AC::Yenta::Kibitz::Status::myself();
    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($pb),
        content_length	=> 0,
        want_reply	=> 1,
        msgid		=> $msgid++,
       );

    # write request
    $me->write( $hdr . $pb );
    $me->timeout_rel($TIMEOUT);
    return $me;
}


sub timeout {
    my $me = shift;
    $me->shut();
}

sub shutdown {
    my $me = shift;

    unless( $me->{status_ok} ){
        AC::Yenta::Kibitz::Status::isdown( $me->{status_peer} );
    }
}

sub read {
    my $me  = shift;
    my $evt = shift;

    #debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    $me->{status_ok} = 1;
    AC::Yenta::Kibitz::Status::update( $data );
    AC::Yenta::NetMon::update( $me );

    $me->shut();
}


1;

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

#
# $Id$

package AC::Yenta::Kibitz::Status::Server;
use AC::Dumper;
use AC::Yenta::Debug 'status_server';
use strict;


sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;

    if( $gpb ){
        AC::Yenta::Kibitz::Status::update_sceptical( $gpb, $io );
    }

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    AC::Yenta::NetMon::update( $io );

    # respond with all known peers
    my $response = AC::Yenta::Kibitz::Status::response();
    my $yp  = AC::Yenta::Protocol->new();

    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($response),
        content_length  => 0,
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
       );

    debug("sending status reply");
    $io->write_and_shut( $hdr . $response );
}

1;

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

use AC::Yenta::IO::TCP::Client;
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

our @ISA = 'AC::Yenta::IO::TCP::Client';

my $TIMEOUT = 5;

sub new {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;
    my $req   = shift;

    debug('starting kibitz store client');
    my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @_ );
    return unless $me;

    $me->{_req} = $req;
    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    return $me;
}

sub start {
    my $me = shift;

    $me->SUPER::start();
    $me->write( $me->{_req} );
    $me->timeout_rel($TIMEOUT);
    return $me;
}

sub shutdown {
    my $me = shift;

    # maybe call error handler
    $me->run_callback('error', undef) unless $me->{_store_ok};
}

sub timeout {
    my $me = shift;

    $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    $me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
    return unless $proto;
    $proto->{data}    = $data;
    $proto->{content} = $content;
    eval {
        my $yp = AC::Yenta::Protocol->new();
        $proto->{data} = $yp->decode_reply($proto) if $data;
    };
    if(my $e = $@){
        problem("cannot decode reply: $e");
    }

    # process
    $me->{_store_ok} = 1;
    if( $proto->{is_error} || $@ ){
        my $e = $@ || 'remote error';
        $me->run_callback('error', {
            error	=> $e,
        });
    }else{
        $me->run_callback('load', $proto);
    }

    $me->shut();
}


1;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

use AC::Dumper;
use JSON;
use Digest::SHA 'sha1_base64';
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

my $TIMEOUT = 1;

sub api_get {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # decode request
    my $req;
    eval {
        $req = ACPYentaGetSet->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    # process requests
    my @res;
    my $rescont;
    for my $r (@{ $req->{data} }){
        debug("get request: $r->{map}, $r->{key}, $r->{version}");
        my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
        my $res = {
            map		=> $r->{map},
            key		=> $r->{key},
        };

        if( $meta && $file ){
            unless( _check_content( $meta, $file ) ){
                problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
                # QQQ - remove from system, (and let AE get a new copy)?
                store_remove($r->{map}, $r->{key}, $ver);
                # tell caller it was not found
                $ver = undef;
            }
        }

        if( defined $ver ){
            $res->{version} = $ver;
            $res->{value}   = $data;
            $res->{meta}    = $meta  if defined $meta;
            if( $file ){
                # if one file to send, send it as content
                if( @{$req->{data}} == 1 ){
                    $rescont = $file;
                }else{
                    $res->{file} = $$file;
                }
            }
        }
        push @res, $res;
    }

    # encode results
    my $ect = '';
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
    my $response = $yp->encode_reply( {
        type		  => 'yenta_get',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        content_encrypted => $proto->{data_encrypted},
    }, { data => \@res }, \$ect );

    debug("sending get reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response . $ect );

}

sub api_check {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # decode request
    my $req;
    eval {
        $req = ACPYentaCheckRequest->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    debug("check request: $req->{map}, $req->{version}, $req->{level}");

    my @res;
    my @todo = { version => $req->{version}, shard => $req->{shard} };

    # the top of the tree will be fairly sparse,
    # return up to several levels if they are sparse
    for my $l (0 .. 32){
        my $cl = $req->{level} + $l;
        my @lres;
        my $nexttot;
        for my $do (@todo){
            my @r = _get_check( $req->{map}, $do->{shard}, $do->{version}, $cl );
            push @lres, @r;
            for (@r){
                $nexttot += $_->{key} ? ($_->{count} / 8) : $_->{count};
            }
        }
        push @res, @lres;
        @todo = ();
        last unless @lres;				# reached the bottom of the tree
        last if @res > 64;				# got enough results
        last if (@lres > 2) && ($nexttot > @lres + 2);	# less sparse region
        # get the next level also
        @todo = @lres;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		  => 'yenta_check',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        }, { check => \@res } );

    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

}

# get + process merkle data
sub _get_check {
    my $map   = shift;
    my $shard = shift;
    my $ver   = shift;
    my $lev   = shift;

    my $res = store_get_merkle($map, $shard, $ver, $lev);
    return unless $res;
    for my $r (@$res) {
        $r->{map}   = $map;
    }

    return @$res;
}

sub api_distrib {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# reference

    # decode request
    my $req;
    eval {
        $req = ACPYentaDistRequest->decode( $gpb );
        die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};
    };
    if(my $e = $@){
        my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
        problem("cannot decode request: peer: $io->{peerip} $enc, $e");
        $io->shut();
        return;
    }

    unless( conf_map( $req->{datum}{map} ) ){
        problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
        _reply_error($io, $proto, 404, 'Map Not Found');
        return;
    }

    my $v = $req->{datum};

    # do we already have ?
    my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );

    if( $want ){
        # put

        debug("put request from $io->{peerip}: $v->{map}, $v->{key}, $v->{version}");

        # file content is passed by reference, to avoid large copies
        $content ||= \ $v->{file} if $v->{file};

        # check
        if( $v->{meta} && $content ){
            unless( _check_content( $v->{meta}, $content ) ){
                problem("content SHA1 check failed: $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}");
                $io->shut();
                return;
            }
        }

        $want = store_put( $v->{map}, $v->{shard}, $v->{key}, $v->{version}, $v->{value},
                   $content, $v->{meta} );

        # distribute to other systems
        AC::Yenta::Store::Distrib->new( $req, $content ) if $want;
    }else{
        debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
    }


    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, { status_code => 200, status_message => 'OK', haveit => !$want } );

    debug("sending distrib reply");
    $io->timeout_rel($TIMEOUT);
    $io->write_and_shut( $response );

}

sub _check_content {
    my $meta = shift;
    my $cont = shift;

    return 1 unless $meta && $meta =~ /^\{/;

    eval {
        $meta = decode_json($meta);
    };
    return 1 if $@;

    if( $meta->{sha1} ){
        my $chk = sha1_base64( $$cont );
        return unless $chk eq $meta->{sha1};
    }
    if( $meta->{size} ){
        my $len = length($$cont);
        return unless $len == $meta->{size};
    }

    return 1;
}

sub _reply_error {
    my $io    = shift;
    my $proto = shift;
    my $code  = shift;
    my $msg   = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        is_error	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, {
        status_code     => $code,
        status_message  => $msg,
        haveit		=> 0,
    } );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}

1;

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

require 'AC/protobuf/heartbeat.pl';

my $FREQ	= 2;
my $OLD_DOWN	= 30;
my $OLD_KEEP	= 1800;

my %MON;	# by 'id' (from config file)

sub init {

    AC::DC::Sched->new(
        info	=> 'monitor',
        freq	=> $FREQ,
        func	=> \&periodic,
       );
}

sub periodic {

    my $mon = conf_value('monitor');

    # clean up old data
    for my $id (keys %MON){
        isdown($id, 0) if $MON{$id}{lastup} < $^T - $OLD_DOWN;
    }

    # start monitoring (send heartbeat request)
    for my $m (@$mon){
        my $ip   = $m->{ipa};
        my $port = $m->{port};
        my $id   = "$ip:$port";
        debug("start monitoring $id");

        my $ok = AC::Yenta::Monitor::Client->new( $ip, $port,
                                                info 		=> "monitor client: $id",
                                                monitor_peer	=> $id,
                                               );

        isdown($id, 0) unless $ok;
    }
}

# data for kibitzing around
# array of ACPYentaStatus
sub export {

    my @d;
    my $here = my_datacenter();
    my $self = my_server_id();

    for my $v (values %MON){

        push @d, {
            id			=> $v->{id},	# from config, typically localhost:port
            datacenter		=> $here,
            via			=> $self,
            hostname		=> $v->{hostname},
            subsystem		=> $v->{subsystem},
            environment		=> $v->{environment},
            status		=> $v->{status_code},
            timestamp		=> $v->{timestamp},
            lastup		=> $v->{lastup},
            sort_metric		=> $v->{sort_metric},
            capacity_metric	=> $v->{capacity_metric},
            server_id		=> $v->{server_id},
            instance_id 	=> $v->{server_id},
            ip			=> $v->{ip},
            path		=> '.',
        };
    }
    return @d;
}

sub isdown {
    my $id   = shift;
    my $code = shift;

    my $d = $MON{$id};
    return unless $d;

    # require 2 polls to fail
    return unless $^T - $d->{lastup} >= 2 * $FREQ;

    $code = 0 if $code == 200;
    $d->{status_code} = $code || 0;
    $d->{timestamp}   = $^T;

    debug("monitor $id is down");

    if( $d->{lastup} < $^T - $OLD_KEEP ){
        debug("monitor $id down too long. removing from report");
        delete $MON{$id};
    }
}

sub update {
    my $id = shift;
    my $gb = shift;	# ACPHeartbeat

    my $up;
    eval {
        $up = ACPHeartBeat->decode( $gb );
        $up->{id} = $id;
    };
    if(my $e = $@){
        problem("cannot decode hb data: $e");
        return isdown($id, 0);
    }
    unless( $up->{status_code} == 200 ){
        return isdown($id, $up->{status_code});
    }
    return isdown($id, 0) unless $^T - $up->{timestamp} < $OLD_DOWN;

    debug("monitor $id is up");
    $up->{lastup} = $^T;
    $up->{downcount} = 0;

    _hb_ip_info( $up, $MON{$id} );
    $MON{$id} = $up;
}

sub _hb_ip_info {
    my $up  = shift;
    my $old = shift;

    my $ip;

    $ip = $old->{ip} if ($old->{process_id} == $up->{process_id}) && ($old->{server_id} eq $up->{server_id});

    unless( $ip ){
        my $port = $up->{port};
        unless( $port ){
            # use monitored port (id is from config)
            (undef, $port) = split /:/, $up->{id};
        }

        if( $up->{ip} ){
            $ip = [ { ipv4 => $up->{ip}, port => $port, natdom => undef } ];
        }else{
            my $mynat = my_network_info();

            for my $i ( @$mynat ){
                push @$ip, { ipv4 => $i->{ipi}, port => $port, natdom => $i->{natdom} };
            }
        }
    }

    $up->{ip} = $ip;
}

1;

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

use AC::Yenta::IO::TCP::Client;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

sub new {
    my $class = shift;

    debug('starting monitor status client');
    my $me = $class->SUPER::new( @_ );
    unless($me){
        return;
    }

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

    # build request
    my $yp  = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $hdr = $yp->encode_header(
        type		=> 'heartbeat_request',
        data_length	=> 0,
        content_length	=> 0,
        want_reply	=> 1,
        msgid		=> $msgid++,
       );

    # write request
    $me->write( $hdr );

    $me->timeout_rel($TIMEOUT);

    return $me;
}

sub timeout {
    my $me = shift;
    $me->shut();
}

sub shutdown {
    my $me = shift;

    unless( $me->{status_ok} ){
        AC::Yenta::Monitor::isdown( $me->{monitor_peer} );
    }
}

sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new();
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    $me->{status_ok} = 1;
    AC::Yenta::Monitor::update( $me->{monitor_peer}, $data );

    $me->shut();
}


1;

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN



1;

=head1 NAME

AC::Yenta::MySelf - customize yenta to your own environment

=head1 SYNOPSIS

    emacs /myperldir/Local/Yenta/MySelf.pm
    copy. paste. edit.

    use lib '/myperldir';
    my $y = AC::Yenta::D->new(
        class_myself        => 'Local::Yenta::MySelf',
    );

=head1 DESCRIPTION

provide functions to override default behavior. you may define
any or all of the following functions.

=head2 my_server_id

return a unique identity for this yenta instance. typically,
something similar to the server hostname.

    sub my_server_id {
        return 'yenta@' . hostname();
    }

=head2 my_datacenter

return the name of the local datacenter. yenta will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.

    sub my_datacenter {
        my($domain) = hostname() =~ /^[\.]+\.(.*)/;
        return $domain;
    }

=head2 my_network_info

return information about the various networks this server has.

    sub my_network_info {
        my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
        my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));


        return [
            # use this IP for communication with servers this datacenter (same natdom)
            { ip => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ip => $public_ip },
        ]
    }

=head2 init

inialization function called at startup. typically used to lookup hostanmes, IP addresses,
and such and store them in variables to make the above functions faster.

    my $HOSTNAME;
    my $DOMAIN;
    sub init {
        $HOSTNAME = hostname();
        ($DOMAIN) = $HOSTNAME =~ /^[\.]+\.(.*)/;
    }

=head1 BUGS

none. you write this yourself.

=head1 SEE ALSO

    AC::Yenta

=head1 AUTHOR

    You!

=cut

lib/AC/Yenta/NetMon.pm  view on Meta::CPAN

use strict;

my $STALE	= 120;

my %lastok;	# natdom => T
my %natdom;	# ip => natdom


sub init {

    my $natinfo = my_network_info();
    for my $n ( @$natinfo ){
        my $dom = $n->{natdom} || 'public';
        $natdom{ $n->{ipa} } = $dom;
        $lastok{ $dom } = $^T;	# assume everything is working
    }
}

sub update {
    my $io = shift;

    my $ip  = inet_ntoa( (sockaddr_in(getsockname($io->{fd})))[1] );
    my $dom = $natdom{ $ip } || 'public';

    $lastok{$dom} = $^T;
}

sub status_dom {
    my $dom = shift;

    $dom ||= 'public';
    return unless exists $lastok{$dom};	# not local
    return ($lastok{$dom || 'public'} + $STALE < $^T) ? 0 : 200;
}


1;

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

require 'AC/protobuf/yenta_status.pl';
require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';


our @ISA    = 'AC::DC::Protocol';
our @EXPORT = qw(read_protocol use_encryption);
my $HDRSIZE = __PACKAGE__->header_size();

my %MSGTYPE =
 (
  heartbeat_request	=> { num => 2, reqc => '', 			resc => 'ACPHeartBeat' },

  yenta_status		=> { num => 6, reqc => 'ACPYentaStatusRequest', resc => 'ACPYentaStatusReply' },
  yenta_get		=> { num => 7, reqc => 'ACPYentaGetSet',        resc => 'ACPYentaGetSet' },
  yenta_distrib		=> { num => 8, reqc => 'ACPYentaDistRequest',   resc => 'ACPYentaDistReply' },
  yenta_check		=> { num => 9, reqc => 'ACPYentaCheckRequest',  resc => 'ACPYentaCheckReply' },
 );


for my $name (keys %MSGTYPE){
    my $r = $MSGTYPE{$name};
    __PACKAGE__->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}


sub read_protocol {
    my $me  = shift;
    my $io  = shift;
    my $evt = shift;

    $io->{rbuffer} .= $evt->{data};

    return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header
        eval {
            $io->{proto_header} = $me->decode_header( $io->{rbuffer} );
        };
        if(my $e=$@){
            verbose("cannot decode protocol header: $e");
            $io->run_callback('error', {
                cause	=> 'read',
                error	=> "cannot decode protocol: $e",
            });
            $io->shut();
            return;
        }
    }

    my $p = $io->{proto_header};
    return unless $p; 	# read more

    # do we have everything?
    return unless length($io->{rbuffer}) >= ($p->{auth_length} + $p->{data_length} + $p->{content_length} + $HDRSIZE);

    my $auth    = substr($io->{rbuffer}, $HDRSIZE,  $p->{auth_length});
    my $data    = substr($io->{rbuffer}, $HDRSIZE + $p->{auth_length},  $p->{data_length});
    my $content = substr($io->{rbuffer}, $HDRSIZE + $p->{auth_length} + $p->{data_length}, $p->{content_length});

    # RSN - validate auth

    if( $p->{data_encrypted} && $data ){
        $data = $me->_decrypt_data( $io, $auth, $data );
        return unless $data;
    }

    if( $p->{content_encrypted} && $content ){
        $content = $me->_decrypt_data( $io, $auth, $content );
        return unless $content;
    }

    # content is passed as reference
    return ($p, $data, ($content ? \$content : undef));
}

# for simple status queries, argus, debugging
# this is not an RFC compliant http server
sub _read_http {
    my $io  = shift;
    my $evt = shift;

    return unless $io->{rbuffer} =~ /\r?\n\r?\n/s;
    my($get, $url, $http) = split /\s+/, $io->{rbuffer};

    return ( { type => 'http', method => $get }, $url );
}

################################################################

sub _decrypt_data {
    my $me   = shift;
    my $io   = shift;
    my $auth = shift;
    my $data = shift;

    eval {
        $data = $me->decrypt( $auth, $data );
    };
    if(my $e=$@){
        verbose("cannot decrypt protocol data: $e");
        $io->run_callback('error', {
            cause	=> 'read',
            error	=> "cannot decrypt protocol: $e",
        });
        $io->shut();
        return;
    }

    return $data;
}

sub use_encryption {
    my $peer = shift;

    return unless conf_value('secret');
    # only encrypt far-away traffic, not local
    return $peer->{datacenter} ne my_datacenter();
}

sub encrypt {
    my $me    = shift;
    my $auth  = shift;	# not currently used
    my $buf   = shift;

    my $secret = $me->{secret};
    return $buf unless $secret;
    return unless $buf;
    my $crypto = AC::Yenta::Crypto->new( $secret );
    return $crypto->encrypt( $buf );
}

sub decrypt {
    my $me    = shift;
    my $abuf  = shift;	# not currently used
    my $buf   = shift;

    my $secret = $me->{secret};
    return $buf unless $secret;
    return unless $buf;
    my $crypto = AC::Yenta::Crypto->new( $secret );
    return $crypto->decrypt( $buf );
}

1;

lib/AC/Yenta/Server.pm  view on Meta::CPAN

use AC::Yenta::Kibitz::Status::Server;
use AC::Yenta::Kibitz::Store::Server;
use strict;

our @ISA = 'AC::DC::IO::TCP';

my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;

my %HANDLER = (
    yenta_status	=> 'AC::Yenta::Kibitz::Status::Server',
    yenta_get		=> \&AC::Yenta::Kibitz::Store::Server::api_get,
    yenta_distrib	=> \&AC::Yenta::Kibitz::Store::Server::api_distrib,
    yenta_check		=> \&AC::Yenta::Kibitz::Store::Server::api_check,
    http		=> 'AC::Yenta::Stats',
   );

sub new {
    my $class = shift;
    my $fd    = shift;
    my $ip    = shift;

    unless( $AC::Yenta::CONF->check_acl( $ip ) ){
        verbose("rejecting connection from $ip");
        return;
    }

    my $me = $class->SUPER::new( peerip => $ip, info => "tcp yenta server (from: $ip)" );

    $me->init($fd);
    $me->wantread(1);
    $me->timeout_rel($TIMEOUT);
    $me->set_callback('read',    \&read);
    $me->set_callback('timeout', \&timeout);
}

sub timeout {
    my $me = shift;

    debug("connection timed out");
    $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    # dispatch request
    my $h = $HANDLER{ $proto->{type} };

    unless( $h ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    if( ref $h ){
        $h->( $me, $proto, $data, $content );
    }else{
        $h->handler( $me, $proto, $data, $content );
    }
}


1;

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 2.120 seconds using v1.00-cache-2.02-grep-82fe00e-cpan-cec75d87357c )