AnyEvent-Redis-RipeRedis
view release on metacpan or search on metacpan
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
use 5.008000;
use strict;
use warnings;
package AnyEvent::Redis::RipeRedis;
use base qw( Exporter );
our $VERSION = '1.62';
use AnyEvent;
use AnyEvent::Handle;
use Encode qw( find_encoding is_utf8 );
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );
my %ERROR_CODES;
BEGIN {
%ERROR_CODES = (
E_CANT_CONN => 1,
E_LOADING_DATASET => 2,
E_IO => 3,
E_CONN_CLOSED_BY_REMOTE_HOST => 4,
E_CONN_CLOSED_BY_CLIENT => 5,
E_NO_CONN => 6,
E_OPRN_ERROR => 9,
E_UNEXPECTED_DATA => 10,
E_NO_SCRIPT => 11,
E_READ_TIMEDOUT => 12,
E_BUSY => 13,
E_MASTER_DOWN => 14,
E_MISCONF => 15,
E_READONLY => 16,
E_OOM => 17,
E_EXEC_ABORT => 18,
E_NO_AUTH => 19,
E_WRONG_TYPE => 20,
E_NO_REPLICAS => 21,
E_BUSY_KEY => 22,
E_CROSS_SLOT => 23,
E_TRY_AGAIN => 24,
E_ASK => 25,
E_MOVED => 26,
E_CLUSTER_DOWN => 27,
);
}
BEGIN {
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK, );
}
use constant {
# Default values
D_HOST => 'localhost',
D_PORT => 6379,
D_DB_INDEX => 0,
%ERROR_CODES,
# Operation status
S_NEED_PERFORM => 1,
S_IN_PROGRESS => 2,
S_IS_DONE => 3,
# String terminator
EOL => "\r\n",
EOL_LEN => 2,
};
my %SUB_CMDS = (
subscribe => 1,
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
$self->_execute_cmd( $cmd );
return;
},
}
foreach my $name ( qw( connection_timeout read_timeout min_reconnect_interval ) ) {
*{$name} = sub {
my $self = shift;
if ( @_ ) {
my $seconds = shift;
if ( defined $seconds
&& ( !looks_like_number($seconds) || $seconds < 0 ) )
{
croak "\"$name\" must be a positive number";
}
$self->{$name} = $seconds;
}
return $self->{$name};
}
}
foreach my $name ( qw( reconnect on_connect on_disconnect on_connect_error ) ) {
*{$name} = sub {
my $self = shift;
if ( @_ ) {
$self->{$name} = shift;
}
return $self->{$name};
}
}
}
sub _connect {
my $self = shift;
$self->{_handle} = AnyEvent::Handle->new(
%{ $self->{handle_params} },
connect => [ $self->{host}, $self->{port} ],
on_prepare => $self->_get_on_prepare(),
on_connect => $self->_get_on_connect(),
on_connect_error => $self->_get_on_connect_error(),
on_rtimeout => $self->_get_on_rtimeout(),
on_eof => $self->_get_on_eof(),
on_error => $self->_get_handle_on_error(),
on_read => $self->_get_on_read(),
);
return;
}
sub _get_on_prepare {
my $self = shift;
weaken( $self );
return sub {
if ( defined $self->{connection_timeout} ) {
return $self->{connection_timeout};
}
return;
};
}
sub _get_on_connect {
my $self = shift;
weaken( $self );
return sub {
$self->{_connected} = 1;
unless ( defined $self->{password} ) {
$self->{_auth_st} = S_IS_DONE;
}
if ( $self->{database} == 0 ) {
$self->{_select_db_st} = S_IS_DONE;
}
if ( $self->{_auth_st} == S_NEED_PERFORM ) {
$self->_auth();
}
elsif ( $self->{_select_db_st} == S_NEED_PERFORM ) {
$self->_select_db();
}
else {
$self->{_ready_to_write} = 1;
$self->_flush_input_queue();
}
if ( defined $self->{on_connect} ) {
$self->{on_connect}->();
}
};
}
sub _get_on_connect_error {
my $self = shift;
weaken( $self );
return sub {
my $err_msg = pop;
$self->_disconnect(
"Can't connect to $self->{host}:$self->{port}: $err_msg",
E_CANT_CONN
);
};
}
sub _get_on_rtimeout {
my $self = shift;
weaken( $self );
return sub {
if ( @{ $self->{_processing_queue} } ) {
$self->_disconnect( 'Read timed out.', E_READ_TIMEDOUT );
}
else {
$self->{_handle}->rtimeout( undef );
}
};
}
sub _get_on_eof {
my $self = shift;
weaken( $self );
return sub {
$self->_disconnect( 'Connection closed by remote host.',
E_CONN_CLOSED_BY_REMOTE_HOST );
};
}
sub _get_handle_on_error {
my $self = shift;
weaken( $self );
return sub {
my $err_msg = pop;
$self->_disconnect( $err_msg, E_IO );
};
}
sub _get_on_read {
my $self = shift;
weaken( $self );
my $str_len;
my @bufs;
my $bufs_num = 0;
return sub {
my $handle = shift;
MAIN: while ( 1 ) {
if ( $handle->destroyed() ) {
return;
}
my $reply;
my $err_code;
if ( defined $str_len ) {
if ( length( $handle->{rbuf} ) < $str_len + EOL_LEN ) {
return;
}
$reply = substr( $handle->{rbuf}, 0, $str_len, '' );
substr( $handle->{rbuf}, 0, EOL_LEN, '' );
if ( defined $self->{encoding} ) {
$reply = $self->{encoding}->decode( $reply );
}
undef $str_len;
}
else {
my $eol_pos = index( $handle->{rbuf}, EOL );
if ( $eol_pos < 0 ) {
return;
}
$reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
my $type = substr( $reply, 0, 1, '' );
substr( $handle->{rbuf}, 0, EOL_LEN, '' );
if ( $type ne '+' && $type ne ':' ) {
if ( $type eq '$' ) {
if ( $reply >= 0 ) {
$str_len = $reply;
next;
}
undef $reply;
}
elsif ( $type eq '*' ) {
if ( $reply > 0 ) {
push( @bufs,
{ data => [],
err_code => undef,
chunks_cnt => $reply,
}
);
$bufs_num++;
next;
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
}
);
}
}
else {
$self->_connect();
}
}
else {
AE::postpone(
sub {
$self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
. ' No connection to the server.', E_NO_CONN );
}
);
return;
}
push( @{ $self->{_input_queue} }, $cmd );
return;
}
$self->_push_write( $cmd );
return;
}
sub _push_write {
my $self = shift;
my $cmd = shift;
my $cmd_str = '';
foreach my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
unless ( defined $token ) {
$token = '';
}
elsif ( defined $self->{encoding} && is_utf8( $token ) ) {
$token = $self->{encoding}->encode( $token );
}
$cmd_str .= '$' . length( $token ) . EOL . $token . EOL;
}
$cmd_str = '*' . ( scalar( @{ $cmd->{args} } ) + 1 ) . EOL . $cmd_str;
my $handle = $self->{_handle};
if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
$handle->rtimeout_reset();
$handle->rtimeout( $self->{read_timeout} );
}
push( @{ $self->{_processing_queue} }, $cmd );
$handle->push_write( $cmd_str );
return;
}
sub _auth {
my $self = shift;
weaken( $self );
$self->{_auth_st} = S_IN_PROGRESS;
$self->_push_write(
{ kwd => 'auth',
args => [ $self->{password} ],
on_done => sub {
$self->{_auth_st} = S_IS_DONE;
if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
$self->_select_db();
}
else {
$self->{_ready_to_write} = 1;
$self->_flush_input_queue();
}
},
on_error => sub {
$self->{_auth_st} = S_NEED_PERFORM;
$self->_abort_all( @_ );
},
}
);
return;
}
sub _select_db {
my $self = shift;
weaken( $self );
$self->{_select_db_st} = S_IN_PROGRESS;
$self->_push_write(
{ kwd => 'select',
args => [ $self->{database} ],
on_done => sub {
$self->{_select_db_st} = S_IS_DONE;
$self->{_ready_to_write} = 1;
$self->_flush_input_queue();
},
on_error => sub {
$self->{_select_db_st} = S_NEED_PERFORM;
$self->_abort_all( @_ );
},
}
);
return;
}
sub _flush_input_queue {
my $self = shift;
$self->{_temp_queue} = $self->{_input_queue};
$self->{_input_queue} = [];
while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
$self->_push_write( $cmd );
}
return;
}
sub _process_reply {
my $self = shift;
my $reply = shift;
my $err_code = shift;
if ( defined $err_code ) {
my $cmd = shift @{ $self->{_processing_queue} };
unless ( defined $cmd ) {
$self->_disconnect(
"Don't know how process error message. Processing queue is empty.",
E_UNEXPECTED_DATA,
);
return;
}
$self->_process_cmd_error( $cmd, ref($reply)
? ( "Operation \"$cmd->{kwd}\" completed with errors.",
$err_code, $reply )
: $reply, $err_code );
}
elsif ( $self->{_channel_cnt} > 0
&& ref( $reply ) && exists $MSG_TYPES{ $reply->[0] } )
( run in 1.675 second using v1.01-cache-2.11-cpan-5735350b133 )