view release on metacpan or search on metacpan
eg/yenta.conf view on Meta::CPAN
savestatus /var/tmp/yenta.status yenta
allow 127.0.0.1
allow 10.200.2.0/23
# seed peers to locate the network at startup
seedpeer 10.200.2.4:3503
seedpeer 10.200.2.5:3503
# enable debugging?
#debug ae
#debug map
#debug merkle
# ...
# maps
map testyfoo {
# name of the data file
dbfile /home/data/testyfoo.ydb
# how much history to keep
history 4
}
eg/yenta_get view on Meta::CPAN
use AC::Dumper;
use strict;
my $map = shift @ARGV;
my $key = shift @ARGV;
die "usage: get [-h host] map key\n" unless $map && $key;
my $y = AC::Yenta::Client->new(
# server_file, servers[], or host + port
server_file => '/var/tmp/yenta.status',
debug => \&debug,
);
my $res = $y->get($map, $key);
print dumper($res), "\n";
exit;
sub debug {
print STDERR @_, "\n";
}
eg/yenta_put view on Meta::CPAN
# Function: put example
#
# $Id$
use AC::Yenta::Client;
use Time::HiRes 'time';
use JSON;
use strict;
my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });
my $key = 'YX3jSXD3CBRUDABm';
my $res = $ys->distribute(
# map, key, version, data
'mymap', $key, timet_to_yenta_version(time()),
encode_json( {
url_id => $key,
url => 'http://www.example.com',
use Getopt::Std;
use AC::Yenta::D;
use strict;
my @saved_argv = @ARGV;
my %OPT;
getopts('c:dfp:', \%OPT) || die "usage...\n";
# -c config file
# -d enable all debugging
# -f foreground
# -p port
my $y = AC::Yenta::D->new(
class_myself => 'My::Code::MySelf',
);
$y->daemon( $OPT{c}, {
argv => \@saved_argv,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
lib/AC/Yenta.pm view on Meta::CPAN
=head1 SYNOPSIS
use AC::Yenta::D;
use strict;
my $y = AC::Yenta::D->new( );
$y->daemon( $configfile, {
argv => \@ARGV,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
exit;
=head1 USAGE
Copy + Paste from the example code into your own code.
Copy + Paste from the example config into your own config.
lib/AC/Yenta.pm view on Meta::CPAN
yentas in different datacenters.
secret squeamish-ossifrage
=item syslog
specify a syslog facility for log messages.
syslog local5
=item debug
enable debugging for a particular section
debug map
=item map
configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers. maps should be
configured similarly on all servers that they are on.
map users {
backend bdb
dbfile /home/acdata/users.ydb
lib/AC/Yenta/Client.pm view on Meta::CPAN
# one or more of:
# new( host, port )
# new( servers => [ { host, port }, ... ] )
# new( server_file )
sub new {
my $class = shift;
my $me = bless {
debug => sub{ },
host => 'localhost',
proto => AC::DC::Protocol->new(),
copies => 1,
@_,
}, $class;
$me->{server_file} ||= $me->{altfile}; # compat
die "servers or server_file?\n" unless $me->{servers} || $me->{server_file};
lib/AC/Yenta/Client.pm view on Meta::CPAN
sub _try_server {
my $me = shift;
my $addr = shift;
my $port = shift;
my $req = shift;
my $file = shift; # reference
my $ipn = inet_aton($addr);
$req .= $$file if $file;
$me->{debug}->("trying to contact yenta server $addr:$port");
my $res;
eval {
$res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
$res->{data} = $me->{proto}->decode_reply( $res ) if $res;
};
if(my $e = $@){
$me->{debug}->("yenta request failed: $e");
$res = undef;
}
return $res;
}
################################################################
sub _next_host {
my $me = shift;
lib/AC/Yenta/Config.pm view on Meta::CPAN
use Socket;
use strict;
our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value conf_map);
my %CONFIG = (
include => \&AC::ConfigFile::Simple::include_file,
debug => \&AC::ConfigFile::Simple::parse_debug,
allow => \&AC::ConfigFile::Simple::parse_allow,
port => \&AC::ConfigFile::Simple::parse_keyvalue,
environment => \&AC::ConfigFile::Simple::parse_keyvalue,
secret => \&AC::ConfigFile::Simple::parse_keyvalue,
seedpeer => \&AC::ConfigFile::Simple::parse_keyarray,
ae_maxload => \&AC::ConfigFile::Simple::parse_keyvalue,
distrib_max => \&AC::ConfigFile::Simple::parse_keyvalue,
savestatus => \&parse_savefile,
lib/AC/Yenta/Crypto.pm view on Meta::CPAN
my $eb = ACPEncrypt->encode( {
algorithm => $ALGORITHM,
seqno => $seqno,
nonce => $nonce,
hmac => $hmac,
length => length($buf),
ciphertext => $ct,
} );
debug("encrypted <$seqno,$nonce,$hmac>");
return $eb;
}
sub decrypt {
my $me = shift;
my $buf = shift;
my $ed = ACPEncrypt->decode( $buf );
die "cannot decrypt: unknown alg\n" unless $ed->{algorithm} eq $ALGORITHM;
lib/AC/Yenta/Crypto.pm view on Meta::CPAN
my $key = $me->_key($seqno, $nonce);
my $iv = $me->_iv($key, $seqno, $nonce);
my $hmac = hmac_sha256_base64($ed->{ciphertext}, $key);
die "cannot decrypt: hmac mismatch\n" unless $hmac eq $ed->{hmac};
my $aes = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
$aes->set_iv( $iv );
my $pt = substr($aes->decrypt( $ed->{ciphertext} ), 0, $ed->{length});
debug("decrypted <$seqno,$nonce,$hmac>");
return $pt;
}
sub _key {
my $me = shift;
my $seqno = shift;
my $nonce = shift;
lib/AC/Yenta/D.pm view on Meta::CPAN
AC::Yenta::MySelf->customize( $p{class_myself} );
# ...
return bless \$class, $class;
}
sub daemon {
my $me = shift;
my $cfile = shift;
my $opt = shift; # foreground, debugall, persistent_id, argv
die "no config file specified\n" unless $cfile;
# configure
$AC::Yenta::CONF = AC::Yenta::Config->new(
$cfile, onreload => sub {
AC::Yenta::Store::configure();
});
initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );
AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
verbose("starting.");
$SIG{CHLD} = $SIG{PIPE} = sub{}; # ignore
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit; # abort
# initialize subsystems
my $port = $opt->{port} || conf_value('port');
lib/AC/Yenta/D.pm view on Meta::CPAN
# start "cronjobs"
AC::DC::Sched->new(
info => 'check config files',
freq => 30,
func => sub { $AC::Yenta::CONF->check() },
);
run_and_watch(
($opt->{foreground} || $opt->{debugall}),
\&AC::DC::IO::mainloop,
);
}
1;
lib/AC/Yenta/Debug.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-27 11:40 (EDT)
# Function: debugging + log msgs
#
# $Id$
package AC::Yenta::Debug;
use AC::DC::Debug;
our @ISA = 'AC::DC::Debug';
use strict;
1;
lib/AC/Yenta/Kibitz/Status.pm view on Meta::CPAN
return unless $gpb;
my $c;
eval {
$c = ACPYentaStatusReply->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode status data: $e");
return;
}
debug("rcvd update");
my $myself = AC::Yenta::Status->my_server_id();
for my $up (@{$c->{status}}){
my $id = $up->{server_id};
next if $up->{via} eq $myself;
next if $id eq $myself;
AC::Yenta::Status->update($id, $up);
}
}
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid = $$;
sub new {
my $class = shift;
debug('starting kibitz status client');
my $me = $class->SUPER::new( @_ );
return unless $me;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
return $me;
}
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
unless( $me->{status_ok} ){
AC::Yenta::Kibitz::Status::isdown( $me->{status_peer} );
}
}
sub read {
my $me = shift;
my $evt = shift;
#debug("recvd reply");
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
return unless $proto;
$me->{status_ok} = 1;
AC::Yenta::Kibitz::Status::update( $data );
AC::Yenta::NetMon::update( $me );
$me->shut();
lib/AC/Yenta/Kibitz/Status/Server.pm view on Meta::CPAN
my $yp = AC::Yenta::Protocol->new();
my $hdr = $yp->encode_header(
type => 'yenta_status',
data_length => length($response),
content_length => 0,
msgid => $proto->{msgid},
is_reply => 1,
);
debug("sending status reply");
$io->write_and_shut( $hdr . $response );
}
1;
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $TIMEOUT = 5;
sub new {
my $class = shift;
my $addr = shift;
my $port = shift;
my $req = shift;
debug('starting kibitz store client');
my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @_ );
return unless $me;
$me->{_req} = $req;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
return $me;
}
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
sub timeout {
my $me = shift;
$me->shut();
}
sub read {
my $me = shift;
my $evt = shift;
debug("recvd reply");
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
$me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
return unless $proto;
$proto->{data} = $data;
$proto->{content} = $content;
eval {
my $yp = AC::Yenta::Protocol->new();
$proto->{data} = $yp->decode_reply($proto) if $data;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
# process requests
my @res;
my $rescont;
for my $r (@{ $req->{data} }){
debug("get request: $r->{map}, $r->{key}, $r->{version}");
my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
my $res = {
map => $r->{map},
key => $r->{key},
};
if( $meta && $file ){
unless( _check_content( $meta, $file ) ){
problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
# QQQ - remove from system, (and let AE get a new copy)?
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
$ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
my $response = $yp->encode_reply( {
type => 'yenta_get',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
content_encrypted => $proto->{data_encrypted},
}, { data => \@res }, \$ect );
debug("sending get reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response . $ect );
}
sub api_check {
my $io = shift;
my $proto = shift;
my $gpb = shift;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
my $req;
eval {
$req = ACPYentaCheckRequest->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
debug("check request: $req->{map}, $req->{version}, $req->{level}");
my @res;
my @todo = { version => $req->{version}, shard => $req->{shard} };
# the top of the tree will be fairly sparse,
# return up to several levels if they are sparse
for my $l (0 .. 32){
my $cl = $req->{level} + $l;
my @lres;
my $nexttot;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_check',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { check => \@res } );
debug("sending check reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response );
}
# get + process merkle data
sub _get_check {
my $map = shift;
my $shard = shift;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
}
my $v = $req->{datum};
# do we already have ?
my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );
if( $want ){
# put
debug("put request from $io->{peerip}: $v->{map}, $v->{key}, $v->{version}");
# file content is passed by reference, to avoid large copies
$content ||= \ $v->{file} if $v->{file};
# check
if( $v->{meta} && $content ){
unless( _check_content( $v->{meta}, $content ) ){
problem("content SHA1 check failed: $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}");
$io->shut();
return;
}
}
$want = store_put( $v->{map}, $v->{shard}, $v->{key}, $v->{version}, $v->{value},
$content, $v->{meta} );
# distribute to other systems
AC::Yenta::Store::Distrib->new( $req, $content ) if $want;
}else{
debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
}
unless( $proto->{want_reply} ){
$io->shut();
return;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { status_code => 200, status_message => 'OK', haveit => !$want } );
debug("sending distrib reply");
$io->timeout_rel($TIMEOUT);
$io->write_and_shut( $response );
}
sub _check_content {
my $meta = shift;
my $cont = shift;
return 1 unless $meta && $meta =~ /^\{/;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
msgid => $proto->{msgid},
is_reply => 1,
is_error => 1,
data_encrypted => $proto->{data_encrypted},
}, {
status_code => $code,
status_message => $msg,
haveit => 0,
} );
debug("sending distrib reply");
$io->write_and_shut( $response );
}
1;
lib/AC/Yenta/Monitor.pm view on Meta::CPAN
# clean up old data
for my $id (keys %MON){
isdown($id, 0) if $MON{$id}{lastup} < $^T - $OLD_DOWN;
}
# start monitoring (send heartbeat request)
for my $m (@$mon){
my $ip = $m->{ipa};
my $port = $m->{port};
my $id = "$ip:$port";
debug("start monitoring $id");
my $ok = AC::Yenta::Monitor::Client->new( $ip, $port,
info => "monitor client: $id",
monitor_peer => $id,
);
isdown($id, 0) unless $ok;
}
}
lib/AC/Yenta/Monitor.pm view on Meta::CPAN
my $d = $MON{$id};
return unless $d;
# require 2 polls to fail
return unless $^T - $d->{lastup} >= 2 * $FREQ;
$code = 0 if $code == 200;
$d->{status_code} = $code || 0;
$d->{timestamp} = $^T;
debug("monitor $id is down");
if( $d->{lastup} < $^T - $OLD_KEEP ){
debug("monitor $id down too long. removing from report");
delete $MON{$id};
}
}
sub update {
my $id = shift;
my $gb = shift; # ACPHeartbeat
my $up;
eval {
lib/AC/Yenta/Monitor.pm view on Meta::CPAN
};
if(my $e = $@){
problem("cannot decode hb data: $e");
return isdown($id, 0);
}
unless( $up->{status_code} == 200 ){
return isdown($id, $up->{status_code});
}
return isdown($id, 0) unless $^T - $up->{timestamp} < $OLD_DOWN;
debug("monitor $id is up");
$up->{lastup} = $^T;
$up->{downcount} = 0;
_hb_ip_info( $up, $MON{$id} );
$MON{$id} = $up;
}
sub _hb_ip_info {
my $up = shift;
my $old = shift;
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid = $$;
sub new {
my $class = shift;
debug('starting monitor status client');
my $me = $class->SUPER::new( @_ );
unless($me){
return;
}
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
unless( $me->{status_ok} ){
AC::Yenta::Monitor::isdown( $me->{monitor_peer} );
}
}
sub read {
my $me = shift;
my $evt = shift;
debug("recvd reply");
my $yp = AC::Yenta::Protocol->new();
my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
return unless $proto;
$me->{status_ok} = 1;
AC::Yenta::Monitor::update( $me->{monitor_peer}, $data );
$me->shut();
}
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
if( $p->{content_encrypted} && $content ){
$content = $me->_decrypt_data( $io, $auth, $content );
return unless $content;
}
# content is passed as reference
return ($p, $data, ($content ? \$content : undef));
}
# for simple status queries, argus, debugging
# this is not an RFC compliant http server
sub _read_http {
my $io = shift;
my $evt = shift;
return unless $io->{rbuffer} =~ /\r?\n\r?\n/s;
my($get, $url, $http) = split /\s+/, $io->{rbuffer};
return ( { type => 'http', method => $get }, $url );
}
lib/AC/Yenta/Server.pm view on Meta::CPAN
$me->init($fd);
$me->wantread(1);
$me->timeout_rel($TIMEOUT);
$me->set_callback('read', \&read);
$me->set_callback('timeout', \&timeout);
}
sub timeout {
my $me = shift;
debug("connection timed out");
$me->shut();
}
sub read {
my $me = shift;
my $evt = shift;
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
return unless $proto;
# dispatch request
my $h = $HANDLER{ $proto->{type} };
unless( $h ){
verbose("unknown message type: $proto->{type}");
$me->shut();
return;
}
debug("handling request - $proto->{type}");
if( ref $h ){
$h->( $me, $proto, $data, $content );
}else{
$h->handler( $me, $proto, $data, $content );
}
}
1;
lib/AC/Yenta/Stats.pm view on Meta::CPAN
$STATS{$stat} ++;
}
sub handler {
my $class = shift;
my $io = shift;
my $proto = shift;
my $url = shift;
debug("http request $url");
$url =~ s|^/||;
$url =~ s/%(..)/chr(hex($1))/eg;
my $f = $HANDLER{$url};
$f = \&http_data if $url =~ m|^data/|;
$f = \&http_file if $url =~ m|^file/|;
$f ||= \&http_notfound;
my( $content, $code, $text ) = $f->($url);
$code ||= 200;
$text ||= 'OK';
lib/AC/Yenta/Status.pm view on Meta::CPAN
next if $p->{status} == 200 && $p->{timestamp} > $^T - $KEEPLOST;
_maybe_remove( $id );
}
# randomly pick a peer
my($id, $ip, $port) = _random_peer();
return unless $id;
# start a client
debug("starting status kibitz client to $id");
my $c = AC::Yenta::Kibitz::Status::Client->new( $ip, $port,
info => "status client: $id",
status_peer => $id,
);
return __PACKAGE__->isdown($id) unless $c;
$c->start();
}
lib/AC/Yenta/Status.pm view on Meta::CPAN
# then something local
@peer = @local unless @peer;
# last resort
@peer = @all unless @peer;
# sometimes use the seed, in case there was a network split
if( @peer && int(rand(@all+1)) ){
my $p = $peer[ rand(@peer) ];
debug("using peer $p->{server_id}");
return ($p->{server_id}, $p->{ip}, undef);
}
# seed peer
my $seed = conf_value('seedpeer');
my $p = $seed->[ rand(@$seed) ];
my ($ip, $port) = split /:/, $p;
$port ||= my_port();
# don't talk to self. any of my addrs.
lib/AC/Yenta/Status.pm view on Meta::CPAN
for my $type (@$types){
push @peer, server_list($type);
for my $m (@mon){
push @peer, $m if $m->{subsystem} eq $type;
}
}
next unless @peer;
debug("saving peer status file");
unless( open(FILE, ">$file.tmp") ){
problem("cannot open save file '$file.tmp': $!");
return;
}
for my $pd (@peer){
# only save best addr in save file
my($ip, $port) = AC::Yenta::IO::TCP::Client->use_addr_port( $pd->{ip} );
my $data = {
lib/AC/Yenta/Status.pm view on Meta::CPAN
if( ($^T - $d->{lastup} > $KEEPDOWN) || ($^T - $d->{timestamp} > $KEEPLOST) ){
_remove($id);
}
}
sub isdown {
my $class = shift;
my $id = shift;
debug("marking peer '$id' as down");
if( ! $DATA->{allpeer}{$id} ){
return unless $DATA->{sceptical}{$id};
# we know it is down, and want to kibbitz this fact
$DATA->{allpeer}{$id} = $DATA->{sceptical}{$id};
}
delete $DATA->{sceptical}{$id};
if( $DATA->{allpeer}{$id} ){
lib/AC/Yenta/Status.pm view on Meta::CPAN
my $io = shift;
return unless $class->_env_ok($id, $up);
if( $DATA->{allpeer}{$id} ){
# already known
delete $DATA->{sceptical}{$id};
return;
}
debug("rcvd update (sceptical) about $id from $io->{peerip}");
# only accept updates from the server itself
# no 3rd party updates. no misconfigured serevrs.
problem("server misconfigured $id != $io->{peerip}")
unless grep { inet_atoi($io->{peerip}) == $_->{ipv4} } @{$up->{ip}};
$up->{id} = $id;
delete $up->{lastup};
$DATA->{sceptical}{$id} = $up;
}
lib/AC/Yenta/Status.pm view on Meta::CPAN
$up->{id} = $id;
my $previnfo = $DATA->{allpeer}{$id};
verbose("discovered new peer: $id ($up->{hostname})") unless $previnfo;
# only keep it if it is newer than what we have
return if $previnfo && $up->{timestamp} <= $previnfo->{timestamp};
$up->{path} .= ' ' . my_server_id();
# debug("updating $id => $up->{status} => " . dumper($up));
debug("updating $id => $up->{status}");
$DATA->{allpeer}{$id} = $up;
if( $up->{status} != 200 ){
_maybe_remove( $id );
return ;
}
# update datacenter info
unless( $DATA->{datacenter}{$up->{datacenter}}{$id} ){
lib/AC/Yenta/Status.pm view on Meta::CPAN
return if "@curmap" eq "@newmap"; # unchanged
# what do we need to add/remove
my (%remove, %add);
@remove{@curmap} = @curmap;
@add{@newmap} = @newmap;
delete $remove{$_} for @newmap;
delete $add{$_} for @curmap;
for my $map (keys %remove){
debug("removing $map from $id");
delete $DATA->{mappeer}{$map}{$id};
}
for my $map (keys %add){
debug("adding $map to $id");
$DATA->{mappeer}{$map}{$id} = $id;
}
$DATA->{peermap}{$id} = \@newmap;
}
lib/AC/Yenta/Store.pm view on Meta::CPAN
my %STORE;
# create maps from config
sub configure {
my $maps = $AC::Yenta::CONF->{config}{map};
my %remove = %STORE;
for my $map (keys %{$maps}){
debug("configuring map $map");
my $conf = $maps->{$map};
my $sharded = $conf->{sharded};
my $c = $sharded ? 'AC::Yenta::Store::Sharded' : 'AC::Yenta::Store::Map';
my $be = $conf->{backend};
my $m = $c->new( $map, $be, { %$conf, recovery => 1 } );
$STORE{$map} = $m;
delete $remove{$map};
}
for my $map (keys %remove){
debug("removing unused map '$map'");
delete $STORE{$map};
}
}
sub store_get {
my $map = shift;
my $key = shift;
my $ver = shift;
my $m = $STORE{$map};
lib/AC/Yenta/Store.pm view on Meta::CPAN
my $shard = shift;
my $key = shift;
my $ver = shift;
my $data = shift;
my $file = shift; # reference
my $meta = shift;
my $m = $STORE{$map};
return unless $m;
debug("storing $map/$key/$ver");
$m->put($shard, $key, $ver, $data, $file, $meta);
}
# NB: only removes local copy temporarily. will be replaced at next AE run
sub store_remove {
my $map = shift;
my $key = shift;
my $ver = shift;
my $m = $STORE{$map};
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $me = bless {
badnode => [ {version => 0, shard => 0, level => 0} ],
cache => {},
kvneed => [],
kvneedorig => [],
kvfetching => 0,
missing => 0,
}, $class;
debug("new ae");
$me->_pick_map() || return;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
$me->_init_peer() || return;
debug("checking $me->{map} with $me->{peer}{id}");
inc_stat('ae_runs');
$me->_next_step();
push @AE, $me;
return $me;
}
sub periodic {
# kill dead sessions, start new ones
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $peer = $peer[ rand(@peer) ];
return $peer;
}
################################################################
sub _finished {
my $me = shift;
debug("finished $me->{map}");
$DONE{$me->{map}} = $^T if $me->{missing} < $MAXMISSING;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
@AE = grep{ $_ != $me } @AE;
}
sub _next_step {
my $me = shift;
$me->{timestamp} = $^T;
if( $me->{kvfetching} < $MAXFETCH ){
# any missing data?
if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
debug("starting nextgetkv ($me->{kvfetching})");
$me->_next_get_kv();
}
}
# check nodes?
if( @{$me->{badnode}} ){
$me->_start_check();
return;
}
$me->_finished();
}
################################################################
sub _start_check {
my $me = shift;
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
}
sub _check_load {
my $io = shift;
my $evt = shift;
my $me = shift;
debug("check results");
$evt->{data} ||= {};
# determine highest level returned
my @keydata;
my @nodedata;
my $maxlev = 0;
for my $d ( @{ $evt->{data}{check} }){
debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");
if( $d->{key} ){
push @keydata, $d;
next;
}
next if $d->{level} < $maxlev;
if( $d->{level} > $maxlev ){
@nodedata = ();
$maxlev = $d->{level};
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my %vscnt;
my %vsadd;
for my $d (@$chk){
inc_stat('ae_check_key');
my $vsk = "$d->{version} $d->{shard}";
$vscnt{ $vsk } ++;
next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );
debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
inc_stat('ae_key_missing');
$me->{missing} ++;
$vsadd{ $vsk } ++;
}
}
sub _is_expired {
my $me = shift;
my $map = shift;
my $lev = shift;
my $ver = shift;
return unless $me->{expire};
my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
return unless defined $vmx;
if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
debug("skipping expired $lev/$ver - $vmx");
return 1;
}
return;
}
sub _check_result_nodes {
my $me = shift;
my $lev = shift;
my $chk = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
# get all of our merkle data for these versions
my %merkle;
my $t_new = timet_to_yenta_version($^T - $TOONEW);
for my $d (values %ver){
next if $d->{ver} > $t_new; # too new, ignore
next if $me->_is_expired($me->{map}, $lev, $d->{ver});
# RSN - skip unwanted shards
my $ms = AC::Yenta::Store::store_get_merkle($me->{map}, $d->{shard}, $d->{ver}, $lev - 1);
for my $m (@$ms){
# debug("my hash $me->{map} $m->{level}/$m->{shard}/$m->{version} => $m->{hash}");
$merkle{"$m->{version} $m->{shard}"} = $m->{hash};
}
}
# compare (don't bother with things that are too new (the data may still be en route))
for my $d (@$chk){
next if $d->{version} > $t_new; # too new, ignore
next if $me->_is_expired($me->{map}, $lev, $d->{version});
# RSN - skip unwanted shards
my $hash = $merkle{"$d->{version} $d->{shard}"};
if( $d->{hash} eq $hash ){
debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} => match");
next;
}else{
debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} != $hash");
}
# stick them at the front
unshift @{$me->{badnode}}, { version => $d->{version}, shard => $d->{shard}, level => $lev };
}
}
################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
return $me->_start_get_kv_any() if @{$me->{kvneed}};
}
sub _start_get_kv_any {
my $me = shift;
my @get = splice @{$me->{kvneed}}, 0, $me->{maxget}, ();
# pick a peer
my $peer = $me->_pick_peer();
debug("getting kv data from peer $peer->{id}");
$me->_start_get_kv( $peer, 1, \@get);
}
sub _start_get_kv_orig {
my $me = shift;
my @get = splice @{$me->{kvneedorig}}, 0, $me->{maxget}, ();
debug("getting kv data from current peer");
$me->_start_get_kv( $me->{peer}, 0, \@get);
}
sub _start_get_kv {
my $me = shift;
my $peer = shift;
my $retry = shift;
my $get = shift;
# insert map into request
$_->{map} = $me->{map} for @$get;
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
my $enc = use_encryption($peer);
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# build request
my $request = $proto->encode_request( {
type => 'yenta_get',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
data => $get,
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
$io->set_callback('error', \&_getkv_error, $me, $retry, $get);
$io->start();
}
}
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
$evt->{data} ||= {};
debug("got kv data results");
my %need = map {
( "$_->{key}/$_->{version}" => $_ )
} @$get;
for my $d ( @{$evt->{data}{data}}){
debug("got $d->{map}/$d->{key}/$d->{version}");
next unless $d->{key} && $d->{version}; # not found
delete $need{ "$d->{key}/$d->{version}" };
my $file = $evt->{content};
$file = \ $d->{file} if $d->{file};
AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
$d->{value}, $file, $d->{meta} );
}
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
# recover only once per dir
my $recov = ( $conf->{recovery} && !$recovered{$dir} );
$recovered{$dir} = 1 if $recov;
if( $recov ){
unlink $_ for glob "$dir/__*";
}
my $flags = $conf->{readonly} ? 0 : (DB_CREATE| DB_INIT_CDB | DB_INIT_MPOOL);
debug("opening Berkeley dir=$dir, file=$file (recov $recov)");
my $env = BerkeleyDB::Env->new(
-Home => $dir,
-Flags => $flags,
);
# microsecs
$env->set_timeout($TIMEOUT * 1_000_000 / 2, DB_SET_LOCK_TIMEOUT) if $env;
my $db = BerkeleyDB::Btree->new(
-Filename => $file,
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
}, $class;
}
sub get {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $v;
debug("get $map/$sub/$key");
$me->_start();
my $r = $me->{db}->db_get( _key($map,$sub,$key), $v );
$me->_finish();
return if $r; # not found
if( wantarray ){
return ($v, 1);
}
return $v;
}
sub put {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
$me->_start();
my $r = $me->{db}->db_put( _key($map,$sub,$key), $val);
$me->_finish();
return !$r;
}
sub del {
my $me = shift;
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
$me->_start();
my $cursor = $me->{db}->db_cursor();
$k = _key($map,$sub,$key);
my $e = _key($map,$sub,$end);
$cursor->c_get($k, $v, DB_SET_RANGE);
my $MAX = 100;
my $max = $MAX;
while( !$end || ($k lt $e) ){
debug("range $k");
last unless $k =~ m|$map/$sub/|;
$k =~ s|$map/$sub/||;
push @k, { k => $k, v => $v };
my $r = $cursor->c_get($k, $v, DB_NEXT);
last if $r; # error
# cursor locks the db
# close+recreate so other processes can proceed
unless( $max -- ){
$cursor->c_close();
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
# we tune the distribution algorithm based on where it came from:
faraway => (my_datacenter() ne $sendat->{datacenter}),
farseen => 0,
nearseen => 0,
farsend => [],
nearsend => [],
ordershift => 4,
}, $class;
debug("distributing $me->{info}");
inc_stat( 'dist_requests' );
inc_stat( 'dist_requests_faraway' ) if $me->{faraway};
$me->_init_strategy($sender);
# RSN - check load
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
if( @DIST < $max ){
$me->_start_next();
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
# periodically, go through and restart or expire
sub periodic {
my @keep;
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
my $chance = (@DIST > $max) ? ($max / @DIST) : 1;
for my $r (@DIST){
# debug("periodic $r->{info}");
next if $^T > $r->{req}{expire};
if( (rand() <= $chance) && (AC::DC::IO->underway() <= 2 * $max) ){
my $keep = $r->_start_next();
push @keep, $r if $keep;
}else{
push @keep, $r;
}
}
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
return unless $d;
# randomly pick one server in chosen dc
my @id = grep {
my $x = AC::Yenta::Status->peer($_);
($x->{status} == 200) ? 1 : 0;
} @{$d->{id}};
return unless @id;
my $id = $id[ rand(@id) ];
debug("sending $me->{info} to far site $id in $d->{dc}");
$me->_start_peer( $id, 1 );
inc_stat('dist_send_far');
inc_stat('dist_send_total');
return 1;
}
sub _start_near {
my $me = shift;
my $id = shift @{ $me->{nearsend} };
return unless $id;
debug("sending $me->{info} to nearby site $id");
$me->_start_peer( $id, 0 );
inc_stat('dist_send_near');
inc_stat('dist_send_total');
return 1;
}
sub _start_next {
my $me = shift;
my $sent;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
$request . $ect,
info => "distrib $me->{info} to $id",
);
if( $io ){
$io->set_callback('load', \&_onload, $me, $id, $far);
$io->set_callback('error', \&_onerror, $me, $id, $far);
$io->start();
}else{
debug("start client failed");
}
}
sub _onload {
my $io = shift;
my $evt = shift;
my $me = shift;
my $id = shift;
my $far = shift;
debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");
if( $evt->{data}{haveit} ){
if( $far ){
$me->{farseen} ++;
inc_stat('dist_send_far_seen');
}else{
$me->{nearseen} ++;
inc_stat('dist_send_near_seen');
}
}
lib/AC/Yenta/Store/Expire.pm view on Meta::CPAN
);
sub periodic {
my $maps = conf_value('map');
for my $map (keys %$maps){
my $cf = conf_map($map);
next unless $cf->{expire};
my $expire = timet_to_yenta_version($^T - $cf->{expire});
debug("running expire from $expire");
AC::Yenta::Store::store_expire( $map, $expire );
}
}
################################################################
1;
lib/AC/Yenta/Store/File.pm view on Meta::CPAN
return if $name =~ m%(^\.\./)|(/\.\./)%;
my $cf = $me->{conf};
my $base = $cf->{basedir};
return 1 unless $base;
# split name into dir / file
my($dir, $file) = $name =~ m|(.*)/([^/]+)$|;
# create directory
debug("mkpath: $base/$dir");
my $mask = umask 0;
eval { mkpath("$base/$dir", undef, 0777); };
umask $mask;
# save file
my $f;
unless( open($f, "> $base/$name.tmp") ){
problem("cannot save file '$base/$name.tmp': $!");
return;
}
debug("saving file '$base/$name'");
print $f $$cont;
close $f;
rename "$base/$name.tmp", "$base/$name";
return 1;
}
1;
lib/AC/Yenta/Store/LevelDB.pm view on Meta::CPAN
my $class = shift;
my $name = shift;
my $conf = shift;
my $file = $conf->{dbfile};
unless( $file ){
problem("no dbfile specified for '$name'");
return;
}
debug("opening LevelDB file=$file");
my $db = $OPEN{$file} || Tie::LevelDB::DB->new( $file );
$OPEN{$file} = $db;
problem("cannot open db file $file") unless $db;
# web server will need access
chmod 0777, $file;
return bless {
file => $file,
lib/AC/Yenta/Store/LevelDB.pm view on Meta::CPAN
}, $class;
}
sub get {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $v;
debug("get $map/$sub/$key");
my $v = $me->{db}->Get( _key($map,$sub,$key) );
return unless $v; # not found
if( wantarray ){
return ($v, 1);
}
return $v;
}
sub put {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
my $r = $me->{db}->Put( _key($map,$sub,$key), $val);
return 1;
}
sub del {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
lib/AC/Yenta/Store/LevelDB.pm view on Meta::CPAN
my $k = _key($map,$sub,$key);
my $r = $it->Seek($k);
while( $it->Valid() ){
my $k = $it->key();
my $v = $it->value();
last if $end && ($k ge $e);
debug("range $k");
last unless $k =~ m|$map/$sub/|;
$k =~ s|$map/$sub/||;
push @k, { k => $k, v => $v };
$it->Next();
}
return @k;
}
################################################################
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
my($ext) = $conf->{dbfile} =~ /\.(.+)$/;
$bkend = $ext if $BACKEND{$ext};
}
my $c = $BACKEND{$bkend || $DEFAULT};
unless( $c ){
problem("invalid storage backend: $bkend - ignoring map");
return ;
}
debug("configuring map $name with $c");
my $db = $c->new( $name, $conf );
my $fs = AC::Yenta::Store::File->new( $name, $conf );
my $me = bless {
name => $name,
conf => $conf,
db => $db,
fs => $fs,
merkle_height => 16,
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
sub get {
my $me = shift;
my $key = shift;
my $ver = shift;
my $db = $me->{db};
my @versions = $me->_versget( $key );
return unless @versions;
debug("found ver: @versions");
if( $ver ){
$ver = encode_version($ver);
return unless grep { $_ eq $ver } @versions;
}else{
$ver = $versions[0];
}
my $vk = $me->vkey($key, $ver);
my $extver = decode_version($ver);
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
my $cf = $me->{conf};
my $db = $me->{db};
my $v = encode_version($ver);
# data belongs here?
return if $me->is_sharded() && !$me->is_my_shard($shard);
my @versions = $me->_versget( $key );
if( $^T - $cacheT > 60 ){
debug("cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
$cacheT = $^T;
}
# I have it?
return if grep { $_ eq $v } @versions;
# expired?
return if $cf->{expire} && ($ver < timet_to_yenta_version($^T - $cf->{expire}));
# I want everything?
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
my $ver = shift;
my $data = shift;
my $file = shift; # reference
my $meta = shift;
my $cf = $me->{conf};
my $db = $me->{db};
my $v = encode_version($ver);
my $vk = $me->vkey($key, $v);
debug("storing $vk");
# get version history
my @deletehist;
my %deletedata;
my @versions = $me->_versget( $key );
return if grep { $_ eq $v } @versions; # dupe!
# is this the newest version? should we save this data?
if( !@versions || ($v gt $versions[0]) || $cf->{keepold} ){
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
my @rm = splice @versions, $cf->{history}, @versions, ();
push @deletehist, (map { ({version => decode_version($_), key => $key, shard => $shard}) } @rm);
$deletedata{$_} = 1 for @_;
}
if( $me->is_sharded() ){
# QQQ - shard changed?
$db->put($me->{name}, 'shard', $key, encode_shard($shard || 0));
}
my $dd = join(' ', map { $_->{version} } @deletehist);
debug("version list: @versions [delete: $dd]");
$me->_versput( $key, @versions );
# update merkles
$me->merkle( { shard => $shard, key => $key, version => $ver }, @deletehist);
# delete old data
for my $rm (keys %deletedata){
debug("removing old version $key/$rm");
my $rmvk = $me->vkey($key, $rm);
$db->del($me->{name}, 'data', $rmvk);
$db->del($me->{name}, 'meta', $rmvk);
}
$db->sync();
return 1;
}
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
my $me = shift;
my $key = shift;
my $ver = shift;
my $db = $me->{db};
my $v = encode_version($ver);
my $cshard = $db->get($me->{name}, 'shard', $key);
my @versions = grep { $_ ne $v } $me->_versget( $key );
debug("new ver list: @versions");
if( @versions ){
$me->_versput( $key, @versions );
}else{
$db->del($me->{name}, 'vers', $key);
$db->del($me->{name}, 'shard', $key);
$me->_versdel( $key );
}
my $vk = $me->vkey($key, $ver);
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
$me->{db}->put($me->{name}, 'internal', $key, $val);
}
################################################################
sub expire {
my $me = shift;
my $expire = shift;
debug("expiring $me->{name}");
my $db = $me->{db};
# walk merkle tree, find all k/v to remove
my @delete;
my @walk = { level => 0, version => 0, shard => 0 };
while(@walk){
my @next;
for my $node (@walk){
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
}else{
push @next, $r;
}
}
}
@walk = @next;
}
# remove k/v
for my $r (@delete){
debug("expiring $r->{key}/$r->{version}");
$me->_remove( $r->{key}, $r->{version} );
}
# update merkle
$me->merkle(undef, @delete);
$db->sync();
}
################################################################
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
sub get_merkle {
my $me = shift;
my $shard = shift;
my $ver = shift;
my $lev = shift;
return if $lev > $me->{merkle_height};
my $db = $me->{db};
my $mk = $me->_mkey(encode_shard($shard), encode_version($ver), $lev);
debug("getting merkle for $mk");
my $d = $me->_mcget( $mk );
return unless $d;
my @d = split /\0/, $d;
my @res;
if( $^T - $cacheT > 60 ){
debug("merk cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
$cacheT = $^T;
}
if( $lev == $me->{merkle_height} ){
# data is: lkey, ...
for my $r (@d){
my($s,$v,$k) = $me->_decode_lkey($r);
push @res, { version => decode_version($v), key => $k, count => 1, shard => decode_shard($s) };
}
}else{
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
my $db = $me->{db};
my $k0 = $me->_mkey($shard, $ver, $lev);
my $k1 = $me->_mkey($shard, $ver, $lev - 1);
my(undef, $nextshard, $nextver) = $me->_decode_mkey($k1);
unless( $lev ){
# root hash - not used
debug("updating merkle node root => $hash");
$me->_mcput( 'root', $hash );
return;
}
# get node
my $d = $me->_mcget( $k1 );
my $oldh = sha1_base64($d);
my %d = split /\0/, $d;
if($hash){
# add/update
debug("updating merkle node $k1 + { $k0 => $hash, $count }");
$d{$k0} = "$hash $count";
}else{
# remove
debug("updating merkle node $k1 - { $k0 => empty }");
delete $d{$k0};
}
if( keys %d ){
$d = join("\0", map {"$_\0$d{$_}"} (sort keys %d));
$me->_mcput( $k1, $d );
my $newh = sha1_base64($d);
return if $newh eq $oldh; # unchanged
return ($nextshard, $nextver, $newh, scalar keys %d);
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
sub _merkle_leaf_add {
my $me = shift;
my $shard = shift;
my $key = shift;
my $ver = shift;
my $db = $me->{db};
my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
my $vk = $me->_lkey($key, $ver, $shard);
debug("adding to merkle leaf $mk - $vk");
# get current data
my $d = $me->_mcget( $mk );
my @d = split /\0/, $d;
# append new item + uniqify
my %d;
@d{@d} = ();
$d{$vk} = undef;
$d = join("\0", sort keys %d);
$me->_mcput( $mk, $d );
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
sub _merkle_leaf_del {
my $me = shift;
my $shard = shift;
my $key = shift;
my $ver = shift;
my $db = $me->{db};
my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
my $vk = $me->_lkey($key, $ver, $shard);
debug("removing from merkle leaf $mk - $vk");
# get current data
my $d = $me->_mcget( $mk );
my @d = split /\0/, $d;
# remove item
@d = grep { $vk ne $_ } @d;
if( @d ){
$d = join("\0", @d);
$me->_mcput( $mk, $d );
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
my $me = shift;
my $shard = shift;
my $ver = shift;
my $db = $me->{db};
# get range on data
my @key = map {
my $k = $_->{k}; $k =~ s|.*/||; $k
} $db->range($me->{name}, 'data', encode_version($ver), encode_version($ver + 1));
debug("actual key: @key");
return @key unless defined $shard;
# get vers list to filter on shard
my $sh = encode_shard($shard);
return grep {
my $k = $_;
my $vl = $db->get($me->{name}, 'vers', $k);
my($s) = $vl =~ /;\s*(.*)/;
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
$s == $sh;
} @key;
}
# check merkle leaf node against actual data
sub merkle_scrub {
my $me = shift;
my $shard = shift;
my $ver = shift;
debug("scrub $me->{name} $shard/$ver");
# get list of keys from merkle leaf node
my $mlist = $me->get_merkle($shard, $ver, $me->{merkle_height}) || [];
my @mkey = map { $_->{key} } @$mlist;
my %mkey;
@mkey{@mkey} = @mkey;
# get list of keys from actual data
my @akey = $me->_get_actual_keys( $shard, $ver );
# compare lists
for my $k (@akey){
next if $mkey{$k};
debug("missing key in merkle tree: $shard/$ver/$k");
$me->merkle( { key => $k, shard => $shard, version => $ver } );
}
}
################################################################
sub _mkey {
my $me = shift;
my $shard = shift;