view release on metacpan or search on metacpan
eg/yenta_put
MANIFEST
proto/yenta_status.proto
proto/std_reply.proto
proto/yenta_check.proto
proto/std_ipport.proto
proto/yenta_getset.proto
proto/heartbeat.proto
proto/auth.proto
Makefile.PL
META.yml Module meta-data (added by MakeMaker)
--- #YAML:1.0
name: AC-Yenta
version: 1.1
abstract: eventually-consistent distributed key/value data store. et al.
author:
- AdCopy <http://www.adcopy.com>
license: perl
distribution_type: module
configure_requires:
ExtUtils::MakeMaker: 0
requires:
AC::DC: 0
BerkeleyDB: 0
Crypt::Rijndael: 0
eg/yenta.conf view on Meta::CPAN
# enable debugging?
#debug ae
#debug map
#debug merkle
# ...
# maps
map testyfoo {
# name of the data file
dbfile /home/data/testyfoo.ydb
# how much history to keep
history 4
}
eg/yenta_put view on Meta::CPAN
use JSON;
use strict;
# example: store one JSON-encoded record in the 'mymap' map

# client that writes its debug output to stderr
my $ys = AC::Yenta::Client->new(
    debug => sub { print STDERR @_, "\n"; },
);

my $key     = 'YX3jSXD3CBRUDABm';
my $version = timet_to_yenta_version( time() );   # version derived from the current time

# the value stored under $key
my $payload = encode_json( {
    url_id => $key,
    url    => 'http://www.example.com',
    acc_id => 'C9TdSgbUCBRUCABG',
    format => 'html',
} );

