AC-Yenta

 view release on metacpan or  search on metacpan

MANIFEST  view on Meta::CPAN

eg/yenta_put
MANIFEST
proto/yenta_status.proto
proto/std_reply.proto
proto/yenta_check.proto
proto/std_ipport.proto
proto/yenta_getset.proto
proto/heartbeat.proto
proto/auth.proto
Makefile.PL
META.yml                                 Module meta-data (added by MakeMaker)

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-Yenta
version:            1.1
abstract:           eventually-consistent distributed key/value data store. et al.
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    BerkeleyDB:           0
    Crypt::Rijndael:      0

eg/yenta.conf  view on Meta::CPAN


# enable debugging?
#debug           ae
#debug           map
#debug           merkle
# ...


# maps
map testyfoo {
    # name of the data file
    dbfile      /home/data/testyfoo.ydb
    # how much history to keep 
    history     4
}

eg/yenta_put  view on Meta::CPAN

use JSON;
use strict;


my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });


my $key = 'YX3jSXD3CBRUDABm';

my $res = $ys->distribute(
    # map, key, version, data
    'mymap', $key, timet_to_yenta_version(time()),
    encode_json( {
        url_id	=> $key,
        url	=> 'http://www.example.com',
        acc_id	=> 'C9TdSgbUCBRUCABG',
        format	=> 'html',
    }),
   );

lib/AC/Yenta.pm  view on Meta::CPAN

#
# $Id$

package AC::Yenta;
use strict;

our $VERSION = 1.1;

=head1 NAME

AC::Yenta - eventually-consistent distributed key/value data store. et al.

=head1 SYNOPSIS

    use AC::Yenta::D;
    use strict;

    my $y = AC::Yenta::D->new( );

    $y->daemon( $configfile, {
      argv		=> \@ARGV,

lib/AC/Yenta.pm  view on Meta::CPAN

=head1 DESCRIPTION

=head2 Peers

All of the running yentas are peers. There is no master server.
New nodes can be added or removed on the fly with no configuration.

=head2 Kibitzing

Each yenta kibitzes (gossips) with the other yentas in the network
to exchange status information, distribute key-value data, and
detect and correct inconsistent data.

=head2 Eventual Consistency

Key-value data is versioned with timestamps. By default, newest wins.
Maps can be configured to keep and return multiple versions and client
code can use other conflict resolution mechanisms.

Lost, missing or otherwise inconsistent data is detected
by kibitzing merkle tree hash values.

=head2 Topological awareness

Yentas can take network topology into account when transferring
data around to minimize long-distance transfers. You will need to
write a custom C<MySelf> class with a C<my_datacenter> function.

=head2 Multiple Network Interfaces / NAT

Yentas can take advantage of multiple network interfaces with
different IP addresses (eg. a private internal network + a public network),
or multiple addresses (eg. a private address and a public address)
and various NAT configurations.

You will need to write a custom C<MySelf> class and C<my_network_info>
function.

lib/AC/Yenta.pm  view on Meta::CPAN

=item seedpeer

specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.

    seedpeer 192.168.10.11:3503
    seedpeer 192.168.10.12:3503

=item secret

specify a secret key used to encrypt data transferred between
yentas in different datacenters.

    secret squeamish-ossifrage

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item debug

enable debugging for a particular section

    debug map

=item map

configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers. maps should be
configured similarly on all servers that they are on.

    map users {
	backend	    bdb
        dbfile      /home/acdata/users.ydb
        history     4
    }

=back

=head1 BUGS

Too many to list here.

=head1 SEE ALSO

lib/AC/Yenta/AC/MySelf.pm  view on Meta::CPAN

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 17:23 (EST)
# Function: 
#
# $Id$

package AC::Yenta::AC::MySelf;
use AC::Yenta::Config;
use AC::Yenta::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;

my $SERVERID;

sub init {
    my $class = shift;
    my $port  = shift;	# not used
    my $id    = shift;

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    my $me  = shift;
    my $map = shift;
    my $key = shift;
    my $ver = shift;

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_get',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        data	=> [ {
            map		=> $map,
            key		=> $key,
            version	=> $ver,
        } ]
    } );

    return $me->_send_request($map, $req);
}

sub _shard {

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    my $req  = shift;
    my $file = shift;	# reference

    my $ipn = inet_aton($addr);
    $req .= $$file if $file;

    $me->{debug}->("trying to contact yenta server $addr:$port");
    my $res;
    eval {
        $res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
        $res->{data} = $me->{proto}->decode_reply( $res ) if $res;
    };
    if(my $e = $@){
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}


################################################################

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    my $me  = shift;
    my $map = shift;

    my $f;
    my @server;
    my @faraway;
    open($f, $me->{server_file});
    local $/ = "\n";
    while(<$f>){
        chop;
        my $data = decode_json( $_ );
        next unless grep { $_ eq $map } @{ $data->{map} };
        if( $data->{is_local} ){
            push @server, { addr => $data->{addr}, port => $data->{port} };
        }else{
            push @faraway, { addr => $data->{addr}, port => $data->{port} };
        }
    }

    # prefer local
    @server = @faraway unless @server;

    shuffle( \@server );
    push @{$me->{_server}}, @server;
}

lib/AC/Yenta/Default/MySelf.pm  view on Meta::CPAN

}

# return the server id currently held in $SERVERID
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}

# return the network info list: a one-element array ref whose
# only entry carries the address held in $MYIP under the 'ipa' key
sub my_network_info {
    my %net = ( ipa => $MYIP );
    return [ \%net ];
}

# return the datacenter name; this implementation always answers 'default'
sub my_datacenter {
    my $dc = 'default';
    return $dc;
}

1;

lib/AC/Yenta/Direct.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-07 18:09 (EDT)
# Function: direct access (read-only) to yenta data file
#
# $Id$

package AC::Yenta::Direct;
use AC::Yenta::Store::Map;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

            push @ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => AC::Yenta::Status->my_port(), natdom => $i->{natdom} }
              if $st == 200;
        }else{
            push @ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => AC::Yenta::Status->my_port(), natdom => $i->{natdom} };
            $status = 500 unless $st == 200;
        }
    }

    return {
        hostname	=> $HOSTNAME,
        datacenter	=> my_datacenter(),
        subsystem	=> 'yenta',
        environment	=> conf_value('environment'),
        via		=> AC::Yenta::Status->my_server_id(),
        server_id	=> AC::Yenta::Status->my_server_id(),
        instance_id	=> AC::Yenta::Status->my_instance_id(),
        path		=> '.',
        status		=> $status,
        uptodate	=> AC::Yenta::Store::AE->up_to_date(),
        timestamp   	=> $^T,
        lastup		=> $^T,

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

    my $gpb = shift;
    my $io  = shift;

    return unless $gpb;
    my $c;
    eval {
        $c = ACPYentaStatusRequest->decode( $gpb );
        $c = $c->{myself};
    };
    if(my $e = $@){
        problem("cannot decode status data: $e");
        return;
    }

    my $id = $c->{server_id};

    # don't track myself
    return if AC::Yenta::Status->my_server_id() eq $id;

    AC::Yenta::Status->update_sceptical($id, $c, $io);
}

