AnyEvent-Redis-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

package AnyEvent::Redis::RipeRedis;

use base qw( Exporter );

our $VERSION = '1.62';

use AnyEvent;
use AnyEvent::Handle;
use Encode qw( find_encoding is_utf8 );
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );

# Symbolic error names mapped to their numeric codes. The hash is declared at
# file scope but filled in inside a BEGIN block, so its contents already exist
# at compile time when the export lists and the "use constant" statement below
# are evaluated.
my %ERROR_CODES;

BEGIN {
  # NOTE: the values 7 and 8 are not assigned to any name in this table.
  %ERROR_CODES = (
    E_CANT_CONN                  => 1,
    E_LOADING_DATASET            => 2,
    E_IO                         => 3,
    E_CONN_CLOSED_BY_REMOTE_HOST => 4,
    E_CONN_CLOSED_BY_CLIENT      => 5,
    E_NO_CONN                    => 6,
    E_OPRN_ERROR                 => 9,
    E_UNEXPECTED_DATA            => 10,
    E_NO_SCRIPT                  => 11,
    E_READ_TIMEDOUT              => 12,
    E_BUSY                       => 13,
    E_MASTER_DOWN                => 14,
    E_MISCONF                    => 15,
    E_READONLY                   => 16,
    E_OOM                        => 17,
    E_EXEC_ABORT                 => 18,
    E_NO_AUTH                    => 19,
    E_WRONG_TYPE                 => 20,
    E_NO_REPLICAS                => 21,
    E_BUSY_KEY                   => 22,
    E_CROSS_SLOT                 => 23,
    E_TRY_AGAIN                  => 24,
    E_ASK                        => 25,
    E_MOVED                      => 26,
    E_CLUSTER_DOWN               => 27,
  );
}

# Exporter configuration: every error-code name can be imported on request,
# either individually or all at once through the ":err_codes" tag. Nothing is
# exported by default (only @EXPORT_OK is populated).
BEGIN {
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK, );
}

# Compile-time constants used throughout the module.
use constant {
  # Default values
  D_HOST     => 'localhost',
  D_PORT     => 6379,
  D_DB_INDEX => 0,

  # The error-code table is flattened into this list, so each E_* name
  # becomes a constant with its numeric code as the value.
  %ERROR_CODES,

  # Operation status
  S_NEED_PERFORM => 1,
  S_IN_PROGRESS  => 2,
  S_IS_DONE      => 3,

  # String terminator
  EOL     => "\r\n",
  EOL_LEN => 2,    # length of EOL in bytes
};

my %SUB_CMDS = (

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

          if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
            $self->_select_db();
          }
        }
        elsif ( $self->{_auth_st} == S_NEED_PERFORM ) {
          $self->_auth();
        }
      }
    }
    elsif ( $self->{_lazy_conn_st} ) {
      $self->{_lazy_conn_st} = 0;
      $self->_connect();
    }
    elsif ( $self->{reconnect} ) {
      if ( defined $self->{min_reconnect_interval}
        && $self->{min_reconnect_interval} > 0 )
      {
        unless ( defined $self->{_reconnect_timer} ) {
          $self->{_reconnect_timer} = AE::timer( $self->{min_reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect();
            }
          );
        }
      }
      else {
        $self->_connect();
      }
    }
    else {
      AE::postpone(
        sub {
          $self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
              . ' No connection to the server.', E_NO_CONN );
        }
      );

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write( $cmd );

  return;
}

# Serializes a command and queues it for writing on the connection handle.
#
# Arguments:
#   $cmd - hash reference describing the command; the keyword is read from
#          "kwd" and the arguments from the array reference in "args".
#
# Wire format: every token (keyword first, then each argument) is emitted as
# '$' . <length> . EOL . <token> . EOL, and the whole request is prefixed with
# '*' . <number of tokens> . EOL. Undefined tokens are sent as empty strings.
# When an encoding object is configured, tokens flagged as character strings
# are encoded before their length is taken.
#
# The command is appended to the processing queue before the write is issued.
# If a read timeout is configured and no other command is waiting in the
# processing queue, the read timeout on the handle is reset and re-armed.
#
# The passed command structure is not modified.
#
# Returns nothing.
sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $encoding = $self->{encoding};

  my $cmd_str = '';
  foreach my $arg ( $cmd->{kwd}, @{ $cmd->{args} } ) {
    # Work on a copy: the foreach variable is an alias for the elements of
    # $cmd, so assigning to it directly would silently rewrite the keyword
    # and arguments stored in the caller's command structure.
    my $token = $arg;

    if ( !defined $token ) {
      $token = '';
    }
    elsif ( defined $encoding && is_utf8( $token ) ) {
      $token = $encoding->encode( $token );
    }

    $cmd_str .= '$' . length( $token ) . EOL . $token . EOL;
  }

  # Token count is the keyword plus all arguments.
  $cmd_str = '*' . ( scalar( @{ $cmd->{args} } ) + 1 ) . EOL . $cmd_str;

  my $handle = $self->{_handle};

  # Arm the read timeout only when this command is the first one awaiting a
  # reply; otherwise a timeout is already running for the earlier commands.
  if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
    $handle->rtimeout_reset();
    $handle->rtimeout( $self->{read_timeout} );
  }
  push( @{ $self->{_processing_queue} }, $cmd );

  $handle->push_write( $cmd_str );

  return;
}

# Sends the AUTH command with the configured password and tracks the
# authentication state in $self->{_auth_st}.
#
# On success the state becomes S_IS_DONE; then either the database selection
# is started (when it is still pending) or the client is marked as ready to
# write and the input queue is flushed.
#
# On failure the state is put back to S_NEED_PERFORM and the error arguments
# are forwarded to _abort_all().
#
# Returns nothing.
sub _auth {
  my ( $self ) = @_;

  # The callbacks below close over $self; keep that reference weak so the
  # closures stored in the command do not hold a strong reference to the
  # client object.
  weaken( $self );

  $self->{_auth_st} = S_IN_PROGRESS;

  my $on_auth_done = sub {
    $self->{_auth_st} = S_IS_DONE;

    # Database selection still pending: it takes over from here.
    return $self->_select_db()
      if $self->{_select_db_st} == S_NEED_PERFORM;

    $self->{_ready_to_write} = 1;
    $self->_flush_input_queue();
  };

  my $on_auth_error = sub {
    $self->{_auth_st} = S_NEED_PERFORM;
    $self->_abort_all( @_ );
  };

  my %auth_cmd = (
    kwd      => 'auth',
    args     => [ $self->{password} ],
    on_done  => $on_auth_done,
    on_error => $on_auth_error,
  );

  $self->_push_write( \%auth_cmd );

  return;
}

sub _select_db {
  my $self = shift;

  weaken( $self );

  $self->{_select_db_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd  => 'select',

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN


  $redis->get( 'bar',
    { on_done => sub {
        my $reply = shift;

        print "$reply\n";
      },

      on_error => sub {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      },
    }
  );

  $redis->quit(
    sub {
      my $reply = shift;

      if (@_) {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      }

      $cv->send();
    }
  );

  $cv->recv();

=head1 DESCRIPTION

MODULE IS DEPRECATED. Use L<AnyEvent::RipeRedis> instead. The interface of
L<AnyEvent::RipeRedis> differs in several ways from the interface of
AnyEvent::Redis::RipeRedis. For more information, see its documentation.

AnyEvent::Redis::RipeRedis is a flexible non-blocking Redis client with
a reconnect feature. The client supports subscriptions, transactions and
connections via UNIX socket.

Requires Redis 1.2 or higher, and any supported event loop.

=head1 CONSTRUCTOR

=head2 new( %params )

  my $redis = AnyEvent::Redis::RipeRedis->new(
    host                   => 'localhost',
    port                   => 6379,
    password               => 'yourpass',
    database               => 7,
    lazy                   => 1,
    connection_timeout     => 5,
    read_timeout           => 5,
    reconnect              => 1,
    min_reconnect_interval => 5,
    encoding               => 'utf8',

    on_connect => sub {
      # handling...
    },

    on_disconnect => sub {
      # handling...
    },

    on_connect_error => sub {
      my $err_msg = shift;

      # error handling...
    },

    on_error => sub {
      my $err_msg  = shift;
      my $err_code = shift;

      # error handling...
    },
  );

=over

=item host => $host

Server hostname (default: localhost)

=item port => $port

Server port (default: 6379)

=item password => $password

If the password is specified, the C<AUTH> command is sent to the server
after connection.

=item database => $index

Database index. If the index is specified, the client is switched to
the specified database after connection. You can also switch to another database
after connection by using the C<SELECT> command. The client remembers the last
selected database after reconnection.

The default database index is C<0>.

=item encoding => $encoding_name

Used to encode/decode strings during input/output operations.

Not set by default.

=item connection_timeout => $fractional_seconds

Specifies the connection timeout. If the client cannot connect to the server
within the specified timeout, the C<on_error> or C<on_connect_error> callback is
called. When the C<on_error> callback is called, the C<E_CANT_CONN> error code
is passed to the callback as the second argument. The timeout is specified in
seconds and can contain a fractional part.



( run in 0.738 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )