AC-Yenta

 view release on metacpan or  search on metacpan

eg/yenta.conf  view on Meta::CPAN

savestatus      /var/tmp/yenta.status           yenta

allow		127.0.0.1
allow           10.200.2.0/23

# seed peers to locate the network at startup
seedpeer        10.200.2.4:3503
seedpeer        10.200.2.5:3503


# enable debugging?
#debug           ae
#debug           map
#debug           merkle
# ...


# maps
map testyfoo {
    # name of the data file
    dbfile      /home/data/testyfoo.ydb
    # how much history to keep 
    history     4
}

eg/yenta_get  view on Meta::CPAN

use AC::Dumper;
use strict;

# Positional arguments: the name of the map to query, then the key to fetch.
my $map = shift @ARGV;
my $key = shift @ARGV;

# Check for presence rather than truth, so that a map or key named "0" is
# accepted. The usage line lists only the arguments this script reads.
die "usage: get map key\n"
    unless defined $map && length $map && defined $key && length $key;

my $y   = AC::Yenta::Client->new(
    # server_file, servers[], or host + port
    server_file => '/var/tmp/yenta.status',
    debug       => \&debug,
   );

# look the key up and dump whatever the client returns
my $res = $y->get($map, $key);

print dumper($res), "\n";


exit;

# Debug callback passed to the client: prints its arguments to STDERR,
# followed by a newline.
sub debug {
    my @parts = @_;
    print {*STDERR} @parts, "\n";
}

eg/yenta_put  view on Meta::CPAN

# Function: put example
#
# $Id$

use AC::Yenta::Client;
use Time::HiRes 'time';
use JSON;
use strict;


my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });


my $key = 'YX3jSXD3CBRUDABm';

my $res = $ys->distribute(
    # map, key, version, data
    'mymap', $key, timet_to_yenta_version(time()),
    encode_json( {
        url_id	=> $key,
        url	=> 'http://www.example.com',

eg/yentad  view on Meta::CPAN


use Getopt::Std;
use AC::Yenta::D;
use strict;

# keep a copy of the original arguments before getopts() consumes the options
my @saved_argv = @ARGV;

my %OPT;
unless( getopts('c:dfp:', \%OPT) ){
    die "usage...\n";
}
    # -c config file
    # -d    enable all debugging
    # -f    foreground
    # -p port


my $y = AC::Yenta::D->new( class_myself => 'My::Code::MySelf' );


# options handed to the daemon, taken from the command line
my %daemon_opt = (
    argv       => \@saved_argv,
    foreground => $OPT{f},
    debugall   => $OPT{d},
    port       => $OPT{p},
);

$y->daemon( $OPT{c}, \%daemon_opt );

lib/AC/Yenta.pm  view on Meta::CPAN

=head1 SYNOPSIS

    use AC::Yenta::D;
    use strict;

    my $y = AC::Yenta::D->new( );

    $y->daemon( $configfile, {
      argv		=> \@ARGV,
      foreground	=> $OPT{f},
      debugall		=> $OPT{d},
      port		=> $OPT{p},
    } );

    exit;


=head1 USAGE

    Copy + Paste from the example code into your own code.
    Copy + Paste from the example config into your own config.

lib/AC/Yenta.pm  view on Meta::CPAN

yentas in different datacenters.

    secret squeamish-ossifrage

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item debug

enable debugging for a particular section

    debug map

=item map

configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers, but a map should be
configured similarly on every server that carries it.

    map users {
	backend	    bdb
        dbfile      /home/acdata/users.ydb

lib/AC/Yenta/Client.pm  view on Meta::CPAN


# one or more of:
#   new( host, port )
#   new( servers => [ { host, port }, ... ] )
#   new( server_file )

sub new {
    my $class = shift;

    my $me = bless {
        debug	=> sub{ },
        host	=> 'localhost',
        proto	=> AC::DC::Protocol->new(),
        copies	=> 1,
        @_,
    }, $class;

    $me->{server_file} ||= $me->{altfile};	# compat

    die "servers or server_file?\n" unless $me->{servers} || $me->{server_file};

lib/AC/Yenta/Client.pm  view on Meta::CPAN

# Send one request to a single yenta server and return the decoded reply.
#
#   $addr, $port - server to contact; $addr is converted with inet_aton()
#   $req         - encoded request
#   $file        - optional reference to content, appended to the request
#
# Returns the reply from send_request(), with {data} replaced by the result
# of decode_reply(). Returns undef if the address cannot be resolved or if
# sending/decoding dies. Failures are reported through the debug callback.
sub _try_server {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;
    my $file = shift;	# reference

    my $res;

    my $ipn = inet_aton($addr);
    unless( defined $ipn ){
        # inet_aton returns undef when the address cannot be resolved;
        # report it here rather than passing an undefined address along
        $me->{debug}->("cannot resolve yenta server address: $addr");
        return $res;
    }
    $req .= $$file if $file;

    $me->{debug}->("trying to contact yenta server $addr:$port");
    eval {
        $res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
        $res->{data} = $me->{proto}->decode_reply( $res ) if $res;
    };
    if(my $e = $@){
        # a failed request is not fatal: the caller gets undef
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}


################################################################

sub _next_host {
    my $me  = shift;

lib/AC/Yenta/Config.pm  view on Meta::CPAN

use Socket;
use strict;

our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value conf_map);


my %CONFIG = (

    include	=> \&AC::ConfigFile::Simple::include_file,
    debug	=> \&AC::ConfigFile::Simple::parse_debug,
    allow	=> \&AC::ConfigFile::Simple::parse_allow,
    port	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    environment => \&AC::ConfigFile::Simple::parse_keyvalue,
    secret 	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    seedpeer	=> \&AC::ConfigFile::Simple::parse_keyarray,

    ae_maxload	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    distrib_max	=> \&AC::ConfigFile::Simple::parse_keyvalue,

    savestatus	=> \&parse_savefile,

lib/AC/Yenta/Crypto.pm  view on Meta::CPAN


    my $eb     = ACPEncrypt->encode( {
        algorithm	=> $ALGORITHM,
        seqno		=> $seqno,
        nonce		=> $nonce,
        hmac		=> $hmac,
        length		=> length($buf),
        ciphertext	=> $ct,
    } );

    debug("encrypted <$seqno,$nonce,$hmac>");

    return $eb;
}

sub decrypt {
    my $me  = shift;
    my $buf = shift;

    my $ed     = ACPEncrypt->decode( $buf );
    die "cannot decrypt: unknown alg\n" unless $ed->{algorithm} eq $ALGORITHM;

lib/AC/Yenta/Crypto.pm  view on Meta::CPAN

    my $key    = $me->_key($seqno, $nonce);
    my $iv     = $me->_iv($key, $seqno, $nonce);

    my $hmac   = hmac_sha256_base64($ed->{ciphertext}, $key);
    die "cannot decrypt: hmac mismatch\n" unless $hmac eq $ed->{hmac};

    my $aes    = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );
    my $pt     = substr($aes->decrypt( $ed->{ciphertext} ), 0, $ed->{length});

    debug("decrypted <$seqno,$nonce,$hmac>");

    return $pt;
}


sub _key {
    my $me    = shift;
    my $seqno = shift;
    my $nonce = shift;

lib/AC/Yenta/D.pm  view on Meta::CPAN


    AC::Yenta::MySelf->customize( $p{class_myself} );
    # ...

    return bless \$class, $class;
}

sub daemon {
    my $me    = shift;
    my $cfile = shift;
    my $opt   = shift;	# foreground, debugall, persistent_id, argv

    die "no config file specified\n" unless $cfile;

    # configure
    $AC::Yenta::CONF = AC::Yenta::Config->new(
        $cfile, onreload => sub {
            AC::Yenta::Store::configure();
        });


    initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );

    AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
    daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");


    $SIG{CHLD} = $SIG{PIPE} = sub{};        				# ignore
    $SIG{INT}  = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit;  # abort

    # initialize subsystems
    my $port = $opt->{port} || conf_value('port');

lib/AC/Yenta/D.pm  view on Meta::CPAN



    # start "cronjobs"
    AC::DC::Sched->new(
        info	=> 'check config files',
        freq	=> 30,
        func	=> sub { $AC::Yenta::CONF->check() },
       );

    run_and_watch(
        ($opt->{foreground} || $opt->{debugall}),
        \&AC::DC::IO::mainloop,
       );
}


1;

lib/AC/Yenta/Debug.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-27 11:40 (EDT)
# Function: debugging + log msgs
#
# $Id$

package AC::Yenta::Debug;
use AC::DC::Debug;
our @ISA = 'AC::DC::Debug';
use strict;


1;

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

    return unless $gpb;
    my $c;
    eval {
        $c = ACPYentaStatusReply->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode status data: $e");
        return;
    }

    debug("rcvd update");
    my $myself = AC::Yenta::Status->my_server_id();

    for my $up (@{$c->{status}}){
        my $id = $up->{server_id};
        next if $up->{via} eq $myself;
        next if $id eq $myself;
        AC::Yenta::Status->update($id, $up);
    }
}

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN

our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

# Construct a kibitz status client.
# All arguments are passed on unchanged to the parent class constructor.
# Returns the new object with its timeout/read/shutdown callbacks installed,
# or nothing if the parent constructor did not return an object.
sub new {
    my ($class, @args) = @_;

    debug('starting kibitz status client');

    my $me = $class->SUPER::new(@args);
    if( !$me ){
        return;
    }

    # event name => handler in this package, installed in this order
    my @callbacks = (
        [ 'timeout',  \&timeout  ],
        [ 'read',     \&read     ],
        [ 'shutdown', \&shutdown ],
    );
    for my $cb (@callbacks){
        $me->set_callback( $cb->[0], $cb->[1] );
    }

    return $me;
}

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN


    unless( $me->{status_ok} ){
        AC::Yenta::Kibitz::Status::isdown( $me->{status_peer} );
    }
}

sub read {
    my $me  = shift;
    my $evt = shift;

    #debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    $me->{status_ok} = 1;
    AC::Yenta::Kibitz::Status::update( $data );
    AC::Yenta::NetMon::update( $me );

    $me->shut();

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

    my $yp  = AC::Yenta::Protocol->new();

    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($response),
        content_length  => 0,
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
       );

    debug("sending status reply");
    $io->write_and_shut( $hdr . $response );
}

1;

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

our @ISA = 'AC::Yenta::IO::TCP::Client';

my $TIMEOUT = 5;

# Construct a kibitz store client for the server at $addr:$port.
#
#   $addr, $port - passed to the parent class constructor
#   $req         - request, kept in $me->{_req}
#   remaining arguments are passed on to the parent constructor after 'info'
#
# Returns the new object with its timeout/read/shutdown callbacks installed,
# or nothing if the parent constructor did not return an object.
sub new {
    my ($class, $addr, $port, $req, @rest) = @_;

    debug('starting kibitz store client');

    my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @rest );
    if( !$me ){
        return;
    }

    $me->{_req} = $req;

    # event name => handler in this package, installed in this order
    my @callbacks = (
        [ 'timeout',  \&timeout  ],
        [ 'read',     \&read     ],
        [ 'shutdown', \&shutdown ],
    );
    for my $cb (@callbacks){
        $me->set_callback( $cb->[0], $cb->[1] );
    }

    return $me;
}

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