sub update {
    my $gpb = shift;

    return unless $gpb;
    my $c;
    eval {
        $c = ACPYentaStatusReply->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode status data: $e");
        return;
    }

    debug("rcvd update");
    my $myself = AC::Yenta::Status->my_server_id();

    for my $up (@{$c->{status}}){
        my $id = $up->{server_id};
        next if $up->{via} eq $myself;
        next if $id eq $myself;

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

# start the connection (via the parent class), then send a
# 'yenta_status' request carrying our own status as its data section.
# returns the client object.
sub start {
    my($me) = @_;

    $me->SUPER::start();

    # the request is a protocol header followed by the status payload
    my $yp     = AC::Yenta::Protocol->new();
    my $status = AC::Yenta::Kibitz::Status::myself();
    my $header = $yp->encode_header(
        type           => 'yenta_status',
        data_length    => length($status),
        content_length => 0,
        want_reply     => 1,
        msgid          => $msgid++,
    );

    $me->write( join('', $header, $status) );

    # (re)arm the timeout while we wait for the reply
    $me->timeout_rel($TIMEOUT);

    return $me;
}

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

    }
}

# read callback: feed the incoming data to the protocol reader.
# once it hands back a message, mark this client as ok, pass the
# message data to the status + netmon update code, and close.
sub read {
    my($me, $evt) = @_;

    #debug("recvd reply");

    my $yp  = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my @msg = $yp->read_protocol( $me, $evt );	# (header, data, content)

    # nothing handed back => no message to process on this call
    return unless $msg[0];

    $me->{status_ok} = 1;
    AC::Yenta::Kibitz::Status::update( $msg[1] );
    AC::Yenta::NetMon::update( $me );

    $me->shut();
}


1;

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

    }

    AC::Yenta::NetMon::update( $io );

    # respond with all known peers
    my $response = AC::Yenta::Kibitz::Status::response();
    my $yp  = AC::Yenta::Protocol->new();

    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($response),
        content_length  => 0,
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
       );

    debug("sending status reply");
    $io->write_and_shut( $hdr . $response );
}

1;

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

    $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    $me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
    return unless $proto;
    $proto->{data}    = $data;
    $proto->{content} = $content;
    eval {
        my $yp = AC::Yenta::Protocol->new();
        $proto->{data} = $yp->decode_reply($proto) if $data;
    };
    if(my $e = $@){
        problem("cannot decode reply: $e");
    }

    # process
    $me->{_store_ok} = 1;
    if( $proto->{is_error} || $@ ){
        my $e = $@ || 'remote error';
        $me->run_callback('error', {

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    # process requests
    my @res;
    my $rescont;
    for my $r (@{ $req->{data} }){
        debug("get request: $r->{map}, $r->{key}, $r->{version}");
        my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
        my $res = {
            map		=> $r->{map},
            key		=> $r->{key},
        };

        if( $meta && $file ){
            unless( _check_content( $meta, $file ) ){
                problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
                # QQQ - remove from system, (and let AE get a new copy)?
                store_remove($r->{map}, $r->{key}, $ver);
                # tell caller it was not found
                $ver = undef;
            }
        }

        if( defined $ver ){
            $res->{version} = $ver;
            $res->{value}   = $data;
            $res->{meta}    = $meta  if defined $meta;
            if( $file ){
                # if one file to send, send it as content
                if( @{$req->{data}} == 1 ){
                    $rescont = $file;
                }else{
                    $res->{file} = $$file;
                }
            }
        }
        push @res, $res;
    }

    # encode results
    my $ect = '';
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
    my $response = $yp->encode_reply( {
        type		  => 'yenta_get',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        content_encrypted => $proto->{data_encrypted},
    }, { data => \@res }, \$ect );

    debug("sending get reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response . $ect );

}

sub api_check {
    my $io      = shift;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        # get the next level also
        @todo = @lres;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		  => 'yenta_check',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        }, { check => \@res } );

    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

}

# get + process merkle data
sub _get_check {
    my $map   = shift;
    my $shard = shift;
    my $ver   = shift;
    my $lev   = shift;

    my $res = store_get_merkle($map, $shard, $ver, $lev);
    return unless $res;
    for my $r (@$res) {
        $r->{map}   = $map;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    my $gpb     = shift;
    my $content = shift;	# reference

    # decode request
    my $req;
    eval {
        $req = ACPYentaDistRequest->decode( $gpb );
        die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};
    };
    if(my $e = $@){
        my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
        problem("cannot decode request: peer: $io->{peerip} $enc, $e");
        $io->shut();
        return;
    }

    unless( conf_map( $req->{datum}{map} ) ){
        problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
        _reply_error($io, $proto, 404, 'Map Not Found');
        return;
    }

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        $io->shut();
        return;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, { status_code => 200, status_message => 'OK', haveit => !$want } );

    debug("sending distrib reply");
    $io->timeout_rel($TIMEOUT);
    $io->write_and_shut( $response );

}

sub _check_content {
    my $meta = shift;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    my $proto = shift;
    my $code  = shift;
    my $msg   = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        is_error	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, {
        status_code     => $code,
        status_message  => $msg,
        haveit		=> 0,
    } );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

        info	=> 'monitor',
        freq	=> $FREQ,
        func	=> \&periodic,
       );
}

# scheduled task:
#  - call isdown() on every %MON entry whose lastup is older than $OLD_DOWN
#  - create a monitor client for every entry of the 'monitor' config value
sub periodic {

    my $mon = conf_value('monitor');

    # clean up old data
    for my $id (keys %MON){
        next unless $MON{$id}{lastup} < $^T - $OLD_DOWN;
        isdown($id, 0);
    }

    # start monitoring (send heartbeat request)
    for my $m (@$mon){
        my($ip, $port) = @{$m}{qw(ipa port)};
        my $id = "$ip:$port";
        debug("start monitoring $id");

        my $client = AC::Yenta::Monitor::Client->new(
            $ip, $port,
            info         => "monitor client: $id",
            monitor_peer => $id,
        );

        # no client could be created => count it as down
        next if $client;
        isdown($id, 0);
    }
}

