AC-Yenta

 view release on metacpan or  search on metacpan

lib/AC/Yenta.pm  view on Meta::CPAN


Key-value data is versioned with timestamps. By default, newest wins.
Maps can be configured to keep and return multiple versions and client
code can use other conflict resolution mechanisms.

Lost, missing or otherwise inconsistent data is detected
by kibitzing merkle tree hash values.

=head2 Topological awareness

Yentas can take network topology into account when tranferring
data around to minimize long-distance transfers. You will need to
write a custom C<MySelf> class with a C<my_datacenter> function.

=head2 Multiple Network Interfaces / NAT

Yentas can take advantage of multiple network interfaces with
different IP addresses (eg. a private internal network + a public network),
or multiple addresses (eg. a private addresses and a public address)
and various NAT configurations.

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN


    $me->SUPER::start();
    $me->write( $me->{_req} );
    $me->timeout_rel($TIMEOUT);
    return $me;
}

sub shutdown {
    my $me = shift;

    # maybe call error handler
    $me->run_callback('error', undef) unless $me->{_store_ok};
}

sub timeout {
    my $me = shift;

    $me->shut();
}

sub read {
    my $me  = shift;

lib/AC/Yenta/Kibitz/Store/Client.pm  view on Meta::CPAN

    eval {
        my $yp = AC::Yenta::Protocol->new();
        $proto->{data} = $yp->decode_reply($proto) if $data;
    };
    if(my $e = $@){
        problem("cannot decode reply: $e");
    }

    # process
    $me->{_store_ok} = 1;
    if( $proto->{is_error} || $@ ){
        my $e = $@ || 'remote error';
        $me->run_callback('error', {
            error	=> $e,
        });
    }else{
        $me->run_callback('load', $proto);
    }

    $me->shut();
}


1;

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

    };
    if(my $e = $@){
        my $enc = $proto->{data_encrypted} ? ' (encrypted)' : '';
        problem("cannot decode request: peer: $io->{peerip} $enc, $e");
        $io->shut();
        return;
    }

    unless( conf_map( $req->{datum}{map} ) ){
        problem("distribute request for unknown map '$req->{datum}{map}' - $io->{info}");
        _reply_error($io, $proto, 404, 'Map Not Found');
        return;
    }

    my $v = $req->{datum};

    # do we already have ?
    my $want = store_want( $v->{map}, $v->{shard}, $v->{key}, $v->{version} );

    if( $want ){
        # put

lib/AC/Yenta/Kibitz/Store/Server.pm  view on Meta::CPAN

        return unless $chk eq $meta->{sha1};
    }
    if( $meta->{size} ){
        my $len = length($$cont);
        return unless $len == $meta->{size};
    }

    return 1;
}

sub _reply_error {
    my $io    = shift;
    my $proto = shift;
    my $code  = shift;
    my $msg   = shift;

    my $yp = AC::Yenta::Protocol->new( secret => conf_value('secret') );
    my $response = $yp->encode_reply( {
        type		=> 'yenta_distrib',
        msgid		=> $proto->{msgid},
        is_reply	=> 1,
        is_error	=> 1,
        data_encrypted	=> $proto->{data_encrypted},
    }, {
        status_code     => $code,
        status_message  => $msg,
        haveit		=> 0,
    } );

    debug("sending distrib reply");
    $io->write_and_shut( $response );
}

lib/AC/Yenta/MySelf.pm  view on Meta::CPAN

    emacs /myperldir/Local/Yenta/MySelf.pm
    copy. paste. edit.

    use lib '/myperldir';
    my $y = AC::Yenta::D->new(
        class_myself        => 'Local::Yenta::MySelf',
    );

=head1 DESCRIPTION

provide functions to override default behavior. you may define
any or all of the following functions.

=head2 my_server_id

return a unique identity for this yenta instance. typically,
something similar to the server hostname.

    sub my_server_id {
        return 'yenta@' . hostname();
    }

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN


    return _read_http($io, $evt) if $io->{rbuffer} =~ /^GET/;

    if( length($io->{rbuffer}) >= $HDRSIZE && !$io->{proto_header} ){
        # decode header
        eval {
            $io->{proto_header} = $me->decode_header( $io->{rbuffer} );
        };
        if(my $e=$@){
            verbose("cannot decode protocol header: $e");
            $io->run_callback('error', {
                cause	=> 'read',
                error	=> "cannot decode protocol: $e",
            });
            $io->shut();
            return;
        }
    }

    my $p = $io->{proto_header};
    return unless $p; 	# read more

    # do we have everything?

lib/AC/Yenta/Protocol.pm  view on Meta::CPAN

    my $me   = shift;
    my $io   = shift;
    my $auth = shift;
    my $data = shift;

    eval {
        $data = $me->decrypt( $auth, $data );
    };
    if(my $e=$@){
        verbose("cannot decrypt protocol data: $e");
        $io->run_callback('error', {
            cause	=> 'read',
            error	=> "cannot decrypt protocol: $e",
        });
        $io->shut();
        return;
    }

    return $data;
}

