AC-Yenta

 view release on metacpan or  search on metacpan

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-Yenta
version:            1.1
abstract:           eventually-consistent distributed key/value data store. et al.
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    BerkeleyDB:           0
    Crypt::Rijndael:      0
    Digest::SHA:          0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0
    Sys::Hostname:        0
    Time::HiRes:          0
no_index:

eg/yenta.conf  view on Meta::CPAN

# example yenta config
#
# file will be reloaded automagically if it changes. no need to hup or restart.


port            3503

environment	prod

# save peer status in a file?
savestatus      /var/tmp/yenta.status           yenta

allow		127.0.0.1

eg/yenta_get  view on Meta::CPAN

my $map = shift @ARGV;
my $key = shift @ARGV;
die "usage: get [-h host] map key\n" unless $map && $key;

my $y   = AC::Yenta::Client->new(
    # server_file, servers[], or host + port
    server_file	=> '/var/tmp/yenta.status',
    debug 	=> \&debug,
   );

my $res = $y->get($map, $key);

print dumper($res), "\n";


exit;

# debug callback handed to the client: writes the message parts,
# followed by a newline, to STDERR.
sub debug {
    my @parts = @_;
    print {*STDERR} @parts, "\n";
}

eg/yenta_put  view on Meta::CPAN

use Time::HiRes 'time';
use JSON;
use strict;


my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });


my $key = 'YX3jSXD3CBRUDABm';

my $res = $ys->distribute(
    # map, key, version, data
    'mymap', $key, timet_to_yenta_version(time()),
    encode_json( {
        url_id	=> $key,
        url	=> 'http://www.example.com',
        acc_id	=> 'C9TdSgbUCBRUCABG',
        format	=> 'html',
    }),
   );

lib/AC/Yenta.pm  view on Meta::CPAN

=head2 Kibitzing

Each yenta kibitzes (gossips) with the other yentas in the network
to exchange status information, distribute key-value data, and
detect and correct inconsistent data.

=head2 Eventual Consistency

Key-value data is versioned with timestamps. By default, newest wins.
Maps can be configured to keep and return multiple versions and client
code can use other conflict resolution mechanisms.

Lost, missing or otherwise inconsistent data is detected
by kibitzing merkle tree hash values.

=head2 Topological awareness

Yentas can take network topology into account when transferring
data around to minimize long-distance transfers. You will need to
write a custom C<MySelf> class with a C<my_datacenter> function.

=head2 Multiple Network Interfaces / NAT

Yentas can take advantage of multiple network interfaces with
different IP addresses (eg. a private internal network + a public network),
or multiple addresses (eg. a private address and a public address)
and various NAT configurations.

You will need to write a custom C<MySelf> class and C<my_network_info>
function.

=head2 Network Information

By default, yentas obtain their primary IP address by calling
C<gethostbyname( hostname() )>. If this either does not work on your
systems, or isn't the value you want to use,
you will need to write a custom C<MySelf> class and C<my_network_info>
function.



=head1 CONFIG FILE

Various parameters need to be specified in a config file.

lib/AC/Yenta/Client.pm  view on Meta::CPAN


require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';

our @EXPORT = 'timet_to_yenta_version';	# imported from Y/Conf

my $HOSTNAME = hostname();

my %MSGTYPE =
 (
  yenta_get		=> { num => 7, reqc => 'ACPYentaGetSet',        resc => 'ACPYentaGetSet' },
  yenta_distrib		=> { num => 8, reqc => 'ACPYentaDistRequest',   resc => 'ACPYentaDistReply' },
  yenta_check		=> { num => 9, reqc => 'ACPYentaCheckRequest',  resc => 'ACPYentaCheckReply' },
 );

# register every message type from %MSGTYPE (number, request class,
# reply class) with the base protocol class
for my $type (keys %MSGTYPE){
    my ($num, $reqc, $resc) = @{ $MSGTYPE{$type} }{qw(num reqc resc)};
    AC::DC::Protocol->add_msg( $type, $num, $reqc, $resc );
}


# one or more of:
#   new( host, port )
#   new( servers => [ { host, port }, ... ] )
#   new( server_file )

