view release on metacpan or search on metacpan
lib/AC/Yenta.pm view on Meta::CPAN
Key-value data is versioned with timestamps. By default, newest wins.
Maps can be configured to keep and return multiple versions and client
code can use other conflict resolution mechanisms.
Lost, missing or otherwise inconsistent data is detected
by kibitzing merkle tree hash values.
=head2 Topological awareness
Yentas can take network topology into account when transferring
data around to minimize long-distance transfers. You will need to
write a custom C<MySelf> class with a C<my_datacenter> function.
=head2 Multiple Network Interfaces / NAT
Yentas can take advantage of multiple network interfaces with
different IP addresses (eg. a private internal network + a public network),
or multiple addresses (eg. a private address and a public address)
and various NAT configurations.
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
$me->SUPER::start();
$me->write( $me->{_req} );
$me->timeout_rel($TIMEOUT);
return $me;
}
# Shutdown hook for the store client.
#
# If no reply was ever processed on this connection (the _store_ok flag
# was never set by the reply handler), report the failure by firing the
# 'error' callback with an undef event. Otherwise nothing is done.
sub shutdown {
    my ($me) = @_;

    # a handled reply already fired its own callback; only report the silent failures
    $me->{_store_ok} or $me->run_callback('error', undef);
}
# Timeout handler for the store client: give up on the request by
# closing the connection.
sub timeout {
    my ($me) = @_;

    return $me->shut();
}
sub read {
my $me = shift;
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
eval {
my $yp = AC::Yenta::Protocol->new();
$proto->{data} = $yp->decode_reply($proto) if $data;
};
if(my $e = $@){
problem("cannot decode reply: $e");
}
# process
$me->{_store_ok} = 1;
if( $proto->{is_error} || $@ ){
my $e = $@ || 'remote error';
$me->run_callback('error', {
error => $e,
});
}else{
$me->run_callback('load', $proto);
}
$me->shut();
}
1;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
};
if(my $e = $@){
my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
problem("cannot decode request: peer: $io->{peerip} $enc, $e");
$io->shut();
return;
}
unless( conf_map( $req->{datum}{map} ) ){
problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
_reply_error($io, $proto, 404, 'Map Not Found');
return;
}
my $v = $req->{datum};
# do we already have ?
my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );
if( $want ){
# put
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
return unless $chk eq $meta->{sha1};
}
if( $meta->{size} ){
my $len = length($$cont);
return unless $len == $meta->{size};
}
return 1;
}
# Send an error reply for a distribute request, then close the connection.
#
#   $io    - connection to answer on; the encoded reply goes out via write_and_shut
#   $proto - header of the request being answered; its msgid and
#            data_encrypted fields are copied into the reply header
#   $code  - value placed in the reply's status_code
#   $msg   - value placed in the reply's status_message
sub _reply_error {
    my ($io, $proto, $code, $msg) = @_;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );

    # reply header: flagged as an error reply to the original message
    my %header = (
        type           => 'yenta_distrib',
        msgid          => $proto->{msgid},
        is_reply       => 1,
        is_error       => 1,
        data_encrypted => $proto->{data_encrypted},
    );

    # reply body: status plus haveit => 0
    my %content = (
        status_code    => $code,
        status_message => $msg,
        haveit         => 0,
    );

    my $response = $yp->encode_reply( \%header, \%content );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}
lib/AC/Yenta/MySelf.pm view on Meta::CPAN
emacs /myperldir/Local/Yenta/MySelf.pm
copy. paste. edit.
use lib '/myperldir';
my $y = AC::Yenta::D->new(
class_myself => 'Local::Yenta::MySelf',
);
=head1 DESCRIPTION
provide functions to override default behavior. you may define
any or all of the following functions.
=head2 my_server_id
return a unique identity for this yenta instance. typically,
something similar to the server hostname.
sub my_server_id {
return 'yenta@' . hostname();
}
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;
if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
# decode header
eval {
$io->{proto_header} = $me->decode_header( $io->{rbuffer} );
};
if(my $e=$@){
verbose("cannot decode protocol header: $e");
$io->run_callback('error', {
cause => 'read',
error => "cannot decode protocol: $e",
});
$io->shut();
return;
}
}
my $p = $io->{proto_header};
return unless $p; # read more
# do we have everything?
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
my $me = shift;
my $io = shift;
my $auth = shift;
my $data = shift;
eval {
$data = $me->decrypt( $auth, $data );
};
if(my $e=$@){
verbose("cannot decrypt protocol data: $e");
$io->run_callback('error', {
cause => 'read',
error => "cannot decrypt protocol: $e",
});
$io->shut();
return;
}
return $data;
}
sub use_encryption {
my $peer = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
} );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
$request,
info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );
if( $io ){
$io->set_callback('load', \&_check_load, $me);
$io->set_callback('error', \&_check_error, $me);
$io->start();
}
}
sub _check_load {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
if( @keydata ){
$me->_check_result_keys( \@keydata );
}elsif( @nodedata ){
$me->_check_result_nodes( $maxlev, \@nodedata );
}
$me->_next_step();
}
# 'error' callback for an AE check request.
#
# Invoked as ($io, $evt, $me): the connection, the event (unused here),
# and the AE object that was registered along with the callback.
# Logs the failure and moves on to the next step.
sub _check_error {
    my ($io, $evt, $me) = @_;

    my $msg = "AE check error with $me->{peer}{id} map $me->{map} ($io->{info})";
    verbose($msg);

    # a failed check does not stop the run; carry on
    $me->_next_step();
}
sub _check_result_keys {
my $me = shift;
my $chk = shift;
my %vscnt;
my %vsadd;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
$io->set_callback('error', \&_getkv_error, $me, $retry, $get);
$io->start();
}
}
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
$d->{value}, $file, $d->{meta} );
}
push @{$me->{kvneedorig}}, (values %need) if $retry;
$me->_next_get_kv();
}
sub _getkv_error {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
if( $retry ){
push @{$me->{kvneedorig}}, @$get;
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
my $MAX = 100;
my $max = $MAX;
while( !$end || ($k lt $e) ){
debug("range $k");
last unless $k =~ m|$map/$sub/|;
$k =~ s|$map/$sub/||;
push @k, { k => $k, v => $v };
my $r = $cursor->c_get($k, $v, DB_NEXT);
last if $r; # error
# cursor locks the db
# close+recreate so other processes can proceed
unless( $max -- ){
$cursor->c_close();
$me->_finish();
sleep 0;
$me->_start();
$cursor = $me->{db}->db_cursor();
$cursor->c_get($k, $v, DB_SET);
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
}, \$ect );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
$request . $ect,
info => "distrib $me->{info} to $id",
);
if( $io ){
$io->set_callback('load', \&_onload, $me, $id, $far);
$io->set_callback('error', \&_onerror, $me, $id, $far);
$io->start();
}else{
debug("start client failed");
}
}
sub _onload {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $n = $me->{ordershift};
$n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
shift @{$me->{nearsend}} for (1 .. $n);
$me->{ordershift} *= 2;
}
}
$me->_start_one($far);
}
# 'error' callback for a distribution request.
#
# Invoked as ($io, $evt, $me, $id, $far): the connection, the event
# (unused here), the distrib object, the id of the peer we were sending
# to, and the near/far flag that were registered along with the callback.
# Logs the failure and starts the next transfer of the same kind.
sub _onerror {
    my ($io, $evt, $me, $id, $far) = @_;

    my $msg = "error distributing $me->{info} to $id";
    verbose($msg);

    # don't need to track anything
    $me->_start_one($far);
}
sub _orderly {
my $peers = shift;
my $myself = AC::Yenta::Status->my_server_id();
my @p = sort {$a cmp $b} @$peers;
lib/AC/Yenta/Store/SQLite.pm view on Meta::CPAN
eval {
for my $sql (split /;/, $initsql){
$sql =~ s/--\s.*$//gm; # remove comments
next unless $sql !~ /^\s*$/;
_do($db, $sql);
}
};
if(my $e=$@){
# QQQ?
problem("error initializing sqlite db: $e");
}
}
sub _do {
my $db = shift;
my $sql = shift;
my( $st, $nrow );
eval {
debug("sql: $sql");
lib/AC/Yenta/Store/Tokyo.pm view on Meta::CPAN
problem("no dbfile specified for '$name'");
return;
}
debug("opening Tokyo DB file=$file");
my $db = TokyoCabinet::BDB->new();
my $flags = $conf->{readonly} ? ($db->OREADER | $db->ONOLCK) : ($db->OWRITER | $db->OCREAT);
if(!$db->open($file, $flags)){
#my $ecode = $db->ecode();
#printf STDERR ("open error: %s\n", $db->errmsg($ecode));
problem("cannot open db file $file");
}
# web server will need access
chmod 0666, $file;
return bless {
file => $file,
db => $db,
}, $class;