view release on metacpan or search on metacpan
--- #YAML:1.0
name: AC-Yenta
version: 1.1
abstract: eventually-consistent distributed key/value data store. et al.
author:
- AdCopy <http://www.adcopy.com>
license: perl
distribution_type: module
configure_requires:
ExtUtils::MakeMaker: 0
requires:
AC::DC: 0
BerkeleyDB: 0
Crypt::Rijndael: 0
Digest::SHA: 0
Google::ProtocolBuffers: 0
JSON: 0
POSIX: 0
lib/AC/Yenta/Client.pm view on Meta::CPAN
return $me;
}
sub get {
my $me = shift;
my $map = shift;
my $key = shift;
my $ver = shift;
my $req = $me->{proto}->encode_request( {
type => 'yenta_get',
msgidno => rand(0xFFFFFFFF),
want_reply => 1,
}, {
data => [ {
map => $map,
key => $key,
version => $ver,
} ]
} );
lib/AC/Yenta/Client.pm view on Meta::CPAN
my $key = shift;
my $ver = shift;
my $val = shift;
my $file = shift; # reference
my $meta = shift;
return unless $key && $ver;
$me->{retries} = 25 unless $me->{retries};
my $req = $me->{proto}->encode_request( {
type => 'yenta_distrib',
msgidno => rand(0xFFFFFFFF),
want_reply => 1,
}, {
sender => "$HOSTNAME/$$",
hop => 0,
expire => time() + 120,
datum => [ {
map => $map,
key => $key,
version => $ver,
lib/AC/Yenta/Client.pm view on Meta::CPAN
# return undef | result
}
# check( $map, $ver, $lev )
#   Build a 'yenta_check' request for the given map, level and version,
#   asking for a reply and tagged with a random message id, then pass it
#   to _send_request together with the map name.
#   Returns whatever _send_request returns.
sub check {
    my ($me, $map, $ver, $lev) = @_;

    # protocol header
    my %header = (
        type       => 'yenta_check',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );

    # request payload
    my %body = (
        map     => $map,
        level   => $lev,
        version => $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, \%body );

    return $me->_send_request($map, $req);
}
lib/AC/Yenta/Config.pm view on Meta::CPAN
ipi => inet_atoi($ip),
port => $port,
};
}
# parse_savefile( $key, $save )
#   Record one save-file specification on the pending config.
#
#   $key  - config keyword that selected this parser (not used here)
#   $save - "<file> <type> [<type> ...]", whitespace separated
#
#   Appends { file => <file>, type => [ <type>, ... ] } to
#   $me->{_pending}{savestatus} and returns the new number of entries
#   (the result of push).
sub parse_savefile {
    my ($me, $key, $save) = @_;

    # split ' ' skips leading whitespace. With split /\s+/ a value that
    # starts with blanks yields an empty first field, which would become
    # an empty file name and shift the real file name into the type list.
    my ($file, @type) = split ' ', $save;

    return push @{ $me->{_pending}{savestatus} }, {
        type => \@type,
        file => $file,
    };
}
sub cvt_timespec {
my $t = shift;
my %f = ( m => 60, h => 3600, d => 86400 );
my($n, $f) = $t =~ /(\d+)(\D?)/;
lib/AC/Yenta/Kibitz/Status/Client.pm view on Meta::CPAN
sub start {
my $me = shift;
$me->SUPER::start();
# build request
my $yp = AC::Yenta::Protocol->new();
my $pb = AC::Yenta::Kibitz::Status::myself();
my $hdr = $yp->encode_header(
type => 'yenta_status',
data_length => length($pb),
content_length => 0,
want_reply => 1,
msgid => $msgid++,
);
# write request
$me->write( $hdr . $pb );
$me->timeout_rel($TIMEOUT);
return $me;
lib/AC/Yenta/Kibitz/Status/Server.pm view on Meta::CPAN
return;
}
AC::Yenta::NetMon::update( $io );
# respond with all known peers
my $response = AC::Yenta::Kibitz::Status::response();
my $yp = AC::Yenta::Protocol->new();
my $hdr = $yp->encode_header(
type => 'yenta_status',
data_length => length($response),
content_length => 0,
msgid => $proto->{msgid},
is_reply => 1,
);
debug("sending status reply");
$io->write_and_shut( $hdr . $response );
}
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
}
}
push @res, $res;
}
# encode results
my $ect = '';
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
$ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
my $response = $yp->encode_reply( {
type => 'yenta_get',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
content_encrypted => $proto->{data_encrypted},
}, { data => \@res }, \$ect );
debug("sending get reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response . $ect );
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
last unless @lres; # reached the bottom of the tree
last if @res > 64; # got enough results
last if (@lres > 2) && ($nexttot > @lres + 2); # less sparse region
# get the next level also
@todo = @lres;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_check',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { check => \@res } );
debug("sending check reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response );
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
unless( $proto->{want_reply} ){
$io->shut();
return;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
data_encrypted => $proto->{data_encrypted},
}, { status_code => 200, status_message => 'OK', haveit => !$want } );
debug("sending distrib reply");
$io->timeout_rel($TIMEOUT);
$io->write_and_shut( $response );
}
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
}
sub _reply_error {
my $io = shift;
my $proto = shift;
my $code = shift;
my $msg = shift;
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
is_error => 1,
data_encrypted => $proto->{data_encrypted},
}, {
status_code => $code,
status_message => $msg,
haveit => 0,
} );
lib/AC/Yenta/Monitor/Client.pm view on Meta::CPAN
$me->set_callback('timeout', \&timeout);
$me->set_callback('read', \&read);
$me->set_callback('shutdown', \&shutdown);
$me->start();
# build request
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $hdr = $yp->encode_header(
type => 'heartbeat_request',
data_length => 0,
content_length => 0,
want_reply => 1,
msgid => $msgid++,
);
# write request
$me->write( $hdr );
$me->timeout_rel($TIMEOUT);
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
# Minimal HTTP request reader, for simple status queries, argus, debugging.
# This is not an RFC compliant http server.
#
# Returns nothing until the buffered input contains a blank line.
# After that it returns ( { type => 'http', method => ... }, $url ), where
# method and url are the first two whitespace-separated fields of the buffer.
sub _read_http {
    my ($io, $evt) = @_;    # $evt is accepted but not used here

    my $buffer = $io->{rbuffer};
    if ( $buffer !~ /\r?\n\r?\n/s ) {
        return;             # no blank line seen yet
    }

    # the third field (protocol version) is split off but ignored
    my ($method, $url, $version) = split /\s+/, $buffer;

    my %header = ( type => 'http', method => $method );
    return ( \%header, $url );
}
################################################################
sub _decrypt_data {
my $me = shift;
my $io = shift;
my $auth = shift;
my $data = shift;
lib/AC/Yenta/Server.pm view on Meta::CPAN
# Read callback: decode an incoming message and dispatch it by type.
#
# Nothing is done when read_protocol yields no header. An unknown message
# type is logged and the connection is shut. Otherwise the %HANDLER entry
# for the type is invoked with ( $me, $proto, $data, $content ): directly
# when it is a reference, else through its 'handler' method.
sub read {
    my ($me, $evt) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my ($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return if !$proto;

    # look up who handles this kind of message
    my $handler = $HANDLER{ $proto->{type} };
    if ( !$handler ) {
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    my @args = ( $me, $proto, $data, $content );
    if ( ref $handler ) {
        return $handler->(@args);
    }
    return $handler->handler(@args);
}
1;
lib/AC/Yenta/Status.pm view on Meta::CPAN
# Staleness limit for the save file: server_list() only keeps peers whose
# 'lastup' is greater than $^T - $SAVEMAX.
my $SAVEMAX = 1800; # do not save if older than
# Assigned by init() from its argument.
my $PORT;
# Peer status tables, keyed several ways. Blessed (one-argument bless)
# into the current package.
our $DATA = bless {
allpeer => {}, # yenta_status: {id} => status record (subsystem, datacenter, ...)
sceptical => {}, # peers that _random_peer picks from first
mappeer => {}, # {map} => { id => id }
peermap => {}, # {id} => @map (stored as an array ref)
datacenter => {}, # {dc} => { id => id }
peertype => {}, # {ss} => { id => id }, ss = subsystem
};
sub init {
my $port = shift;
$PORT = $port;
AC::DC::Sched->new(
info => 'kibitz status',
freq => (conf_value('time_status_kibitz') || 5),
lib/AC/Yenta/Status.pm view on Meta::CPAN
$c->start();
}
sub _random_peer {
my $here = my_datacenter();
# sceptical
my @scept = values %{$DATA->{sceptical}};
my @all = map { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
my @old = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
my @local = grep { $_->{datacenter} eq $here } @all; # this datacenter
my @away = grep { $_->{datacenter} ne $here } @all; # not this datacenter
# first check anything sceptical
my @peer = @scept;
# then (maybe) something about to expire
@peer = @old unless @peer || int rand(5);
lib/AC/Yenta/Status.pm view on Meta::CPAN
my $ipinfo = my_network_info();
for my $i (@$ipinfo){
return if $ip eq $i->{ipa} && $port == $PORT;
}
return("seed/$ip:$port", $ip, $port);
}
# server list for save file
#
# Takes "type" or "type/where" and returns the $DATA->{allpeer} records of
# every peer of that type whose 'lastup' is newer than $^T - $SAVEMAX.
# Returns nothing when no peer of that type is known, or when all are too old.
sub server_list {
    my ($spec) = @_;

    my ($type, $where) = split m|/|, $spec;
    # where - no longer used
    $where ||= my_datacenter();

    my @ids = keys %{ $DATA->{peertype}{$type} };
    return unless @ids;

    # nothing too old
    my $cutoff = $^T - $SAVEMAX;
    my @fresh  = grep { $DATA->{allpeer}{$_}{lastup} > $cutoff } @ids;
    return unless @fresh;

    return map { $DATA->{allpeer}{$_} } @fresh;
}
# save a list of peers, in case I crash, and for others to use
sub save_status {
my $save = conf_value('savestatus');
my $here = my_datacenter();
# also save locally running services
my @mon = AC::Yenta::Monitor::export();
for my $s ( @$save ){
my $file = $s->{file};
my $types = $s->{type};
my @peer;
for my $type (@$types){
push @peer, server_list($type);
for my $m (@mon){
push @peer, $m if $m->{subsystem} eq $type;
}
}
next unless @peer;
debug("saving peer status file");
unless( open(FILE, ">$file.tmp") ){
problem("cannot open save file '$file.tmp': $!");
return;
}
lib/AC/Yenta/Status.pm view on Meta::CPAN
my $class = shift;
return $DATA->{datacenter};
}
################################################################
sub _remove {
my $id = shift;
my $ss = $DATA->{allpeer}{$id}{subsystem};
delete $DATA->{peertype}{$ss}{$id} if $ss;
my $dc = $DATA->{allpeer}{$id}{datacenter};
delete $DATA->{datacenter}{$dc}{$id} if $dc;
verbose("deleting peer: $id");
delete $DATA->{allpeer}{$id};
# remove map info
for my $map ( @{$DATA->{peermap}{$id}} ){
delete $DATA->{mappeer}{$map}{$id};
lib/AC/Yenta/Status.pm view on Meta::CPAN
}
# update datacenter info
unless( $DATA->{datacenter}{$up->{datacenter}}{$id} ){
my $pdc = $previnfo->{datacenter};
delete $DATA->{datacenter}{$pdc}{$id} if $pdc;
$DATA->{datacenter}{$up->{datacenter}}{$id} = $id;
}
# update subsystem info
unless( $DATA->{peertype}{$up->{subsystem}}{$id} ){
my $ss = $previnfo->{subsystem};
delete $DATA->{peertype}{$ss}{$id} if $ss;
$DATA->{peertype}{$up->{subsystem}}{$id} = $id;
}
# update map info
$DATA->{peermap}{$id} ||= [];
$up->{map} ||= [];
my @curmap = @{$DATA->{peermap}{$id}};
my @newmap = sort @{$up->{map}};
return if "@curmap" eq "@newmap"; # unchanged
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
sub _start_check {
my $me = shift;
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
map => $me->{map},
level => $node->{level},
version => $node->{version},
shard => $node->{shard},
} );
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
# insert map into request
$_->{map} = $me->{map} for @$get;
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
my $enc = use_encryption($peer);
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# build request
my $request = $proto->encode_request( {
type => 'yenta_get',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
data => $get,
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $pd = AC::Yenta::Status->peer($id);
my $addr = $pd->{ip}; # array of nat ip info
my $enc = use_encryption($pd);
my $ect = '';
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
$ect = $enc ? $proto->encrypt(undef, ${$me->{content}}) : ${$me->{content}} if $me->{content};
# build request
my $request = $proto->encode_request( {
type => 'yenta_distrib',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
content_encrypted => $enc,
}, {
sender => AC::Yenta::Status->my_server_id(),
hop => $me->{req}{hop} + 1,
expire => $me->{req}{expire},
datum => $me->{req}{datum},
}, \$ect );