AC-Yenta

 view release on metacpan or  search on metacpan

LICENSE  view on Meta::CPAN

    This software may be copied and distributed under the terms
    found in the Perl "Artistic License".

    A copy of the "Artistic License" may be found in the standard
    Perl distribution.

MANIFEST  view on Meta::CPAN

lib/AC/Yenta/Debug.pm
lib/AC/Yenta/NetMon.pm
lib/AC/Yenta/Config.pm
lib/AC/Yenta/MySelf.pm
lib/AC/Yenta/Status.pm
lib/AC/Yenta/Default/MySelf.pm
lib/AC/Yenta/Store/BDBI.pm
lib/AC/Yenta/Store/Map.pm
lib/AC/Yenta/Store/Merkle.pm
lib/AC/Yenta/Store/LevelDB.pm
lib/AC/Yenta/Store/AE.pm
lib/AC/Yenta/Store/Distrib.pm
lib/AC/Yenta/Store/SQLite.pm
lib/AC/Yenta/Store/Sharded.pm
lib/AC/Yenta/Store/Tokyo.pm
lib/AC/Yenta/Store/File.pm
lib/AC/Yenta/Store/Expire.pm
lib/AC/Yenta/SixtyFour.pm
lib/AC/Yenta/IO/TCP/Client.pm
lib/AC/Yenta/Conf.pm
lib/AC/Yenta/Stats.pm
lib/AC/Yenta/D.pm
lib/AC/Yenta/Direct.pm
lib/AC/Yenta/Customize.pm
lib/AC/Yenta/Protocol.pm
lib/AC/Yenta/Monitor/Client.pm
lib/AC/Yenta/Kibitz/Status/Server.pm
lib/AC/Yenta/Kibitz/Status/Client.pm
lib/AC/Yenta/Kibitz/Store/Server.pm
lib/AC/Yenta/Kibitz/Store/Client.pm
lib/AC/Yenta/Kibitz/Status.pm
lib/AC/Yenta/AC/MySelf.pm
lib/AC/Yenta/Store.pm
lib/AC/Yenta/Client.pm
lib/AC/Yenta/Server.pm
lib/AC/Yenta/Crypto.pm
lib/AC/Yenta/Monitor.pm
lib/AC/protobuf/heartbeat.pl
lib/AC/protobuf/std_reply.pl
lib/AC/protobuf/auth.pl
lib/AC/protobuf/yenta_status.pl
lib/AC/protobuf/yenta_getset.pl
lib/AC/protobuf/std_ipport.pl
lib/AC/protobuf/yenta_check.pl
lib/AC/Yenta.pm
LICENSE
eg/yenta_get_direct
eg/myself.pm
eg/yentad
eg/yenta.conf
eg/yenta_get
eg/yenta_put
MANIFEST
proto/yenta_status.proto
proto/std_reply.proto
proto/yenta_check.proto
proto/std_ipport.proto
proto/yenta_getset.proto
proto/heartbeat.proto
proto/auth.proto
Makefile.PL
META.yml                                 Module meta-data (added by MakeMaker)

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-Yenta
version:            1.1
abstract:           eventually-consistent distributed key/value data store. et al.
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    BerkeleyDB:           0
    Crypt::Rijndael:      0
    Digest::SHA:          0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0
    Sys::Hostname:        0
    Time::HiRes:          0
no_index:
    directory:
        - t
        - inc
generated_by:       ExtUtils::MakeMaker version 6.48
meta-spec:
    url:      http://module-build.sourceforge.net/META-spec-v1.4.html
    version:  1.4

Makefile.PL  view on Meta::CPAN


use ExtUtils::MakeMaker;
# Build description for the AC::Yenta distribution.
# The version and the abstract are both read from the main module file.
my $main_module = 'lib/AC/Yenta.pm';

# Prerequisite modules; a version of 0 accepts any installed version.
my %prereq = map { $_ => 0 } qw(
    POSIX
    Sys::Hostname
    JSON
    Digest::SHA
    Crypt::Rijndael
    BerkeleyDB
    Time::HiRes
    Google::ProtocolBuffers
    AC::DC
);

WriteMakefile(
    NAME          => 'AC::Yenta',
    VERSION_FROM  => $main_module,
    ABSTRACT_FROM => $main_module,
    AUTHOR        => 'AdCopy <http://www.adcopy.com>',
    LICENSE       => 'perl',
    PREREQ_PM     => \%prereq,
);

eg/myself.pm  view on Meta::CPAN

# -*- perl -*-
# example myself

# $Id$

package Local::Yenta::MySelf;
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish the persistent id of this server.
#   $class - the invoking class (not used)
#   $port  - our tcp port (not used in this example)
#   $id    - id given on the command line; when empty an id of the form
#            "yenta/<hostname>" is built, with the local domain removed
#
# NOTE(review): verbose() is not imported by anything visible in this example
# file - presumably it needs `use AC::Yenta::Debug;`; confirm before use.
sub init {
    my $class = shift;
    my $port  = shift;  # our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        # remove domain: the dots are escaped and the pattern is anchored, so
        # only a literal trailing ".example.com" is stripped
        (my $h = hostname()) =~ s/\.example\.com\z//;
        $SERVERID = "yenta/$h";
    }
    verbose("system persistent-id: $SERVERID");
}

# Return the persistent server id that init() stored in $SERVERID
# (undef until init() has been called).
sub my_server_id {
    return $SERVERID;
}

1;

eg/yenta.conf  view on Meta::CPAN

# example yenta config
#
# file will be reloaded automagically if it changes. no need to hup or restart.


port            3503

environment	prod

# save peer status in a file?
savestatus      /var/tmp/yenta.status           yenta

allow		127.0.0.1
allow           10.200.2.0/23

# seed peers to locate the network at startup
seedpeer        10.200.2.4:3503
seedpeer        10.200.2.5:3503


# enable debugging?
#debug           ae
#debug           map
#debug           merkle
# ...


# maps
map testyfoo {
    # name of the data file
    dbfile      /home/data/testyfoo.ydb
    # how much history to keep 
    history     4
}

eg/yenta_get  view on Meta::CPAN

#!/usr/local/bin/perl
# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-03 13:43 (EDT)
# Function: get value example
#
# $Id$

use AC::Yenta::Client;
use AC::Dumper;
use strict;

# Fetch one key from a map and dump the result.
# Both arguments are required; they are tested for presence rather than
# truth so that a key such as "0" is accepted.
my $map = shift @ARGV;
my $key = shift @ARGV;
die "usage: yenta_get map key\n"
    unless defined $map && length $map && defined $key && length $key;

my $y   = AC::Yenta::Client->new(
    # server_file, servers[], or host + port
    server_file => '/var/tmp/yenta.status',
    debug       => \&debug,
   );

my $res = $y->get($map, $key);

print dumper($res), "\n";


exit;

# Debug callback handed to the client: write the message parts to stderr,
# followed by a newline.
sub debug {
    print {*STDERR} @_, "\n";
}

eg/yenta_get_direct  view on Meta::CPAN

#!/usr/local/bin/perl
# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Sep-16 12:00 (EDT)
# Function: get - using bdb file directly
#
# $Id$

use AC::Dumper;
use AC::Yenta::Direct;
use strict;

# Read one key straight out of a yenta data file and dump the value.
my $map  = shift @ARGV;
my $file = shift @ARGV;
my $key  = shift @ARGV;
die "usage: yenta_get_direct map file key\n"
    unless defined $map && defined $file && defined $key;

# new() returns nothing when the map cannot be opened
my $y = AC::Yenta::Direct->new( $map, $file )
    or die "cannot open map '$map' in '$file'\n";
my $v = $y->get($key);
print  dumper($v), "\n";

eg/yenta_put  view on Meta::CPAN

#!/usr/local/bin/perl
# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-01 12:17 (EDT)
# Function: put example
#
# $Id$

use AC::Yenta::Client;
use Time::HiRes 'time';
use JSON;
use strict;


# The client constructor dies unless it is told where to find servers
# (servers[] or a server_file), so point it at the status file saved by yentad.
my $ys = AC::Yenta::Client->new(
    server_file => '/var/tmp/yenta.status',
    debug       => sub{ print STDERR @_, "\n"; },
   );


my $key = 'YX3jSXD3CBRUDABm';

my $res = $ys->distribute(
    # map, key, version, data
    'mymap', $key, timet_to_yenta_version(time()),
    encode_json( {
        url_id  => $key,
        url     => 'http://www.example.com',
        acc_id  => 'C9TdSgbUCBRUCABG',
        format  => 'html',
    }),
   );

# distribute() returns nothing when no server accepted the data
die "distribute failed\n" unless $res;

eg/yentad  view on Meta::CPAN

#!/usr/local/bin/perl
# -*- perl -*-
#
# example yenta daemon

use Getopt::Std;
use AC::Yenta::D;
use strict;

# Copy the command line before the options are parsed; the copy is handed
# to the daemon below as 'argv'.
my @saved_argv = @ARGV;

# Command line options:
#   -c file   config file
#   -d        enable all debugging
#   -f        stay in the foreground
#   -p port   tcp port
my %OPT;
getopts('c:dfp:', \%OPT) or die "usage...\n";

