AC-Yenta

 view release on metacpan or  search on metacpan

eg/myself.pm  view on Meta::CPAN

# example myself

# $Id$

package Local::Yenta::MySelf;
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish this server's persistent id.
#   $class - invoking class (unused)
#   $port  - our tcp port (unused here)
#   $id    - id supplied on the command line; when it is false, an id of
#            the form "yenta/<host>" is derived from the hostname
# The chosen id is stored in $SERVERID and reported through verbose().
# NOTE(review): verbose() is not imported in the visible part of this
# example - confirm it is in scope when adapting this file.
sub init {
    my $class = shift;
    my $port  = shift;	# our tcp port
    my $id    = shift;  # from cmd line

    $SERVERID = $id;
    unless( $SERVERID ){
        # remove the domain: every dot is escaped and the match is anchored
        # at the end, so only a literal trailing ".example.com" is stripped
        (my $h = hostname()) =~ s/\.example\.com$//;
        $SERVERID = "yenta/$h";
    }
    verbose("system persistent-id: $SERVERID");
}

# Accessor: hand back the persistent id that init() stored in $SERVERID.
sub my_server_id {
    my $current_id = $SERVERID;
    return $current_id;
}

1;

eg/yenta_get  view on Meta::CPAN

    debug 	=> \&debug,
   );

# ask the client object for $key in $map
my $res = $y->get($map, $key);

# print the dumped result followed by a newline
print dumper($res), "\n";


exit;

# Debug callback: write all arguments, followed by a newline, to STDERR.
sub debug {
    my @parts = @_;
    return print {*STDERR} @parts, "\n";
}

lib/AC/Yenta/AC/MySelf.pm  view on Meta::CPAN


package AC::Yenta::AC::MySelf;
use AC::Yenta::Config;
use AC::Yenta::Debug;
use AC::DataCenter;	# provides my_network_info, my_datacenter
use Sys::Hostname;
use strict;

my $SERVERID;

# Establish this server's persistent id.
#   $class - invoking class (unused)
#   $port  - not used
#   $id    - explicit id; when it is false an id is built from the
#            configured 'environment' value and the hostname:
#            "yenta@<host>" for 'prod', "yenta/<env>@<host>" otherwise
# The result is kept in $SERVERID and reported through verbose().
sub init {
    my ($class, $port, $id) = @_;

    $SERVERID = $id;
    if( !$SERVERID ){
        my $host = hostname();
        $host =~ s/\.adcopy.*//;    # drop everything from ".adcopy" onwards

        my $env    = conf_value('environment');
        my $suffix = ($env eq 'prod') ? '' : '/' . $env;
        $SERVERID  = 'yenta' . $suffix . '@' . $host;
    }
    verbose("system persistent-id: $SERVERID");
}

# Accessor: hand back the persistent id that init() stored in $SERVERID.
sub my_server_id {
    my $current_id = $SERVERID;
    return $current_id;
}

1;

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    my $r = $MSGTYPE{$name};
    AC::DC::Protocol->add_msg( $name, $r->{num}, $r->{reqc}, $r->{resc});
}


# one or more of:
#   new( host, port )
#   new( servers => [ { host, port }, ... ] )
#   new( server_file )

# Construct a client. Caller-supplied key/value pairs override the
# defaults below. Dies unless 'servers' or 'server_file' ends up set
# ('altfile' is accepted as an older name for 'server_file').
sub new {
    my ($class, @opts) = @_;

    my %self = (
        debug	=> sub{ },
        host	=> 'localhost',
        proto	=> AC::DC::Protocol->new(),
        copies	=> 1,
        @opts,
    );
    my $me = bless \%self, $class;

    # compat: fall back to the old option name
    $me->{server_file} = $me->{altfile} unless $me->{server_file};

    unless( $me->{servers} || $me->{server_file} ){
        die "servers or server_file?\n";
    }

    return $me;
}

# Build a 'yenta_get' request for ($map, $key, $ver) and pass it to
# _send_request(), returning whatever that returns.
sub get {
    my ($me, $map, $key, $ver) = @_;

    # protocol header: random message id, and we want an answer back
    my %header = (
        type		=> 'yenta_get',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    );

    # request body: a single datum describing the wanted record
    my %wanted  = ( map => $map, key => $key, version => $ver );
    my %payload = ( data => [ \%wanted ] );

    my $req = $me->{proto}->encode_request( \%header, \%payload );

    return $me->_send_request($map, $req);
}

# Map a key to a 64 bit number: the first 8 bytes of the key's SHA-1
# digest, read as two big-endian 32 bit words and joined high word first.
sub _shard {
    my ($key) = @_;

    my ($hi, $lo) = unpack('NN', sha1($key));
    return ($hi << 32) | $lo;
}


sub distribute {
    my $me   = shift;
    my $map  = shift;
    my $key  = shift;
    my $ver  = shift;
    my $val  = shift;
    my $file = shift;	# reference
    my $meta = shift;

    return unless $key && $ver;
    $me->{retries} = 25 unless $me->{retries};

lib/AC/Yenta/Config.pm  view on Meta::CPAN

    monitor	=> \&parse_monitor,
    map		=> \&parse_map,

);

my @MAP = qw(dbfile basedir keepold history expire backend sharded);


################################################################

# Dispatch one config directive through the %CONFIG table.
# Returns 1 when a handler exists for $key (after calling it with
# $me, $key, $rest), and nothing when the key is not in the table.
sub handle_config {
    my ($me, $key, $rest) = @_;

    my $handler = $CONFIG{$key} or return;
    $handler->($me, $key, $rest);
    return 1;
}

################################################################

sub parse_map {
    my $me    = shift;
    my $key   = shift;
    my $value = shift;

    my($map) = $value =~ /(\S+)\s+\{\s*/;
    die "invalid map spec\n" unless $map;

    my $md = {};
    problem("map '$map' redefined") if $me->{_pending}{map}{$map};

lib/AC/Yenta/Customize.pm  view on Meta::CPAN

# Copyright (c) 2010 AdCopy
# Author: Jeff Weisberg
# Created: 2010-Jan-26 10:37 (EST)
# Function: connect user provided implementation
#
# $Id$

package AC::Yenta::Customize;
use strict;

sub customize {
    my $class  = shift;
    my $implby = shift;

    (my $default = $class) =~ s/(.*)::([^:]+)$/$1::Default::$2/;

    # load user's implementation + default
    for my $p ($implby, $default){
        eval "require $p" if $p;
        die $@ if $@;
    }

lib/AC/Yenta/D.pm  view on Meta::CPAN

use AC::Yenta::Status;
use AC::Yenta::Store;
use AC::Yenta::MySelf;
use AC::Yenta::NetMon;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
# use AC::Yenta::Store::Tokyo;

use strict;

# Constructor. Passes the 'class_myself' option to
# AC::Yenta::MySelf->customize() and returns an object that is a blessed
# reference to a scalar holding the class name.
sub new {
    my ($class, %p) = @_;

    my $myself_impl = $p{class_myself};
    AC::Yenta::MySelf->customize( $myself_impl );
    # ...

    my $self = \$class;
    return bless $self, $class;
}

sub daemon {
    my $me    = shift;
    my $cfile = shift;
    my $opt   = shift;	# foreground, debugall, persistent_id, argv

    die "no config file specified\n" unless $cfile;

    # configure
    $AC::Yenta::CONF = AC::Yenta::Config->new(
        $cfile, onreload => sub {
            AC::Yenta::Store::configure();
        });


    initlog( 'yenta', (conf_value('syslog') || 'local5'), $opt->{debugall} );

    AC::Yenta::Debug->init( $opt->{debugall}, $AC::Yenta::CONF);
    daemonize(5, 'yentad', $opt->{argv}) unless $opt->{foreground};
    verbose("starting.");

lib/AC/Yenta/Direct.pm  view on Meta::CPAN

use AC::Yenta::Store::Map;
use AC::Yenta::Store::BDBI;
use AC::Yenta::Store::SQLite;
use AC::Yenta::Store::Tokyo;
use AC::Misc;
use AC::Import;
use strict;

our @EXPORT = 'yenta_direct_get';

# Functional shortcut: open map $map stored in $file and fetch $key.
# Returns nothing when the reader object cannot be created.
sub yenta_direct_get {
    my ($file, $map, $key) = @_;

    my $reader = __PACKAGE__->new($map, $file) or return;
    return $reader->get( $key );
}


# Constructor: create an AC::Yenta::Store::Map for $map with $file as
# its dbfile and the readonly flag set.
# Returns nothing when the map object cannot be created.
sub new {
    my ($class, $map, $file) = @_;

    my %mapconf = ( dbfile => $file, readonly => 1 );
    my $db = AC::Yenta::Store::Map->new( $map, undef, \%mapconf ) or return;

    return bless { db => $db }, $class;
}

# Fetch $key by delegating to the underlying db object's get().
sub get {
    my ($me, $key) = @_;

    return $me->{db}->get($key);
}

# Return every key: a getrange() call with empty lower and upper bounds.
sub allkeys {
    my ($me) = @_;

    my ($lo, $hi) = ('', '');
    return $me->getrange( $lo, $hi );
}

# Return the 'k' field of each record the underlying db's range()
# yields for the bounds $lo .. $hi. A false bound is passed on as ''.
sub getrange {
    my ($me, $lo, $hi) = @_;

    my @records = $me->{db}->range( ($lo || ''), ($hi || '') );

    return map { $_->{k} } @records;
}

1;

lib/AC/Yenta/IO/TCP/Client.pm  view on Meta::CPAN

package AC::Yenta::IO::TCP::Client;
our @ISA = 'AC::DC::IO::TCP::Client';

use AC::Yenta::MySelf;
use AC::Misc;
use strict;

my $inited;
my $natdom;

sub use_addr_port {
    my $class = shift;
    my $addr  = shift;
    my $port  = shift;

    # is addr + port => return
    return ($addr, $port) unless ref $addr;

    # addr is array of nat ip info (ACPIPPort)

    _init() unless $inited;

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

# Function: 
#
# $Id$

package AC::Yenta::Kibitz::Status::Server;
use AC::Dumper;
use AC::Yenta::Debug 'status_server';
use strict;


sub handler {
    my $class   = shift;
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;

    if( $gpb ){
        AC::Yenta::Kibitz::Status::update_sceptical( $gpb, $io );
    }

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

use AC::Yenta::Protocol;
use AC::Yenta::IO::TCP::Client;
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

our @ISA = 'AC::Yenta::IO::TCP::Client';

my $TIMEOUT = 5;

# Create a kibitz store client for $addr:$port that will send $req.
# Any further arguments are handed through to the parent constructor.
# Returns nothing when the parent constructor fails.
sub new {
    my ($class, $addr, $port, $req, @rest) = @_;

    debug('starting kibitz store client');

    my $label = "kibitz store client $addr:$port";
    my $me = $class->SUPER::new( $addr, $port, info => $label, @rest );
    return unless $me;

    $me->{_req} = $req;

    # wire up the event handlers, in the same order as before
    my @callbacks = (
        timeout  => \&timeout,
        read     => \&read,
        shutdown => \&shutdown,
    );
    while( my ($event, $code) = splice(@callbacks, 0, 2) ){
        $me->set_callback($event, $code);
    }

    return $me;
}

# Start the connection via the parent class, queue the stored request
# for writing, and arm the reply timeout ($TIMEOUT). Returns the object.
sub start {
    my ($me) = @_;

    $me->SUPER::start();

    # send the request that new() stored
    $me->write( $me->{_req} );

    # give the peer $TIMEOUT to answer
    $me->timeout_rel( $TIMEOUT );

    return $me;
}

# Shutdown callback: unless the _store_ok flag is set on the object,
# run the 'error' callback (with undef as its argument).
sub shutdown {
    my ($me) = @_;

    # maybe call error handler
    $me->{_store_ok} or $me->run_callback('error', undef);
}

# Timeout callback: shut the connection.
sub timeout {
    my ($me) = @_;

    return $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    debug("recvd reply");

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    $me->timeout_rel($TIMEOUT) if $evt->{data} && !$proto;
    return unless $proto;
    $proto->{data}    = $data;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

use AC::Yenta::Config;
use AC::Dumper;
use JSON;
use Digest::SHA 'sha1_base64';
require 'AC/protobuf/yenta_getset.pl';
require 'AC/protobuf/yenta_check.pl';
use strict;

my $TIMEOUT = 1;

sub api_get {
    my $io      = shift;
    my $proto   = shift;
    my $gpb     = shift;
    my $content = shift;	# not used

    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

lib/AC/Yenta/Monitor.pm  view on Meta::CPAN


require 'AC/protobuf/yenta_status.pl';
require 'AC/protobuf/heartbeat.pl';

my $FREQ	= 2;
my $OLD_DOWN	= 30;
my $OLD_KEEP	= 1800;

my %MON;	# by 'id' (from config file)

# Register periodic() with the scheduler under the label 'monitor',
# using $FREQ as the frequency setting.
sub init {

    my @sched = (
        info	=> 'monitor',
        freq	=> $FREQ,
        func	=> \&periodic,
    );

    return AC::DC::Sched->new( @sched );
}

sub periodic {

    my $mon = conf_value('monitor');

    # clean up old data
    for my $id (keys %MON){
        isdown($id, 0) if $MON{$id}{lastup} < $^T - $OLD_DOWN;
    }

    # start monitoring (send heartbeat request)
    for my $m (@$mon){

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN

use AC::Yenta::Debug 'monitor_client';
use AC::Yenta::IO::TCP::Client;
use strict;
our @ISA = 'AC::Yenta::IO::TCP::Client';


my $HDRSIZE = AC::Yenta::Protocol->header_size();
my $TIMEOUT = 2;
my $msgid   = $$;

sub new {
    my $class = shift;

    debug('starting monitor status client');
    my $me = $class->SUPER::new( @_ );
    unless($me){
        return;
    }

    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);

lib/AC/Yenta/NetMon.pm  view on Meta::CPAN

use Socket;

use strict;

my $STALE	= 120;

my %lastok;	# natdom => T
my %natdom;	# ip => natdom


# Seed the NAT-domain tables from my_network_info(): each entry's
# address ('ipa') is mapped to its domain ('natdom', defaulting to
# 'public'), and every domain is stamped as last-ok at $^T.
sub init {

    my $nets = my_network_info();
    for my $net ( @$nets ){
        my $domain = $net->{natdom} || 'public';

        # remember the address's domain; assume everything is working
        ($natdom{ $net->{ipa} }, $lastok{ $domain }) = ($domain, $^T);
    }
}

# Record that the NAT domain of the local address of $io's socket was
# seen working at $^T. Addresses with no known domain count as 'public'.
sub update {
    my ($io) = @_;

    # local end of the connection: keep the packed address, drop the port
    my (undef, $packed) = sockaddr_in( getsockname($io->{fd}) );
    my $ip  = inet_ntoa( $packed );
    my $dom = $natdom{ $ip } || 'public';

    $lastok{$dom} = $^T;
}

# Report the status of a NAT domain (a false $dom means 'public'):
#   nothing - the domain is not in %lastok (not local)
#   0       - last-ok time plus $STALE is earlier than $^T
#   200     - otherwise
sub status_dom {
    my ($dom) = @_;

    $dom = 'public' unless $dom;
    return unless exists $lastok{$dom};	# not local

    my $fresh_until = $lastok{$dom} + $STALE;
    return 0 if $fresh_until < $^T;
    return 200;
}


1;

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

  yenta_check		=> { num => 9, reqc => 'ACPYentaCheckRequest',  resc => 'ACPYentaCheckReply' },
 );


# register every entry of %MSGTYPE (number, request class, reply class)
# with this package
for my $type (keys %MSGTYPE){
    my ($num, $reqc, $resc) = @{ $MSGTYPE{$type} }{qw(num reqc resc)};
    __PACKAGE__->add_msg( $type, $num, $reqc, $resc );
}


sub read_protocol {
    my $me  = shift;
    my $io  = shift;
    my $evt = shift;

    $io->{rbuffer} .= $evt->{data};

    return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header

lib/AC/Yenta/Server.pm  view on Meta::CPAN

my $TIMEOUT = 2;

my %HANDLER = (
    yenta_status	=> 'AC::Yenta::Kibitz::Status::Server',
    yenta_get		=> \&AC::Yenta::Kibitz::Store::Server::api_get,
    yenta_distrib	=> \&AC::Yenta::Kibitz::Store::Server::api_distrib,
    yenta_check		=> \&AC::Yenta::Kibitz::Store::Server::api_check,
    http		=> 'AC::Yenta::Stats',
   );

# Accept an incoming connection on $fd from $ip.
# Connections that fail the configured ACL check are logged and
# rejected (returns nothing). Otherwise the object is built by the
# parent class, initialised with $fd, set to read, given a timeout of
# $TIMEOUT, and its 'read' and 'timeout' callbacks are installed.
# NOTE(review): on success the return value is whatever the final
# set_callback() call returns, not necessarily the new object.
sub new {
    my ($class, $fd, $ip) = @_;

    if( !$AC::Yenta::CONF->check_acl( $ip ) ){
        verbose("rejecting connection from $ip");
        return;
    }

    my $label = "tcp yenta server (from: $ip)";
    my $me = $class->SUPER::new( peerip => $ip, info => $label );

    $me->init($fd);
    $me->wantread(1);
    $me->timeout_rel($TIMEOUT);
    $me->set_callback('read',    \&read);
    return $me->set_callback('timeout', \&timeout);
}

# Timeout callback: log the event and shut the connection.
sub timeout {
    my ($me) = @_;

    debug("connection timed out");
    return $me->shut();
}

sub read {
    my $me  = shift;
    my $evt = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return unless $proto;

    # dispatch request
    my $h = $HANDLER{ $proto->{type} };

lib/AC/Yenta/SixtyFour.pm  view on Meta::CPAN

# Author: Jeff Weisberg <jaw @ tcp4me.com>
# Created: 2010-Dec-23 11:39 (EST)
# Function: 64 bit numbers as native integers or math::bigints?
#
# $Id$

package AC::Yenta::SixtyFour;
use strict;

# export one set of functions as x64_<name>
sub import {
    my $class  = shift;
    my $caller = caller;

    my $l = length(sprintf '%x', -1);
    my $prefix;
    if( $l >= 16 ){
        $prefix = 'native_';
    }else{
        $prefix = 'bigint_';
        require Math::BigInt;

lib/AC/Yenta/Store.pm  view on Meta::CPAN

use AC::Yenta::Store::AE;
use AC::Yenta::Store::Expire;
use strict;

our @EXPORT = qw(store_get store_put store_want store_get_merkle store_get_internal store_set_internal store_expire store_remove);

my %STORE;


# create maps from config
sub configure {

    my $maps = $AC::Yenta::CONF->{config}{map};

    my %remove = %STORE;
    for my $map (keys %{$maps}){
        debug("configuring map $map");

        my $conf = $maps->{$map};
        my $sharded = $conf->{sharded};
        my $c  = $sharded ? 'AC::Yenta::Store::Sharded' : 'AC::Yenta::Store::Map';

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN


# starts out as the process id ($$)
# NOTE(review): how $msgid and @DIST are used is not visible in this excerpt
my $msgid = $$;
my @DIST;

# register periodic() with the scheduler under the label 'distribution'
# NOTE(review): freq 5 is presumably in seconds - confirm against AC::DC::Sched
AC::DC::Sched->new(
    info	=> 'distribution',
    freq	=> 5,
    func	=> \&AC::Yenta::Store::Distrib::periodic,
   );

sub new {
    my $class = shift;
    my $req   = shift;
    my $cont  = shift;

    return if $req->{hop} >= $MAXHOP;
    return if $req->{expire} < $^T;

    my $sender = $req->{sender};
    my $sendat = AC::Yenta::Status->peer($sender);

 view all matches for this distribution
 view release on metacpan -  search on metacpan

( run in 3.844 seconds using v1.00-cache-2.02-grep-82fe00e-cpan-9f2165ba459b )