sub new {
    my $class = shift;

lib/AC/Yenta/Client.pm  view on Meta::CPAN

            map		=> $map,
            key		=> $key,
            version	=> $ver,
            shard	=> _shard($key),	# NYI
            value	=> $val,
            meta	=> $meta,
        } ]
    }, $file );

    return $me->_send_request($map, $req, $file);
    # return undef | result
}

sub check {
    my $me  = shift;
    my $map = shift;
    my $ver = shift;
    my $lev = shift;

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_check',

lib/AC/Yenta/Client.pm  view on Meta::CPAN


    my $tries = $me->{retries} + 1;
    my $copy  = $me->{copies} || 1;
    my $delay = 0.25;

    $me->_init_hostlist($map);
    my ($addr, $port) = $me->_next_host($map);

    for (1 .. $tries){
        return unless $addr;
        my $res = $me->_try_server($addr, $port, $req, $file);
        return $res if $res && !--$copy;
        ($addr, $port) = $me->_next_host($map);
        sleep $delay;
        $delay *= 1.414;
    }
}

# Send one encoded request to a single yenta server and decode the reply.
#
#   $me    - client object; uses {proto}, {debug} (coderef) and {timeout}
#   $addr  - server hostname or address, resolved with inet_aton
#   $port  - server port
#   $req   - encoded request
#   $file  - optional reference to content that is appended to the request
#
# Returns whatever send_request returned, with the decoded reply stored
# in {data}; returns undef if the address cannot be resolved, the request
# fails, or an exception is thrown along the way.
sub _try_server {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;
    my $file = shift;	# reference

    my $res;

    # inet_aton returns undef for a name it cannot resolve; do not hand
    # an undefined address to send_request
    my $ipn = defined $addr ? inet_aton($addr) : undef;
    unless( defined $ipn ){
        my $name = defined $addr ? $addr : '(undef)';
        $me->{debug}->("cannot resolve yenta server address: $name");
        return $res;
    }

    $req .= $$file if $file;

    $me->{debug}->("trying to contact yenta server $addr:$port");
    eval {
        $res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
        $res->{data} = $me->{proto}->decode_reply( $res ) if $res;
    };
    if(my $e = $@){
        # any failure is reported through the debug callback and
        # turned into an undef result so the caller can try another server
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}


################################################################

sub _next_host {
    my $me  = shift;
    my $map = shift;

    $me->_read_serverfile($map) unless $me->{_server};

lib/AC/Yenta/Config.pm  view on Meta::CPAN

);

my @MAP = qw(dbfile basedir keepold history expire backend sharded);


################################################################

# Dispatch one config directive to its handler in %CONFIG.
# Returns 1 if a handler exists for $key (after calling it with the
# object, the key and the rest of the line), nothing otherwise.
sub handle_config {
    my ($me, $key, $rest) = @_;

    my $handler = $CONFIG{$key}
        or return;

    $handler->($me, $key, $rest);
    return 1;
}

################################################################

sub parse_map {
    my $me    = shift;
    my $key   = shift;
    my $value = shift;

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

################################################################

# Build the encoded status request that tells a server about ourself.
sub myself {

    my %request = ( myself => _myself() );

    return ACPYentaStatusRequest->encode( \%request );
}

sub response {

    # send client everything we know
    my @peer = AC::Yenta::Status->allpeers();
    push @peer, _myself();
    # add the items we monitor
    push @peer, AC::Yenta::Monitor::export();

    return ACPYentaStatusReply->encode({
        status	=> \@peer,
    });

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

        AC::Yenta::Kibitz::Status::update_sceptical( $gpb, $io );
    }

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    AC::Yenta::NetMon::update( $io );

    # respond with all known peers
    my $response = AC::Yenta::Kibitz::Status::response();
    my $yp  = AC::Yenta::Protocol->new();

    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($response),
        content_length  => 0,
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
       );

    debug("sending status reply");
    $io->write_and_shut( $hdr . $response );
}

