AnyEvent-Redis-RipeRedis
view release on metacpan or search on metacpan
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
use 5.008000;
use strict;
use warnings;
package AnyEvent::Redis::RipeRedis;
use base qw( Exporter );
our $VERSION = '1.62';
use AnyEvent;
use AnyEvent::Handle;
use Encode qw( find_encoding is_utf8 );
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );
# Symbolic error names mapped to the numeric codes that are passed to error
# callbacks. The table is filled inside BEGIN so that it already exists at
# compile time, when the second BEGIN block and the "use constant" statement
# below read it.
my %ERROR_CODES;
BEGIN {
%ERROR_CODES = (
E_CANT_CONN => 1,
E_LOADING_DATASET => 2,
E_IO => 3,
E_CONN_CLOSED_BY_REMOTE_HOST => 4,
E_CONN_CLOSED_BY_CLIENT => 5,
E_NO_CONN => 6,
# Values 7 and 8 are not assigned in this table.
E_OPRN_ERROR => 9,
E_UNEXPECTED_DATA => 10,
E_NO_SCRIPT => 11,
E_READ_TIMEDOUT => 12,
E_BUSY => 13,
E_MASTER_DOWN => 14,
E_MISCONF => 15,
E_READONLY => 16,
E_OOM => 17,
E_EXEC_ABORT => 18,
E_NO_AUTH => 19,
E_WRONG_TYPE => 20,
E_NO_REPLICAS => 21,
E_BUSY_KEY => 22,
E_CROSS_SLOT => 23,
E_TRY_AGAIN => 24,
E_ASK => 25,
E_MOVED => 26,
E_CLUSTER_DOWN => 27,
);
}
# Exporter configuration: every error-code name can be imported on request,
# either individually or all at once through the ":err_codes" tag.
BEGIN {
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK, );
}
# Compile-time constants. The error codes from %ERROR_CODES are flattened into
# the same list, so each E_* name also becomes a constant sub in this package.
use constant {
# Default values
D_HOST => 'localhost',
D_PORT => 6379,
D_DB_INDEX => 0,
%ERROR_CODES,
# Operation status (stored in state fields such as _auth_st and _select_db_st)
S_NEED_PERFORM => 1,
S_IN_PROGRESS => 2,
S_IS_DONE => 3,
# String terminator used when framing protocol data, and its length
EOL => "\r\n",
EOL_LEN => 2,
};
my %SUB_CMDS = (
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
$self->_select_db();
}
}
elsif ( $self->{_auth_st} == S_NEED_PERFORM ) {
$self->_auth();
}
}
}
elsif ( $self->{_lazy_conn_st} ) {
$self->{_lazy_conn_st} = 0;
$self->_connect();
}
elsif ( $self->{reconnect} ) {
if ( defined $self->{min_reconnect_interval}
&& $self->{min_reconnect_interval} > 0 )
{
unless ( defined $self->{_reconnect_timer} ) {
$self->{_reconnect_timer} = AE::timer( $self->{min_reconnect_interval}, 0,
sub {
undef $self->{_reconnect_timer};
$self->_connect();
}
);
}
}
else {
$self->_connect();
}
}
else {
AE::postpone(
sub {
$self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
. ' No connection to the server.', E_NO_CONN );
}
);
return;
}
push( @{ $self->{_input_queue} }, $cmd );
return;
}
$self->_push_write( $cmd );
return;
}
# Serializes a command into the Redis multi-bulk request format
# ("*<count>" followed by one "$<length>" / payload pair per token) and
# queues it for writing on the connection handle.
#
# Arguments:
#   $cmd - hash reference holding "kwd" (the command keyword) and "args"
#          (array reference with the command arguments).
#
# Side effects:
#   - undefined tokens are replaced by empty strings and, when an encoding is
#     configured, character strings are replaced by their encoded form; both
#     changes are written back into $cmd because the loop variable aliases
#     the stored values;
#   - the read timeout is (re)armed when the processing queue was empty;
#   - $cmd is appended to the processing queue before the data is written.
sub _push_write {
  my ( $self, $cmd ) = @_;

  my $encoder = $self->{encoding};
  my @bulk_parts;

  foreach my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
    if ( !defined $token ) {
      $token = '';
    }
    elsif ( defined $encoder && is_utf8( $token ) ) {
      $token = $encoder->encode( $token );
    }

    push( @bulk_parts, '$', length( $token ), EOL, $token, EOL );
  }

  # The header counts the keyword plus all of its arguments.
  my $cmd_str
      = join( '', '*', scalar( @{ $cmd->{args} } ) + 1, EOL, @bulk_parts );

  my $handle = $self->{_handle};

  # Start the read timeout only when no other reply is currently awaited.
  if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
    $handle->rtimeout_reset();
    $handle->rtimeout( $self->{read_timeout} );
  }

  push( @{ $self->{_processing_queue} }, $cmd );
  $handle->push_write( $cmd_str );

  return;
}
# Sends the AUTH command with the configured password and tracks the
# authentication state in $self->{_auth_st}.
#
# On success the state becomes S_IS_DONE; then either the database selection
# is started (when it is still pending) or the client is marked ready to
# write and the input queue is flushed. On failure the state is reset to
# S_NEED_PERFORM and the error arguments are forwarded to _abort_all().
sub _auth {
  my ( $self ) = @_;

  # The callbacks below close over $self; weakening it keeps those closures
  # from holding a strong reference to the client.
  weaken( $self );

  $self->{_auth_st} = S_IN_PROGRESS;

  my $on_done = sub {
    $self->{_auth_st} = S_IS_DONE;

    return $self->_select_db()
        if $self->{_select_db_st} == S_NEED_PERFORM;

    $self->{_ready_to_write} = 1;
    $self->_flush_input_queue();
  };

  my $on_error = sub {
    $self->{_auth_st} = S_NEED_PERFORM;
    $self->_abort_all( @_ );
  };

  $self->_push_write(
    { kwd      => 'auth',
      args     => [ $self->{password} ],
      on_done  => $on_done,
      on_error => $on_error,
    }
  );

  return;
}
sub _select_db {
my $self = shift;
weaken( $self );
$self->{_select_db_st} = S_IN_PROGRESS;
$self->_push_write(
{ kwd => 'select',
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
$redis->get( 'bar',
{ on_done => sub {
my $reply = shift;
print "$reply\n";
},
on_error => sub {
my $err_msg = shift;
my $err_code = shift;
warn "[$err_code] $err_msg\n";
},
}
);
$redis->quit(
sub {
my $reply = shift;
if (@_) {
my $err_msg = shift;
my $err_code = shift;
warn "[$err_code] $err_msg\n";
}
$cv->send();
}
);
$cv->recv();
=head1 DESCRIPTION
MODULE IS DEPRECATED. Use L<AnyEvent::RipeRedis> instead. The interface of
L<AnyEvent::RipeRedis> differs in several ways from the interface of
AnyEvent::Redis::RipeRedis. For more information, see its documentation.
AnyEvent::Redis::RipeRedis is a flexible non-blocking Redis client with a
reconnect feature. The client supports subscriptions, transactions and
connection via UNIX socket.
Requires Redis 1.2 or higher, and any supported event loop.
=head1 CONSTRUCTOR
=head2 new( %params )
my $redis = AnyEvent::Redis::RipeRedis->new(
host => 'localhost',
port => 6379,
password => 'yourpass',
database => 7,
lazy => 1,
connection_timeout => 5,
read_timeout => 5,
reconnect => 1,
min_reconnect_interval => 5,
encoding => 'utf8',
on_connect => sub {
# handling...
},
on_disconnect => sub {
# handling...
},
on_connect_error => sub {
my $err_msg = shift;
# error handling...
},
on_error => sub {
my $err_msg = shift;
my $err_code = shift;
# error handling...
},
);
=over
=item host => $host
Server hostname (default: localhost)
=item port => $port
Server port (default: 6379)
=item password => $password
If the password is specified, the C<AUTH> command is sent to the server
after connection.
=item database => $index
Database index. If the index is specified, the client is switched to
the specified database after connection. You can also switch to another database
after connection by using the C<SELECT> command. The client remembers the last
selected database after reconnection.
The default database index is C<0>.
=item encoding => $encoding_name
Used to encode/decode strings during input/output operations.
Not set by default.
=item connection_timeout => $fractional_seconds
Specifies the connection timeout. If the client cannot connect to the server
within the specified timeout, the C<on_error> or C<on_connect_error> callback is
called. When the C<on_error> callback is called, the C<E_CANT_CONN> error code
is passed to the callback as the second argument. The timeout is specified in
seconds and can contain a fractional part.
( run in 0.738 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )