AnyEvent-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

use strict;
use warnings;
use base qw( Exporter );

our $VERSION = '0.48';

use AnyEvent::RipeRedis::Error;

use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );

my %ERROR_CODES;

BEGIN {
  # Take a local copy of the error-code table published by the Error module.
  %ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;

  # Every error-code name is exportable on request, either individually or
  # all together through the ":err_codes" tag (which shares the same array).
  our ( @EXPORT_OK, %EXPORT_TAGS );
  @EXPORT_OK   = keys %ERROR_CODES;
  %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

    on_error         => $self->_create_on_handle_error,
    on_read          => $self->_create_on_read,
  );

  return;
}

# Builds a callback that reports the configured connection timeout.
#
# The returned closure yields $self->{connection_timeout} when it is defined
# and nothing (empty list / undef) otherwise.
# NOTE(review): presumably installed as the AnyEvent::Handle "on_prepare"
# hook, whose return value is used as the connect timeout -- confirm at the
# call site.
sub _create_on_prepare {
  my ($self) = @_;

  # The closure must not hold a strong reference to the client object.
  weaken($self);

  return sub {
    my $timeout = $self->{connection_timeout};

    return defined $timeout ? $timeout : ();
  };
}

sub _create_on_connect {
  my $self = shift;

  weaken($self);

  return sub {
    $self->{_connected} = 1;

    unless ( defined $self->{password} ) {
      $self->{_auth_state} = S_DONE;
    }
    if ( $self->{database} == 0 ) {
      $self->{_db_selection_state} = S_DONE;
    }

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN


    if ( defined $self->{on_connect} ) {
      $self->{on_connect}->();
    }
  };
}

# Builds the callback invoked when the connection attempt fails.
#
# The closure takes the failure text from its last argument, wraps it in an
# E_CANT_CONN error that names the target host and port, and hands that error
# to _disconnect().
sub _create_on_connect_error {
  my ($self) = @_;

  # The closure must not hold a strong reference to the client object.
  weaken($self);

  return sub {
    # Only the trailing argument (the message) is of interest here.
    my $reason = $_[-1];

    my $text  = "Can't connect to $self->{host}:$self->{port}: $reason";
    my $error = _new_error( $text, E_CANT_CONN );

    $self->_disconnect($error);
  };
}

# Builds the callback invoked when the read timeout fires.
#
# With nothing in the processing queue the timeout is simply cleared on the
# handle. Otherwise the connection is dropped with an E_READ_TIMEDOUT error.
sub _create_on_rtimeout {
  my ($self) = @_;

  # The closure must not hold a strong reference to the client object.
  weaken($self);

  return sub {
    # Nothing in the processing queue: just switch the read timeout off.
    return $self->{_handle}->rtimeout(undef)
      unless @{ $self->{_processing_queue} };

    my $error = _new_error( 'Read timed out.', E_READ_TIMEDOUT );

    return $self->_disconnect($error);
  };
}

# Builds the callback invoked when the peer closes the connection.
#
# The closure disconnects the client with an E_CONN_CLOSED_BY_REMOTE_HOST
# error.
sub _create_on_eof {
  my ($self) = @_;

  # The closure must not hold a strong reference to the client object.
  weaken($self);

  return sub {
    my $error = _new_error(
      'Connection closed by remote host.',
      E_CONN_CLOSED_BY_REMOTE_HOST
    );

    $self->_disconnect($error);
  };
}

# Builds the callback invoked on an error reported by the handle.
#
# The closure takes the error text from its last argument, wraps it in an
# E_IO error and hands that error to _disconnect().
sub _create_on_handle_error {
  my ($self) = @_;

  # The closure must not hold a strong reference to the client object.
  weaken($self);

  return sub {
    # Only the trailing argument (the message) is of interest here.
    my $message = $_[-1];

    my $error = _new_error( $message, E_IO );

    $self->_disconnect($error);
  };
}