# The daemon object, customized with our own MySelf implementation.
my $y = AC::Yenta::D->new( class_myself => 'My::Code::MySelf' );

my %daemon_opt = (
    argv       => \@saved_argv,
    foreground => $OPT{f},
    debugall   => $OPT{d},
    port       => $OPT{p},
);

$y->daemon( $OPT{c}, \%daemon_opt );

lib/AC/Yenta.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-May-13 18:18 (EDT)
# Function: documentation
#
# $Id$

package AC::Yenta;
use strict;

our $VERSION = 1.1;

=head1 NAME

AC::Yenta - eventually-consistent distributed key/value data store. et al.

=head1 SYNOPSIS

    use AC::Yenta::D;
    use strict;

    my $y = AC::Yenta::D->new( );

    $y->daemon( $configfile, {
      argv		=> \@ARGV,
      foreground	=> $OPT{f},
      debugall		=> $OPT{d},
      port		=> $OPT{p},
    } );

    exit;


=head1 USAGE

    Copy + Paste from the example code into your own code.
    Copy + Paste from the example config into your own config.
    Send in bug report.

=head1 YIDDISH-ENGLISH GLOSSARY

	Kibitz - Gossip. Casual information exchange with one's peers.

	Yenta - 1. An old woman who kibitzes with other yentas.
		2. Software which kibitzes with other yentas.


=head1 DESCRIPTION

=head2 Peers

All of the running yentas are peers. There is no master server.
New nodes can be added or removed on the fly with no configuration.

=head2 Kibitzing

Each yenta kibitzes (gossips) with the other yentas in the network
to exchange status information, distribute key-value data, and
detect and correct inconsistent data.

=head2 Eventual Consistency

Key-value data is versioned with timestamps. By default, newest wins.
Maps can be configured to keep and return multiple versions and client
code can use other conflict resolution mechanisms.

Lost, missing or otherwise inconsistent data is detected
by kibitzing merkle tree hash values.

=head2 Topological awareness

Yentas can take network topology into account when transferring
data around to minimize long-distance transfers. You will need to
write a custom C<MySelf> class with a C<my_datacenter> function.

=head2 Multiple Network Interfaces / NAT

Yentas can take advantage of multiple network interfaces with
different IP addresses (eg. a private internal network + a public network),
or multiple addresses (eg. a private address and a public address)
and various NAT configurations.

You will need to write a custom C<MySelf> class and C<my_network_info>
function.

=head2 Network Information

By default, yentas obtain their primary IP address by calling
C<gethostbyname( hostname() )>. If this either does not work on your
systems, or isn't the value you want to use,
you will need to write a custom C<MySelf> class and C<my_network_info>
function.



=head1 CONFIG FILE

various parameters need to be specified in a config file.
if you modify the file, it will be reloaded automagically.

=over 4

=item port

specify the TCP port to use

    port 3503

=item environment

specify the environment or realm to run in, so you can run multiple
independent yenta networks, such as production, staging, and dev.

    environment prod

=item allow

specify networks allowed to connect.

    allow 127.0.0.1
    allow 192.168.10.0/24

=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.

    seedpeer 192.168.10.11:3503
    seedpeer 192.168.10.12:3503

=item secret

specify a secret key used to encrypt data transferred between
yentas in different datacenters.

    secret squeamish-ossifrage

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item debug

enable debugging for a particular section

    debug map

=item map

configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers. maps should be
configured similarly on all servers that they are on.

    map users {
	backend	    bdb
        dbfile      /home/acdata/users.ydb
        history     4
    }

=back

=head1 BUGS

Too many to list here.

=head1 SEE ALSO

    AC::Yenta::Client

    Amazon Dynamo - http://www.allthingsdistributed.com/2007/10/amazons_dynamo.html

=head1 AUTHOR

    Jeff Weisberg - http://www.solvemedia.com/

=cut

1;

lib/AC/Yenta/AC/MySelf.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 17:23 (EST)
# Function: 
#
# $Id$

package AC::Yenta::AC::MySelf;
use AC::Yenta::Config;
use AC::Yenta::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish the persistent id of this server.
#   $class - the invoking class (not used)
#   $port  - not used
#   $id    - explicit id; when empty the id is built as
#            yenta[/<environment>]@<host>, the environment part being
#            left out for 'prod'
sub init {
    my ($class, $port, $id) = @_;

    if( $id ){
        $SERVERID = $id;
    }else{
        # hostname with everything from ".adcopy" onwards removed
        (my $host = hostname()) =~ s/\.adcopy.*//;
        my $env = conf_value('environment');

        $SERVERID  = 'yenta';
        $SERVERID .= '/' . $env unless $env eq 'prod';
        $SERVERID .= '@' . $host;
    }
    verbose("system persistent-id: $SERVERID");
}

# Return the persistent server id that init() stored in $SERVERID
# (undef until init() has been called).
sub my_server_id {
    return $SERVERID;
}

1;

lib/AC/Yenta/Client.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-07 11:37 (EDT)
# Function: for other programs to talk to yentad
#
# $Id$

package AC::Yenta::Client;
use AC::Yenta::Conf;
use AC::DC::Protocol;
use AC::Import;
use AC::Misc;
use Sys::Hostname;
use JSON;
use Digest::SHA 'sha1';
use Socket;
use strict;

require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';

our @EXPORT = 'timet_to_yenta_version';	# imported from Y/Conf

my $HOSTNAME = hostname();

# Message types used by this client: name => message number plus the names
# used for the request (reqc) and the reply (resc).
# NOTE(review): reqc/resc look like the protobuf class names loaded by the
# require lines above - confirm against AC::DC::Protocol->add_msg.
my %MSGTYPE = (
    yenta_get     => { num => 7, reqc => 'ACPYentaGetSet',       resc => 'ACPYentaGetSet' },
    yenta_distrib => { num => 8, reqc => 'ACPYentaDistRequest',  resc => 'ACPYentaDistReply' },
    yenta_check   => { num => 9, reqc => 'ACPYentaCheckRequest', resc => 'ACPYentaCheckReply' },
);

# register every message type with the protocol layer
AC::DC::Protocol->add_msg( $_, @{ $MSGTYPE{$_} }{qw(num reqc resc)} )
    for keys %MSGTYPE;


# one or more of:
#   new( host, port )
#   new( servers => [ { host, port }, ... ] )
#   new( server_file )

# Create a client.
# Parameters are key => value pairs; the ones used in this file are:
#   servers      - arrayref of servers to try
#   server_file  - status file saved by yentad, listing servers ('altfile'
#                  is accepted as an older name for it)
#   host, port   - a single server (host defaults to 'localhost')
#   debug        - callback invoked with debugging messages
#   copies, retries, timeout - request tuning
# Dies unless at least one way of locating a server was given:
# servers, server_file, or host + port.
sub new {
    my $class = shift;

    my $me = bless {
        debug   => sub{ },
        host    => 'localhost',
        proto   => AC::DC::Protocol->new(),
        copies  => 1,
        @_,
    }, $class;

    $me->{server_file} ||= $me->{altfile};      # compat

    # host + port is a documented way to name the server, so accept it too
    die "servers or server_file?\n"
        unless $me->{servers} || $me->{server_file} || ($me->{host} && $me->{port});

    return $me;
}

# Fetch a value.
#   $map - name of the map
#   $key - key to look up
#   $ver - version to ask for (optional)
# Returns whatever _send_request returns: the decoded reply, or nothing
# when no server answered.
sub get {
    my ($me, $map, $key, $ver) = @_;

    my %header = (
        type       => 'yenta_get',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );
    my %wanted = (
        map     => $map,
        key     => $key,
        version => $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, { data => [ \%wanted ] } );

    return $me->_send_request($map, $req);
}

# Compute the shard value of a key: the first 8 bytes of its SHA-1 digest
# read as two big-endian 32 bit words and combined into one integer.
sub _shard {
    my ($key) = @_;

    my ($hi, $lo) = unpack 'NN', sha1($key);
    return( ($hi << 32) | $lo );
}


# Store (distribute) a value.
#   $map  - name of the map
#   $key  - key; required
#   $ver  - version; required
#   $val  - value
#   $file - optional reference to additional content sent along
#   $meta - optional meta data
# Returns nothing when key or version is missing, otherwise whatever
# _send_request returns (undef | result).
# Side effect: sets the object's retry count to 25 if none was configured.
sub distribute {
    my ($me, $map, $key, $ver, $val, $file, $meta) = @_;

    return unless $key && $ver;
    $me->{retries} ||= 25;

    my %header = (
        type       => 'yenta_distrib',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );
    my %content = (
        sender => "$HOSTNAME/$$",
        hop    => 0,
        expire => time() + 120,
        datum  => [ {
            map     => $map,
            key     => $key,
            version => $ver,
            shard   => _shard($key),    # NYI
            value   => $val,
            meta    => $meta,
        } ],
    );

    my $req = $me->{proto}->encode_request( \%header, \%content, $file );

    return $me->_send_request($map, $req, $file);
}

# Send a check request for a map.
#   $map - name of the map
#   $ver - version
#   $lev - level
# Returns whatever _send_request returns.
sub check {
    my ($me, $map, $ver, $lev) = @_;

    my %header = (
        type       => 'yenta_check',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );
    my %content = (
        map     => $map,
        level   => $lev,
        version => $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, \%content );

    return $me->_send_request($map, $req);
}

################################################################

# Send an encoded request, walking the list of candidate servers.
#   $map  - name of the map (selects the servers to use)
#   $req  - encoded request
#   $file - optional reference to content appended to the request
# Tries up to retries+1 servers, pausing between attempts with a growing
# delay. Returns the reply once 'copies' servers have answered; returns
# nothing if the servers or the attempts run out first.
sub _send_request {
    my $me   = shift;
    my $map  = shift;
    my $req  = shift;
    my $file = shift;   # reference

    my $tries = ($me->{retries} || 0) + 1;
    my $copy  = $me->{copies} || 1;
    my $delay = 0.25;

    $me->_init_hostlist($map);
    my ($addr, $port) = $me->_next_host($map);

    for my $try (1 .. $tries){
        return unless $addr;
        my $res = $me->_try_server($addr, $port, $req, $file);
        return $res if $res && !--$copy;
        ($addr, $port) = $me->_next_host($map);

        # no point in waiting when there is nothing left to try
        last unless $addr && $try < $tries;

        # the builtin sleep only does whole seconds (0.25 would not sleep
        # at all); 4-arg select gives the intended fractional delay
        select(undef, undef, undef, $delay);
        $delay *= 1.414;
    }
    return;
}

# Send the request to one server.
#   $addr, $port - the server
#   $req         - encoded request
#   $file        - optional reference to content appended to the request
# Returns the reply with the decoded content stored under {data},
# or undef if the request failed or died.
sub _try_server {
    my ($me, $addr, $port, $req, $file) = @_;

    my $ipn = inet_aton($addr);
    $req .= $$file if $file;

    $me->{debug}->("trying to contact yenta server $addr:$port");

    my $res;
    eval {
        $res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
        if( $res ){
            $res->{data} = $me->{proto}->decode_reply( $res );
        }
    };
    my $e = $@;
    if( $e ){
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}


################################################################

# Take the next server off the candidate list.
# The list is loaded from the server file first if it does not exist yet.
# Returns (addr, port), or nothing once the list is exhausted.
sub _next_host {
    my ($me, $map) = @_;

    $me->_read_serverfile($map) unless $me->{_server};

    my $list = $me->{_server};
    return unless $list && @$list;

    my $server = shift @$list;
    return( $server->{addr}, $server->{port} );
}

# Build the list of candidate servers for a request, in this order:
# the configured host + port (if both are set), the configured servers[],
# then the servers found in the server file.
#   $map - name of the map, passed on to _read_serverfile
sub _init_hostlist {
    my $me  = shift;
    my $map = shift;

    my @server;
    push @server, {
        addr    => $me->{host},
        port    => $me->{port},
    } if $me->{host} && $me->{port};

    # the rest of the client reads the 'addr' field; also accept entries
    # that name the server with 'host', as the constructor does
    for my $s ( @{ $me->{servers} || [] } ){
        push @server, {
            addr => (defined $s->{addr} ? $s->{addr} : $s->{host}),
            port => $s->{port},
        };
    }
    $me->{_server} = \@server;

    $me->_read_serverfile($map);
}

# yentad saves a list of alternate peers to try in case it dies
# Add the servers listed in the server file to the candidate list.
# The file holds one JSON record per line; the fields read here are
# addr, port, map (list of map names) and is_local.
#   $map - only servers carrying this map are used
# Local servers are preferred: faraway ones are used only when no local
# server has the map. The chosen servers are shuffled and appended to
# $me->{_server}. A missing or unreadable file, and lines that cannot be
# decoded, are skipped rather than treated as fatal.
sub _read_serverfile {
    my $me  = shift;
    my $map = shift;

    # make sure the list exists even if nothing can be read
    $me->{_server} ||= [];

    my $file = $me->{server_file};
    return unless defined $file && length $file;

    my $f;
    unless( open($f, '<', $file) ){
        $me->{debug}->("cannot open server file '$file': $!") if $me->{debug};
        return;
    }

    my @server;
    my @faraway;
    local $/ = "\n";
    while( my $line = <$f> ){
        chomp $line;    # chop would damage a final line lacking a newline
        next unless length $line;

        # one bad line should not take the whole client down
        my $data = eval { decode_json( $line ) };
        next unless ref $data eq 'HASH';
        next unless grep { $_ eq $map } @{ $data->{map} || [] };

        my $entry = { addr => $data->{addr}, port => $data->{port} };
        if( $data->{is_local} ){
            push @server, $entry;
        }else{
            push @faraway, $entry;
        }
    }
    close $f;

    # prefer local
    @server = @faraway unless @server;

    shuffle( \@server );
    push @{$me->{_server}}, @server;
}

################################################################

1;

lib/AC/Yenta/Conf.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-13 12:34 (EDT)
# Function: defs
#
# $Id$

package AC::Yenta::Conf;
use AC::Yenta::SixtyFour;
use AC::Import;
use strict;

our @EXPORT = qw(timet_to_yenta_version_factor timet_to_yenta_version);

my $TTVF = x64_one_million();


# deprecated
# Return the factor ($TTVF) by which timet_to_yenta_version multiplies
# a time value.
sub timet_to_yenta_version_factor {
    return $TTVF;
}

# Convert a time value (possibly fractional) into a yenta version number
# by scaling it with $TTVF.
# Returns nothing when no time is given.
sub timet_to_yenta_version {
    my ($t) = @_;

    return unless defined $t;

    # plain number: an ordinary multiplication does the job
    return $t * $TTVF unless ref $TTVF;

    # $TTVF is an object: math::bigint does not like to multiply floats,
    # so the whole and the fractional part are scaled separately
    my $whole = int $t;
    my $frac  = $t - $whole;

    return $whole * $TTVF + int($TTVF->numify() * $frac);
}

1;

lib/AC/Yenta/Config.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-27 17:31 (EDT)
# Function: 
#
# $Id$

package AC::Yenta::Config;
use AC::Misc;
use AC::Import;
use AC::DC::Debug;
use AC::ConfigFile::Simple;
use Socket;
use strict;

our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value conf_map);


# Config file keywords and the routines that parse them.
my %CONFIG = (

    include     => \&AC::ConfigFile::Simple::include_file,
    debug       => \&AC::ConfigFile::Simple::parse_debug,
    allow       => \&AC::ConfigFile::Simple::parse_allow,
    port        => \&AC::ConfigFile::Simple::parse_keyvalue,
    environment => \&AC::ConfigFile::Simple::parse_keyvalue,
    secret      => \&AC::ConfigFile::Simple::parse_keyvalue,
    # 'syslog' is a documented keyword and the daemon reads it with
    # conf_value('syslog'), so it needs a handler as well.
    # NOTE(review): assumes parse_keyvalue stores it like port/environment - confirm.
    syslog      => \&AC::ConfigFile::Simple::parse_keyvalue,
    seedpeer    => \&AC::ConfigFile::Simple::parse_keyarray,

    ae_maxload  => \&AC::ConfigFile::Simple::parse_keyvalue,
    distrib_max => \&AC::ConfigFile::Simple::parse_keyvalue,

    savestatus  => \&parse_savefile,
    monitor     => \&parse_monitor,
    map         => \&parse_map,

);

# options accepted inside a map { ... } section
my @MAP = qw(dbfile basedir keepold history expire backend sharded);


################################################################

# Handle one config file entry.
#   $key  - the keyword
#   $rest - the remainder of the line
# Returns 1 if the keyword has a handler in %CONFIG (after running it),
# nothing otherwise.
sub handle_config {
    my ($me, $key, $rest) = @_;

    my $handler = $CONFIG{$key}
        or return;

    $handler->($me, $key, $rest);
    return 1;
}

################################################################

# Parse a "map NAME { ... }" section.
#   $key   - the keyword ('map')
#   $value - the rest of the line: the map name and the opening brace
# Reads option lines up to the closing '}' and stores the recognized ones
# under {_pending}{map}{NAME}; 'expire' values are converted with
# cvt_timespec. Unknown options and a redefined map are reported with
# problem(). Dies when the map name cannot be found.
sub parse_map {
    my ($me, $key, $value) = @_;

    my ($name) = $value =~ /(\S+)\s+\{\s*/;
    die "invalid map spec\n" unless $name;

    problem("map '$name' redefined") if $me->{_pending}{map}{$name};

    my %is_option = map { $_ => 1 } @MAP;
    my %settings;

    while( defined(my $line = $me->_nextline()) ){
        last if $line eq '}';
        my ($opt, $arg) = split /\s+/, $line, 2;

        unless( $is_option{$opt} ){
            problem("unknown map option '$opt'");
            next;
        }

        $arg = cvt_timespec($arg) if $opt eq 'expire';
        $settings{$opt} = $arg;
    }

    $me->{_pending}{map}{$name} = \%settings;
}

# Parse a "monitor IP:PORT" entry and append it to {_pending}{monitor}.
#   $key - the keyword ('monitor')
#   $mon - the "ip:port" value
# The address is kept as given (monitor), as text (ipa) and as converted
# by inet_aton (ipn) and inet_atoi (ipi).
sub parse_monitor {
    my ($me, $key, $mon) = @_;

    my ($ip, $port) = split /:/, $mon;

    my %entry = (
        monitor => $mon,
        ipa     => $ip,
        ipn     => inet_aton($ip),
        ipi     => inet_atoi($ip),
        port    => $port,
    );

    push @{$me->{_pending}{monitor}}, \%entry;
}

# Parse a "savestatus FILE TYPE..." entry and append it to
# {_pending}{savestatus}.
#   $key  - the keyword ('savestatus')
#   $save - the file name followed by any number of types
sub parse_savefile {
    my ($me, $key, $save) = @_;

    my @word = split /\s+/, $save;
    my $file = shift @word;

    my %entry = (
        type => \@word,
        file => $file,
    );

    push @{$me->{_pending}{savestatus}}, \%entry;
}

# Convert a time specification such as "30", "5m", "2h" or "1d" to seconds.
# The number may be followed by a unit: m (minutes), h (hours) or d (days);
# any other or no unit leaves the number as it is.
sub cvt_timespec {
    my $t = shift;

    my %seconds_per = ( m => 60, h => 3600, d => 86400 );

    my ($count, $unit) = $t =~ /(\d+)(\D?)/;
    my $scale = $seconds_per{$unit} || 1;

    return $count * $scale;
}


################################################################

# Look up a config value.
#   $key - name of the config parameter
# Returns the entry stored for $key in the {config} hash of the current
# configuration object ($AC::Yenta::CONF).
sub conf_value {
    my $key = shift;

    return $AC::Yenta::CONF->{config}{$key};
}

# Look up the configuration of one map.
#   $map - name of the map
# Returns the entry stored for $map under {config}{map} of the current
# configuration object ($AC::Yenta::CONF).
sub conf_map {
    my $map = shift;

    return $AC::Yenta::CONF->{config}{map}{$map};
}


1;

lib/AC/Yenta/Crypto.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jun-16 10:50 (EDT)
# Function: 
#
# $Id$

package AC::Yenta::Crypto;
use AC::Yenta::Debug 'crypto';
use AC::Misc;
use Time::HiRes 'time';
use Crypt::Rijndael;
use Digest::SHA  qw(sha256 hmac_sha256_base64);
use strict;

require 'AC/protobuf/auth.pl';

my $ALGORITHM = 'x-acy-aes-1';

# Create an encryption object.
#   $secret - the shared secret from which the message keys are derived
sub new {
    my ($class, $secret) = @_;

    my %self = ( secret => $secret );
    return bless \%self, $class;
}

# Encrypt a buffer.
#   $buf - the data to encrypt
# Returns an encoded ACPEncrypt message carrying the algorithm name,
# sequence number, nonce, hmac of the ciphertext, the original length
# and the ciphertext itself.
sub encrypt {
    my ($me, $buf) = @_;

    # the key and iv are derived per message from the time and a random nonce
    my $seqno = int( time() * 1_000_000 );
    my $nonce = random_text(48);
    my $key   = $me->_key($seqno, $nonce);
    my $iv    = $me->_iv($key, $seqno, $nonce);

    # pad a copy with NULs up to the next multiple of 16 bytes;
    # the real length travels in the message
    my $pbuf = $buf;
    my $over = length($pbuf) & 0xF;
    $pbuf   .= "\0" x (16 - $over) if $over;

    my $aes = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );
    my $ct   = $aes->encrypt( $pbuf );
    my $hmac = hmac_sha256_base64($ct, $key);

    my %msg = (
        algorithm  => $ALGORITHM,
        seqno      => $seqno,
        nonce      => $nonce,
        hmac       => $hmac,
        length     => length($buf),
        ciphertext => $ct,
    );
    my $eb = ACPEncrypt->encode( \%msg );

    debug("encrypted <$seqno,$nonce,$hmac>");

    return $eb;
}

# Decrypt a buffer produced by encrypt().
#   $buf - encoded ACPEncrypt message
# Returns the plaintext, cut back to the length recorded in the message.
# Dies if the algorithm is not ours or if the hmac of the ciphertext does
# not match the one in the message.
sub decrypt {
    my $me  = shift;
    my $buf = shift;

    my $ed     = ACPEncrypt->decode( $buf );
    die "cannot decrypt: unknown alg\n" unless $ed->{algorithm} eq $ALGORITHM;

    my $seqno  = $ed->{seqno};
    my $nonce  = $ed->{nonce};
    my $key    = $me->_key($seqno, $nonce);
    my $iv     = $me->_iv($key, $seqno, $nonce);

    # verify before decrypting. the comparison looks at every character
    # instead of stopping at the first difference, so the time it takes
    # does not tell the sender how much of a forged hmac was right.
    my $hmac   = hmac_sha256_base64($ed->{ciphertext}, $key);
    my $claim  = defined $ed->{hmac} ? $ed->{hmac} : '';
    my $differ = length($claim) != length($hmac);
    unless( $differ ){
        my $x = "$claim" ^ "$hmac";     # string xor: all NULs when equal
        $differ = ($x =~ tr/\0//c);
    }
    die "cannot decrypt: hmac mismatch\n" if $differ;

    my $aes    = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );
    my $pt     = substr($aes->decrypt( $ed->{ciphertext} ), 0, $ed->{length});

    debug("decrypted <$seqno,$nonce,$hmac>");

    return $pt;
}


# Derive the message key: the SHA-256 digest of the shared secret combined
# with the sequence number and the nonce.
sub _key {
    my ($me, $seqno, $nonce) = @_;

    my $material = join '', 'key1', $me->{secret}, $seqno, $nonce, '1yek';
    return sha256( $material );
}

# Derive the initialization vector: the first 16 bytes of the SHA-256
# digest of the key and the sequence number.
# The nonce is accepted but does not take part in the computation.
sub _iv {
    my ($me, $key, $seqno, $nonce) = @_;

    my $digest = sha256( 'iv' . $key . $seqno );
    return substr($digest, 0, 16);
}


1;

lib/AC/Yenta/Customize.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 10:37 (EST)
# Function: connect user provided implementation
#
# $Id$

package AC::Yenta::Customize;
use strict;

# Connect a user provided implementation to a class.
#   $class  - the class being customized, eg. X::Y; its default
#             implementation is X::Default::Y
#   $implby - name of the user's implementation class (optional)
# Loads both implementations, then installs every function named in
# @{CLASS::CUSTOM} into $class, taking it from the user's class when that
# provides it and from the default otherwise.
# Dies if one of the classes cannot be loaded.
sub customize {
    my $class  = shift;
    my $implby = shift;

    (my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;

    # load user's implementation + default
    for my $p ($implby, $default){
        next unless $p;
        # judge success by the value of the eval itself: $@ may still hold
        # an error left over from some earlier, unrelated eval
        eval "require $p; 1" or die $@;
    }

    # import/export
    no strict 'refs';
    no warnings;
    for my $f ( @{$class . '::CUSTOM'} ){
        *{$class . '::' . $f} = ($implby && $implby->can($f)) || $default->can($f);
    }
}

1;

lib/AC/Yenta/D.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-May-12 15:57 (EDT)
# Function: yenta daemon
#
# $Id$

package AC::Yenta::D;

use AC::Daemon;
use AC::DC::IO { monitor => 'AC::Yenta::Stats' };
use AC::Yenta::Config;
use AC::Yenta::Debug;
use AC::Yenta::Stats;
use AC::Yenta::Monitor;
use AC::Yenta::Server;
use AC::Yenta::Status;
use AC::Yenta::Store;
use AC::Yenta::MySelf;
use AC::Yenta::NetMon;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
# use AC::Yenta::Store::Tokyo;

use strict;

# Create the daemon object.
# Parameters (key => value):
#   class_myself - name of the class implementing MySelf; handed to
#                  AC::Yenta::MySelf->customize
sub new {
    my ($class, %p) = @_;

    AC::Yenta::MySelf->customize( $p{class_myself} );
    # ...

    # the object carries no state of its own: a blessed reference
    # to the class name
    my $me = \$class;
    return bless $me, $class;
}

# Run the yenta daemon: load the config, set up logging and debugging,
# daemonize (unless told to stay in the foreground), install the signal
# handlers, initialize the subsystems, start the tcp server and the
# periodic config check, then enter the main loop.
#   $cfile - name of the config file; required
#   $opt   - hashref of options: foreground, debugall, persistent_id,
#            argv, port
# Dies if no config file is specified.
sub daemon {
    my $me    = shift;
    my $cfile = shift;
    my $opt   = shift;	# foreground, debugall, persistent_id, argv

    die "no config file specified\n" unless $cfile;

    # configure
    # the stores are reconfigured every time the config is reloaded
    $AC::Yenta::CONF = AC::Yenta::Config->new(
        $cfile, onreload => sub {
            AC::Yenta::Store::configure();
        });


    # log to the configured syslog facility, local5 if none is configured
    initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );

    AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
    daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");


    $SIG{CHLD} = $SIG{PIPE} = sub{};        				# ignore
    $SIG{INT}  = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit;  # abort

    # initialize subsystems
    # a port given in the options takes precedence over the configured one
    my $port = $opt->{port} || conf_value('port');

    AC::Yenta::MySelf->init( $port, $opt->{persistent_id} );
    AC::Yenta::Store::configure();
    AC::Yenta::Status::init( $port );
    AC::Yenta::Monitor::init();
    AC::Yenta::NetMon::init();
    AC::DC::IO::TCP::Server->new( $port, 'AC::Yenta::Server' );
    verbose("server started on tcp/$port");


    # start "cronjobs"
    # look at the config files every 30 (freq)
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::Yenta::CONF->check() },
       );

    # NOTE(review): the first argument is true when running in the foreground
    # or with full debugging - its exact meaning to run_and_watch is not
    # visible here.
    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );
}


