DR-Tarantool
view release on metacpan or search on metacpan
lib/DR/Tarantool/LLClient.pm view on Meta::CPAN
An internal driver error.
=item error
The request wasn't executed: the server returned an error.
=item ok
Request was executed OK.
=back
=item errstr
If an error occurred, contains error description.
=item code
Contains reply code.
=item req_id
Contains request id.
(see
L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)
=item type
Contains request type
(see
L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)
=item count
Contains the count of returned tuples.
=item tuples
Returned tuples (B<ARRAYREF> of B<ARRAYREF>).
=back
If you use B<NUM> or B<NUM64> field types, values
for these fields need to be packed before they are sent to the
server, and unpacked when received in a response.
This is a low-level driver :)
=cut
package DR::Tarantool::LLClient;
use base qw(DR::Tarantool::AEConnection);
use AnyEvent;
use AnyEvent::Socket;
use Carp;
use Devel::GlobalDestruction;
use File::Spec::Functions 'catfile';
$Carp::Internal{ (__PACKAGE__) }++;
use Scalar::Util 'weaken';
require DR::Tarantool;
use Data::Dumper;
use Time::HiRes ();
my $LE = $] > 5.01 ? '<' : '';
=head2 connect
Creates a connection to L<Tarantool| http://tarantool.org>
DR::Tarantool::LLClient->connect(
host => '127.0.0.1',
port => '33033',
cb => {
my ($tnt) = @_;
...
}
);
=head3 Arguments
=over
=item host & port
Host and port to connect to.
=item reconnect_period
An interval to wait before trying to reconnect after a fatal error or
unsuccessful connect. If the field is defined and is greater than 0, the
driver tries to reconnect to the server after this interval.
B<Important>: the driver does not reconnect after B<the first>
unsuccessful connection. It calls B<callback> instead.
=item reconnect_always
Try to reconnect even after the first unsuccessful connection.
=item cb
Done callback. The callback receives a connection handle
connected to the server or an error string.
=back
=cut
sub connect {
my $class = shift;
my (%opts, $cb);
if (@_ % 2) {
$cb = pop;
%opts = @_;
} else {
%opts = @_;
lib/DR/Tarantool/LLClient.pm view on Meta::CPAN
The driver is connecting to the server.
=item fatal
An attempt to connect was made, but ended up with an error.
If the event loop is running, and B<reconnect_period> option
is set, the driver continues to try to reconnect and update its status.
=back
=cut
sub connection_status {
my ($self) = @_;
return 'ok' if $self->state eq 'connected';
return 'connecting' if $self->state eq 'connecting';
return 'fatal' if $self->state eq 'error';
return 'not_connected';
}
=head2 ping
Ping the server.
$tnt->ping( sub { .. } );
=head3 Arguments
=over
=item a callback
=back
=cut
sub ping :method {
my ($self, $cb) = @_;
my $id = $self->_req_id;
$self->_check_cb( $cb );
my $pkt = DR::Tarantool::_pkt_ping( $id );
if ($self->is_connected) {
$self->_request( $id, $pkt, $cb );
return;
}
unless($self->reconnect_period) {
$cb->({
status => 'fatal',
req_id => $id,
errstr => "Connection isn't established (yet)"
}
);
return;
}
my $this = $self;
weaken $this;
my $tmr;
$tmr = AE::timer $self->reconnect_period, 0, sub {
undef $tmr;
if ($this and $this->is_connected) {
$this->_request( $id, $pkt, $cb );
return;
}
$cb->({
status => 'fatal',
req_id => $id,
errstr => "Connection isn't established (yet)"
}
);
};
}
=head2 insert
Insert a tuple.
$tnt->insert(0, [ 1, 2, 3 ], sub { ... });
$tnt->insert(0, [ 4, 5, 6 ], $flags, sub { .. });
=head3 Arguments
=over
=item space
=item tuple
=item flags (optional)
=item callback
=back
=cut
sub insert :method {
my $self = shift;
$self->_check_number( my $space = shift );
$self->_check_tuple( my $tuple = shift );
$self->_check_cb( my $cb = pop );
$self->_check_number( my $flags = pop || 0 );
croak "insert: tuple must be ARRAYREF" unless ref $tuple eq 'ARRAY';
$flags ||= 0;
my $id = $self->_req_id;
my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
$self->_request( $id, $pkt, $cb );
return;
}
=head2 select
Select a tuple or tuples.
lib/DR/Tarantool/LLClient.pm view on Meta::CPAN
=cut
sub _log_transaction {
my ($self, $id, $pkt, $response, $res_pkt) = @_;
my $logdir = $ENV{TNT_LOG_DIR};
goto DOLOG if $logdir;
$logdir = $ENV{TNT_LOG_ERRDIR};
goto DOLOG if $logdir and $response->{status} ne 'ok';
return;
DOLOG:
eval {
die "Directory $logdir was not found, transaction wasn't logged\n"
unless -d $logdir;
my $now = Time::HiRes::time;
my $logdirname = catfile $logdir,
sprintf '%s-%s', $now, $response->{status};
die "Object $logdirname is already exists, transaction wasn't logged\n"
if -e $logdirname or -d $logdirname;
die $! unless mkdir $logdirname;
my $rrname = catfile $logdirname,
sprintf 'rawrequest-%04d.bin', $id;
open my $fh, '>:raw', $rrname or die "Can't open $rrname: $!\n";
print $fh $pkt;
close $fh;
my $respname = catfile $logdirname,
sprintf 'dumpresponse-%04d.txt', $id;
open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
local $Data::Dumper::Indent = 1;
local $Data::Dumper::Terse = 1;
local $Data::Dumper::Useqq = 1;
local $Data::Dumper::Deepcopy = 1;
local $Data::Dumper::Maxdepth = 0;
print $fh Dumper($response);
close $fh;
if (defined $res_pkt) {
$respname = catfile $logdirname,
sprintf 'rawresponse-%04d.bin', $id;
open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
print $fh $res_pkt;
close $fh;
}
};
warn $@ if $@;
}
sub _request {
my ($self, $id, $pkt, $cb ) = @_;
# Scalar::Util::weaken $self;
my $cbres = $cb;
$cbres = sub { $self->_log_transaction($id, $pkt, @_); &$cb }
if $ENV{TNT_LOG_ERRDIR} or $ENV{TNT_LOG_DIR};
$self->{ wait }{ $id } = $cbres;
$self->push_write($pkt);
}
sub _req_id {
my ($self) = @_;
for (my $id = $self->{req_id} || 0;; $id++) {
$id = 0 unless $id < 0x7FFF_FFFF;
next if exists $self->{wait}{$id};
$self->{req_id} = $id + 1;
return $id;
}
}
sub _fatal_error {
my ($self, $msg, $raw) = @_;
$self->{last_code} ||= -1;
$self->{last_error_string} ||= $msg;
delete $self->{fh};
$self->{wbuf} = '';
my $wait = delete $self->{wait};
$self->{wait} = {};
for (keys %$wait) {
my $cb = delete $wait->{$_};
$cb->({ status => 'fatal', errstr => $msg, req_id => $_ }, $raw);
}
$self->set_error($msg) if $self->state ne 'error';
}
sub _check_rbuf {{
my ($self) = @_;
return unless length $self->{rbuf} >= 12;
my (undef, $blen) = unpack "L$LE L$LE", $self->{rbuf};
return unless length $self->{rbuf} >= 12 + $blen;
my $pkt = substr $self->{rbuf}, 0, 12 + $blen, '';
my $res = DR::Tarantool::_pkt_parse_response( $pkt );
$self->{last_code} = $res->{code};
if (exists $res->{errstr}) {
$self->{last_error_string} = $res->{errstr};
} else {
delete $self->{last_error_string};
}
if ($res->{status} =~ /^(fatal|buffer)$/) {
$self->_fatal_error( $res->{errstr}, $pkt );
( run in 1.901 second using v1.01-cache-2.11-cpan-39bf76dae61 )