AnyEvent-Redis-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN


package AnyEvent::Redis::RipeRedis;

use base qw( Exporter );

our $VERSION = '1.62';

use AnyEvent;
use AnyEvent::Handle;
use Encode qw( find_encoding is_utf8 );
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );

my %ERROR_CODES;

BEGIN {
  %ERROR_CODES = (
    E_CANT_CONN                  => 1,
    E_LOADING_DATASET            => 2,
    E_IO                         => 3,

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

    on_error         => $self->_get_handle_on_error(),
    on_read          => $self->_get_on_read(),
  );

  return;
}

sub _get_on_prepare {
  my $self = shift;

  weaken( $self );

  return sub {
    if ( defined $self->{connection_timeout} ) {
      return $self->{connection_timeout};
    }

    return;
  };
}

sub _get_on_connect {
  my $self = shift;

  weaken( $self );

  return sub {
    $self->{_connected} = 1;

    unless ( defined $self->{password} ) {
      $self->{_auth_st} = S_IS_DONE;
    }
    if ( $self->{database} == 0 ) {
      $self->{_select_db_st} = S_IS_DONE;
    }

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN


    if ( defined $self->{on_connect} ) {
      $self->{on_connect}->();
    }
  };
}

sub _get_on_connect_error {
  my $self = shift;

  weaken( $self );

  return sub {
    my $err_msg = pop;

    $self->_disconnect(
      "Can't connect to $self->{host}:$self->{port}: $err_msg",
      E_CANT_CONN
    );
  };
}

sub _get_on_rtimeout {
  my $self = shift;

  weaken( $self );

  return sub {
    if ( @{ $self->{_processing_queue} } ) {
      $self->_disconnect( 'Read timed out.', E_READ_TIMEDOUT );
    }
    else {
      $self->{_handle}->rtimeout( undef );
    }
  };
}

sub _get_on_eof {
  my $self = shift;

  weaken( $self );

  return sub {
    $self->_disconnect( 'Connection closed by remote host.',
        E_CONN_CLOSED_BY_REMOTE_HOST );
  };
}

sub _get_handle_on_error {
  my $self = shift;

  weaken( $self );

  return sub {
    my $err_msg = pop;

    $self->_disconnect( $err_msg, E_IO );
  };
}

sub _get_on_read {
  my $self = shift;

  weaken( $self );

  my $str_len;
  my @bufs;
  my $bufs_num = 0;

  return sub {
    my $handle = shift;

    MAIN: while ( 1 ) {
      if ( $handle->destroyed() ) {

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

  push( @{ $self->{_processing_queue} }, $cmd );

  $handle->push_write( $cmd_str );

  return;
}

sub _auth {
  my $self = shift;

  weaken( $self );

  $self->{_auth_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd  => 'auth',
      args => [ $self->{password} ],

      on_done => sub {
        $self->{_auth_st} = S_IS_DONE;

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

      },
    }
  );

  return;
}

sub _select_db {
  my $self = shift;

  weaken( $self );

  $self->{_select_db_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd  => 'select',
      args => [ $self->{database} ],

      on_done => sub {
        $self->{_select_db_st}   = S_IS_DONE;
        $self->{_ready_to_write} = 1;

t/08-eval.t  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

use Test::More;
use AnyEvent::Redis::RipeRedis qw( :err_codes );
use Digest::SHA qw( sha1_hex );
use Scalar::Util qw( weaken );
use version 0.77;
require 't/test_helper.pl';

my $SERVER_INFO = run_redis_instance();
if ( !defined $SERVER_INFO ) {
  plan skip_all => 'redis-server is required for this test';
}
my $REDIS = AnyEvent::Redis::RipeRedis->new(
  host => $SERVER_INFO->{host},
  port => $SERVER_INFO->{port},

t/08-eval.t  view on Meta::CPAN

return ARGV[1]
LUA
;
  my @t_replies;

  ev_loop(
    sub {
      my $cv = shift;

      my $redis = $redis;
      weaken( $redis );

      $redis->eval_cached( $script, 0, 42,
        { on_done => sub {
            my $reply = shift;

            push( @t_replies, $reply );

            $redis->eval_cached( $script, 0, 15 );

            $redis->eval_cached( $script, 0, 57,

t/08-eval.t  view on Meta::CPAN

return ARGV[1]
LUA
;
  my @t_replies;

  ev_loop(
    sub {
      my $cv = shift;

      my $redis = $redis;
      weaken( $redis );

      $redis->eval_cached( $script, 0, 42,
        sub {
          my $reply = shift;

          if ( @_ ) {
            my $err_msg = shift;

            diag( $err_msg );

t/09-conn-errors.t  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

use Test::More tests => 31;
use AnyEvent::Redis::RipeRedis qw( :err_codes );
use Net::EmptyPort qw( empty_port );
use Scalar::Util qw( weaken );
require 't/test_helper.pl';

t_cant_connect_mth1();
t_cant_connect_mth2();

t_no_connection();
t_reconnection();
t_read_timeout();

t_premature_conn_close_mth1();

t/11-leaks.t  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

use Test::More;
use AnyEvent::Redis::RipeRedis qw( :err_codes );
use Scalar::Util qw( weaken );
use version 0.77;
require 't/test_helper.pl';

BEGIN {
  eval "use Test::LeakTrace 0.15";
  if ( $@ ) {
    plan skip_all => 'Test::LeakTrace 0.15 required for this test';
  }
}

t/11-leaks.t  view on Meta::CPAN

  my $script = <<LUA
return ARGV[1]
LUA
;
  no_leaks_ok {
    ev_loop(
      sub {
        my $cv = shift;

        my $redis = $redis;
        weaken( $redis );

        $redis->eval_cached( $script, 0, 42,
          { on_done => sub {
              my $reply = shift;

              $redis->eval_cached( $script, 0, 57,
                {
                  on_done => sub {
                    my $reply = shift;
                    $cv->send();

t/11-leaks.t  view on Meta::CPAN

  my $script = <<LUA
return ARGV[1]
LUA
;
  no_leaks_ok {
    ev_loop(
      sub {
        my $cv = shift;

        my $redis = $redis;
        weaken( $redis );

        $redis->eval_cached( $script, 0, 42,
          sub {
            my $reply = shift;

            if ( @_ ) {
              my $err_msg = shift;

              diag( $err_msg );



( run in 0.564 second using v1.01-cache-2.11-cpan-65fba6d93b7 )