view release on metacpan or search on metacpan
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
$me->SUPER::start();
$me->write( $me->{_req} );
$me->timeout_rel($TIMEOUT);
return $me;
}
sub shutdown {
my $me = shift;
# maybe call error handler
$me->run_callback('error', undef) unless $me->{_store_ok};
}
sub timeout {
my $me = shift;
$me->shut();
}
sub read {
my $me = shift;
lib/AC/Yenta/Kibitz/Store/Client.pm view on Meta::CPAN
eval {
my $yp = AC::Yenta::Protocol->new();
$proto->{data} = $yp->decode_reply($proto) if $data;
};
if(my $e = $@){
problem("cannot decode reply: $e");
}
# process
$me->{_store_ok} = 1;
if( $proto->{is_error} || $@ ){
my $e = $@ || 'remote error';
$me->run_callback('error', {
error => $e,
});
}else{
$me->run_callback('load', $proto);
}
$me->shut();
}
1;
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
};
if(my $e = $@){
my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
problem("cannot decode request: peer: $io->{peerip} $enc, $e");
$io->shut();
return;
}
unless( conf_map( $req->{datum}{map} ) ){
problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
_reply_error($io, $proto, 404, 'Map Not Found');
return;
}
my $v = $req->{datum};
# do we already have ?
my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );
if( $want ){
# put
lib/AC/Yenta/Kibitz/Store/Server.pm view on Meta::CPAN
return unless $chk eq $meta->{sha1};
}
if( $meta->{size} ){
my $len = length($$cont);
return unless $len == $meta->{size};
}
return 1;
}
sub _reply_error {
my $io = shift;
my $proto = shift;
my $code = shift;
my $msg = shift;
my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
my $response = $yp->encode_reply( {
type => 'yenta_distrib',
msgid => $proto->{msgid},
is_reply => 1,
is_error => 1,
data_encrypted => $proto->{data_encrypted},
}, {
status_code => $code,
status_message => $msg,
haveit => 0,
} );
debug("sending distrib reply");
$io->write_and_shut( $response );
}
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;
if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
# decode header
eval {
$io->{proto_header} = $me->decode_header( $io->{rbuffer} );
};
if(my $e=$@){
verbose("cannot decode protocol header: $e");
$io->run_callback('error', {
cause => 'read',
error => "cannot decode protocol: $e",
});
$io->shut();
return;
}
}
my $p = $io->{proto_header};
return unless $p; # read more
# do we have everything?
lib/AC/Yenta/Protocol.pm view on Meta::CPAN
my $me = shift;
my $io = shift;
my $auth = shift;
my $data = shift;
eval {
$data = $me->decrypt( $auth, $data );
};
if(my $e=$@){
verbose("cannot decrypt protocol data: $e");
$io->run_callback('error', {
cause => 'read',
error => "cannot decrypt protocol: $e",
});
$io->shut();
return;
}
return $data;
}
sub use_encryption {
my $peer = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
} );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new(
$me->{peer}{ip}, undef,
$request,
info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );
if( $io ){
$io->set_callback('load', \&_check_load, $me);
$io->set_callback('error', \&_check_error, $me);
$io->start();
}
}
sub _check_load {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
if( @keydata ){
$me->_check_result_keys( \@keydata );
}elsif( @nodedata ){
$me->_check_result_nodes( $maxlev, \@nodedata );
}
$me->_next_step();
}
sub _check_error {
my $io = shift;
my $evt = shift;
my $me = shift;
verbose("AE check error with $me->{peer}{id} map $me->{map} ($io->{info})");
$me->_next_step();
}
sub _check_result_keys {
my $me = shift;
my $chk = shift;
my %vscnt;
my %vsadd;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
} );
# connect + send
debug("sending to $peer->{id}");
my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
info => "AE getkv from $peer->{id}" );
if( $io ){
$me->{kvfetching} ++;
$io->set_callback('load', \&_getkv_load, $me, $retry, $get);
$io->set_callback('error', \&_getkv_error, $me, $retry, $get);
$io->start();
}
}
sub _getkv_load {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
lib/AC/Yenta/Store/AE.pm view on Meta::CPAN
$d->{value}, $file, $d->{meta} );
}
push @{$me->{kvneedorig}}, (values %need) if $retry;
$me->_next_get_kv();
}
sub _getkv_error {
my $io = shift;
my $evt = shift;
my $me = shift;
my $retry = shift;
my $get = shift;
$me->{kvfetching} --;
if( $retry ){
push @{$me->{kvneedorig}}, @$get;
lib/AC/Yenta/Store/BDBI.pm view on Meta::CPAN
my $MAX = 100;
my $max = $MAX;
while( !$end || ($k lt $e) ){
debug("range $k");
last unless $k =~ m|$map/$sub/|;
$k =~ s|$map/$sub/||;
push @k, { k => $k, v => $v };
my $r = $cursor->c_get($k, $v, DB_NEXT);
last if $r; # error
# cursor locks the db
# close+recreate so other processes can proceed
unless( $max -- ){
$cursor->c_close();
$me->_finish();
sleep 0;
$me->_start();
$cursor = $me->{db}->db_cursor();
$cursor->c_get($k, $v, DB_SET);
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
}, \$ect );
# connect + send
my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
$request . $ect,
info => "distrib $me->{info} to $id",
);
if( $io ){
$io->set_callback('load', \&_onload, $me, $id, $far);
$io->set_callback('error', \&_onerror, $me, $id, $far);
$io->start();
}else{
debug("start client failed");
}
}
sub _onload {
my $io = shift;
my $evt = shift;
my $me = shift;
lib/AC/Yenta/Store/Distrib.pm view on Meta::CPAN
my $n = $me->{ordershift};
$n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
shift @{$me->{nearsend}} for (1 .. $n);
$me->{ordershift} *= 2;
}
}
$me->_start_one($far);
}
sub _onerror {
my $io = shift;
my $evt = shift;
my $me = shift;
my $id = shift;
my $far = shift;
verbose("error distributing $me->{info} to $id");
# don't need to track anything
$me->_start_one($far);
}
sub _orderly {
my $peers = shift;
my $myself = AC::Yenta::Status->my_server_id();
my @p = sort {$a cmp $b} @$peers;
lib/AC/Yenta/Store/SQLite.pm view on Meta::CPAN
eval {
for my $sql (split /;/, $initsql){
$sql =~ s/--\s.*$//gm; # remove comments
next unless $sql !~ /^\s*$/;
_do($db, $sql);
}
};
if(my $e=$@){
# QQQ?
problem("error initializing sqlite db: $e");
}
}
sub _do {
my $db = shift;
my $sql = shift;
my( $st, $nrow );
eval {
debug("sql: $sql");
lib/AC/Yenta/Store/Tokyo.pm view on Meta::CPAN
problem("no dbfile specified for '$name'");
return;
}
debug("opening Tokyo DB file=$file");
my $db = TokyoCabinet::BDB->new();
my $flags = $conf->{readonly} ? ($db->OREADER | $db->ONOLCK) : ($db->OWRITER | $db->OCREAT);
if(!$db->open($file, $flags)){
#my $ecode = $db->ecode();
#printf STDERR ("open error: %s\n", $db->errmsg($ecode));
problem("cannot open db file $file");
}
# web server will need access
chmod 0666, $file;
return bless {
file => $file,
db => $db,
}, $class;