# Timeout callback: shuts the connection.
sub timeout {
    my ($me) = @_;

    return $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    $me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
    return unless $proto;
    $proto->{data}    = $data;
    $proto->{content} = $content;
    eval {
        my $yp = AC::Yenta::Protocol->new();
        $proto->{data} = $yp->decode_reply($proto) if $data;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    # process requests
    my @res;
    my $rescont;
    for my $r (@{ $req->{data} }){
        debug("get request: $r->{map}, $r->{key}, $r->{version}");
        my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
        my $res = {
            map		=> $r->{map},
            key		=> $r->{key},
        };

        if( $meta && $file ){
            unless( _check_content( $meta, $file ) ){
                problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
                # QQQ - remove from system, (and let AE get a new copy)?

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
    my $response = $yp->encode_reply( {
        type		  => 'yenta_get',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        content_encrypted => $proto->{data_encrypted},
    }, { data => \@res }, \$ect );

    debug("sending get reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response . $ect );

}

sub api_check {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    my $req;
    eval {
        $req = ACPYentaCheckRequest->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    debug("check request: $req->{map}, $req->{version}, $req->{level}");

    my @res;
    my @todo = { version => $req->{version}, shard => $req->{shard} };

    # the top of the tree will be fairly sparse,
    # return up to several levels if they are sparse
    for my $l (0 .. 32){
        my $cl = $req->{level} + $l;
        my @lres;
        my $nexttot;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN


    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		  => 'yenta_check',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        }, { check => \@res } );

    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

}

# get + process merkle data
sub _get_check {
    my $map   = shift;
    my $shard = shift;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    }

    my $v = $req->{datum};

    # do we already have ?
    my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );

    if( $want ){
        # put

        debug("put request from $io->{peerip}: $v->{map}, $v->{key}, $v->{version}");

        # file content is passed by reference, to avoid large copies
        $content ||= \ $v->{file} if $v->{file};

        # check
        if( $v->{meta} && $content ){
            unless( _check_content( $v->{meta}, $content ) ){
                problem("content SHA1 check failed: $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}");
                $io->shut();
                return;
            }
        }

        $want = store_put( $v->{map}, $v->{shard}, $v->{key}, $v->{version}, $v->{value},
                   $content, $v->{meta} );

        # distribute to other systems
        AC::Yenta::Store::Distrib->new( $req, $content ) if $want;
    }else{
        debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
    }


    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, { status_code => 200, status_message => 'OK', haveit => !$want } );

    debug("sending distrib reply");
    $io->timeout_rel($TIMEOUT);
    $io->write_and_shut( $response );

}

sub _check_content {
    my $meta = shift;
    my $cont = shift;

    return 1 unless $meta && $meta =~ /^\{/;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        is_error	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, {
        status_code     => $code,
        status_message  => $msg,
        haveit		=> 0,
    } );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}

1;

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

    # clean up old data
    for my $id (keys %MON){
        isdown($id, 0) if $MON{$id}{lastup} < $^T - $OLD_DOWN;
    }

    # start monitoring (send heartbeat request)
    for my $m (@$mon){
        my $ip   = $m->{ipa};
        my $port = $m->{port};
        my $id   = "$ip:$port";
        debug("start monitoring $id");

        my $ok = AC::Yenta::Monitor::Client->new( $ip, $port,
                                                info 		=> "monitor client: $id",
                                                monitor_peer	=> $id,
                                               );

        isdown($id, 0) unless $ok;
    }
}

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

    my $d = $MON{$id};
    return unless $d;

    # require 2 polls to fail
    return unless $^T - $d->{lastup} >= 2 * $FREQ;

    $code = 0 if $code == 200;
    $d->{status_code} = $code || 0;
    $d->{timestamp}   = $^T;

    debug("monitor $id is down");

    if( $d->{lastup} < $^T - $OLD_KEEP ){
        debug("monitor $id down too long. removing from report");
        delete $MON{$id};
    }
}

sub update {
    my $id = shift;
    my $gb = shift;	# ACPHeartbeat

    my $up;
    eval {

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN

    };
    if(my $e = $@){
        problem("cannot decode hb data: $e");
        return isdown($id, 0);
    }
    unless( $up->{status_code} == 200 ){
        return isdown($id, $up->{status_code});
    }
    return isdown($id, 0) unless $^T - $up->{timestamp} < $OLD_DOWN;

    debug("monitor $id is up");
    $up->{lastup} = $^T;
    $up->{downcount} = 0;

    _hb_ip_info( $up, $MON{$id} );
    $MON{$id} = $up;
}

sub _hb_ip_info {
    my $up  = shift;
    my $old = shift;

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

sub new {
    my $class = shift;

    debug('starting monitor status client');
    my $me = $class->SUPER::new( @_ );
    unless($me){
        return;
    }

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN


    unless( $me->{status_ok} ){
        AC::Yenta::Monitor::isdown( $me->{monitor_peer} );
    }
}

# Read callback for the monitor client.
# Does nothing more until read_protocol() yields a protocol header. Once it
# does, the connection is flagged status_ok, the reply data is passed to
# AC::Yenta::Monitor::update() for this monitor peer, and the connection is shut.
sub read {
    my ($me, $evt) = @_;

    debug("recvd reply");

    my ($proto, $data) = AC::Yenta::Protocol->new()->read_protocol( $me, $evt );
    if( !$proto ){
        return;
    }

    $me->{status_ok} = 1;
    AC::Yenta::Monitor::update( $me->{monitor_peer}, $data );

    return $me->shut();
}

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN


    if( $p->{content_encrypted} && $content ){
        $content = $me->_decrypt_data( $io, $auth, $content );
        return unless $content;
    }

    # content is passed as reference
    return ($p, $data, ($content ? \$content : undef));
}

# Minimal HTTP request parser, for simple status queries, argus, debugging.
# This is not an RFC compliant http server.
#
#   $io  - connection object; the request text is read from $io->{rbuffer}
#   $evt - not used
#
# Returns nothing until the buffer contains a blank line. After that it
# returns a protocol hash { type => 'http', method => <first word> } and the
# second whitespace-separated word of the buffer (the URL).
sub _read_http {
    my ($io, $evt) = @_;

    my $buffer = $io->{rbuffer};
    return if $buffer !~ /\r?\n\r?\n/s;

    # keep three assignment targets: split derives its implicit LIMIT from their count
    my ($method, $url, $version) = split /\s+/, $buffer;

    my %proto = ( type => 'http', method => $method );
    return ( \%proto, $url );
}

lib/AC/Yenta/Server.pm  view on Meta::CPAN

    $me->init($fd);
    $me->wantread(1);
    $me->timeout_rel($TIMEOUT);
    $me->set_callback('read',    \&read);
    $me->set_callback('timeout', \&timeout);
}

# Timeout callback: logs the timeout and shuts the connection.
sub timeout {
    my ($me) = @_;

    debug("connection timed out");
    return $me->shut();
}

# Read callback for a server connection: reads a request and dispatches it.
# Does nothing more until read_protocol() yields a protocol header. The
# handler is looked up in %HANDLER by message type. A code reference is called
# directly; anything else is used as the invocant of its handler() method.
# An unknown message type is logged and the connection is shut.
sub read {
    my ($me, $evt) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my ($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    if( !$proto ){
        return;
    }

    # dispatch request
    my $h = $HANDLER{ $proto->{type} };

    if( !$h ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    my @args = ( $me, $proto, $data, $content );
    return $h->( @args ) if ref $h;
    return $h->handler( @args );
}


1;

lib/AC/Yenta/Stats.pm  view on Meta::CPAN


    $STATS{$stat} ++;
}

sub handler {
    my $class = shift;
    my $io    = shift;
    my $proto = shift;
    my $url   = shift;

    debug("http request $url");
    $url =~ s|^/||;
    $url =~ s/%(..)/chr(hex($1))/eg;

    my $f = $HANDLER{$url};
    $f = \&http_data if $url =~ m|^data/|;
    $f = \&http_file if $url =~ m|^file/|;
    $f ||= \&http_notfound;
    my( $content, $code, $text ) = $f->($url);
    $code ||= 200;
    $text ||= 'OK';

lib/AC/Yenta/Status.pm  view on Meta::CPAN


        next if $p->{status} == 200 && $p->{timestamp} > $^T - $KEEPLOST;
        _maybe_remove( $id );
    }

    # randomly pick a peer
    my($id, $ip, $port) = _random_peer();
    return unless $id;

    # start a client
    debug("starting status kibitz client to $id");

    my $c = AC::Yenta::Kibitz::Status::Client->new( $ip, $port,
                                            info 	=> "status client: $id",
                                            status_peer	=> $id,
                                           );
    return __PACKAGE__->isdown($id) unless $c;

    $c->start();
}