1;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    eval {
        $req = ACPYentaGetSet->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    # process requests
    my @res;
    my $rescont;
    for my $r (@{ $req->{data} }){
        debug("get request: $r->{map}, $r->{key}, $r->{version}");
        my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
        my $res = {
            map		=> $r->{map},
            key		=> $r->{key},
        };

        if( $meta && $file ){
            unless( _check_content( $meta, $file ) ){
                problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
                # QQQ - remove from system, (and let AE get a new copy)?
                store_remove($r->{map}, $r->{key}, $ver);
                # tell caller it was not found
                $ver = undef;
            }
        }

        if( defined $ver ){
            $res->{version} = $ver;
            $res->{value}   = $data;
            $res->{meta}    = $meta  if defined $meta;
            if( $file ){
                # if one file to send, send it as content
                if( @{$req->{data}} == 1 ){
                    $rescont = $file;
                }else{
                    $res->{file} = $$file;
                }
            }
        }
        push @res, $res;
    }

    # encode results
    my $ect = '';
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
    my $response = $yp->encode_reply( {
        type		  => 'yenta_get',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        content_encrypted => $proto->{data_encrypted},
    }, { data => \@res }, \$ect );

    debug("sending get reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response . $ect );

}

sub api_check {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    unless( $proto->{want_reply} ){

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        $req = ACPYentaCheckRequest->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    debug("check request: $req->{map}, $req->{version}, $req->{level}");

    my @res;
    my @todo = { version => $req->{version}, shard => $req->{shard} };

    # the top of the tree will be fairly sparse,
    # return up to several levels if they are sparse
    for my $l (0 .. 32){
        my $cl = $req->{level} + $l;
        my @lres;
        my $nexttot;
        for my $do (@todo){
            my @r = _get_check( $req->{map}, $do->{shard}, $do->{version}, $cl );
            push @lres, @r;
            for (@r){
                $nexttot += $_->{key} ? ($_->{count} / 8) : $_->{count};
            }
        }
        push @res, @lres;
        @todo = ();
        last unless @lres;				# reached the bottom of the tree
        last if @res > 64;				# got enough results
        last if (@lres > 2) && ($nexttot > @lres + 2);	# less sparse region
        # get the next level also
        @todo = @lres;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		  => 'yenta_check',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        }, { check => \@res } );

    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

}

# get + process merkle data
# Fetch the merkle data for one map/shard/version/level and tag every
# returned entry with the map name.
# Returns the list of entries, or nothing if store_get_merkle has no result.
sub _get_check {
    my ($map, $shard, $ver, $lev) = @_;

    my $nodes = store_get_merkle($map, $shard, $ver, $lev)
        or return;

    foreach my $node (@$nodes){
        $node->{map} = $map;
    }

    return @$nodes;
}

sub api_distrib {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# reference

    # decode request
    my $req;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    }else{
        debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
    }


    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, { status_code => 200, status_message => 'OK', haveit => !$want } );

    debug("sending distrib reply");
    $io->timeout_rel($TIMEOUT);
    $io->write_and_shut( $response );

}