1;

lib/AC/Yenta/Debug.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-27 11:40 (EDT)
# Function: debugging + log msgs
#
# $Id$

package AC::Yenta::Debug;
use AC::DC::Debug;
our @ISA = 'AC::DC::Debug';
use strict;


1;

lib/AC/Yenta/Default/MySelf.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-18 18:10 (EST)
# Function: info about myself - default implementation
#
# $Id$

package AC::Yenta::Default::MySelf;
use AC::Yenta::Config;
use AC::Yenta::Debug;
use Sys::Hostname;
use Socket;
use strict;


my $SERVERID;

# Determine our primary IP address once, at load time, by resolving our own
# hostname. When the lookup fails gethostbyname returns nothing, and
# inet_ntoa would croak on that before the explanation below could be
# printed - so only convert an address that was actually found.
my $MYIP = do {
    my $packed = gethostbyname(hostname());
    $packed ? inet_ntoa($packed) : undef;
};
die "cannot determine my IP addr.\nsee 'class_myself' in the documentation\n" unless $MYIP;

# Establish the persistent id of this server.
#   $class - the invoking class (not used)
#   $port  - not used
#   $id    - explicit id; when empty the id is built as
#            yenta/<environment>@<hostname>
sub init {
    my ($class, $port, $id) = @_;

    $SERVERID = $id
        ? $id
        : 'yenta/' . conf_value('environment') . '@' . hostname();

    verbose("system persistent-id: $SERVERID");
}

# Return the persistent server id that init() stored in $SERVERID
# (undef until init() has been called).
sub my_server_id {
    return $SERVERID;
}

# Return our network information: an arrayref holding a single entry,
# a hash whose 'ipa' field is the IP address determined at load time ($MYIP).
sub my_network_info {
    return [ { ipa => $MYIP } ];
}

# Return the name of our datacenter; this default implementation always
# answers 'default'.
sub my_datacenter {
    return 'default';
}

1;

lib/AC/Yenta/Direct.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-07 18:09 (EDT)
# Function: direct access (read-only) to yenta data file
#
# $Id$

package AC::Yenta::Direct;
use AC::Yenta::Store::Map;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
use strict;

our @EXPORT = 'yenta_direct_get';

# Functional interface: open a data file and fetch one key.
#   $file - the data file
#   $map  - name of the map
#   $key  - the key to fetch
# Note the argument order: file first here, while new() takes the map first.
# Returns nothing if the map cannot be opened, else the result of get().
sub yenta_direct_get {
    my ($file, $map, $key) = @_;

    my $me = __PACKAGE__->new($map, $file)
        or return;

    return $me->get( $key );
}


# Open a map's data file for direct access.
#   $map  - name of the map
#   $file - the data file; it is opened with the readonly option set
# Returns the new object, or nothing if the map cannot be opened.
sub new {
    my ($class, $map, $file) = @_;

    my %conf = ( dbfile => $file, readonly => 1 );
    my $db   = AC::Yenta::Store::Map->new( $map, undef, \%conf )
        or return;

    return bless { db => $db }, $class;
}

# Fetch the value stored for a key.
#   $key - the key
# Returns whatever the underlying map's get() returns.
sub get {
    my ($me, $key) = @_;

    return $me->{db}->get($key);
}

# Return all of the keys in the map: a range query with both bounds empty.
sub allkeys {
    my ($me) = @_;

    my @unbounded = ('', '');
    return $me->getrange( @unbounded );
}

# Return the keys in a range.
#   $lo, $hi - the bounds handed to the map's range(); a missing bound is
#              passed as the empty string
# Returns the 'k' field of every entry that range() yields.
sub getrange {
    my ($me, $lo, $hi) = @_;

    my $from = $lo || '';
    my $to   = $hi || '';

    return map { $_->{k} } $me->{db}->range( $from, $to );
}

1;

lib/AC/Yenta/IO/TCP/Client.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Aug-10 12:38 (EDT)
# Function: choose best IP addr to use in NAT/cloud env
#
# $Id$

package AC::Yenta::IO::TCP::Client;
our @ISA = 'AC::DC::IO::TCP::Client';

use AC::Yenta::MySelf;
use AC::Misc;
use strict;

my $inited;
my $natdom;

# Choose the address and port to use for connecting to a peer.
#   $addr - either a plain address, or an arrayref of the peer's
#           nat ip info (ACPIPPort)
#   $port - the port to use when the chosen entry does not specify one
# A plain address is returned unchanged together with $port. Otherwise
# entries on networks whose status is not 200 are skipped, and of the
# remaining ones the entry in our own NAT domain is preferred over the
# one without a NAT domain.
# Returns (address, port), or nothing if no entry is usable.
sub use_addr_port {
    my ($class, $addr, $port) = @_;

    # is addr + port => return
    return ($addr, $port) unless ref $addr;

    _init() unless $inited;

    my ($public, $private);

    for my $net ( @$addr ){
        # skip unreachable networks
        my $status = AC::Yenta::NetMon::status_dom( $net->{natdom} );
        next unless $status == 200;

        $public  = $net unless $net->{natdom};
        $private = $net if $net->{natdom} eq $natdom;
    }

    # prefer private addr if available (cheaper)
    my $chosen = $private || $public;
    return unless $chosen;

    my $use_port = $chosen->{port} || $port;
    return ( inet_itoa($chosen->{ipv4}), $use_port );
}

# Determine my local NAT domain: the first non-empty natdom found in our
# own network info. Marks the module as initialized.
sub _init {

    my $info = my_network_info();

    for my $net (@$info){
        next if $natdom;
        $natdom = $net->{natdom};
    }

    $inited = 1;
}

1;

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 10:20 (EDT)
# Function: exchange status info via gossip protocol
#
# $Id$

package AC::Yenta::Kibitz::Status;
use AC::Yenta::Kibitz::Status::Server;
use AC::Yenta::Kibitz::Status::Client;
use AC::Yenta::Debug 'kibitz_status';
use AC::Yenta::Config;
use AC::Yenta::Stats;
use AC::Yenta::MySelf;
use AC::Misc;

use Sys::Hostname;
use Socket;
require 'AC/protobuf/yenta_status.pl';
use strict;


my $HOSTNAME = hostname();

################################################################

# Build the status record (hash ref) describing this yenta instance.
# It is used as the 'myself' part of a status request and as one entry
# of a status reply.
sub _myself {

    my $maps    = conf_value('map');
    my $natinfo = my_network_info();
    my $status  = 200;
    my @ipinfo;

    for my $net ( @$natinfo ){
        my $dom  = $net->{natdom};
        my $isup = AC::Yenta::NetMon::status_dom( $net->{natdom} ) == 200;

        # if a private network (one with a natdom) is down, stop announcing it
        next if $dom && !$isup;

        # if the public network (no natdom) is down, announce the network + 500
        $status = 500 unless $dom || $isup;

        push @ipinfo, {
            ipv4   => inet_atoi( $net->{ipa} ),
            port   => AC::Yenta::Status->my_port(),
            natdom => $dom,
        };
    }

    return {
        hostname    => $HOSTNAME,
        datacenter  => my_datacenter(),
        subsystem   => 'yenta',
        environment => conf_value('environment'),
        via         => AC::Yenta::Status->my_server_id(),
        server_id   => AC::Yenta::Status->my_server_id(),
        instance_id => AC::Yenta::Status->my_instance_id(),
        path        => '.',
        status      => $status,
        uptodate    => AC::Yenta::Store::AE->up_to_date(),
        timestamp   => $^T,
        lastup      => $^T,
        ip          => \@ipinfo,
        map         => [ keys %$maps ],
        sort_metric => loadave() * 1000,
    };
}


################################################################

# Encode our own status record as an ACPYentaStatusRequest
# (this is what a client sends to tell a server about itself).
sub myself {

    my %req = ( myself => _myself() );

    return ACPYentaStatusRequest->encode( \%req );
}

# Encode an ACPYentaStatusReply holding everything we know:
# all known peers, ourself, and the items we monitor.
sub response {

    my @all = (
        AC::Yenta::Status->allpeers(),      # every peer we know of
        _myself(),                          # our own record
        AC::Yenta::Monitor::export(),       # the items we monitor
    );

    return ACPYentaStatusReply->encode({ status => \@all });
}

################################################################

# do not believe a client that says it is up
# put it on the sceptical queue, and check for ourself
#   $gpb - encoded ACPYentaStatusRequest received from the client
#   $io  - connection it arrived on (passed along to AC::Yenta::Status)
sub update_sceptical {
    my( $gpb, $io ) = @_;

    return unless $gpb;

    # pull the client's own record out of the request
    my $peer;
    eval {
        my $req = ACPYentaStatusRequest->decode( $gpb );
        $peer   = $req->{myself};
    };
    my $err = $@;
    if( $err ){
        problem("cannot decode status data: $err");
        return;
    }

    my $id = $peer->{server_id};

    # don't track myself
    return if AC::Yenta::Status->my_server_id() eq $id;

    AC::Yenta::Status->update_sceptical($id, $peer, $io);
}