# data for kibitzing around
# array of ACPYentaStatus
sub export {

    my @d;
    my $here = my_datacenter();
    my $self = my_server_id();

    for my $v (values %MON){

        push @d, {
            id			=> $v->{id},	# from config, typically localhost:port
            datacenter		=> $here,
            via			=> $self,
            hostname		=> $v->{hostname},
            subsystem		=> $v->{subsystem},
            environment		=> $v->{environment},
            status		=> $v->{status_code},
            timestamp		=> $v->{timestamp},
            lastup		=> $v->{lastup},
            sort_metric		=> $v->{sort_metric},
            capacity_metric	=> $v->{capacity_metric},
            server_id		=> $v->{server_id},

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

sub update {
    my $id = shift;
    my $gb = shift;	# ACPHeartbeat

    my $up;
    eval {
        $up = ACPHeartBeat->decode( $gb );
        $up->{id} = $id;
    };
    if(my $e = $@){
        problem("cannot decode hb data: $e");
        return isdown($id, 0);
    }
    unless( $up->{status_code} == 200 ){
        return isdown($id, $up->{status_code});
    }
    return isdown($id, 0) unless $^T - $up->{timestamp} < $OLD_DOWN;

    debug("monitor $id is up");
    $up->{lastup} = $^T;
    $up->{downcount} = 0;

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

    # build request
    my $yp  = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $hdr = $yp->encode_header(
        type		=> 'heartbeat_request',
        data_length	=> 0,
        content_length	=> 0,
        want_reply	=> 1,
        msgid		=> $msgid++,
       );

    # write request
    $me->write( $hdr );

    $me->timeout_rel($TIMEOUT);

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

    }
}

# read callback: feed the incoming data to the protocol reader.
# once it hands back a message, mark this client as ok, pass the
# message data (with the id of the monitored peer) to the monitor
# update code, and close.
sub read {
    my($me, $evt) = @_;

    debug("recvd reply");

    my $yp  = AC::Yenta::Protocol->new();
    my @msg = $yp->read_protocol( $me, $evt );	# (header, data, content)

    # nothing handed back => no message to process on this call
    return unless $msg[0];

    $me->{status_ok} = 1;
    AC::Yenta::Monitor::update( $me->{monitor_peer}, $msg[1] );

    $me->shut();
}


1;

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN

# Function: stub for customization
#
# $Id$

package AC::Yenta::MySelf;
use AC::Yenta::Customize;
use AC::Import;
use strict;

our @ISA    = 'AC::Yenta::Customize';
our @EXPORT = qw(my_server_id my_network_info my_datacenter);
our @CUSTOM = (@EXPORT, 'init');


1;

=head1 NAME

AC::Yenta::MySelf - customize yenta to your own environment

=head1 SYNOPSIS

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN


=head2 my_server_id

return a unique identity for this yenta instance. typically,
something similar to the server hostname.

    sub my_server_id {
        return 'yenta@' . hostname();
    }

=head2 my_datacenter

return the name of the local datacenter. yenta will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.

    sub my_datacenter {
        my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
        return $domain;
    }

=head2 my_network_info

return information about the various networks this server has.

    sub my_network_info {
        my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
        my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));


        return [
            # use this IP for communication with servers in this datacenter (same natdom)
            { ipa => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ipa => $public_ip },
        ]
    }

=head2 init

initialization function called at startup. typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 13:22 (EDT)
# Function: read protocol data
#
# $Id$

package AC::Yenta::Protocol;
use AC::Yenta::Debug 'protocol';
use AC::Yenta::Config;
use AC::DC::Protocol;
use AC::Yenta::MySelf;
use AC::Yenta::Crypto;
use AC::Misc;

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

    my $r = $MSGTYPE{$name};
    __PACKAGE__->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}


sub read_protocol {
    my $me  = shift;
    my $io  = shift;
    my $evt = shift;

    $io->{rbuffer} .= $evt->{data};

    return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header
        eval {
            $io->{proto_header} = $me->decode_header( $io->{rbuffer} );
        };
        if(my $e=$@){
            verbose("cannot decode protocol header: $e");

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

            });
            $io->shut();
            return;
        }
    }

    my $p = $io->{proto_header};
    return unless $p; 	# read more

    # do we have everything?
    return unless length($io->{rbuffer}) >= ($p->{auth_length} + $p->{data_length} + $p->{content_length} + $HDRSIZE);

    my $auth    = substr($io->{rbuffer}, $HDRSIZE,  $p->{auth_length});
    my $data    = substr($io->{rbuffer}, $HDRSIZE + $p->{auth_length},  $p->{data_length});
    my $content = substr($io->{rbuffer}, $HDRSIZE + $p->{auth_length} + $p->{data_length}, $p->{content_length});

    # RSN - validate auth

    if( $p->{data_encrypted} && $data ){
        $data = $me->_decrypt_data( $io, $auth, $data );
        return unless $data;
    }

    if( $p->{content_encrypted} && $content ){
        $content = $me->_decrypt_data( $io, $auth, $content );
        return unless $content;
    }

    # content is passed as reference
    return ($p, $data, ($content ? \$content : undef));
}

# minimal http request reader, for simple status queries, argus, debugging.
# this is not an RFC compliant http server.
#
# returns nothing until the buffer contains a blank line, then returns
# ( { type => 'http', method => <1st word> }, <2nd word of the buffer> )
sub _read_http {
    my($io, $evt) = @_;

    my $buf = $io->{rbuffer};

    if( $buf =~ /\r?\n\r?\n/s ){
        # request line: METHOD URL ...
        my($method, $url) = split /\s+/, $buf;
        return ( { type => 'http', method => $method }, $url );
    }

    return;
}

################################################################

# decrypt one section of a message.
# returns the decrypted data. if decrypt() dies, the failure is logged,
# the connection's 'error' callback is run, the connection is shut,
# and nothing is returned.
sub _decrypt_data {
    my($me, $io, $auth, $data) = @_;

    eval { $data = $me->decrypt( $auth, $data ); };
    my $err = $@;

    return $data unless $err;

    verbose("cannot decrypt protocol data: $err");
    $io->run_callback('error', {
        cause	=> 'read',
        error	=> "cannot decrypt protocol: $err",
    });
    $io->shut();
    return;
}

# should traffic to this peer be encrypted?
# never, if no 'secret' is configured. otherwise only if the
# peer is in a different datacenter than we are.
sub use_encryption {
    my($peer) = @_;

    if( conf_value('secret') ){
        # only encrypt far-away traffic, not local
        return $peer->{datacenter} ne my_datacenter();
    }

    return;
}

sub encrypt {
    my $me    = shift;
    my $auth  = shift;	# not currently used
    my $buf   = shift;

    my $secret = $me->{secret};
    return $buf unless $secret;
    return unless $buf;

lib/AC/Yenta/Server.pm  view on Meta::CPAN


    debug("connection timed out");
    $me->shut();
}

# read callback for server connections: once the protocol reader hands
# back a message, look up the handler for its type in %HANDLER and run it.
# messages of unknown type get the connection shut.
sub read {
    my($me, $evt) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    # dispatch request
    my $h = $HANDLER{ $proto->{type} };

    if( !$h ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    # a reference is called directly; anything else is
    # used as the invocant of a handler() method
    return $h->( $me, $proto, $data, $content ) if ref $h;
    return $h->handler( $me, $proto, $data, $content );
}


1;

lib/AC/Yenta/Stats.pm  view on Meta::CPAN

    my $class = shift;
    my $io    = shift;
    my $proto = shift;
    my $url   = shift;

    debug("http request $url");
    $url =~ s|^/||;
    $url =~ s/%(..)/chr(hex($1))/eg;

    my $f = $HANDLER{$url};
    $f = \&http_data if $url =~ m|^data/|;
    $f = \&http_file if $url =~ m|^file/|;
    $f ||= \&http_notfound;
    my( $content, $code, $text ) = $f->($url);
    $code ||= 200;
    $text ||= 'OK';

    my $res = "HTTP/1.0 $code $text\r\n"
      . "Server: AC/Yenta\r\n"
      . "Connection: close\r\n"
      . "Content-Type: text/plain; charset=UTF-8\r\n"

lib/AC/Yenta/Stats.pm  view on Meta::CPAN

    for my $k (sort keys %STATS){
        $res .= sprintf("%-24s%s\n", "$k:", $STATS{$k});
    }

    my @peers = AC::Yenta::Status->allpeers();
    $res .= sprintf("%-24s%s\n", "peers:", scalar @peers);
    $res .= "\n";
    return $res;
}

# http handler: look up <map>/<key>/<version> (the url components after
# the first) in the store and return the stored value, or the
# not-found response if no version came back
sub http_data {
    my($url) = @_;

    # the leading path component is not needed
    my(undef, $map, $key, $ver) = split m|/|, $url;
    my($data, $version) = store_get($map, $key, $ver);

    return $version ? $data : http_notfound($url);
}

# http handler: look up <map>/<key>/<version> (the url components after
# the first) in the store and return the file content, or the
# not-found response if there is no version or no file
sub http_file {
    my($url) = @_;

    # the leading path component is not needed
    my(undef, $map, $key, $ver) = split m|/|, $url;
    my($data, $version, $file) = store_get($map, $key, $ver);

    # file is a reference to the content
    if( $version && $file ){
        return $$file;
    }
    return http_notfound($url);
}

1;

lib/AC/Yenta/Status.pm  view on Meta::CPAN

use AC::Yenta::Config;
use AC::Yenta::MySelf;
use AC::Dumper;
use AC::Misc;
use Sys::Hostname;
use JSON;
use Socket;
require 'AC/protobuf/yenta_status.pl';
use strict;

my $KEEPDOWN = 1800;	# keep data about down servers for how long?
my $KEEPLOST = 600;	# keep data about servers we have not heard about for how long?
my $SAVEMAX  = 1800;	# do not save if older than

my $PORT;

our $DATA = bless {
    allpeer	=> {},		# yenta_status
    sceptical	=> {},
    mappeer	=> {},		# {map} => { id => id }
    peermap	=> {},		# {id}  => @map
    datacenter  => {},		# {dc}  => { id => id }
    peertype	=> {},		# {ss}  => { id => id }
};

sub init {
    my $port = shift;

    $PORT = $port;

    AC::DC::Sched->new(
        info	=> 'kibitz status',

lib/AC/Yenta/Status.pm  view on Meta::CPAN

                                            info 	=> "status client: $id",
                                            status_peer	=> $id,
                                           );
    return __PACKAGE__->isdown($id) unless $c;

    $c->start();
}

