DR-Tarantool

 view release on metacpan or  search on metacpan

lib/DR/Tarantool/LLClient.pm  view on Meta::CPAN


An internal driver error.

=item error

The request wasn't executed: the server returned an error.

=item ok

Request was executed OK.

=back

=item errstr

If an error occurred, contains error description.

=item code

Contains reply code.

=item req_id

Contains request id.
(see
L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)

=item type

Contains request type
(see
L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)

=item count

Contains the count of returned tuples.

=item tuples

Returned tuples (B<ARRAYREF> of B<ARRAYREF>).

=back

If you use B<NUM> or B<NUM64> field types, values
for these fields need to be packed before they are sent to the
server, and unpacked when received in a response.
This is a low-level driver :)

=cut


package DR::Tarantool::LLClient;
use base qw(DR::Tarantool::AEConnection);
use AnyEvent;
use AnyEvent::Socket;
use Carp;
use Devel::GlobalDestruction;
use File::Spec::Functions 'catfile';
$Carp::Internal{ (__PACKAGE__) }++;

use Scalar::Util 'weaken';
require DR::Tarantool;
use Data::Dumper;
use Time::HiRes ();

# Byte-order modifier appended to pack/unpack templates in this file (for
# example "L$LE" when the reply header is unpacked): '<' when the running
# perl is newer than 5.010, an empty string otherwise.
# NOTE(review): presumably older perls lack the '<' modifier, in which case
# the templates fall back to native byte order - confirm.
my $LE = $] > 5.01 ? '<' : '';


=head2 connect

Creates a connection to L<Tarantool|http://tarantool.org>.

    DR::Tarantool::LLClient->connect(
        host => '127.0.0.1',
        port => '33033',
        cb   => sub {
            my ($tnt) = @_;
            ...
        }
    );

=head3 Arguments

=over

=item host & port

Host and port to connect to.

=item reconnect_period

An interval to wait before trying to reconnect after a fatal error or
unsuccessful connect. If the field is defined and is greater than 0, the
driver tries to reconnect to the server after this interval.

B<Important>: the driver does not reconnect after B<the first>
unsuccessful connection. It calls B<callback> instead.

=item reconnect_always

Try to reconnect even after the first unsuccessful connection.

=item cb

Done callback. The callback receives a connection handle
connected to the server or an error string.

=back

=cut