# Process a status reply from a server.
#   $gpb - encoded ACPYentaStatusReply
# Every status entry is handed to AC::Yenta::Status->update(), except
# entries about ourself and entries the sender learned via ourself.
sub update {
    my( $gpb ) = @_;

    return unless $gpb;

    my $reply;
    eval { $reply = ACPYentaStatusReply->decode( $gpb ) };
    my $err = $@;
    if( $err ){
        problem("cannot decode status data: $err");
        return;
    }

    debug("rcvd update");
    my $self_id = AC::Yenta::Status->my_server_id();

    for my $st ( @{ $reply->{status} } ){
        my $id = $st->{server_id};

        # skip what the peer heard from us, and skip our own record
        next if $st->{via} eq $self_id || $id eq $self_id;

        AC::Yenta::Status->update($id, $st);
    }
}

# unable to connect to server. mark it down
#   $id - server id of the peer; a false id is ignored
sub isdown {
    my( $id ) = @_;

    $id or return;

    AC::Yenta::Status->isdown( $id );
}
1;

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 10:20 (EDT)
# Function: 
#
# $Id$

package AC::Yenta::Kibitz::Status::Client;
use AC::Yenta::Protocol;
use AC::Yenta::Config;
use AC::Yenta::Debug 'status_client';
use AC::Yenta::IO::TCP::Client;
use AC::Dumper;
use AC::Misc;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

# Create a kibitz status client; all arguments go to the parent
# constructor. Returns the new object, or nothing if the parent
# constructor failed.
sub new {
    my( $class, @args ) = @_;

    debug('starting kibitz status client');

    my $me = $class->SUPER::new( @args );
    return if !$me;

    # hook up our event handlers
    for my $cb ( [ 'timeout',  \&timeout  ],
                 [ 'read',     \&read     ],
                 [ 'shutdown', \&shutdown ] ){
        $me->set_callback( $cb->[0], $cb->[1] );
    }

    return $me;
}

# Pick the address/port used to reach a peer for a status exchange.
#   $addr - a plain address (returned as is with $port), or an array ref
#           of nat ip info entries (ACPIPPort)
#   $port - port used when the chosen entry has none
# Unlike a plain "best address" choice, this occasionally picks a down or
# public network on purpose, so that every network gets used once in a while.
# Returns (address, port), or nothing when there is no candidate.
sub use_addr_port {
    my( $class, $addr, $port ) = @_;

    # is addr + port => return
    if( !ref $addr ){
        return ($addr, $port);
    }

    # addr is array of nat ip info (ACPIPPort)
    # remember the last entry seen of each kind: down / private / public
    my %pick;
    for my $cand ( @$addr ){
        my $st = AC::Yenta::NetMon::status_dom( $cand->{natdom} );
        next if !defined $st;           # remote private network

        my $kind = $st != 200      ? 'down'
                 : $cand->{natdom} ? 'private'
                 :                   'public';
        $pick{$kind} = $cand;
    }

    # make sure we use all networks once in a while
    my $chosen;
    $chosen   = $pick{down}   if !int rand(20);
    $chosen ||= $pick{public} if !int rand(20);
    # prefer private addr if available (cheaper)
    $chosen ||= $pick{private} || $pick{public} || $pick{down};
    return if !$chosen;

    #print STDERR "using ", inet_itoa($chosen->{ipv4}), "\n";
    return ( inet_itoa($chosen->{ipv4}), ($chosen->{port} || $port) );
}


# Start the connection and send our status request:
# a 'yenta_status' header followed by our encoded status record.
# Returns the object.
sub start {
    my( $me ) = @_;

    $me->SUPER::start();

    # build request
    my $yp   = AC::Yenta::Protocol->new();
    my $body = AC::Yenta::Kibitz::Status::myself();
    my @hdr  = (
        type           => 'yenta_status',
        data_length    => length($body),
        content_length => 0,
        want_reply     => 1,
        msgid          => $msgid++,
    );
    my $hdr  = $yp->encode_header( @hdr );

    # write request, and give the peer $TIMEOUT seconds
    $me->write( $hdr . $body );
    $me->timeout_rel($TIMEOUT);

    return $me;
}


# 'timeout' callback: give up on the connection.
sub timeout {
    my( $me ) = @_;

    return $me->shut();
}

# 'shutdown' callback: if the connection closed before a reply was
# received (status_ok never set), report the peer as down.
sub shutdown {
    my( $me ) = @_;

    $me->{status_ok}
        or AC::Yenta::Kibitz::Status::isdown( $me->{status_peer} );
}

# 'read' callback: once a complete reply has arrived, record success,
# feed the status data to the status and network monitors, and close.
#   $evt - read event, handed to read_protocol()
sub read {
    my( $me, $evt ) = @_;

    #debug("recvd reply");

    my @popt = ( secret => conf_value('secret') );
    my $yp   = AC::Yenta::Protocol->new( @popt );

    my( $proto, $data ) = $yp->read_protocol( $me, $evt );
    return if !$proto;          # nothing usable yet

    $me->{status_ok} = 1;
    AC::Yenta::Kibitz::Status::update( $data );
    AC::Yenta::NetMon::update( $me );

    $me->shut();
}


1;

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 10:19 (EDT)
# Function: 
#
# $Id$

package AC::Yenta::Kibitz::Status::Server;
use AC::Dumper;
use AC::Yenta::Debug 'status_server';
use strict;


# Server side of a 'yenta_status' exchange.
#   $io      - connection the request arrived on
#   $proto   - decoded protocol header (want_reply and msgid are read)
#   $gpb     - encoded status request from the client, if any
#   $content - unused here
# The client's claim about itself goes on the sceptical queue; if a reply
# is wanted, we answer with everything we know.
sub handler {
    my( $class, $io, $proto, $gpb, $content ) = @_;

    AC::Yenta::Kibitz::Status::update_sceptical( $gpb, $io ) if $gpb;

    if( !$proto->{want_reply} ){
        $io->shut();
        return;
    }

    AC::Yenta::NetMon::update( $io );

    # respond with all known peers
    my $body = AC::Yenta::Kibitz::Status::response();
    my $yp   = AC::Yenta::Protocol->new();
    my @hdr  = (
        type           => 'yenta_status',
        data_length    => length($body),
        content_length => 0,
        msgid          => $proto->{msgid},
        is_reply       => 1,
    );
    my $hdr  = $yp->encode_header( @hdr );

    debug("sending status reply");
    $io->write_and_shut( $hdr . $body );
}

1;

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-01 16:31 (EDT)
# Function: client side interface for storage subsystem
#
# $Id$

package AC::Yenta::Kibitz::Store::Client;
use AC::Yenta::Debug 'store_client';
use AC::Yenta::Config;
use AC::Yenta::Protocol;
use AC::Yenta::IO::TCP::Client;
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

our @ISA = 'AC::Yenta::IO::TCP::Client';

my $TIMEOUT = 5;

# Create a store client.
#   $addr, $port - where to connect (passed to the parent constructor)
#   $req         - the already-encoded request; written out by start()
#   remaining args are passed through to the parent constructor
# Returns the new object, or nothing if the parent constructor failed.
sub new {
    my( $class, $addr, $port, $req, @opts ) = @_;

    debug('starting kibitz store client');

    my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @opts );
    return if !$me;

    $me->{_req} = $req;

    # hook up our event handlers
    for my $cb ( [ 'timeout',  \&timeout  ],
                 [ 'read',     \&read     ],
                 [ 'shutdown', \&shutdown ] ){
        $me->set_callback( $cb->[0], $cb->[1] );
    }

    return $me;
}

# Start the connection, send the stored request and arm the timeout.
# Returns the object.
sub start {
    my( $me ) = @_;

    $me->SUPER::start();

    my $req = $me->{_req};
    $me->write( $req );
    $me->timeout_rel($TIMEOUT);

    return $me;
}

# 'shutdown' callback: if the connection closed without a reply having
# been processed (_store_ok never set), run the 'error' callback.
sub shutdown {
    my( $me ) = @_;

    # maybe call error handler
    $me->{_store_ok} or $me->run_callback('error', undef);
}

# 'timeout' callback: give up on the connection.
sub timeout {
    my( $me ) = @_;

    return $me->shut();
}

# 'read' callback: once a complete reply has arrived, decode it and hand
# it to the 'load' callback, or to the 'error' callback when the peer
# flagged an error or the reply could not be decoded.
#   $evt - read event; handed to read_protocol(), and $evt->{data} is
#          consulted to decide whether to re-arm the timeout
sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );

    # data arrived but no complete message yet: keep waiting, re-arm the timeout
    $me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
    return unless $proto;

    $proto->{data}    = $data;
    $proto->{content} = $content;
    eval {
        my $dec = AC::Yenta::Protocol->new();
        $proto->{data} = $dec->decode_reply($proto) if $data;
    };
    # capture the error right away: $@ is a global, and anything run
    # between here and the check below (problem() included) may reset it
    my $err = $@;
    if( $err ){
        problem("cannot decode reply: $err");
    }

    # process
    $me->{_store_ok} = 1;       # a reply arrived; shutdown() need not report an error
    if( $proto->{is_error} || $err ){
        $me->run_callback('error', {
            error	=> $err || 'remote error',
        });
    }else{
        $me->run_callback('load', $proto);
    }

    $me->shut();
}


