AnyEvent-Redis-RipeRedis
view release on metacpan or search on metacpan
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
READONLY => E_READONLY,
OOM => E_OOM,
EXECABORT => E_EXEC_ABORT,
NOAUTH => E_NO_AUTH,
WRONGTYPE => E_WRONG_TYPE,
NOREPLICAS => E_NO_REPLICAS,
BUSYKEY => E_BUSY_KEY,
CROSSSLOT => E_CROSS_SLOT,
TRYAGAIN => E_TRY_AGAIN,
ASK => E_ASK,
MOVED => E_MOVED,
CLUSTERDOWN => E_CLUSTER_DOWN,
);
# Maps Lua script text to its SHA1 hex digest. Entries are added on demand by
# eval_cached(); the hash is not stored per object, so every client created
# from this file shares the same cache.
my %EVAL_CACHE;
# Constructor
# Creates (or, when invoked on an existing object, re-initialises) a client.
#
# Named parameters read here:
#   host, port             - server address; fall back to D_HOST / D_PORT when
#                            false
#   password               - stored as is
#   database               - database index; D_DB_INDEX when undefined
#   reconnect              - defaults to 1 when the key is not passed at all
#   on_connect, on_disconnect, on_connect_error - stored as is
#   encoding, connection_timeout, read_timeout, min_reconnect_interval,
#   on_error               - handed to the accessor of the same name
#   handle_params          - hash reference of extra handle parameters
#   linger, autocork       - copied into the handle parameters unless
#                            handle_params already has that key
#   lazy                   - when true, the connection is not opened here
#
# Returns the object.
sub new {
  my $proto  = shift;
  my %params = @_;

  my $self = ref( $proto ) ? $proto : bless {}, $proto;

  $self->{host}     = $params{host} || D_HOST;
  $self->{port}     = $params{port} || D_PORT;
  $self->{password} = $params{password};
  $self->{database}
      = defined $params{database} ? $params{database} : D_DB_INDEX;
  $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
  $self->{on_connect}       = $params{on_connect};
  $self->{on_disconnect}    = $params{on_disconnect};
  $self->{on_connect_error} = $params{on_connect_error};

  # These go through the accessors rather than straight into the hash.
  $self->encoding( $params{encoding} );
  $self->connection_timeout( $params{connection_timeout} );
  $self->read_timeout( $params{read_timeout} );
  $self->min_reconnect_interval( $params{min_reconnect_interval} );
  $self->on_error( $params{on_error} );

  # Work on a shallow copy: merging linger/autocork straight into the hash the
  # caller passed would modify the caller's data, and a hash reused for a
  # second client would then silently keep the first client's values.
  my %hdl_params = %{ $params{handle_params} || {} };
  foreach my $name ( qw( linger autocork ) ) {
    if ( !exists $hdl_params{$name} && defined $params{$name} ) {
      $hdl_params{$name} = $params{$name};
    }
  }
  $self->{handle_params} = \%hdl_params;

  # Internal connection state.
  $self->{_handle}           = undef;
  $self->{_connected}        = 0;
  $self->{_lazy_conn_st}     = $params{lazy};
  $self->{_auth_st}          = S_NEED_PERFORM;
  $self->{_select_db_st}     = S_NEED_PERFORM;
  $self->{_ready_to_write}   = 0;
  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_txn_lock}         = 0;
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;
  $self->{_reconnect_timer}  = undef;

  # Without "lazy" the connection attempt starts right away.
  unless ( $self->{_lazy_conn_st} ) {
    $self->_connect();
  }

  return $self;
}
# Issues the "multi" command with the given arguments and raises the
# object's _txn_lock flag before the command is handed to _execute_cmd().
# Returns nothing.
sub multi {
  my ( $self, @args ) = @_;

  my $cmd = $self->_prepare_cmd( 'multi', \@args );

  $self->{_txn_lock} = 1;
  $self->_execute_cmd( $cmd );

  return;
}
# Issues the "exec" command with the given arguments and clears the
# object's _txn_lock flag before the command is handed to _execute_cmd().
# Returns nothing.
sub exec {
  my ( $self, @args ) = @_;

  my $cmd = $self->_prepare_cmd( 'exec', \@args );

  $self->{_txn_lock} = 0;
  $self->_execute_cmd( $cmd );

  return;
}
# Sends "evalsha" for a script given as plain text. The first argument is
# taken as the script; its SHA1 hex digest is looked up in (or added to)
# %EVAL_CACHE and substituted for the script in the argument list. The
# original script text is kept on the command under the "script" key.
# Returns nothing.
sub eval_cached {
  my ( $self, @args ) = @_;

  my $cmd    = $self->_prepare_cmd( 'evalsha', \@args );
  my $script = $cmd->{args}[0];

  $cmd->{script} = $script;

  # Hash each distinct script only once.
  $EVAL_CACHE{$script} = sha1_hex( $script )
      unless exists $EVAL_CACHE{$script};

  $cmd->{args}[0] = $EVAL_CACHE{$script};
  $self->_execute_cmd( $cmd );

  return;
}
# Public wrapper around _disconnect(). Takes no arguments besides the
# object and returns nothing.
sub disconnect {
  my ( $self ) = @_;

  $self->_disconnect();

  return;
}
sub encoding {
my $self = shift;
if ( @_ ) {
my $enc = shift;
if ( defined $enc ) {
$self->{encoding} = find_encoding( $enc );
lib/AnyEvent/Redis/RipeRedis.pm view on Meta::CPAN
}
$self->_process_reply( $reply, $err_code );
}
return;
};
}
# Builds the command hash for keyword $kwd from the argument list $args
# (an array reference, which is modified when a trailing element is taken).
#
# The last element of $args decides the shape:
#   - a HASH reference is removed from the list and becomes the command
#     hash itself;
#   - a CODE reference is removed and stored as "on_message" when $kwd is a
#     key of %SUB_CMDS, otherwise as "on_reply";
#   - anything else leaves the list untouched.
#
# Returns the command hash with "kwd" and "args" filled in.
sub _prepare_cmd {
  my ( $self, $kwd, $args ) = @_;

  my $tail_type = ref( $args->[-1] );
  my $cmd       = $tail_type eq 'HASH' ? pop @{$args} : {};

  if ( $tail_type eq 'CODE' ) {
    my $cb_slot = exists $SUB_CMDS{$kwd} ? 'on_message' : 'on_reply';
    $cmd->{$cb_slot} = pop @{$args};
  }

  $cmd->{kwd}  = $kwd;
  $cmd->{args} = $args;

  return $cmd;
}
# Either writes $cmd immediately or parks it in the input queue.
#
# When the object is ready to write, the command goes straight to
# _push_write(). Otherwise, depending on the current state:
#   - a handle exists and is connected: start AUTH or database selection if
#     that step is still pending;
#   - no handle, lazy connection pending: clear the lazy flag and connect;
#   - no handle, reconnect disabled: schedule an E_NO_CONN error for the
#     command via AE::postpone and return without queueing it;
#   - no handle, reconnect enabled: connect now, or arm a one-shot timer for
#     min_reconnect_interval seconds when that interval is positive and no
#     timer is armed yet.
# In every case except the error one the command is then appended to
# _input_queue. Returns nothing.
sub _execute_cmd {
  my ( $self, $cmd ) = @_;

  if ( $self->{_ready_to_write} ) {
    $self->_push_write( $cmd );
    return;
  }

  if ( defined $self->{_handle} ) {
    if ( $self->{_connected} ) {
      if ( $self->{_auth_st} == S_IS_DONE ) {
        $self->_select_db() if $self->{_select_db_st} == S_NEED_PERFORM;
      }
      elsif ( $self->{_auth_st} == S_NEED_PERFORM ) {
        $self->_auth();
      }
    }
  }
  elsif ( $self->{_lazy_conn_st} ) {
    $self->{_lazy_conn_st} = 0;
    $self->_connect();
  }
  elsif ( !$self->{reconnect} ) {
    # The error is reported asynchronously, not from inside this call.
    AE::postpone(
      sub {
        $self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
            . ' No connection to the server.', E_NO_CONN );
      }
    );

    return;
  }
  else {
    my $interval = $self->{min_reconnect_interval};

    if ( defined $interval && $interval > 0 ) {
      # At most one pending reconnect timer at a time.
      unless ( defined $self->{_reconnect_timer} ) {
        $self->{_reconnect_timer} = AE::timer( $interval, 0,
          sub {
            undef $self->{_reconnect_timer};
            $self->_connect();
          }
        );
      }
    }
    else {
      $self->_connect();
    }
  }

  # Queued only after any connection attempt above has been started.
  push @{ $self->{_input_queue} }, $cmd;

  return;
}
# Serialises $cmd as an array of bulk strings (keyword first, then every
# argument), appends the command to _processing_queue and writes the
# serialised string to the handle. Returns nothing.
#
# When read_timeout is defined and no command is currently being processed,
# the handle's read timeout is reset and set to read_timeout first.
sub _push_write {
  my ( $self, $cmd ) = @_;

  my @bulks;

  # $token aliases the command's own keyword/argument slots, so the
  # normalisation below is stored back into $cmd.
  for my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
    if ( !defined $token ) {
      $token = '';
    }
    elsif ( defined $self->{encoding} && is_utf8( $token ) ) {
      $token = $self->{encoding}->encode( $token );
    }

    push @bulks, '$' . length( $token ) . EOL . $token . EOL;
  }

  # Element count is the keyword plus all arguments.
  my $token_cnt = scalar( @{ $cmd->{args} } ) + 1;
  my $cmd_str   = join( '', '*' . $token_cnt . EOL, @bulks );

  my $handle = $self->{_handle};

  if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
    $handle->rtimeout_reset();
    $handle->rtimeout( $self->{read_timeout} );
  }

  push @{ $self->{_processing_queue} }, $cmd;
  $handle->push_write( $cmd_str );

  return;
}
sub _auth {
my $self = shift;
( run in 0.742 second using v1.01-cache-2.11-cpan-3782747c604 )