lib/AC/Yenta/Status.pm  view on Meta::CPAN


    # then something local
    @peer = @local unless @peer;

    # last resort
    @peer = @all unless @peer;

    # sometimes use the seed, in case there was a network split
    if( @peer && int(rand(@all+1)) ){
        my $p = $peer[ rand(@peer) ];
        debug("using peer $p->{server_id}");
        return ($p->{server_id}, $p->{ip}, undef);
    }

    # seed peer
    my $seed = conf_value('seedpeer');
    my $p = $seed->[ rand(@$seed) ];
    my ($ip, $port) = split /:/, $p;
    $port ||= my_port();

    # don't talk to self. any of my addrs.

lib/AC/Yenta/Status.pm  view on Meta::CPAN

        for my $type (@$types){
            push @peer, server_list($type);

            for my $m (@mon){
                push @peer, $m if $m->{subsystem} eq $type;
            }
        }

        next unless @peer;

        debug("saving peer status file");
        unless( open(FILE, ">$file.tmp") ){
            problem("cannot open save file '$file.tmp': $!");
            return;
        }

        for my $pd (@peer){
            # only save best addr in save file
            my($ip, $port) = AC::Yenta::IO::TCP::Client->use_addr_port( $pd->{ip} );

            my $data = {

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    if( ($^T - $d->{lastup} > $KEEPDOWN) || ($^T - $d->{timestamp} > $KEEPLOST) ){

        _remove($id);
    }
}

sub isdown {
    my $class = shift;
    my $id    = shift;

    debug("marking peer '$id' as down");

    if( ! $DATA->{allpeer}{$id} ){
        return unless $DATA->{sceptical}{$id};
        # we know it is down, and want to kibbitz this fact
        $DATA->{allpeer}{$id} = $DATA->{sceptical}{$id};
    }

    delete $DATA->{sceptical}{$id};

    if( $DATA->{allpeer}{$id} ){

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    my $io    = shift;

    return unless $class->_env_ok($id, $up);

    if( $DATA->{allpeer}{$id} ){
        # already known
        delete $DATA->{sceptical}{$id};
        return;
    }

    debug("rcvd update (sceptical) about $id from $io->{peerip}");

    # only accept updates from the server itself
    # no 3rd party updates. no misconfigured servers.
    problem("server misconfigured $id != $io->{peerip}")
      unless grep { inet_atoi($io->{peerip}) == $_->{ipv4}  } @{$up->{ip}};

    $up->{id} = $id;
    delete $up->{lastup};
    $DATA->{sceptical}{$id} = $up;
}

lib/AC/Yenta/Status.pm  view on Meta::CPAN


    $up->{id} = $id;
    my $previnfo = $DATA->{allpeer}{$id};
    verbose("discovered new peer: $id ($up->{hostname})") unless $previnfo;

    # only keep it if it is newer than what we have
    return if $previnfo && $up->{timestamp} <= $previnfo->{timestamp};

    $up->{path} .= ' ' . my_server_id();

    # debug("updating $id => $up->{status} => " . dumper($up));
    debug("updating $id => $up->{status}");

    $DATA->{allpeer}{$id} = $up;

    if( $up->{status} != 200 ){
        _maybe_remove( $id );
        return ;
    }

    # update datacenter info
    unless( $DATA->{datacenter}{$up->{datacenter}}{$id} ){

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    return if "@curmap" eq "@newmap";		# unchanged

    # what do we need to add/remove
    my (%remove, %add);
    @remove{@curmap} = @curmap;
    @add{@newmap}    = @newmap;
    delete $remove{$_} for @newmap;
    delete $add{$_}    for @curmap;

    for my $map (keys %remove){
        debug("removing $map from $id");
        delete $DATA->{mappeer}{$map}{$id};
    }
    for my $map (keys %add){
        debug("adding $map to $id");
        $DATA->{mappeer}{$map}{$id} = $id;
    }
    $DATA->{peermap}{$id} = \@newmap;
}

lib/AC/Yenta/Store.pm  view on Meta::CPAN

my %STORE;


# Create the maps listed in the config and drop the ones no longer listed.
# For every configured map a store object is created and kept in %STORE:
# AC::Yenta::Store::Sharded if the map's 'sharded' setting is true,
# AC::Yenta::Store::Map otherwise. The constructor receives the map name, the
# configured backend, and the map's config with recovery => 1 added.
# Entries in %STORE that the config no longer lists are removed.
sub configure {

    my $maps = $AC::Yenta::CONF->{config}{map};

    # start with every known map; whatever is still here afterwards is stale
    my %stale = %STORE;

    for my $map (keys %$maps){
        debug("configuring map $map");

        my $conf  = $maps->{$map};
        my $class = 'AC::Yenta::Store::Map';
        $class    = 'AC::Yenta::Store::Sharded' if $conf->{sharded};

        $STORE{$map} = $class->new( $map, $conf->{backend}, { %$conf, recovery => 1 } );
        delete $stale{$map};
    }

    for my $map (keys %stale){
        debug("removing unused map '$map'");
        delete $STORE{$map};
    }
}

sub store_get {
    my $map   = shift;
    my $key   = shift;
    my $ver   = shift;

    my $m = $STORE{$map};

lib/AC/Yenta/Store.pm  view on Meta::CPAN

    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;
    my $data  = shift;
    my $file  = shift;	# reference
    my $meta  = shift;

    my $m = $STORE{$map};
    return unless $m;

    debug("storing $map/$key/$ver");
    $m->put($shard, $key, $ver, $data, $file, $meta);
}

# NB: only removes local copy temporarily. will be replaced at next AE run
sub store_remove {
    my $map   = shift;
    my $key   = shift;
    my $ver   = shift;

    my $m = $STORE{$map};

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    my $me = bless {
        badnode		=> [ {version => 0, shard => 0, level => 0} ],
        cache		=> {},
        kvneed		=> [],
        kvneedorig	=> [],
        kvfetching	=> 0,
        missing		=> 0,
    }, $class;

    debug("new ae");
    $me->_pick_map()  || return;

    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
    $me->_init_peer() || return;

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');
    $me->_next_step();

    push @AE, $me;
    return $me;
}

sub periodic {
    # kill dead sessions, start new ones

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    my $peer = $peer[ rand(@peer) ];

    return $peer;
}

################################################################

# Finish this AE session.
# Records the finish time in %DONE for the map, but only if fewer than
# $MAXMISSING items were missing. Stores 'ae_last_finish' for the map and
# removes this session from @AE.
sub _finished {
    my ($me) = @_;

    my $mapname = $me->{map};

    debug("finished $me->{map}");
    if( $me->{missing} < $MAXMISSING ){
        $DONE{$mapname} = $^T;
    }
    AC::Yenta::Store::store_set_internal($mapname, 'ae_last_finish', $^T);

    # drop this session from the list of sessions
    @AE = grep { $_ != $me } @AE;
}

# Advance this AE session by one step.
# Updates the session timestamp. If fewer than $MAXFETCH fetches are in
# progress and key/value data is still needed, the next fetch is started.
# Then the next queued bad node is checked; when none are queued, the
# session is finished.
sub _next_step {
    my ($me) = @_;

    $me->{timestamp} = $^T;

    # any missing data? (only while below the fetch limit)
    if( $me->{kvfetching} < $MAXFETCH
        && ( @{$me->{kvneedorig}} || @{$me->{kvneed}} ) ){
        debug("starting nextgetkv ($me->{kvfetching})");
        $me->_next_get_kv();
    }

    # no nodes left to check => done
    unless( @{$me->{badnode}} ){
        return $me->_finished();
    }

    $me->_start_check();
    return;
}

################################################################

sub _start_check {
    my $me = shift;

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> $msgid++,
        want_reply	=> 1,
        data_encrypted	=> $enc,
    }, {

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    }

}

sub _check_load {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;


    debug("check results");
    $evt->{data} ||= {};

    # determine highest level returned

    my @keydata;
    my @nodedata;
    my $maxlev = 0;

    for my $d ( @{ $evt->{data}{check} }){
        debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");

        if( $d->{key} ){
            push @keydata, $d;
            next;
        }
        next if $d->{level} < $maxlev;

        if( $d->{level} > $maxlev ){
            @nodedata = ();
            $maxlev   = $d->{level};

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    my %vscnt;
    my %vsadd;

    for my $d (@$chk){
        inc_stat('ae_check_key');
        my $vsk = "$d->{version} $d->{shard}";
        $vscnt{ $vsk } ++;

        next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );

        debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
        push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
        inc_stat('ae_key_missing');
        $me->{missing} ++;
        $vsadd{ $vsk } ++;
    }
}

# Decide whether a node can be skipped because its data has expired.
#
#   $map, $lev, $ver - map name, level and version, passed to store_version_max()
#
# Returns 1 if the session has an 'expire' setting and the value returned by
# store_version_max() is below the version for ($^T - expire + $TOONEW).
# Returns nothing otherwise, including when store_version_max() returns undef.
sub _is_expired {
    my ($me, $map, $lev, $ver) = @_;

    my $expire = $me->{expire};
    return if !$expire;

    my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
    return if !defined $vmx;

    my $cutoff = timet_to_yenta_version($^T - $expire + $TOONEW);
    return unless $vmx < $cutoff;

    debug("skipping expired $lev/$ver - $vmx");
    return 1;
}

sub _check_result_nodes {
    my $me  = shift;
    my $lev = shift;
    my $chk = shift;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    # get all of our merkle data for these versions
    my %merkle;
    my $t_new = timet_to_yenta_version($^T - $TOONEW);

    for my $d (values %ver){
        next if $d->{ver} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $d->{ver});
        # RSN - skip unwanted shards
        my $ms = AC::Yenta::Store::store_get_merkle($me->{map}, $d->{shard}, $d->{ver}, $lev - 1);
        for my $m (@$ms){
            # debug("my hash $me->{map} $m->{level}/$m->{shard}/$m->{version} => $m->{hash}");
            $merkle{"$m->{version} $m->{shard}"} = $m->{hash};
        }
    }

    # compare (don't bother with things that are too new (the data may still be en route))
    for my $d (@$chk){
        next if $d->{version} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $d->{version});
        # RSN - skip unwanted shards
        my $hash = $merkle{"$d->{version} $d->{shard}"};

        if( $d->{hash} eq $hash ){
            debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} => match");
            next;
        }else{
            debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} != $hash");
        }

        # stick them at the front
        unshift @{$me->{badnode}}, { version => $d->{version}, shard => $d->{shard}, level => $lev };
    }
}

