view release on metacpan or search on metacpan
This software may be copied and distributed under the terms
found in the Perl "Artistic License".
A copy of the "Artistic License" may be found in the standard
Perl distribution.
--- #YAML:1.0
name: AC-Yenta
version: 1.1
abstract: eventually-consistent distributed key/value data store. et al.
author:
- AdCopy <http://www.adcopy.com>
license: perl
distribution_type: module
configure_requires:
ExtUtils::MakeMaker: 0
requires:
AC::DC: 0
BerkeleyDB: 0
Crypt::Rijndael: 0
Digest::SHA: 0
Google::ProtocolBuffers: 0
JSON: 0
POSIX: 0
Sys::Hostname: 0
Time::HiRes: 0
no_index:
directory:
- t
- inc
generated_by: ExtUtils::MakeMaker version 6.48
meta-spec:
url: http://module-build.sourceforge.net/META-spec-v1.4.html
version: 1.4
Makefile.PL view on Meta::CPAN
use ExtUtils::MakeMaker;
WriteMakefile(
NAME => 'AC::Yenta',
VERSION_FROM => 'lib/AC/Yenta.pm',
ABSTRACT_FROM => 'lib/AC/Yenta.pm',
AUTHOR => 'AdCopy <http://www.adcopy.com>',
LICENSE => 'perl',
PREREQ_PM => {
'POSIX' => 0,
'Sys::Hostname' => 0,
'JSON' => 0,
'Digest::SHA' => 0,
'Crypt::Rijndael' => 0,
'BerkeleyDB' => 0,
'Time::HiRes' => 0,
'Google::ProtocolBuffers' => 0,
'AC::DC' => 0,
}
);
eg/myself.pm view on Meta::CPAN
# $Id$
package Local::Yenta::MySelf;
use Sys::Hostname;
use strict;
my $SERVERID;
# Record this server's persistent id.
#   $class - invoking class (not used)
#   $port  - our tcp port (not used here)
#   $id    - id from the command line; when empty, an id is derived
#            from the local hostname
sub init {
    my $class = shift;
    my $port  = shift;  # our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        # remove domain. both dots are escaped so that only a literal
        # ".example.com" is stripped (an unescaped '.' matches any character)
        (my $h = hostname()) =~ s/\.example\.com//;
        $SERVERID = "yenta/$h";
    }
    verbose("system persistent-id: $SERVERID");
}
# Accessor: the persistent id stored by init().
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}
1;
eg/yenta.conf view on Meta::CPAN
# enable debugging?
#debug ae
#debug map
#debug merkle
# ...
# maps
map testyfoo {
# name of the data file
dbfile /home/data/testyfoo.ydb
# how much history to keep
history 4
}
eg/yenta_get view on Meta::CPAN
use AC::Yenta::Client;
use AC::Dumper;
use strict;
# take the map name and the key off the front of the command line
my($map, $key) = (shift(@ARGV), shift(@ARGV));
unless( $map && $key ){
    die "usage: get [-h host] map key\n";
}

# server_file, servers[], or host + port
my %client_opt = (
    server_file => '/var/tmp/yenta.status',
    debug       => \&debug,
);
my $y = AC::Yenta::Client->new( %client_opt );