sub _random_peer {

    my $here  = my_datacenter();

    # sceptical
    my @scept = values %{$DATA->{sceptical}};

    my @all   = map  { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
    my @old   = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
    my @local = grep { $_->{datacenter} eq $here } @all;	# this datacenter
    my @away  = grep { $_->{datacenter} ne $here } @all;	# not this datacenter

    # first check anything sceptical
    my @peer  = @scept;

    # then (maybe) something about to expire
    @peer = @old  unless @peer || int rand(5);

    # then (maybe) something far away
    @peer = @away unless @peer || int rand(5);

lib/AC/Yenta/Status.pm  view on Meta::CPAN


    return("seed/$ip:$port", $ip, $port);
}

# server list for save file
# takes "type" or "type/where"; returns the status records of all
# peers of that type that were last up within $SAVEMAX seconds
sub server_list {
    my($type) = @_;

    my($kind, $where) = split m|/|, $type;
    # where - no longer used
    $where ||= my_datacenter();

    # nothing too old
    my @fresh;
    for my $id (keys %{ $DATA->{peertype}{$kind} }){
        push @fresh, $id if $DATA->{allpeer}{$id}{lastup} > $^T - $SAVEMAX;
    }
    return unless @fresh;

    return map { $DATA->{allpeer}{$_} } @fresh;
}

# save a list of peers, in case I crash, and for others to use
sub save_status {

    my $save = conf_value('savestatus');
    my $here = my_datacenter();

    # also save locally running services
    my @mon  = AC::Yenta::Monitor::export();

    for my $s ( @$save ){
        my $file  = $s->{file};
        my $types = $s->{type};

        my @peer;
        for my $type (@$types){

lib/AC/Yenta/Status.pm  view on Meta::CPAN

        debug("saving peer status file");
        unless( open(FILE, ">$file.tmp") ){
            problem("cannot open save file '$file.tmp': $!");
            return;
        }

        for my $pd (@peer){
            # only save best addr in save file
            my($ip, $port) = AC::Yenta::IO::TCP::Client->use_addr_port( $pd->{ip} );

            my $data = {
                id		=> $pd->{server_id},
                addr		=> $ip,
                port		=> int($port),
                status		=> int($pd->{status}),
                subsystem	=> $pd->{subsystem},
                environment	=> $pd->{environment},
                sort_metric	=> int($pd->{sort_metric}),
                capacity_metric => int($pd->{capacity_metric}),
                datacenter	=> $pd->{datacenter},
                is_local	=> ($here eq $pd->{datacenter} ? 1 : 0),
            };
            if( $pd->{subsystem} eq 'yenta' ){
                $data->{map} = $pd->{map};
            }

            print FILE encode_json( $data ), "\n";
        }

        close FILE;
        unless( rename("$file.tmp", $file) ){
            problem("cannot rename save file '$file': $!");
        }

    }
}

################################################################
# diagnostic reports
# short report: one tab separated line per server (ourself, the
# exported monitor data, and every known peer), giving
# id, hostname, datacenter, subsystem, environment, status, sort metric
sub report {

    my $res;

    for my $v ( AC::Yenta::Kibitz::Status::_myself(),
                AC::Yenta::Monitor::export(),
                values %{ $DATA->{allpeer} } ){

        my $metric = int( $v->{sort_metric} );
        # pad the id, so the columns line up
        my $id     = sprintf('%-28s', $v->{server_id});

        $res .= "$id $v->{hostname}\t$v->{datacenter}\t$v->{subsystem}\t$v->{environment}\t$v->{status}\t$metric\n";
    }

    return $res;
}

sub report_long {

    my $res;

    for my $v (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    return grep { $_->{status} } values %{$DATA->{allpeer}};
}

# class method: return the ids recorded in the mappeer table for the given map
sub mappeers {
    my($class, $map) = @_;

    return keys %{ $DATA->{mappeer}{$map} };
}

# class method: return the datacenter table ( {dc} => { id => id } ) itself
sub datacenters {
    my($class) = @_;

    my $dc = $DATA->{datacenter};
    return $dc;
}
################################################################

sub _remove {
    my $id = shift;

    my $ss = $DATA->{allpeer}{$id}{subsystem};
    delete $DATA->{peertype}{$ss}{$id} if $ss;

    my $dc = $DATA->{allpeer}{$id}{datacenter};
    delete $DATA->{datacenter}{$dc}{$id} if $dc;

    verbose("deleting peer: $id");
    delete $DATA->{allpeer}{$id};

    # remove map info
    for my $map ( @{$DATA->{peermap}{$id}} ){
        delete $DATA->{mappeer}{$map}{$id};
    }
    delete $DATA->{peermap}{$id};

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    # debug("updating $id => $up->{status} => " . dumper($up));
    debug("updating $id => $up->{status}");

    $DATA->{allpeer}{$id} = $up;

    if( $up->{status} != 200 ){
        _maybe_remove( $id );
        return ;
    }

    # update datacenter info
    unless( $DATA->{datacenter}{$up->{datacenter}}{$id} ){
        my $pdc = $previnfo->{datacenter};
        delete $DATA->{datacenter}{$pdc}{$id} if $pdc;
        $DATA->{datacenter}{$up->{datacenter}}{$id} = $id;
    }

    # update subsystem info
    unless( $DATA->{peertype}{$up->{subsystem}}{$id} ){
        my $ss = $previnfo->{subsystem};
        delete $DATA->{peertype}{$ss}{$id} if $ss;
        $DATA->{peertype}{$up->{subsystem}}{$id} = $id;
    }

    # update map info

lib/AC/Yenta/Store.pm  view on Meta::CPAN

    return unless $m;

    return $m->want($shard, $key, $ver);
}

# store one version of a key in the named map.
# file is passed as a reference.
# returns nothing if the map is not in %STORE, otherwise
# whatever the map's put() returns.
sub store_put {
    my($map, $shard, $key, $ver, $data, $file, $meta) = @_;

    my $m = $STORE{$map} or return;

    debug("storing $map/$key/$ver");
    return $m->put($shard, $key, $ver, $data, $file, $meta);
}

# NB: only removes local copy temporarily. will be replaced at next AE run
sub store_remove {
    my $map   = shift;
    my $key   = shift;
    my $ver   = shift;

    my $m = $STORE{$map};
    return unless $m;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-03 10:05 (EDT)
# Function: Anti-Entropy (find missing/stale data, and sync up)
#
# $Id$

package AC::Yenta::Store::AE;
use AC::Yenta::Store;
use AC::Yenta::Config;
use AC::Yenta::Debug 'ae';
use AC::Yenta::Stats;
use AC::Yenta::Conf;
use AC::Yenta::MySelf;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

        }elsif( $lt == $bestv ){
            push @best, $m;
        }
    }

    return unless @best;

    my $map = $best[ rand(@best) ];
    $me->{map} = $map;

    # is this a data or file map?
    # adjust accordingly
    my $cf = conf_map( $map );
    $me->{has_files} = 1 if $cf->{basedir};
    $me->{maxget} = $me->{has_files} ? $MAXFILES : $MAXGET;
    $me->{expire} = $cf->{expire};

    return 1;
}

sub _init_peer {
    my $me = shift;

    my $here = my_datacenter();
    my @peer = AC::Yenta::Status->mappeers( $me->{map} );
    my $env  = conf_value('environment');

    my(@near, @far, @ood);

    for my $p (@peer){
        my $d = AC::Yenta::Status->peer($p);
        next unless $d->{environment} eq $env;
        next unless $d->{status}      == 200;

        if( $d->{uptodate} ){
            if( $d->{datacenter} eq $here ){
                push @near, $d;
            }else{
                push @far, $d;
            }
        }else{
            push @ood, $d;
        }
    }

    $me->{peers_near} = \@near if @near;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
    @AE = grep{ $_ != $me } @AE;
}

sub _next_step {
    my $me = shift;

    $me->{timestamp} = $^T;

    if( $me->{kvfetching} < $MAXFETCH ){
        # any missing data?
        if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
            debug("starting nextgetkv ($me->{kvfetching})");
            $me->_next_get_kv();
        }
    }

    # check nodes?
    if( @{$me->{badnode}} ){
        $me->_start_check();
        return;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> $msgid++,
        want_reply	=> 1,
        data_encrypted	=> $enc,
    }, {
        map		=> $me->{map},
        level		=> $node->{level},
        version		=> $node->{version},
        shard		=> $node->{shard},
    } );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $me->{peer}{ip}, undef,

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


}

# Process the results of a check request, then move on to the next step.
#   io  - the client connection (unused here)
#   evt - event; the results are read from $evt->{data}{check}
#   me  - the AE object
# Results that carry a key are key results. The rest are node results,
# of which only those at the highest level seen are kept.
# If there are any key results, only those are processed.
sub _check_load {
    my($io, $evt, $me) = @_;

    debug("check results");
    $evt->{data} ||= {};

    my(@keys, @nodes);
    my $top = 0;		# highest level returned so far

    for my $d ( @{ $evt->{data}{check} }){
        debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");

        if( $d->{key} ){
            push @keys, $d;
            next;
        }

        my $level = $d->{level};
        next if $level < $top;

        # found a higher level: discard what was collected at the lower one
        if( $level > $top ){
            $top   = $level;
            @nodes = ();
        }
        push @nodes, $d;
    }

    if( @keys ){
        $me->_check_result_keys( \@keys );
    }elsif( @nodes ){
        $me->_check_result_nodes( $top, \@nodes );
    }

    return $me->_next_step();
}