sub _check_content {
    my $meta = shift;
    my $cont = shift;

    return 1 unless $meta && $meta =~ /^\{/;

    eval {

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    return 1;
}

# Send a yenta_distrib error reply carrying the given status code and
# message, then shut the connection.
#
#   $io    - connection to reply on
#   $proto - header of the request being answered (msgid, data_encrypted)
#   $code  - status code to report
#   $msg   - status message to report
sub _reply_error {
    my ($io, $proto, $code, $msg) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    my %header = (
        type           => 'yenta_distrib',
        msgid          => $proto->{msgid},
        is_reply       => 1,
        is_error       => 1,
        data_encrypted => $proto->{data_encrypted},
    );
    my %status = (
        status_code    => $code,
        status_message => $msg,
        haveit         => 0,
    );

    my $response = $yp->encode_reply( \%header, \%status );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}

1;

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN

        return [
            # use this IP for communication with servers in this datacenter (same natdom)
            { ip => $privat_ip, natdom => my_datacenter() },
            # otherwise use this IP
            { ip => $public_ip },
        ]
    }

=head2 init

Initialization function called at startup. Typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.

    my $HOSTNAME;
    my $DOMAIN;
    sub init {
        $HOSTNAME = hostname();
        ($DOMAIN) = $HOSTNAME =~ /^[^\.]+\.(.*)/;
    }

=head1 BUGS

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';


our @ISA    = 'AC::DC::Protocol';
our @EXPORT = qw(read_protocol use_encryption);
my $HDRSIZE = __PACKAGE__->header_size();

my %MSGTYPE =
 (
  heartbeat_request	=> { num => 2, reqc => '', 			resc => 'ACPHeartBeat' },

  yenta_status		=> { num => 6, reqc => 'ACPYentaStatusRequest', resc => 'ACPYentaStatusReply' },
  yenta_get		=> { num => 7, reqc => 'ACPYentaGetSet',        resc => 'ACPYentaGetSet' },
  yenta_distrib		=> { num => 8, reqc => 'ACPYentaDistRequest',   resc => 'ACPYentaDistReply' },
  yenta_check		=> { num => 9, reqc => 'ACPYentaCheckRequest',  resc => 'ACPYentaCheckReply' },
 );


# register every message type from %MSGTYPE (number, request class,
# reply class) with this protocol class
for my $type (keys %MSGTYPE){
    my ($num, $reqc, $resc) = @{ $MSGTYPE{$type} }{qw(num reqc resc)};
    __PACKAGE__->add_msg( $type, $num, $reqc, $resc );
}


sub read_protocol {
    my $me  = shift;
    my $io  = shift;
    my $evt = shift;

    $io->{rbuffer} .= $evt->{data};

lib/AC/Yenta/Stats.pm  view on Meta::CPAN

    $url =~ s/%(..)/chr(hex($1))/eg;

    my $f = $HANDLER{$url};
    $f = \&http_data if $url =~ m|^data/|;
    $f = \&http_file if $url =~ m|^file/|;
    $f ||= \&http_notfound;
    my( $content, $code, $text ) = $f->($url);
    $code ||= 200;
    $text ||= 'OK';

    my $res = "HTTP/1.0 $code $text\r\n"
      . "Server: AC/Yenta\r\n"
      . "Connection: close\r\n"
      . "Content-Type: text/plain; charset=UTF-8\r\n"
      . "Content-Length: " . length($content) . "\r\n"
      . "\r\n"
      . $content ;

    $io->write($res);
    $io->set_callback('write_buffer_empty', \&_done );
}

# callback: shut the connection it is invoked with
sub _done {
    my ($io) = @_;
    $io->shut();
}

################################################################

lib/AC/Yenta/Stats.pm  view on Meta::CPAN

}

# Report the status of the 'public' domain as seen by NetMon.
# Returns just the body when the status is 200, otherwise
# the list (body, 500, "Problem").
sub http_status {
    my $code = AC::Yenta::NetMon::status_dom('public');

    if( $code != 200 ){
        return("status: PROBLEM\n\n", 500, "Problem");
    }
    return "status: OK\n\n";
}

# Render the statistics page: one "name: value" line per entry in %STATS
# (sorted by name), then the number of known peers and a blank line.
sub http_stats {

    my $out = join '',
        map { sprintf("%-24s%s\n", "$_:", $STATS{$_}) }
        sort keys %STATS;

    my @peers = AC::Yenta::Status->allpeers();
    $out .= sprintf("%-24s%s\n", "peers:", scalar @peers);

    return $out . "\n";
}

sub http_data {
    my $url = shift;

    my(undef, $map, $key, $ver) = split m|/|, $url;
    my($data, $version, $file, $meta) = store_get($map, $key, $ver);

    return http_notfound($url) unless $version;
    return $data;

lib/AC/Yenta/Status.pm  view on Meta::CPAN


    # then (maybe) something about to expire
    @peer = @old  unless @peer || int rand(5);

    # then (maybe) something far away
    @peer = @away unless @peer || int rand(5);

    # then something local
    @peer = @local unless @peer;

    # last resort
    @peer = @all unless @peer;

    # sometimes use the seed, in case there was a network split
    if( @peer && int(rand(@all+1)) ){
        my $p = $peer[ rand(@peer) ];
        debug("using peer $p->{server_id}");
        return ($p->{server_id}, $p->{ip}, undef);
    }

    # seed peer

lib/AC/Yenta/Status.pm  view on Meta::CPAN

            problem("cannot rename save file '$file': $!");
        }

    }
}

################################################################
# diagnostic reports
# Short diagnostic report: one tab-separated line for ourself, for each
# monitored item and for each known peer.
sub report {

    my @entries = (
        AC::Yenta::Kibitz::Status::_myself(),
        AC::Yenta::Monitor::export(),
        values %{$DATA->{allpeer}},
    );

    my $out;
    foreach my $entry (@entries){
        my $id     = sprintf '%-28s', $entry->{server_id};
        my $metric = int( $entry->{sort_metric} );
        my @fields = (
            "$id $entry->{hostname}",
            $entry->{datacenter},
            $entry->{subsystem},
            $entry->{environment},
            $entry->{status},
            $metric,
        );
        $out .= join("\t", @fields) . "\n";
    }

    return $out;
}

# Long diagnostic report: a full dump of ourself, of each monitored item
# and of each known peer, separated by blank lines.
sub report_long {

    my @entries = (
        AC::Yenta::Kibitz::Status::_myself(),
        AC::Yenta::Monitor::export(),
        values %{$DATA->{allpeer}},
    );

    my $out;
    foreach my $entry (@entries){
        $out .= dumper( $entry ) . "\n\n";
    }

    return $out;
}
################################################################

# accessor for the module-level $PORT
sub my_port {
    return $PORT;
}


# Instance id: the server id followed by "/" and the process id
# in (at least 4 digit) hex.
sub my_instance_id {
    my ($class) = @_;

    my $server_id = my_server_id();
    my $pid_hex   = sprintf('/%04x', $$);

    return $server_id . $pid_hex;
}

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    $DATA->{sceptical}{$id} = $up;
}

sub update {
    my $class = shift;
    my $id    = shift;	# -> server_id
    my $up    = shift;

    return unless $class->_env_ok($id, $up);

    # only keep it if it is relatively fresh, and valid
    return unless $up->{timestamp} > $^T - $KEEPLOST;
    return unless $up->{status};

    delete $DATA->{sceptical}{$id};

    $up->{id} = $id;
    my $previnfo = $DATA->{allpeer}{$id};
    verbose("discovered new peer: $id ($up->{hostname})") unless $previnfo;

    # only keep it if it is newer than what we have

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

        if( int(rand(8)) ){
            @peer = @{$me->{peers_near}};
        }else{
            @peer = @{$me->{peers_far}};
        }
    }elsif( $me->{peers_near} ){
        @peer = @{$me->{peers_near}};
    }elsif( $me->{peers_far} ){
        @peer = @{$me->{peers_far}};
    }elsif( $me->{peers_ood} ){
        # only use out-of-date peers as a last resort
        # NB: if we never used ood peers, we'd have a bootstrap deadlock
        @peer = @{$me->{peers_ood}};
    }

    return unless @peer;
    my $peer = $peer[ rand(@peer) ];

    return $peer;
}

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    }

}

# Callback for a check reply: sort the returned entries into key entries
# and node entries, hand them to the matching result handler and move on
# to the next step.
#
#   $io  - connection (unused here)
#   $evt - event; the entries are read from $evt->{data}{check}
#   $me  - the AE object driving the check
sub _check_load {
    my ($io, $evt, $me) = @_;

    debug("check results");
    $evt->{data} ||= {};

    # key entries are all kept; of the node entries only those at the
    # highest level seen in the reply are kept
    my (@keydata, @nodedata);
    my $maxlev = 0;

    for my $d ( @{ $evt->{data}{check} }){
        debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");

        if( $d->{key} ){
            push @keydata, $d;
        }
        elsif( $d->{level} < $maxlev ){
            # below the highest level seen so far - ignore
        }
        elsif( $d->{level} > $maxlev ){
            # new highest level - start the node list over
            @nodedata = ($d);
            $maxlev   = $d->{level};
        }
        else{
            push @nodedata, $d;
        }
    }

    # keys take precedence over nodes
    if( @keydata ){
        $me->_check_result_keys( \@keydata );
    }
    elsif( @nodedata ){
        $me->_check_result_nodes( $maxlev, \@nodedata );
    }

    $me->_next_step();
}

# Callback for a failed check request: log which peer and map were
# involved, then move on to the next step.
sub _check_error {
    my ($io, $evt, $me) = @_;

    verbose("AE check error with $me->{peer}{id} map $me->{map} ($io->{info})");

    return $me->_next_step();
}

sub _check_result_keys {
    my $me  = shift;
    my $chk = shift;

    my %vscnt;
    my %vsadd;

    for my $d (@$chk){
        inc_stat('ae_check_key');
        my $vsk = "$d->{version} $d->{shard}";
        $vscnt{ $vsk } ++;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    return unless defined $vmx;

    if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
        debug("skipping expired $lev/$ver - $vmx");
        return 1;
    }

    return;
}

sub _check_result_nodes {
    my $me  = shift;
    my $lev = shift;
    my $chk = shift;

    # determine all of the base versions of the recvd data
    my %ver;
    for my $d (@$chk){
        my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
        $ver{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

sub _getkv_load {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;
    $evt->{data} ||= {};

    debug("got kv data results");

    my %need = map {
        ( "$_->{key}/$_->{version}" => $_ )
    } @$get;

    for my $d ( @{$evt->{data}{data}}){
        debug("got $d->{map}/$d->{key}/$d->{version}");
        next unless $d->{key} && $d->{version};		# not found

        delete $need{ "$d->{key}/$d->{version}" };

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    # RSN - check load
    my $max = conf_value('distrib_max') || $MAXUNDERWAY;
    if( @DIST < $max ){
        $me->_start_next();
    }
    push @DIST, $me;

    return $me;
}

# periodically, go through and restart or expire
sub periodic {

    my @keep;
    my $max = conf_value('distrib_max') || $MAXUNDERWAY;

    my $chance = (@DIST > $max) ? ($max / @DIST) : 1;

    for my $r (@DIST){
        # debug("periodic $r->{info}");
        next if $^T > $r->{req}{expire};

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN


    my $db = $me->{db};

    # walk merkle tree, find all k/v to remove
    my @delete;

    my @walk = { level => 0, version => 0, shard => 0 };
    while(@walk){
        my @next;
        for my $node (@walk){
            my $res = $me->get_merkle( $node->{shard}, $node->{version}, $node->{level} );

            for my $r (@$res){
                next if $r->{version} > $expire;
                if( $r->{key} ){
                    push @delete, { key => $r->{key}, version => $r->{version}, shard => $r->{shard} };
                }else{
                    push @next, $r;
                }
            }
        }
        @walk = @next;
    }

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

1;

=head1 NAME

AC::Yenta::Store::Map - persistent storage for yenta maps

=head1 SYNOPSIS

  your code:

    AC::Yenta::Store::Map->add_backend( postgres => 'Local::Yenta::Postgres' );

  your config:

    map mappyfoo {
        backend     postgres
        # ...
    }

=cut

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

    return if $lev > $me->{merkle_height};

    my $db = $me->{db};
    my $mk = $me->_mkey(encode_shard($shard), encode_version($ver), $lev);
    debug("getting merkle for $mk");

    my $d = $me->_mcget( $mk );
    return unless $d;
    my @d = split /\0/, $d;

    my @res;

    if( $^T - $cacheT > 60 ){
        debug("merk cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
        $cacheT = $^T;
    }

    if( $lev == $me->{merkle_height} ){
        # data is: lkey, ...
        for my $r (@d){
            my($s,$v,$k) = $me->_decode_lkey($r);
            push @res, { version => decode_version($v), key => $k, count => 1, shard => decode_shard($s) };
        }
    }else{
        # data is: mkey => hash count, ...
        my %d = @d;
        for my $lv (keys %d){
            my($l, $s, $v) = $me->_decode_mkey( $lv );
            my($h,$c)  = split /\s/, $d{$lv};
            push @res, { version => decode_version($v), level => hex($l), hash => $h, count => $c, shard => decode_shard($s) };
        }
    }

    return \@res;
}

################################################################
# we maintain a 16-ary merkle tree of all of the <key,version>s we have stored

sub merkle {
    my $me    = shift;
    my $add   = shift;
    my @del   = @_;



( run in 1.135 second using v1.01-cache-2.11-cpan-49f99fa48dc )