# fetch the record and dump whatever came back
my $res = $y->get($map, $key);
print dumper($res), "\n";
exit;
# Debug callback handed to the client: write the message to stderr.
sub debug {
    my @msg = @_;
    print STDERR @msg, "\n";
}
eg/yenta_put view on Meta::CPAN
use JSON;
use strict;
# Example: store one JSON encoded record in the map 'mymap'.
# NOTE(review): AC::Yenta::Client::new dies unless 'servers' or 'server_file'
# is supplied; only 'debug' is passed here - confirm this example still runs.
my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });
my $key = 'YX3jSXD3CBRUDABm';
# the version is derived from the current time
my $res = $ys->distribute(
# map, key, version, data
'mymap', $key, timet_to_yenta_version(time()),
encode_json( {
url_id => $key,
url => 'http://www.example.com',
acc_id => 'C9TdSgbUCBRUCABG',
format => 'html',
}),
);
# example yenta daemon
use Getopt::Std;
use AC::Yenta::D;
use strict;
# keep an untouched copy of the command line: getopts() modifies @ARGV,
# and the original arguments are handed to daemon() below
my @saved_argv = @ARGV;
my %OPT;
getopts('c:dfp:', \%OPT) || die "usage...\n";
# -c config file
# -d enable all debugging
# -f foreground
# -p port
# class_myself names the user supplied "MySelf" implementation
# NOTE(review): eg/myself.pm declares package Local::Yenta::MySelf, not
# My::Code::MySelf - presumably a placeholder; adjust to your own class
my $y = AC::Yenta::D->new(
class_myself => 'My::Code::MySelf',
);
# run the daemon with the config file from -c
$y->daemon( $OPT{c}, {
argv => \@saved_argv,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
lib/AC/Yenta.pm view on Meta::CPAN
use strict;
our $VERSION = 1.1;
=head1 NAME
AC::Yenta - eventually-consistent distributed key/value data store. et al.
=head1 SYNOPSIS
use AC::Yenta::D;
use strict;
my $y = AC::Yenta::D->new( );
$y->daemon( $configfile, {
argv => \@ARGV,
foreground => $OPT{f},
debugall => $OPT{d},
port => $OPT{p},
} );
exit;
=head1 USAGE
Copy + Paste from the example code into your own code.
Copy + Paste from the example config into your own config.
Send in bug report.
=head1 YIDDISH-ENGLISH GLOSSARY
Kibitz - Gossip. Casual information exchange with one's peers.
Yenta - 1. An old woman who kibitzes with other yentas.
2. Software which kibitzes with other yentas.
=head1 DESCRIPTION
=head2 Peers
All of the running yentas are peers. There is no master server.
New nodes can be added or removed on the fly with no configuration.
=head2 Kibitzing
lib/AC/Yenta/Client.pm view on Meta::CPAN
use strict;
require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';
our @EXPORT = 'timet_to_yenta_version'; # imported from Y/Conf
my $HOSTNAME = hostname();
# message types spoken by this client:
#   num  - message type number
#   reqc - class of the request message
#   resc - class of the reply message
my %MSGTYPE = (
    yenta_get     => { num => 7, reqc => 'ACPYentaGetSet',       resc => 'ACPYentaGetSet' },
    yenta_distrib => { num => 8, reqc => 'ACPYentaDistRequest',  resc => 'ACPYentaDistReply' },
    yenta_check   => { num => 9, reqc => 'ACPYentaCheckRequest', resc => 'ACPYentaCheckReply' },
);

# register every type with the protocol layer
for my $name (keys %MSGTYPE){
    my($num, $reqc, $resc) = @{ $MSGTYPE{$name} }{qw(num reqc resc)};
    AC::DC::Protocol->add_msg( $name, $num, $reqc, $resc );
}
# one or more of:
# new( host, port )
# new( servers => [ { host, port }, ... ] )
# new( server_file )
# Create a client.
# Named parameters (one or more of these must identify the servers):
#   host + port  - a single server (host defaults to 'localhost')
#   servers      - arrayref of { addr, port }
#   server_file  - status file saved by yentad ('altfile' is accepted for compat)
# Other parameters seen in this file: debug (coderef), copies, retries, timeout.
# Dies if no server can be determined from the parameters.
sub new {
    my $class = shift;

    my $me = bless {
        debug  => sub{ },
        host   => 'localhost',
        proto  => AC::DC::Protocol->new(),
        copies => 1,
        @_,
    }, $class;

    $me->{server_file} ||= $me->{altfile};      # compat

    # host + port is a documented way to name the server and is honored by
    # _init_hostlist, so it must be accepted here as well
    my $have_server = $me->{servers}
        || $me->{server_file}
        || ($me->{host} && $me->{port});

    die "servers or server_file?\n" unless $have_server;

    return $me;
}
# Fetch a record.
#   $map - map name
#   $key - record key
#   $ver - version wanted (may be undef)
# Returns whatever _send_request returns.
sub get {
    my($me, $map, $key, $ver) = @_;

    my %header = (
        type       => 'yenta_get',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );
    my %wanted = (
        map     => $map,
        key     => $key,
        version => $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, { data => [ \%wanted ] } );
    return $me->_send_request($map, $req);
}
# Derive a shard number from a key: the first 8 bytes of the key's SHA1,
# read as two big-endian 32 bit words and combined into one integer.
sub _shard {
    my($key) = @_;

    my($hi, $lo) = unpack('NN', sha1($key));
    return ($hi << 32) | $lo;
}
# Store a record and have it distributed.
#   $map, $key, $ver, $val - map name, key, version, value
#   $file - reference to file content (optional)
#   $meta - metadata (optional)
# Returns nothing if key or version is missing, else the result of
# _send_request (undef | result).
sub distribute {
    my($me, $map, $key, $ver, $val, $file, $meta) = @_;

    return unless $key && $ver;
    $me->{retries} ||= 25;

    my %header = (
        type       => 'yenta_distrib',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );
    my %body = (
        sender => "$HOSTNAME/$$",
        hop    => 0,
        expire => time() + 120,
        datum  => [ {
            map     => $map,
            key     => $key,
            version => $ver,
            shard   => _shard($key),    # NYI
            value   => $val,
            meta    => $meta,
        } ],
    );

    my $req = $me->{proto}->encode_request( \%header, \%body, $file );
    return $me->_send_request($map, $req, $file);
}
# Send a 'yenta_check' request for a map.
#   $map - map name
#   $ver - version
#   $lev - level
# Returns whatever _send_request returns.
sub check {
    my($me, $map, $ver, $lev) = @_;

    my %header = (
        type       => 'yenta_check',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );
    my %body = (
        map     => $map,
        level   => $lev,
        version => $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, \%body );
    return $me->_send_request($map, $req);
}
################################################################
# Send an encoded request to the servers of a map, one after the other,
# until $me->{copies} of them have answered.
#   $map  - map name, used to pick the servers
#   $req  - encoded request
#   $file - reference to file content (optional)
# At most retries+1 servers are tried, with a growing delay between tries.
# Returns the reply of the last successful server, or nothing if not enough
# servers answered.
sub _send_request {
    my $me   = shift;
    my $map  = shift;
    my $req  = shift;
    my $file = shift;   # reference

    my $tries = ($me->{retries} || 0) + 1;
    my $copy  = $me->{copies} || 1;
    my $delay = 0.25;

    $me->_init_hostlist($map);
    my ($addr, $port) = $me->_next_host($map);

    for my $attempt (1 .. $tries){
        return unless $addr;
        my $res = $me->_try_server($addr, $port, $req, $file);
        return $res if $res && !--$copy;

        ($addr, $port) = $me->_next_host($map);

        # only back off when there will be another attempt: sleeping after
        # the final try, or with no server left, just delays the failure
        last unless $addr && $attempt < $tries;

        # NOTE(review): a fractional delay needs Time::HiRes::sleep; the
        # builtin sleep truncates to whole seconds - confirm what is imported
        sleep $delay;
        $delay *= 1.414;
    }

    return;
}
# Send the request to one server and decode its reply.
#   $addr, $port - server to contact
#   $req         - encoded request
#   $file        - reference to file content, appended to the request (optional)
# Returns the reply with the decoded message stored in {data}, or undef when
# the exchange failed.
sub _try_server {
    my($me, $addr, $port, $req, $file) = @_;

    my $ipn = inet_aton($addr);
    if( $file ){
        $req .= $$file;
    }

    $me->{debug}->("trying to contact yenta server $addr:$port");

    my $res;
    eval {
        my $proto = $me->{proto};
        $res = $proto->send_request($ipn, $port, $req, $me->{debug}, $me->{timeout});
        $res->{data} = $proto->decode_reply( $res ) if $res;
    };
    my $e = $@;
    if( $e ){
        # any failure counts as "no answer from this server"
        $me->{debug}->("yenta request failed: $e");
        $res = undef;
    }
    return $res;
}
################################################################
# Take the next server off the candidate list.
# The server file is read first if there is no list yet.
# Returns (addr, port), or nothing when the list is exhausted.
sub _next_host {
    my($me, $map) = @_;

    unless( $me->{_server} ){
        $me->_read_serverfile($map);
    }

    my $list = $me->{_server};
    return unless $list && @$list;

    my $host = shift @$list;
    return( $host->{addr}, $host->{port} );
}
# Build a fresh candidate list: the configured host+port first, then the
# configured servers, then whatever the server file adds.
sub _init_hostlist {
    my($me, $map) = @_;

    my @candidates;
    if( $me->{host} && $me->{port} ){
        push @candidates, { addr => $me->{host}, port => $me->{port} };
    }
    if( my $configured = $me->{servers} ){
        push @candidates, @$configured;
    }

    $me->{_server} = \@candidates;
    $me->_read_serverfile($map);
}
# yentad saves a list of alternate peers to try in case it dies
# Add the servers listed in the server file to the candidate list.
# The file holds one JSON object per line; the fields read here are
# map (list of map names), is_local, addr and port.
#   $map - only servers carrying this map are used
# Local servers are preferred; far away servers are used only when no
# local server has the map. A missing or unreadable file and unparsable
# lines are skipped rather than treated as fatal.
sub _read_serverfile {
    my $me  = shift;
    my $map = shift;

    my @server;
    my @faraway;

    $me->{_server} ||= [];

    my $file = $me->{server_file};
    return unless defined $file && length $file;

    # 3-arg open: the file name can never be taken as a mode or a command
    my $f;
    unless( open($f, '<', $file) ){
        $me->{debug}->("cannot open server file '$file': $!") if $me->{debug};
        return;
    }

    local $/ = "\n";
    while( defined(my $line = <$f>) ){
        chomp $line;    # removes only the newline, even on an unterminated last line
        next unless length $line;

        my $data = eval { decode_json( $line ) };
        next unless ref($data) eq 'HASH';
        next unless grep { $_ eq $map } @{ $data->{map} || [] };

        my $entry = { addr => $data->{addr}, port => $data->{port} };
        if( $data->{is_local} ){
            push @server, $entry;
        }else{
            push @faraway, $entry;
        }
    }
    close $f;

    # prefer local
    @server = @faraway unless @server;

    shuffle( \@server );
    push @{$me->{_server}}, @server;
}
################################################################
1;
lib/AC/Yenta/Conf.pm view on Meta::CPAN
use AC::Import;
use strict;
our @EXPORT = qw(timet_to_yenta_version_factor timet_to_yenta_version);
my $TTVF = x64_one_million();
# deprecated
# deprecated: returns the factor used by timet_to_yenta_version
sub timet_to_yenta_version_factor {
    my $factor = $TTVF;
    return $factor;
}
# Convert a time_t (possibly fractional) to a yenta version by scaling it
# with $TTVF. Returns nothing for an undefined time.
sub timet_to_yenta_version {
    my($t) = @_;

    return unless defined $t;

    if( ref $TTVF ){
        # the factor is an object: math::bigint does not like to multiply
        # floats, so the whole and fractional parts are scaled separately
        my $whole = int $t;
        my $frac  = $t - $whole;
        return $whole * $TTVF + int($TTVF->numify() * $frac);
    }

    return $t * $TTVF;
}
1;
lib/AC/Yenta/Config.pm view on Meta::CPAN
use AC::ConfigFile::Simple;
use Socket;
use strict;
our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value conf_map);
# dispatch table: config keyword => parser
# handle_config() calls the parser as $fnc->($me, $key, $rest)
my %CONFIG = (
include => \&AC::ConfigFile::Simple::include_file,
debug => \&AC::ConfigFile::Simple::parse_debug,
allow => \&AC::ConfigFile::Simple::parse_allow,
port => \&AC::ConfigFile::Simple::parse_keyvalue,
environment => \&AC::ConfigFile::Simple::parse_keyvalue,
secret => \&AC::ConfigFile::Simple::parse_keyvalue,
seedpeer => \&AC::ConfigFile::Simple::parse_keyarray,
ae_maxload => \&AC::ConfigFile::Simple::parse_keyvalue,
distrib_max => \&AC::ConfigFile::Simple::parse_keyvalue,
# parsers defined in this file
savestatus => \&parse_savefile,
monitor => \&parse_monitor,
map => \&parse_map,
);
# options accepted inside a "map NAME { ... }" section (see parse_map)
my @MAP = qw(dbfile basedir keepold history expire backend sharded);
################################################################
# Dispatch one config keyword to its parser.
#   $key  - the keyword
#   $rest - the remainder of the config line
# Returns 1 if the keyword was handled, nothing if it is unknown.
sub handle_config {
    my($me, $key, $rest) = @_;

    my $parser = $CONFIG{$key} or return;
    $parser->($me, $key, $rest);
    return 1;
}
################################################################
# Parse a "map NAME { ... }" section.
#   $value - the rest of the 'map' line: the map name followed by '{'
# Reads option lines up to the closing '}' and stores the known options
# under {_pending}{map}{NAME}. Unknown options and a redefined map are
# reported with problem(). Dies on a malformed map line.
sub parse_map {
    my($me, $key, $value) = @_;

    my($map) = $value =~ /(\S+)\s+\{\s*/;
    die "invalid map spec\n" unless $map;

    my %opt;
    if( $me->{_pending}{map}{$map} ){
        problem("map '$map' redefined");
    }

    while( defined(my $line = $me->_nextline()) ){
        last if $line eq '}';

        my($k, $v) = split /\s+/, $line, 2;
        unless( grep {$_ eq $k} @MAP ){
            problem("unknown map option '$k'");
            next;
        }
        # expire takes a timespec, stored in seconds
        $v = cvt_timespec($v) if $k eq 'expire';
        $opt{$k} = $v;
    }

    $me->{_pending}{map}{$map} = \%opt;
}
# Parse a "monitor IP:PORT" line and append it to {_pending}{monitor}.
# The address is kept as given (ipa) and as converted by inet_aton (ipn)
# and inet_atoi (ipi).
sub parse_monitor {
    my($me, $key, $mon) = @_;

    my($ip, $port) = split /:/, $mon;

    my %entry = (
        monitor => $mon,
        ipa     => $ip,
        ipn     => inet_aton($ip),
        ipi     => inet_atoi($ip),
        port    => $port,
    );
    push @{$me->{_pending}{monitor}}, \%entry;
}
# Parse a "savestatus FILE [TYPE ...]" line and append it to
# {_pending}{savestatus} as { file, type => [ ... ] }.
sub parse_savefile {
    my($me, $key, $save) = @_;

    my @word = split /\s+/, $save;
    my $file = shift @word;

    push @{$me->{_pending}{savestatus}}, {
        type => \@word,
        file => $file,
    };
}
# Convert a timespec such as "30", "5m", "2h" or "1d" to seconds.
# The first run of digits is the count; the character after it selects the
# unit (m, h, d). Any other or missing unit means seconds.
sub cvt_timespec {
    my($spec) = @_;

    my %seconds_per = ( m => 60, h => 3600, d => 86400 );

    my($count, $unit) = $spec =~ /(\d+)(\D?)/;
    my $mult = $seconds_per{$unit} || 1;
    return $count * $mult;
}
################################################################
# Look up a top level value of the current configuration.
sub conf_value {
    my($key) = @_;

    my $config = $AC::Yenta::CONF->{config};
    return $config->{$key};
}
# Look up the configuration of one map by name.
sub conf_map {
    my($map) = @_;

    my $maps = $AC::Yenta::CONF->{config}{map};
    return $maps->{$map};
}
1;
lib/AC/Yenta/Crypto.pm view on Meta::CPAN
use Time::HiRes 'time';
use Crypt::Rijndael;
use Digest::SHA qw(sha256 hmac_sha256_base64);
use strict;
require 'AC/protobuf/auth.pl';
my $ALGORITHM = 'x-acy-aes-1';
# Create a crypto object around a shared secret.
sub new {
    my($class, $secret) = @_;

    my %self = ( secret => $secret );
    return bless \%self, $class;
}
# Encrypt a buffer.
# Key and iv are derived from the secret, a time based sequence number and
# a random nonce. The plaintext is padded with NULs to the 16 byte block
# size; its real length travels in the message so decrypt can trim it.
# Returns the encoded ACPEncrypt message.
sub encrypt {
    my($me, $buf) = @_;

    my $seqno = int( time() * 1_000_000 );
    my $nonce = random_text(48);
    my $key   = $me->_key($seqno, $nonce);
    my $iv    = $me->_iv($key, $seqno, $nonce);

    # pad up to the next multiple of 16
    my $pbuf = $buf;
    my $npad = -length($pbuf) % 16;
    $pbuf .= "\0" x $npad if $npad;

    my $aes = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );

    my $ct   = $aes->encrypt( $pbuf );
    my $hmac = hmac_sha256_base64($ct, $key);

    my %msg = (
        algorithm  => $ALGORITHM,
        seqno      => $seqno,
        nonce      => $nonce,
        hmac       => $hmac,
        length     => length($buf),
        ciphertext => $ct,
    );
    my $eb = ACPEncrypt->encode( \%msg );

    debug("encrypted <$seqno,$nonce,$hmac>");
    return $eb;
}
# Decrypt a buffer produced by encrypt().
#   $buf - encoded ACPEncrypt message
# Dies if the algorithm is not ours or if the hmac over the ciphertext does
# not match. Returns the plaintext, trimmed to the length recorded in the
# message (this removes the padding).
sub decrypt {
    my $me  = shift;
    my $buf = shift;

    my $ed = ACPEncrypt->decode( $buf );
    die "cannot decrypt: unknown alg\n" unless $ed->{algorithm} eq $ALGORITHM;

    # two separate statements (the original joined them with a comma)
    my $seqno = $ed->{seqno};
    my $nonce = $ed->{nonce};
    my $key   = $me->_key($seqno, $nonce);
    my $iv    = $me->_iv($key, $seqno, $nonce);

    # authenticate before decrypting
    my $hmac = hmac_sha256_base64($ed->{ciphertext}, $key);
    die "cannot decrypt: hmac mismatch\n" unless $hmac eq $ed->{hmac};

    my $aes = Crypt::Rijndael->new( $key, Crypt::Rijndael::MODE_CBC );
    $aes->set_iv( $iv );
    my $pt = substr($aes->decrypt( $ed->{ciphertext} ), 0, $ed->{length});

    debug("decrypted <$seqno,$nonce,$hmac>");
    return $pt;
}
# Derive the per-message key: SHA-256 over the secret, the sequence number
# and the nonce, wrapped in fixed marker strings.
sub _key {
    my($me, $seqno, $nonce) = @_;

    my $material = join '', 'key1', $me->{secret}, $seqno, $nonce, '1yek';
    return sha256( $material );
}
# Derive the iv: the first 16 bytes of SHA-256 over 'iv', the key and the
# sequence number. The nonce is accepted but not used here.
sub _iv {
    my($me, $key, $seqno, $nonce) = @_;

    my $digest = sha256( 'iv' . $key . $seqno );
    return substr($digest, 0, 16);
}
1;
lib/AC/Yenta/Customize.pm view on Meta::CPAN
# Author: Jeff Weisberg
# Created: 2010-Jan-26 10:37 (EST)
# Function: connect user provided implementation
#
# $Id$
package AC::Yenta::Customize;
use strict;
# Connect a user provided implementation to a customizable class.
#   $class  - the customizable class; its default implementation lives in
#             the sibling namespace "...::Default::<last part of the name>"
#   $implby - name of the user's implementation class (optional)
# Every function named in @{"${class}::CUSTOM"} is installed into $class,
# taken from the user's class if it provides one, else from the default.
# Dies if a class cannot be loaded.
sub customize {
    my $class  = shift;
    my $implby = shift;

    (my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;

    # load user's implementation + default
    for my $p ($implby, $default){
        next unless $p;
        # test the result of this eval only: $@ may still hold an unrelated,
        # older error, which must not abort us when nothing was loaded
        eval "require $p; 1" or die( $@ || "cannot load $p\n" );
    }

    # import/export
    no strict;
    no warnings;
    for my $f ( @{$class . '::CUSTOM'} ){
        *{$class . '::' . $f} = ($implby && $implby->can($f)) || $default->can($f);
    }
}
1;
lib/AC/Yenta/D.pm view on Meta::CPAN
use AC::Yenta::Store;
use AC::Yenta::MySelf;
use AC::Yenta::NetMon;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
# use AC::Yenta::Store::Tokyo;
use strict;
# Create the daemon object.
#   class_myself => name of the user's "MySelf" implementation (optional)
# The object is a blessed reference to a scalar holding the class name.
sub new {
    my($class, %p) = @_;

    AC::Yenta::MySelf->customize( $p{class_myself} );
    # ...

    my $self = \$class;
    return bless $self, $class;
}
# Configure and run the yenta daemon.
#   $cfile - config file (required; dies without it)
#   $opt   - hashref of options; keys read here: foreground, debugall,
#            persistent_id, argv, port
# The statements below run in a fixed start-up order: configuration first,
# then logging, daemonizing, signal handlers, the subsystems, the tcp
# server, the periodic jobs and finally the main loop.
sub daemon {
my $me = shift;
my $cfile = shift;
my $opt = shift; # foreground, debugall, persistent_id, argv
die "no config file specified\n" unless $cfile;
# configure
# the store is reconfigured whenever the config is reloaded
$AC::Yenta::CONF = AC::Yenta::Config->new(
$cfile, onreload => sub {
AC::Yenta::Store::configure();
});
# syslog facility comes from the config, 'local5' if unset
initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );
AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
verbose("starting.");
$SIG{CHLD} = $SIG{PIPE} = sub{}; # ignore
$SIG{INT} = $SIG{TERM} = $SIG{QUIT} = \&AC::DC::IO::request_exit; # abort
# initialize subsystems
# a port given in $opt takes precedence over the configured port
my $port = $opt->{port} || conf_value('port');
AC::Yenta::MySelf->init( $port, $opt->{persistent_id} );
AC::Yenta::Store::configure();
AC::Yenta::Status::init( $port );
AC::Yenta::Monitor::init();
AC::Yenta::NetMon::init();
AC::DC::IO::TCP::Server->new( $port, 'AC::Yenta::Server' );
verbose("server started on tcp/$port");
# start "cronjobs"
# look at the config files every 30 (freq)
AC::DC::Sched->new(
info => 'check config files',
freq => 30,
func => sub { $AC::Yenta::CONF->check() },
);
# run the io main loop; the first argument is true when running in the
# foreground or with all debugging enabled
run_and_watch(
($opt->{foreground} || $opt->{debugall}),
\&AC::DC::IO::mainloop,
);
}
1;
lib/AC/Yenta/Default/MySelf.pm view on Meta::CPAN
use Sys::Hostname;
use Socket;
use strict;
# persistent id of this server, set by init()
my $SERVERID;

# our own IP address, from resolving our hostname.
# inet_ntoa() dies on an undefined address, so the lookup result is checked
# before it is converted; a failed lookup then reaches the helpful message
# below instead of an obscure error from Socket.
my $MYIP = do {
    my $packed = gethostbyname(hostname());
    defined($packed) ? inet_ntoa($packed) : undef;
};
die "cannot determine my IP addr.\nsee 'class_myself' in the documentation\n" unless $MYIP;
# Record this server's persistent id.
#   $port - not used
#   $id   - explicit id; when empty the id is built from the configured
#           environment and the hostname
sub init {
    my($class, $port, $id) = @_;

    $SERVERID = $id;
    if( !$SERVERID ){
        my $env = conf_value('environment');
        $SERVERID = 'yenta/' . $env . '@' . hostname();
    }
    verbose("system persistent-id: $SERVERID");
}
# Accessor: the persistent id stored by init().
sub my_server_id {
    my $id = $SERVERID;
    return $id;
}
# Default network info: a single entry carrying our IP address (ipa).
sub my_network_info {
    my %net = ( ipa => $MYIP );
    return [ \%net ];
}
# Default datacenter name.
sub my_datacenter {
    my $name = 'default';
    return $name;
}
1;
lib/AC/Yenta/Direct.pm view on Meta::CPAN
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
use strict;
our @EXPORT = 'yenta_direct_get';
# Convenience function: open a map's data file and fetch a single key.
#   $file - data file
#   $map  - map name
#   $key  - key to fetch
# Returns nothing if the map cannot be opened.
sub yenta_direct_get {
    my($file, $map, $key) = @_;

    my $me = __PACKAGE__->new($map, $file) or return;
    return $me->get( $key );
}
# Open a map's data file read-only for direct access.
# Returns nothing if the map cannot be opened.
sub new {
    my($class, $map, $file) = @_;

    my %opt = ( dbfile => $file, readonly => 1 );
    my $db  = AC::Yenta::Store::Map->new( $map, undef, \%opt );
    return unless $db;

    my %self = ( db => $db );
    return bless \%self, $class;
}
# Fetch one key straight from the underlying store.
sub get {
    my($me, $key) = @_;

    return $me->{db}->get($key);
}
# List every key: a range query with both bounds empty.
sub allkeys {
    my($me) = @_;

    my($lo, $hi) = ('', '');
    return $me->getrange( $lo, $hi );
}
# List the keys between two bounds.
#   $lo, $hi - range bounds; a false bound is passed on as ''
# Returns the {k} field of every item the store's range() returns.
sub getrange {
    my($me, $lo, $hi) = @_;

    my @item = $me->{db}->range( ($lo||''), ($hi||'') );
    return map { $_->{k} } @item;
}
1;
lib/AC/Yenta/IO/TCP/Client.pm view on Meta::CPAN
our @ISA = 'AC::DC::IO::TCP::Client';
use AC::Yenta::MySelf;
use AC::Misc;
use strict;
my $inited;
my $natdom;
# Choose the address and port to connect to.
#   $addr - a plain address, or an array of nat ip info (ACPIPPort)
#   $port - port to use when the chosen entry has none
# A plain address is returned unchanged. Otherwise entries on unreachable
# networks are skipped, and an entry in our own NAT domain is preferred
# over a public one. Returns nothing if no entry is usable.
sub use_addr_port {
    my($class, $addr, $port) = @_;

    # is addr + port => return
    return ($addr, $port) unless ref $addr;

    # addr is array of nat ip info (ACPIPPort)
    _init() unless $inited;

    my($public, $private);
    for my $net ( @$addr ){
        # skip unreachable networks
        my $ok = AC::Yenta::NetMon::status_dom( $net->{natdom} );
        next if $ok != 200;

        $public  = $net if !$net->{natdom};
        $private = $net if $net->{natdom} eq $natdom;
    }

    # prefer private addr if available (cheaper)
    my $prefer = $private || $public;
    return unless $prefer;

    my $use_port = $prefer->{port} || $port;
    return ( inet_itoa($prefer->{ipv4}), $use_port );
}
# Determine my local NAT domain: the first true natdom among our own
# network entries. Runs once; $inited records that it has been done.
sub _init {
    my $nets = my_network_info();

    for my $net (@$nets){
        $natdom = $net->{natdom} unless $natdom;
    }
    $inited = 1;
}
1;
lib/AC/Yenta/Kibitz/Status.pm view on Meta::CPAN
require 'AC/protobuf/yenta_status.pl';
use strict;
my $HOSTNAME = hostname();
################################################################
# Build the status record describing this server.
# Every network we are on is checked with NetMon:
#   - if a public network (no natdom) is down, announce the network + 500
#   - if a private network is down, stop announcing it
sub _myself {
    my $maps    = conf_value('map');
    my $natinfo = my_network_info();
    my $status  = 200;
    my @ipinfo;

    for my $net ( @$natinfo ){
        my $st    = AC::Yenta::NetMon::status_dom( $net->{natdom} );
        my $is_up = $st == 200;

        # a private network which is down is left out
        next if $net->{natdom} && !$is_up;

        push @ipinfo, {
            ipv4   => inet_atoi($net->{ipa}),
            port   => AC::Yenta::Status->my_port(),
            natdom => $net->{natdom},
        };

        # a public network which is down marks the whole server down
        $status = 500 unless $net->{natdom} || $is_up;
    }

    my %me = (
        hostname    => $HOSTNAME,
        datacenter  => my_datacenter(),
        subsystem   => 'yenta',
        environment => conf_value('environment'),
        via         => AC::Yenta::Status->my_server_id(),
        server_id   => AC::Yenta::Status->my_server_id(),
        instance_id => AC::Yenta::Status->my_instance_id(),
        path        => '.',
        status      => $status,
        uptodate    => AC::Yenta::Store::AE->up_to_date(),
        timestamp   => $^T,
        lastup      => $^T,
        ip          => \@ipinfo,
        map         => [ keys %$maps ],
        sort_metric => loadave() * 1000,
    );
    return \%me;
}
################################################################
# tell server about ourself: our own status record, encoded as a request
sub myself {
    my %req = ( myself => _myself() );
    return ACPYentaStatusRequest->encode( \%req );
}
# send client everything we know: all peers, ourself, and the items we
# monitor, encoded as a status reply
sub response {
    my @status = (
        AC::Yenta::Status->allpeers(),
        _myself(),
        AC::Yenta::Monitor::export(),   # the items we monitor
    );

    return ACPYentaStatusReply->encode({
        status => \@status,
    });
}
################################################################
# do not believe a client that says it is up
# put it on the sceptical queue, and check for ourself
# Handle a status request in which a client describes itself.
#   $gpb - encoded ACPYentaStatusRequest
#   $io  - the connection it arrived on
# The claim is passed to the sceptical queue of AC::Yenta::Status; records
# about ourself are dropped, undecodable data is reported.
sub update_sceptical {
    my($gpb, $io) = @_;

    return unless $gpb;

    my $c;
    eval {
        my $req = ACPYentaStatusRequest->decode( $gpb );
        $c = $req;
        $c = $c->{myself};
    };
    my $e = $@;
    if( $e ){
        problem("cannot decode status data: $e");
        return;
    }

    my $id = $c->{server_id};

    # don't track myself
    return if AC::Yenta::Status->my_server_id() eq $id;

    AC::Yenta::Status->update_sceptical($id, $c, $io);
}
# Handle a status reply: record every server status in it.
#   $gpb - encoded ACPYentaStatusReply
# Records about ourself, and records the sender learned via us, are skipped.
sub update {
    my($gpb) = @_;

    return unless $gpb;

    my $c;
    eval {
        $c = ACPYentaStatusReply->decode( $gpb );
    };
    my $e = $@;
    if( $e ){
        problem("cannot decode status data: $e");
        return;
    }

    debug("rcvd update");

    my $myself = AC::Yenta::Status->my_server_id();
    for my $up (@{$c->{status}}){
        my $id = $up->{server_id};
        next if $up->{via} eq $myself || $id eq $myself;
        AC::Yenta::Status->update($id, $up);
    }
}
# unable to connect to server. mark it down
# unable to connect to server. mark it down (a false id is ignored)
sub isdown {
    my($id) = @_;

    return unless $id;
    AC::Yenta::Status->isdown( $id );
}
1;
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
use AC::Misc;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid = $$;
# Create a status client connection and install its event callbacks.
# Returns nothing if the connection object cannot be created.
sub new {
    my($class, @arg) = @_;

    debug('starting kibitz status client');
    my $me = $class->SUPER::new( @arg );
    return unless $me;

    my @callback = (
        [ 'timeout',  \&timeout  ],
        [ 'read',     \&read     ],
        [ 'shutdown', \&shutdown ],
    );
    $me->set_callback( @$_ ) for @callback;

    return $me;
}
# Choose the address and port to connect to.
#   $addr - a plain address, or an array of nat ip info (ACPIPPort)
#   $port - port to use when the chosen entry has none
# A plain address is returned unchanged. Otherwise each entry is classified
# by the NetMon status of its NAT domain as private, public or down, and one
# is picked. Returns nothing if no entry qualifies.
sub use_addr_port {
my $class = shift;
my $addr = shift;
my $port = shift;
# is addr + port => return
return ($addr, $port) unless ref $addr;
# addr is array of nat ip info (ACPIPPort)
my $down;
my $public;
my $private;
# when several entries fall in the same class, the last one wins
for my $i ( @$addr ){
# usually, skip unreachable networks
my $ok = AC::Yenta::NetMon::status_dom( $i->{natdom} );
next unless defined $ok; # remote private network
if( $ok == 200 ){
if( $i->{natdom} ){
$private = $i;
}else{
$public = $i;
}
}else{
$down = $i;
}
}
my $prefer;
# make sure we use all networks once in a while
# each of these fires when int rand(20) is 0, i.e. about 1 time in 20
$prefer ||= $down unless int rand(20);
$prefer ||= $public unless int rand(20);
# prefer private addr if available (cheaper)
$prefer ||= $private || $public || $down;
return unless $prefer;
#print STDERR "using ", inet_itoa($prefer->{ipv4}), "\n";
return ( inet_itoa($prefer->{ipv4}), ($prefer->{port} || $port) );
}
# Start the connection and send our own status as a 'yenta_status' request.
# A timeout of $TIMEOUT is armed for the reply. Returns the object.
sub start {
    my($me) = @_;

    $me->SUPER::start();

    # build request: header + our encoded status
    my $yp      = AC::Yenta::Protocol->new();
    my $payload = AC::Yenta::Kibitz::Status::myself();
    my $hdr     = $yp->encode_header(
        type           => 'yenta_status',
        data_length    => length($payload),
        content_length => 0,
        want_reply     => 1,
        msgid          => $msgid++,
    );

    # write request
    $me->write( $hdr . $payload );
    $me->timeout_rel($TIMEOUT);

    return $me;
}
# timeout callback: give up and close the connection
sub timeout {
    my($me) = @_;

    $me->shut();
}
# shutdown callback: if no valid reply was read on this connection,
# report the peer as down
sub shutdown {
    my($me) = @_;

    AC::Yenta::Kibitz::Status::isdown( $me->{status_peer} ) unless $me->{status_ok};
}
# read callback: once a complete reply has arrived, mark the exchange as
# good, record the status data and the network state, then close.
sub read {
    my($me, $evt) = @_;

    #debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );

    # nothing complete yet
    return if !$proto;

    $me->{status_ok} = 1;
    AC::Yenta::Kibitz::Status::update( $data );
    AC::Yenta::NetMon::update( $me );

    $me->shut();
}
1;
lib/AC/Yenta/Kibitz/Status/Server.pm view on Meta::CPAN
#
# $Id$
package AC::Yenta::Kibitz::Status::Server;
use AC::Dumper;
use AC::Yenta::Debug 'status_server';
use strict;
# Handle a 'yenta_status' request from a peer.
#   $io      - the connection
#   $proto   - decoded header
#   $gpb     - encoded status data (may be empty)
#   $content - not used here
# The peer's claim about itself goes to the sceptical update. If a reply is
# wanted, the network state is recorded and all known peers are sent back.
sub handler {
    my($class, $io, $proto, $gpb, $content) = @_;

    AC::Yenta::Kibitz::Status::update_sceptical( $gpb, $io ) if $gpb;

    if( !$proto->{want_reply} ){
        $io->shut();
        return;
    }

    AC::Yenta::NetMon::update( $io );

    # respond with all known peers
    my $response = AC::Yenta::Kibitz::Status::response();
    my %header   = (
        type           => 'yenta_status',
        data_length    => length($response),
        content_length => 0,
        msgid          => $proto->{msgid},
        is_reply       => 1,
    );
    my $hdr = AC::Yenta::Protocol->new()->encode_header( %header );

    debug("sending status reply");
    $io->write_and_shut( $hdr . $response );
}
1;
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
use AC::Yenta::IO::TCP::Client;
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';
my $TIMEOUT = 5;
# Create a store client connection.
#   $addr, $port - server to contact
#   $req         - encoded request, sent by start()
#   remaining arguments are passed on to the parent constructor
# Returns nothing if the connection object cannot be created.
sub new {
    my($class, $addr, $port, $req, @rest) = @_;

    debug('starting kibitz store client');
    my $me = $class->SUPER::new( $addr, $port, info => "kibitz store client $addr:$port", @rest );
    return unless $me;

    $me->{_req} = $req;

    my @callback = (
        [ 'timeout',  \&timeout  ],
        [ 'read',     \&read     ],
        [ 'shutdown', \&shutdown ],
    );
    $me->set_callback( @$_ ) for @callback;

    return $me;
}
# Start the connection, send the stored request and arm the reply timeout.
# Returns the object.
sub start {
    my($me) = @_;

    $me->SUPER::start();

    my $req = $me->{_req};
    $me->write( $req );
    $me->timeout_rel($TIMEOUT);

    return $me;
}
# shutdown callback: if no reply was processed on this connection,
# maybe call error handler
sub shutdown {
    my($me) = @_;

    $me->run_callback('error', undef) unless $me->{_store_ok};
}
# timeout callback: give up and close the connection
sub timeout {
    my($me) = @_;

    $me->shut();
}
# read callback: process the reply to our request.
#   $evt - the read event; {data} is true when data arrived
# While the reply is incomplete the timeout is re-armed and we keep waiting.
# Once complete, the reply is decoded and the 'load' callback is run with
# it, or the 'error' callback if the peer flagged an error or the reply
# could not be decoded. The connection is then closed.
sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    $me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
    return unless $proto;

    $proto->{data}    = $data;
    $proto->{content} = $content;

    eval {
        my $yp = AC::Yenta::Protocol->new();
        $proto->{data} = $yp->decode_reply($proto) if $data;
    };
    # save the error right away: $@ is global, and any eval run inside the
    # calls below would reset it before it is looked at again
    my $err = $@;
    if( $err ){
        problem("cannot decode reply: $err");
    }

    # process
    $me->{_store_ok} = 1;
    if( $proto->{is_error} || $err ){
        my $e = $err || 'remote error';
        $me->run_callback('error', {
            error => $e,
        });
    }else{
        $me->run_callback('load', $proto);
    }

    $me->shut();
}
1;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
use AC::Dumper;
use JSON;
use Digest::SHA 'sha1_base64';
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;
my $TIMEOUT = 1;
# Handle a 'yenta_get' request: look up the requested records and reply.
#   $io      - the connection
#   $proto   - decoded header (want_reply, msgid, data_encrypted are read)
#   $gpb     - encoded ACPYentaGetSet request
#   $content - not used
# A request which does not want a reply, or cannot be decoded, just closes
# the connection.
sub api_get {
my $io = shift;
my $proto = shift;
my $gpb = shift;
my $content = shift; # not used
unless( $proto->{want_reply} ){
$io->shut();
return;
}
# decode request
my $req;
eval {
$req = ACPYentaGetSet->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
# process requests
my @res;
# file to be sent as the content of the reply (set only for single requests)
my $rescont;
for my $r (@{ $req->{data} }){
debug("get request: $r->{map}, $r->{key}, $r->{version}");
my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
# every request gets an entry in the reply; version + value are added
# only if the record was found
my $res = {
map => $r->{map},
key => $r->{key},
};
if( $meta && $file ){
unless( _check_content( $meta, $file ) ){
problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
# QQQ - remove from system, (and let AE get a new copy)?
store_remove($r->{map}, $r->{key}, $ver);
# tell caller it was not found
$ver = undef;
}
}
if( defined $ver ){
$res->{version} = $ver;
$res->{value} = $data;
$res->{meta} = $meta if defined $meta;
if( $file ){
# if one file to send, send it as content
if( @{$req->{data}} == 1 ){
$rescont = $file;
}else{
$res->{file} = $$file;
}
}
}
push @res, $res;
}
# encode results
my $ect = '';
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# the content is encrypted if the request was
$ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
my $response = $yp->encode_reply( {
type => 'yenta_get',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
content_encrypted => $proto->{data_encrypted},
}, { data => \@res }, \$ect );
debug("sending get reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
# the content follows the encoded reply
$io->write_and_shut( $response . $ect );
}
# Handle a 'yenta_check' request: reply with merkle tree data of a map.
#   $io      - the connection
#   $proto   - decoded header (want_reply, msgid, data_encrypted are read)
#   $gpb     - encoded ACPYentaCheckRequest (map, version, level, shard)
#   $content - not used
# A request which does not want a reply, or cannot be decoded, just closes
# the connection.
sub api_check {
my $io = shift;
my $proto = shift;
my $gpb = shift;
my $content = shift; # not used
unless( $proto->{want_reply} ){
$io->shut();
return;
}
# decode request
my $req;
eval {
$req = ACPYentaCheckRequest->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
debug("check request: $req->{map}, $req->{version}, $req->{level}");
my @res;
# nodes to look up at the current level; starts with the requested node
my @todo = { version => $req->{version}, shard => $req->{shard} };
# the top of the tree will be fairly sparse,
# return up to several levels if they are sparse
for my $l (0 .. 32){
my $cl = $req->{level} + $l;
my @lres;
# weighted size of this level's results: entries carrying a key
# count 1/8 of their count
my $nexttot;
for my $do (@todo){
my @r = _get_check( $req->{map}, $do->{shard}, $do->{version}, $cl );
push @lres, @r;
for (@r){
$nexttot += $_->{key} ? ($_->{count} / 8) : $_->{count};
}
}
push @res, @lres;
@todo = ();
last unless @lres; # reached the bottom of the tree
last if @res > 64; # got enough results
last if (@lres > 2) && ($nexttot > @lres + 2); # less sparse region
# get the next level also
@todo = @lres;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_check',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { check => \@res } );
debug("sending check reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response );
}
# get + process merkle data
# get + process merkle data: fetch the entries for one node of a map and
# tag each of them with the map name.
# Returns the list of entries, or nothing if the store has none.
sub _get_check {
    my($map, $shard, $ver, $lev) = @_;

    my $res = store_get_merkle($map, $shard, $ver, $lev);
    return unless $res;

    $_->{map} = $map for @$res;
    return @$res;
}
# Handle a 'yenta_distrib' request: store the record if we want it, pass it
# on to other systems, and reply.
#   $io      - the connection
#   $proto   - decoded header (want_reply, msgid, data_encrypted are read)
#   $gpb     - encoded ACPYentaDistRequest
#   $content - reference to the file content sent with the request, if any
sub api_distrib {
my $io = shift;
my $proto = shift;
my $gpb = shift;
my $content = shift; # reference
# decode request
my $req;
eval {
$req = ACPYentaDistRequest->decode( $gpb );
# NOTE(review): datum is read as a single hash here, while
# AC::Yenta::Client::distribute builds it as a one element array -
# presumably the encoder accepts that; confirm against the protobuf definition
die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};
};
if(my $e = $@){
my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
problem("cannot decode request: peer: $io->{peerip} $enc, $e");
$io->shut();
return;
}
# this error reply is sent whether or not a reply was asked for
unless( conf_map( $req->{datum}{map} ) ){
problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
_reply_error($io, $proto, 404, 'Map Not Found');
return;
}
my $v = $req->{datum};
# do we already have ?
my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );
if( $want ){
# put
debug("put request from $io->{peerip}: $v->{map}, $v->{key}, $v->{version}");
# file content is passed by reference, to avoid large copies
$content ||= \ $v->{file} if $v->{file};
# check
if( $v->{meta} && $content ){
unless( _check_content( $v->{meta}, $content ) ){
problem("content SHA1 check failed: $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}");
# the connection is closed without a reply
$io->shut();
return;
}
}
# from here on $want holds the result of the put
$want = store_put( $v->{map}, $v->{shard}, $v->{key}, $v->{version}, $v->{value},
$content, $v->{meta} );
# distribute to other systems
AC::Yenta::Store::Distrib->new( $req, $content ) if $want;
}else{
debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
}
unless( $proto->{want_reply} ){
$io->shut();
return;
}
# encode results
# haveit is true when the record was not wanted or the put returned false
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { status_code => 200, status_message => 'OK', haveit => !$want } );
debug("sending distrib reply");
$io->timeout_rel($TIMEOUT);
$io->write_and_shut( $response );
}
# Verify file content against its metadata.
#   $meta - metadata string; only JSON objects are examined
#   $cont - reference to the content
# Checks the 'sha1' (base64 SHA1 of the content) and 'size' (length of the
# content) entries when present. Metadata which is empty, not a JSON object
# or not parsable passes.
# Returns 1 if the content is acceptable, nothing if a check fails.
sub _check_content {
    my($meta, $cont) = @_;

    return 1 if !$meta || $meta !~ /^\{/;

    my $info;
    eval { $info = decode_json($meta) };
    return 1 if $@;

    if( my $want_sha1 = $info->{sha1} ){
        return if sha1_base64( $$cont ) ne $want_sha1;
    }
    if( my $want_size = $info->{size} ){
        return if length($$cont) != $want_size;
    }

    return 1;
}
# Send an error reply to a 'yenta_distrib' request and close the connection.
#   $io    - the connection
#   $proto - decoded request header (msgid, data_encrypted are read)
#   $code  - status code
#   $msg   - status message
sub _reply_error {
    my($io, $proto, $code, $msg) = @_;

    my %header = (
        type           => 'yenta_distrib',
        msgid          => $proto->{msgid},
        is_reply       => 1,
        is_error       => 1,
        data_encrypted => $proto->{data_encrypted},
    );
    my %body = (
        status_code    => $code,
        status_message => $msg,
        haveit         => 0,
    );

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( \%header, \%body );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}
1;
lib/AC/Yenta/Monitor.pm view on Meta::CPAN
require 'AC/protobuf/heartbeat.pl';
my $FREQ = 2;
my $OLD_DOWN = 30;
my $OLD_KEEP = 1800;
my %MON; # by 'id' (from config file)
# Schedule the periodic monitoring job, run at frequency $FREQ.
sub init {
    my %job = (
        info => 'monitor',
        freq => $FREQ,
        func => \&periodic,
    );
    AC::DC::Sched->new( %job );
}
# Scheduled task: age out peers that have gone quiet, then start a heartbeat
# request to every peer listed under 'monitor' in the configuration.
sub periodic {
    my $targets = conf_value('monitor');

    # clean up old data: anything not seen up within $OLD_DOWN is marked down
    foreach my $known (keys %MON){
        next unless $MON{$known}{lastup} < $^T - $OLD_DOWN;
        isdown($known, 0);
    }

    # start monitoring (send heartbeat request)
    foreach my $t (@$targets){
        my( $ip, $port ) = ( $t->{ipa}, $t->{port} );
        my $id = "$ip:$port";
        debug("start monitoring $id");

        my $client = AC::Yenta::Monitor::Client->new(
            $ip, $port,
            info         => "monitor client: $id",
            monitor_peer => $id,
        );

        # could not even start the client: count it as a failed poll
        isdown($id, 0) if !$client;
    }
}
# data for kibitzing around
# Returns a list of hashes (array of ACPYentaStatus), one for every peer
# currently held in %MON. Each entry is tagged with the local datacenter and
# with this server's id in the 'via' field.
sub export {
    my $here = my_datacenter();
    my $self = my_server_id();

    my @d = map {
        +{
            id              => $_->{id},    # from config, typically localhost:port
            datacenter      => $here,
            via             => $self,
            hostname        => $_->{hostname},
            subsystem       => $_->{subsystem},
            environment     => $_->{environment},
            status          => $_->{status_code},
            timestamp       => $_->{timestamp},
            lastup          => $_->{lastup},
            sort_metric     => $_->{sort_metric},
            capacity_metric => $_->{capacity_metric},
            server_id       => $_->{server_id},
            instance_id     => $_->{server_id},    # same value as server_id
            ip              => $_->{ip},
            path            => '.',
        }
    } values %MON;

    return @d;
}
# Record that a monitored peer failed a poll.
#   $id   - monitor id, the key into %MON
#   $code - status code to record; optional. A missing code, or 200 (which
#           makes no sense for a failure), is recorded as 0.
# Does nothing for peers that are not in %MON, or that were seen up less than
# two polling intervals ago. A peer that has been down longer than $OLD_KEEP
# is removed from %MON altogether.
sub isdown {
    my $id   = shift;
    my $code = shift;

    my $d = $MON{$id};
    return unless $d;

    # require 2 polls to fail
    return unless $^T - $d->{lastup} >= 2 * $FREQ;

    # some callers pass no code at all; do not compare undef numerically
    $code = 0 if !defined($code) || $code == 200;
    $d->{status_code} = $code || 0;
    $d->{timestamp}   = $^T;

    debug("monitor $id is down");

    if( $d->{lastup} < $^T - $OLD_KEEP ){
        debug("monitor $id down too long. removing from report");
        delete $MON{$id};
    }
}
# Process a heartbeat reply from a monitored peer.
#   $id - monitor id of the peer that replied
#   $gb - encoded ACPHeartBeat message
# A reply that cannot be decoded, reports a status other than 200, or carries
# a timestamp at least $OLD_DOWN old is handed to isdown(). Otherwise the
# decoded heartbeat replaces the peer's entry in %MON.
sub update {
    my( $id, $gb ) = @_;

    my $up;
    eval {
        $up = ACPHeartBeat->decode( $gb );
        $up->{id} = $id;
    };
    if( my $err = $@ ){
        problem("cannot decode hb data: $err");
        return isdown($id, 0);
    }

    # the peer itself says it is not ok
    return isdown($id, $up->{status_code}) unless $up->{status_code} == 200;

    # stale report
    return isdown($id, 0) unless $^T - $up->{timestamp} < $OLD_DOWN;

    debug("monitor $id is up");
    @{$up}{qw(lastup downcount)} = ( $^T, 0 );

    _hb_ip_info( $up, $MON{$id} );
    $MON{$id} = $up;
}
# Fill in $up->{ip} with a list of { ipv4, port, natdom } entries.
#   $up  - freshly decoded heartbeat (modified in place)
#   $old - previous entry for the same peer, if any
# The previous address list is reused when the process id and server id are
# unchanged. Otherwise the list is built from the address the peer reported,
# or, failing that, from our own networks as returned by my_network_info().
sub _hb_ip_info {
    my( $up, $old ) = @_;

    my $unchanged = ($old->{process_id} == $up->{process_id})
                 && ($old->{server_id}  eq $up->{server_id});
    my $ip = $unchanged ? $old->{ip} : undef;

    if( !$ip ){
        my $port = $up->{port};
        # use monitored port (id is from config)
        (undef, $port) = split /:/, $up->{id} unless $port;

        if( my $addr = $up->{ip} ){
            $ip = [ { ipv4 => $addr, port => $port, natdom => undef } ];
        }else{
            my $nets = my_network_info();
            foreach my $n ( @$nets ){
                push @$ip, { ipv4 => $n->{ipi}, port => $port, natdom => $n->{natdom} };
            }
        }
    }

    $up->{ip} = $ip;
}
1;
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
use AC::Yenta::IO::TCP::Client;
use strict;
# this client is a subclass of the yenta TCP client
our @ISA = 'AC::Yenta::IO::TCP::Client';
# protocol header size
# NOTE(review): not referenced by the code visible in this file - confirm it is still needed
my $HDRSIZE = AC::Yenta::Protocol->header_size();
# passed to timeout_rel() once the heartbeat request has been written (presumably seconds)
my $TIMEOUT = 2;
# message id for outgoing requests: seeded with our pid, incremented for every request built in new()
my $msgid = $$;
# Start a heartbeat request to one monitored peer.
# All arguments are passed straight through to the parent class constructor.
# Returns the new client object, or nothing when the parent constructor fails.
sub new {
    my( $class, @args ) = @_;

    debug('starting monitor status client');
    my $me = $class->SUPER::new( @args ) or return;

    for my $cb ( [ timeout  => \&timeout  ],
                 [ read     => \&read     ],
                 [ shutdown => \&shutdown ] ){
        $me->set_callback( @$cb );
    }
    $me->start();

    # build request: a bare header, the heartbeat request carries no payload
    my $proto  = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $header = $proto->encode_header(
        type           => 'heartbeat_request',
        data_length    => 0,
        content_length => 0,
        want_reply     => 1,
        msgid          => $msgid++,
    );

    # write request
    $me->write( $header );
    $me->timeout_rel($TIMEOUT);

    return $me;
}
# 'timeout' callback: close the connection.
sub timeout {
    my( $me ) = @_;
    $me->shut();
}
# 'shutdown' callback: if the connection ended without read() having
# flagged a reply (status_ok), report the peer to the monitor as down.
sub shutdown {
    my( $me ) = @_;
    AC::Yenta::Monitor::isdown( $me->{monitor_peer} ) unless $me->{status_ok};
}
# 'read' callback: once read_protocol() returns a reply, pass its data to
# the monitor and close the connection.
#   $evt - read event, handed to read_protocol()
sub read {
    my( $me, $evt ) = @_;

    debug("recvd reply");
    my( $proto, $data ) = AC::Yenta::Protocol->new()->read_protocol( $me, $evt );
    return unless $proto;       # nothing returned yet

    # tell shutdown() that this poll succeeded
    $me->{status_ok} = 1;
    AC::Yenta::Monitor::update( $me->{monitor_peer}, $data );
    $me->shut();
}
1;
lib/AC/Yenta/MySelf.pm view on Meta::CPAN
1;
=head1 NAME
AC::Yenta::MySelf - customize yenta to your own environment
=head1 SYNOPSIS
emacs /myperldir/Local/Yenta/MySelf.pm
copy. paste. edit.
use lib '/myperldir';
my $y = AC::Yenta::D->new(
class_myself => 'Local::Yenta::MySelf',
);
=head1 DESCRIPTION
provide functions to override default behavior. you may define
any or all of the following functions.
=head2 my_server_id
return a unique identity for this yenta instance. typically,
something similar to the server hostname.
sub my_server_id {
return 'yenta@' . hostname();
}
=head2 my_datacenter
return the name of the local datacenter. yenta will use this
to determine which systems are local (same datacenter) and
which are remote (different datacenter), and will tune various
behaviors accordingly.
sub my_datacenter {
my($domain) = hostname() =~ /^[^\.]+\.(.*)/;
return $domain;
}
=head2 my_network_info
return information about the various networks this server has.
sub my_network_info {
my $public_ip = inet_ntoa(scalar gethostbyname(hostname()));
my $privat_ip = inet_ntoa(scalar gethostbyname('internal-' . hostname()));
return [
# use this IP for communication with servers in this datacenter (same natdom)
{ ip => $privat_ip, natdom => my_datacenter() },
# otherwise use this IP
{ ip => $public_ip },
]
}
=head2 init
initialization function called at startup. typically used to look up hostnames, IP addresses,
and such and store them in variables to make the above functions faster.
my $HOSTNAME;
my $DOMAIN;
sub init {
$HOSTNAME = hostname();
($DOMAIN) = $HOSTNAME =~ /^[^\.]+\.(.*)/;
}
=head1 BUGS
none. you write this yourself.
=head1 SEE ALSO
AC::Yenta
=head1 AUTHOR
You!
=cut
lib/AC/Yenta/NetMon.pm view on Meta::CPAN
use strict;
# a nat domain with no update() for longer than this (seconds) is reported as 0 rather than 200 by status_dom()
my $STALE = 120;
my %lastok; # natdom => T (time of the most recent update() for that domain; seeded in init())
my %natdom; # ip => natdom (filled in by init() from my_network_info())
# Build the ip => natdom table from my_network_info() and mark every local
# nat domain as working as of now. Networks without a natdom are filed
# under 'public'.
sub init {
    my $networks = my_network_info();

    foreach my $net ( @$networks ){
        my $domain = $net->{natdom} || 'public';
        $natdom{ $net->{ipa} } = $domain;
        # assume everything is working
        $lastok{ $domain } = $^T;
    }
}
# Note that the network a connection is using is working.
#   $io - connection object; $io->{fd} is the socket handle
# The local address of the socket is looked up in %natdom (defaulting to
# 'public') and that domain's last-ok time is set to now. If the local
# address cannot be determined (closed handle, not a socket, not an IPv4
# address) the call does nothing, rather than dying inside sockaddr_in().
sub update {
    my $io = shift;

    # getsockname() returns undef on failure
    my $sockname = getsockname( $io->{fd} );
    return unless defined $sockname;

    # sockaddr_in() croaks on anything that is not an AF_INET address
    local $@;
    my $ip = eval { inet_ntoa( (sockaddr_in($sockname))[1] ) };
    return unless defined $ip;

    my $dom = $natdom{ $ip } || 'public';
    $lastok{$dom} = $^T;
}
# Report the health of a nat domain.
#   $dom - nat domain name; a false value means 'public'
# Returns nothing when the domain is not one of ours, 0 when it has had no
# update() for more than $STALE, and 200 otherwise.
sub status_dom {
    my( $dom ) = @_;
    $dom = 'public' unless $dom;

    # not local
    return unless exists $lastok{$dom};

    if( $lastok{$dom} + $STALE < $^T ){
        return 0;
    }
    return 200;
}
1;
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
require 'AC/protobuf/yenta_status.pl';
require 'AC/protobuf/yenta_check.pl';
require 'AC/protobuf/yenta_getset.pl';
# yenta's protocol class is a subclass of AC::DC::Protocol
our @ISA = 'AC::DC::Protocol';
our @EXPORT = qw(read_protocol use_encryption);
# size of the fixed protocol header, as reported by header_size()
my $HDRSIZE = __PACKAGE__->header_size();
# message types known to yenta, keyed by type name. each entry holds the
# values handed to add_msg() below:
#   num  - numeric message type
#   reqc - request class name ('' for heartbeat_request)
#   resc - reply class name
# (reqc/resc are presumably the protobuf classes loaded by the require lines - confirm)
my %MSGTYPE =
(
heartbeat_request => { num => 2, reqc => '', resc => 'ACPHeartBeat' },
yenta_status => { num => 6, reqc => 'ACPYentaStatusRequest', resc => 'ACPYentaStatusReply' },
yenta_get => { num => 7, reqc => 'ACPYentaGetSet', resc => 'ACPYentaGetSet' },
yenta_distrib => { num => 8, reqc => 'ACPYentaDistRequest', resc => 'ACPYentaDistReply' },
yenta_check => { num => 9, reqc => 'ACPYentaCheckRequest', resc => 'ACPYentaCheckReply' },
);
# register every message type (runs once, when this file is loaded)
for my $name (keys %MSGTYPE){
my $r = $MSGTYPE{$name};
__PACKAGE__->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}
# Accumulate incoming bytes on a connection and, once a whole message has
# arrived, split it into its parts.
#   $me  - protocol object (used to decode the header and to decrypt)
#   $io  - connection object; bytes collect in $io->{rbuffer}, the decoded
#          header is cached in $io->{proto_header}
#   $evt - read event; $evt->{data} holds the newly received bytes
# Returns nothing while the message is incomplete, or after a decode or
# decrypt failure (the connection is shut in that case). Otherwise returns
# ( header, data, content-ref-or-undef ). Buffers that start with "GET" are
# handed to _read_http() instead.
sub read_protocol {
    my( $me, $io, $evt ) = @_;

    $io->{rbuffer} .= $evt->{data};
    return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;

    # decode header, once, as soon as enough bytes are in
    if( !$io->{proto_header} && length($io->{rbuffer}) >= $HDRSIZE ){
        eval { $io->{proto_header} = $me->decode_header( $io->{rbuffer} ) };
        if( my $err = $@ ){
            verbose("cannot decode protocol header: $err");
            $io->run_callback('error', {
                cause => 'read',
                error => "cannot decode protocol: $err",
            });
            $io->shut();
            return;
        }
    }

    # read more
    my $hdr = $io->{proto_header} or return;

    # do we have everything?
    my( $alen, $dlen, $clen ) = @{$hdr}{qw(auth_length data_length content_length)};
    return unless length($io->{rbuffer}) >= ($alen + $dlen + $clen + $HDRSIZE);

    # the three sections follow the header back to back
    my $auth    = substr($io->{rbuffer}, $HDRSIZE, $alen);
    my $data    = substr($io->{rbuffer}, $HDRSIZE + $alen, $dlen);
    my $content = substr($io->{rbuffer}, $HDRSIZE + $alen + $dlen, $clen);

    # RSN - validate auth

    if( $hdr->{data_encrypted} && $data ){
        $data = $me->_decrypt_data( $io, $auth, $data );
        return unless $data;
    }
    if( $hdr->{content_encrypted} && $content ){
        $content = $me->_decrypt_data( $io, $auth, $content );
        return unless $content;
    }

    # content is passed as reference
    my $cref = $content ? \$content : undef;
    return ($hdr, $data, $cref);
}
# for simple status queries, argus, debugging
# this is not an RFC compliant http server
#
# Wait for the blank line that ends the request headers, then return
# ( { type => 'http', method => METHOD }, URL ) taken from the first two
# whitespace-separated words of the buffer. Returns nothing until then.
#   $io  - connection object; the request is read from $io->{rbuffer}
#   $evt - read event (unused)
sub _read_http {
    my( $io, $evt ) = @_;

    # headers not complete yet
    return if $io->{rbuffer} !~ /\r?\n\r?\n/s;

    my( $method, $url ) = split /\s+/, $io->{rbuffer};
    my %proto = ( type => 'http', method => $method );
    return ( \%proto, $url );
}
################################################################
# Decrypt one section of a message.
#   $io   - connection the message arrived on
#   $auth - auth section of the message, passed through to decrypt()
#   $data - encrypted bytes
# Returns the decrypted data. If decrypt() dies, the connection's 'error'
# callback is run, the connection is shut, and nothing is returned.
sub _decrypt_data {
    my( $me, $io, $auth, $data ) = @_;

    my $plain = eval { $me->decrypt( $auth, $data ) };

    if( my $err = $@ ){
        verbose("cannot decrypt protocol data: $err");
        $io->run_callback('error', {
            cause => 'read',
            error => "cannot decrypt protocol: $err",
        });
        $io->shut();
        return;
    }

    return $plain;
}
# Decide whether traffic to a peer should be encrypted.
#   $peer - peer record; only its {datacenter} is examined
# Returns nothing when no 'secret' is configured. Otherwise returns true
# exactly when the peer is in a different datacenter than we are.
sub use_encryption {
    my( $peer ) = @_;

    return if !conf_value('secret');

    # only encrypt far-away traffic, not local
    return $peer->{datacenter} ne my_datacenter();
}
# Encrypt a buffer with this object's secret.
#   $auth - not currently used
#   $buf  - data to encrypt
# Returns $buf unchanged when the object has no secret, nothing when $buf
# is empty, and otherwise the result of AC::Yenta::Crypto's encrypt().
sub encrypt {
    my( $me, $auth, $buf ) = @_;

    my $secret = $me->{secret} or return $buf;
    return unless $buf;

    return AC::Yenta::Crypto->new( $secret )->encrypt( $buf );
}
# Decrypt a buffer with this object's secret.
#   $abuf - not currently used
#   $buf  - data to decrypt
# Returns $buf unchanged when the object has no secret, nothing when $buf
# is empty, and otherwise the result of AC::Yenta::Crypto's decrypt().
sub decrypt {
    my( $me, $abuf, $buf ) = @_;

    my $secret = $me->{secret} or return $buf;
    return unless $buf;

    return AC::Yenta::Crypto->new( $secret )->decrypt( $buf );
}
1;
lib/AC/Yenta/Server.pm view on Meta::CPAN
use AC::Yenta::Kibitz::Status::Server;
use AC::Yenta::Kibitz::Store::Server;
use strict;
# the server connection class is a subclass of AC::DC::IO::TCP
our @ISA = 'AC::DC::IO::TCP';
# protocol header size
# NOTE(review): not referenced by the code visible in this file - confirm it is still needed
my $HDRSIZE = AC::Yenta::Protocol->header_size();
# passed to timeout_rel() when a connection is accepted (presumably seconds)
my $TIMEOUT = 2;
# dispatch table: message type => handler. read() calls a code ref directly
# and, for a plain string, calls the ->handler() method of the named class.
my %HANDLER = (
yenta_status => 'AC::Yenta::Kibitz::Status::Server',
yenta_get => \&AC::Yenta::Kibitz::Store::Server::api_get,
yenta_distrib => \&AC::Yenta::Kibitz::Store::Server::api_distrib,
yenta_check => \&AC::Yenta::Kibitz::Store::Server::api_check,
http => 'AC::Yenta::Stats',
);
# Set up a server-side connection object for a freshly accepted socket.
#   $fd - the accepted socket
#   $ip - address of the peer
# Returns the new connection object, or nothing when the peer is rejected
# by the configured ACL.
sub new {
    my $class = shift;
    my $fd    = shift;
    my $ip    = shift;

    unless( $AC::Yenta::CONF->check_acl( $ip ) ){
        verbose("rejecting connection from $ip");
        return;
    }

    my $me = $class->SUPER::new( peerip => $ip, info => "tcp yenta server (from: $ip)" );
    $me->init($fd);
    $me->wantread(1);
    $me->timeout_rel($TIMEOUT);
    $me->set_callback('read',    \&read);
    $me->set_callback('timeout', \&timeout);

    # return the object explicitly; without this the result of new() was
    # whatever the last set_callback() call happened to return
    return $me;
}
# 'timeout' callback: log the timeout and close the connection.
sub timeout {
    my( $me ) = @_;

    debug("connection timed out");
    $me->shut();
}
# 'read' callback: once read_protocol() returns a request, dispatch it
# through %HANDLER by message type.
#   $evt - read event, handed to read_protocol()
# Connections sending an unknown message type are closed.
sub read {
    my( $me, $evt ) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my( $proto, $data, $content ) = $yp->read_protocol( $me, $evt );
    return unless $proto;       # nothing returned yet

    # dispatch request
    my $handler = $HANDLER{ $proto->{type} };
    if( !$handler ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    my @args = ( $me, $proto, $data, $content );
    # code refs are called directly, class names through their handler() method
    return ref($handler) ? $handler->( @args )
                         : $handler->handler( @args );
}
1;
view all matches for this distributionview release on metacpan - search on metacpan