################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    return $me->_start_get_kv_any()  if @{$me->{kvneed}};
}

# Fetch the next batch of needed key/value data from a peer chosen by
# _pick_peer(). The batch is taken from the front of the kvneed queue and
# the request is started with the retry flag set.
sub _start_get_kv_any {
    my ($me) = @_;

    # take up to maxget entries off the front of the queue
    my @batch = splice @{ $me->{kvneed} }, 0, $me->{maxget};

    # pick a peer
    my $peer = $me->_pick_peer();
    debug("getting kv data from peer $peer->{id}");

    return $me->_start_get_kv( $peer, 1, \@batch );
}

# Fetch the next batch of needed key/value data from the session's current
# peer. The batch is taken from the front of the kvneedorig queue and the
# request is started with the retry flag cleared.
sub _start_get_kv_orig {
    my ($me) = @_;

    # take up to maxget entries off the front of the queue
    my @batch = splice @{ $me->{kvneedorig} }, 0, $me->{maxget};

    debug("getting kv data from current peer");

    return $me->_start_get_kv( $me->{peer}, 0, \@batch );
}

# Send a 'yenta_get' request for a batch of items to a peer.
#
#   $peer  - peer to ask; its {ip} and {id} are used
#   $retry - passed through to the load/error callbacks
#   $get   - array ref of items to request; each gets {map} set to this session's map
#
# If the client connection is created, the count of fetches in progress is
# incremented, the load/error callbacks are installed and the client is started.
sub _start_get_kv {
    my ($me, $peer, $retry, $get) = @_;

    # insert map into request
    for my $item (@$get){
        $item->{map} = $me->{map};
    }

    my $enc   = use_encryption($peer);
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # build request
    my %header = (
        type           => 'yenta_get',
        msgidno        => $msgid++,
        want_reply     => 1,
        data_encrypted => $enc,
    );
    my $request = $proto->encode_request( \%header, { data => $get } );

    # connect + send
    debug("sending to $peer->{id}");
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $peer->{ip}, undef, $request,
        info => "AE getkv from $peer->{id}",
    );

    if( $io ){
        $me->{kvfetching} ++;

        # both callbacks receive the same context
        my @context = ( $me, $retry, $get );
        $io->set_callback('load',  \&_getkv_load,  @context);
        $io->set_callback('error', \&_getkv_error, @context);
        $io->start();
    }
}

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

sub _getkv_load {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;
    $evt->{data} ||= {};

    debug("got kv data results");

    my %need = map {
        ( "$_->{key}/$_->{version}" => $_ )
    } @$get;

    for my $d ( @{$evt->{data}{data}}){
        debug("got $d->{map}/$d->{key}/$d->{version}");
        next unless $d->{key} && $d->{version};		# not found

        delete $need{ "$d->{key}/$d->{version}" };
        my $file = $evt->{content};
        $file = \ $d->{file} if $d->{file};

        AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
                                     $d->{value}, $file, $d->{meta} );

    }

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

    # recover only once per dir
    my $recov = ( $conf->{recovery} && !$recovered{$dir} );
    $recovered{$dir} = 1 if $recov;

    if( $recov ){
        unlink $_ for glob "$dir/__*";
    }

    my $flags = $conf->{readonly} ? 0 : (DB_CREATE| DB_INIT_CDB | DB_INIT_MPOOL);

    debug("opening Berkeley dir=$dir, file=$file (recov $recov)");
    my $env = BerkeleyDB::Env->new(
        -Home       => $dir,
        -Flags      => $flags,
       );

    # microsecs
    $env->set_timeout($TIMEOUT * 1_000_000 / 2, DB_SET_LOCK_TIMEOUT) if $env;

    my $db = BerkeleyDB::Btree->new(
        -Filename   => $file,

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

    }, $class;
}

# Look up one record.
#
#   $map, $sub, $key - combined by _key() into the database key
#
# Returns nothing when db_get reports a true status (treated as not found).
# Otherwise returns the value in scalar context, or (value, 1) in list
# context.
sub get {
    my ($me, $map, $sub, $key) = @_;

    debug("get $map/$sub/$key");

    my $value;

    # the lookup is bracketed by _start / _finish
    $me->_start();
    my $status = $me->{db}->db_get( _key($map, $sub, $key), $value );
    $me->_finish();

    return if $status; # not found

    return wantarray ? ($value, 1) : $value;
}

# Store one record.
#
#   $map, $sub, $key - combined by _key() into the database key
#   $val             - value to store
#
# Returns the logical negation of db_put's status, i.e. true when db_put
# returned a false status.
sub put {
    my ($me, $map, $sub, $key, $val) = @_;

    debug("put $map/$sub/$key");

    # the write is bracketed by _start / _finish
    $me->_start();
    my $status = $me->{db}->db_put( _key($map, $sub, $key), $val );
    $me->_finish();

    return !$status;
}

sub del {
    my $me  = shift;

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

    $me->_start();
    my $cursor = $me->{db}->db_cursor();
    $k = _key($map,$sub,$key);
    my $e = _key($map,$sub,$end);
    $cursor->c_get($k, $v, DB_SET_RANGE);

    my $MAX = 100;
    my $max = $MAX;

    while( !$end || ($k lt $e) ){
        debug("range $k");
        last unless $k =~ m|$map/$sub/|;
        $k =~ s|$map/$sub/||;
        push @k, { k => $k, v => $v };
        my $r = $cursor->c_get($k, $v, DB_NEXT);
        last if $r;	# error

        # cursor locks the db
        # close+recreate so other processes can proceed
        unless( $max -- ){
            $cursor->c_close();

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

        # we tune the distribution algorithm based on where it came from:
        faraway 	=> (my_datacenter() ne $sendat->{datacenter}),

        farseen		=> 0,
        nearseen	=> 0,
        farsend		=> [],
        nearsend	=> [],
        ordershift	=> 4,
    }, $class;

    debug("distributing $me->{info}");
    inc_stat( 'dist_requests' );
    inc_stat( 'dist_requests_faraway' ) if $me->{faraway};


    $me->_init_strategy($sender);

    # RSN - check load
    my $max = conf_value('distrib_max') || $MAXUNDERWAY;
    if( @DIST < $max ){
        $me->_start_next();

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


# periodically, go through and restart or expire
sub periodic {

    my @keep;
    my $max = conf_value('distrib_max') || $MAXUNDERWAY;

    my $chance = (@DIST > $max) ? ($max / @DIST) : 1;

    for my $r (@DIST){
        # debug("periodic $r->{info}");
        next if $^T > $r->{req}{expire};

        if( (rand() <= $chance) && (AC::DC::IO->underway() <= 2 * $max) ){
            my $keep = $r->_start_next();
            push @keep, $r if $keep;
        }else{
            push @keep, $r;
        }
    }

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    return unless $d;

    # randomly pick one server in chosen dc
    my @id = grep {
        my $x = AC::Yenta::Status->peer($_);
        ($x->{status} == 200) ? 1 : 0;
    } @{$d->{id}};
    return unless @id;

    my $id = $id[ rand(@id) ];
    debug("sending $me->{info} to far site $id in $d->{dc}");
    $me->_start_peer( $id, 1 );
    inc_stat('dist_send_far');
    inc_stat('dist_send_total');
    return 1;
}

# Send to the next nearby peer, if any remain.
#
# Takes the first id off $me->{nearsend}. Returns nothing when there is no
# (true) id; otherwise starts the send with a far argument of 0, bumps the
# near and total send counters, and returns 1.
sub _start_near {
    my ($me) = @_;

    my $peer_id = shift @{ $me->{nearsend} };
    return unless $peer_id;

    debug("sending $me->{info} to nearby site $peer_id");
    $me->_start_peer( $peer_id, 0 );

    inc_stat('dist_send_near');
    inc_stat('dist_send_total');

    return 1;
}

sub _start_next {
    my $me  = shift;

    my $sent;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
                                                   $request . $ect,
                                                   info => "distrib $me->{info} to $id",
                                                  );

    if( $io ){
        $io->set_callback('load',  \&_onload,  $me, $id, $far);
        $io->set_callback('error', \&_onerror, $me, $id, $far);
        $io->start();
    }else{
        debug("start client failed");
    }
}

sub _onload {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");

    if( $evt->{data}{haveit} ){
        if( $far ){
            $me->{farseen}  ++;
            inc_stat('dist_send_far_seen');
        }else{
            $me->{nearseen} ++;
            inc_stat('dist_send_near_seen');
        }
    }

lib/AC/Yenta/Store/Expire.pm  view on Meta::CPAN

   );

# Run expiration for every configured map that has an 'expire' setting.
#
# For each such map the cutoff is $^T minus the configured expire value,
# converted with timet_to_yenta_version(), and handed to store_expire().
# Maps without a true 'expire' value are skipped.
sub periodic {

    my $maps = conf_value('map');

    foreach my $name (keys %$maps){
        my $conf = conf_map($name);
        next unless $conf->{expire};

        my $cutoff = timet_to_yenta_version($^T - $conf->{expire});
        debug("running expire from $cutoff");
        AC::Yenta::Store::store_expire( $name, $cutoff );
    }
}

################################################################


1;

lib/AC/Yenta/Store/File.pm  view on Meta::CPAN

    return if $name =~ m%(^\.\./)|(/\.\./)%;

    my $cf   = $me->{conf};
    my $base = $cf->{basedir};
    return 1 unless $base;

    # split name into dir / file
    my($dir, $file) = $name =~ m|(.*)/([^/]+)$|;

    # create directory
    debug("mkpath: $base/$dir");
    my $mask = umask 0;
    eval { mkpath("$base/$dir", undef, 0777); };
    umask $mask;

    # save file
    my $f;
    unless( open($f, "> $base/$name.tmp") ){
        problem("cannot save file '$base/$name.tmp': $!");
        return;
    }

    debug("saving file '$base/$name'");
    print $f $$cont;
    close $f;
    rename "$base/$name.tmp", "$base/$name";

    return 1;
}


1;

lib/AC/Yenta/Store/LevelDB.pm  view on Meta::CPAN

    my $class = shift;
    my $name  = shift;
    my $conf  = shift;

    my $file  = $conf->{dbfile};
    unless( $file ){
        problem("no dbfile specified for '$name'");
        return;
    }

    debug("opening LevelDB file=$file");
    my $db = $OPEN{$file} || Tie::LevelDB::DB->new( $file );
    $OPEN{$file} = $db;

    problem("cannot open db file $file") unless $db;

    # web server will need access
    chmod 0777, $file;

    return bless {
        file	=> $file,

lib/AC/Yenta/Store/LevelDB.pm  view on Meta::CPAN

    }, $class;
}

