view release on metacpan or search on metacpan
--- #YAML:1.0
name: AC-Yenta
version: 1.1
abstract: eventually-consistent distributed key/value data store. et al.
author:
- AdCopy <http://www.adcopy.com>
license: perl
distribution_type: module
configure_requires:
ExtUtils::MakeMaker: 0
requires:
AC::DC: 0
BerkeleyDB: 0
Crypt::Rijndael: 0
Digest::SHA: 0
Google::ProtocolBuffers: 0
JSON: 0
POSIX: 0
Sys::Hostname: 0
Time::HiRes: 0
no_index:
eg/yenta.conf view on Meta::CPAN
# example yenta config
#
# file will be reloaded automagically if it changes. no need to hup or restart.
port 3503
environment prod
# save peer status in a file?
savestatus /var/tmp/yenta.status yenta
allow 127.0.0.1
eg/yenta_get view on Meta::CPAN
# Command-line arguments: the map to query, then the key to look up.
my $map = shift @ARGV;
my $key = shift @ARGV;

# Test for presence rather than truth, so that a map or key of "0" is accepted
# instead of being mistaken for a missing argument.
die "usage: get [-h host] map key\n"
    unless defined $map && length $map && defined $key && length $key;

my $y = AC::Yenta::Client->new(
    # server_file, servers[], or host + port
    server_file => '/var/tmp/yenta.status',
    debug       => \&debug,
);

# ask for the key and dump whatever comes back to stdout
my $res = $y->get($map, $key);
print dumper($res), "\n";
exit;
# debug callback handed to the client: writes its arguments to stderr,
# followed by a newline
sub debug {
    my @parts = @_;
    print STDERR @parts, "\n";
}
eg/yenta_put view on Meta::CPAN
use Time::HiRes 'time';
use JSON;
use strict;
# client whose debug messages are written to stderr, one per line
my $ys = AC::Yenta::Client->new(
    debug => sub {
        print STDERR @_, "\n";
    },
);

my $key = 'YX3jSXD3CBRUDABm';

# distribute( map, key, version, data )
# the version is derived from the current time, the data is a JSON document
my $res = $ys->distribute(
    'mymap',
    $key,
    timet_to_yenta_version( time() ),
    encode_json({
        url_id  => $key,
        url     => 'http://www.example.com',
        acc_id  => 'C9TdSgbUCBRUCABG',
        format  => 'html',
    }),
);
lib/AC/Yenta.pm view on Meta::CPAN
=head2 Kibitzing
Each yenta kibitzes (gossips) with the other yentas in the network
to exchange status information, distribute key-value data, and
detect and correct inconsistent data.
=head2 Eventual Consistency
Key-value data is versioned with timestamps. By default, newest wins.
Maps can be configured to keep and return multiple versions and client
code can use other conflict resolution mechanisms.
Lost, missing or otherwise inconsistent data is detected
by kibitzing merkle tree hash values.
=head2 Topological awareness
Yentas can take network topology into account when transferring
data around to minimize long-distance transfers. You will need to
write a custom C<MySelf> class with a C<my_datacenter> function.
=head2 Multiple Network Interfaces / NAT
Yentas can take advantage of multiple network interfaces with
different IP addresses (eg. a private internal network + a public network),
or multiple addresses (eg. a private address and a public address)
and various NAT configurations.
You will need to write a custom C<MySelf> class and C<my_network_info>
function.
=head2 Network Information
By default, yentas obtain their primary IP address by calling
C<gethostbyname( hostname() )>. If this either does not work on your
systems, or isn't the value you want to use,
you will need to write a custom C<MySelf> class and C<my_network_info>
function.
=head1 CONFIG FILE
various parameters need to be specified in a config file.
lib/AC/Yenta/Client.pm view on Meta::CPAN
require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';
our @EXPORT = 'timet_to_yenta_version'; # imported from Y/Conf
my $HOSTNAME = hostname();
# message types used by the client.
# each entry carries the fields handed to add_msg below: num, reqc, resc
# (reqc/resc are presumably the request and reply protobuf class names)
my %MSGTYPE =
(
    yenta_get     => { num => 7, reqc => 'ACPYentaGetSet',       resc => 'ACPYentaGetSet' },
    yenta_distrib => { num => 8, reqc => 'ACPYentaDistRequest',  resc => 'ACPYentaDistReply' },
    yenta_check   => { num => 9, reqc => 'ACPYentaCheckRequest', resc => 'ACPYentaCheckReply' },
);

