AC-Yenta

 view release on metacpan or  search on metacpan

META.yml  view on Meta::CPAN

--- #YAML:1.0
name:               AC-Yenta
version:            1.1
abstract:           eventually-consistent distributed key/value data store. et al.
author:
    - AdCopy <http://www.adcopy.com>
license:            perl
distribution_type:  module
configure_requires:
    ExtUtils::MakeMaker:  0
requires:
    AC::DC:               0
    BerkeleyDB:           0
    Crypt::Rijndael:      0
    Digest::SHA:          0
    Google::ProtocolBuffers:  0
    JSON:                 0
    POSIX:                0

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    return $me;
}

sub get {
    my $me  = shift;
    my $map = shift;
    my $key = shift;
    my $ver = shift;

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_get',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        data	=> [ {
            map		=> $map,
            key		=> $key,
            version	=> $ver,
        } ]
    } );

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    my $key  = shift;
    my $ver  = shift;
    my $val  = shift;
    my $file = shift;	# reference
    my $meta = shift;

    return unless $key && $ver;
    $me->{retries} = 25 unless $me->{retries};

    my $req = $me->{proto}->encode_request( {
        type		=> 'yenta_distrib',
        msgidno		=> rand(0xFFFFFFFF),
        want_reply	=> 1,
    }, {
        sender		=> "$HOSTNAME/$$",
        hop		=> 0,
        expire		=> time() + 120,
        datum	=> [ {
            map		=> $map,
            key		=> $key,
            version	=> $ver,

lib/AC/Yenta/Client.pm  view on Meta::CPAN

    # return undef | result
}

# build a 'yenta_check' request for $map - carrying $ver and $lev as
# the version and level fields - and hand it to _send_request.
# the request asks for a reply and gets a random msgidno.
# returns whatever _send_request returns
sub check {
    my($me, $map, $ver, $lev) = @_;

    # header part of the request
    my %header = (
        type       => 'yenta_check',
        msgidno    => rand(0xFFFFFFFF),
        want_reply => 1,
    );

    # what we want checked
    my %body = (
        map     => $map,
        level   => $lev,
        version => $ver,
    );

    my $req = $me->{proto}->encode_request( \%header, \%body );

    return $me->_send_request($map, $req);
}

lib/AC/Yenta/Config.pm  view on Meta::CPAN

        ipi	=> inet_atoi($ip),
        port	=> $port,
    };
}

# parse a savefile config value of the form
#     <file> <type> [<type> ...]
# and append { file => <file>, type => [ <type>, ... ] } to the
# pending 'savestatus' list in $me->{_pending}.
#   $me   - config object (hash) being filled in
#   $key  - config keyword; accepted but not used here
#   $save - the whitespace separated value
sub parse_savefile {
    my $me   = shift;
    my $key  = shift;
    my $save = shift;

    # split ' ' skips leading whitespace. splitting on /\s+/ turned a
    # value with leading blanks into an empty filename, and pushed the
    # real filename into the list of types
    my($file, @type) = split ' ', $save;

    push @{$me->{_pending}{savestatus}}, {
        type => \@type,
        file => $file,
    };
}

sub cvt_timespec {
    my $t = shift;

    my %f = ( m => 60, h => 3600, d => 86400 );
    my($n, $f) = $t =~ /(\d+)(\D?)/;

lib/AC/Yenta/Kibitz/Status/Client.pm  view on Meta::CPAN


sub start {
    my $me = shift;

    $me->SUPER::start();

    # build request
    my $yp  = AC::Yenta::Protocol->new();
    my $pb  = AC::Yenta::Kibitz::Status::myself();
    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($pb),
        content_length	=> 0,
        want_reply	=> 1,
        msgid		=> $msgid++,
       );

    # write request
    $me->write( $hdr . $pb );
    $me->timeout_rel($TIMEOUT);
    return $me;

lib/AC/Yenta/Kibitz/Status/Server.pm  view on Meta::CPAN

        return;
    }

    AC::Yenta::NetMon::update( $io );

    # respond with all known peers
    my $response = AC::Yenta::Kibitz::Status::response();
    my $yp  = AC::Yenta::Protocol->new();

    my $hdr = $yp->encode_header(
        type		=> 'yenta_status',
        data_length	=> length($response),
        content_length  => 0,
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
       );

    debug("sending status reply");
    $io->write_and_shut( $hdr . $response );
}

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

            }
        }
        push @res, $res;
    }

    # encode results
    my $ect = '';
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $proto->{data_encrypted} ? $yp->encrypt(undef, $$rescont) : $$rescont if $rescont;
    my $response = $yp->encode_reply( {
        type		  => 'yenta_get',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        content_encrypted => $proto->{data_encrypted},
    }, { data => \@res }, \$ect );

    debug("sending get reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response . $ect );

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        last unless @lres;				# reached the bottom of the tree
        last if @res > 64;				# got enough results
        last if (@lres > 2) && ($nexttot > @lres + 2);	# less sparse region
        # get the next level also
        @todo = @lres;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		  => 'yenta_check',
        msgid		  => $proto->{msgid},
        is_reply	  => 1,
        data_encrypted	  => $proto->{data_encrypted},
        }, { check => \@res } );

    debug("sending check reply");
    $io->timeout_rel($TIMEOUT);
    $io->{writebuf_timeout} = $TIMEOUT;
    $io->write_and_shut( $response );

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN



    unless( $proto->{want_reply} ){
        $io->shut();
        return;
    }

    # encode results
    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, { status_code => 200, status_message => 'OK', haveit => !$want } );

    debug("sending distrib reply");
    $io->timeout_rel($TIMEOUT);
    $io->write_and_shut( $response );

}

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

}

sub _reply_error {
    my $io    = shift;
    my $proto = shift;
    my $code  = shift;
    my $msg   = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        is_error	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, {
        status_code     => $code,
        status_message  => $msg,
        haveit		=> 0,
    } );

lib/AC/Yenta/Monitor/Client.pm  view on Meta::CPAN


    $me->set_callback('timeout',  \&timeout);
    $me->set_callback('read',     \&read);
    $me->set_callback('shutdown', \&shutdown);

    $me->start();

    # build request
    my $yp  = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $hdr = $yp->encode_header(
        type		=> 'heartbeat_request',
        data_length	=> 0,
        content_length	=> 0,
        want_reply	=> 1,
        msgid		=> $msgid++,
       );

    # write request
    $me->write( $hdr );

    $me->timeout_rel($TIMEOUT);

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN


# bare-bones http request parsing: for simple status queries, argus, debugging
# this is not an RFC compliant http server
#
# returns nothing until the read buffer contains a blank line,
# then returns ( { type => 'http', method => <1st word> }, <2nd word> )
# $evt is not used
sub _read_http {
    my($io, $evt) = @_;

    my $buf = $io->{rbuffer};

    # keep waiting until we have seen the blank line
    return if $buf !~ /\r?\n\r?\n/s;

    # first two words of the buffer; anything after them is ignored
    my($method, $url) = split /\s+/, $buf;

    my %proto = (
        type   => 'http',
        method => $method,
    );

    return ( \%proto, $url );
}

################################################################

