AnyEvent-Redis-RipeRedis
view release on metacpan or search on metacpan
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
# String terminator
EOL => "\r\n",
EOL_LEN => 2,
};
# Commands that open a subscription.
my %SUB_CMDS = map { ( $_ => 1 ) } qw( subscribe psubscribe );

# Commands that open or close a subscription.
my %SUBUNSUB_CMDS = (
  %SUB_CMDS,
  map { ( $_ => 1 ) } qw( unsubscribe punsubscribe ),
);

# Commands flagged as needing post-processing (all subscription commands
# plus INFO, SELECT and QUIT).
my %NEED_POSTPROCESS = (
  %SUBUNSUB_CMDS,
  map { ( $_ => 1 ) } qw( info select quit ),
);

# Recognized message type names.
my %MSG_TYPES = map { ( $_ => 1 ) } qw( message pmessage );

# Upper-case error prefix => corresponding E_* error code constant.
my %ERR_PREFS_MAP = (
  LOADING     => E_LOADING_DATASET,
  NOSCRIPT    => E_NO_SCRIPT,
  BUSY        => E_BUSY,
  MASTERDOWN  => E_MASTER_DOWN,
  MISCONF     => E_MISCONF,
  READONLY    => E_READONLY,
  OOM         => E_OOM,
  EXECABORT   => E_EXEC_ABORT,
  NOAUTH      => E_NO_AUTH,
  WRONGTYPE   => E_WRONG_TYPE,
  NOREPLICAS  => E_NO_REPLICAS,
  BUSYKEY     => E_BUSY_KEY,
  CROSSSLOT   => E_CROSS_SLOT,
  TRYAGAIN    => E_TRY_AGAIN,
  ASK         => E_ASK,
  MOVED       => E_MOVED,
  CLUSTERDOWN => E_CLUSTER_DOWN,
);

# File-scoped cache; populated elsewhere in the module.
my %EVAL_CACHE;
# Constructor
#
# Creates (or, when invoked on an existing object, re-initializes) a client.
#
# Recognized %params keys, as read below:
#   host, port                - server address; fall back to D_HOST / D_PORT
#   password                  - stored as is; may be undef
#   database                  - database index; falls back to D_DB_INDEX
#   reconnect                 - stored as given; 1 when the key is absent
#   on_connect, on_disconnect,
#   on_connect_error          - callbacks, stored as is
#   encoding, connection_timeout, read_timeout,
#   min_reconnect_interval, on_error
#                             - handed to the accessor of the same name
#   handle_params             - hash reference of extra parameters for the
#                               handle constructor
#   linger, autocork          - copied into the handle parameters unless
#                               handle_params already contains that key
#   lazy                      - when true, no connection is opened here
#
# Returns the client object.
sub new {
  my $proto  = shift;
  my %params = @_;

  my $self = ref( $proto ) ? $proto : bless {}, $proto;

  $self->{host}     = $params{host} || D_HOST;
  $self->{port}     = $params{port} || D_PORT;
  $self->{password} = $params{password};
  $self->{database}
      = defined $params{database} ? $params{database} : D_DB_INDEX;
  $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
  $self->{on_connect}       = $params{on_connect};
  $self->{on_disconnect}    = $params{on_disconnect};
  $self->{on_connect_error} = $params{on_connect_error};

  $self->encoding( $params{encoding} );
  $self->connection_timeout( $params{connection_timeout} );
  $self->read_timeout( $params{read_timeout} );
  $self->min_reconnect_interval( $params{min_reconnect_interval} );
  $self->on_error( $params{on_error} );

  # Work on a shallow copy: merging "linger"/"autocork" straight into the
  # hash reference supplied by the caller would silently modify the caller's
  # data (and leak into any other client built from the same hash).
  my %hdl_params = %{ $params{handle_params} || {} };
  foreach my $name ( qw( linger autocork ) ) {
    if ( !exists $hdl_params{$name} && defined $params{$name} ) {
      $hdl_params{$name} = $params{$name};
    }
  }
  $self->{handle_params} = \%hdl_params;

  # Internal connection state
  $self->{_handle}           = undef;
  $self->{_connected}        = 0;
  $self->{_lazy_conn_st}     = $params{lazy};
  $self->{_auth_st}          = S_NEED_PERFORM;
  $self->{_select_db_st}     = S_NEED_PERFORM;
  $self->{_ready_to_write}   = 0;
  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_txn_lock}         = 0;
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;
  $self->{_reconnect_timer}  = undef;

  # Connect right away unless lazy connection was requested
  unless ( $self->{_lazy_conn_st} ) {
    $self->_connect();
  }

  return $self;
}
# Prepares a "multi" command from the given arguments, sets the transaction
# lock flag and hands the command over for execution. Returns nothing.
sub multi {
  my ( $self, @args ) = @_;

  my $cmd = $self->_prepare_cmd( 'multi', \@args );

  $self->{_txn_lock} = 1;
  $self->_execute_cmd( $cmd );

  return;
}
sub exec {
my $self = shift;
my $cmd = $self->_prepare_cmd( 'exec', [ @_ ] );
$self->{_txn_lock} = 0;
$self->_execute_cmd( $cmd );
return;
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
$self->{$name} = $seconds;
}
return $self->{$name};
}
}
# Install a combined getter/setter for each plain attribute: called with an
# argument it stores that value first; it always returns the current value.
for my $attr ( qw( reconnect on_connect on_disconnect on_connect_error ) ) {
  *{$attr} = sub {
    my $self = shift;

    $self->{$attr} = shift if @_;

    return $self->{$attr};
  }
}
}
# Creates the AnyEvent::Handle for host/port and stores it in {_handle}.
# User-supplied handle parameters come first in the argument list, so the
# client's own settings listed after them take precedence on a key clash.
sub _connect {
  my ( $self ) = @_;

  my @handle_args = (
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_get_on_prepare(),
    on_connect       => $self->_get_on_connect(),
    on_connect_error => $self->_get_on_connect_error(),
    on_rtimeout      => $self->_get_on_rtimeout(),
    on_eof           => $self->_get_on_eof(),
    on_error         => $self->_get_handle_on_error(),
    on_read          => $self->_get_on_read(),
  );

  $self->{_handle} = AnyEvent::Handle->new( @handle_args );

  return;
}
# Builds the "on_prepare" callback used by _connect(). The callback returns
# the configured connection timeout, or nothing when none is set.
sub _get_on_prepare {
  my ( $self ) = @_;

  # The closure must not keep the client alive
  weaken( $self );

  return sub {
    my $timeout = $self->{connection_timeout};

    return $timeout if defined $timeout;
    return;
  };
}
# Builds the "on_connect" callback used by _connect(). Once connected, the
# callback starts authentication, then database selection, or - when neither
# is required - marks the client ready and flushes the input queue. The
# user's on_connect callback, if any, is invoked last.
sub _get_on_connect {
  my ( $self ) = @_;

  # The closure must not keep the client alive
  weaken( $self );

  return sub {
    $self->{_connected} = 1;

    # Nothing to authenticate without a password
    $self->{_auth_st} = S_IS_DONE if !defined $self->{password};

    # Database 0 needs no explicit selection
    $self->{_select_db_st} = S_IS_DONE if $self->{database} == 0;

    if ( $self->{_auth_st} == S_NEED_PERFORM ) {
      $self->_auth();
    }
    elsif ( $self->{_select_db_st} == S_NEED_PERFORM ) {
      $self->_select_db();
    }
    else {
      $self->{_ready_to_write} = 1;
      $self->_flush_input_queue();
    }

    $self->{on_connect}->() if defined $self->{on_connect};
  };
}
# Builds the "on_connect_error" callback used by _connect(). The callback
# takes the error message from its last argument and disconnects with the
# E_CANT_CONN error code.
sub _get_on_connect_error {
  my ( $self ) = @_;

  # The closure must not keep the client alive
  weaken( $self );

  return sub {
    my $err_msg = pop @_;
    my $report  = "Can't connect to $self->{host}:$self->{port}: $err_msg";

    $self->_disconnect( $report, E_CANT_CONN );
  };
}
# Builds the "on_rtimeout" callback used by _connect(). With no commands in
# the processing queue the read timeout is simply switched off; otherwise the
# client disconnects with the E_READ_TIMEDOUT error code.
sub _get_on_rtimeout {
  my ( $self ) = @_;

  # The closure must not keep the client alive
  weaken( $self );

  return sub {
    if ( !@{ $self->{_processing_queue} } ) {
      $self->{_handle}->rtimeout( undef );
    }
    else {
      $self->_disconnect( 'Read timed out.', E_READ_TIMEDOUT );
    }
  };
}
sub _get_on_eof {
my $self = shift;
weaken( $self );
return sub {
$self->_disconnect( 'Connection closed by remote host.',
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
}
}
else {
AE::postpone(
sub {
$self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
. ' No connection to the server.', E_NO_CONN );
}
);
return;
}
push( @{ $self->{_input_queue} }, $cmd );
return;
}
$self->_push_write( $cmd );
return;
}
# Serializes a command into the multi-bulk wire format, appends the command
# to the processing queue and writes the serialized string to the handle.
#
# $cmd is a hash reference with at least "kwd" (command keyword) and "args"
# (array reference of arguments).
sub _push_write {
  my ( $self, $cmd ) = @_;

  my @bulks;

  # NOTE: $token aliases $cmd->{kwd} and the elements of $cmd->{args}, so the
  # normalization below (undef => '', character string => encoded string)
  # is applied to the command structure itself.
  for my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
    if ( !defined $token ) {
      $token = '';
    }
    elsif ( defined $self->{encoding} && is_utf8( $token ) ) {
      $token = $self->{encoding}->encode( $token );
    }

    push( @bulks, '$' . length( $token ) . EOL . $token . EOL );
  }

  # Header: number of bulks = keyword + arguments
  my $header  = '*' . ( scalar( @{ $cmd->{args} } ) + 1 ) . EOL;
  my $cmd_str = join( '', $header, @bulks );

  my $handle = $self->{_handle};

  # Arm the read timeout only when no other reply is being awaited
  my $is_idle = !@{ $self->{_processing_queue} };
  if ( defined $self->{read_timeout} && $is_idle ) {
    $handle->rtimeout_reset();
    $handle->rtimeout( $self->{read_timeout} );
  }

  push( @{ $self->{_processing_queue} }, $cmd );
  $handle->push_write( $cmd_str );

  return;
}
# Sends the "auth" command with the configured password. On success it
# continues with database selection if that is still pending, otherwise it
# marks the client ready and flushes the input queue. On failure it resets
# the authentication state and aborts all commands.
sub _auth {
  my ( $self ) = @_;

  # The callbacks below must not keep the client alive
  weaken( $self );

  my $on_done = sub {
    $self->{_auth_st} = S_IS_DONE;

    if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
      $self->_select_db();
    }
    else {
      $self->{_ready_to_write} = 1;
      $self->_flush_input_queue();
    }
  };

  my $on_error = sub {
    $self->{_auth_st} = S_NEED_PERFORM;
    $self->_abort_all( @_ );
  };

  $self->{_auth_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd      => 'auth',
      args     => [ $self->{password} ],
      on_done  => $on_done,
      on_error => $on_error,
    }
  );

  return;
}
# Sends the "select" command with the configured database index. On success
# it marks the client ready and flushes the input queue. On failure it resets
# the selection state and aborts all commands.
sub _select_db {
  my ( $self ) = @_;

  # The callbacks below must not keep the client alive
  weaken( $self );

  my $on_done = sub {
    $self->{_select_db_st}   = S_IS_DONE;
    $self->{_ready_to_write} = 1;
    $self->_flush_input_queue();
  };

  my $on_error = sub {
    $self->{_select_db_st} = S_NEED_PERFORM;
    $self->_abort_all( @_ );
  };

  $self->{_select_db_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd      => 'select',
      args     => [ $self->{database} ],
      on_done  => $on_done,
      on_error => $on_error,
    }
  );

  return;
}
sub _flush_input_queue {
my $self = shift;
$self->{_temp_queue} = $self->{_input_queue};
$self->{_input_queue} = [];
while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
$self->_push_write( $cmd );
}
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
@{ $self->{_processing_queue} },
@{ $self->{_temp_queue} },
@{ $self->{_input_queue} },
);
foreach my $cmd ( @unfin_cmds ) {
warn "Operation \"$cmd->{kwd}\" aborted:"
. " Client object destroyed prematurely.\n";
}
}
return;
}
package AnyEvent::Redis::RipeRedis::Error;

# Constructor
#
# Takes an error message and an error code and returns an error object
# holding both.
sub new {
  my ( $class, $err_msg, $err_code ) = @_;

  my %fields = (
    message => $err_msg,
    code    => $err_code,
  );

  return bless \%fields, $class;
}

# Returns the error message given to the constructor.
sub message {
  my ( $self ) = @_;

  return $self->{message};
}

# Returns the error code given to the constructor.
sub code {
  my ( $self ) = @_;

  return $self->{code};
}
1;
__END__
=head1 NAME
AnyEvent::Redis::RipeRedis - DEPRECATED. Use AnyEvent::RipeRedis instead
=head1 SYNOPSIS
use AnyEvent;
use AnyEvent::Redis::RipeRedis;
my $cv = AE::cv();
my $redis = AnyEvent::Redis::RipeRedis->new(
host => 'localhost',
port => 6379,
password => 'yourpass',
);
$redis->incr( 'foo',
sub {
my $reply = shift;
if (@_) {
my $err_msg = shift;
my $err_code = shift;
warn "[$err_code] $err_msg\n";
return;
}
print "$reply\n";
}
);
$redis->set( 'bar', 'string',
{ on_error => sub {
my $err_msg = shift;
my $err_code = shift;
warn "[$err_code] $err_msg\n";
}
}
);
$redis->get( 'bar',
{ on_done => sub {
my $reply = shift;
print "$reply\n";
},
on_error => sub {
my $err_msg = shift;
my $err_code = shift;
warn "[$err_code] $err_msg\n";
},
}
);
$redis->quit(
sub {
my $reply = shift;
if (@_) {
my $err_msg = shift;
my $err_code = shift;
warn "[$err_code] $err_msg\n";
}
$cv->send();
}
);
$cv->recv();
=head1 DESCRIPTION
THIS MODULE IS DEPRECATED. Use L<AnyEvent::RipeRedis> instead. The interface of
L<AnyEvent::RipeRedis> differs in several ways from the interface of
AnyEvent::Redis::RipeRedis. For more information, see the documentation.
AnyEvent::Redis::RipeRedis is a flexible non-blocking Redis client with a
reconnect feature. The client supports subscriptions, transactions and
connections via UNIX socket.
Requires Redis 1.2 or higher, and any supported event loop.
=head1 CONSTRUCTOR
=head2 new( %params )
my $redis = AnyEvent::Redis::RipeRedis->new(
host => 'localhost',
port => 6379,
password => 'yourpass',
database => 7,
lazy => 1,
connection_timeout => 5,
read_timeout => 5,
reconnect => 1,
min_reconnect_interval => 5,
encoding => 'utf8',
on_connect => sub {
# handling...
},
on_disconnect => sub {
# handling...
},
on_connect_error => sub {
my $err_msg = shift;
# error handling...
},
on_error => sub {
my $err_msg = shift;
my $err_code = shift;
# error handling...
},
);
=over
=item host => $host
Server hostname (default: 127.0.0.1)
=item port => $port
Server port (default: 6379)
=item password => $password
If the password is specified, the C<AUTH> command is sent to the server
after connection.
=item database => $index
Database index. If the index is specified, the client switches to
the specified database after connecting. You can also switch to another database
after connecting by using the C<SELECT> command. The client remembers the last
selected database after reconnection.
The default database index is C<0>.
=item encoding => $encoding_name
Used to encode/decode strings during input/output operations.
Not set by default.
=item connection_timeout => $fractional_seconds
Specifies the connection timeout. If the client cannot connect to the server
within the specified timeout, the C<on_error> or C<on_connect_error> callback is
called. When the C<on_error> callback is called, the C<E_CANT_CONN> error code
is passed to the callback as the second argument. The timeout is specified in
seconds and can contain a fractional part.
connection_timeout => 10.5,
By default the client uses the kernel's connection timeout.
=item read_timeout => $fractional_seconds
Specifies the read timeout. If the client does not receive a reply from the
server within the specified timeout, the client closes the connection and calls
the C<on_error> callback with the C<E_READ_TIMEDOUT> error code. The timeout is
specified in seconds and can contain a fractional part.
read_timeout => 3.5,
Not set by default.
=item lazy => $boolean
If enabled, the connection is established when you send the first
command to the server. By default the connection is established right after
the C<new> method is called.
Disabled by default.
=item reconnect => $boolean
If the connection to the Redis server was lost and the parameter C<reconnect> is
TRUE, the client tries to restore the connection on the next command execution
unless C<min_reconnect_interval> is specified. The client tries to reconnect only
once and, if that fails, the C<on_error> callback is called. If you need several
reconnection attempts, just retry the command from the C<on_error>
callback as many times as you need. This feature makes the client more
responsive.
Enabled by default.
( run in 0.934 second using v1.01-cache-2.11-cpan-5a3173703d6 )