sub _check_error {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    my %vscnt;
    my %vsadd;

    for my $d (@$chk){
        inc_stat('ae_check_key');
        my $vsk = "$d->{version} $d->{shard}";
        $vscnt{ $vsk } ++;

        next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );

        debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
        push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
        inc_stat('ae_key_missing');
        $me->{missing} ++;
        $vsadd{ $vsk } ++;
    }
}

sub _is_expired {
    my $me  = shift;
    my $map = shift;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    }

    return;
}

# Compare merkle node results received from the peer against our own
# merkle data, and queue every node that differs for further checking.
#   lev - level of the received nodes
#   chk - arrayref of received nodes (map, shard, version, level, hash)
# Differing nodes are put at the front of $me->{badnode}.
sub _check_result_nodes {
    my($me, $lev, $chk) = @_;

    # collect the distinct base (shard, version) pairs of the recvd data
    my %base;
    for my $d (@$chk){
        my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
        $base{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

    # anything newer than this is skipped below
    my $t_new = timet_to_yenta_version($^T - $TOONEW);

    # fetch our own hashes for those base versions
    my %mine;
    for my $bv (values %base){
        next if $bv->{ver} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $bv->{ver});
        # RSN - skip unwanted shards
        my $nodes = AC::Yenta::Store::store_get_merkle($me->{map}, $bv->{shard}, $bv->{ver}, $lev - 1);
        # debug("my hash $me->{map} $_->{level}/$_->{shard}/$_->{version} => $_->{hash}");
        $mine{"$_->{version} $_->{shard}"} = $_->{hash} for @$nodes;
    }

    # compare (don't bother with things that are too new (the data may still be en route))
    for my $d (@$chk){
        next if $d->{version} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $d->{version});
        # RSN - skip unwanted shards
        my $hash = $mine{"$d->{version} $d->{shard}"};
        my $what = "check $d->{level}/$d->{shard}/$d->{version}: $d->{hash}";

        if( $d->{hash} eq $hash ){
            debug("$what => match");
            next;
        }
        debug("$what != $hash");

        # stick them at the front
        unshift @{$me->{badnode}}, { version => $d->{version}, shard => $d->{shard}, level => $lev };
    }
}

################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)

# Start the next fetch of missing data.
# Entries waiting in kvneedorig (to be fetched from the original peer)
# are handled before entries waiting in kvneed.
sub _next_get_kv {
    my($me) = @_;

    if( @{$me->{kvneedorig}} ){
        return $me->_start_get_kv_orig();
    }
    if( @{$me->{kvneed}} ){
        return $me->_start_get_kv_any();
    }
}

# Fetch the next batch of needed data from a peer chosen by _pick_peer().
# The batch is at most $me->{maxget} entries, taken off the front of kvneed.
# The request is started with the retry flag set.
sub _start_get_kv_any {
    my($me) = @_;

    my $batch = [ splice(@{$me->{kvneed}}, 0, $me->{maxget}) ];

    # pick a peer
    my $peer = $me->_pick_peer();
    debug("getting kv data from peer $peer->{id}");

    return $me->_start_get_kv( $peer, 1, $batch );
}

# Fetch the next batch of needed data from the current peer ($me->{peer}).
# The batch is at most $me->{maxget} entries, taken off the front of kvneedorig.
# The request is started with the retry flag clear.
sub _start_get_kv_orig {
    my($me) = @_;

    my $batch = [ splice(@{$me->{kvneedorig}}, 0, $me->{maxget}) ];

    debug("getting kv data from current peer");

    return $me->_start_get_kv( $me->{peer}, 0, $batch );
}

sub _start_get_kv {
    my $me    = shift;
    my $peer  = shift;
    my $retry = shift;
    my $get   = shift;

    # insert map into request

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    # for (@$get){ debug("requesting $_->{key}/$_->{version}") }

    my $enc   = use_encryption($peer);
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_get',
        msgidno		  => $msgid++,
        want_reply	  => 1,
        data_encrypted	  => $enc,
    }, {
        data		  => $get,
    } );

    # connect + send
    debug("sending to $peer->{id}");
    my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
                                                  info => "AE getkv from $peer->{id}" );

    if( $io ){
        $me->{kvfetching} ++;
        $io->set_callback('load',  \&_getkv_load,  $me, $retry, $get);

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

}

sub _getkv_load {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;
    $evt->{data} ||= {};

    debug("got kv data results");

    my %need = map {
        ( "$_->{key}/$_->{version}" => $_ )
    } @$get;

    for my $d ( @{$evt->{data}{data}}){
        debug("got $d->{map}/$d->{key}/$d->{version}");
        next unless $d->{key} && $d->{version};		# not found

        delete $need{ "$d->{key}/$d->{version}" };
        my $file = $evt->{content};
        $file = \ $d->{file} if $d->{file};

        AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
                                     $d->{value}, $file, $d->{meta} );

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-01 18:56 (EDT)
# Function: distribute data to other peers
#
# $Id$

package AC::Yenta::Store::Distrib;
use AC::Yenta::Kibitz::Store::Client;
use AC::Yenta::Debug 'distrib';
use AC::Yenta::Config;
use AC::Yenta::Protocol;
use AC::Yenta::Stats;
use AC::Yenta::MySelf;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


    my $sender = $req->{sender};
    my $sendat = AC::Yenta::Status->peer($sender);

    my $me = bless {
        info		=> "$req->{datum}{map}/$req->{datum}{key}/$req->{datum}{version}",
        map		=> $req->{datum}{map},
        req		=> $req,
        content		=> $cont,
        # we tune the distribution algorithm based on where it came from:
        faraway 	=> (my_datacenter() ne $sendat->{datacenter}),

        farseen		=> 0,
        nearseen	=> 0,
        farsend		=> [],
        nearsend	=> [],
        ordershift	=> 4,
    }, $class;

    debug("distributing $me->{info}");
    inc_stat( 'dist_requests' );

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

        }else{
            push @keep, $r;
        }
    }

    @DIST = @keep;
}