sub _decrypt_data {
    my $me   = shift;
    my $io   = shift;
    my $auth = shift;
    my $data = shift;

lib/AC/Yenta/Server.pm  view on Meta::CPAN


# read callback: pull a message off the connection with read_protocol
# and pass it to the entry registered in %HANDLER for its type.
# returns without doing anything if read_protocol gave us no message.
# an unknown message type is logged and the connection is shut
sub read {
    my($me, $evt) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my($proto, $data, $content) = $yp->read_protocol( $me, $evt );
    return if !$proto;

    # dispatch request
    my $h = $HANDLER{ $proto->{type} };

    if( !$h ){
        verbose("unknown message type: $proto->{type}");
        $me->shut();
        return;
    }

    debug("handling request - $proto->{type}");

    my @args = ( $me, $proto, $data, $content );

    # a reference is called directly,
    # anything else is used as the invocant of a handler method
    return $h->( @args ) if ref $h;
    return $h->handler( @args );
}


1;

lib/AC/Yenta/Status.pm  view on Meta::CPAN

# server_list() leaves out any peer whose lastup is not newer than $^T - $SAVEMAX
my $SAVEMAX  = 1800;	# do not save if older than

# set by init() from its argument
my $PORT;

# the peer status tables, blessed into this package.
# the per-datacenter, per-subsystem, and per-map tables are indexes
# whose entries are keyed by peer id
our $DATA = bless {
    allpeer	=> {},		# {id}  => peer status record (yenta_status)
    sceptical	=> {},		# values are the peers _random_peer tries first
    mappeer	=> {},		# {map} => { id => id }
    peermap	=> {},		# {id}  => @map
    datacenter  => {},		# {dc}  => { id => id }
    peertype	=> {},		# {ss}  => { id => id }	(ss = subsystem)
};

sub init {
    my $port = shift;

    $PORT = $port;

    AC::DC::Sched->new(
        info	=> 'kibitz status',
        freq	=> (conf_value('time_status_kibitz') || 5),

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    $c->start();
}

sub _random_peer {

    my $here  = my_datacenter();

    # sceptical
    my @scept = values %{$DATA->{sceptical}};

    my @all   = map  { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
    my @old   = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
    my @local = grep { $_->{datacenter} eq $here } @all;	# this datacenter
    my @away  = grep { $_->{datacenter} ne $here } @all;	# not this datacenter

    # first check anything sceptical
    my @peer  = @scept;

    # then (maybe) something about to expire
    @peer = @old  unless @peer || int rand(5);

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    my $ipinfo = my_network_info();
    for my $i (@$ipinfo){
        return if $ip eq $i->{ipa} && $port == $PORT;
    }

    return("seed/$ip:$port", $ip, $port);
}

# server list for save file
# takes "type" or "type/where" and returns the allpeer records of
# every peer of that type whose lastup is newer than $^T - $SAVEMAX.
# returns nothing if there are no such peers
sub server_list {
    my($spec) = @_;

    my($type, $where) = split m|/|, $spec;
    # where - no longer used
    $where ||= my_datacenter();

    my @id = keys %{ $DATA->{peertype}{$type} };
    return unless @id;

    # nothing too old
    my $cutoff = $^T - $SAVEMAX;
    my @recent = grep { $DATA->{allpeer}{$_}{lastup} > $cutoff } @id;
    return unless @recent;

    return map { $DATA->{allpeer}{$_} } @recent;
}

# save a list of peers, in case I crash, and for others to use
sub save_status {

    my $save = conf_value('savestatus');
    my $here = my_datacenter();

    # also save locally running services
    my @mon  = AC::Yenta::Monitor::export();

    for my $s ( @$save ){
        my $file  = $s->{file};
        my $types = $s->{type};

        my @peer;
        for my $type (@$types){
            push @peer, server_list($type);

            for my $m (@mon){
                push @peer, $m if $m->{subsystem} eq $type;
            }
        }

        next unless @peer;

        debug("saving peer status file");
        unless( open(FILE, ">$file.tmp") ){
            problem("cannot open save file '$file.tmp': $!");
            return;
        }

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    my $class = shift;

    return $DATA->{datacenter};
}
################################################################

sub _remove {
    my $id = shift;

    my $ss = $DATA->{allpeer}{$id}{subsystem};
    delete $DATA->{peertype}{$ss}{$id} if $ss;

    my $dc = $DATA->{allpeer}{$id}{datacenter};
    delete $DATA->{datacenter}{$dc}{$id} if $dc;

    verbose("deleting peer: $id");
    delete $DATA->{allpeer}{$id};

    # remove map info
    for my $map ( @{$DATA->{peermap}{$id}} ){
        delete $DATA->{mappeer}{$map}{$id};

lib/AC/Yenta/Status.pm  view on Meta::CPAN

    }

    # update datacenter info
    unless( $DATA->{datacenter}{$up->{datacenter}}{$id} ){
        my $pdc = $previnfo->{datacenter};
        delete $DATA->{datacenter}{$pdc}{$id} if $pdc;
        $DATA->{datacenter}{$up->{datacenter}}{$id} = $id;
    }

    # update subsystem info
    unless( $DATA->{peertype}{$up->{subsystem}}{$id} ){
        my $ss = $previnfo->{subsystem};
        delete $DATA->{peertype}{$ss}{$id} if $ss;
        $DATA->{peertype}{$up->{subsystem}}{$id} = $id;
    }

    # update map info
    $DATA->{peermap}{$id} ||= [];
    $up->{map} ||= [];
    my @curmap = @{$DATA->{peermap}{$id}};
    my @newmap = sort @{$up->{map}};

    return if "@curmap" eq "@newmap";		# unchanged

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

sub _start_check {
    my $me = shift;

    my $node = shift @{$me->{badnode}};
    debug("checking next node: $me->{map} $node->{level}/$node->{version}");
    inc_stat('ae_check_node');

    my $enc     = use_encryption($me->{peer});
    my $proto   = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $request = $proto->encode_request( {
        type		=> 'yenta_check',
        msgidno		=> $msgid++,
        want_reply	=> 1,
        data_encrypted	=> $enc,
    }, {
        map		=> $me->{map},
        level		=> $node->{level},
        version		=> $node->{version},
        shard		=> $node->{shard},
    } );

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    # insert map into request
    $_->{map} = $me->{map} for @$get;

    # for (@$get){ debug("requesting $_->{key}/$_->{version}") }

    my $enc   = use_encryption($peer);
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_get',
        msgidno		  => $msgid++,
        want_reply	  => 1,
        data_encrypted	  => $enc,
    }, {
        data		  => $get,
    } );

    # connect + send
    debug("sending to $peer->{id}");
    my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    my $pd   = AC::Yenta::Status->peer($id);
    my $addr = $pd->{ip};	# array of nat ip info

    my $enc = use_encryption($pd);
    my $ect = '';
    my $proto = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    $ect = $enc ? $proto->encrypt(undef, ${$me->{content}}) : ${$me->{content}} if $me->{content};

    # build request
    my $request = $proto->encode_request( {
        type		  => 'yenta_distrib',
        msgidno		  => $msgid++,
        want_reply	  => 1,
        data_encrypted	  => $enc,
        content_encrypted => $enc,
    }, {
        sender		  => AC::Yenta::Status->my_server_id(),
        hop		  => $me->{req}{hop} + 1,
        expire		  => $me->{req}{expire},
        datum		  => $me->{req}{datum},
    }, \$ect );



( run in 2.764 seconds using v1.01-cache-2.11-cpan-df04353d9ac )