view release on metacpan or search on metacpan
lib/AC/Yenta/AC/MySelf.pm
lib/AC/Yenta/Store.pm
lib/AC/Yenta/Client.pm
lib/AC/Yenta/Server.pm
lib/AC/Yenta/Crypto.pm
lib/AC/Yenta/Monitor.pm
lib/AC/protobuf/heartbeat.pl
lib/AC/protobuf/std_reply.pl
lib/AC/protobuf/auth.pl
lib/AC/protobuf/yenta_status.pl
lib/AC/protobuf/yenta_getset.pl
lib/AC/protobuf/std_ipport.pl
lib/AC/protobuf/yenta_check.pl
lib/AC/Yenta.pm
LICENSE
eg/yenta_get_direct
eg/myself.pm
eg/yentad
eg/yenta.conf
eg/yenta_get
eg/yenta_put
MANIFEST
proto/yenta_status.proto
proto/std_reply.proto
proto/yenta_check.proto
proto/std_ipport.proto
proto/yenta_getset.proto
proto/heartbeat.proto
proto/auth.proto
Makefile.PL
META.yml Module meta-data (added by MakeMaker)
lib/AC/Yenta.pm view on Meta::CPAN
=item debug
enable debugging for a particular section
debug map
=item map
configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers. maps should be
configured similarly on all servers that they are on.
map users {
backend bdb
dbfile /home/acdata/users.ydb
history 4
}
=back
lib/AC/Yenta/Client.pm view on Meta::CPAN
use AC::DC::Protocol;
use AC::Import;
use AC::Misc;
use Sys::Hostname;
use JSON;
use Digest::SHA 'sha1';
use Socket;
use strict;
require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';
our @EXPORT = 'timet_to_yenta_version'; # imported from Y/Conf
my $HOSTNAME = hostname();
my %MSGTYPE =
(
yenta_get => { num => 7, reqc => 'ACPYentaGetSet', resc => 'ACPYentaGetSet' },
yenta_distrib => { num => 8, reqc => 'ACPYentaDistRequest', resc => 'ACPYentaDistReply' },
yenta_check => { num => 9, reqc => 'ACPYentaCheckRequest', resc => 'ACPYentaCheckReply' },
lib/AC/Yenta/Crypto.pm view on Meta::CPAN
my $seqno = int( time() * 1_000_000 );
my $nonce = random_text(48);
my $key = $me->_key($seqno, $nonce);
my $iv = $me->_iv($key, $seqno, $nonce);
# pad
my $pbuf = $buf;
$pbuf .= "\0" x (16 - length($pbuf) & 0xF) if length($pbuf) & 0xF;
my $aes = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
$aes->set_iv( $iv );
my $ct = $aes->encrypt( $pbuf );
my $hmac = hmac_sha256_base64($ct, $key);
my $eb = ACPEncrypt->encode( {
algorithm => $ALGORITHM,
seqno => $seqno,
nonce => $nonce,
hmac => $hmac,
length => length($buf),
ciphertext => $ct,
lib/AC/Yenta/Crypto.pm view on Meta::CPAN
my $seqno = $ed->{seqno},
my $nonce = $ed->{nonce};
my $key = $me->_key($seqno, $nonce);
my $iv = $me->_iv($key, $seqno, $nonce);
my $hmac = hmac_sha256_base64($ed->{ciphertext}, $key);
die "cannot decrypt: hmac mismatch\n" unless $hmac eq $ed->{hmac};
my $aes = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
$aes->set_iv( $iv );
my $pt = substr($aes->decrypt( $ed->{ciphertext} ), 0, $ed->{length});
debug("decrypted <$seqno,$nonce,$hmac>");
return $pt;
}
sub _key {
my $me = shift;
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
my $TIMEOUT = 2;
my $msgid = $$;
sub new {
my $class = shift;
debug('starting kibitz status client');
my $me = $class->SUPER::new( @_ );
return unless $me;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
return $me;
}
sub use_addr_port {
my $class = shift;
my $addr = shift;
my $port = shift;
# is addr + port => return
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
# Created: 2009-Apr-01 16:31 (EDT)
# Function: client side interface for storage subsystem
#
# $Id$
package AC::Yenta::Kibitz::Store::Client;
use AC::Yenta::Debug 'store_client';
use AC::Yenta::Config;
use AC::Yenta::Protocol;
use AC::Yenta::IO::TCP::Client;
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $TIMEOUT = 5;
sub new {
my $class = shift;
my $addr = shift;
my $port = shift;
my $req = shift;
debug('starting kibitz store client');
my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @_ );
return unless $me;
$me->{_req} = $req;
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
return $me;
}
sub start {
my $me = shift;
$me->SUPER::start();
$me->write( $me->{_req} );
$me->timeout_rel($TIMEOUT);
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
#
# $Id$
package AC::Yenta::Kibitz::Store::Server;
use AC::Yenta::Debug 'store_server';
use AC::Yenta::Store;
use AC::Yenta::Config;
use AC::Dumper;
use JSON;
use Digest::SHA 'sha1_base64';
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;
my $TIMEOUT = 1;
sub api_get {
my $io = shift;
my $proto = shift;
my $gpb = shift;
my $content = shift; # not used
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
sub new {
my $class = shift;
debug('starting monitor status client');
my $me = $class->SUPER::new( @_ );
unless($me){
return;
}
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
# build request
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $hdr = $yp->encode_header(
type => 'heartbeat_request',
data_length => 0,
content_length => 0,
want_reply => 1,
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
use AC::Yenta::Crypto;
use AC::Misc;
use AC::Import;
use strict;
require 'AC/protobuf/heartbeat.pl';
require 'AC/protobuf/auth.pl';
require 'AC/protobuf/std_reply.pl';
require 'AC/protobuf/yenta_status.pl';
require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';
our @ISA = 'AC::DC::Protocol';
our @EXPORT = qw(read_protocol use_encryption);
my $HDRSIZE = __PACKAGE__->header_size();
my %MSGTYPE =
(
heartbeat_request => { num => 2, reqc => '', resc => 'ACPHeartBeat' },
lib/AC/Yenta/Server.pm view on Meta::CPAN
unless( $AC::Yenta::CONF->check_acl( $ip ) ){
verbose("rejecting connection from $ip");
return;
}
my $me = $class->SUPER::new( peerip => $ip, info => "tcp yenta server (from: $ip)" );
$me->init($fd);
$me->wantread(1);
$me->timeout_rel($TIMEOUT);
$me->set_callback('read', \&read);
$me->set_callback('timeout', \&timeout);
}
sub timeout {
my $me = shift;
debug("connection timed out");
$me->shut();
}
sub read {
lib/AC/Yenta/SixtyFour.pm view on Meta::CPAN
# Copyright (c) 2010 by Jeff Weisberg
# Author: Jeff Weisberg <jaw @ tcp4me.com>
# Created: 2010-Dec-23 11:39 (EST)
# Function: 64 bit numbers as native integers or math::bigints?
#
# $Id$
package AC::Yenta::SixtyFour;
use strict;
# export one set of functions as x64_<name>
sub import {
my $class = shift;
my $caller = caller;
my $l = length(sprintf '%x', -1);
my $prefix;
if( $l >= 16 ){
$prefix = 'native_';
}else{
$prefix = 'bigint_';
lib/AC/Yenta/Stats.pm view on Meta::CPAN
$f = \&http_data if $url =~ m|^data/|;
$f = \&http_file if $url =~ m|^file/|;
$f ||= \&http_notfound;
my( $content, $code, $text ) = $f->($url);
$code ||= 200;
$text ||= 'OK';
my $res = "HTTP/1.0 $code $text\r\n"
. "Server: AC/Yenta\r\n"
. "Connection: close\r\n"
. "Content-Type: text/plain; charset=UTF-8\r\n"
. "Content-Length: " . length($content) . "\r\n"
. "\r\n"
. $content ;
$io->write($res);
$io->set_callback('write_buffer_empty', \&_done );
}
sub _done {
my $io = shift;
$io->shut();
}
################################################################
sub http_notfound {
lib/AC/Yenta/Status.pm view on Meta::CPAN
sub peer {
my $class = shift;
my $id = shift;
return $DATA->{allpeer}{$id};
}
sub allpeers {
my $class = shift;
# idown sets status to 0 (below), skip such
return grep { $_->{status} } values %{$DATA->{allpeer}};
}
sub mappeers {
my $class = shift;
my $map = shift;
return keys %{ $DATA->{mappeer}{$map} };
}
lib/AC/Yenta/Store.pm view on Meta::CPAN
use AC::Yenta::Debug 'store';
use AC::Yenta::Config;
use AC::Import;
use AC::Yenta::Store::Map;
use AC::Yenta::Store::Sharded;
use AC::Yenta::Store::Distrib;
use AC::Yenta::Store::AE;
use AC::Yenta::Store::Expire;
use strict;
our @EXPORT = qw(store_get store_put store_want store_get_merkle store_get_internal store_set_internal store_expire store_remove);
my %STORE;
# create maps from config
sub configure {
my $maps = $AC::Yenta::CONF->{config}{map};
my %remove = %STORE;
lib/AC/Yenta/Store.pm view on Meta::CPAN
sub store_get_internal {
my $map = shift;
my $key = shift;
my $m = $STORE{$map};
return unless $m;
return $m->get_internal($key);
}
sub store_set_internal {
my $map = shift;
my $key = shift;
my $val = shift;
my $m = $STORE{$map};
return unless $m;
$m->set_internal($key, $val);
}
sub store_expire {
my $map = shift;
my $max = shift; # all versions before this
my $m = $STORE{$map};
return unless $m;
$m->expire($max);
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
cache => {},
kvneed => [],
kvneedorig => [],
kvfetching => 0,
missing => 0,
}, $class;
debug("new ae");
$me->_pick_map() || return;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
$me->_init_peer() || return;
debug("checking $me->{map} with $me->{peer}{id}");
inc_stat('ae_runs');
$me->_next_step();
push @AE, $me;
return $me;
}
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
return $peer;
}
################################################################
sub _finished {
my $me = shift;
debug("finished $me->{map}");
$DONE{$me->{map}} = $^T if $me->{missing} < $MAXMISSING;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
@AE = grep{ $_ != $me } @AE;
}
sub _next_step {
my $me = shift;
$me->{timestamp} = $^T;
if( $me->{kvfetching} < $MAXFETCH ){
# any missing data?
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
shard => $node->{shard},
} );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
$request,
info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );
if( $io ){
$io->set_callback('load', \&_check_load, $me);
$io->set_callback('error', \&_check_error, $me);
$io->start();
}
}
sub _check_load {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
data => $get,
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
$io->set_callback('error', \&_getkv_error, $me, $retry, $get);
$io->start();
}
}
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
# Author: Jeff Weisberg
# Created: 2009-Mar-30 18:39 (EDT)
# Function: interface with Berkeley DB
#
# $Id$
package AC::Yenta::Store::BDBI;
use AC::Yenta::Debug 'bdbi';
use BerkeleyDB;
use POSIX;
use Sys::SigAction 'set_sig_handler';
use strict;
my $TIMEOUT = 30;
AC::Yenta::Store::Map->add_backend( bdb => 'AC::Yenta::Store::BDBI' );
AC::Yenta::Store::Map->add_backend( berkeley => 'AC::Yenta::Store::BDBI' );
my %recovered;
sub new {
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
my $flags = $conf->{readonly} ? 0 : (DB_CREATE| DB_INIT_CDB | DB_INIT_MPOOL);
debug("opening Berkeley dir=$dir, file=$file (recov $recov)");
my $env = BerkeleyDB::Env->new(
-Home => $dir,
-Flags => $flags,
);
# microsecs
$env->set_timeout($TIMEOUT * 1_000_000 / 2, DB_SET_LOCK_TIMEOUT) if $env;
my $db = BerkeleyDB::Btree->new(
-Filename => $file,
-Env => $env,
-Flags => DB_CREATE,
);
problem("cannot open db file $file") unless $db;
# web server will need access
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
datum => $me->{req}{datum},
}, \$ect );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
$request . $ect,
info => "distrib $me->{info} to $id",
);
if( $io ){
$io->set_callback('load', \&_onload, $me, $id, $far);
$io->set_callback('error', \&_onerror, $me, $id, $far);
$io->start();
}else{
debug("start client failed");
}
}
sub _onload {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
################################################################
sub get_internal {
my $me = shift;
my $key = shift;
my($d, $found) = $me->{db}->get($me->{name}, 'internal', $key);
return $d;
}
sub set_internal {
my $me = shift;
my $key = shift;
my $val = shift;
$me->{db}->put($me->{name}, 'internal', $key, $val);
}
################################################################
sub expire {
lib/AC/Yenta/Store/SQLite.pm view on Meta::CPAN
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
my $st = _do($me->{db}, 'select 1 from ykv where map = ? and sub = ? and key = ?', $map, $sub, $key);
my($found) = $st->fetchrow_array();
if( $found ){
_do($me->{db}, 'update ykv set value = ? where map = ? and sub = ? and key = ?', encode_base64($val), $map, $sub, $key);
}else{
_do($me->{db}, 'insert into ykv (map,sub,key,value) values (?,?,?,?)', $map, $sub, $key, encode_base64($val));
}
return 1;
}
sub del {
my $me = shift;
my $map = shift;