AnyEvent-RipeRedis
view release on metacpan or search on metacpan
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
);
my %SUBUNSUB_CMDS = (
%SUB_CMDS,
unsubscribe => 1,
punsubscribe => 1,
);
my %MESSAGE_TYPES = (
message => 1,
pmessage => 1,
);
my %NEED_PREPROCESS = (
multi => 1,
exec => 1,
discard => 1,
eval_cached => 1,
%SUBUNSUB_CMDS,
);
my %NEED_POSTPROCESS = (
info => 1,
cluster_info => 1,
select => 1,
quit => 1,
);
my %ERR_PREFS_MAP = (
LOADING => E_LOADING_DATASET,
NOSCRIPT => E_NO_SCRIPT,
BUSY => E_BUSY,
MASTERDOWN => E_MASTER_DOWN,
MISCONF => E_MISCONF,
READONLY => E_READONLY,
OOM => E_OOM,
EXECABORT => E_EXEC_ABORT,
NOAUTH => E_NO_AUTH,
WRONGTYPE => E_WRONG_TYPE,
NOREPLICAS => E_NO_REPLICAS,
BUSYKEY => E_BUSY_KEY,
CROSSSLOT => E_CROSS_SLOT,
TRYAGAIN => E_TRY_AGAIN,
ASK => E_ASK,
MOVED => E_MOVED,
CLUSTERDOWN => E_CLUSTER_DOWN,
NOTBUSY => E_NOT_BUSY,
);
my %EVAL_CACHE;
sub new {
my $class = shift;
my %params = @_;
my $self = bless {}, $class;
$self->{host} = $params{host} || D_HOST;
$self->{port} = $params{port} || D_PORT;
$self->{password} = $params{password};
$self->{database}
= defined $params{database} ? $params{database} : D_DB_INDEX;
$self->{utf8} = exists $params{utf8} ? $params{utf8} : 1;
$self->{lazy} = $params{lazy};
$self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
$self->{handle_params} = $params{handle_params} || {};
$self->{on_connect} = $params{on_connect};
$self->{on_disconnect} = $params{on_disconnect};
$self->connection_timeout( $params{connection_timeout} );
$self->read_timeout( $params{read_timeout} );
$self->reconnect_interval( $params{reconnect_interval} );
$self->on_error( $params{on_error} );
$self->_reset_internals;
$self->{_input_queue} = [];
$self->{_temp_queue} = [];
$self->{_processing_queue} = [];
$self->{_channels} = {};
$self->{_channel_cnt} = 0;
$self->{_pchannel_cnt} = 0;
unless ( $self->{lazy} ) {
$self->_connect;
}
return $self;
}
sub execute {
my $self = shift;
my $cmd_name = shift;
my $cmd = $self->_prepare( $cmd_name, [@_] );
$self->_execute($cmd);
return;
}
sub disconnect {
my $self = shift;
$self->_disconnect;
return;
}
sub on_error {
my $self = shift;
if (@_) {
my $on_error = shift;
if ( defined $on_error ) {
$self->{on_error} = $on_error;
}
else {
$self->{on_error} = sub {
my $err = shift;
warn $err->message . "\n";
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
$self->{$name} = $seconds;
}
return $self->{$name};
};
}
foreach my $name ( qw( utf8 reconnect on_connect on_disconnect ) ) {
*{$name} = sub {
my $self = shift;
if (@_) {
$self->{$name} = shift;
}
return $self->{$name};
};
}
}
sub _connect {
my $self = shift;
$self->{_handle} = AnyEvent::Handle->new(
%{ $self->{handle_params} },
connect => [ $self->{host}, $self->{port} ],
on_prepare => $self->_create_on_prepare,
on_connect => $self->_create_on_connect,
on_connect_error => $self->_create_on_connect_error,
on_rtimeout => $self->_create_on_rtimeout,
on_eof => $self->_create_on_eof,
on_error => $self->_create_on_handle_error,
on_read => $self->_create_on_read,
);
return;
}
sub _create_on_prepare {
my $self = shift;
weaken($self);
return sub {
if ( defined $self->{connection_timeout} ) {
return $self->{connection_timeout};
}
return;
};
}
sub _create_on_connect {
my $self = shift;
weaken($self);
return sub {
$self->{_connected} = 1;
unless ( defined $self->{password} ) {
$self->{_auth_state} = S_DONE;
}
if ( $self->{database} == 0 ) {
$self->{_db_selection_state} = S_DONE;
}
if ( $self->{_auth_state} == S_NEED_DO ) {
$self->_auth;
}
elsif ( $self->{_db_selection_state} == S_NEED_DO ) {
$self->_select_database;
}
else {
$self->{_ready} = 1;
$self->_process_input_queue;
}
if ( defined $self->{on_connect} ) {
$self->{on_connect}->();
}
};
}
sub _create_on_connect_error {
my $self = shift;
weaken($self);
return sub {
my $err_msg = pop;
my $err = _new_error(
"Can't connect to $self->{host}:$self->{port}: $err_msg",
E_CANT_CONN
);
$self->_disconnect($err);
};
}
sub _create_on_rtimeout {
my $self = shift;
weaken($self);
return sub {
if ( @{ $self->{_processing_queue} } ) {
my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
$self->_disconnect($err);
}
else {
$self->{_handle}->rtimeout(undef);
}
};
}
sub _create_on_eof {
my $self = shift;
weaken($self);
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
AE::postpone {
my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
. ' No connection to the server.', E_NO_CONN );
$cmd->{on_reply}->( undef, $err );
};
return;
}
push( @{ $self->{_input_queue} }, $cmd );
return;
}
$self->_push_write($cmd);
return;
}
sub _push_write {
my $self = shift;
my $cmd = shift;
my $cmd_str = '';
my @tokens = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );
foreach my $token (@tokens) {
unless ( defined $token ) {
$token = '';
}
elsif ( $self->{utf8} ) {
utf8::encode($token);
}
$cmd_str .= '$' . length($token) . EOL . $token . EOL;
}
$cmd_str = '*' . scalar(@tokens) . EOL . $cmd_str;
my $handle = $self->{_handle};
if ( defined $self->{read_timeout}
&& !@{ $self->{_processing_queue} } )
{
$handle->rtimeout_reset;
$handle->rtimeout( $self->{read_timeout} );
}
push( @{ $self->{_processing_queue} }, $cmd );
$handle->push_write($cmd_str);
return;
}
sub _auth {
my $self = shift;
weaken($self);
$self->{_auth_state} = S_IN_PROGRESS;
$self->_push_write(
{ name => 'auth',
kwds => ['auth'],
args => [ $self->{password} ],
on_reply => sub {
my $err = $_[1];
if ( defined $err
&& $err->message ne 'ERR Client sent AUTH, but no password is set' )
{
$self->{_auth_state} = S_NEED_DO;
$self->_abort($err);
return;
}
$self->{_auth_state} = S_DONE;
if ( $self->{_db_selection_state} == S_NEED_DO ) {
$self->_select_database;
}
else {
$self->{_ready} = 1;
$self->_process_input_queue;
}
},
}
);
return;
}
sub _select_database {
my $self = shift;
weaken($self);
$self->{_db_selection_state} = S_IN_PROGRESS;
$self->_push_write(
{ name => 'select',
kwds => ['select'],
args => [ $self->{database} ],
on_reply => sub {
my $err = $_[1];
if ( defined $err ) {
$self->{_db_selection_state} = S_NEED_DO;
$self->_abort($err);
return;
}
$self->{_db_selection_state} = S_DONE;
$self->{_ready} = 1;
$self->_process_input_queue;
},
}
);
return;
}
sub _process_input_queue {
my $self = shift;
$self->{_temp_queue} = $self->{_input_queue};
$self->{_input_queue} = [];
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
sub _new_error {
return AnyEvent::RipeRedis::Error->new(@_);
}
sub AUTOLOAD {
our $AUTOLOAD;
my $cmd_name = $AUTOLOAD;
$cmd_name =~ s/^.+:://;
my $sub = sub {
my $self = shift;
my $cmd = $self->_prepare( $cmd_name, [@_] );
$self->_execute($cmd);
return;
};
do {
no strict 'refs';
*{$cmd_name} = $sub;
};
goto &{$sub};
}
sub DESTROY {
my $self = shift;
if ( defined $self->{_handle} ) {
$self->{_handle}->destroy;
}
if ( defined $self->{_processing_queue} ) {
my @queued_commands = $self->_queued_commands;
foreach my $cmd (@queued_commands) {
warn qq{Operation "$cmd->{name}" aborted:}
. " Client object destroyed prematurely.\n";
}
}
return;
}
1;
__END__
=head1 NAME
AnyEvent::RipeRedis - Flexible non-blocking Redis client
=head1 SYNOPSIS
use AnyEvent;
use AnyEvent::RipeRedis;
my $redis = AnyEvent::RipeRedis->new(
host => 'localhost',
port => 6379,
password => 'yourpass',
);
my $cv = AE::cv;
$redis->set( 'foo', 'bar',
sub {
my $err = $_[1];
if ( defined $err ) {
warn $err->message . "\n";
$cv->send;
return;
}
$redis->get( 'foo',
sub {
my $reply = shift;
my $err = shift;
if ( defined $err ) {
warn $err->message . "\n";
$cv->send;
return;
}
print "$reply\n";
$cv->send;
}
);
}
);
$cv->recv;
=head1 DESCRIPTION
AnyEvent::RipeRedis is flexible non-blocking Redis client. Supports
subscriptions, transactions and can automaticaly restore connection after
failure.
Requires Redis 1.2 or higher, and any supported event loop.
=head1 CONSTRUCTOR
=head2 new( %params )
my $redis = AnyEvent::RipeRedis->new(
host => 'localhost',
port => 6379,
password => 'yourpass',
database => 7,
connection_timeout => 5,
read_timeout => 5,
lazy => 1,
reconnect_interval => 5,
on_connect => sub {
# handling...
},
on_disconnect => sub {
# handling...
},
on_error => sub {
my $err = shift;
# error handling...
},
);
=over
=item host => $host
Server hostname (default: 127.0.0.1)
=item port => $port
Server port (default: 6379)
=item password => $password
If the password is specified, the C<AUTH> command is sent to the server
after connection.
=item database => $index
Database index. If the index is specified, the client switches to the specified
database after connection. You can also switch to another database after
connection by using C<SELECT> command. The client remembers last selected
database after reconnection and switches to it automaticaly.
The default database index is C<0>.
=item utf8 => $boolean
If enabled, all strings will be converted to UTF-8 before sending to
the server, and all results will be decoded from UTF-8.
Enabled by default.
=item connection_timeout => $fractional_seconds
Specifies connection timeout. If the client could not connect to the server
after specified timeout, the C<on_error> callback is called with the
C<E_CANT_CONN> error. The timeout specifies in seconds and can contain a
fractional part.
connection_timeout => 10.5,
By default the client use kernel's connection timeout.
=item read_timeout => $fractional_seconds
Specifies read timeout. If the client could not receive a reply from the server
after specified timeout, the client close connection and call the C<on_error>
callback with the C<E_READ_TIMEDOUT> error. The timeout is specifies in seconds
and can contain a fractional part.
read_timeout => 3.5,
Not set by default.
=item lazy => $boolean
If enabled, the connection establishes at time when you will send the first
command to the server. By default the connection establishes after calling of
the C<new> method.
Disabled by default.
=item reconnect => $boolean
If the connection to the server was lost and the parameter C<reconnect> is
TRUE (default), the client will try to restore the connection when you execute
next command. The client will try to reconnect only once and, if attempt fails,
the error object is passed to command callback. If you need several attempts of
the reconnection, you must retry a command from the callback as many times, as
you need. Such behavior allows to control reconnection procedure.
Enabled by default.
=item reconnect_interval => $fractional_seconds
( run in 1.684 second using v1.01-cache-2.11-cpan-e1769b4cff6 )