sub use_encryption {
    my $peer = shift;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    } );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new(
        $me->{peer}{ip}, undef,
        $request,
        info => "AE node $node->{level}/$node->{version} with $me->{peer}{id}" );

    if( $io ){
        $io->set_callback('load',  \&_check_load,  $me);
        $io->set_callback('error', \&_check_error, $me);
        $io->start();
    }

}

sub _check_load {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN


    if( @keydata ){
        $me->_check_result_keys( \@keydata );
    }elsif( @nodedata ){
        $me->_check_result_nodes( $maxlev, \@nodedata );
    }

    $me->_next_step();
}

sub _check_error {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

    verbose("AE check error with $me->{peer}{id} map $me->{map} ($io->{info})");
    $me->_next_step();
}

sub _check_result_keys {
    my $me  = shift;
    my $chk = shift;

    my %vscnt;
    my %vsadd;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

    } );

    # connect + send
    debug("sending to $peer->{id}");
    my $io = AC::Yenta::Kibitz::Store::Client->new($peer->{ip}, undef, $request,
                                                  info => "AE getkv from $peer->{id}" );

    if( $io ){
        $me->{kvfetching} ++;
        $io->set_callback('load',  \&_getkv_load,  $me, $retry, $get);
        $io->set_callback('error', \&_getkv_error, $me, $retry, $get);
        $io->start();
    }
}

sub _getkv_load {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

lib/AC/Yenta/Store/AE.pm  view on Meta::CPAN

                                     $d->{value}, $file, $d->{meta} );

    }

    push @{$me->{kvneedorig}}, (values %need) if $retry;

    $me->_next_get_kv();

}

sub _getkv_error {
    my $io    = shift;
    my $evt   = shift;
    my $me    = shift;
    my $retry = shift;
    my $get   = shift;

    $me->{kvfetching} --;

    if( $retry ){
        push @{$me->{kvneedorig}}, @$get;

lib/AC/Yenta/Store/BDBI.pm  view on Meta::CPAN


    my $MAX = 100;
    my $max = $MAX;

    while( !$end || ($k lt $e) ){
        debug("range $k");
        last unless $k =~ m|$map/$sub/|;
        $k =~ s|$map/$sub/||;
        push @k, { k => $k, v => $v };
        my $r = $cursor->c_get($k, $v, DB_NEXT);
        last if $r;	# error

        # cursor locks the db
        # close+recreate so other processes can proceed
        unless( $max -- ){
            $cursor->c_close();
            $me->_finish();
            sleep 0;
            $me->_start();
            $cursor = $me->{db}->db_cursor();
            $cursor->c_get($k, $v, DB_SET);

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

    }, \$ect );

    # connect + send
    my $io = AC::Yenta::Kibitz::Store::Client->new($addr, undef,
                                                   $request . $ect,
                                                   info => "distrib $me->{info} to $id",
                                                  );

    if( $io ){
        $io->set_callback('load',  \&_onload,  $me, $id, $far);
        $io->set_callback('error', \&_onerror, $me, $id, $far);
        $io->start();
    }else{
        debug("start client failed");
    }
}

sub _onload {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;

lib/AC/Yenta/Store/Distrib.pm  view on Meta::CPAN

            my $n = $me->{ordershift};
            $n = @{$me->{nearsend}} / 2 if $n > @{$me->{nearsend}} / 2;
            shift @{$me->{nearsend}} for (1 .. $n);
            $me->{ordershift} *= 2;
        }
    }

    $me->_start_one($far);
}

sub _onerror {
    my $io  = shift;
    my $evt = shift;
    my $me  = shift;
    my $id  = shift;
    my $far = shift;

    verbose("error distributing $me->{info} to $id");
    # don't need to track anything

    $me->_start_one($far);
}

sub _orderly {
    my $peers = shift;

    my $myself = AC::Yenta::Status->my_server_id();
    my @p = sort {$a cmp $b} @$peers;

lib/AC/Yenta/Store/SQLite.pm  view on Meta::CPAN


    eval {
        for my $sql (split /;/, $initsql){
            $sql =~ s/--\s.*$//gm;              # remove comments
            next unless $sql !~ /^\s*$/;
            _do($db, $sql);
        }
    };
    if(my $e=$@){
        # QQQ?
        problem("error initializing sqlite db: $e");
    }
}

sub _do {
    my $db  = shift;
    my $sql = shift;

    my( $st, $nrow );
    eval {
        debug("sql: $sql");

lib/AC/Yenta/Store/Tokyo.pm  view on Meta::CPAN

        problem("no dbfile specified for '$name'");
        return;
    }

    debug("opening Tokyo DB file=$file");

    my $db = TokyoCabinet::BDB->new();
    my $flags = $conf->{readonly} ? ($db->OREADER | $db->ONOLCK) : ($db->OWRITER | $db->OCREAT);
    if(!$db->open($file, $flags)){
        #my $ecode = $db->ecode();
        #printf STDERR ("open error: %s\n", $db->errmsg($ecode));
        problem("cannot open db file $file");
    }

    # web server will need access
    chmod 0666, $file;

    return bless {
        file	=> $file,
        db	=> $db,
    }, $class;



( run in 1.198 second using v1.01-cache-2.11-cpan-49f99fa48dc )