AnyEvent-RipeRedis
view release on metacpan or search on metacpan
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
package AnyEvent::RipeRedis;
use 5.008000;
use strict;
use warnings;
use base qw( Exporter );
our $VERSION = '0.48';
use AnyEvent::RipeRedis::Error;
use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );
# The error codes are copied from AnyEvent::RipeRedis::Error inside a BEGIN
# block so that they are already available when the "use constant" statement
# below is compiled, and so that Exporter can offer them to callers.
my %ERROR_CODES;
BEGIN {
%ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;
# Every error code name can be imported on request, either one by one or all
# at once through the ":err_codes" tag.
our @EXPORT_OK = keys %ERROR_CODES;
our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}
# Package constants. The error codes collected above are flattened into the
# same list, so each error code name also becomes a constant of this package.
use constant {
# Default values
D_HOST => 'localhost',
D_PORT => 6379,
D_DB_INDEX => 0,
%ERROR_CODES,
# Operation status
# (stored in the "_auth_state" and "_db_selection_state" attributes)
S_NEED_DO => 1,
S_IN_PROGRESS => 2,
S_DONE => 3,
# String terminator
# (EOL_LENGTH is the length of EOL in bytes)
EOL => "\r\n",
EOL_LENGTH => 2,
};
# Subscription commands. For these, a trailing code reference passed to the
# command is stored as "on_message" rather than "on_reply", and "on_message"
# is mandatory.
my %SUB_CMDS = (
subscribe => 1,
psubscribe => 1,
);
# All commands that change subscriptions. They are rejected while a
# transaction opened with "multi" is still pending.
my %SUBUNSUB_CMDS = (
%SUB_CMDS,
unsubscribe => 1,
punsubscribe => 1,
);
# Values of the first element of an array reply that mark it as a published
# message instead of a reply to a command.
my %MESSAGE_TYPES = (
message => 1,
pmessage => 1,
);
# Commands that need special handling before they are sent.
my %NEED_PREPROCESS = (
multi => 1,
exec => 1,
discard => 1,
eval_cached => 1,
%SUBUNSUB_CMDS,
);
# Commands that are rejected while a "multi" transaction is pending.
# NOTE(review): the name suggests that their replies are also post-processed
# somewhere outside this excerpt -- confirm.
my %NEED_POSTPROCESS = (
info => 1,
cluster_info => 1,
select => 1,
quit => 1,
);
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
my $self = shift;
return $self->{$name};
}
}
# Generates a combined getter/setter for every timeout/interval attribute.
# When called with an argument the value is validated and stored; the current
# value is returned in either case. undef is accepted without validation; any
# other value must look like a number and must not be negative. Note that zero
# passes the check although the error message says "positive".
# NOTE(review): the glob assignment uses a symbolic reference, so the
# enclosing block (not fully visible here) presumably disables strict refs.
foreach my $name ( qw( connection_timeout read_timeout
reconnect_interval ) )
{
*{$name} = sub {
my $self = shift;
if (@_) {
my $seconds = shift;
if ( defined $seconds
&& ( !looks_like_number($seconds) || $seconds < 0 ) )
{
croak qq{"$name" must be a positive number};
}
$self->{$name} = $seconds;
}
return $self->{$name};
};
}
# Generates a combined getter/setter for attributes that need no validation.
# When called with an argument the value is stored as is; the current value
# is returned in either case.
# NOTE(review): the glob assignment uses a symbolic reference, so the
# enclosing block (not fully visible here) presumably disables strict refs.
foreach my $name ( qw( utf8 reconnect on_connect on_disconnect ) ) {
*{$name} = sub {
my $self = shift;
if (@_) {
$self->{$name} = shift;
}
return $self->{$name};
};
}
}
# Creates the AnyEvent::Handle for this client and stores it in "_handle".
# The user-supplied "handle_params" come first in the argument list, followed
# by the connection target and the callbacks built by the _create_on_*
# factories. Returns nothing.
sub _connect {
    my ($self) = @_;

    my @handle_args = (
        %{ $self->{handle_params} },
        connect          => [ $self->{host}, $self->{port} ],
        on_prepare       => $self->_create_on_prepare,
        on_connect       => $self->_create_on_connect,
        on_connect_error => $self->_create_on_connect_error,
        on_rtimeout      => $self->_create_on_rtimeout,
        on_eof           => $self->_create_on_eof,
        on_error         => $self->_create_on_handle_error,
        on_read          => $self->_create_on_read,
    );

    $self->{_handle} = AnyEvent::Handle->new(@handle_args);

    return;
}
# Builds the "on_prepare" callback for the handle. The callback returns the
# configured "connection_timeout", or nothing when no timeout is configured.
sub _create_on_prepare {
    my ($self) = @_;

    # The callback ends up stored on the handle, which the client itself owns;
    # a weak reference avoids a reference cycle.
    weaken($self);

    return sub {
        my $timeout = $self->{connection_timeout};

        return unless defined $timeout;
        return $timeout;
    };
}
# Builds the "on_connect" callback for the handle. Once the socket is
# connected the callback marks the client as connected, skips the steps that
# are not required (authentication without a password, selection of database
# 0), starts the first outstanding step or, when nothing is outstanding,
# marks the client ready and flushes the input queue. Finally the user's
# "on_connect" callback is invoked, if one is set.
sub _create_on_connect {
    my ($self) = @_;

    weaken($self);

    return sub {
        $self->{_connected} = 1;

        # Nothing to authenticate with: treat authentication as finished.
        $self->{_auth_state} = S_DONE
            unless defined $self->{password};

        # Database 0 needs no explicit selection.
        $self->{_db_selection_state} = S_DONE
            if $self->{database} == 0;

        if ( $self->{_auth_state} == S_NEED_DO ) {
            $self->_auth;
        }
        elsif ( $self->{_db_selection_state} == S_NEED_DO ) {
            $self->_select_database;
        }
        else {
            $self->{_ready} = 1;
            $self->_process_input_queue;
        }

        $self->{on_connect}->()
            if defined $self->{on_connect};
    };
}
# Builds the "on_connect_error" callback for the handle. The callback wraps
# the message it receives as its last argument into an E_CANT_CONN error and
# disconnects the client with it.
sub _create_on_connect_error {
    my ($self) = @_;

    weaken($self);

    return sub {
        my $reason = $_[-1];

        my $error = _new_error(
            "Can't connect to $self->{host}:$self->{port}: $reason",
            E_CANT_CONN
        );

        $self->_disconnect($error);
    };
}
# Builds the "on_rtimeout" callback for the handle. When no command is
# waiting for a reply the read timeout is simply switched off; otherwise the
# client is disconnected with an E_READ_TIMEDOUT error.
sub _create_on_rtimeout {
    my ($self) = @_;

    weaken($self);

    return sub {
        if ( !@{ $self->{_processing_queue} } ) {
            # Nothing is awaiting a reply, so the timeout is not an error.
            $self->{_handle}->rtimeout(undef);
        }
        else {
            my $error = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
            $self->_disconnect($error);
        }
    };
}
# Builds the "on_eof" callback for the handle. The callback disconnects the
# client with an E_CONN_CLOSED_BY_REMOTE_HOST error.
sub _create_on_eof {
    my ($self) = @_;

    weaken($self);

    return sub {
        my $error = _new_error(
            'Connection closed by remote host.',
            E_CONN_CLOSED_BY_REMOTE_HOST
        );

        $self->_disconnect($error);
    };
}
# Builds the "on_error" callback for the handle. The callback wraps the
# message it receives as its last argument into an E_IO error and disconnects
# the client with it.
sub _create_on_handle_error {
    my ($self) = @_;

    weaken($self);

    return sub {
        my $io_message = $_[-1];

        my $error = _new_error( $io_message, E_IO );

        $self->_disconnect($error);
    };
}
sub _create_on_read {
my $self = shift;
weaken($self);
my $str_len;
my @bufs;
my $bufs_num = 0;
return sub {
my $handle = shift;
MAIN: while (1) {
return if $handle->destroyed;
my $reply;
my $err_code;
if ( defined $str_len ) {
if ( length( $handle->{rbuf} ) < $str_len + EOL_LENGTH ) {
return;
}
$reply = substr( $handle->{rbuf}, 0, $str_len, '' );
substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
if ( $self->{utf8} ) {
utf8::decode($reply);
}
undef $str_len;
}
else {
my $eol_pos = index( $handle->{rbuf}, EOL );
if ( $eol_pos < 0 ) {
return;
}
$reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
my $type = substr( $reply, 0, 1, '' );
substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
if ( $type ne '+' && $type ne ':' ) {
if ( $type eq '$' ) {
if ( $reply >= 0 ) {
$str_len = $reply;
next;
}
undef $reply;
}
elsif ( $type eq '*' ) {
if ( $reply > 0 ) {
push( @bufs,
{ reply => [],
err_code => undef,
chunks_cnt => $reply,
}
);
$bufs_num++;
next;
}
elsif ( $reply == 0 ) {
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
while ( $bufs_num > 0 ) {
my $curr_buf = $bufs[-1];
if ( defined $err_code ) {
unless ( ref($reply) ) {
$reply = _new_error( $reply, $err_code );
}
$curr_buf->{err_code} = E_OPRN_ERROR;
}
push( @{ $curr_buf->{reply} }, $reply );
if ( --$curr_buf->{chunks_cnt} > 0 ) {
next MAIN;
}
$reply = $curr_buf->{reply};
$err_code = $curr_buf->{err_code};
pop @bufs;
$bufs_num--;
}
$self->_process_reply( $reply, $err_code );
}
return;
};
}
# Turns a method call into a command structure.
#
# Arguments:
#   $cmd_name - name of the called method, e.g. "get" or "cluster_info"
#   $args     - array reference with the call arguments; a trailing callback
#               (hash or code reference) is removed from it
#
# A trailing hash reference is taken as the set of callbacks. A trailing code
# reference becomes "on_message" for subscription commands and "on_reply" for
# every other command. The protocol keywords are the lower-cased command name
# split on underscores, except for "eval_cached", which is sent as "evalsha".
# When no "on_reply" callback is given, a default one forwards errors to the
# client's "on_error" callback.
#
# Returns the command as a hash reference.
sub _prepare {
    my ( $self, $cmd_name, $args ) = @_;

    my $callbacks = {};
    my $tail_type = ref( $args->[-1] );

    if ( $tail_type eq 'HASH' ) {
        $callbacks = pop @{$args};
    }
    elsif ( $tail_type eq 'CODE' ) {
        my $slot = exists $SUB_CMDS{$cmd_name} ? 'on_message' : 'on_reply';
        $callbacks->{$slot} = pop @{$args};
    }

    my @kwds;
    if ( $cmd_name eq 'eval_cached' ) {
        @kwds = ('evalsha');
    }
    else {
        @kwds = split( m/_/, lc($cmd_name) );
    }

    # The callbacks are merged last, exactly as supplied by the caller.
    my %cmd = (
        name => $cmd_name,
        kwds => \@kwds,
        args => $args,
        %{$callbacks},
    );

    unless ( defined $cmd{on_reply} ) {
        weaken($self);

        # Default reply handler: replies are dropped, errors are reported
        # through "on_error".
        $cmd{on_reply} = sub {
            my $err = $_[1];

            if ( defined $err ) {
                $self->{on_error}->($err);
                return;
            }
        };
    }

    return \%cmd;
}
# Dispatches a prepared command.
#
# The command is first checked against the transaction mode and, if its name
# is listed in %NEED_PREPROCESS, preprocessed. When the client is ready the
# command is written to the handle immediately. Otherwise the missing
# connection step (connect, authentication, database selection) is started if
# necessary and the command is appended to the input queue. If there is no
# handle and neither "lazy" nor "reconnect" is set, the command's "on_reply"
# callback receives an E_NO_CONN error and the command is dropped.
#
# Croaks when a subscription command or a command from %NEED_POSTPROCESS is
# issued in transaction mode, and when a subscribing command has no
# "on_message" callback.
sub _execute {
my $self = shift;
my $cmd = shift;
if ( $self->{_multi_mode}
&& ( exists $SUBUNSUB_CMDS{ $cmd->{name} }
|| exists $NEED_POSTPROCESS{ $cmd->{name} } ) )
{
croak qq{Command "$cmd->{name}" not allowed after "multi" command.}
. ' First, the transaction must be finalized.';
}
if ( exists $NEED_PREPROCESS{ $cmd->{name} } ) {
# Track whether a transaction is open.
if ( $cmd->{name} eq 'multi' ) {
$self->{_multi_mode} = 1;
}
elsif ( $cmd->{name} eq 'exec'
|| $cmd->{name} eq 'discard' )
{
$self->{_multi_mode} = 0;
}
elsif ( $cmd->{name} eq 'eval_cached' ) {
# The script is replaced by its SHA1 hex digest; digests are memoized in
# %EVAL_CACHE (declared outside this excerpt). The script itself is kept
# in the command under the "script" key.
# NOTE(review): presumably used to fall back to EVAL when the server does
# not know the digest -- confirm in the reply handling code.
my $script = $cmd->{args}[0];
unless ( exists $EVAL_CACHE{$script} ) {
$EVAL_CACHE{$script} = sha1_hex($script);
}
$cmd->{args}[0] = $EVAL_CACHE{$script};
$cmd->{script} = $script;
}
else { # subscribe, unsubscribe, psubscribe, punsubscribe
if ( exists $SUB_CMDS{ $cmd->{name} }
&& !defined $cmd->{on_message} )
{
croak '"on_message" callback must be specified';
}
# Record the number of arguments as "reply_cnt".
# NOTE(review): presumably one reply is expected per channel/pattern.
if ( @{ $cmd->{args} } ) {
$cmd->{reply_cnt} = scalar @{ $cmd->{args} };
}
}
}
unless ( $self->{_ready} ) {
if ( defined $self->{_handle} ) {
# A handle exists: if the socket is connected, restart whichever of the
# authentication / database selection steps is still marked S_NEED_DO.
if ( $self->{_connected} ) {
if ( $self->{_auth_state} == S_DONE ) {
if ( $self->{_db_selection_state} == S_NEED_DO ) {
$self->_select_database;
}
}
elsif ( $self->{_auth_state} == S_NEED_DO ) {
$self->_auth;
}
}
}
elsif ( $self->{lazy} ) {
# Lazy mode: connect on the first command and clear the flag.
undef $self->{lazy};
$self->_connect;
}
elsif ( $self->{reconnect} ) {
if ( defined $self->{reconnect_interval}
&& $self->{reconnect_interval} > 0 )
{
# Schedule a single delayed reconnect; a timer that is already pending
# is left alone.
unless ( defined $self->{_reconnect_timer} ) {
weaken($self);
$self->{_reconnect_timer} = AE::timer(
$self->{reconnect_interval}, 0,
sub {
undef $self->{_reconnect_timer};
$self->_connect;
}
);
}
}
else {
$self->_connect;
}
}
else {
# No connection and no way to obtain one: report the failure through
# AE::postpone instead of calling on_reply directly, and do not queue
# the command.
AE::postpone {
my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
. ' No connection to the server.', E_NO_CONN );
$cmd->{on_reply}->( undef, $err );
};
return;
}
# The command waits in the input queue until the client becomes ready.
push( @{ $self->{_input_queue} }, $cmd );
return;
}
$self->_push_write($cmd);
return;
}
# Serializes a command and writes it to the handle.
#
# The keywords and arguments of the command are sent as one multi-bulk
# request: a "*<count>" header followed by a "$<length>"-prefixed string per
# token, each terminated by EOL. Undefined tokens are sent as empty strings
# and, when the "utf8" attribute is set, tokens are UTF-8 encoded first. The
# caller's values are not modified, because every token is copied before it
# is processed.
#
# If a read timeout is configured and no other command is awaiting a reply,
# the handle's read timeout is re-armed. The command is appended to the
# processing queue before it is written. Returns nothing.
sub _push_write {
    my ( $self, $cmd ) = @_;

    my @chunks = map {
        my $token = $_;

        if ( !defined $token ) {
            $token = '';
        }
        elsif ( $self->{utf8} ) {
            utf8::encode($token);
        }

        '$' . length($token) . EOL . $token . EOL;
    } ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );

    my $cmd_str = join( '', '*', scalar(@chunks), EOL, @chunks );

    my $handle = $self->{_handle};

    if ( defined $self->{read_timeout}
        && !@{ $self->{_processing_queue} } )
    {
        $handle->rtimeout_reset;
        $handle->rtimeout( $self->{read_timeout} );
    }

    push( @{ $self->{_processing_queue} }, $cmd );
    $handle->push_write($cmd_str);

    return;
}
# Sends the AUTH command with the configured password.
#
# The authentication state is set to S_IN_PROGRESS while the command is in
# flight. On success the database selection is started if it is still
# required; otherwise the client is marked ready and the input queue is
# flushed. On failure the state returns to S_NEED_DO and all pending
# operations are aborted with the received error.
#
# A server that has no password configured rejects AUTH. That rejection is
# deliberately treated as success. Both wordings of the rejection are
# recognized: the one used by older servers and the one introduced together
# with ACLs in Redis 6.
#
# Returns nothing.
sub _auth {
    my $self = shift;

    weaken($self);
    $self->{_auth_state} = S_IN_PROGRESS;

    $self->_push_write(
        {   name     => 'auth',
            kwds     => ['auth'],
            args     => [ $self->{password} ],
            on_reply => sub {
                my $err = $_[1];

                if ( defined $err ) {
                    my $err_msg = $err->message;

                    # The Redis 6+ message carries a trailing hint, so only
                    # its beginning is compared.
                    my $no_password_set
                        = $err_msg eq 'ERR Client sent AUTH, but no password is set'
                        || index( $err_msg,
                        'ERR AUTH <password> called without any password configured'
                        ) == 0;

                    unless ($no_password_set) {
                        $self->{_auth_state} = S_NEED_DO;
                        $self->_abort($err);

                        return;
                    }
                }

                $self->{_auth_state} = S_DONE;

                if ( $self->{_db_selection_state} == S_NEED_DO ) {
                    $self->_select_database;
                }
                else {
                    $self->{_ready} = 1;
                    $self->_process_input_queue;
                }
            },
        }
    );

    return;
}
# Sends the SELECT command for the configured database.
#
# The selection state is set to S_IN_PROGRESS while the command is in flight.
# On success the client is marked ready and the input queue is flushed. On
# failure the state returns to S_NEED_DO and all pending operations are
# aborted with the received error. Returns nothing.
sub _select_database {
    my ($self) = @_;

    weaken($self);
    $self->{_db_selection_state} = S_IN_PROGRESS;

    my $on_reply = sub {
        my $err = $_[1];

        if ( defined $err ) {
            $self->{_db_selection_state} = S_NEED_DO;
            $self->_abort($err);

            return;
        }

        $self->{_db_selection_state} = S_DONE;
        $self->{_ready}              = 1;
        $self->_process_input_queue;
    };

    $self->_push_write(
        {   name     => 'select',
            kwds     => ['select'],
            args     => [ $self->{database} ],
            on_reply => $on_reply,
        }
    );

    return;
}
# Writes every queued command to the handle.
#
# The input queue is moved to "_temp_queue" and replaced by a fresh empty
# queue; the commands are then taken from "_temp_queue" one by one and passed
# to _push_write. The queue is looked up through the object on every
# iteration rather than through a local alias. Returns nothing.
sub _process_input_queue {
    my ($self) = @_;

    @{$self}{qw( _temp_queue _input_queue )} = ( $self->{_input_queue}, [] );

    while (1) {
        my $cmd = shift @{ $self->{_temp_queue} }
            or last;

        $self->_push_write($cmd);
    }

    return;
}
# Routes a parsed server reply to the matching handler.
#
# Arguments:
#   $reply    - the parsed reply
#   $err_code - error code, or undef when the reply is not an error
#
# Error replies go to _process_error. While at least one channel or pattern
# subscription is counted, an array reply whose first element is a known
# message type goes to _process_message. Everything else goes to
# _process_success. Returns nothing.
sub _process_reply {
    my ( $self, $reply, $err_code ) = @_;

    if ( defined $err_code ) {
        $self->_process_error( $reply, $err_code );

        return;
    }

    my $is_message
        = $self->{_channel_cnt} + $self->{_pchannel_cnt} > 0
        && ref($reply)
        && exists $MESSAGE_TYPES{ $reply->[0] };

    if ($is_message) {
        $self->_process_message($reply);
    }
    else {
        $self->_process_success($reply);
    }

    return;
}
( run in 0.631 second using v1.01-cache-2.11-cpan-39bf76dae61 )