# register every message type with the protocol layer
foreach my $name (keys %MSGTYPE){
    my($num, $reqc, $resc) = @{ $MSGTYPE{$name} }{qw(num reqc resc)};
    AC::DC::Protocol->add_msg( $name, $num, $reqc, $resc );
}
# one or more of:
# new( host, port )
# new( servers => [ { host, port }, ... ] )
# new( server_file )
sub new {
my $class = shift;
lib/AC/Yenta/Client.pm view on Meta::CPAN
map => $map,
key => $key,
version => $ver,
shard => _shard($key), # NYI
value => $val,
meta => $meta,
} ]
}, $file );
return $me->_send_request($map, $req, $file);
# return undef | result
}
sub check {
my $me = shift;
my $map = shift;
my $ver = shift;
my $lev = shift;
my $req = $me->{proto}->encode_request( {
type => 'yenta_check',
lib/AC/Yenta/Client.pm view on Meta::CPAN
my $tries = $me->{retries} + 1;
my $copy = $me->{copies} || 1;
my $delay = 0.25;
$me->_init_hostlist($map);
my ($addr, $port) = $me->_next_host($map);
for (1 .. $tries){
return unless $addr;
my $res = $me->_try_server($addr, $port, $req, $file);
return $res if $res && !--$copy;
($addr, $port) = $me->_next_host($map);
sleep $delay;
$delay *= 1.414;
}
}
# send one encoded request to a single yenta server and decode the reply
#   $me   - client object; uses {proto}, {debug} and {timeout}
#   $addr - server address, resolved with inet_aton
#   $port - server port
#   $req  - encoded request
#   $file - optional reference to content that is appended to the request
# returns the reply with the decoded reply stored in {data},
# or undef if the address cannot be resolved or the request fails
sub _try_server {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;
    my $file = shift;	# reference

    # inet_aton returns undef when the address cannot be resolved;
    # do not hand an undefined address to the protocol layer
    my $ipn = inet_aton($addr);
    unless( defined $ipn ){
        $me->{debug}->("cannot resolve yenta server address '$addr'");
        return undef;	# same shape as the failure return below
    }

    $req .= $$file if $file;
    $me->{debug}->("trying to contact yenta server $addr:$port");

    my $res;
    eval {
        $res = $me->{proto}->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
        $res->{data} = $me->{proto}->decode_reply( $res ) if $res;
    };
    if(my $e = $@){
        # any exception from sending or decoding counts as a failed attempt
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}
################################################################
sub _next_host {
my $me = shift;
my $map = shift;
$me->_read_serverfile($map) unless $me->{_server};
lib/AC/Yenta/Config.pm view on Meta::CPAN
);
my @MAP = qw(dbfile basedir keepold history expire backend sharded);
################################################################
# dispatch one config directive to its handler from %CONFIG
# returns 1 if a handler was found and run, nothing if the key is not known here
sub handle_config {
    my($me, $key, $rest) = @_;

    my $handler = $CONFIG{$key} or return;
    $handler->($me, $key, $rest);
    return 1;
}
################################################################
sub parse_map {
my $me = shift;
my $key = shift;
my $value = shift;
lib/AC/Yenta/Kibitz/Status.pm view on Meta::CPAN
################################################################
# build the encoded status request that tells a server about ourself
sub myself {
    my %request = ( myself => _myself() );
    return ACPYentaStatusRequest->encode( \%request );
}
sub response {
# send client everything we know
my @peer = AC::Yenta::Status->allpeers();
push @peer, _myself();
# add the items we monitor
push @peer, AC::Yenta::Monitor::export();
return ACPYentaStatusReply->encode({
status => \@peer,
});
lib/AC/Yenta/Kibitz/Status/Server.pm view on Meta::CPAN
AC::Yenta::Kibitz::Status::update_sceptical( $gpb, $io );
}
unless( $proto->{want_reply} ){
$io->shut();
return;
}
AC::Yenta::NetMon::update( $io );
# respond with all known peers
my $response = AC::Yenta::Kibitz::Status::response();
my $yp = AC::Yenta::Protocol->new();
my $hdr = $yp->encode_header(
type => 'yenta_status',
data_length => length($response),
content_length => 0,
msgid => $proto->{msgid},
is_reply => 1,
);
debug("sending status reply");
$io->write_and_shut( $hdr . $response );
}
1;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
eval {
$req = ACPYentaGetSet->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
# process requests
my @res;
my $rescont;
for my $r (@{ $req->{data} }){
debug("get request: $r->{map}, $r->{key}, $r->{version}");
my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
my $res = {
map => $r->{map},
key => $r->{key},
};
if( $meta && $file ){
unless( _check_content( $meta, $file ) ){
problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
# QQQ - remove from system, (and let AE get a new copy)?
store_remove($r->{map}, $r->{key}, $ver);
# tell caller it was not found
$ver = undef;
}
}
if( defined $ver ){
$res->{version} = $ver;
$res->{value} = $data;
$res->{meta} = $meta if defined $meta;
if( $file ){
# if one file to send, send it as content
if( @{$req->{data}} == 1 ){
$rescont = $file;
}else{
$res->{file} = $$file;
}
}
}
push @res, $res;
}
# encode results
my $ect = '';
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
$ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
my $response = $yp->encode_reply( {
type => 'yenta_get',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
content_encrypted => $proto->{data_encrypted},
}, { data => \@res }, \$ect );
debug("sending get reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response . $ect );
}
sub api_check {
my $io = shift;
my $proto = shift;
my $gpb = shift;
my $content = shift; # not used
unless( $proto->{want_reply} ){
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
$req = ACPYentaCheckRequest->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
debug("check request: $req->{map}, $req->{version}, $req->{level}");
my @res;
my @todo = { version => $req->{version}, shard => $req->{shard} };
# the top of the tree will be fairly sparse,
# return up to several levels if they are sparse
for my $l (0 .. 32){
my $cl = $req->{level} + $l;
my @lres;
my $nexttot;
for my $do (@todo){
my @r = _get_check( $req->{map}, $do->{shard}, $do->{version}, $cl );
push @lres, @r;
for (@r){
$nexttot += $_->{key} ? ($_->{count} / 8) : $_->{count};
}
}
push @res, @lres;
@todo = ();
last unless @lres; # reached the bottom of the tree
last if @res > 64; # got enough results
last if (@lres > 2) && ($nexttot > @lres + 2); # less sparse region
# get the next level also
@todo = @lres;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_check',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { check => \@res } );
debug("sending check reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response );
}
# get + process merkle data
# get + process merkle data
# fetches the merkle results for (map, shard, version, level), tags each one
# with the map name, and returns them as a list (empty if nothing was found)
sub _get_check {
    my($map, $shard, $ver, $lev) = @_;

    my $nodes = store_get_merkle($map, $shard, $ver, $lev) or return;
    $_->{map} = $map for @$nodes;
    return @$nodes;
}
sub api_distrib {
my $io = shift;
my $proto = shift;
my $gpb = shift;
my $content = shift; # reference
# decode request
my $req;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
}else{
debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
}
unless( $proto->{want_reply} ){
$io->shut();
return;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { status_code => 200, status_message => 'OK', haveit => !$want } );
debug("sending distrib reply");
$io->timeout_rel($TIMEOUT);
$io->write_and_shut( $response );
}
sub _check_content {
my $meta = shift;
my $cont = shift;
return 1 unless $meta && $meta =~ /^\{/;
eval {
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
return 1;
}
# send an error reply for a distrib request, then shut the connection
#   $io    - connection to reply on
#   $proto - header of the request being answered (msgid, data_encrypted)
#   $code  - status code to report
#   $msg   - status message to report
sub _reply_error {
    my($io, $proto, $code, $msg) = @_;

    my %header = (
        type           => 'yenta_distrib',
        msgid          => $proto->{msgid},
        is_reply       => 1,
        is_error       => 1,
        data_encrypted => $proto->{data_encrypted},
    );
    my %status = (
        status_code    => $code,
        status_message => $msg,
        haveit         => 0,
    );

    my $yp       = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( \%header, \%status );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}
1;
lib/AC/Yenta/MySelf.pm view on Meta::CPAN
return [
# use this IP for communication with servers in this datacenter (same natdom)
{ ip => $privat_ip, natdom => my_datacenter() },
# otherwise use this IP
{ ip => $public_ip },
]
}
=head2 init
initialization function called at startup. typically used to lookup hostnames, IP addresses,
and such and store them in variables to make the above functions faster.
my $HOSTNAME;
my $DOMAIN;
sub init {
    # look the hostname up once, at startup
    $HOSTNAME = hostname();
    # the domain is everything after the first label: skip the leading
    # run of non-dot characters, then the dot, and keep the rest
    # (left undefined if the hostname contains no dot)
    ($DOMAIN) = $HOSTNAME =~ /^[^\.]+\.(.*)/;
}
=head1 BUGS
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';
our @ISA = 'AC::DC::Protocol';
our @EXPORT = qw(read_protocol use_encryption);
my $HDRSIZE = __PACKAGE__->header_size();
# message types registered by this protocol class.
# each entry carries the fields handed to add_msg below: num, reqc, resc
# (reqc/resc are presumably the request and reply protobuf class names;
# heartbeat_request has an empty reqc)
my %MSGTYPE =
(
    heartbeat_request => { num => 2, reqc => '',                      resc => 'ACPHeartBeat' },
    yenta_status      => { num => 6, reqc => 'ACPYentaStatusRequest', resc => 'ACPYentaStatusReply' },
    yenta_get         => { num => 7, reqc => 'ACPYentaGetSet',        resc => 'ACPYentaGetSet' },
    yenta_distrib     => { num => 8, reqc => 'ACPYentaDistRequest',   resc => 'ACPYentaDistReply' },
    yenta_check       => { num => 9, reqc => 'ACPYentaCheckRequest',  resc => 'ACPYentaCheckReply' },
);

# register every message type on this package
foreach my $name (keys %MSGTYPE){
    my($num, $reqc, $resc) = @{ $MSGTYPE{$name} }{qw(num reqc resc)};
    __PACKAGE__->add_msg( $name, $num, $reqc, $resc );
}
sub read_protocol {
my $me = shift;
my $io = shift;
my $evt = shift;
$io->{rbuffer} .= $evt->{data};
lib/AC/Yenta/Stats.pm view on Meta::CPAN
$url =~ s/%(..)/chr(hex($1))/eg;
my $f = $HANDLER{$url};
$f = \&http_data if $url =~ m|^data/|;
$f = \&http_file if $url =~ m|^file/|;
$f ||= \&http_notfound;
my( $content, $code, $text ) = $f->($url);
$code ||= 200;
$text ||= 'OK';
my $res = "HTTP/1.0 $code $text\r\n"
. "Server: AC/Yenta\r\n"
. "Connection: close\r\n"
. "Content-Type: text/plain; charset=UTF-8\r\n"
. "Content-Length: " . length($content) . "\r\n"
. "\r\n"
. $content ;
$io->write($res);
$io->set_callback('write_buffer_empty', \&_done );
}
# 'write_buffer_empty' callback: shut the connection it is called with
sub _done {
    my($io) = @_;
    $io->shut();
}
################################################################
lib/AC/Yenta/Stats.pm view on Meta::CPAN
}
# report the NetMon status of the 'public' domain
# returns the content alone when the status is 200,
# otherwise (content, http code, http text)
sub http_status {
    my $status = AC::Yenta::NetMon::status_dom('public');

    if( $status == 200 ){
        return "status: OK\n\n";
    }
    return("status: PROBLEM\n\n", 500, "Problem");
}
# render the %STATS counters, sorted by name, one "name: value" line each,
# followed by the number of known peers and a blank line
sub http_stats {
    my $fmt = "%-24s%s\n";
    my $out;

    $out .= sprintf($fmt, "$_:", $STATS{$_}) for sort keys %STATS;

    # count the peers (list-context call, as a plain array assignment would be)
    my $npeers = () = AC::Yenta::Status->allpeers();
    $out .= sprintf($fmt, "peers:", $npeers);
    $out .= "\n";

    return $out;
}
sub http_data {
my $url = shift;
my(undef, $map, $key, $ver) = split m|/|, $url;
my($data, $version, $file, $meta) = store_get($map, $key, $ver);
return http_notfound($url) unless $version;
return $data;
lib/AC/Yenta/Status.pm view on Meta::CPAN
# then (maybe) something about to expire
@peer = @old unless @peer || int rand(5);
# then (maybe) something far away
@peer = @away unless @peer || int rand(5);
# then something local
@peer = @local unless @peer;
# last resort
@peer = @all unless @peer;
# sometimes use the seed, in case there was a network split
if( @peer && int(rand(@all+1)) ){
my $p = $peer[ rand(@peer) ];
debug("using peer $p->{server_id}");
return ($p->{server_id}, $p->{ip}, undef);
}
# seed peer
lib/AC/Yenta/Status.pm view on Meta::CPAN
problem("cannot rename save file '$file': $!");
}
}
}
################################################################
# diagnostic reports
# short diagnostic report: one line for ourself, each monitored item and each
# known peer, giving the padded server id followed by tab-separated
# hostname, datacenter, subsystem, environment, status and integer sort metric
sub report {
    my $text;

    for my $peer (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){
        my @cols = (
            $peer->{hostname},
            $peer->{datacenter},
            $peer->{subsystem},
            $peer->{environment},
            $peer->{status},
            int( $peer->{sort_metric} ),
        );
        $text .= sprintf('%-28s', $peer->{server_id}) . ' ' . join("\t", @cols) . "\n";
    }

    return $text;
}
# long diagnostic report: a full dump of ourself, each monitored item and
# each known peer, separated by blank lines
sub report_long {
    my $text;

    for my $peer (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){
        my $dump = dumper( $peer );
        $text .= $dump . "\n\n";
    }

    return $text;
}
################################################################
# accessor for the file-level $PORT
sub my_port {
    return $PORT;
}
# instance id: our server id followed by '/' and the process id
# as at least four hex digits
sub my_instance_id {
    my($class) = @_;	# not used

    my $server_id = my_server_id();
    my $pid_hex   = sprintf('/%04x', $$);
    return $server_id . $pid_hex;
}
lib/AC/Yenta/Status.pm view on Meta::CPAN
$DATA->{sceptical}{$id} = $up;
}
sub update {
my $class = shift;
my $id = shift; # -> server_id
my $up = shift;
return unless $class->_env_ok($id, $up);
# only keep it if it is relatively fresh, and valid
return unless $up->{timestamp} > $^T - $KEEPLOST;
return unless $up->{status};
delete $DATA->{sceptical}{$id};
$up->{id} = $id;
my $previnfo = $DATA->{allpeer}{$id};
verbose("discovered new peer: $id ($up->{hostname})") unless $previnfo;
# only keep it if it is newer than what we have
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
if( int(rand(8)) ){
@peer = @{$me->{peers_near}};
}else{
@peer = @{$me->{peers_far}};
}
}elsif( $me->{peers_near} ){
@peer = @{$me->{peers_near}};
}elsif( $me->{peers_far} ){
@peer = @{$me->{peers_far}};
}elsif( $me->{peers_ood} ){
# only use out-of-date peers as a last resort
# NB: if we never used ood peers, we'd have a bootstrap deadlock
@peer = @{$me->{peers_ood}};
}
return unless @peer;
my $peer = $peer[ rand(@peer) ];
return $peer;
}
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
}
# handle the results of a check request
# results carrying a key are collected as key data; of the remaining (node)
# results only those at the highest level seen are kept. key data takes
# precedence over node data. always moves on to the next step afterwards.
sub _check_load {
    my($io, $evt, $me) = @_;

    debug("check results");
    $evt->{data} ||= {};

    my(@keys, @nodes);
    my $deepest = 0;	# highest level returned so far

    foreach my $item ( @{ $evt->{data}{check} } ){
        debug("recvd result for $item->{map} $item->{level}/$item->{shard}/$item->{version} $item->{key}");

        if( $item->{key} ){
            push @keys, $item;
            next;
        }

        next if $item->{level} < $deepest;

        if( $item->{level} > $deepest ){
            # found a higher level - forget what was gathered below it
            @nodes   = ();
            $deepest = $item->{level};
        }
        push @nodes, $item;
    }

    if( @keys ){
        $me->_check_result_keys( \@keys );
    }
    elsif( @nodes ){
        $me->_check_result_nodes( $deepest, \@nodes );
    }

    $me->_next_step();
}
# a check request failed: log which peer and map, then move on to the next step
sub _check_error {
    my($io, $evt, $me) = @_;

    my $peer_id = $me->{peer}{id};
    verbose("AE check error with $peer_id map $me->{map} ($io->{info})");

    $me->_next_step();
}
sub _check_result_keys {
my $me = shift;
my $chk = shift;
my %vscnt;
my %vsadd;
for my $d (@$chk){
inc_stat('ae_check_key');
my $vsk = "$d->{version} $d->{shard}";
$vscnt{ $vsk } ++;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
return unless defined $vmx;
if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
debug("skipping expired $lev/$ver - $vmx");
return 1;
}
return;
}
sub _check_result_nodes {
my $me = shift;
my $lev = shift;
my $chk = shift;
# determine all of the base versions of the recvd data
my %ver;
for my $d (@$chk){
my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
$ver{"$ver $shard"} = { ver => $ver, shard => $shard };
}
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
$evt->{data} ||= {};
debug("got kv data results");
my %need = map {
( "$_->{key}/$_->{version}" => $_ )
} @$get;
for my $d ( @{$evt->{data}{data}}){
debug("got $d->{map}/$d->{key}/$d->{version}");
next unless $d->{key} && $d->{version}; # not found
delete $need{ "$d->{key}/$d->{version}" };
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
# RSN - check load
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
if( @DIST < $max ){
$me->_start_next();
}
push @DIST, $me;
return $me;
}
# periodically, go through and restart or expire
sub periodic {
my @keep;
my $max = conf_value('distrib_max') || $MAXUNDERWAY;
my $chance = (@DIST > $max) ? ($max / @DIST) : 1;
for my $r (@DIST){
# debug("periodic $r->{info}");
next if $^T > $r->{req}{expire};
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
my $db = $me->{db};
# walk merkle tree, find all k/v to remove
my @delete;
my @walk = { level => 0, version => 0, shard => 0 };
while(@walk){
my @next;
for my $node (@walk){
my $res = $me->get_merkle( $node->{shard}, $node->{version}, $node->{level} );
for my $r (@$res){
next if $r->{version} > $expire;
if( $r->{key} ){
push @delete, { key => $r->{key}, version => $r->{version}, shard => $r->{shard} };
}else{
push @next, $r;
}
}
}
@walk = @next;
}
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
1;
=head1 NAME
AC::Yenta::Store::Map - persistent storage for yenta maps
=head1 SYNOPSIS
your code:
AC::Yenta::Store::Map->add_backend( postgres => 'Local::Yenta::Postgres' );
your config:
map mappyfoo {
backend postgres
# ...
}
=cut
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
return if $lev > $me->{merkle_height};
my $db = $me->{db};
my $mk = $me->_mkey(encode_shard($shard), encode_version($ver), $lev);
debug("getting merkle for $mk");
my $d = $me->_mcget( $mk );
return unless $d;
my @d = split /\0/, $d;
my @res;
if( $^T - $cacheT > 60 ){
debug("merk cache stats: check: $cachechk, miss: $cachemiss") if $cachechk > 1;
$cacheT = $^T;
}
if( $lev == $me->{merkle_height} ){
# data is: lkey, ...
for my $r (@d){
my($s,$v,$k) = $me->_decode_lkey($r);
push @res, { version => decode_version($v), key => $k, count => 1, shard => decode_shard($s) };
}
}else{
# data is: mkey => hash count, ...
my %d = @d;
for my $lv (keys %d){
my($l, $s, $v) = $me->_decode_mkey( $lv );
my($h,$c) = split /\s/, $d{$lv};
push @res, { version => decode_version($v), level => hex($l), hash => $h, count => $c, shard => decode_shard($s) };
}
}
return \@res;
}
################################################################
# we maintain a 16-ary merkle tree of all of the <key,version>s we have stored
sub merkle {
my $me = shift;
my $add = shift;
my @del = @_;