# Look up one record.
#
#   $map, $sub, $key - combined by _key() into the database key
#
# Returns nothing when the lookup yields a false value (treated as not
# found). Otherwise returns the value in scalar context, or (value, 1) in
# list context.
sub get {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    debug("get $map/$sub/$key");

    # single declaration: a second "my $v" in this scope masks the first
    # and triggers a warning
    my $v = $me->{db}->Get( _key($map,$sub,$key) );

    # NOTE(review): a stored value of "0" or "" is also reported as not
    # found here - confirm whether callers rely on that before changing
    return unless $v; # not found

    if( wantarray ){
        return ($v, 1);
    }
    return $v;
}

# Store one record.
#
#   $map, $sub, $key - combined by _key() into the database key
#   $val             - value to store
#
# Always returns 1; the result of the underlying Put call is not inspected.
sub put {
    my ($me, $map, $sub, $key, $val) = @_;

    debug("put $map/$sub/$key");

    my $dbkey  = _key($map, $sub, $key);
    my $status = $me->{db}->Put( $dbkey, $val );

    return 1;
}

sub del {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

lib/AC/Yenta/Store/LevelDB.pm  view on Meta::CPAN


    my $k = _key($map,$sub,$key);
    my $r = $it->Seek($k);

    while( $it->Valid() ){
        my $k = $it->key();
        my $v = $it->value();

        last if $end && ($k ge $e);

        debug("range $k");
        last unless $k =~ m|$map/$sub/|;
        $k =~ s|$map/$sub/||;
        push @k, { k => $k, v => $v };
        $it->Next();
    }

    return @k;
}

################################################################

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

        my($ext) = $conf->{dbfile} =~ /\.(.+)$/;
        $bkend = $ext if $BACKEND{$ext};
    }

    my $c  = $BACKEND{$bkend || $DEFAULT};
    unless( $c ){
        problem("invalid storage backend: $bkend - ignoring map");
        return ;
    }

    debug("configuring map $name with $c");

    my $db = $c->new( $name, $conf );
    my $fs = AC::Yenta::Store::File->new( $name, $conf );

    my $me = bless {
        name		=> $name,
        conf		=> $conf,
        db		=> $db,
        fs		=> $fs,
        merkle_height	=> 16,

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN


sub get {
    my $me   = shift;
    my $key  = shift;
    my $ver  = shift;

    my $db = $me->{db};

    my @versions = $me->_versget( $key );
    return unless @versions;
    debug("found ver: @versions");

    if( $ver ){
        $ver = encode_version($ver);
        return unless grep { $_ eq $ver } @versions;
    }else{
        $ver = $versions[0];
    }

    my $vk = $me->vkey($key, $ver);
    my $extver = decode_version($ver);

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

    my $cf = $me->{conf};
    my $db = $me->{db};
    my $v  = encode_version($ver);

    # data belongs here?
    return if $me->is_sharded() && !$me->is_my_shard($shard);

    my @versions = $me->_versget( $key );

    if( $^T - $cacheT > 60 ){
        debug("cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
        $cacheT = $^T;
    }

    # I have it?
    return if grep { $_ eq $v } @versions;

    # expired?
    return if $cf->{expire} && ($ver < timet_to_yenta_version($^T - $cf->{expire}));

    # I want everything?

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

    my $ver   = shift;
    my $data  = shift;
    my $file  = shift;	# reference
    my $meta  = shift;

    my $cf = $me->{conf};
    my $db = $me->{db};
    my $v  = encode_version($ver);
    my $vk = $me->vkey($key, $v);

    debug("storing $vk");

    # get version history
    my @deletehist;
    my %deletedata;
    my @versions = $me->_versget( $key );

    return if grep { $_ eq $v } @versions;	# dupe!

    # is this the newest version? should we save this data?
    if( !@versions || ($v gt $versions[0]) || $cf->{keepold} ){

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

        my @rm = splice @versions, $cf->{history}, @versions, ();
        push @deletehist, (map { ({version => decode_version($_), key => $key, shard => $shard}) } @rm);
        $deletedata{$_} = 1 for @_;
    }
    if( $me->is_sharded() ){
        # QQQ - shard changed?
        $db->put($me->{name}, 'shard', $key, encode_shard($shard || 0));
    }

    my $dd = join(' ', map { $_->{version} } @deletehist);
    debug("version list: @versions [delete: $dd]");

    $me->_versput( $key, @versions );

    # update merkles
    $me->merkle( { shard => $shard, key => $key, version => $ver }, @deletehist);

    # delete old data
    for my $rm (keys %deletedata){
        debug("removing old version $key/$rm");
        my $rmvk = $me->vkey($key, $rm);
        $db->del($me->{name}, 'data', $rmvk);
        $db->del($me->{name}, 'meta', $rmvk);
    }

    $db->sync();

    return 1;
}

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

    my $me  = shift;
    my $key = shift;
    my $ver = shift;

    my $db = $me->{db};
    my $v  = encode_version($ver);

    my $cshard   = $db->get($me->{name}, 'shard', $key);
    my @versions = grep { $_ ne $v } $me->_versget( $key );

    debug("new ver list: @versions");

    if( @versions ){
        $me->_versput( $key, @versions );
    }else{
        $db->del($me->{name}, 'vers',  $key);
        $db->del($me->{name}, 'shard', $key);
        $me->_versdel( $key );
    }

    my $vk = $me->vkey($key, $ver);

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN


    $me->{db}->put($me->{name}, 'internal', $key, $val);
}

################################################################

sub expire {
    my $me     = shift;
    my $expire = shift;

    debug("expiring $me->{name}");

    my $db = $me->{db};

    # walk merkle tree, find all k/v to remove
    my @delete;

    my @walk = { level => 0, version => 0, shard => 0 };
    while(@walk){
        my @next;
        for my $node (@walk){

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

                }else{
                    push @next, $r;
                }
            }
        }
        @walk = @next;
    }

    # remove k/v
    for my $r (@delete){
        debug("expiring $r->{key}/$r->{version}");
        $me->_remove( $r->{key}, $r->{version} );
    }

    # update merkle
    $me->merkle(undef, @delete);

    $db->sync();
}