1;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-01 11:06 (EDT)
# Function: server side api of storage system
#
# $Id$

package AC::Yenta::Kibitz::Store::Server;
use AC::Yenta::Debug 'store_server';
use AC::Yenta::Store;
use AC::Yenta::Config;
use AC::Dumper;
use JSON;
use Digest::SHA 'sha1_base64';
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

my $TIMEOUT = 1;

# Server side of a 'yenta_get' request.
#   $io      - connection the request arrived on; the reply is written to it
#   $proto   - decoded protocol header (want_reply, msgid and data_encrypted are read)
#   $gpb     - encoded ACPYentaGetSet request
#   $content - not used
# Looks up every requested (map, key, version) with store_get() and sends
# back one result per request. A request that wants no reply, or that
# cannot be decoded, just gets the connection closed.
sub api_get {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    # a get without a reply is pointless
    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # decode request
    my $req;
    eval {
        $req = ACPYentaGetSet->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    # process requests
    my @res;
    my $rescont;        # file (a reference) to send as message content, if any
    for my $r (@{ $req->{data} }){
        debug("get request: $r->{map}, $r->{key}, $r->{version}");
        my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
        # every request gets a result entry; without a version it means "not found"
        my $res = {
            map		=> $r->{map},
            key		=> $r->{key},
        };

        # verify stored file content against its metadata before serving it
        if( $meta && $file ){
            unless( _check_content( $meta, $file ) ){
                problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
                # QQQ - remove from system, (and let AE get a new copy)?
                store_remove($r->{map}, $r->{key}, $ver);
                # tell caller it was not found
                $ver = undef;
            }
        }

        if( defined $ver ){
            $res->{version} = $ver;
            $res->{value}   = $data;
            $res->{meta}    = $meta  if defined $meta;
            if( $file ){
                # if one file to send, send it as content
                if( @{$req->{data}} == 1 ){
                    $rescont = $file;
                }else{
                    # several requests: the file travels inside its own result
                    $res->{file} = $$file;
                }
            }
        }
        push @res, $res;
    }

    # encode results
    my $ect = '';
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    # content is encrypted only when the request data was encrypted
    $ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
    my $response = $yp->encode_reply( {
        type		  => 'yenta_get',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        content_encrypted => $proto->{data_encrypted},
    }, { data => \@res }, \$ect );

    debug("sending get reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    # the content is appended here, after the encoded reply
    $io->write_and_shut( $response . $ect );

}

# Server side of a 'yenta_check' request: return merkle tree data.
#   $io      - connection the request arrived on; the reply is written to it
#   $proto   - decoded protocol header (want_reply, msgid and data_encrypted are read)
#   $gpb     - encoded ACPYentaCheckRequest (map, version, shard, level are read)
#   $content - not used
# Starting at the requested level, results are fetched with _get_check();
# while the results stay sparse, the next level down is fetched as well and
# returned in the same reply.
sub api_check {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    # a check without a reply is pointless
    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # decode request
    my $req;
    eval {
        $req = ACPYentaCheckRequest->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    debug("check request: $req->{map}, $req->{version}, $req->{level}");

    my @res;            # everything to be returned, all levels together
    my @todo = { version => $req->{version}, shard => $req->{shard} };  # nodes to look up at the current level

    # the top of the tree will be fairly sparse,
    # return up to several levels if they are sparse
    for my $l (0 .. 32){
        my $cl = $req->{level} + $l;    # level being fetched in this pass
        my @lres;                       # results found at this level
        my $nexttot;                    # estimate of the size of the next level, summed from the counts
        for my $do (@todo){
            my @r = _get_check( $req->{map}, $do->{shard}, $do->{version}, $cl );
            push @lres, @r;
            for (@r){
                # entries with a key contribute an eighth of their count, the others their full count
                $nexttot += $_->{key} ? ($_->{count} / 8) : $_->{count};
            }
        }
        push @res, @lres;
        @todo = ();
        last unless @lres;				# reached the bottom of the tree
        last if @res > 64;				# got enough results
        last if (@lres > 2) && ($nexttot > @lres + 2);	# less sparse region
        # get the next level also
        @todo = @lres;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		  => 'yenta_check',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        }, { check => \@res } );

    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

}

# get + process merkle data
#   $map, $shard, $ver, $lev - passed straight to store_get_merkle()
# Returns the list of result records, each tagged with its map name;
# returns nothing when the store has no data.
sub _get_check {
    my( $map, $shard, $ver, $lev ) = @_;

    my $found = store_get_merkle($map, $shard, $ver, $lev);
    return if !$found;

    # the reply needs to say which map each record belongs to
    $_->{map} = $map for @$found;

    return @$found;
}

# Server side of a 'yenta_distrib' (put / distribute) request.
#   $io      - connection the request arrived on
#   $proto   - decoded protocol header (want_reply, msgid and data_encrypted are read)
#   $gpb     - encoded ACPYentaDistRequest; its datum carries map, shard,
#              key, version, value, meta and optionally file
#   $content - reference to the file content, when it was sent as message content
# Stores the datum if we want it, passes it on to other systems, and (when
# a reply is wanted) answers whether we already had it.
sub api_distrib {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# reference

    # decode request
    my $req;
    eval {
        $req = ACPYentaDistRequest->decode( $gpb );
        # a datum without key or version is rejected just like a decode failure
        die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};
    };
    if(my $e = $@){
        my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
        problem("cannot decode request: peer: $io->{peerip} $enc, $e");
        $io->shut();
        return;
    }

    # only maps we are configured for are accepted
    unless( conf_map( $req->{datum}{map} ) ){
        problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
        _reply_error($io, $proto, 404, 'Map Not Found');
        return;
    }

    my $v = $req->{datum};

    # do we already have ?
    my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );

    if( $want ){
        # put

        debug("put request from $io->{peerip}: $v->{map}, $v->{key}, $v->{version}");

        # file content is passed by reference, to avoid large copies
        # (a file embedded in the datum is used only when no content was sent)
        $content ||= \ $v->{file} if $v->{file};

        # check
        if( $v->{meta} && $content ){
            unless( _check_content( $v->{meta}, $content ) ){
                problem("content SHA1 check failed: $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}");
                # bad content: drop the connection without storing or replying
                $io->shut();
                return;
            }
        }

        # $want now holds the store_put result: it decides whether we pass
        # the datum on, and what 'haveit' reports below
        $want = store_put( $v->{map}, $v->{shard}, $v->{key}, $v->{version}, $v->{value},
                   $content, $v->{meta} );

        # distribute to other systems
        AC::Yenta::Store::Distrib->new( $req, $content ) if $want;
    }else{
        debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
    }


    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, { status_code => 200, status_message => 'OK', haveit => !$want } );

    debug("sending distrib reply");
    $io->timeout_rel($TIMEOUT);
    $io->write_and_shut( $response );

}

# Verify content against the checks recorded in its metadata.
#   $meta - metadata string; only a JSON object (leading '{') is examined
#   $cont - reference to the content
# Returns 1 when the content passes, or when there is nothing to check
# (no metadata, metadata that is not a JSON object, or JSON that cannot
# be decoded). Returns nothing when the sha1 or size does not match.
sub _check_content {
    my( $meta, $cont ) = @_;

    # nothing we know how to check => accept
    return 1 if !$meta || $meta !~ /^\{/;

    my $info = eval { decode_json($meta) };
    return 1 if $@;

    # sha1, when recorded, must match the base64 digest of the content
    my $want_sha = $info->{sha1};
    return if $want_sha && sha1_base64( $$cont ) ne $want_sha;

    # size, when recorded, must match the content length
    my $want_len = $info->{size};
    return if $want_len && length($$cont) != $want_len;

    return 1;
}

# Send a 'yenta_distrib' error reply and close the connection.
#   $io    - connection to reply on
#   $proto - header of the request being answered (msgid and data_encrypted are read)
#   $code  - status code to report
#   $msg   - status message to report
sub _reply_error {
    my( $io, $proto, $code, $msg ) = @_;

    my %hdr = (
        type           => 'yenta_distrib',
        msgid          => $proto->{msgid},
        is_reply       => 1,
        is_error       => 1,
        data_encrypted => $proto->{data_encrypted},
    );
    my %body = (
        status_code    => $code,
        status_message => $msg,
        haveit         => 0,
    );

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( \%hdr, \%body );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}

1;

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-May-12 11:03 (EDT)
# Function: monitor related processes
#
# $Id$

# we periodically check the heartbeats of various processes (dancrs, scriblers, ...)
# and kibitz their info around the network

package AC::Yenta::Monitor;
use AC::Yenta::Debug 'monitor';
use AC::Yenta::Config;
use AC::Yenta::Monitor::Client;
use AC::Misc;
use AC::Yenta::MySelf;

use Sys::Hostname;
use Socket;
use strict;

require 'AC/protobuf/yenta_status.pl';
require 'AC/protobuf/heartbeat.pl';

my $FREQ	= 2;
my $OLD_DOWN	= 30;
my $OLD_KEEP	= 1800;

my %MON;	# by 'id' (from config file)

# Schedule periodic() to run every $FREQ via AC::DC::Sched.
sub init {

    my @sched = (
        info => 'monitor',
        freq => $FREQ,
        func => \&periodic,
    );

    AC::DC::Sched->new( @sched );
}

# Scheduled job: expire stale monitor data, then send a heartbeat request
# to every process listed under 'monitor' in the config.
sub periodic {

    my $mon = conf_value('monitor');

    # clean up old data: anything not seen up within $OLD_DOWN
    my $stale = $^T - $OLD_DOWN;
    for my $id ( keys %MON ){
        next unless $MON{$id}{lastup} < $stale;
        isdown($id, 0);
    }

    # start monitoring (send heartbeat request)
    for my $m ( @$mon ){
        my( $ip, $port ) = ( $m->{ipa}, $m->{port} );
        my $id = "$ip:$port";
        debug("start monitoring $id");

        my $client = AC::Yenta::Monitor::Client->new(
            $ip, $port,
            info         => "monitor client: $id",
            monitor_peer => $id,
        );

        # could not even start the client
        isdown($id, 0) if !$client;
    }
}

# data for kibitzing around
# array of ACPYentaStatus
# Returns one status hash per monitored process currently held in %MON.
sub export {

    my $here = my_datacenter();
    my $self = my_server_id();

    # fields copied as is from the stored heartbeat record
    # (id is from config, typically localhost:port)
    my @same = qw(id hostname subsystem environment timestamp lastup
                  sort_metric capacity_metric server_id ip);

    my @d;
    for my $hb ( values %MON ){
        my %st = map { $_ => $hb->{$_} } @same;

        $st{datacenter}  = $here;
        $st{via}         = $self;
        $st{status}      = $hb->{status_code};
        # NOTE(review): instance_id is filled from server_id - confirm this is intended
        $st{instance_id} = $hb->{server_id};
        $st{path}        = '.';

        push @d, \%st;
    }

    return @d;
}

# Record that a monitored process is down.
#   $id   - monitor id (key of %MON); unknown ids are ignored
#   $code - status code to report; 200 or anything false is stored as 0
# Nothing is recorded until the process has been missing for two polling
# periods. Once it has been down longer than $OLD_KEEP it is dropped from
# %MON altogether.
sub isdown {
    my( $id, $code ) = @_;

    my $d = $MON{$id};
    return if !$d;

    # require 2 polls to fail
    return if $^T - $d->{lastup} < 2 * $FREQ;

    # a down process must never be reported as 200
    $d->{status_code} = ( $code == 200 ) ? 0 : ( $code || 0 );
    $d->{timestamp}   = $^T;

    debug("monitor $id is down");

    if( $d->{lastup} < $^T - $OLD_KEEP ){
        debug("monitor $id down too long. removing from report");
        delete $MON{$id};
    }
}

# Process a heartbeat reply from a monitored process.
#   $id - monitor id (from config)
#   $gb - encoded ACPHeartbeat
# A heartbeat that cannot be decoded, does not report 200, or carries a
# timestamp $OLD_DOWN or more old counts as down; otherwise the record
# is stored in %MON as up.
sub update {
    my( $id, $gb ) = @_;

    my $hb = eval {
        my $msg = ACPHeartBeat->decode( $gb );
        $msg->{id} = $id;
        $msg;
    };
    my $err = $@;
    if( $err ){
        problem("cannot decode hb data: $err");
        return isdown($id, 0);
    }

    # process says it is not ok
    return isdown($id, $hb->{status_code}) if $hb->{status_code} != 200;

    # heartbeat is too old to trust
    return isdown($id, 0) if $^T - $hb->{timestamp} >= $OLD_DOWN;

    debug("monitor $id is up");
    $hb->{lastup}    = $^T;
    $hb->{downcount} = 0;

    # work out the ip info while the previous record is still in %MON
    _hb_ip_info( $hb, $MON{$id} );
    $MON{$id} = $hb;
}

# Fill in $up->{ip} with a list of { ipv4, port, natdom } entries.
#   $up  - the new heartbeat record (modified in place)
#   $old - the previous record for the same monitor id, if any
# The previous list is reused when the process id and server id are
# unchanged. Otherwise the list is built from the address the heartbeat
# reported, or, when it reported none, from our own network info.
sub _hb_ip_info {
    my( $up, $old ) = @_;

    # same process as last time => keep what we already worked out
    my $same = ($old->{process_id} == $up->{process_id})
            && ($old->{server_id}  eq $up->{server_id});
    my $ip   = $same ? $old->{ip} : undef;

    if( !$ip ){
        my $port = $up->{port};
        # use monitored port (id is from config)
        (undef, $port) = split /:/, $up->{id} unless $port;

        if( $up->{ip} ){
            $ip = [ { ipv4 => $up->{ip}, port => $port, natdom => undef } ];
        }else{
            # no address reported: announce it on each of our own networks
            my $mynat = my_network_info();
            for my $net ( @$mynat ){
                push @$ip, { ipv4 => $net->{ipi}, port => $port, natdom => $net->{natdom} };
            }
        }
    }

    $up->{ip} = $ip;
}

1;

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-May-12 11:06 (EDT)
# Function: 
#
# $Id$

package AC::Yenta::Monitor::Client;
use AC::Yenta::Protocol;
use AC::Yenta::Config;
use AC::Yenta::Debug 'monitor_client';
use AC::Yenta::IO::TCP::Client;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

# Create a monitor client, start it, and send a heartbeat request.
# All arguments go to the parent constructor.
# Returns the new object, or nothing if the parent constructor failed.
sub new {
    my( $class, @args ) = @_;

    debug('starting monitor status client');

    my $me = $class->SUPER::new( @args );
    return if !$me;

    # hook up our event handlers
    for my $cb ( [ 'timeout',  \&timeout  ],
                 [ 'read',     \&read     ],
                 [ 'shutdown', \&shutdown ] ){
        $me->set_callback( $cb->[0], $cb->[1] );
    }

    $me->start();

    # build request: a bare header, no data, no content
    my $yp  = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my @hdr = (
        type           => 'heartbeat_request',
        data_length    => 0,
        content_length => 0,
        want_reply     => 1,
        msgid          => $msgid++,
    );
    my $hdr = $yp->encode_header( @hdr );

    # write request, and give the process $TIMEOUT seconds to answer
    $me->write( $hdr );
    $me->timeout_rel($TIMEOUT);

    return $me;
}

# 'timeout' callback: give up on the connection.
sub timeout {
    my( $me ) = @_;

    return $me->shut();
}

# 'shutdown' callback: if the connection closed before a heartbeat was
# received (status_ok never set), report the monitored process as down.
sub shutdown {
    my( $me ) = @_;

    $me->{status_ok}
        or AC::Yenta::Monitor::isdown( $me->{monitor_peer} );
}

# 'read' callback: once a complete reply has arrived, record success,
# hand the heartbeat data to the monitor, and close.
#   $evt - read event, handed to read_protocol()
sub read {
    my( $me, $evt ) = @_;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new();
    my( $proto, $data ) = $yp->read_protocol( $me, $evt );
    return if !$proto;          # nothing usable yet

    $me->{status_ok} = 1;
    AC::Yenta::Monitor::update( $me->{monitor_peer}, $data );

    $me->shut();
}


1;

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-18 18:10 (EST)
# Function: stub for customization
#
# $Id$

package AC::Yenta::MySelf;
use AC::Yenta::Customize;
use AC::Import;
use strict;

our @ISA    = 'AC::Yenta::Customize';
our @EXPORT = qw(my_server_id my_network_info my_datacenter);
our @CUSTOM = (@EXPORT, 'init');


1;

=head1 NAME

AC::Yenta::MySelf - customize yenta to your own environment

=head1 SYNOPSIS

    emacs /myperldir/Local/Yenta/MySelf.pm
    copy. paste. edit.

    use lib '/myperldir';
    my $y = AC::Yenta::D->new(
        class_myself        => 'Local::Yenta::MySelf',
    );

=head1 DESCRIPTION

provide functions to override default behavior. you may define
any or all of the following functions.

=head2 my_server_id

return a unique identity for this yenta instance. typically,
something similar to the server hostname.

    sub my_server_id {
        return 'yenta@' . hostname();
    }

=head2 my_datacenter

return the name of the local datacenter. yenta will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.

    sub my_datacenter {
        my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
        return $domain;
    }

=head2 my_network_info

return information about the various networks this server has.

    sub my_network_info {
        my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
        my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));


        return [
            # use this IP for communication with servers this datacenter (same natdom)
            { ip => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ip => $public_ip },
        ]
    }

=head2 init

initialization function called at startup. typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.

    my $HOSTNAME;
    my $DOMAIN;
    sub init {
        $HOSTNAME = hostname();
        ($DOMAIN) = $HOSTNAME =~ /^[^\.]+\.(.*)/;
    }

=head1 BUGS

none. you write this yourself.

=head1 SEE ALSO

    AC::Yenta

=head1 AUTHOR

    You!

=cut

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 0.856 second using v1.00-cache-2.02-grep-82fe00e-cpan-503542c4f10 )