view release on metacpan or search on metacpan
eg/yenta.conf view on Meta::CPAN
allow 127.0.0.1
allow 10.200.2.0/23
# seed peers to locate the network at startup
seedpeer 10.200.2.4:3503
seedpeer 10.200.2.5:3503
# enable debugging?
#debug ae
#debug map
#debug merkle
# ...
# maps
map testyfoo {
# name of the data file
dbfile /home/data/testyfoo.ydb
# how much history to keep
history 4
}
eg/yenta_get view on Meta::CPAN
# Author: Jeff Weisberg
# Created: 2009-Apr-03 13:43 (EDT)
# Function: get value example
#
# $Id$
use AC::Yenta::Client;
use AC::Dumper;
use strict;
my $map = shift @ARGV;
my $key = shift @ARGV;
die "usage: get [-h host] map key\n" unless $map && $key;
my $y = AC::Yenta::Client->new(
# server_file, servers[], or host + port
server_file => '/var/tmp/yenta.status',
debug => \&debug,
);
my $res = $y->get($map, $key);
print dumper($res), "\n";
exit;
sub debug {
print STDERR @_, "\n";
}
eg/yenta_get_direct view on Meta::CPAN
# Author: Jeff Weisberg
# Created: 2009-Sep-16 12:00 (EDT)
# Function: get - using bdb file directly
#
# $Id$
use AC::Dumper;
use AC::Yenta::Direct;
use strict;
my $map = shift @ARGV;
my $file = shift @ARGV;
my $key = shift @ARGV;
my $y = AC::Yenta::Direct->new( $map, $file );
my $v = $y->get($key);
print dumper($v), "\n";
eg/yenta_put view on Meta::CPAN
use JSON;
use strict;
my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });
my $key = 'YX3jSXD3CBRUDABm';
my $res = $ys->distribute(
# map, key, version, data
'mymap', $key, timet_to_yenta_version(time()),
encode_json( {
url_id => $key,
url => 'http://www.example.com',
acc_id => 'C9TdSgbUCBRUCABG',
format => 'html',
}),
);
lib/AC/Yenta.pm view on Meta::CPAN
=item syslog
specify a syslog facility for log messages.
syslog local5
=item debug
enable debugging for a particular section
debug map
=item map
configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers. maps should be
configured similarly on all servers that they are on.
map users {
backend bdb
dbfile /home/acdata/users.ydb
history 4
}
=back
=head1 BUGS
Too many to list here.
lib/AC/Yenta/Client.pm view on Meta::CPAN
$me->{server_file} ||= $me->{altfile}; # compat
die "servers or server_file?\n" unless $me->{servers} || $me->{server_file};
return $me;
}
sub get {
my $me = shift;
my $map = shift;
my $key = shift;
my $ver = shift;
my $req = $me->{proto}->encode_request( {
type => 'yenta_get',
msgidno => rand(0xFFFFFFFF),
want_reply => 1,
}, {
data => [ {
map => $map,
key => $key,
version => $ver,
} ]
} );
return $me->_send_request($map, $req);
}
sub _shard {
my $key = shift;
my $sh = sha1($key);
my($a, $b) = unpack('NN', $sh);
return $a<<32 | $b;
}
sub distribute {
my $me = shift;
my $map = shift;
my $key = shift;
my $ver = shift;
my $val = shift;
my $file = shift; # reference
my $meta = shift;
return unless $key && $ver;
$me->{retries} = 25 unless $me->{retries};
my $req = $me->{proto}->encode_request( {
type => 'yenta_distrib',
msgidno => rand(0xFFFFFFFF),
want_reply => 1,
}, {
sender => "$HOSTNAME/$$",
hop => 0,
expire => time() + 120,
datum => [ {
map => $map,
key => $key,
version => $ver,
shard => _shard($key), # NYI
value => $val,
meta => $meta,
} ]
}, $file );
return $me->_send_request($map, $req, $file);
# return undef | result
}
sub check {
my $me = shift;
my $map = shift;
my $ver = shift;
my $lev = shift;
my $req = $me->{proto}->encode_request( {
type => 'yenta_check',
msgidno => rand(0xFFFFFFFF),
want_reply => 1,
}, {
map => $map,
level => $lev,
version => $ver,
} );
return $me->_send_request($map, $req);
}
################################################################
sub _send_request {
my $me = shift;
my $map = shift;
my $req = shift;
my $file = shift; # reference
my $tries = $me->{retries} + 1;
my $copy = $me->{copies} || 1;
my $delay = 0.25;
$me->_init_hostlist($map);
my ($addr, $port) = $me->_next_host($map);
for (1 .. $tries){
return unless $addr;
my $res = $me->_try_server($addr, $port, $req, $file);
return $res if $res && !--$copy;
($addr, $port) = $me->_next_host($map);
sleep $delay;
$delay *= 1.414;
}
}
sub _try_server {
my $me = shift;
my $addr = shift;
my $port = shift;
my $req = shift;
lib/AC/Yenta/Client.pm view on Meta::CPAN
$res = undef;
}
return $res;
}
################################################################
sub _next_host {
my $me = shift;
my $map = shift;
$me->_read_serverfile($map) unless $me->{_server};
return unless $me->{_server} && @{$me->{_server}};
my $next = shift @{$me->{_server}};
return( $next->{addr}, $next->{port} );
}
sub _init_hostlist {
my $me = shift;
my $map = shift;
my @server;
push @server, {
addr => $me->{host},
port => $me->{port},
} if $me->{host} && $me->{port};
push @server, @{$me->{servers}} if $me->{servers};
$me->{_server} = \@server;
$me->_read_serverfile($map);
}
# yentad saves a list of alternate peers to try in case it dies
sub _read_serverfile {
my $me = shift;
my $map = shift;
my $f;
my @server;
my @faraway;
open($f, $me->{server_file});
local $/ = "\n";
while(<$f>){
chop;
my $data = decode_json( $_ );
next unless grep { $_ eq $map } @{ $data->{map} };
if( $data->{is_local} ){
push @server, { addr => $data->{addr}, port => $data->{port} };
}else{
push @faraway, { addr => $data->{addr}, port => $data->{port} };
}
}
# prefer local
@server = @faraway unless @server;
lib/AC/Yenta/Config.pm view on Meta::CPAN
package AC::Yenta::Config;
use AC::Misc;
use AC::Import;
use AC::DC::Debug;
use AC::ConfigFile::Simple;
use Socket;
use strict;
our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value conf_map);
my %CONFIG = (
include => \&AC::ConfigFile::Simple::include_file,
debug => \&AC::ConfigFile::Simple::parse_debug,
allow => \&AC::ConfigFile::Simple::parse_allow,
port => \&AC::ConfigFile::Simple::parse_keyvalue,
environment => \&AC::ConfigFile::Simple::parse_keyvalue,
secret => \&AC::ConfigFile::Simple::parse_keyvalue,
seedpeer => \&AC::ConfigFile::Simple::parse_keyarray,
ae_maxload => \&AC::ConfigFile::Simple::parse_keyvalue,
distrib_max => \&AC::ConfigFile::Simple::parse_keyvalue,
savestatus => \&parse_savefile,
monitor => \&parse_monitor,
map => \&parse_map,
);
my @MAP = qw(dbfile basedir keepold history expire backend sharded);
################################################################
sub handle_config {
my $me = shift;
lib/AC/Yenta/Config.pm view on Meta::CPAN
my $rest = shift;
my $fnc = $CONFIG{$key};
return unless $fnc;
$fnc->($me, $key, $rest);
return 1;
}
################################################################
sub parse_map {
my $me = shift;
my $key = shift;
my $value = shift;
my($map) = $value =~ /(\S+)\s+\{\s*/;
die "invalid map spec\n" unless $map;
my $md = {};
problem("map '$map' redefined") if $me->{_pending}{map}{$map};
while( defined(my $l = $me->_nextline()) ){
last if $l eq '}';
my($k, $v) = split /\s+/, $l, 2;
if( grep {$_ eq $k} @MAP ){
$v = cvt_timespec($v) if $k eq 'expire';
$md->{$k} = $v;
}else{
problem("unknown map option '$k'");
}
}
$me->{_pending}{map}{$map} = $md;
}
sub parse_monitor {
my $me = shift;
my $key = shift;
my $mon = shift;
my($ip, $port) = split /:/, $mon;
push @{$me->{_pending}{monitor}}, {
monitor => $mon,
lib/AC/Yenta/Config.pm view on Meta::CPAN
################################################################
sub conf_value {
my $key = shift;
return $AC::Yenta::CONF->{config}{$key};
}
sub conf_map {
my $map = shift;
return $AC::Yenta::CONF->{config}{map}{$map};
}
1;
lib/AC/Yenta/Direct.pm view on Meta::CPAN
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
use strict;
our @EXPORT = 'yenta_direct_get';
sub yenta_direct_get {
my $file = shift;
my $map = shift;
my $key = shift;
my $me = __PACKAGE__->new($map, $file);
return unless $me;
return $me->get( $key );
}
sub new {
my $class = shift;
my $map = shift;
my $file = shift;
my $db = AC::Yenta::Store::Map->new( $map, undef, { dbfile => $file, readonly => 1 } );
return unless $db;
return bless { db => $db }, $class;
}
sub get {
my $me = shift;
my $key = shift;
my $db = $me->{db};
lib/AC/Yenta/Direct.pm view on Meta::CPAN
return $me->getrange( '', '' );
}
sub getrange {
my $me = shift;
my $lo = shift;
my $hi = shift;
my $db = $me->{db};
return map { $_->{k} } $db->range( ($lo||''), ($hi||'') );
}
1;
lib/AC/Yenta/Kibitz/Status.pm view on Meta::CPAN
require 'AC/protobuf/yenta_status.pl';
use strict;
my $HOSTNAME = hostname();
################################################################
sub _myself {
my $maps = conf_value('map');
my @ipinfo;
my $natinfo = my_network_info();
my $status = 200;
for my $i ( @$natinfo ){
my $st = AC::Yenta::NetMon::status_dom( $i->{natdom} );
# if a private network is down, announce the network + 500
# if a private network is down, stop announcing it
lib/AC/Yenta/Kibitz/Status.pm view on Meta::CPAN
environment => conf_value('environment'),
via => AC::Yenta::Status->my_server_id(),
server_id => AC::Yenta::Status->my_server_id(),
instance_id => AC::Yenta::Status->my_instance_id(),
path => '.',
status => $status,
uptodate => AC::Yenta::Store::AE->up_to_date(),
timestamp => $^T,
lastup => $^T,
ip => \@ipinfo,
map => [ keys %$maps ],
sort_metric => loadave() * 1000,
};
}
################################################################
sub myself {
# tell server about ourself
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
# process requests
my @res;
my $rescont;
for my $r (@{ $req->{data} }){
debug("get request: $r->{map}, $r->{key}, $r->{version}");
my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
my $res = {
map => $r->{map},
key => $r->{key},
};
if( $meta && $file ){
unless( _check_content( $meta, $file ) ){
problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
# QQQ - remove from system, (and let AE get a new copy)?
store_remove($r->{map}, $r->{key}, $ver);
# tell caller it was not found
$ver = undef;
}
}
if( defined $ver ){
$res->{version} = $ver;
$res->{value} = $data;
$res->{meta} = $meta if defined $meta;
if( $file ){
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
my $req;
eval {
$req = ACPYentaCheckRequest->decode( $gpb );
};
if(my $e = $@){
problem("cannot decode request: $e");
$io->shut();
return;
}
debug("check request: $req->{map}, $req->{version}, $req->{level}");
my @res;
my @todo = { version => $req->{version}, shard => $req->{shard} };
# the top of the tree will be fairly sparse,
# return up to several levels if they are sparse
for my $l (0 .. 32){
my $cl = $req->{level} + $l;
my @lres;
my $nexttot;
for my $do (@todo){
my @r = _get_check( $req->{map}, $do->{shard}, $do->{version}, $cl );
push @lres, @r;
for (@r){
$nexttot += $_->{key} ? ($_->{count} / 8) : $_->{count};
}
}
push @res, @lres;
@todo = ();
last unless @lres; # reached the bottom of the tree
last if @res > 64; # got enough results
last if (@lres > 2) && ($nexttot > @lres + 2); # less sparse region
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
debug("sending check reply");
$io->timeout_rel($TIMEOUT);
$io->{writebuf_timeout} = $TIMEOUT;
$io->write_and_shut( $response );
}
# get + process merkle data
sub _get_check {
my $map = shift;
my $shard = shift;
my $ver = shift;
my $lev = shift;
my $res = store_get_merkle($map, $shard, $ver, $lev);
return unless $res;
for my $r (@$res) {
$r->{map} = $map;
}
return @$res;
}
sub api_distrib {
my $io = shift;
my $proto = shift;
my $gpb = shift;
my $content = shift; # reference
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
$req = ACPYentaDistRequest->decode( $gpb );
die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};
};
if(my $e = $@){
my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
problem("cannot decode request: peer: $io->{peerip} $enc, $e");
$io->shut();
return;
}
unless( conf_map( $req->{datum}{map} ) ){
problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
_reply_error($io, $proto, 404, 'Map Not Found');
return;
}
my $v = $req->{datum};
# do we already have ?
my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );
if( $want ){
# put
debug("put request from $io->{peerip}: $v->{map}, $v->{key}, $v->{version}");
# file content is passed by reference, to avoid large copies
$content ||= \ $v->{file} if $v->{file};
# check
if( $v->{meta} && $content ){
unless( _check_content( $v->{meta}, $content ) ){
problem("content SHA1 check failed: $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}");
$io->shut();
return;
}
}
$want = store_put( $v->{map}, $v->{shard}, $v->{key}, $v->{version}, $v->{value},
$content, $v->{meta} );
# distribute to other systems
AC::Yenta::Store::Distrib->new( $req, $content ) if $want;
}else{
debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
}
unless( $proto->{want_reply} ){
$io->shut();
return;
}
# encode results
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
lib/AC/Yenta/Stats.pm view on Meta::CPAN
my @peers = AC::Yenta::Status->allpeers();
$res .= sprintf("%-24s%s\n", "peers:", scalar @peers);
$res .= "\n";
return $res;
}
sub http_data {
my $url = shift;
my(undef, $map, $key, $ver) = split m|/|, $url;
my($data, $version, $file, $meta) = store_get($map, $key, $ver);
return http_notfound($url) unless $version;
return $data;
}
sub http_file {
my $url = shift;
my(undef, $map, $key, $ver) = split m|/|, $url;
my($data, $version, $file, $meta) = store_get($map, $key, $ver);
return http_notfound($url) unless $version && $file;
return $$file;
}
1;
lib/AC/Yenta/Status.pm view on Meta::CPAN
my $KEEPDOWN = 1800; # keep data about down servers for how long?
my $KEEPLOST = 600; # keep data about servers we have not heard about for how long?
my $SAVEMAX = 1800; # do not save if older than
my $PORT;
our $DATA = bless {
allpeer => {}, # yenta_status
sceptical => {},
mappeer => {}, # {map} => { id => id }
peermap => {}, # {id} => @map
datacenter => {}, # {dc} => { id => id }
peertype => {}, # {ss} => { id => id }
};
sub init {
my $port = shift;
$PORT = $port;
AC::DC::Sched->new(
lib/AC/Yenta/Status.pm view on Meta::CPAN
$c->start();
}
sub _random_peer {
my $here = my_datacenter();
# sceptical
my @scept = values %{$DATA->{sceptical}};
my @all = map { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
my @old = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
my @local = grep { $_->{datacenter} eq $here } @all; # this datacenter
my @away = grep { $_->{datacenter} ne $here } @all; # not this datacenter
# first check anything sceptical
my @peer = @scept;
# then (maybe) something about to expire
@peer = @old unless @peer || int rand(5);
lib/AC/Yenta/Status.pm view on Meta::CPAN
# where - no longer used
$where ||= my_datacenter();
my @peer = keys %{ $DATA->{peertype}{$type} };
return unless @peer;
# nothing too old
@peer = grep { $DATA->{allpeer}{$_}{lastup} > $^T - $SAVEMAX } @peer;
return unless @peer;
return map { $DATA->{allpeer}{$_} } @peer;
}
# save a list of peers, in case I crash, and for others to use
sub save_status {
my $save = conf_value('savestatus');
my $here = my_datacenter();
# also save locally running services
my @mon = AC::Yenta::Monitor::export();
lib/AC/Yenta/Status.pm view on Meta::CPAN
port => int($port),
status => int($pd->{status}),
subsystem => $pd->{subsystem},
environment => $pd->{environment},
sort_metric => int($pd->{sort_metric}),
capacity_metric => int($pd->{capacity_metric}),
datacenter => $pd->{datacenter},
is_local => ($here eq $pd->{datacenter} ? 1 : 0),
};
if( $pd->{subsystem} eq 'yenta' ){
$data->{map} = $pd->{map};
}
print FILE encode_json( $data ), "\n";
}
close FILE;
unless( rename("$file.tmp", $file) ){
problem("cannot rename save file '$file': $!");
}
lib/AC/Yenta/Status.pm view on Meta::CPAN
return $DATA->{allpeer}{$id};
}
sub allpeers {
my $class = shift;
# idown sets status to 0 (below), skip such
return grep { $_->{status} } values %{$DATA->{allpeer}};
}
sub mappeers {
my $class = shift;
my $map = shift;
return keys %{ $DATA->{mappeer}{$map} };
}
sub datacenters {
my $class = shift;
return $DATA->{datacenter};
}
################################################################
sub _remove {
lib/AC/Yenta/Status.pm view on Meta::CPAN
my $ss = $DATA->{allpeer}{$id}{subsystem};
delete $DATA->{peertype}{$ss}{$id} if $ss;
my $dc = $DATA->{allpeer}{$id}{datacenter};
delete $DATA->{datacenter}{$dc}{$id} if $dc;
verbose("deleting peer: $id");
delete $DATA->{allpeer}{$id};
# remove map info
for my $map ( @{$DATA->{peermap}{$id}} ){
delete $DATA->{mappeer}{$map}{$id};
}
delete $DATA->{peermap}{$id};
# delete its monitored items
for my $p (keys %{$DATA->{allpeer}}){
next unless $DATA->{allpeer}{$p}{via} eq $id;
_remove($p);
}
}
sub _maybe_remove {
my $id = shift;
lib/AC/Yenta/Status.pm view on Meta::CPAN
$DATA->{datacenter}{$up->{datacenter}}{$id} = $id;
}
# update subsystem info
unless( $DATA->{peertype}{$up->{subsystem}}{$id} ){
my $ss = $previnfo->{subsystem};
delete $DATA->{peertype}{$ss}{$id} if $ss;
$DATA->{peertype}{$up->{subsystem}}{$id} = $id;
}
# update map info
$DATA->{peermap}{$id} ||= [];
$up->{map} ||= [];
my @curmap = @{$DATA->{peermap}{$id}};
my @newmap = sort @{$up->{map}};
return if "@curmap" eq "@newmap"; # unchanged
# what do we need to add/remove
my (%remove, %add);
@remove{@curmap} = @curmap;
@add{@newmap} = @newmap;
delete $remove{$_} for @newmap;
delete $add{$_} for @curmap;
for my $map (keys %remove){
debug("removing $map from $id");
delete $DATA->{mappeer}{$map}{$id};
}
for my $map (keys %add){
debug("adding $map to $id");
$DATA->{mappeer}{$map}{$id} = $id;
}
$DATA->{peermap}{$id} = \@newmap;
}
lib/AC/Yenta/Store.pm view on Meta::CPAN
use AC::Yenta::Store::Distrib;
use AC::Yenta::Store::AE;
use AC::Yenta::Store::Expire;
use strict;
our @EXPORT = qw(store_get store_put store_want store_get_merkle store_get_internal store_set_internal store_expire store_remove);
my %STORE;
# create maps from config
sub configure {
my $maps = $AC::Yenta::CONF->{config}{map};
my %remove = %STORE;
for my $map (keys %{$maps}){
debug("configuring map $map");
my $conf = $maps->{$map};
my $sharded = $conf->{sharded};
my $c = $sharded ? 'AC::Yenta::Store::Sharded' : 'AC::Yenta::Store::Map';
my $be = $conf->{backend};
my $m = $c->new( $map, $be, { %$conf, recovery => 1 } );
$STORE{$map} = $m;
delete $remove{$map};
}
for my $map (keys %remove){
debug("removing unused map '$map'");
delete $STORE{$map};
}
}
sub store_get {
my $map = shift;
my $key = shift;
my $ver = shift;
my $m = $STORE{$map};
return unless $m;
return $m->get($key, $ver);
}
sub store_want {
my $map = shift;
my $shard = shift;
my $key = shift;
my $ver = shift;
my $m = $STORE{$map};
return unless $m;
return $m->want($shard, $key, $ver);
}
sub store_put {
my $map = shift;
my $shard = shift;
my $key = shift;
my $ver = shift;
my $data = shift;
my $file = shift; # reference
my $meta = shift;
my $m = $STORE{$map};
return unless $m;
debug("storing $map/$key/$ver");
$m->put($shard, $key, $ver, $data, $file, $meta);
}
# NB: only removes local copy temporarily. will be replaced at next AE run
sub store_remove {
my $map = shift;
my $key = shift;
my $ver = shift;
my $m = $STORE{$map};
return unless $m;
return $m->remove($key, $ver);
}
sub store_get_merkle {
my $map = shift;
my $shard = shift;
my $ver = shift;
my $lev = shift;
my $m = $STORE{$map};
return unless $m;
return $m->get_merkle($shard, $ver, $lev);
}
sub store_get_internal {
my $map = shift;
my $key = shift;
my $m = $STORE{$map};
return unless $m;
return $m->get_internal($key);
}
sub store_set_internal {
my $map = shift;
my $key = shift;
my $val = shift;
my $m = $STORE{$map};
return unless $m;
$m->set_internal($key, $val);
}
sub store_expire {
my $map = shift;
my $max = shift; # all versions before this
my $m = $STORE{$map};
return unless $m;
$m->expire($max);
}
sub store_normalize_version {
my $map = shift;
my $m = $STORE{$map};
return unless $m;
$m->normalize_version(@_);
}
sub store_version_max {
my $map = shift;
my $m = $STORE{$map};
return unless $m;
$m->version_max(@_);
}
sub store_merkle_scrub {
my $map = shift;
my $m = $STORE{$map};
return unless $m;
$m->merkle_scrub(@_);
}
1;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $MAXGET = 32; # maximum number of records per fetch
my $MAXFILES = 4; # maximum number of files per fetch
my $MAXFETCH = 32; # maximum number of simultaneous fetches
my $MAXMISSING = 10; # maximum number of missing records to be considered up to date
my $MAXLOAD = 0.5; # do not run if load average is too high
my $EXPIRE = 300; # expire hung job after this long
my $TOONEW = 60; # don't consider things missing if they are less than this old
my $msgid = $$;
my %DONE; # maps which have finished
my @AE; # normally, just one
AC::DC::Sched->new(
info => 'anti-entropy',
freq => 60,
func => \&AC::Yenta::Store::AE::periodic,
);
sub new {
my $class = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $me = bless {
badnode => [ {version => 0, shard => 0, level => 0} ],
cache => {},
kvneed => [],
kvneedorig => [],
kvfetching => 0,
missing => 0,
}, $class;
debug("new ae");
$me->_pick_map() || return;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
$me->_init_peer() || return;
debug("checking $me->{map} with $me->{peer}{id}");
inc_stat('ae_runs');
$me->_next_step();
push @AE, $me;
return $me;
}
sub periodic {
# kill dead sessions, start new ones
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
push @keep, $ae;
}
}
@AE = @keep;
return if @AE;
return if loadave() > (conf_value('ae_maxload') || $MAXLOAD);
__PACKAGE__->new();
}
# we are up to date if we have AE'ed every map at least once since starting
sub up_to_date {
my $class = shift;
my $maps = conf_value('map');
for my $m (keys %$maps){
return 0 unless $DONE{$m};
}
return 1;
}
################################################################
# find most stale map
sub _pick_map {
my $me = shift;
my $maps = conf_value('map');
my(@best, $bestv);
for my $m (keys %$maps){
my $lt = AC::Yenta::Store::store_get_internal($m, 'ae_last_start');
if( !@best || $lt < $bestv ){
@best = $m;
$bestv = $lt;
}elsif( $lt == $bestv ){
push @best, $m;
}
}
return unless @best;
my $map = $best[ rand(@best) ];
$me->{map} = $map;
# is this a data or file map?
# adjust accordingly
my $cf = conf_map( $map );
$me->{has_files} = 1 if $cf->{basedir};
$me->{maxget} = $me->{has_files} ? $MAXFILES : $MAXGET;
$me->{expire} = $cf->{expire};
return 1;
}
sub _init_peer {
my $me = shift;
my $here = my_datacenter();
my @peer = AC::Yenta::Status->mappeers( $me->{map} );
my $env = conf_value('environment');
my(@near, @far, @ood);
for my $p (@peer){
my $d = AC::Yenta::Status->peer($p);
next unless $d->{environment} eq $env;
next unless $d->{status} == 200;
if( $d->{uptodate} ){
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $peer = $peer[ rand(@peer) ];
return $peer;
}
################################################################
sub _finished {
my $me = shift;
debug("finished $me->{map}");
$DONE{$me->{map}} = $^T if $me->{missing} < $MAXMISSING;
AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
@AE = grep{ $_ != $me } @AE;
}
sub _next_step {
my $me = shift;
$me->{timestamp} = $^T;
if( $me->{kvfetching} < $MAXFETCH ){
# any missing data?
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
$me->_finished();
}
################################################################
sub _start_check {
my $me = shift;
my $node = shift @{$me->{badnode}};
debug("checking next node: $me->{map} $node->{level}/$node->{version}");
inc_stat('ae_check_node');
my $enc = use_encryption($me->{peer});
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $request = $proto->encode_request( {
type => 'yenta_check',
msgidno => $msgid++,
want_reply => 1,
data_encrypted => $enc,
}, {
map => $me->{map},
level => $node->{level},
version => $node->{version},
shard => $node->{shard},
} );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
$request,
info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
debug("check results");
$evt->{data} ||= {};
# determine highest level returned
my @keydata;
my @nodedata;
my $maxlev = 0;
for my $d ( @{ $evt->{data}{check} }){
debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");
if( $d->{key} ){
push @keydata, $d;
next;
}
next if $d->{level} < $maxlev;
if( $d->{level} > $maxlev ){
@nodedata = ();
$maxlev = $d->{level};
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
}
$me->_next_step();
}
sub _check_error {
my $io = shift;
my $evt = shift;
my $me = shift;
verbose("AE check error with $me->{peer}{id} map $me->{map} ($io->{info})");
$me->_next_step();
}
sub _check_result_keys {
my $me = shift;
my $chk = shift;
my %vscnt;
my %vsadd;
for my $d (@$chk){
inc_stat('ae_check_key');
my $vsk = "$d->{version} $d->{shard}";
$vscnt{ $vsk } ++;
next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );
debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
inc_stat('ae_key_missing');
$me->{missing} ++;
$vsadd{ $vsk } ++;
}
}
sub _is_expired {
my $me = shift;
my $map = shift;
my $lev = shift;
my $ver = shift;
return unless $me->{expire};
my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
return unless defined $vmx;
if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
debug("skipping expired $lev/$ver - $vmx");
return 1;
}
return;
}
sub _check_result_nodes {
my $me = shift;
my $lev = shift;
my $chk = shift;
# determine all of the base versions of the recvd data
my %ver;
for my $d (@$chk){
my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
$ver{"$ver $shard"} = { ver => $ver, shard => $shard };
}
# get all of our merkle data for these versions
my %merkle;
my $t_new = timet_to_yenta_version($^T - $TOONEW);
for my $d (values %ver){
next if $d->{ver} > $t_new; # too new, ignore
next if $me->_is_expired($me->{map}, $lev, $d->{ver});
# RSN - skip unwanted shards
my $ms = AC::Yenta::Store::store_get_merkle($me->{map}, $d->{shard}, $d->{ver}, $lev - 1);
for my $m (@$ms){
# debug("my hash $me->{map} $m->{level}/$m->{shard}/$m->{version} => $m->{hash}");
$merkle{"$m->{version} $m->{shard}"} = $m->{hash};
}
}
# compare (don't bother with things that are too new (the data may still be en route))
for my $d (@$chk){
next if $d->{version} > $t_new; # too new, ignore
next if $me->_is_expired($me->{map}, $lev, $d->{version});
# RSN - skip unwanted shards
my $hash = $merkle{"$d->{version} $d->{shard}"};
if( $d->{hash} eq $hash ){
debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} => match");
next;
}else{
debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} != $hash");
}
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
debug("getting kv data from current peer");
$me->_start_get_kv( $me->{peer}, 0, \@get);
}
sub _start_get_kv {
my $me = shift;
my $peer = shift;
my $retry = shift;
my $get = shift;
# insert map into request
$_->{map} = $me->{map} for @$get;
# for (@$get){ debug("requesting $_->{key}/$_->{version}") }
my $enc = use_encryption($peer);
my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
# build request
my $request = $proto->encode_request( {
type => 'yenta_get',
msgidno => $msgid++,
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
$evt->{data} ||= {};
debug("got kv data results");
my %need = map {
( "$_->{key}/$_->{version}" => $_ )
} @$get;
for my $d ( @{$evt->{data}{data}}){
debug("got $d->{map}/$d->{key}/$d->{version}");
next unless $d->{key} && $d->{version}; # not found
delete $need{ "$d->{key}/$d->{version}" };
my $file = $evt->{content};
$file = \ $d->{file} if $d->{file};
AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
$d->{value}, $file, $d->{meta} );
}
push @{$me->{kvneedorig}}, (values %need) if $retry;
$me->_next_get_kv();
}
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
return bless {
dir => $dir,
file => $file,
db => $db,
hasenv => ($env ? 1 : 0),
}, $class;
}
sub get {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $v;
debug("get $map/$sub/$key");
$me->_start();
my $r = $me->{db}->db_get( _key($map,$sub,$key), $v );
$me->_finish();
return if $r; # not found
if( wantarray ){
return ($v, 1);
}
return $v;
}
sub put {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
$me->_start();
my $r = $me->{db}->db_put( _key($map,$sub,$key), $val);
$me->_finish();
return !$r;
}
sub del {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
$me->_start();
$me->{db}->db_del( _key($map,$sub,$key));
$me->_finish();
}
sub sync {
my $me = shift;
$me->{db}->db_sync();
}
sub range {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $end = shift; # undef => to end of map
my ($k, $v, @k);
$me->_start();
my $cursor = $me->{db}->db_cursor();
$k = _key($map,$sub,$key);
my $e = _key($map,$sub,$end);
$cursor->c_get($k, $v, DB_SET_RANGE);
my $MAX = 100;
my $max = $MAX;
while( !$end || ($k lt $e) ){
debug("range $k");
last unless $k =~ m|$map/$sub/|;
$k =~ s|$map/$sub/||;
push @k, { k => $k, v => $v };
my $r = $cursor->c_get($k, $v, DB_NEXT);
last if $r; # error
# cursor locks the db
# close+recreate so other processes can proceed
unless( $max -- ){
$cursor->c_close();
$me->_finish();
sleep 0;
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
sub _finish {
my $me = shift;
alarm($me->{alarmold} || 0);
$me->{alarmold} = 0;
}
sub _key {
my $map = shift;
my $sub = shift;
my $key = shift;
return "$map/$sub/$key";
}
1;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $req = shift;
my $cont = shift;
return if $req->{hop} >= $MAXHOP;
return if $req->{expire} < $^T;
my $sender = $req->{sender};
my $sendat = AC::Yenta::Status->peer($sender);
my $me = bless {
info => "$req->{datum}{map}/$req->{datum}{key}/$req->{datum}{version}",
map => $req->{datum}{map},
req => $req,
content => $cont,
# we tune the distribution algorithm based on where it came from:
faraway => (my_datacenter() ne $sendat->{datacenter}),
farseen => 0,
nearseen => 0,
farsend => [],
nearsend => [],
ordershift => 4,
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
}
}
# which yentas can do something with the update?
sub _compat_peers_in_dc {
my $me = shift;
my $dc = shift;
my $env = conf_value('environment');
my $dcs = AC::Yenta::Status->datacenters();
my $map = $me->{map};
my @id;
for my $id (keys %{$dcs->{$dc}}){
my $pd = AC::Yenta::Status->peer($id);
next unless $pd->{subsystem} eq 'yenta';
next unless $pd->{environment} eq $env;
next unless grep {$map eq $_} @{ $pd->{map} };
push @id, $id;
}
return @id;
}
sub _start_far {
my $me = shift;
my $d = shift @{ $me->{farsend} };
return unless $d;
lib/AC/Yenta/Store/Expire.pm view on Meta::CPAN
use strict;
AC::DC::Sched->new(
info => 'expire',
freq => 60,
func => \&AC::Yenta::Store::Expire::periodic,
);
sub periodic {
my $maps = conf_value('map');
for my $map (keys %$maps){
my $cf = conf_map($map);
next unless $cf->{expire};
my $expire = timet_to_yenta_version($^T - $cf->{expire});
debug("running expire from $expire");
AC::Yenta::Store::store_expire( $map, $expire );
}
}
################################################################
1;
lib/AC/Yenta/Store/LevelDB.pm view on Meta::CPAN
chmod 0777, $file;
return bless {
file => $file,
db => $db,
}, $class;
}
sub get {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $v;
debug("get $map/$sub/$key");
my $v = $me->{db}->Get( _key($map,$sub,$key) );
return unless $v; # not found
if( wantarray ){
return ($v, 1);
}
return $v;
}
sub put {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
my $r = $me->{db}->Put( _key($map,$sub,$key), $val);
return 1;
}
sub del {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
$me->{db}->Delete( _key($map,$sub,$key));
}
sub sync {
my $me = shift;
}
sub range {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $end = shift; # undef => to end of map
my $it = $me->{db}->NewIterator();
my @k;
my $e = _key($map,$sub,$end);
my $k = _key($map,$sub,$key);
my $r = $it->Seek($k);
while( $it->Valid() ){
my $k = $it->key();
my $v = $it->value();
last if $end && ($k ge $e);
debug("range $k");
last unless $k =~ m|$map/$sub/|;
$k =~ s|$map/$sub/||;
push @k, { k => $k, v => $v };
$it->Next();
}
return @k;
}
################################################################
sub _key {
my $map = shift;
my $sub = shift;
my $key = shift;
return "$map/$sub/$key";
}
1;
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
# -*- perl -*-
# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 19:21 (EDT)
# Function: storage maps
#
# $Id$
package AC::Yenta::Store::Map;
use AC::Yenta::Store::File;
use AC::Yenta::Store::Merkle;
use AC::Yenta::Debug 'map';
use AC::Yenta::Conf;
use AC::Cache;
use strict;
our @ISA = 'AC::Yenta::Store::Merkle';
my $DEFAULT = 'bdb';
my %BACKEND ;
my $CACHESIZE = 1024; # gives us ~90% cache hit rate
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
my $conf = shift;
unless( $bkend ){
# from extension, or default
my($ext) = $conf->{dbfile} =~ /\.(.+)$/;
$bkend = $ext if $BACKEND{$ext};
}
my $c = $BACKEND{$bkend || $DEFAULT};
unless( $c ){
problem("invalid storage backend: $bkend - ignoring map");
return ;
}
debug("configuring map $name with $c");
my $db = $c->new( $name, $conf );
my $fs = AC::Yenta::Store::File->new( $name, $conf );
my $me = bless {
name => $name,
conf => $conf,
db => $db,
fs => $fs,
merkle_height => 16,
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
# unless we are keeping old data, remove previous version
$deletedata{$versions[0]} = 1 if @versions;
}
}
# add new version to list. newest 1st
@versions = sort {$b cmp $a} (@versions, $v);
if( $cf->{history} && @versions > $cf->{history} ){
# trim list
my @rm = splice @versions, $cf->{history}, @versions, ();
push @deletehist, (map { ({version => decode_version($_), key => $key, shard => $shard}) } @rm);
$deletedata{$_} = 1 for @_;
}
if( $me->is_sharded() ){
# QQQ - shard changed?
$db->put($me->{name}, 'shard', $key, encode_shard($shard || 0));
}
my $dd = join(' ', map { $_->{version} } @deletehist);
debug("version list: @versions [delete: $dd]");
$me->_versput( $key, @versions );
# update merkles
$me->merkle( { shard => $shard, key => $key, version => $ver }, @deletehist);
# delete old data
for my $rm (keys %deletedata){
debug("removing old version $key/$rm");
lib/AC/Yenta/Store/Map.pm view on Meta::CPAN
sub is_my_shard {
return 1;
}
1;
=head1 NAME
AC::Yenta::Store::Map - persistent storage for yenta maps
=head1 SYNOPSIS
your code:
AC::Yenta::Store::Map->add_backend( postgres => 'Local::Yenta::Postgres' );
your config:
map mappyfoo {
backend postgres
# ...
}
=cut
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
debug("updating merkle node $k1 + { $k0 => $hash, $count }");
$d{$k0} = "$hash $count";
}else{
# remove
debug("updating merkle node $k1 - { $k0 => empty }");
delete $d{$k0};
}
if( keys %d ){
$d = join("\0", map {"$_\0$d{$_}"} (sort keys %d));
$me->_mcput( $k1, $d );
my $newh = sha1_base64($d);
return if $newh eq $oldh; # unchanged
return ($nextshard, $nextver, $newh, scalar keys %d);
}else{
$me->_mcdel( $k1 );
return unless $oldh; # unchanged
return ($nextshard, $nextver, undef);
}
}
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
################################################################
sub _get_actual_keys {
my $me = shift;
my $shard = shift;
my $ver = shift;
my $db = $me->{db};
# get range on data
my @key = map {
my $k = $_->{k}; $k =~ s|.*/||; $k
} $db->range($me->{name}, 'data', encode_version($ver), encode_version($ver + 1));
debug("actual key: @key");
return @key unless defined $shard;
# get vers list to filter on shard
my $sh = encode_shard($shard);
return grep {
my $k = $_;
lib/AC/Yenta/Store/Merkle.pm view on Meta::CPAN
# check merkle leaf node against actual data
sub merkle_scrub {
my $me = shift;
my $shard = shift;
my $ver = shift;
debug("scrub $me->{name} $shard/$ver");
# get list of keys from merkle leaf node
my $mlist = $me->get_merkle($shard, $ver, $me->{merkle_height}) || [];
my @mkey = map { $_->{key} } @$mlist;
my %mkey;
@mkey{@mkey} = @mkey;
# get list of keys from actual data
my @akey = $me->_get_actual_keys( $shard, $ver );
# compare lists
for my $k (@akey){
next if $mkey{$k};
lib/AC/Yenta/Store/SQLite.pm view on Meta::CPAN
_init( $db );
return bless {
file => $file,
db => $db,
}, $class;
}
sub get {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
debug("get $map/$sub/$key");
my $st = _do($me->{db}, 'select value, 1 from ykv where map = ? and sub = ? and key = ?', $map, $sub, $key);
my($v, $found) = $st->fetchrow_array();
return unless $found;
$v = decode_base64($v);
if( wantarray ){
return ($v, 1);
}
return $v;
}
sub put {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
my $st = _do($me->{db}, 'select 1 from ykv where map = ? and sub = ? and key = ?', $map, $sub, $key);
my($found) = $st->fetchrow_array();
if( $found ){
_do($me->{db}, 'update ykv set value = ? where map = ? and sub = ? and key = ?', encode_base64($val), $map, $sub, $key);
}else{
_do($me->{db}, 'insert into ykv (map,sub,key,value) values (?,?,?,?)', $map, $sub, $key, encode_base64($val));
}
return 1;
}
sub del {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
debug("del $map/$sub/$key");
_do($me->{db}, 'delete from ykv where map = ? and sub = ? and key = ?', $map, $sub, $key);
return 1;
}
sub sync {}
sub range {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $end = shift; # undef => to end of map
my $st;
if( defined $end ){
$st = _do($me->{db}, 'select key as k, value as v from ykv where map = ? and sub = ? and key >= ? and key < ?',
$map, $sub, $key, $end);
}else{
$st = _do($me->{db}, 'select key as k, value as v from ykv where map = ? and sub = ? and key >= ?',
$map, $sub, $key);
}
my $r = $st->fetchall_arrayref({});
return @$r;
}
################################################################
sub _init {
lib/AC/Yenta/Store/SQLite.pm view on Meta::CPAN
die $e if $e;
return $st;
}
################################################################
$initsql = <<END;
create table if not exists ykv (
map text not null,
sub text not null,
key text not null,
value text,
unique(map,sub,key)
);
create index if not exists ykvidx on ykv(map, sub, key);
pragma synchronous = 1; -- default is full(2)
pragma cache_size = 100000; -- default is 2000
vacuum;
analyze;
END
lib/AC/Yenta/Store/Sharded.pm view on Meta::CPAN
# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jun-07 17:34 (EDT)
# Function: consistently-hashed shards (unfinished)
#
# $Id$
package AC::Yenta::Store::Sharded;
use AC::Yenta::Debug 'map';
use AC::Yenta::Config;
use strict;
our @ISA = 'AC::Yenta::Store::Map';
# using too large a size, makes the tree heavy at the top, thin at the bottom
# taking up more space, and slowing down AE
my $SHARDLEN = 5;
sub new {
lib/AC/Yenta/Store/Tokyo.pm view on Meta::CPAN
chmod 0666, $file;
return bless {
file => $file,
db => $db,
}, $class;
}
sub get {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
debug("get $map/$sub/$key");
my $v = $me->{db}->get( _key($map,$sub,$key) );
return unless $v; # not found
if( wantarray ){
return ($v, 1);
}
return $v;
}
sub put {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $val = shift;
debug("put $map/$sub/$key");
my $r = $me->{db}->put( _key($map,$sub,$key), $val);
return 1;
}
sub del {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
$me->{db}->out( _key($map,$sub,$key));
}
sub sync {
my $me = shift;
$me->{db}->sync();
}
sub range {
my $me = shift;
my $map = shift;
my $sub = shift;
my $key = shift;
my $end = shift; # undef => to end of map
my $cur = TokyoCabinet::BDBCUR->new($me->{db});
my @k;
my $e = _key($map,$sub,$end);
my $k = _key($map,$sub,$key);
my $r = $cur->jump($k);
while( $cur->key() ){
my $k = $cur->key();
my $v = $cur->val();
last if $end && ($k ge $e);
debug("range $k");
last unless $k =~ m|$map/$sub/|;
$k =~ s|$map/$sub/||;
push @k, { k => $k, v => $v };
$cur->next();
}
return @k;
}
################################################################
sub _key {
my $map = shift;
my $sub = shift;
my $key = shift;
return "$map/$sub/$key";
}
1;
lib/AC/protobuf/yenta_check.pl view on Meta::CPAN
use warnings;
use Google::ProtocolBuffers;
{
unless (ACPYentaCheckValue->can('_pb_fields_list')) {
Google::ProtocolBuffers->create_message(
'ACPYentaCheckValue',
[
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'map', 1, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_INT64(),
'version', 2, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
Google::ProtocolBuffers::Constants::TYPE_INT32(),
'level', 3, undef
lib/AC/protobuf/yenta_check.pl view on Meta::CPAN
);
}
unless (ACPYentaCheckRequest->can('_pb_fields_list')) {
Google::ProtocolBuffers->create_message(
'ACPYentaCheckRequest',
[
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'map', 1, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_INT32(),
'level', 2, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_INT64(),
'version', 3, undef
lib/AC/protobuf/yenta_getset.pl view on Meta::CPAN
use warnings;
use Google::ProtocolBuffers;
{
unless (ACPYentaMapDatum->can('_pb_fields_list')) {
Google::ProtocolBuffers->create_message(
'ACPYentaMapDatum',
[
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'map', 1, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'key', 2, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
Google::ProtocolBuffers::Constants::TYPE_INT64(),
'version', 3, undef
lib/AC/protobuf/yenta_status.pl view on Meta::CPAN
'timestamp', 8, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REQUIRED(),
Google::ProtocolBuffers::Constants::TYPE_INT64(),
'lastup', 9, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_REPEATED(),
Google::ProtocolBuffers::Constants::TYPE_STRING(),
'map', 10, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
Google::ProtocolBuffers::Constants::TYPE_BOOL(),
'uptodate', 11, undef
],
[
Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(),
Google::ProtocolBuffers::Constants::TYPE_INT32(),
'sort_metric', 12, undef
proto/yenta_check.proto view on Meta::CPAN
// Copyright (c) 2009 AdCopy
// Author: Jeff Weisberg
// Created: 2009-Apr-01 10:57 (EDT)
// Function:
//
// $Id$
message ACPYentaCheckValue {
required string map = 1;
required int64 version = 2;
optional int32 level = 3;
optional string key = 4;
optional string hash = 5;
optional int64 shard = 6;
};
message ACPYentaCheckRequest {
required string map = 1;
required int32 level = 2;
required int64 version = 3;
optional int64 shard = 4;
};
message ACPYentaCheckReply {
repeated ACPYentaCheckValue check = 1;
};
proto/yenta_getset.proto view on Meta::CPAN
// Copyright (c) 2009 AdCopy
// Author: Jeff Weisberg
// Created: 2009-Apr-01 10:41 (EDT)
// Function:
//
// $Id$
message ACPYentaMapDatum {
required string map = 1;
required string key = 2;
optional int64 version = 3;
optional bytes value = 4;
optional bytes meta = 5;
optional bytes file = 6;
optional int64 shard = 7;
};
message ACPYentaGetSet {
repeated ACPYentaMapDatum data = 1;
proto/yenta_status.proto view on Meta::CPAN
message ACPYentaStatus {
required string hostname = 1;
required string datacenter = 2;
required string subsystem = 3;
required string environment = 4;
required string via = 5;
repeated ACPIPPort ip = 6;
required int32 status = 7;
required int64 timestamp = 8;
required int64 lastup = 9;
repeated string map = 10;
optional bool uptodate = 11;
optional int32 sort_metric = 12;
required string server_id = 13; // typically IP:PORT
required string instance_id = 14; // for loop detection
optional string path = 15; // for debugging
optional int32 capacity_metric = 16; // bouncr slots
};