################################################################

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

sub get_merkle {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;
    my $lev   = shift;

    return if $lev > $me->{merkle_height};

    my $db = $me->{db};
    my $mk = $me->_mkey(encode_shard($shard), encode_version($ver), $lev);
    debug("getting merkle for $mk");

    my $d = $me->_mcget( $mk );
    return unless $d;
    my @d = split /\0/, $d;

    my @res;

    if( $^T - $cacheT > 60 ){
        debug("merk cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
        $cacheT = $^T;
    }

    if( $lev == $me->{merkle_height} ){
        # data is: lkey, ...
        for my $r (@d){
            my($s,$v,$k) = $me->_decode_lkey($r);
            push @res, { version => decode_version($v), key => $k, count => 1, shard => decode_shard($s) };
        }
    }else{

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN


    my $db = $me->{db};

    my $k0 = $me->_mkey($shard, $ver, $lev);
    my $k1 = $me->_mkey($shard, $ver, $lev - 1);

    my(undef, $nextshard, $nextver) = $me->_decode_mkey($k1);

    unless( $lev ){
        # root hash - not used
        debug("updating merkle node root => $hash");
        $me->_mcput( 'root', $hash );
        return;
    }

    # get node
    my $d = $me->_mcget( $k1 );
    my $oldh = sha1_base64($d);
    my %d = split /\0/, $d;

    if($hash){
        # add/update
        debug("updating merkle node $k1 + { $k0 => $hash, $count }");
        $d{$k0} = "$hash $count";
    }else{
        # remove
        debug("updating merkle node $k1 - { $k0 => empty }");
        delete $d{$k0};
    }

    if( keys %d ){

        $d = join("\0", map {"$_\0$d{$_}"} (sort keys %d));
        $me->_mcput( $k1, $d );
        my $newh = sha1_base64($d);
        return if $newh eq $oldh;	# unchanged
        return ($nextshard, $nextver, $newh, scalar keys %d);

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

sub _merkle_leaf_add {
    my $me    = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;

    my $db = $me->{db};
    my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
    my $vk = $me->_lkey($key, $ver, $shard);

    debug("adding to merkle leaf $mk - $vk");

    # get current data
    my $d = $me->_mcget( $mk );
    my @d = split /\0/, $d;
    # append new item + uniqify
    my %d;
    @d{@d} = ();
    $d{$vk}   = undef;
    $d = join("\0", sort keys %d);
    $me->_mcput( $mk, $d );

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

sub _merkle_leaf_del {
    my $me    = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;

    my $db = $me->{db};
    my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
    my $vk = $me->_lkey($key, $ver, $shard);

    debug("removing from merkle leaf $mk - $vk");

    # get current data
    my $d = $me->_mcget( $mk );
    my @d = split /\0/, $d;
    # remove item
    @d = grep { $vk ne $_ } @d;

    if( @d ){
        $d = join("\0", @d);
        $me->_mcput( $mk, $d );

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    my $db = $me->{db};

    # get range on data
    my @key = map {
        my $k = $_->{k}; $k =~ s|.*/||; $k
    } $db->range($me->{name}, 'data', encode_version($ver), encode_version($ver + 1));
    debug("actual key: @key");

    return @key unless defined $shard;

    # get vers list to filter on shard
    my $sh = encode_shard($shard);
    return grep {
        my $k = $_;

        my $vl = $db->get($me->{name}, 'vers', $k);
        my($s) = $vl =~ /;\s*(.*)/;

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

        $s == $sh;
    } @key;
}

# check merkle leaf node against actual data
# check merkle leaf node against actual data
#
#   $shard, $ver - identify the leaf node to check
#
# Every key returned by _get_actual_keys() that is not listed in the
# merkle leaf node is added to the tree via merkle().
sub merkle_scrub {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    debug("scrub $me->{name} $shard/$ver");

    # get list of keys from merkle leaf node
    my $mlist = $me->get_merkle($shard, $ver, $me->{merkle_height}) || [];

    # membership set: map each key to 1 rather than to itself, so that a
    # key whose text is false (such as "0") is still recognized as present
    my %in_merkle = map { ( $_->{key} => 1 ) } @$mlist;

    # get list of keys from actual data
    my @akey  = $me->_get_actual_keys( $shard, $ver );

    # compare lists

    for my $k (@akey){
        next if $in_merkle{$k};
        debug("missing key in merkle tree: $shard/$ver/$k");
        $me->merkle( { key => $k, shard => $shard, version => $ver } );
    }
}


################################################################

sub _mkey {
    my $me    = shift;
    my $shard = shift;



( run in 1.474 second using v1.01-cache-2.11-cpan-49f99fa48dc )