AnyEvent-RipeRedis
view release on metacpan or search on metacpan
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
if ( $self->{_multi_mode}
&& ( exists $SUBUNSUB_CMDS{ $cmd->{name} }
|| exists $NEED_POSTPROCESS{ $cmd->{name} } ) )
{
croak qq{Command "$cmd->{name}" not allowed after "multi" command.}
. ' First, the transaction must be finalized.';
}
if ( exists $NEED_PREPROCESS{ $cmd->{name} } ) {
if ( $cmd->{name} eq 'multi' ) {
$self->{_multi_mode} = 1;
}
elsif ( $cmd->{name} eq 'exec'
|| $cmd->{name} eq 'discard' )
{
$self->{_multi_mode} = 0;
}
elsif ( $cmd->{name} eq 'eval_cached' ) {
my $script = $cmd->{args}[0];
unless ( exists $EVAL_CACHE{$script} ) {
$EVAL_CACHE{$script} = sha1_hex($script);
}
$cmd->{args}[0] = $EVAL_CACHE{$script};
$cmd->{script} = $script;
}
else { # subscribe, unsubscribe, psubscribe, punsubscribe
if ( exists $SUB_CMDS{ $cmd->{name} }
&& !defined $cmd->{on_message} )
{
croak '"on_message" callback must be specified';
}
if ( @{ $cmd->{args} } ) {
$cmd->{reply_cnt} = scalar @{ $cmd->{args} };
}
}
}
unless ( $self->{_ready} ) {
if ( defined $self->{_handle} ) {
if ( $self->{_connected} ) {
if ( $self->{_auth_state} == S_DONE ) {
if ( $self->{_db_selection_state} == S_NEED_DO ) {
$self->_select_database;
}
}
elsif ( $self->{_auth_state} == S_NEED_DO ) {
$self->_auth;
}
}
}
elsif ( $self->{lazy} ) {
undef $self->{lazy};
$self->_connect;
}
elsif ( $self->{reconnect} ) {
if ( defined $self->{reconnect_interval}
&& $self->{reconnect_interval} > 0 )
{
unless ( defined $self->{_reconnect_timer} ) {
weaken($self);
$self->{_reconnect_timer} = AE::timer(
$self->{reconnect_interval}, 0,
sub {
undef $self->{_reconnect_timer};
$self->_connect;
}
);
}
}
else {
$self->_connect;
}
}
else {
AE::postpone {
my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
. ' No connection to the server.', E_NO_CONN );
$cmd->{on_reply}->( undef, $err );
};
return;
}
push( @{ $self->{_input_queue} }, $cmd );
return;
}
$self->_push_write($cmd);
return;
}
sub _push_write {
my $self = shift;
my $cmd = shift;
my $cmd_str = '';
my @tokens = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );
foreach my $token (@tokens) {
unless ( defined $token ) {
$token = '';
}
elsif ( $self->{utf8} ) {
utf8::encode($token);
}
$cmd_str .= '$' . length($token) . EOL . $token . EOL;
}
$cmd_str = '*' . scalar(@tokens) . EOL . $cmd_str;
my $handle = $self->{_handle};
if ( defined $self->{read_timeout}
&& !@{ $self->{_processing_queue} } )
{
$handle->rtimeout_reset;
$handle->rtimeout( $self->{read_timeout} );
}
push( @{ $self->{_processing_queue} }, $cmd );
$handle->push_write($cmd_str);
return;
}
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
}
if ( !defined $cmd->{reply_cnt}
|| --$cmd->{reply_cnt} == 0 )
{
shift @{ $self->{_processing_queue} };
if ( exists $NEED_POSTPROCESS{ $cmd->{name} } ) {
if ( $cmd->{name} eq 'info'
|| $cmd->{name} eq 'cluster_info' )
{
$reply = _parse_info($reply);
}
elsif ( $cmd->{name} eq 'select' ) {
$self->{database} = $cmd->{args}[0];
}
else { # quit
$self->_disconnect;
}
}
$cmd->{on_reply}->($reply);
}
return;
}
sub _parse_info {
return { map { split( m/:/, $_, 2 ) }
grep { m/^[^#]/ } split( EOL, $_[0] ) };
}
sub _disconnect {
my $self = shift;
my $err = shift;
my $was_connected = $self->{_connected};
if ( defined $self->{_handle} ) {
$self->{_handle}->destroy;
}
$self->_reset_internals;
$self->_abort($err);
if ( $was_connected && defined $self->{on_disconnect} ) {
$self->{on_disconnect}->();
}
return;
}
sub _reset_internals {
my $self = shift;
$self->{_handle} = undef;
$self->{_connected} = 0;
$self->{_auth_state} = S_NEED_DO;
$self->{_db_selection_state} = S_NEED_DO;
$self->{_ready} = 0;
$self->{_multi_mode} = 0;
$self->{_reconnect_timer} = undef;
return;
}
sub _abort {
my $self = shift;
my $err = shift;
my @queued_commands = $self->_queued_commands;
my %channels = %{ $self->{_channels} };
$self->{_input_queue} = [];
$self->{_temp_queue} = [];
$self->{_processing_queue} = [];
$self->{_channels} = {};
$self->{_channel_cnt} = 0;
$self->{_pchannel_cnt} = 0;
if ( !defined $err && @queued_commands ) {
$err = _new_error( 'Connection closed by client prematurely.',
E_CONN_CLOSED_BY_CLIENT );
}
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
$self->{on_error}->($err);
if ( %channels && $err_code != E_CONN_CLOSED_BY_CLIENT ) {
foreach my $name ( keys %channels ) {
my $err = _new_error(
qq{Subscription to channel "$name" lost: $err_msg},
$err_code
);
my $cmd = $channels{$name};
$cmd->{on_reply}->( undef, $err );
}
}
foreach my $cmd (@queued_commands) {
my $err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
$err_code );
$cmd->{on_reply}->( undef, $err );
}
}
return;
}
sub _queued_commands {
my $self = shift;
return (
@{ $self->{_processing_queue} },
@{ $self->{_temp_queue} },
@{ $self->{_input_queue} },
);
}
( run in 1.883 second using v1.01-cache-2.11-cpan-385001e3568 )