################################################################
# determine distribution strategy
#   - if we recvd it from faraway, we will send it to other datacenters, and randomly in the same datacenter
#   - otherwise we send it in the same datacenter, in an orderly fashion
# RSN - find a strategy with faster convergence + less duplication

sub _init_strategy {
    my $me     = shift;
    my $sender = shift;

    my $here   = my_datacenter();
    my $dcs    = AC::Yenta::Status->datacenters();
    my $sendat = AC::Yenta::Status->peer($sender);
    my(@far, @near);

    for my $dc (keys %$dcs){
        if( $dc eq $here ){
            push @near, grep { $_ ne $sender } $me->_compat_peers_in_dc($dc);
        }else{
            next if $dc eq $sendat->{datacenter};

            push @far, {
                dc	=> $dc,
                id	=> [ $me->_compat_peers_in_dc($dc) ],
            };
        }
    }

    if( $me->{faraway} ){
        $me->{nearsend} = shuffle(\@near);

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

        $me->{nearsend} = _orderly(\@near);
    }
}

# which yentas can do something with the update?
sub _compat_peers_in_dc {
    my $me = shift;
    my $dc = shift;

    my $env = conf_value('environment');
    my $dcs = AC::Yenta::Status->datacenters();
    my $map = $me->{map};
    my @id;

    for my $id (keys %{$dcs->{$dc}}){
        my $pd = AC::Yenta::Status->peer($id);

        next unless $pd->{subsystem}   eq 'yenta';
        next unless $pd->{environment} eq $env;
        next unless grep {$map eq $_} @{ $pd->{map} };
        push @id, $id;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    my $enc = use_encryption($pd);
    my $ect = '';
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $enc ? $proto->encrypt(undef, ${$me->{content}}) : ${$me->{content}} if $me->{content};

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_distrib',
        msgidno		  => $msgid++,
        want_reply	  => 1,
        data_encrypted	  => $enc,
        content_encrypted => $enc,
    }, {
        sender		  => AC::Yenta::Status->my_server_id(),
        hop		  => $me->{req}{hop} + 1,
        expire		  => $me->{req}{expire},
        datum		  => $me->{req}{datum},
    }, \$ect );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    }
}

sub _onload {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");

    if( $evt->{data}{haveit} ){
        if( $far ){
            $me->{farseen}  ++;
            inc_stat('dist_send_far_seen');
        }else{
            $me->{nearseen} ++;
            inc_stat('dist_send_near_seen');
        }
    }

    if( !$me->{faraway} && !$far ){
        # orderly distribution. hop away.
        if( $evt->{data}{haveit} ){
            shift @{$me->{nearsend}};
        }else{
            my $n = $me->{ordershift};
            $n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
            shift @{$me->{nearsend}} for (1 .. $n);
            $me->{ordershift} *= 2;
        }
    }

    $me->_start_one($far);

lib/AC/Yenta/Store/Expire.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-13 11:55 (EDT)
# Function: auto-expire old data
#
# $Id$

package AC::Yenta::Store::Expire;
use AC::Yenta::Config;
use AC::Yenta::Debug 'expire';
use AC::Yenta::Conf;
use AC::DC::Sched;
use strict;

lib/AC/Yenta/Store/File.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-31 16:04 (EDT)
# Function: store file data
#
# $Id$

package AC::Yenta::Store::File;
use AC::Yenta::Debug 'store_file';

use File::Path;
use strict;

sub new {

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

    if( $ver ){
        $ver = encode_version($ver);
        return unless grep { $_ eq $ver } @versions;
    }else{
        $ver = $versions[0];
    }

    my $vk = $me->vkey($key, $ver);
    my $extver = decode_version($ver);

    my($data, $founddat) = $db->get($me->{name}, 'data', $vk);

    if( wantarray ){
        if( $founddat ){
            my $meta = $db->get($me->{name}, 'meta', $vk);
            my $file = $me->{fs}->get($data) if $data;
            return( $data, $extver, $file, $meta );
        }else{
            # we don't have data, but we have it in history; fake it.
            return (undef, $extver, undef, undef);
        }
    }

    return $data;
}

# someone sent me something, do I want it?
sub want {
    my $me    = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;

    my $cf = $me->{conf};
    my $db = $me->{db};
    my $v  = encode_version($ver);

    # data belongs here?
    return if $me->is_sharded() && !$me->is_my_shard($shard);

    my @versions = $me->_versget( $key );

    if( $^T - $cacheT > 60 ){
        debug("cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
        $cacheT = $^T;
    }

    # I have it?

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN


    # I'll just throw it away.
    return;
}

# Record a version of a key in this map, and store its data if appropriate.
#
#   shard - shard of the key; recorded in the db only if the map is sharded
#   key   - the key
#   ver   - version, in external form (encoded here with encode_version)
#   data  - the value to store (the filename, when a file is supplied)
#   file  - reference, or false if there is no file
#   meta  - metadata; stored only if it has non-zero length
#
# The data is saved only if this is the newest version known for the key,
# or if the map is configured with 'keepold'. The version is added to the
# key's version history either way, and the history is then trimmed to the
# configured 'history' length.
#
# Returns 1 on success. Returns nothing if the version is already in the
# history (dupe), or if the file could not be saved.
sub put {
    my $me    = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;
    my $data  = shift;
    my $file  = shift;	# reference
    my $meta  = shift;

    my $cf = $me->{conf};
    my $db = $me->{db};
    my $v  = encode_version($ver);
    my $vk = $me->vkey($key, $v);

    debug("storing $vk");

    # get version history
    my @deletehist;		# versions dropped from the history (passed to merkle)
    my %deletedata;		# encoded versions whose data + meta get deleted
    my @versions = $me->_versget( $key );

    return if grep { $_ eq $v } @versions;	# dupe!

    # is this the newest version? should we save this data?
    if( !@versions || ($v gt $versions[0]) || $cf->{keepold} ){

        # save file; data is filename
        if( $file ){
            my $r = $me->{fs}->put($data, $file);
            return unless $r;
        }
        # put meta + data
        $db->put($me->{name}, 'meta', $vk, $meta) if length $meta;
        $db->put($me->{name}, 'data', $vk, $data);

        unless( $cf->{keepold} ){
            # unless we are keeping old data, remove previous version
            $deletedata{$versions[0]} = 1 if @versions;
        }
    }

    # add new version to list. newest 1st
    @versions = sort {$b cmp $a} (@versions, $v);
    if( $cf->{history} && @versions > $cf->{history} ){
        # trim list
        my @rm = splice @versions, $cf->{history}, @versions, ();
        push @deletehist, (map { ({version => decode_version($_), key => $key, shard => $shard}) } @rm);
        # the data of the trimmed versions has to go as well.
        # NB: this must iterate over @rm; the arguments have all been shifted
        # off of @_ by now, so looping over @_ never deleted anything
        $deletedata{$_} = 1 for @rm;
    }
    if( $me->is_sharded() ){
        # QQQ - shard changed?
        $db->put($me->{name}, 'shard', $key, encode_shard($shard || 0));
    }

    my $dd = join(' ', map { $_->{version} } @deletehist);
    debug("version list: @versions [delete: $dd]");

    $me->_versput( $key, @versions );

    # update merkles
    $me->merkle( { shard => $shard, key => $key, version => $ver }, @deletehist);

    # delete old data
    for my $rm (keys %deletedata){
        debug("removing old version $key/$rm");
        my $rmvk = $me->vkey($key, $rm);
        $db->del($me->{name}, 'data', $rmvk);
        $db->del($me->{name}, 'meta', $rmvk);
    }

    $db->sync();

    return 1;
}

sub remove {
    my $me  = shift;

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN


    if( @versions ){
        $me->_versput( $key, @versions );
    }else{
        $db->del($me->{name}, 'vers',  $key);
        $db->del($me->{name}, 'shard', $key);
        $me->_versdel( $key );
    }

    my $vk = $me->vkey($key, $ver);
    $db->del($me->{name}, 'data', $vk);
    $db->del($me->{name}, 'meta', $vk);

    return $cshard;
}

################################################################

sub range {
    my $me    = shift;
    my $start = shift;

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jun-01 18:11 (EDT)
# Function: merkle tree for detecting missing data
#
# $Id$

package AC::Yenta::Store::Merkle;
use AC::Yenta::Debug 'merkle';
use AC::Yenta::SixtyFour;
use AC::Cache;
use AC::Import;
use Digest::SHA 'sha1_base64';
use strict;

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

    my @d = split /\0/, $d;

    my @res;

    if( $^T - $cacheT > 60 ){
        debug("merk cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
        $cacheT = $^T;
    }

    if( $lev == $me->{merkle_height} ){
        # data is: lkey, ...
        for my $r (@d){
            my($s,$v,$k) = $me->_decode_lkey($r);
            push @res, { version => decode_version($v), key => $k, count => 1, shard => decode_shard($s) };
        }
    }else{
        # data is: mkey => hash count, ...
        my %d = @d;
        for my $lv (keys %d){
            my($l, $s, $v) = $me->_decode_mkey( $lv );
            my($h,$c)  = split /\s/, $d{$lv};
            push @res, { version => decode_version($v), level => hex($l), hash => $h, count => $c, shard => decode_shard($s) };
        }
    }

    return \@res;
}

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;

    my $db = $me->{db};
    my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
    my $vk = $me->_lkey($key, $ver, $shard);

    debug("adding to merkle leaf $mk - $vk");

    # get current data
    my $d = $me->_mcget( $mk );
    my @d = split /\0/, $d;
    # append new item + uniqify
    my %d;
    @d{@d} = ();
    $d{$vk}   = undef;
    $d = join("\0", sort keys %d);
    $me->_mcput( $mk, $d );

    my $hash = sha1_base64($d);

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;

    my $db = $me->{db};
    my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
    my $vk = $me->_lkey($key, $ver, $shard);

    debug("removing from merkle leaf $mk - $vk");

    # get current data
    my $d = $me->_mcget( $mk );
    my @d = split /\0/, $d;
    # remove item
    @d = grep { $vk ne $_ } @d;

    if( @d ){
        $d = join("\0", @d);
        $me->_mcput( $mk, $d );

        my $hash = sha1_base64($d);

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN


################################################################

# List the keys that have data stored for a version, optionally limited
# to one shard.
#   shard - if undef, the keys are returned without any shard filtering
#   ver   - version; the 'data' records are scanned over the range
#           encode_version($ver) .. encode_version($ver + 1)
# The key is whatever follows the last '/' of each record's key.
# Returns the list of keys.
sub _get_actual_keys {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    my $db = $me->{db};

    # get range on data
    my @key = map {
        my $k = $_->{k}; $k =~ s|.*/||; $k
    } $db->range($me->{name}, 'data', encode_version($ver), encode_version($ver + 1));
    debug("actual key: @key");

    return @key unless defined $shard;

    # get vers list to filter on shard
    my $sh = encode_shard($shard);
    return grep {
        my $k = $_;

        my $vl = $db->get($me->{name}, 'vers', $k);
        # the shard is taken from what follows the ';' in the vers record
        my($s) = $vl =~ /;\s*(.*)/;

        # both sides are encoded values: compare them as strings.
        # a numeric == would treat any two non-numeric strings as equal
        # NOTE(review): assumes the vers record holds the shard exactly as
        # encode_shard() produces it - confirm against _versput
        defined($s) && $s eq $sh;
    } @key;
}

# check merkle leaf node against actual data
#   shard, ver - select the leaf node (and the data) to check
# every key that has data for this shard/version, but which is not listed
# in the leaf node, is passed to merkle() to be added.
sub merkle_scrub {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    debug("scrub $me->{name} $shard/$ver");

    # get list of keys from merkle leaf node
    my $mlist = $me->get_merkle($shard, $ver, $me->{merkle_height}) || [];
    # flag each listed key with a constant true value. (storing the key
    # itself as the value made a key such as "0" look like it was missing)
    my %listed = map { ($_->{key} => 1) } @$mlist;

    # get list of keys from actual data
    my @akey  = $me->_get_actual_keys( $shard, $ver );

    # compare lists

    for my $k (@akey){
        next if $listed{$k};
        debug("missing key in merkle tree: $shard/$ver/$k");
        $me->merkle( { key => $k, shard => $shard, version => $ver } );
    }
}

lib/AC/protobuf/yenta_getset.pl  view on Meta::CPAN

        );
    }

    unless (ACPYentaGetSet->can('_pb_fields_list')) {
        Google::ProtocolBuffers->create_message(
            'ACPYentaGetSet',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
                    'ACPYentaMapDatum', 
                    'data', 1, undef
                ],

            ],
            { 'create_accessors' => 1, 'follow_best_practice' => 1,  }
        );
    }

    unless (ACPYentaDistReply->can('_pb_fields_list')) {
        Google::ProtocolBuffers->create_message(
            'ACPYentaDistReply',

lib/AC/protobuf/yenta_status.pl  view on Meta::CPAN

            'ACPYentaStatus',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'hostname', 1, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'datacenter', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'subsystem', 3, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'environment', 4, undef



( run in 0.631 second using v1.01-cache-2.11-cpan-8d75d55dd25 )