sub connect {
    my $class = shift;

    my (%opts, $cb);

    if (@_ % 2) {
        $cb = pop;
        %opts = @_;
    } else {
        %opts = @_;

lib/DR/Tarantool/LLClient.pm  view on Meta::CPAN


The driver is connecting to the server.

=item fatal

An attempt to connect was made, but ended up with an error. 
If the event loop is running, and B<reconnect_period> option
is set, the driver continues to try to reconnect and update its status.

=back

=cut

# Translate the connection's internal state into the public status name.
#
# Returns 'ok', 'connecting' or 'fatal' for the states listed in the table
# below, and 'not_connected' for every other state.
sub connection_status {
    my ($self) = @_;

    # Evaluated top to bottom; the state is re-read for every comparison.
    my @status_for = (
        [ connected  => 'ok'         ],
        [ connecting => 'connecting' ],
        [ error      => 'fatal'      ],
    );

    for my $pair (@status_for) {
        my ($state, $status) = @$pair;
        return $status if $self->state eq $state;
    }

    return 'not_connected';
}


=head2 ping

Ping the server.

    $tnt->ping( sub { .. } );

=head3 Arguments

=over

=item a callback

=back

=cut

# Send a ping request; $cb receives the reply.
#
# When the connection is up the packet is sent immediately.  Otherwise:
#   * without a reconnect period the callback gets a 'fatal' response at once;
#   * with a reconnect period one more attempt is made after that interval,
#     and the callback gets a 'fatal' response if the connection is still
#     down (or the connection object is gone) by then.
sub ping :method {
    my ($self, $cb) = @_;

    my $id = $self->_req_id;
    $self->_check_cb( $cb );
    my $pkt = DR::Tarantool::_pkt_ping( $id );

    # Tells the caller that the request could not be sent.
    my $report_failure = sub {
        $cb->({
                status  => 'fatal',
                req_id  => $id,
                errstr  => "Connection isn't established (yet)"
            }
        );
    };

    if ($self->is_connected) {
        $self->_request( $id, $pkt, $cb );
        return;
    }

    if (!$self->reconnect_period) {
        $report_failure->();
        return;
    }

    # The timer callback must not keep the connection object alive.
    my $conn = $self;
    weaken $conn;

    # The callback clears the timer variable it closes over as soon as it
    # fires.  This assignment is deliberately the last statement of the sub.
    my $retry_timer;
    $retry_timer = AE::timer $self->reconnect_period, 0, sub {
        undef $retry_timer;
        return $report_failure->() unless $conn and $conn->is_connected;
        $conn->_request( $id, $pkt, $cb );
        return;
    };
}


=head2 insert

Insert a tuple.

    $tnt->insert(0, [ 1, 2, 3 ], sub { ... });
    $tnt->insert(0, [ 4, 5, 6 ], $flags, sub { .. });

=head3 Arguments

=over

=item space

=item tuple

=item flags (optional)

=item callback

=back

=cut

# Queue an INSERT request.
#
# Positional arguments: space number, tuple (ARRAYREF), optional flags
# (number, defaults to 0) and the callback, which is always the last one.
# Croaks when the tuple is not an ARRAYREF; the _check_* helpers validate
# the remaining arguments.
sub insert :method {
    my $self = shift;

    my $space = shift;
    $self->_check_number( $space );

    my $tuple = shift;
    $self->_check_tuple( $tuple );

    # The callback is taken from the tail first, so whatever is left at the
    # tail afterwards (if anything) is the flags value.
    my $cb = pop;
    $self->_check_cb( $cb );

    my $flags = pop || 0;
    $self->_check_number( $flags );

    ref $tuple eq 'ARRAY'
        or croak "insert: tuple must be ARRAYREF";
    $flags = 0 unless $flags;

    my $id = $self->_req_id;
    my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
    $self->_request( $id, $pkt, $cb );

    return;
}

=head2 select

Select a tuple or tuples.

lib/DR/Tarantool/LLClient.pm  view on Meta::CPAN

=cut


# Dump one request/response exchange to disk for debugging.
#
# Logging is controlled by two environment variables:
#   TNT_LOG_DIR     - when set, every transaction is logged there;
#   TNT_LOG_ERRDIR  - consulted only when TNT_LOG_DIR is not set: only
#                     transactions whose status is not 'ok' are logged there.
#
# Each logged transaction gets its own "<time>-<status>" subdirectory with:
#   rawrequest-NNNN.bin    the request packet ($pkt);
#   dumpresponse-NNNN.txt  Data::Dumper output of $response;
#   rawresponse-NNNN.bin   the raw response ($res_pkt), only when defined.
#
# Arguments: $id (request id), $pkt (raw request), $response (HASHREF with
# a 'status' key), $res_pkt (raw response, optional).
#
# Logging is best effort: any failure is reported with warn and never
# propagates to the caller.  Returns nothing.
sub _log_transaction {
    my ($self, $id, $pkt, $response, $res_pkt) = @_;

    my $logdir = $ENV{TNT_LOG_DIR};
    unless ($logdir) {
        $logdir = $ENV{TNT_LOG_ERRDIR};
        return unless $logdir and $response->{status} ne 'ok';
    }

    # Write $data to a new file, dying on any I/O failure.  Buffered write
    # errors may surface only at close time, so close is checked as well.
    my $write_file = sub {
        my ($name, $data) = @_;
        open my $fh, '>:raw', $name or die "Can't open $name: $!\n";
        print {$fh} $data           or die "Can't write $name: $!\n";
        close $fh                   or die "Can't close $name: $!\n";
        return;
    };

    my $logged = eval {
        die "Directory $logdir was not found, transaction wasn't logged\n"
            unless -d $logdir;

        my $now = Time::HiRes::time;

        my $logdirname = catfile $logdir,
            sprintf '%s-%s', $now, $response->{status};

        die "Object $logdirname is already exists, transaction wasn't logged\n"
            if -e $logdirname;

        mkdir $logdirname
            or die "Can't create directory $logdirname: $!\n";

        $write_file->(
            catfile( $logdirname, sprintf 'rawrequest-%04d.bin', $id ),
            $pkt
        );

        my $dump = do {
            local $Data::Dumper::Indent = 1;
            local $Data::Dumper::Terse = 1;
            local $Data::Dumper::Useqq = 1;
            local $Data::Dumper::Deepcopy = 1;
            local $Data::Dumper::Maxdepth = 0;
            Dumper($response);
        };
        $write_file->(
            catfile( $logdirname, sprintf 'dumpresponse-%04d.txt', $id ),
            $dump
        );

        if (defined $res_pkt) {
            $write_file->(
                catfile( $logdirname, sprintf 'rawresponse-%04d.bin', $id ),
                $res_pkt
            );
        }

        1;
    };

    # Rely on the eval's return value rather than on $@ alone.
    warn( $@ || "Transaction wasn't logged: unknown error\n" ) unless $logged;

    return;
}


# Register the callback for request $id and queue the packet for writing.
#
# Arguments: $id (request id), $pkt (raw request packet), $cb (CODEREF
# invoked with the response).
#
# When TNT_LOG_DIR or TNT_LOG_ERRDIR is set, the stored callback is a
# wrapper that first hands the exchange to _log_transaction and then calls
# $cb with the same arguments.
#
# Returns whatever push_write returns.
sub _request {
    my ($self, $id, $pkt, $cb ) = @_;

    my $cbres = $cb;
    if ($ENV{TNT_LOG_ERRDIR} or $ENV{TNT_LOG_DIR}) {
        # The wrapper is stored inside $self->{wait}; closing over a strong
        # $self would form a reference cycle that keeps the connection alive
        # for as long as the request is pending.  Use a weak copy instead.
        my $this = $self;
        weaken $this;

        $cbres = sub {
            $this->_log_transaction($id, $pkt, @_) if $this;
            $cb->(@_);
        };
    }

    $self->{ wait }{ $id } = $cbres;

    return $self->push_write($pkt);
}

# Allocate the next free request id.
#
# Scanning starts at the stored counter ($self->{req_id}, 0 when unset),
# wraps to 0 once the candidate is no longer below 0x7FFF_FFFF, and skips
# ids that still have a callback registered in $self->{wait}.  The counter
# is left pointing just past the returned id.
sub _req_id {
    my ($self) = @_;

    my $candidate = $self->{req_id} || 0;

    while (1) {
        $candidate = 0 if !( $candidate < 0x7FFF_FFFF );
        last unless exists $self->{wait}{$candidate};
        $candidate++;
    }

    $self->{req_id} = $candidate + 1;
    return $candidate;
}

# Put the connection into the failed state and fail every pending request.
#
# Arguments: $msg (error text), $raw (optional raw data handed to the
# callbacks as their second argument).
#
# Effects:
#   * last_code / last_error_string are filled in unless already true;
#   * the file handle is dropped and the write buffer is emptied;
#   * every callback waiting in $self->{wait} is called with a 'fatal'
#     response carrying $msg and its request id;
#   * set_error($msg) is called unless the state is already 'error'.
sub _fatal_error {
    my ($self, $msg, $raw) = @_;

    $self->{last_code}         = -1   unless $self->{last_code};
    $self->{last_error_string} = $msg unless $self->{last_error_string};

    delete $self->{fh};
    $self->{wbuf} = '';

    # Swap in a fresh wait table before running the callbacks, so that
    # $self->{wait} is already empty while they run.
    my $pending = delete $self->{wait};
    $self->{wait} = {};

    for my $req_id (keys %$pending) {
        my $cb = delete $pending->{$req_id};
        $cb->(
            { status  => 'fatal',  errstr  => $msg, req_id => $req_id },
            $raw
        );
    }

    unless ($self->state eq 'error') {
        $self->set_error($msg);
    }
}


sub _check_rbuf {{
    my ($self) = @_;
    return unless length $self->{rbuf} >= 12;
    my (undef, $blen) = unpack "L$LE L$LE", $self->{rbuf};
    return unless length $self->{rbuf} >= 12 + $blen;
    

    my $pkt = substr $self->{rbuf}, 0, 12 + $blen, '';

    my $res = DR::Tarantool::_pkt_parse_response( $pkt );

    $self->{last_code} = $res->{code};
    if (exists $res->{errstr}) {
        $self->{last_error_string} = $res->{errstr};
    } else {
        delete $self->{last_error_string};
    }

    if ($res->{status} =~ /^(fatal|buffer)$/) {
        $self->_fatal_error( $res->{errstr}, $pkt );



( run in 1.901 second using v1.01-cache-2.11-cpan-39bf76dae61 )