AC-Yenta

 view release on metacpan or  search on metacpan

eg/yenta.conf  view on Meta::CPAN

allow		127.0.0.1
allow           10.200.2.0/23

# seed peers to locate the network at startup
seedpeer        10.200.2.4:3503
seedpeer        10.200.2.5:3503


# enable debugging?
#debug           ae
#debug           map
#debug           merkle
# ...


# maps
map testyfoo {
    # name of the data file
    dbfile      /home/data/testyfoo.ydb
    # how much history to keep 
    history     4
}

eg/yenta_get  view on Meta::CPAN

# Author: Jeff Weisberg
# Created: 2009-Apr-03 13:43 (EDT)
# Function: get value example
#
# $Id$

use AC::Yenta::Client;
use AC::Dumper;
use strict;

my $map = shift @ARGV;
my $key = shift @ARGV;
die "usage: get [-h host] map key\n" unless $map && $key;

my $y   = AC::Yenta::Client->new(
    # server_file, servers[], or host + port
    server_file	=> '/var/tmp/yenta.status',
    debug 	=> \&debug,
   );

my $res = $y->get($map, $key);

print dumper($res), "\n";


exit;

sub debug {
    print STDERR @_, "\n";
}

eg/yenta_get_direct  view on Meta::CPAN

# Author: Jeff Weisberg
# Created: 2009-Sep-16 12:00 (EDT)
# Function: get - using bdb file directly
#
# $Id$

use AC::Dumper;
use AC::Yenta::Direct;
use strict;

my $map  = shift @ARGV;
my $file = shift @ARGV;
my $key  = shift @ARGV;

my $y = AC::Yenta::Direct->new( $map, $file );
my $v = $y->get($key);
print  dumper($v), "\n";

eg/yenta_put  view on Meta::CPAN

use JSON;
use strict;


my $ys = AC::Yenta::Client->new( debug => sub{ print STDERR @_, "\n"; });


my $key = 'YX3jSXD3CBRUDABm';

my $res = $ys->distribute(
    # map, key, version, data
    'mymap', $key, timet_to_yenta_version(time()),
    encode_json( {
        url_id	=> $key,
        url	=> 'http://www.example.com',
        acc_id	=> 'C9TdSgbUCBRUCABG',
        format	=> 'html',
    }),
   );

lib/AC/Yenta.pm  view on Meta::CPAN

=item syslog

specify a syslog facility for log messages.

    syslog local5

=item debug

enable debugging for a particular section

    debug map

=item map

configure a map (a collection of key-value data). you do not need
to configure the same set of maps on all servers. maps should be
configured similarly on all servers that they are on.

    map users {
	backend	    bdb
        dbfile      /home/acdata/users.ydb
        history     4
    }

=back

=head1 BUGS

Too many to list here.

lib/AC/Yenta/Client.pm  view on Meta::CPAN


    $me->{server_file} ||= $me->{altfile};	# compat

    die "servers or server_file?\n" unless $me->{servers} || $me->{server_file};

    return $me;
}

sub get {
    my $me  = shift;
    my $map = shift;
    my $key = shift;
    my $ver = shift;

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_get',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        data	=> [ {
            map		=> $map,
            key		=> $key,
            version	=> $ver,
        } ]
    } );

    return $me->_send_request($map, $req);
}

sub _shard {
    my $key = shift;

    my $sh = sha1($key);
    my($a, $b) = unpack('NN', $sh);
    return $a<<32 | $b;
}


sub distribute {
    my $me   = shift;
    my $map  = shift;
    my $key  = shift;
    my $ver  = shift;
    my $val  = shift;
    my $file = shift;	# reference
    my $meta = shift;

    return unless $key && $ver;
    $me->{retries} = 25 unless $me->{retries};

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_distrib',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        sender		=> "$HOSTNAME/$$",
        hop		=> 0,
        expire		=> time() + 120,
        datum	=> [ {
            map		=> $map,
            key		=> $key,
            version	=> $ver,
            shard	=> _shard($key),	# NYI
            value	=> $val,
            meta	=> $meta,
        } ]
    }, $file );

    return $me->_send_request($map, $req, $file);
    # return undef | result
}

sub check {
    my $me  = shift;
    my $map = shift;
    my $ver = shift;
    my $lev = shift;

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        map		=> $map,
        level		=> $lev,
        version		=> $ver,
    } );

    return $me->_send_request($map, $req);
}

################################################################

sub _send_request {
    my $me   = shift;
    my $map  = shift;
    my $req  = shift;
    my $file = shift;	# reference

    my $tries = $me->{retries} + 1;
    my $copy  = $me->{copies} || 1;
    my $delay = 0.25;

    $me->_init_hostlist($map);
    my ($addr, $port) = $me->_next_host($map);

    for (1 .. $tries){
        return unless $addr;
        my $res = $me->_try_server($addr, $port, $req, $file);
        return $res if $res && !--$copy;
        ($addr, $port) = $me->_next_host($map);
        sleep $delay;
        $delay *= 1.414;
    }
}

sub _try_server {
    my $me   = shift;
    my $addr = shift;
    my $port = shift;
    my $req  = shift;

lib/AC/Yenta/Client.pm  view on Meta::CPAN

        $res = undef;
    }
    return $res;
}


################################################################

sub _next_host {
    my $me  = shift;
    my $map = shift;

    $me->_read_serverfile($map) unless $me->{_server};
    return unless $me->{_server} && @{$me->{_server}};
    my $next = shift @{$me->{_server}};
    return( $next->{addr}, $next->{port} );
}

sub _init_hostlist {
    my $me  = shift;
    my $map = shift;

    my @server;
    push @server, {
        addr	=> $me->{host},
        port	=> $me->{port},
    } if $me->{host} && $me->{port};

    push @server, @{$me->{servers}} if $me->{servers};
    $me->{_server} = \@server;

    $me->_read_serverfile($map);
}

# yentad saves a list of alternate peers to try in case it dies
sub _read_serverfile {
    my $me  = shift;
    my $map = shift;

    my $f;
    my @server;
    my @faraway;
    open($f, $me->{server_file});
    local $/ = "\n";
    while(<$f>){
        chop;
        my $data = decode_json( $_ );
        next unless grep { $_ eq $map } @{ $data->{map} };
        if( $data->{is_local} ){
            push @server, { addr => $data->{addr}, port => $data->{port} };
        }else{
            push @faraway, { addr => $data->{addr}, port => $data->{port} };
        }
    }

    # prefer local
    @server = @faraway unless @server;

lib/AC/Yenta/Config.pm  view on Meta::CPAN


package AC::Yenta::Config;
use AC::Misc;
use AC::Import;
use AC::DC::Debug;
use AC::ConfigFile::Simple;
use Socket;
use strict;

our @ISA = 'AC::ConfigFile::Simple';
our @EXPORT = qw(conf_value conf_map);


my %CONFIG = (

    include	=> \&AC::ConfigFile::Simple::include_file,
    debug	=> \&AC::ConfigFile::Simple::parse_debug,
    allow	=> \&AC::ConfigFile::Simple::parse_allow,
    port	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    environment => \&AC::ConfigFile::Simple::parse_keyvalue,
    secret 	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    seedpeer	=> \&AC::ConfigFile::Simple::parse_keyarray,

    ae_maxload	=> \&AC::ConfigFile::Simple::parse_keyvalue,
    distrib_max	=> \&AC::ConfigFile::Simple::parse_keyvalue,

    savestatus	=> \&parse_savefile,
    monitor	=> \&parse_monitor,
    map		=> \&parse_map,

);

my @MAP = qw(dbfile basedir keepold history expire backend sharded);


################################################################

sub handle_config {
    my $me   = shift;

lib/AC/Yenta/Config.pm  view on Meta::CPAN

    my $rest = shift;

    my $fnc = $CONFIG{$key};
    return unless $fnc;
    $fnc->($me, $key, $rest);
    return 1;
}

################################################################

sub parse_map {
    my $me    = shift;
    my $key   = shift;
    my $value = shift;

    my($map) = $value =~ /(\S+)\s+\{\s*/;
    die "invalid map spec\n" unless $map;

    my $md = {};
    problem("map '$map' redefined") if $me->{_pending}{map}{$map};

    while( defined(my $l = $me->_nextline()) ){
        last if $l eq '}';
        my($k, $v) = split /\s+/, $l, 2;

        if( grep {$_ eq $k} @MAP ){
            $v = cvt_timespec($v) if $k eq 'expire';
            $md->{$k} = $v;
        }else{
            problem("unknown map option '$k'");
        }
    }

    $me->{_pending}{map}{$map} = $md;
}

sub parse_monitor {
    my $me  = shift;
    my $key = shift;
    my $mon = shift;

    my($ip, $port) = split /:/, $mon;
    push @{$me->{_pending}{monitor}}, {
        monitor	=> $mon,

lib/AC/Yenta/Config.pm  view on Meta::CPAN



################################################################

sub conf_value {
    my $key = shift;

    return $AC::Yenta::CONF->{config}{$key};
}

sub conf_map {
    my $map = shift;

    return $AC::Yenta::CONF->{config}{map}{$map};
}


1;

lib/AC/Yenta/Direct.pm  view on Meta::CPAN

use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
use strict;

our @EXPORT = 'yenta_direct_get';

sub yenta_direct_get {
    my $file = shift;
    my $map  = shift;
    my $key  = shift;

    my $me = __PACKAGE__->new($map, $file);
    return unless $me;
    return $me->get( $key );
}


sub new {
    my $class = shift;
    my $map   = shift;
    my $file  = shift;

    my $db = AC::Yenta::Store::Map->new( $map, undef, { dbfile => $file, readonly => 1 } );
    return unless $db;

    return bless { db => $db }, $class;
}

sub get {
    my $me  = shift;
    my $key = shift;

    my $db = $me->{db};

lib/AC/Yenta/Direct.pm  view on Meta::CPAN

    return $me->getrange( '', '' );
}

sub getrange {
    my $me  = shift;
    my $lo  = shift;
    my $hi  = shift;

    my $db = $me->{db};

    return map { $_->{k} } $db->range( ($lo||''), ($hi||'') );
}

1;

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

require 'AC/protobuf/yenta_status.pl';
use strict;


my $HOSTNAME = hostname();

################################################################

sub _myself {

    my $maps = conf_value('map');

    my @ipinfo;
    my $natinfo = my_network_info();
    my $status  = 200;

    for my $i ( @$natinfo ){
        my $st = AC::Yenta::NetMon::status_dom( $i->{natdom} );

        # if a private network is down, announce the network + 500
        # if a private network is down, stop announcing it

lib/AC/Yenta/Kibitz/Status.pm  view on Meta::CPAN

        environment	=> conf_value('environment'),
        via		=> AC::Yenta::Status->my_server_id(),
        server_id	=> AC::Yenta::Status->my_server_id(),
        instance_id	=> AC::Yenta::Status->my_instance_id(),
        path		=> '.',
        status		=> $status,
        uptodate	=> AC::Yenta::Store::AE->up_to_date(),
        timestamp   	=> $^T,
        lastup		=> $^T,
        ip		=> \@ipinfo,
        map		=> [ keys %$maps ],
        sort_metric	=> loadave() * 1000,
    };
}


################################################################

sub myself {

    # tell server about ourself

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    # process requests
    my @res;
    my $rescont;
    for my $r (@{ $req->{data} }){
        debug("get request: $r->{map}, $r->{key}, $r->{version}");
        my($data, $ver, $file, $meta) = store_get( $r->{map}, $r->{key}, $r->{version} );
        my $res = {
            map		=> $r->{map},
            key		=> $r->{key},
        };

        if( $meta && $file ){
            unless( _check_content( $meta, $file ) ){
                problem("content SHA1 check failed: $r->{map}, $r->{key}, $ver - removing");
                # QQQ - remove from system, (and let AE get a new copy)?
                store_remove($r->{map}, $r->{key}, $ver);
                # tell caller it was not found
                $ver = undef;
            }
        }

        if( defined $ver ){
            $res->{version} = $ver;
            $res->{value}   = $data;
            $res->{meta}    = $meta  if defined $meta;
            if( $file ){

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    my $req;
    eval {
        $req = ACPYentaCheckRequest->decode( $gpb );
    };
    if(my $e = $@){
        problem("cannot decode request: $e");
        $io->shut();
        return;
    }

    debug("check request: $req->{map}, $req->{version}, $req->{level}");

    my @res;
    my @todo = { version => $req->{version}, shard => $req->{shard} };

    # the top of the tree will be fairly sparse,
    # return up to several levels if they are sparse
    for my $l (0 .. 32){
        my $cl = $req->{level} + $l;
        my @lres;
        my $nexttot;
        for my $do (@todo){
            my @r = _get_check( $req->{map}, $do->{shard}, $do->{version}, $cl );
            push @lres, @r;
            for (@r){
                $nexttot += $_->{key} ? ($_->{count} / 8) : $_->{count};
            }
        }
        push @res, @lres;
        @todo = ();
        last unless @lres;				# reached the bottom of the tree
        last if @res > 64;				# got enough results
        last if (@lres > 2) && ($nexttot > @lres + 2);	# less sparse region

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN


    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

}

# get + process merkle data
sub _get_check {
    my $map   = shift;
    my $shard = shift;
    my $ver   = shift;
    my $lev   = shift;

    my $res = store_get_merkle($map, $shard, $ver, $lev);
    return unless $res;
    for my $r (@$res) {
        $r->{map}   = $map;
    }

    return @$res;
}

sub api_distrib {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# reference

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        $req = ACPYentaDistRequest->decode( $gpb );
        die "invalid k/v for put request\n" unless $req->{datum}{key} && $req->{datum}{version};
    };
    if(my $e = $@){
        my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
        problem("cannot decode request: peer: $io->{peerip} $enc, $e");
        $io->shut();
        return;
    }

    unless( conf_map( $req->{datum}{map} ) ){
        problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
        _reply_error($io, $proto, 404, 'Map Not Found');
        return;
    }

    my $v = $req->{datum};

    # do we already have ?
    my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );

    if( $want ){
        # put

        debug("put request from $io->{peerip}: $v->{map}, $v->{key}, $v->{version}");

        # file content is passed by reference, to avoid large copies
        $content ||= \ $v->{file} if $v->{file};

        # check
        if( $v->{meta} && $content ){
            unless( _check_content( $v->{meta}, $content ) ){
                problem("content SHA1 check failed: $req->{datum}{map}, $req->{datum}{key}, $req->{datum}{version}");
                $io->shut();
                return;
            }
        }

        $want = store_put( $v->{map}, $v->{shard}, $v->{key}, $v->{version}, $v->{value},
                   $content, $v->{meta} );

        # distribute to other systems
        AC::Yenta::Store::Distrib->new( $req, $content ) if $want;
    }else{
        debug("put from $io->{peerip} unwanted: $v->{map}, $v->{shard}, $v->{key}, $v->{version}");
    }


    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );

lib/AC/Yenta/Stats.pm  view on Meta::CPAN


    my @peers = AC::Yenta::Status->allpeers();
    $res .= sprintf("%-24s%s\n", "peers:", scalar @peers);
    $res .= "\n";
    return $res;
}

sub http_data {
    my $url = shift;

    my(undef, $map, $key, $ver) = split m|/|, $url;
    my($data, $version, $file, $meta) = store_get($map, $key, $ver);

    return http_notfound($url) unless $version;
    return $data;
}

sub http_file {
    my $url = shift;

    my(undef, $map, $key, $ver) = split m|/|, $url;
    my($data, $version, $file, $meta) = store_get($map, $key, $ver);

    return http_notfound($url) unless $version && $file;
    return $$file;
}

1;

lib/AC/Yenta/Status.pm  view on Meta::CPAN


my $KEEPDOWN = 1800;	# keep data about down servers for how long?
my $KEEPLOST = 600;	# keep data about servers we have not heard about for how long?
my $SAVEMAX  = 1800;	# do not save if older than

my $PORT;

our $DATA = bless {
    allpeer	=> {},		# yenta_status
    sceptical	=> {},
    mappeer	=> {},		# {map} => { id => id }
    peermap	=> {},		# {id}  => @map
    datacenter  => {},		# {dc}  => { id => id }
    peertype	=> {},		# {ss}  => { id => id }
};

sub init {
    my $port = shift;

    $PORT = $port;

    AC::DC::Sched->new(

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    $c->start();
}

sub _random_peer {

    my $here  = my_datacenter();

    # sceptical
    my @scept = values %{$DATA->{sceptical}};

    my @all   = map  { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
    my @old   = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
    my @local = grep { $_->{datacenter} eq $here } @all;	# this datacenter
    my @away  = grep { $_->{datacenter} ne $here } @all;	# not this datacenter

    # first check anything sceptical
    my @peer  = @scept;

    # then (maybe) something about to expire
    @peer = @old  unless @peer || int rand(5);

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    # where - no longer used
    $where ||= my_datacenter();

    my @peer = keys %{ $DATA->{peertype}{$type} };
    return unless @peer;

    # nothing too old
    @peer = grep { $DATA->{allpeer}{$_}{lastup} > $^T - $SAVEMAX } @peer;
    return unless @peer;

    return map { $DATA->{allpeer}{$_} } @peer;
}

# save a list of peers, in case I crash, and for others to use
sub save_status {

    my $save = conf_value('savestatus');
    my $here = my_datacenter();

    # also save locally running services
    my @mon  = AC::Yenta::Monitor::export();

lib/AC/Yenta/Status.pm  view on Meta::CPAN

                port		=> int($port),
                status		=> int($pd->{status}),
                subsystem	=> $pd->{subsystem},
                environment	=> $pd->{environment},
                sort_metric	=> int($pd->{sort_metric}),
                capacity_metric => int($pd->{capacity_metric}),
                datacenter	=> $pd->{datacenter},
                is_local	=> ($here eq $pd->{datacenter} ? 1 : 0),
            };
            if( $pd->{subsystem} eq 'yenta' ){
                $data->{map} = $pd->{map};
            }

            print FILE encode_json( $data ), "\n";
        }

        close FILE;
        unless( rename("$file.tmp", $file) ){
            problem("cannot rename save file '$file': $!");
        }

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    return $DATA->{allpeer}{$id};
}

sub allpeers {
    my $class = shift;

    # idown sets status to 0 (below), skip such
    return grep { $_->{status} } values %{$DATA->{allpeer}};
}

sub mappeers {
    my $class = shift;
    my $map   = shift;

    return keys %{ $DATA->{mappeer}{$map} };
}

sub datacenters {
    my $class = shift;

    return $DATA->{datacenter};
}
################################################################

sub _remove {

lib/AC/Yenta/Status.pm  view on Meta::CPAN


    my $ss = $DATA->{allpeer}{$id}{subsystem};
    delete $DATA->{peertype}{$ss}{$id} if $ss;

    my $dc = $DATA->{allpeer}{$id}{datacenter};
    delete $DATA->{datacenter}{$dc}{$id} if $dc;

    verbose("deleting peer: $id");
    delete $DATA->{allpeer}{$id};

    # remove map info
    for my $map ( @{$DATA->{peermap}{$id}} ){
        delete $DATA->{mappeer}{$map}{$id};
    }
    delete $DATA->{peermap}{$id};

    # delete its monitored items
    for my $p (keys %{$DATA->{allpeer}}){
        next unless $DATA->{allpeer}{$p}{via} eq $id;
        _remove($p);
    }
}

sub _maybe_remove {
    my $id = shift;

lib/AC/Yenta/Status.pm  view on Meta::CPAN

        $DATA->{datacenter}{$up->{datacenter}}{$id} = $id;
    }

    # update subsystem info
    unless( $DATA->{peertype}{$up->{subsystem}}{$id} ){
        my $ss = $previnfo->{subsystem};
        delete $DATA->{peertype}{$ss}{$id} if $ss;
        $DATA->{peertype}{$up->{subsystem}}{$id} = $id;
    }

    # update map info
    $DATA->{peermap}{$id} ||= [];
    $up->{map} ||= [];
    my @curmap = @{$DATA->{peermap}{$id}};
    my @newmap = sort @{$up->{map}};

    return if "@curmap" eq "@newmap";		# unchanged

    # what do we need to add/remove
    my (%remove, %add);
    @remove{@curmap} = @curmap;
    @add{@newmap}    = @newmap;
    delete $remove{$_} for @newmap;
    delete $add{$_}    for @curmap;

    for my $map (keys %remove){
        debug("removing $map from $id");
        delete $DATA->{mappeer}{$map}{$id};
    }
    for my $map (keys %add){
        debug("adding $map to $id");
        $DATA->{mappeer}{$map}{$id} = $id;
    }
    $DATA->{peermap}{$id} = \@newmap;
}

lib/AC/Yenta/Store.pm  view on Meta::CPAN

use AC::Yenta::Store::Distrib;
use AC::Yenta::Store::AE;
use AC::Yenta::Store::Expire;
use strict;

our @EXPORT = qw(store_get store_put store_want store_get_merkle store_get_internal store_set_internal store_expire store_remove);

my %STORE;


# create maps from config
sub configure {

    my $maps = $AC::Yenta::CONF->{config}{map};

    my %remove = %STORE;
    for my $map (keys %{$maps}){
        debug("configuring map $map");

        my $conf = $maps->{$map};
        my $sharded = $conf->{sharded};
        my $c  = $sharded ? 'AC::Yenta::Store::Sharded' : 'AC::Yenta::Store::Map';
        my $be = $conf->{backend};

        my $m = $c->new( $map, $be, { %$conf, recovery => 1 } );
        $STORE{$map} = $m;
        delete $remove{$map};
    }

    for my $map (keys %remove){
        debug("removing unused map '$map'");
        delete $STORE{$map};
    }
}

sub store_get {
    my $map   = shift;
    my $key   = shift;
    my $ver   = shift;

    my $m = $STORE{$map};
    return unless $m;

    return $m->get($key, $ver);
}

sub store_want {
    my $map   = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;

    my $m = $STORE{$map};
    return unless $m;

    return $m->want($shard, $key, $ver);
}

sub store_put {
    my $map   = shift;
    my $shard = shift;
    my $key   = shift;
    my $ver   = shift;
    my $data  = shift;
    my $file  = shift;	# reference
    my $meta  = shift;

    my $m = $STORE{$map};
    return unless $m;

    debug("storing $map/$key/$ver");
    $m->put($shard, $key, $ver, $data, $file, $meta);
}

# NB: only removes local copy temporarily. will be replaced at next AE run
sub store_remove {
    my $map   = shift;
    my $key   = shift;
    my $ver   = shift;

    my $m = $STORE{$map};
    return unless $m;

    return $m->remove($key, $ver);
}


sub store_get_merkle {
    my $map   = shift;
    my $shard = shift;
    my $ver   = shift;
    my $lev   = shift;

    my $m = $STORE{$map};
    return unless $m;

    return $m->get_merkle($shard, $ver, $lev);
}

sub store_get_internal {
    my $map   = shift;
    my $key   = shift;

    my $m = $STORE{$map};
    return unless $m;

    return $m->get_internal($key);
}

sub store_set_internal {
    my $map   = shift;
    my $key   = shift;
    my $val   = shift;

    my $m = $STORE{$map};
    return unless $m;

    $m->set_internal($key, $val);
}

sub store_expire {
    my $map  = shift;
    my $max  = shift;	# all versions before this

    my $m = $STORE{$map};
    return unless $m;

    $m->expire($max);
}

sub store_normalize_version {
    my $map = shift;

    my $m = $STORE{$map};
    return unless $m;

    $m->normalize_version(@_);
}

sub store_version_max {
    my $map = shift;

    my $m = $STORE{$map};
    return unless $m;

    $m->version_max(@_);
}

sub store_merkle_scrub {
    my $map = shift;

    my $m = $STORE{$map};
    return unless $m;

    $m->merkle_scrub(@_);
}

1;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


my $MAXGET     = 32;	# maximum number of records per fetch
my $MAXFILES   = 4;	# maximum number of files per fetch
my $MAXFETCH   = 32;	# maximum number of simultaneous fetches
my $MAXMISSING = 10;	# maximum number of missing records to be considered up to date
my $MAXLOAD    = 0.5;	# do not run if load average is too high
my $EXPIRE     = 300;	# expire hung job after this long
my $TOONEW     = 60;	# don't consider things missing if they are less than this old

my $msgid      = $$;
my %DONE;		# maps which have finished
my @AE;			# normally, just one

AC::DC::Sched->new(
    info	=> 'anti-entropy',
    freq	=> 60,
    func	=> \&AC::Yenta::Store::AE::periodic,
   );

sub new {
    my $class = shift;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    my $me = bless {
        badnode		=> [ {version => 0, shard => 0, level => 0} ],
        cache		=> {},
        kvneed		=> [],
        kvneedorig	=> [],
        kvfetching	=> 0,
        missing		=> 0,
    }, $class;

    debug("new ae");
    $me->_pick_map()  || return;

    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_start', $^T);
    $me->_init_peer() || return;

    debug("checking $me->{map} with $me->{peer}{id}");
    inc_stat('ae_runs');
    $me->_next_step();

    push @AE, $me;
    return $me;
}

sub periodic {
    # kill dead sessions, start new ones

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

            push @keep, $ae;
        }
    }
    @AE = @keep;

    return if @AE;
    return if loadave() > (conf_value('ae_maxload') || $MAXLOAD);
    __PACKAGE__->new();
}

# we are up to date if we have AE'ed every map at least once since starting
sub up_to_date {
    my $class = shift;

    my $maps = conf_value('map');
    for my $m (keys %$maps){
        return 0 unless $DONE{$m};
    }
    return 1;
}

################################################################

# find most stale map
sub _pick_map {
    my $me = shift;

    my $maps = conf_value('map');
    my(@best, $bestv);
    for my $m (keys %$maps){
        my $lt = AC::Yenta::Store::store_get_internal($m, 'ae_last_start');
        if( !@best || $lt < $bestv ){
            @best = $m;
            $bestv = $lt;
        }elsif( $lt == $bestv ){
            push @best, $m;
        }
    }

    return unless @best;

    my $map = $best[ rand(@best) ];
    $me->{map} = $map;

    # is this a data or file map?
    # adjust accordingly
    my $cf = conf_map( $map );
    $me->{has_files} = 1 if $cf->{basedir};
    $me->{maxget} = $me->{has_files} ? $MAXFILES : $MAXGET;
    $me->{expire} = $cf->{expire};

    return 1;
}

sub _init_peer {
    my $me = shift;

    my $here = my_datacenter();
    my @peer = AC::Yenta::Status->mappeers( $me->{map} );
    my $env  = conf_value('environment');

    my(@near, @far, @ood);

    for my $p (@peer){
        my $d = AC::Yenta::Status->peer($p);
        next unless $d->{environment} eq $env;
        next unless $d->{status}      == 200;

        if( $d->{uptodate} ){

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    my $peer = $peer[ rand(@peer) ];

    return $peer;
}

################################################################

sub _finished {
    my $me = shift;

    debug("finished $me->{map}");
    $DONE{$me->{map}} = $^T if $me->{missing} < $MAXMISSING;
    AC::Yenta::Store::store_set_internal($me->{map}, 'ae_last_finish', $^T);
    @AE = grep{ $_ != $me } @AE;
}

sub _next_step {
    my $me = shift;

    $me->{timestamp} = $^T;

    if( $me->{kvfetching} < $MAXFETCH ){
        # any missing data?

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    $me->_finished();
}

################################################################

sub _start_check {
    my $me = shift;

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> $msgid++,
        want_reply	=> 1,
        data_encrypted	=> $enc,
    }, {
        map		=> $me->{map},
        level		=> $node->{level},
        version		=> $node->{version},
        shard		=> $node->{shard},
    } );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $me->{peer}{ip}, undef,
        $request,
        info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    debug("check results");
    $evt->{data} ||= {};

    # determine highest level returned

    my @keydata;
    my @nodedata;
    my $maxlev = 0;

    for my $d ( @{ $evt->{data}{check} }){
        debug("recvd result for $d->{map} $d->{level}/$d->{shard}/$d->{version} $d->{key}");

        if( $d->{key} ){
            push @keydata, $d;
            next;
        }
        next if $d->{level} < $maxlev;

        if( $d->{level} > $maxlev ){
            @nodedata = ();
            $maxlev   = $d->{level};

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    }

    $me->_next_step();
}

sub _check_error {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    verbose("AE check error with $me->{peer}{id} map $me->{map} ($io->{info})");
    $me->_next_step();
}

sub _check_result_keys {
    my $me  = shift;
    my $chk = shift;

    my %vscnt;
    my %vsadd;

    for my $d (@$chk){
        inc_stat('ae_check_key');
        my $vsk = "$d->{version} $d->{shard}";
        $vscnt{ $vsk } ++;

        next unless AC::Yenta::Store::store_want( $me->{map}, $d->{shard}, $d->{key}, $d->{version} );

        debug("missing data $d->{map}/$d->{key}/$d->{shard}/$d->{version}");
        push @{$me->{kvneed}}, { key => $d->{key}, version => $d->{version}, shard => $d->{shard} };
        inc_stat('ae_key_missing');
        $me->{missing} ++;
        $vsadd{ $vsk } ++;
    }
}

sub _is_expired {
    my $me  = shift;
    my $map = shift;
    my $lev = shift;
    my $ver = shift;

    return unless $me->{expire};

    my $vmx = AC::Yenta::Store::store_version_max( $map, $ver, $lev );
    return unless defined $vmx;

    if( $vmx < timet_to_yenta_version($^T - $me->{expire} + $TOONEW) ){
        debug("skipping expired $lev/$ver - $vmx");
        return 1;
    }

    return;
}

sub _check_result_nodes {
    my $me  = shift;
    my $lev = shift;
    my $chk = shift;

    # determine all of the base versions of the recvd data
    my %ver;
    for my $d (@$chk){
        my($shard, $ver) = AC::Yenta::Store::store_normalize_version( $d->{map}, $d->{shard}, $d->{version}, $lev - 1);
        $ver{"$ver $shard"} = { ver => $ver, shard => $shard };
    }

    # get all of our merkle data for these versions
    my %merkle;
    my $t_new = timet_to_yenta_version($^T - $TOONEW);

    for my $d (values %ver){
        next if $d->{ver} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $d->{ver});
        # RSN - skip unwanted shards
        my $ms = AC::Yenta::Store::store_get_merkle($me->{map}, $d->{shard}, $d->{ver}, $lev - 1);
        for my $m (@$ms){
            # debug("my hash $me->{map} $m->{level}/$m->{shard}/$m->{version} => $m->{hash}");
            $merkle{"$m->{version} $m->{shard}"} = $m->{hash};
        }
    }

    # compare (don't bother with things that are too new (the data may still be en route))
    for my $d (@$chk){
        next if $d->{version} > $t_new;				# too new, ignore
        next if $me->_is_expired($me->{map}, $lev, $d->{version});
        # RSN - skip unwanted shards
        my $hash = $merkle{"$d->{version} $d->{shard}"};

        if( $d->{hash} eq $hash ){
            debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} => match");
            next;
        }else{
            debug("check $d->{level}/$d->{shard}/$d->{version}: $d->{hash} != $hash");
        }

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    debug("getting kv data from current peer");
    $me->_start_get_kv( $me->{peer}, 0, \@get);
}

sub _start_get_kv {
    my $me    = shift;
    my $peer  = shift;
    my $retry = shift;
    my $get   = shift;

    # insert map into request
    $_->{map} = $me->{map} for @$get;

    # for (@$get){ debug("requesting $_->{key}/$_->{version}") }

    my $enc   = use_encryption($peer);
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_get',
        msgidno		  => $msgid++,

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;
    $evt->{data} ||= {};

    debug("got kv data results");

    my %need = map {
        ( "$_->{key}/$_->{version}" => $_ )
    } @$get;

    for my $d ( @{$evt->{data}{data}}){
        debug("got $d->{map}/$d->{key}/$d->{version}");
        next unless $d->{key} && $d->{version};		# not found

        delete $need{ "$d->{key}/$d->{version}" };
        my $file = $evt->{content};
        $file = \ $d->{file} if $d->{file};

        AC::Yenta::Store::store_put( $d->{map}, $d->{shard}, $d->{key}, $d->{version},
                                     $d->{value}, $file, $d->{meta} );

    }

    push @{$me->{kvneedorig}}, (values %need) if $retry;

    $me->_next_get_kv();

}

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN

    return bless {
        dir	=> $dir,
        file	=> $file,
        db	=> $db,
        hasenv  => ($env ? 1 : 0),
    }, $class;
}

sub get {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    my $v;
    debug("get $map/$sub/$key");

    $me->_start();
    my $r = $me->{db}->db_get( _key($map,$sub,$key), $v );
    $me->_finish();

    return if $r; # not found

    if( wantarray ){
        return ($v, 1);
    }
    return $v;
}

sub put {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $val = shift;

    debug("put $map/$sub/$key");

    $me->_start();
    my $r = $me->{db}->db_put( _key($map,$sub,$key), $val);
    $me->_finish();

    return !$r;
}

sub del {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    $me->_start();
    $me->{db}->db_del( _key($map,$sub,$key));
    $me->_finish();
}

sub sync {
    my $me  = shift;

    $me->{db}->db_sync();
}

sub range {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $end = shift;	# undef => to end of map

    my ($k, $v, @k);
    $me->_start();
    my $cursor = $me->{db}->db_cursor();
    $k = _key($map,$sub,$key);
    my $e = _key($map,$sub,$end);
    $cursor->c_get($k, $v, DB_SET_RANGE);

    my $MAX = 100;
    my $max = $MAX;

    while( !$end || ($k lt $e) ){
        debug("range $k");
        last unless $k =~ m|$map/$sub/|;
        $k =~ s|$map/$sub/||;
        push @k, { k => $k, v => $v };
        my $r = $cursor->c_get($k, $v, DB_NEXT);
        last if $r;	# error

        # cursor locks the db
        # close+recreate so other processes can proceed
        unless( $max -- ){
            $cursor->c_close();
            $me->_finish();
            sleep 0;

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN


sub _finish {
    my $me = shift;

    alarm($me->{alarmold} || 0);
    $me->{alarmold} = 0;
}


sub _key {
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    return "$map/$sub/$key";
}

1;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    my $req   = shift;
    my $cont  = shift;

    return if $req->{hop} >= $MAXHOP;
    return if $req->{expire} < $^T;

    my $sender = $req->{sender};
    my $sendat = AC::Yenta::Status->peer($sender);

    my $me = bless {
        info		=> "$req->{datum}{map}/$req->{datum}{key}/$req->{datum}{version}",
        map		=> $req->{datum}{map},
        req		=> $req,
        content		=> $cont,
        # we tune the distribution algorithm based on where it came from:
        faraway 	=> (my_datacenter() ne $sendat->{datacenter}),

        farseen		=> 0,
        nearseen	=> 0,
        farsend		=> [],
        nearsend	=> [],
        ordershift	=> 4,

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    }
}

# which yentas can do something with the update?
sub _compat_peers_in_dc {
    my $me = shift;
    my $dc = shift;

    my $env = conf_value('environment');
    my $dcs = AC::Yenta::Status->datacenters();
    my $map = $me->{map};
    my @id;

    for my $id (keys %{$dcs->{$dc}}){
        my $pd = AC::Yenta::Status->peer($id);

        next unless $pd->{subsystem}   eq 'yenta';
        next unless $pd->{environment} eq $env;
        next unless grep {$map eq $_} @{ $pd->{map} };
        push @id, $id;
    }
    return @id;
}

sub _start_far {
    my $me  = shift;

    my $d = shift @{ $me->{farsend} };
    return unless $d;

lib/AC/Yenta/Store/Expire.pm  view on Meta::CPAN

use strict;

AC::DC::Sched->new(
    info	=> 'expire',
    freq	=> 60,
    func	=> \&AC::Yenta::Store::Expire::periodic,
   );

sub periodic {

    my $maps = conf_value('map');
    for my $map (keys %$maps){
        my $cf = conf_map($map);
        next unless $cf->{expire};

        my $expire = timet_to_yenta_version($^T - $cf->{expire});
        debug("running expire from $expire");
        AC::Yenta::Store::store_expire( $map, $expire );
    }
}

################################################################


1;

lib/AC/Yenta/Store/LevelDB.pm  view on Meta::CPAN

    chmod 0777, $file;

    return bless {
        file	=> $file,
        db	=> $db,
    }, $class;
}

sub get {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    my $v;
    debug("get $map/$sub/$key");
    my $v = $me->{db}->Get( _key($map,$sub,$key) );

    return unless $v; # not found

    if( wantarray ){
        return ($v, 1);
    }
    return $v;
}

sub put {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $val = shift;

    debug("put $map/$sub/$key");

    my $r = $me->{db}->Put( _key($map,$sub,$key), $val);
    return 1;
}

sub del {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    $me->{db}->Delete( _key($map,$sub,$key));
}

sub sync {
    my $me  = shift;

}

sub range {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $end = shift;	# undef => to end of map

    my $it = $me->{db}->NewIterator();

    my @k;

    my $e = _key($map,$sub,$end);

    my $k = _key($map,$sub,$key);
    my $r = $it->Seek($k);

    while( $it->Valid() ){
        my $k = $it->key();
        my $v = $it->value();

        last if $end && ($k ge $e);

        debug("range $k");
        last unless $k =~ m|$map/$sub/|;
        $k =~ s|$map/$sub/||;
        push @k, { k => $k, v => $v };
        $it->Next();
    }

    return @k;
}

################################################################

sub _key {
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    return "$map/$sub/$key";
}

1;

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Mar-30 19:21 (EDT)
# Function: storage maps
#
# $Id$

package AC::Yenta::Store::Map;
use AC::Yenta::Store::File;
use AC::Yenta::Store::Merkle;
use AC::Yenta::Debug 'map';
use AC::Yenta::Conf;
use AC::Cache;
use strict;

our @ISA = 'AC::Yenta::Store::Merkle';

my $DEFAULT = 'bdb';
my %BACKEND ;
my $CACHESIZE = 1024;	# gives us ~90% cache hit rate

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

    my $conf  = shift;

    unless( $bkend ){
        # from extension, or default
        my($ext) = $conf->{dbfile} =~ /\.(.+)$/;
        $bkend = $ext if $BACKEND{$ext};
    }

    my $c  = $BACKEND{$bkend || $DEFAULT};
    unless( $c ){
        problem("invalid storage backend: $bkend - ignoring map");
        return ;
    }

    debug("configuring map $name with $c");

    my $db = $c->new( $name, $conf );
    my $fs = AC::Yenta::Store::File->new( $name, $conf );

    my $me = bless {
        name		=> $name,
        conf		=> $conf,
        db		=> $db,
        fs		=> $fs,
        merkle_height	=> 16,

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN

            # unless we are keeping old data, remove previous version
            $deletedata{$versions[0]} = 1 if @versions;
        }
    }

    # add new version to list. newest 1st
    @versions = sort {$b cmp $a} (@versions, $v);
    if( $cf->{history} && @versions > $cf->{history} ){
        # trim list
        my @rm = splice @versions, $cf->{history}, @versions, ();
        push @deletehist, (map { ({version => decode_version($_), key => $key, shard => $shard}) } @rm);
        $deletedata{$_} = 1 for @_;
    }
    if( $me->is_sharded() ){
        # QQQ - shard changed?
        $db->put($me->{name}, 'shard', $key, encode_shard($shard || 0));
    }

    my $dd = join(' ', map { $_->{version} } @deletehist);
    debug("version list: @versions [delete: $dd]");

    $me->_versput( $key, @versions );

    # update merkles
    $me->merkle( { shard => $shard, key => $key, version => $ver }, @deletehist);

    # delete old data
    for my $rm (keys %deletedata){
        debug("removing old version $key/$rm");

lib/AC/Yenta/Store/Map.pm  view on Meta::CPAN


sub is_my_shard {
    return 1;
}


1;

=head1 NAME

AC::Yenta::Store::Map - persistent storage for yenta maps

=head1 SYNOPSIS

  your code:

    AC::Yenta::Store::Map->add_backend( postgres => 'Local::Yenta::Postgres' );

  your config:

    map mappyfoo {
        backend     postgres
        # ...
    }

=cut

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

        debug("updating merkle node $k1 + { $k0 => $hash, $count }");
        $d{$k0} = "$hash $count";
    }else{
        # remove
        debug("updating merkle node $k1 - { $k0 => empty }");
        delete $d{$k0};
    }

    if( keys %d ){

        $d = join("\0", map {"$_\0$d{$_}"} (sort keys %d));
        $me->_mcput( $k1, $d );
        my $newh = sha1_base64($d);
        return if $newh eq $oldh;	# unchanged
        return ($nextshard, $nextver, $newh, scalar keys %d);
    }else{
        $me->_mcdel( $k1 );
        return unless $oldh;		# unchanged
        return ($nextshard, $nextver, undef);
    }
}

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

################################################################

sub _get_actual_keys {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    my $db = $me->{db};

    # get range on data
    my @key = map {
        my $k = $_->{k}; $k =~ s|.*/||; $k
    } $db->range($me->{name}, 'data', encode_version($ver), encode_version($ver + 1));
    debug("actual key: @key");

    return @key unless defined $shard;

    # get vers list to filter on shard
    my $sh = encode_shard($shard);
    return grep {
        my $k = $_;

lib/AC/Yenta/Store/Merkle.pm  view on Meta::CPAN

# check merkle leaf node against actual data
sub merkle_scrub {
    my $me    = shift;
    my $shard = shift;
    my $ver   = shift;

    debug("scrub $me->{name} $shard/$ver");

    # get list of keys from merkle leaf node
    my $mlist = $me->get_merkle($shard, $ver, $me->{merkle_height}) || [];
    my @mkey  = map { $_->{key} } @$mlist;
    my %mkey;
    @mkey{@mkey} = @mkey;

    # get list of keys from actual data
    my @akey  = $me->_get_actual_keys( $shard, $ver );

    # compare lists

    for my $k (@akey){
        next if $mkey{$k};

lib/AC/Yenta/Store/SQLite.pm  view on Meta::CPAN

    _init( $db );

    return bless {
        file	=> $file,
        db	=> $db,
    }, $class;
}

sub get {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    debug("get $map/$sub/$key");

    my $st = _do($me->{db}, 'select value, 1 from ykv where map = ? and sub = ? and key = ?', $map, $sub, $key);

    my($v, $found) = $st->fetchrow_array();
    return unless $found;

    $v = decode_base64($v);

    if( wantarray ){
        return ($v, 1);
    }
    return $v;
}

sub put {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $val = shift;

    debug("put $map/$sub/$key");

    my $st = _do($me->{db}, 'select 1 from ykv where map = ? and sub = ? and key = ?', $map, $sub, $key);
    my($found) = $st->fetchrow_array();

    if( $found ){
        _do($me->{db}, 'update ykv set value = ? where map = ? and sub = ? and key = ?', encode_base64($val), $map, $sub, $key);
    }else{
        _do($me->{db}, 'insert into ykv (map,sub,key,value) values (?,?,?,?)',  $map, $sub, $key, encode_base64($val));
    }

    return 1;
}

sub del {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    debug("del $map/$sub/$key");

    _do($me->{db}, 'delete from ykv where map = ? and sub = ? and key = ?', $map, $sub, $key);

    return 1;
}

sub sync {}

sub range {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $end = shift;	# undef => to end of map

    my $st;

    if( defined $end ){
        $st = _do($me->{db}, 'select key as k, value as v from ykv where map = ? and sub = ? and key >= ? and key < ?',
                     $map, $sub, $key, $end);
    }else{
        $st = _do($me->{db}, 'select key as k, value as v from ykv where map = ? and sub = ? and key >= ?',
                     $map, $sub, $key);
    }

    my $r = $st->fetchall_arrayref({});

    return @$r;
}

################################################################

sub _init {

lib/AC/Yenta/Store/SQLite.pm  view on Meta::CPAN

    die $e if $e;

    return $st;
}

################################################################

$initsql = <<END;

create table if not exists ykv (
	map	text 	not null,
	sub	text	not null,
	key	text	not null,
	value	text,

	unique(map,sub,key)
);

create index if not exists ykvidx on ykv(map, sub, key);

pragma synchronous = 1;         -- default is full(2)

pragma cache_size  = 100000;    -- default is 2000

vacuum;

analyze;

END

lib/AC/Yenta/Store/Sharded.pm  view on Meta::CPAN


# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jun-07 17:34 (EDT)
# Function: consistently-hashed shards (unfinished)
#
# $Id$


package AC::Yenta::Store::Sharded;
use AC::Yenta::Debug 'map';
use AC::Yenta::Config;
use strict;

our @ISA = 'AC::Yenta::Store::Map';

# using too large a size, makes the tree heavy at the top, thin at the bottom
# taking up more space, and slowing down AE
my $SHARDLEN = 5;

sub new {

lib/AC/Yenta/Store/Tokyo.pm  view on Meta::CPAN

    chmod 0666, $file;

    return bless {
        file	=> $file,
        db	=> $db,
    }, $class;
}

sub get {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    debug("get $map/$sub/$key");
    my $v = $me->{db}->get( _key($map,$sub,$key) );

    return unless $v; # not found

    if( wantarray ){
        return ($v, 1);
    }
    return $v;
}

sub put {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $val = shift;

    debug("put $map/$sub/$key");

    my $r = $me->{db}->put( _key($map,$sub,$key), $val);
    return 1;
}

sub del {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    $me->{db}->out( _key($map,$sub,$key));
}

sub sync {
    my $me  = shift;

    $me->{db}->sync();
}

sub range {
    my $me  = shift;
    my $map = shift;
    my $sub = shift;
    my $key = shift;
    my $end = shift;	# undef => to end of map

    my $cur = TokyoCabinet::BDBCUR->new($me->{db});

    my @k;

    my $e = _key($map,$sub,$end);

    my $k = _key($map,$sub,$key);
    my $r = $cur->jump($k);

    while( $cur->key() ){
        my $k = $cur->key();
        my $v = $cur->val();

        last if $end && ($k ge $e);

        debug("range $k");

        last unless $k =~ m|$map/$sub/|;
        $k =~ s|$map/$sub/||;
        push @k, { k => $k, v => $v };
        $cur->next();
    }

    return @k;
}

################################################################

sub _key {
    my $map = shift;
    my $sub = shift;
    my $key = shift;

    return "$map/$sub/$key";
}

1;

lib/AC/protobuf/yenta_check.pl  view on Meta::CPAN

use warnings;
use Google::ProtocolBuffers;
{       
    unless (ACPYentaCheckValue->can('_pb_fields_list')) {
        Google::ProtocolBuffers->create_message(
            'ACPYentaCheckValue',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'map', 1, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT64(), 
                    'version', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT32(), 
                    'level', 3, undef

lib/AC/protobuf/yenta_check.pl  view on Meta::CPAN

        );
    }

    unless (ACPYentaCheckRequest->can('_pb_fields_list')) {
        Google::ProtocolBuffers->create_message(
            'ACPYentaCheckRequest',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'map', 1, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT32(), 
                    'level', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT64(), 
                    'version', 3, undef

lib/AC/protobuf/yenta_getset.pl  view on Meta::CPAN

use warnings;
use Google::ProtocolBuffers;
{       
    unless (ACPYentaMapDatum->can('_pb_fields_list')) {
        Google::ProtocolBuffers->create_message(
            'ACPYentaMapDatum',
            [
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'map', 1, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'key', 2, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT64(), 
                    'version', 3, undef

lib/AC/protobuf/yenta_status.pl  view on Meta::CPAN

                    'timestamp', 8, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REQUIRED(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT64(), 
                    'lastup', 9, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_REPEATED(), 
                    Google::ProtocolBuffers::Constants::TYPE_STRING(), 
                    'map', 10, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
                    Google::ProtocolBuffers::Constants::TYPE_BOOL(), 
                    'uptodate', 11, undef
                ],
                [
                    Google::ProtocolBuffers::Constants::LABEL_OPTIONAL(), 
                    Google::ProtocolBuffers::Constants::TYPE_INT32(), 
                    'sort_metric', 12, undef

proto/yenta_check.proto  view on Meta::CPAN

// Copyright (c) 2009 AdCopy
// Author: Jeff Weisberg
// Created: 2009-Apr-01 10:57 (EDT)
// Function: 
//
// $Id$


message ACPYentaCheckValue {
        required string         map             = 1;
        required int64          version         = 2;
        optional int32          level           = 3;
        optional string         key             = 4;
        optional string         hash            = 5;
        optional int64          shard           = 6;
};

message ACPYentaCheckRequest {
        required string         map             = 1;
        required int32          level           = 2;
        required int64          version         = 3;
        optional int64          shard           = 4;
};

message ACPYentaCheckReply {
        repeated ACPYentaCheckValue check     = 1;
};

proto/yenta_getset.proto  view on Meta::CPAN

// Copyright (c) 2009 AdCopy
// Author: Jeff Weisberg
// Created: 2009-Apr-01 10:41 (EDT)
// Function: 
//
// $Id$

message ACPYentaMapDatum {
        required string         map             = 1;
        required string         key             = 2;
        optional int64          version         = 3;
        optional bytes          value           = 4;
        optional bytes          meta            = 5;
        optional bytes          file            = 6;
        optional int64          shard           = 7;
};

message ACPYentaGetSet {
        repeated ACPYentaMapDatum data          = 1;

proto/yenta_status.proto  view on Meta::CPAN

message ACPYentaStatus {
        required string         hostname        = 1;
        required string         datacenter      = 2;
	required string		subsystem	= 3;
        required string         environment     = 4;
        required string         via             = 5;
        repeated ACPIPPort      ip              = 6;
        required int32          status          = 7;
        required int64          timestamp       = 8;
        required int64          lastup          = 9;
        repeated string         map             = 10;
        optional bool           uptodate        = 11;
	optional int32		sort_metric	= 12;

        required string         server_id       = 13;   // typically IP:PORT
        required string         instance_id     = 14;   // for loop detection
        optional string         path            = 15;   // for debugging

        optional int32          capacity_metric = 16;   // bouncr slots
};



( run in 0.752 second using v1.01-cache-2.11-cpan-49f99fa48dc )