view release on metacpan or search on metacpan
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
use strict;
use warnings;
use base qw( Exporter );
our $VERSION = '0.48';
use AnyEvent::RipeRedis::Error;
use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );
my %ERROR_CODES;
# Compile-time export setup.
#
# Copies the error-code table published by AnyEvent::RipeRedis::Error into
# the file-scoped %ERROR_CODES, then offers every key of that table for
# import, both by name (@EXPORT_OK) and collectively via the ":err_codes"
# tag (%EXPORT_TAGS). Runs in BEGIN so the table is filled at compile time.
BEGIN {
    %ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;

    our ( @EXPORT_OK, %EXPORT_TAGS );

    @EXPORT_OK   = keys %ERROR_CODES;
    %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
on_error => $self->_create_on_handle_error,
on_read => $self->_create_on_read,
);
return;
}
# Builds the "prepare" callback.
#
# Returns a closure that yields the object's connection_timeout when one is
# defined, and an empty return (undef in scalar context, empty list in list
# context) otherwise.
#
# NOTE(review): presumably installed as the AnyEvent::Handle on_prepare hook,
# whose return value is used as the connect timeout -- confirm at the call
# site, which is outside this excerpt.
sub _create_on_prepare {
    my ($self) = @_;

    # The closure keeps only a weak reference, so it does not add a strong
    # reference to the client object.
    weaken($self);

    return sub {
        my $timeout = $self->{connection_timeout};

        return unless defined $timeout;
        return $timeout;
    };
}
sub _create_on_connect {
my $self = shift;
weaken($self);
return sub {
$self->{_connected} = 1;
unless ( defined $self->{password} ) {
$self->{_auth_state} = S_DONE;
}
if ( $self->{database} == 0 ) {
$self->{_db_selection_state} = S_DONE;
}
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
if ( defined $self->{on_connect} ) {
$self->{on_connect}->();
}
};
}
# Builds the connect-failure callback.
#
# The returned closure takes the error text from its last argument, wraps it
# in an E_CANT_CONN error whose message names the target host and port, and
# hands that error to _disconnect.
sub _create_on_connect_error {
    my ($self) = @_;

    # Weak reference only: the closure must not add a strong reference to
    # the client object.
    weaken($self);

    return sub {
        # The message is always the final callback argument.
        my $err_msg = $_[-1];

        $self->_disconnect(
            _new_error(
                "Can't connect to $self->{host}:$self->{port}: $err_msg",
                E_CANT_CONN
            )
        );
    };
}
# Builds the read-timeout callback.
#
# When the closure fires:
#   * if no commands are in the processing queue, it clears the read
#     timeout on the handle via rtimeout(undef);
#   * otherwise it disconnects with an E_READ_TIMEDOUT error.
sub _create_on_rtimeout {
    my ($self) = @_;

    # Weak reference only: the closure must not add a strong reference to
    # the client object.
    weaken($self);

    return sub {
        # Empty processing queue: just switch the read timeout off.
        return $self->{_handle}->rtimeout(undef)
            unless @{ $self->{_processing_queue} };

        # Commands are still queued, so this is treated as a real timeout.
        return $self->_disconnect(
            _new_error( 'Read timed out.', E_READ_TIMEDOUT )
        );
    };
}
# Builds the end-of-file callback.
#
# The returned closure disconnects with an E_CONN_CLOSED_BY_REMOTE_HOST
# error carrying the message 'Connection closed by remote host.'.
sub _create_on_eof {
    my ($self) = @_;

    # Weak reference only: the closure must not add a strong reference to
    # the client object.
    weaken($self);

    return sub {
        $self->_disconnect(
            _new_error(
                'Connection closed by remote host.',
                E_CONN_CLOSED_BY_REMOTE_HOST
            )
        );
    };
}
# Builds the handle-error callback.
#
# The returned closure takes the error text from its last argument, wraps it
# in an E_IO error, and hands that error to _disconnect.
sub _create_on_handle_error {
    my ($self) = @_;

    # Weak reference only: the closure must not add a strong reference to
    # the client object.
    weaken($self);

    return sub {
        # The message is always the final callback argument.
        my $err_msg = $_[-1];

        $self->_disconnect( _new_error( $err_msg, E_IO ) );
    };
}
sub _create_on_read {
my $self = shift;
weaken($self);
my $str_len;
my @bufs;
my $bufs_num = 0;
return sub {
my $handle = shift;
MAIN: while (1) {
return if $handle->destroyed;
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
: split( m/_/, lc($cmd_name) );
my $cmd = {
name => $cmd_name,
kwds => \@kwds,
args => $args,
%{$cbs},
};
unless ( defined $cmd->{on_reply} ) {
weaken($self);
$cmd->{on_reply} = sub {
my $err = $_[1];
if ( defined $err ) {
$self->{on_error}->($err);
return;
}
};
}
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
}
elsif ( $self->{lazy} ) {
undef $self->{lazy};
$self->_connect;
}
elsif ( $self->{reconnect} ) {
if ( defined $self->{reconnect_interval}
&& $self->{reconnect_interval} > 0 )
{
unless ( defined $self->{_reconnect_timer} ) {
weaken($self);
$self->{_reconnect_timer} = AE::timer(
$self->{reconnect_interval}, 0,
sub {
undef $self->{_reconnect_timer};
$self->_connect;
}
);
}
}
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
push( @{ $self->{_processing_queue} }, $cmd );
$handle->push_write($cmd_str);
return;
}
sub _auth {
my $self = shift;
weaken($self);
$self->{_auth_state} = S_IN_PROGRESS;
$self->_push_write(
{ name => 'auth',
kwds => ['auth'],
args => [ $self->{password} ],
on_reply => sub {
my $err = $_[1];
lib/AnyEvent/RipeRedis.pm view on Meta::CPAN
},
}
);
return;
}
sub _select_database {
my $self = shift;
weaken($self);
$self->{_db_selection_state} = S_IN_PROGRESS;
$self->_push_write(
{ name => 'select',
kwds => ['select'],
args => [ $self->{database} ],
on_reply => sub {
my $err = $_[1];
t/08-eval.t view on Meta::CPAN
use 5.008000;
use strict;
use warnings;
use Test::More;
use AnyEvent::RipeRedis qw( :err_codes );
use Digest::SHA qw( sha1_hex );
use Scalar::Util qw( weaken );
use version 0.77;
require 't/test_helper.pl';
my $server_info = run_redis_instance();
if ( !defined $server_info ) {
plan skip_all => 'redis-server is required for this test';
}
my $redis = AnyEvent::RipeRedis->new(
host => $server_info->{host},
port => $server_info->{port},
t/08-eval.t view on Meta::CPAN
return ARGV[1]
};
my @t_replies;
ev_loop(
sub {
my $cv = shift;
my $redis = $redis;
weaken( $redis );
$redis->eval_cached( $script, 0, 42,
sub {
my $reply = shift;
my $err = shift;
if ( defined $err ) {
diag( $err->message );
$cv->send;
t/09-conn-errors.t view on Meta::CPAN
use 5.008000;
use strict;
use warnings;
use Test::More tests => 40;
use AnyEvent::RipeRedis qw( :err_codes );
use Net::EmptyPort qw( empty_port );
use Scalar::Util qw( weaken );
require 't/test_helper.pl';
t_cant_connect();
t_no_connection();
t_reconnection();
t_read_timeout();
t_premature_disconnect();
t_premature_destroy();
t_subscription_lost();
t/11-leaks.t view on Meta::CPAN
use 5.008000;
use strict;
use warnings;
use Test::More;
use AnyEvent::RipeRedis qw( :err_codes );
use Scalar::Util qw( weaken );
use version 0.77;
require 't/test_helper.pl';
BEGIN {
eval "use Test::LeakTrace 0.15";
if ( $@ ) {
plan skip_all => 'Test::LeakTrace 0.15 required for this test';
}
}
t/11-leaks.t view on Meta::CPAN
my $script = q{
return ARGV[1]
};
no_leaks_ok {
ev_loop(
sub {
my $cv = shift;
my $redis = $redis;
weaken( $redis );
$redis->eval_cached( $script, 0, 42,
sub {
my $reply = shift;
my $err = shift;
if ( defined $err ) {
diag( $err->message );
$cv->send;