# arguments: map, key, version, data
my $res = $ys->distribute( 'mymap', $key, $version, $payload );
lib/AC/Yenta.pm view on Meta::CPAN
#
# $Id$
package AC::Yenta;
use strict;
our $VERSION = 1.1;
=head1 NAME
AC::Yenta - eventually-consistent distributed key/value data store. et al.
=head1 SYNOPSIS
use AC::Yenta::D;
use strict;
my $y = AC::Yenta::D->new( );
$y->daemon( $configfile, {
argv => \@ARGV,
lib/AC/Yenta.pm view on Meta::CPAN
=head1 DESCRIPTION
=head2 Peers
All of the running yentas are peers. There is no master server.
New nodes can be added or removed on the fly with no configuration.
=head2 Kibitzing
Each yenta kibitzes (gossips) with the other yentas in the network
to exchange status information, distribute key-value data, and
detect and correct inconsistent data.
=head2 Eventual Consistency
Key-value data is versioned with timestamps. By default, newest wins.
Maps can be configured to keep and return multiple versions and client
code can use other conflict resolution mechanisms.
Lost, missing or otherwise inconsistent data is detected
by kibitzing merkle tree hash values.
=head2 Topological awareness
Yentas can take network topology into account when transferring
data around to minimize long-distance transfers. You will need to
write a custom C<MySelf> class with a C<my_datacenter> function.
=head2 Multiple Network Interfaces / NAT
Yentas can take advantage of multiple network interfaces with
different IP addresses (e.g. a private internal network + a public network),
or multiple addresses (e.g. a private address and a public address)
and various NAT configurations.
You will need to write a custom C<MySelf> class and C<my_network_info>
function.
lib/AC/Yenta.pm view on Meta::CPAN
=item seedpeer
specify initial peers to contact when starting. the author generally
specifies 2 on the east coast, and 2 on the west coast.
seedpeer 192.168.10.11:3503
seedpeer 192.168.10.12:3503
=item secret
specify a secret key used to encrypt data transferred between
yentas in different datacenters.
secret squeamish-ossifrage
=item syslog
specify a syslog facility for log messages.
syslog local5
=item debug
enable debugging for a particular section
debug map
=item map
configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers. maps should be
configured similarly on all servers that they are on.
map users {
backend bdb
dbfile /home/acdata/users.ydb
history 4
}
=back
=head1 BUGS
Too many to list here.
=head1 SEE ALSO
lib/AC/Yenta/AC/MySelf.pm view on Meta::CPAN
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 17:23 (EST)
# Function:
#
# $Id$
package AC::Yenta::AC::MySelf;
use AC::Yenta::Config;
use AC::Yenta::Debug;
use AC::DataCenter; # provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;
my $SERVERID;
sub init {
my $class = shift;
my $port = shift; # not used
my $id = shift;
lib/AC/Yenta/Client.pm view on Meta::CPAN
my $me = shift;
my $map = shift;
my $key = shift;
my $ver = shift;
my $req = $me->{proto}->encode_request( {
type => 'yenta_get',
msgidno => rand(0xFFFFFFFF),
want_reply => 1,
}, {
data => [ {
map => $map,
key => $key,
version => $ver,
} ]
} );
return $me->_send_request($map, $req);
}
sub _shard {
lib/AC/Yenta/Client.pm view on Meta::CPAN
my $req = shift;
my $file = shift; # reference
my $ipn = inet_aton($addr);
$req .= $$file if $file;
$me->{debug}->("trying to contact yenta server $addr:$port");
my $res;
eval {
$res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
$res->{data} = $me->{proto}->decode_reply( $res ) if $res;
};
if(my $e = $@){
$me->{debug}->("yenta request failed: $e");
$res = undef;
}
return $res;
}
################################################################
lib/AC/Yenta/Client.pm view on Meta::CPAN
my $me = shift;
my $map = shift;
my $f;
my @server;
my @faraway;
open($f, $me->{server_file});
local $/ = "\n";
while(<$f>){
chop;
my $data = decode_json( $_ );
next unless grep { $_ eq $map } @{ $data->{map} };
if( $data->{is_local} ){
push @server, { addr => $data->{addr}, port => $data->{port} };
}else{
push @faraway, { addr => $data->{addr}, port => $data->{port} };
}
}
# prefer local
@server = @faraway unless @server;
shuffle( \@server );
push @{$me->{_server}}, @server;
}
lib/AC/Yenta/Default/MySelf.pm view on Meta::CPAN
}
# Return this yenta instance's server id (the value held in $SERVERID).
sub my_server_id {
    $SERVERID;
}
# Describe this host's networks: a single entry carrying the address
# in $MYIP and no NAT domain.
sub my_network_info {
    my %iface = ( ipa => $MYIP );
    return [ \%iface ];
}
# The default customization always reports the datacenter name 'default'.
sub my_datacenter { 'default' }
1;
lib/AC/Yenta/Direct.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-07 18:09 (EDT)
# Function: direct access (read-only) to yenta data file
#
# $Id$
package AC::Yenta::Direct;
use AC::Yenta::Store::Map;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
lib/AC/Yenta/Kibitz/Status.pm view on Meta::CPAN
push @ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => AC::Yenta::Status->my_port(), natdom => $i->{natdom} }
if $st == 200;
}else{
push @ipinfo, { ipv4 => inet_atoi($i->{ipa}), port => AC::Yenta::Status->my_port(), natdom => $i->{natdom} };
$status = 500 unless $st == 200;
}
}
return {
hostname => $HOSTNAME,
datacenter => my_datacenter(),
subsystem => 'yenta',
environment => conf_value('environment'),
via => AC::Yenta::Status->my_server_id(),
server_id => AC::Yenta::Status->my_server_id(),
instance_id => AC::Yenta::Status->my_instance_id(),
path => '.',
status => $status,
uptodate => AC::Yenta::Store::AE->up_to_date(),
timestamp => $^T,
lastup => $^T,
lib/AC/Yenta/Kibitz/Status.pm view on Meta::CPAN
my $gpb = shift;
my $io = shift;
return unless $gpb;
my $c;
eval {
$c = ACPYentaStatusRequest->decode( $gpb );
$c = $c->{myself};
};
if(my $e = $@){
problem("cannot decode status data: $e");
return;
}
my $id = $c->{server_id};
# don't track myself
return if AC::Yenta::Status->my_server_id() eq $id;
AC::Yenta::Status->update_sceptical($id, $c, $io);
}
sub update {
my $gpb = shift;
return unless $gpb;
my $c;
eval {
$c = ACPYentaStatusReply->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode status data: $e");
return;
}
debug("rcvd update");
my $myself = AC::Yenta::Status->my_server_id();
for my $up (@{$c->{status}}){
my $id = $up->{server_id};
next if $up->{via} eq $myself;
next if $id eq $myself;
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
# Begin a status exchange with the peer: send our own status record
# (from AC::Yenta::Kibitz::Status::myself) as the request data, ask for
# a reply, and arm the reply timeout.  Returns the client object.
sub start {
    my $me = shift;

    $me->SUPER::start();

    my $proto   = AC::Yenta::Protocol->new();
    my $payload = AC::Yenta::Kibitz::Status::myself();

    # header first, then our status as the message data (no content part)
    $me->write(
        $proto->encode_header(
            type           => 'yenta_status',
            data_length    => length($payload),
            content_length => 0,
            want_reply     => 1,
            msgid          => $msgid++,
        ) . $payload
    );
    $me->timeout_rel($TIMEOUT);

    return $me;
}
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
}
}
# Read callback for a status client: once a full reply has arrived, feed
# the peer statuses it carries into our tables, record network stats for
# this connection, and close it.
sub read {
    my($me, $evt) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data) = $yp->read_protocol( $me, $evt );

    # nothing usable yet (incomplete message, or the read failed)
    return unless $proto;

    $me->{status_ok} = 1;

    AC::Yenta::Kibitz::Status::update( $data );
    AC::Yenta::NetMon::update( $me );

    $me->shut();
}
1;
lib/AC/Yenta/Kibitz/Status/Server.pm view on Meta::CPAN
}
AC::Yenta::NetMon::update( $io );
# respond with all known peers
my $response = AC::Yenta::Kibitz::Status::response();
my $yp = AC::Yenta::Protocol->new();
my $hdr = $yp->encode_header(
type => 'yenta_status',
data_length => length($response),
content_length => 0,
msgid => $proto->{msgid},
is_reply => 1,
);
debug("sending status reply");
$io->write_and_shut( $hdr . $response );
}
1;
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
$me->shut();
}
sub read {
my $me = shift;
my $evt = shift;
debug("recvd reply");
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
$me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
return unless $proto;
$proto->{data} = $data;
$proto->{content} = $content;
eval {
my $yp = AC::Yenta::Protocol->new();
$proto->{data} = $yp->decode_reply($proto) if $data;
};
if(my $e = $@){
problem("cannot decode reply: $e");
}
# process
$me->{_store_ok} = 1;
if( $proto->{is_error} || $@ ){
my $e = $@ || 'remote error';
$me->run_callback('error', {
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
};
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
# process requests
my @res;
my $rescont;
for my $r (@{ $req->{data} }){
debug("get request: $r->{map}, $r->{key}, $r->{version}");
my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
my $res = {
map => $r->{map},
key => $r->{key},
};
if( $meta && $file ){
unless( _check_content( $meta, $file ) ){
problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
# QQQ - remove from system, (and let AE get a new copy)?
store_remove($r->{map}, $r->{key}, $ver);
# tell caller it was not found
$ver = undef;
}
}
if( defined $ver ){
$res->{version} = $ver;
$res->{value} = $data;
$res->{meta} = $meta if defined $meta;
if( $file ){
# if one file to send, send it as content
if( @{$req->{data}} == 1 ){
$rescont = $file;
}else{
$res->{file} = $$file;
}
}
}
push @res, $res;
}
# encode results
my $ect = '';
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
$ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
my $response = $yp->encode_reply( {
type => 'yenta_get',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
content_encrypted => $proto->{data_encrypted},
}, { data => \@res }, \$ect );
debug("sending get reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response . $ect );
}
sub api_check {
my $io = shift;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
# get the next level also
@todo = @lres;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_check',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { check => \@res } );
debug("sending check reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response );
}
# get + process merkle data
sub _get_check {
my $map = shift;
my $shard = shift;
my $ver = shift;
my $lev = shift;
my $res = store_get_merkle($map, $shard, $ver, $lev);
return unless $res;
for my $r (@$res) {
$r->{map} = $map;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
my $gpb = shift;
my $content = shift; # reference
# decode request
my $req;
eval {
$req = ACPYentaDistRequest->decode( $gpb );
die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};
};
if(my $e = $@){
my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
problem("cannot decode request: peer: $io->{peerip} $enc, $e");
$io->shut();
return;
}
unless( conf_map( $req->{datum}{map} ) ){
problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
_reply_error($io, $proto, 404, 'Map Not Found');
return;
}
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
$io->shut();
return;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { status_code => 200, status_message => 'OK', haveit => !$want } );
debug("sending distrib reply");
$io->timeout_rel($TIMEOUT);
$io->write_and_shut( $response );
}
sub _check_content {
my $meta = shift;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
my $proto = shift;
my $code = shift;
my $msg = shift;
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
is_error => 1,
data_encrypted => $proto->{data_encrypted},
}, {
status_code => $code,
status_message => $msg,
haveit => 0,
} );
debug("sending distrib reply");
$io->write_and_shut( $response );
}
lib/AC/Yenta/Monitor.pm view on Meta::CPAN
info => 'monitor',
freq => $FREQ,
func => \&periodic,
);
}
# Scheduled monitor pass: mark stale monitored services down, then send a
# heartbeat request to every service listed in the 'monitor' config.
sub periodic {
    my $targets = conf_value('monitor');

    # services whose last "up" time is older than $OLD_DOWN are reported down
    for my $id (keys %MON){
        next unless $MON{$id}{lastup} < $^T - $OLD_DOWN;
        isdown($id, 0);
    }

    # start monitoring (send heartbeat request)
    for my $target (@$targets){
        my($ip, $port) = @{$target}{qw(ipa port)};
        my $id = "$ip:$port";

        debug("start monitoring $id");

        my $client = AC::Yenta::Monitor::Client->new( $ip, $port,
            info         => "monitor client: $id",
            monitor_peer => $id,
        );

        # could not even create the client: count it as down right away
        isdown($id, 0) unless $client;
    }
}
# data for kibitzing around
# array of ACPYentaStatus
sub export {
my @d;
my $here = my_datacenter();
my $self = my_server_id();
for my $v (values %MON){
push @d, {
id => $v->{id}, # from config, typically localhost:port
datacenter => $here,
via => $self,
hostname => $v->{hostname},
subsystem => $v->{subsystem},
environment => $v->{environment},
status => $v->{status_code},
timestamp => $v->{timestamp},
lastup => $v->{lastup},
sort_metric => $v->{sort_metric},
capacity_metric => $v->{capacity_metric},
server_id => $v->{server_id},
lib/AC/Yenta/Monitor.pm view on Meta::CPAN
sub update {
my $id = shift;
my $gb = shift; # ACPHeartbeat
my $up;
eval {
$up = ACPHeartBeat->decode( $gb );
$up->{id} = $id;
};
if(my $e = $@){
problem("cannot decode hb data: $e");
return isdown($id, 0);
}
unless( $up->{status_code} == 200 ){
return isdown($id, $up->{status_code});
}
return isdown($id, 0) unless $^T - $up->{timestamp} < $OLD_DOWN;
debug("monitor $id is up");
$up->{lastup} = $^T;
$up->{downcount} = 0;
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
# build request
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $hdr = $yp->encode_header(
type => 'heartbeat_request',
data_length => 0,
content_length => 0,
want_reply => 1,
msgid => $msgid++,
);
# write request
$me->write( $hdr );
$me->timeout_rel($TIMEOUT);
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
}
}
# Read callback for a heartbeat monitor client.  Once a complete reply has
# arrived, mark the exchange ok, hand the heartbeat payload to
# AC::Yenta::Monitor::update for the monitored peer, and close the connection.
sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    # Build the reader with the configured secret, as the request side of
    # this client and every other yenta protocol reader do.  read_protocol
    # decrypts payloads flagged as encrypted, which presumably needs the
    # secret (encrypt() passes data through unchanged when none is set).
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;   # incomplete message, or the read failed

    $me->{status_ok} = 1;
    AC::Yenta::Monitor::update( $me->{monitor_peer}, $data );
    $me->shut();
}
1;
lib/AC/Yenta/MySelf.pm view on Meta::CPAN
# Function: stub for customization
#
# $Id$
package AC::Yenta::MySelf;
use AC::Yenta::Customize;
use AC::Import;
use strict;
# Inherit the customization machinery; presumably AC::Yenta::Customize lets a
# site-specific class supply the functions named in @CUSTOM (verify there).
our @ISA = 'AC::Yenta::Customize';
# functions exported to modules that `use AC::Yenta::MySelf`
our @EXPORT = qw(my_server_id my_network_info my_datacenter);
# everything that may be customized: the exported functions plus init
our @CUSTOM = (@EXPORT, 'init');
1;
=head1 NAME
AC::Yenta::MySelf - customize yenta to your own environment
=head1 SYNOPSIS
lib/AC/Yenta/MySelf.pm view on Meta::CPAN
=head2 my_server_id
return a unique identity for this yenta instance. typically,
something similar to the server hostname.
sub my_server_id {
return 'yenta@' . hostname();
}
=head2 my_datacenter
return the name of the local datacenter. yenta will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.
sub my_datacenter {
my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
return $domain;
}
=head2 my_network_info
return information about the various networks this server has.
sub my_network_info {
my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
my $private_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));
return [
# use this IP for communication with servers in this datacenter (same natdom)
{ ipa => $private_ip, natdom => my_datacenter() },
# otherwise use this IP
{ ipa => $public_ip },
];
}
=head2 init
initialization function called at startup. typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 13:22 (EDT)
# Function: read protocol data
#
# $Id$
package AC::Yenta::Protocol;
use AC::Yenta::Debug 'protocol';
use AC::Yenta::Config;
use AC::DC::Protocol;
use AC::Yenta::MySelf;
use AC::Yenta::Crypto;
use AC::Misc;
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
my $r = $MSGTYPE{$name};
__PACKAGE__->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}
sub read_protocol {
my $me = shift;
my $io = shift;
my $evt = shift;
$io->{rbuffer} .= $evt->{data};
return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;
if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
# decode header
eval {
$io->{proto_header} = $me->decode_header( $io->{rbuffer} );
};
if(my $e=$@){
verbose("cannot decode protocol header: $e");
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
});
$io->shut();
return;
}
}
my $p = $io->{proto_header};
return unless $p; # read more
# do we have everything?
return unless length($io->{rbuffer}) >= ($p->{auth_length} + $p->{data_length} + $p->{content_length} + $HDRSIZE);
my $auth = substr($io->{rbuffer}, $HDRSIZE, $p->{auth_length});
my $data = substr($io->{rbuffer}, $HDRSIZE + $p->{auth_length}, $p->{data_length});
my $content = substr($io->{rbuffer}, $HDRSIZE + $p->{auth_length} + $p->{data_length}, $p->{content_length});
# RSN - validate auth
if( $p->{data_encrypted} && $data ){
$data = $me->_decrypt_data( $io, $auth, $data );
return unless $data;
}
if( $p->{content_encrypted} && $content ){
$content = $me->_decrypt_data( $io, $auth, $content );
return unless $content;
}
# content is passed as reference
return ($p, $data, ($content ? \$content : undef));
}
# for simple status queries, argus, debugging
# this is not an RFC compliant http server
# Minimal HTTP request parsing for status/debug queries (not RFC compliant).
# Returns nothing until the blank line ending the headers has arrived;
# afterwards returns ( { type => 'http', method => METHOD }, URL ).
sub _read_http {
    my($io, $evt) = @_;

    return unless $io->{rbuffer} =~ /\r?\n\r?\n/s;

    # request line: METHOD URL VERSION
    my($method, $url) = split /\s+/, $io->{rbuffer};
    my $request = { type => 'http', method => $method };

    return ($request, $url);
}
################################################################
# Decrypt $data with $me->decrypt.  On success return the plaintext.
# If decrypt dies: log it, raise an 'error' callback on $io, shut the
# connection and return nothing.
sub _decrypt_data {
    my($me, $io, $auth, $data) = @_;

    my $plain = eval { $me->decrypt( $auth, $data ) };

    if( my $err = $@ ){
        verbose("cannot decrypt protocol data: $err");
        $io->run_callback('error', {
            cause => 'read',
            error => "cannot decrypt protocol: $err",
        });
        $io->shut();
        return;
    }

    return $plain;
}
# Should traffic to $peer be encrypted?  Only when a shared secret is
# configured and the peer reports a different datacenter than ours.
sub use_encryption {
    my $peer = shift;

    return conf_value('secret')
        ? $peer->{datacenter} ne my_datacenter()
        : ();
}
sub encrypt {
my $me = shift;
my $auth = shift; # not currently used
my $buf = shift;
my $secret = $me->{secret};
return $buf unless $secret;
return unless $buf;
lib/AC/Yenta/Server.pm view on Meta::CPAN
debug("connection timed out");
$me->shut();
}
# Server read callback: once a complete message has arrived, dispatch it to
# the handler registered in %HANDLER for its type.  A handler is either a
# code ref or a class name with a ->handler method.  Unknown types are
# logged and the connection is closed.
sub read {
    my($me, $evt) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    my $type    = $proto->{type};
    my $handler = $HANDLER{$type};

    if( !$handler ){
        verbose("unknown message type: $type");
        $me->shut();
        return;
    }

    debug("handling request - $type");

    return ref($handler)
        ? $handler->( $me, $proto, $data, $content )
        : $handler->handler( $me, $proto, $data, $content );
}
1;
lib/AC/Yenta/Stats.pm view on Meta::CPAN
my $class = shift;
my $io = shift;
my $proto = shift;
my $url = shift;
debug("http request $url");
$url =~ s|^/||;
$url =~ s/%(..)/chr(hex($1))/eg;
my $f = $HANDLER{$url};
$f = \&http_data if $url =~ m|^data/|;
$f = \&http_file if $url =~ m|^file/|;
$f ||= \&http_notfound;
my( $content, $code, $text ) = $f->($url);
$code ||= 200;
$text ||= 'OK';
my $res = "HTTP/1.0 $code $text\r\n"
. "Server: AC/Yenta\r\n"
. "Connection: close\r\n"
. "Content-Type: text/plain; charset=UTF-8\r\n"
lib/AC/Yenta/Stats.pm view on Meta::CPAN
for my $k (sort keys %STATS){
$res .= sprintf("%-24s%s\n", "$k:", $STATS{$k});
}
my @peers = AC::Yenta::Status->allpeers();
$res .= sprintf("%-24s%s\n", "peers:", scalar @peers);
$res .= "\n";
return $res;
}
# Serve "data/<map>/<key>[/<version>]": return the stored value, or the
# not-found response when no version is found.
sub http_data {
    my $url = shift;

    my(undef, $map, $key, $ver) = split m|/|, $url;
    my($value, $found_ver) = store_get( $map, $key, $ver );

    return $found_ver ? $value : http_notfound($url);
}
# Serve "file/<map>/<key>[/<version>]": return the stored file content
# (store_get hands it back as a reference), or the not-found response.
sub http_file {
    my $url = shift;

    my(undef, $map, $key, $ver) = split m|/|, $url;
    my(undef, $found_ver, $file) = store_get( $map, $key, $ver );

    unless( $found_ver && $file ){
        return http_notfound($url);
    }

    return $$file;
}
1;
lib/AC/Yenta/Status.pm view on Meta::CPAN
use AC::Yenta::Config;
use AC::Yenta::MySelf;
use AC::Dumper;
use AC::Misc;
use Sys::Hostname;
use JSON;
use Socket;
require 'AC/protobuf/yenta_status.pl';
use strict;
my $KEEPDOWN = 1800; # keep data about down servers for how long?
my $KEEPLOST = 600; # keep data about servers we have not heard about for how long?
my $SAVEMAX = 1800; # do not save if older than
my $PORT;
our $DATA = bless {
allpeer => {}, # yenta_status
sceptical => {},
mappeer => {}, # {map} => { id => id }
peermap => {}, # {id} => @map
datacenter => {}, # {dc} => { id => id }
peertype => {}, # {ss} => { id => id }
};
sub init {
my $port = shift;
$PORT = $port;
AC::DC::Sched->new(
info => 'kibitz status',
lib/AC/Yenta/Status.pm view on Meta::CPAN
info => "status client: $id",
status_peer => $id,
);
return __PACKAGE__->isdown($id) unless $c;
$c->start();
}
sub _random_peer {
my $here = my_datacenter();
# sceptical
my @scept = values %{$DATA->{sceptical}};
my @all = map { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
my @old = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
my @local = grep { $_->{datacenter} eq $here } @all; # this datacenter
my @away = grep { $_->{datacenter} ne $here } @all; # not this datacenter
# first check anything sceptical
my @peer = @scept;
# then (maybe) something about to expire
@peer = @old unless @peer || int rand(5);
# then (maybe) something far away
@peer = @away unless @peer || int rand(5);
lib/AC/Yenta/Status.pm view on Meta::CPAN
return("seed/$ip:$port", $ip, $port);
}
# server list for save file
sub server_list {
    my $type = shift;

    # An optional "/where" suffix may follow the type.  It is no longer
    # used, so it is stripped off and discarded.  The original code also
    # looked up my_datacenter() for it, which was dead work.
    ($type) = split m|/|, $type;

    # Read the per-type table without autovivifying an empty entry for a
    # type we have never seen.
    my $members = $DATA->{peertype}{$type};
    return unless $members && %$members;

    # Nothing too old: keep peers heard from within the last $SAVEMAX
    # seconds.  Look each peer up once, so ids with no allpeer record do not
    # get an empty record autovivified into $DATA->{allpeer}.
    my $cutoff = $^T - $SAVEMAX;
    my @recent = grep {
        my $pd = $DATA->{allpeer}{$_};
        $pd && $pd->{lastup} > $cutoff;
    } keys %$members;
    return unless @recent;

    return map { $DATA->{allpeer}{$_} } @recent;
}
# save a list of peers, in case I crash, and for others to use
sub save_status {
my $save = conf_value('savestatus');
my $here = my_datacenter();
# also save locally running services
my @mon = AC::Yenta::Monitor::export();
for my $s ( @$save ){
my $file = $s->{file};
my $types = $s->{type};
my @peer;
for my $type (@$types){
lib/AC/Yenta/Status.pm view on Meta::CPAN
debug("saving peer status file");
unless( open(FILE, ">$file.tmp") ){
problem("cannot open save file '$file.tmp': $!");
return;
}
for my $pd (@peer){
# only save best addr in save file
my($ip, $port) = AC::Yenta::IO::TCP::Client->use_addr_port( $pd->{ip} );
my $data = {
id => $pd->{server_id},
addr => $ip,
port => int($port),
status => int($pd->{status}),
subsystem => $pd->{subsystem},
environment => $pd->{environment},
sort_metric => int($pd->{sort_metric}),
capacity_metric => int($pd->{capacity_metric}),
datacenter => $pd->{datacenter},
is_local => ($here eq $pd->{datacenter} ? 1 : 0),
};
if( $pd->{subsystem} eq 'yenta' ){
$data->{map} = $pd->{map};
}
print FILE encode_json( $data ), "\n";
}
close FILE;
unless( rename("$file.tmp", $file) ){
problem("cannot rename save file '$file': $!");
}
}
}
################################################################
# diagnostic reports
# Diagnostic report: one tab-separated line per entry, covering this
# server, the locally monitored services, and every known peer.
sub report {
    my $res;

    my @entries = (
        AC::Yenta::Kibitz::Status::_myself(),
        AC::Yenta::Monitor::export(),
        values %{$DATA->{allpeer}},
    );

    for my $v (@entries){
        my $metric = int( $v->{sort_metric} );
        $res .= join( "\t",
            sprintf('%-28s %s', $v->{server_id}, $v->{hostname}),
            $v->{datacenter},
            $v->{subsystem},
            $v->{environment},
            $v->{status},
            $metric,
        ) . "\n";
    }

    return $res;
}
sub report_long {
my $res;
for my $v (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){
lib/AC/Yenta/Status.pm view on Meta::CPAN
return grep { $_->{status} } values %{$DATA->{allpeer}};
}
# Class method: ids of the peers recorded as carrying map $map.
sub mappeers {
    my(undef, $map) = @_;

    return keys %{ $DATA->{mappeer}{$map} };
}
# Class method: the live table of peer ids grouped by datacenter name
# ({dc} => { id => id }).  This is a reference, not a copy.
sub datacenters {
    return $DATA->{datacenter};
}
################################################################
sub _remove {
my $id = shift;
my $ss = $DATA->{allpeer}{$id}{subsystem};
delete $DATA->{peertype}{$ss}{$id} if $ss;
my $dc = $DATA->{allpeer}{$id}{datacenter};
delete $DATA->{datacenter}{$dc}{$id} if $dc;
verbose("deleting peer: $id");
delete $DATA->{allpeer}{$id};
# remove map info
for my $map ( @{$DATA->{peermap}{$id}} ){
delete $DATA->{mappeer}{$map}{$id};
}
delete $DATA->{peermap}{$id};
lib/AC/Yenta/Status.pm view on Meta::CPAN
# debug("updating $id => $up->{status} => " . dumper($up));
debug("updating $id => $up->{status}");
$DATA->{allpeer}{$id} = $up;
if( $up->{status} != 200 ){
_maybe_remove( $id );
return ;
}
# update datacenter info
unless( $DATA->{datacenter}{$up->{datacenter}}{$id} ){
my $pdc = $previnfo->{datacenter};
delete $DATA->{datacenter}{$pdc}{$id} if $pdc;
$DATA->{datacenter}{$up->{datacenter}}{$id} = $id;
}
# update subsystem info
unless( $DATA->{peertype}{$up->{subsystem}}{$id} ){
my $ss = $previnfo->{subsystem};
delete $DATA->{peertype}{$ss}{$id} if $ss;
$DATA->{peertype}{$up->{subsystem}}{$id} = $id;
}
# update map info
lib/AC/Yenta/Store.pm view on Meta::CPAN
return unless $m;
return $m->want($shard, $key, $ver);
}
# Store one version of $map/$key in the map's backing store.
# $file is a reference (or undef).  Returns nothing for an unknown map;
# otherwise returns whatever the store's put() returns.
sub store_put {
    my($map, $shard, $key, $ver, $data, $file, $meta) = @_;

    my $store = $STORE{$map} or return;

    debug("storing $map/$key/$ver");
    return $store->put( $shard, $key, $ver, $data, $file, $meta );
}
# NB: only removes local copy temporarily. will be replaced at next AE run
sub store_remove {
my $map = shift;
my $key = shift;
my $ver = shift;
my $m = $STORE{$map};
return unless $m;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-03 10:05 (EDT)
# Function: Anti-Entropy (find missing/stale data, and sync up)
#
# $Id$
package AC::Yenta::Store::AE;
use AC::Yenta::Store;
use AC::Yenta::Config;
use AC::Yenta::Debug 'ae';
use AC::Yenta::Stats;
use AC::Yenta::Conf;
use AC::Yenta::MySelf;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}elsif( $lt == $bestv ){
push @best, $m;
}
}
return unless @best;
my $map = $best[ rand(@best) ];
$me->{map} = $map;
# is this a data or file map?
# adjust accordingly
my $cf = conf_map( $map );
$me->{has_files} = 1 if $cf->{basedir};
$me->{maxget} = $me->{has_files} ? $MAXFILES : $MAXGET;
$me->{expire} = $cf->{expire};
return 1;
}
sub _init_peer {
my $me = shift;
my $here = my_datacenter();
my @peer = AC::Yenta::Status->mappeers( $me->{map} );
my $env = conf_value('environment');
my(@near, @far, @ood);
for my $p (@peer){
my $d = AC::Yenta::Status->peer($p);
next unless $d->{environment} eq $env;
next unless $d->{status} == 200;
if( $d->{uptodate} ){
if( $d->{datacenter} eq $here ){
push @near, $d;
}else{
push @far, $d;
}
}else{
push @ood, $d;
}
}
$me->{peers_near} = \@near if @near;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
@AE = grep{ $_ != $me } @AE;
}
sub _next_step {
my $me = shift;
$me->{timestamp} = $^T;
if( $me->{kvfetching} < $MAXFETCH ){
# any missing data?
if( @{$me->{kvneedorig}} || @{$me->{kvneed}} ){
debug("starting nextgetkv ($me->{kvfetching})");
$me->_next_get_kv();
}
}
# check nodes?
if( @{$me->{badnode}} ){
$me->_start_check();
return;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
map => $me->{map},
level => $node->{level},
version => $node->{version},
shard => $node->{shard},
} );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
# Callback for a yenta_check reply.  Split the returned entries into key
# entries and merkle node entries.  Of the nodes, only those at the deepest
# level returned are kept.  Key entries win over node entries.  Afterwards,
# advance the anti-entropy state machine.
sub _check_load {
    my($io, $evt, $me) = @_;

    debug("check results");
    $evt->{data} ||= {};

    my(@keydata, @nodedata);
    my $maxlev = 0;

    for my $d ( @{ $evt->{data}{check} } ){
        debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");

        if( $d->{key} ){
            push @keydata, $d;
        }elsif( $d->{level} > $maxlev ){
            # deeper level found: drop the shallower nodes gathered so far
            $maxlev   = $d->{level};
            @nodedata = ($d);
        }elsif( !($d->{level} < $maxlev) ){
            # same level as the current deepest
            push @nodedata, $d;
        }
    }

    if( @keydata ){
        $me->_check_result_keys( \@keydata );
    }elsif( @nodedata ){
        $me->_check_result_nodes( $maxlev, \@nodedata );
    }

    $me->_next_step();
}
sub _check_error {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my %vscnt;
my %vsadd;
for my $d (@$chk){
inc_stat('ae_check_key');
my $vsk = "$d->{version} $d->{shard}";
$vscnt{ $vsk } ++;
next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );
debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
inc_stat('ae_key_missing');
$me->{missing} ++;
$vsadd{ $vsk } ++;
}
}
sub _is_expired {
my $me = shift;
my $map = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
return;
}
# Compare merkle node hashes received from the peer (all at level $lev)
# with our own.  Nodes whose hash differs are pushed onto the front of
# $me->{badnode} for further checking.  Nodes that are too new (data may
# still be en route) or expired are ignored.
sub _check_result_nodes {
    my($me, $lev, $chk) = @_;

    # distinct base version/shard pairs (normalized to level $lev - 1)
    my %base;
    for my $d (@$chk){
        my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
        $base{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

    # our own merkle hashes for those versions, keyed "version shard"
    my $too_new = timet_to_yenta_version($^T - $TOONEW);
    my %local_hash;
    for my $b (values %base){
        next if $b->{ver} > $too_new;
        next if $me->_is_expired($me->{map}, $lev, $b->{ver});
        # RSN - skip unwanted shards
        my $entries = AC::Yenta::Store::store_get_merkle($me->{map}, $b->{shard}, $b->{ver}, $lev - 1);
        for my $e (@$entries){
            $local_hash{"$e->{version} $e->{shard}"} = $e->{hash};
        }
    }

    # compare, skipping anything too new or expired
    for my $d (@$chk){
        next if $d->{version} > $too_new;
        next if $me->_is_expired($me->{map}, $lev, $d->{version});
        # RSN - skip unwanted shards
        my $mine = $local_hash{"$d->{version} $d->{shard}"};

        if( $d->{hash} eq $mine ){
            debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} => match");
            next;
        }

        debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} != $mine");
        unshift @{$me->{badnode}}, { version => $d->{version}, shard => $d->{shard}, level => $lev };
    }
}
################################################################
# we try to spread the load out by picking a random peer to fetch from
# if that peer does not have the data, we retry using the original peer
# (the one that said it has the data)
# Start the next key/value fetch.  Queued re-fetches from the original peer
# (kvneedorig) go first; otherwise fetch from the general queue (kvneed).
# Does nothing when both queues are empty.
sub _next_get_kv {
    my $me = shift;

    if( @{$me->{kvneedorig}} ){
        return $me->_start_get_kv_orig();
    }elsif( @{$me->{kvneed}} ){
        return $me->_start_get_kv_any();
    }
}
# Take up to maxget entries off the general queue and fetch them from the
# peer chosen by _pick_peer (presumably random, to spread the load).  The
# retry flag is set, so misses can be re-fetched from the original peer.
sub _start_get_kv_any {
    my $me = shift;

    my @batch = splice( @{$me->{kvneed}}, 0, $me->{maxget} );

    my $peer = $me->_pick_peer();
    debug("getting kv data from peer $peer->{id}");

    $me->_start_get_kv( $peer, 1, \@batch );
}
# Take up to maxget entries off the original-peer queue and fetch them from
# the peer currently being checked, with the retry flag cleared.
sub _start_get_kv_orig {
    my $me = shift;

    my @batch = splice( @{$me->{kvneedorig}}, 0, $me->{maxget} );

    debug("getting kv data from current peer");
    $me->_start_get_kv( $me->{peer}, 0, \@batch );
}
sub _start_get_kv {
my $me = shift;
my $peer = shift;
my $retry = shift;
my $get = shift;
# insert map into request
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
my $enc = use_encryption($peer);
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# build request
my $request = $proto->encode_request( {
type => 'yenta_get',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
data => $get,
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
$evt->{data} ||= {};
debug("got kv data results");
my %need = map {
( "$_->{key}/$_->{version}" => $_ )
} @$get;
for my $d ( @{$evt->{data}{data}}){
debug("got $d->{map}/$d->{key}/$d->{version}");
next unless $d->{key} && $d->{version}; # not found
delete $need{ "$d->{key}/$d->{version}" };
my $file = $evt->{content};
$file = \ $d->{file} if $d->{file};
AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
$d->{value}, $file, $d->{meta} );
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-01 18:56 (EDT)
# Function: distribute data to other peers
#
# $Id$
package AC::Yenta::Store::Distrib;
use AC::Yenta::Kibitz::Store::Client;
use AC::Yenta::Debug 'distrib';
use AC::Yenta::Config;
use AC::Yenta::Protocol;
use AC::Yenta::Stats;
use AC::Yenta::MySelf;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $sender = $req->{sender};
my $sendat = AC::Yenta::Status->peer($sender);
my $me = bless {
info => "$req->{datum}{map}/$req->{datum}{key}/$req->{datum}{version}",
map => $req->{datum}{map},
req => $req,
content => $cont,
# we tune the distribution algorithm based on where it came from:
faraway => (my_datacenter() ne $sendat->{datacenter}),
farseen => 0,
nearseen => 0,
farsend => [],
nearsend => [],
ordershift => 4,
}, $class;
debug("distributing $me->{info}");
inc_stat( 'dist_requests' );
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
}else{
push @keep, $r;
}
}
@DIST = @keep;
}
################################################################
# determine distribution strategy
# - if we recvd it from faraway, we will send it to other datacenters, and randomly in the same datacenter
# - otherwise we send it in the same datacenter, in an orderly fashion
# RSN - find a strategy with faster convergence + less duplication
sub _init_strategy {
my $me = shift;
my $sender = shift;
my $here = my_datacenter();
my $dcs = AC::Yenta::Status->datacenters();
my $sendat = AC::Yenta::Status->peer($sender);
my(@far, @near);
for my $dc (keys %$dcs){
if( $dc eq $here ){
push @near, grep { $_ ne $sender } $me->_compat_peers_in_dc($dc);
}else{
next if $dc eq $sendat->{datacenter};
push @far, {
dc => $dc,
id => [ $me->_compat_peers_in_dc($dc) ],
};
}
}
if( $me->{faraway} ){
$me->{nearsend} = shuffle(\@near);
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
$me->{nearsend} = _orderly(\@near);
}
}
# which yentas can do something with the update?
sub _compat_peers_in_dc {
my $me = shift;
my $dc = shift;
my $env = conf_value('environment');
my $dcs = AC::Yenta::Status->datacenters();
my $map = $me->{map};
my @id;
for my $id (keys %{$dcs->{$dc}}){
my $pd = AC::Yenta::Status->peer($id);
next unless $pd->{subsystem} eq 'yenta';
next unless $pd->{environment} eq $env;
next unless grep {$map eq $_} @{ $pd->{map} };
push @id, $id;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $enc = use_encryption($pd);
my $ect = '';
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
$ect = $enc ? $proto->encrypt(undef, ${$me->{content}}) : ${$me->{content}} if $me->{content};
# build request
my $request = $proto->encode_request( {
type => 'yenta_distrib',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
content_encrypted => $enc,
}, {
sender => AC::Yenta::Status->my_server_id(),
hop => $me->{req}{hop} + 1,
expire => $me->{req}{expire},
datum => $me->{req}{datum},
}, \$ect );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
}
}
sub _onload {
my $io = shift;
my $evt = shift;
my $me = shift;
my $id = shift;
my $far = shift;
debug("dist finish $me->{info} with $id => $evt->{data}{haveit}");
if( $evt->{data}{haveit} ){
if( $far ){
$me->{farseen} ++;
inc_stat('dist_send_far_seen');
}else{
$me->{nearseen} ++;
inc_stat('dist_send_near_seen');
}
}
if( !$me->{faraway} && !$far ){
# orderly distribution. hop away.
if( $evt->{data}{haveit} ){
shift @{$me->{nearsend}};
}else{
my $n = $me->{ordershift};
$n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
shift @{$me->{nearsend}} for (1 .. $n);
$me->{ordershift} *= 2;
}
}
$me->_start_one($far);
lib/AC/Yenta/Store/Expire.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-13 11:55 (EDT)
# Function: auto-expire old data
#
# $Id$
package AC::Yenta::Store::Expire;
use AC::Yenta::Config;
use AC::Yenta::Debug 'expire';
use AC::Yenta::Conf;
use AC::DC::Sched;
use strict;
lib/AC/Yenta/Store/File.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-31 16:04 (EDT)
# Function: store file data
#
# $Id$
package AC::Yenta::Store::File;
use AC::Yenta::Debug 'store_file';
use File::Path;
use strict;
sub new {
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
if( $ver ){
$ver = encode_version($ver);
return unless grep { $_ eq $ver } @versions;
}else{
$ver = $versions[0];
}
my $vk = $me->vkey($key, $ver);
my $extver = decode_version($ver);
my($data, $founddat) = $db->get($me->{name}, 'data', $vk);
if( wantarray ){
if( $founddat ){
my $meta = $db->get($me->{name}, 'meta', $vk);
my $file = $me->{fs}->get($data) if $data;
return( $data, $extver, $file, $meta );
}else{
# we don't have data, but we have it in history; fake it.
return (undef, $extver, undef, undef);
}
}
return $data;
}
# someone sent me something, do I want it?
sub want {
my $me = shift;
my $shard = shift;
my $key = shift;
my $ver = shift;
my $cf = $me->{conf};
my $db = $me->{db};
my $v = encode_version($ver);
# data belongs here?
return if $me->is_sharded() && !$me->is_my_shard($shard);
my @versions = $me->_versget( $key );
if( $^T - $cacheT > 60 ){
debug("cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
$cacheT = $^T;
}
# I have it?
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
# I'll just throw it away.
return;
}
# Store version $ver of $key in this map, keeping the per-key version list
# (newest first) and the merkle tree in step, and deleting stored data
# that is no longer retained.
#
#   $shard - shard number; recorded in the 'shard' table only for sharded maps
#   $key   - key being stored
#   $ver   - version in external form (encoded here with encode_version)
#   $data  - value to store; when $file is given, $data is the filename
#            handed to the file store
#   $file  - optional reference to the file content
#   $meta  - optional metadata; only written when non-empty
#
# Data is written only if this is the newest version, or the map is
# configured with keepold. Without keepold, the previous newest version's
# data is deleted. With a history limit, versions beyond the limit are
# dropped from the version list and merkle tree, and their data is deleted.
#
# Returns 1 on success. Returns empty if this version is already present,
# or if the file store rejects the file.
sub put {
    my $me    = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;
    my $data  = shift;
    my $file  = shift;  # reference
    my $meta  = shift;

    my $cf = $me->{conf};
    my $db = $me->{db};
    my $v  = encode_version($ver);
    my $vk = $me->vkey($key, $v);

    debug("storing $vk");

    # get version history
    my @deletehist;
    my %deletedata;
    my @versions = $me->_versget( $key );
    return if grep { $_ eq $v } @versions;  # dupe!

    # is this the newest version? should we save this data?
    if( !@versions || ($v gt $versions[0]) || $cf->{keepold} ){
        # save file; data is filename
        if( $file ){
            my $r = $me->{fs}->put($data, $file);
            return unless $r;
        }
        # put meta + data
        $db->put($me->{name}, 'meta', $vk, $meta) if length $meta;
        $db->put($me->{name}, 'data', $vk, $data);

        unless( $cf->{keepold} ){
            # unless we are keeping old data, remove previous version
            $deletedata{$versions[0]} = 1 if @versions;
        }
    }

    # add new version to list. newest 1st
    @versions = sort { $b cmp $a } (@versions, $v);

    if( $cf->{history} && @versions > $cf->{history} ){
        # trim list: versions past the history limit leave the merkle tree
        # and any data still stored for them is removed.
        # NB: must iterate @rm -- @_ is empty after the shifts above.
        my @rm = splice @versions, $cf->{history};
        push @deletehist, map { +{ version => decode_version($_), key => $key, shard => $shard } } @rm;
        $deletedata{$_} = 1 for @rm;
    }

    if( $me->is_sharded() ){
        # QQQ - shard changed?
        $db->put($me->{name}, 'shard', $key, encode_shard($shard || 0));
    }

    my $dd = join(' ', map { $_->{version} } @deletehist);
    debug("version list: @versions [delete: $dd]");
    $me->_versput( $key, @versions );

    # update merkles
    $me->merkle( { shard => $shard, key => $key, version => $ver }, @deletehist);

    # delete old data
    for my $rm (keys %deletedata){
        debug("removing old version $key/$rm");
        my $rmvk = $me->vkey($key, $rm);
        $db->del($me->{name}, 'data', $rmvk);
        $db->del($me->{name}, 'meta', $rmvk);
    }

    $db->sync();
    return 1;
}
sub remove {
my $me = shift;
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
if( @versions ){
$me->_versput( $key, @versions );
}else{
$db->del($me->{name}, 'vers', $key);
$db->del($me->{name}, 'shard', $key);
$me->_versdel( $key );
}
my $vk = $me->vkey($key, $ver);
$db->del($me->{name}, 'data', $vk);
$db->del($me->{name}, 'meta', $vk);
return $cshard;
}
################################################################
sub range {
my $me = shift;
my $start = shift;
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jun-01 18:11 (EDT)
# Function: merkle tree for detecting missing data
#
# $Id$
package AC::Yenta::Store::Merkle;
use AC::Yenta::Debug 'merkle';
use AC::Yenta::SixtyFour;
use AC::Cache;
use AC::Import;
use Digest::SHA 'sha1_base64';
use strict;
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
my @d = split /\0/, $d;
my @res;
if( $^T - $cacheT > 60 ){
debug("merk cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
$cacheT = $^T;
}
if( $lev == $me->{merkle_height} ){
# data is: lkey, ...
for my $r (@d){
my($s,$v,$k) = $me->_decode_lkey($r);
push @res, { version => decode_version($v), key => $k, count => 1, shard => decode_shard($s) };
}
}else{
# data is: mkey => hash count, ...
my %d = @d;
for my $lv (keys %d){
my($l, $s, $v) = $me->_decode_mkey( $lv );
my($h,$c) = split /\s/, $d{$lv};
push @res, { version => decode_version($v), level => hex($l), hash => $h, count => $c, shard => decode_shard($s) };
}
}
return \@res;
}
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
my $shard = shift;
my $key = shift;
my $ver = shift;
my $db = $me->{db};
my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
my $vk = $me->_lkey($key, $ver, $shard);
debug("adding to merkle leaf $mk - $vk");
# get current data
my $d = $me->_mcget( $mk );
my @d = split /\0/, $d;
# append new item + uniqify
my %d;
@d{@d} = ();
$d{$vk} = undef;
$d = join("\0", sort keys %d);
$me->_mcput( $mk, $d );
my $hash = sha1_base64($d);
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
my $shard = shift;
my $key = shift;
my $ver = shift;
my $db = $me->{db};
my $mk = $me->_mkey($shard, $ver, $me->{merkle_height});
my $vk = $me->_lkey($key, $ver, $shard);
debug("removing from merkle leaf $mk - $vk");
# get current data
my $d = $me->_mcget( $mk );
my @d = split /\0/, $d;
# remove item
@d = grep { $vk ne $_ } @d;
if( @d ){
$d = join("\0", @d);
$me->_mcput( $mk, $d );
my $hash = sha1_base64($d);
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
################################################################
# List the keys that actually have data stored at version $ver.
#
#   $shard - if defined, only keys whose recorded shard equals
#            encode_shard($shard) are returned
#   $ver   - version (external form)
#
# Keys come from a 'data' range scan between encode_version($ver) and
# encode_version($ver + 1); the key is whatever follows the last '/' of
# the stored data key.
sub _get_actual_keys {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;
    my $db    = $me->{db};

    # get range on data
    my @key = map {
        my $k = $_->{k}; $k =~ s|.*/||; $k
    } $db->range($me->{name}, 'data', encode_version($ver), encode_version($ver + 1));

    debug("actual key: @key");
    return @key unless defined $shard;

    # get vers list to filter on shard.
    # assumes the 'vers' entry looks like "<versions>;<encoded shard>" -- TODO confirm
    # encoded shards are compared as strings: numeric == would equate
    # different encodings (e.g. hex digits numify to 0), and a missing
    # vers entry must not match any shard.
    my $sh = encode_shard($shard);
    return grep {
        my $vl = $db->get($me->{name}, 'vers', $_);
        my($s) = defined($vl) ? ($vl =~ /;\s*(.*)/) : ();
        defined($s) && $s eq $sh;
    } @key;
}
# check merkle leaf node against actual data
# Check the merkle leaf node for ($shard, $ver) against the actual data.
# Every key that has data at that version but is absent from the leaf is
# added back to the tree via merkle(). Keys present in the leaf but
# missing from the data are not touched here.
sub merkle_scrub {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    debug("scrub $me->{name} $shard/$ver");

    # get list of keys from merkle leaf node
    my $mlist = $me->get_merkle($shard, $ver, $me->{merkle_height}) || [];
    my %inmerkle = map { ($_->{key} => 1) } @$mlist;

    # compare against keys from actual data.
    # use exists, not truthiness: a key such as "0" is a valid key
    # and must not be reported (and re-added) as missing.
    for my $k ( $me->_get_actual_keys($shard, $ver) ){
        next if exists $inmerkle{$k};
        debug("missing key in merkle tree: $shard/$ver/$k");
        $me->merkle( { key => $k, shard => $shard, version => $ver } );
    }
}
lib/AC/protobuf/yenta_getset.pl view on Meta::CPAN
);
}
# Create the ACPYentaGetSet protocol buffer message class, unless the
# package already provides _pb_fields_list (i.e. it has already been created).
# NOTE(review): presumably generated from proto/yenta_getset.proto (listed in
# MANIFEST) -- regenerate rather than hand-edit; confirm.
unless (ACPYentaGetSet->can('_pb_fields_list')) {
Google::ProtocolBuffers->create_message(
'ACPYentaGetSet',
[
# field 1: 'data', a repeated list of ACPYentaMapDatum messages
[
Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
'ACPYentaMapDatum',
'data', 1, undef
],
],
# options: generate accessor methods, using get_/set_ style names
{ 'create_accessors' => 1, 'follow_best_practice' => 1, }
);
}
unless (ACPYentaDistReply->can('_pb_fields_list')) {
Google::ProtocolBuffers->create_message(
'ACPYentaDistReply',
lib/AC/protobuf/yenta_status.pl view on Meta::CPAN
'ACPYentaStatus',
[
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'hostname', 1, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'datacenter', 2, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'subsystem', 3, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'environment', 4, undef