sub _create_on_read {
  my $self = shift;

  weaken($self);

  my $str_len;
  my @bufs;
  my $bufs_num = 0;

  return sub {
    my $handle = shift;

    MAIN: while (1) {
      return if $handle->destroyed;

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

      : split( m/_/, lc($cmd_name) );

  my $cmd = {
    name => $cmd_name,
    kwds => \@kwds,
    args => $args,
    %{$cbs},
  };

  unless ( defined $cmd->{on_reply} ) {
    weaken($self);

    $cmd->{on_reply} = sub {
      my $err = $_[1];

      if ( defined $err ) {
        $self->{on_error}->($err);
        return;
      }
    };
  }

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

    }
    elsif ( $self->{lazy} ) {
      undef $self->{lazy};
      $self->_connect;
    }
    elsif ( $self->{reconnect} ) {
      if ( defined $self->{reconnect_interval}
        && $self->{reconnect_interval} > 0 )
      {
        unless ( defined $self->{_reconnect_timer} ) {
          weaken($self);

          $self->{_reconnect_timer} = AE::timer(
            $self->{reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect;
            }
          );
        }
      }

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN


  push( @{ $self->{_processing_queue} }, $cmd );
  $handle->push_write($cmd_str);

  return;
}

sub _auth {
  my $self = shift;

  weaken($self);
  $self->{_auth_state} = S_IN_PROGRESS;

  $self->_push_write(
    { name => 'auth',
      kwds => ['auth'],
      args => [ $self->{password} ],

      on_reply => sub {
        my $err = $_[1];

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

      },
    }
  );

  return;
}

sub _select_database {
  my $self = shift;

  weaken($self);
  $self->{_db_selection_state} = S_IN_PROGRESS;

  $self->_push_write(
    { name => 'select',
      kwds => ['select'],
      args => [ $self->{database} ],

      on_reply => sub {
        my $err = $_[1];

t/08-eval.t  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

use Test::More;
use AnyEvent::RipeRedis qw( :err_codes );
use Digest::SHA qw( sha1_hex );
use Scalar::Util qw( weaken );
use version 0.77;
require 't/test_helper.pl';

# Without server details from run_redis_instance() the whole file is skipped.
# NOTE(review): run_redis_instance() is presumably provided by
# t/test_helper.pl -- confirm.
my $server_info = run_redis_instance();
plan skip_all => 'redis-server is required for this test'
  unless defined $server_info;
my $redis = AnyEvent::RipeRedis->new(
  host => $server_info->{host},
  port => $server_info->{port},

t/08-eval.t  view on Meta::CPAN

    return ARGV[1]
  };

  my @t_replies;

  ev_loop(
    sub {
      my $cv = shift;

      my $redis = $redis;
      weaken( $redis );

      $redis->eval_cached( $script, 0, 42,
        sub {
          my $reply = shift;
          my $err   = shift;

          if ( defined $err ) {
            diag( $err->message );
            $cv->send;

t/09-conn-errors.t  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

use Test::More tests => 40;
use AnyEvent::RipeRedis qw( :err_codes );
use Net::EmptyPort qw( empty_port );
use Scalar::Util qw( weaken );
require 't/test_helper.pl';

# Run every connection-error scenario, one after another. The file plans a
# fixed number of tests, so each of these subs is expected to contribute its
# share of assertions.
# NOTE(review): the t_* subs are not visible in this excerpt; presumably they
# are defined further down in this file -- confirm before reordering.
t_cant_connect();
t_no_connection();
t_reconnection();
t_read_timeout();
t_premature_disconnect();
t_premature_destroy();
t_subscription_lost();

t/11-leaks.t  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

use Test::More;
use AnyEvent::RipeRedis qw( :err_codes );
use Scalar::Util qw( weaken );
use version 0.77;
require 't/test_helper.pl';

BEGIN {
  # Test::LeakTrace is an optional dependency: load it at compile time and
  # skip the whole file when version 0.15 or newer is not installed.
  #
  # Success is detected through the eval's return value (the trailing "1")
  # rather than through $@. This file declares support for perl 5.8, and on
  # perls older than 5.14 $@ can be reset before it is inspected, which would
  # make a failed load look like a success.
  unless ( eval "use Test::LeakTrace 0.15; 1" ) {
    plan skip_all => 'Test::LeakTrace 0.15 required for this test';
  }
}

t/11-leaks.t  view on Meta::CPAN

  my $script = q{
    return ARGV[1]
  };

  no_leaks_ok {
    ev_loop(
      sub {
        my $cv = shift;

        my $redis = $redis;
        weaken( $redis );

        $redis->eval_cached( $script, 0, 42,
          sub {
            my $reply = shift;
            my $err   = shift;

            if ( defined $err ) {
              diag( $err->message );
              $cv->send;



( run in 0.615 second using v1.01-cache-2.11-cpan-65fba6d93b7 )