AnyEvent-Redis-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

use 5.008000;
use strict;
use warnings;

package AnyEvent::Redis::RipeRedis;

use base qw( Exporter );

our $VERSION = '1.62';

use AnyEvent;
use AnyEvent::Handle;
use Encode qw( find_encoding is_utf8 );
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );

# Symbolic error code names mapped to their numeric values. The table is
# filled inside a BEGIN block so that it is already populated when the
# compile-time statements further down the file (the export lists and
# "use constant") read it. Note that the values 7 and 8 are not assigned.
my %ERROR_CODES;

BEGIN {
  my @code_pairs = (
    [ E_CANT_CONN                  => 1 ],
    [ E_LOADING_DATASET            => 2 ],
    [ E_IO                         => 3 ],
    [ E_CONN_CLOSED_BY_REMOTE_HOST => 4 ],
    [ E_CONN_CLOSED_BY_CLIENT      => 5 ],
    [ E_NO_CONN                    => 6 ],
    [ E_OPRN_ERROR                 => 9 ],
    [ E_UNEXPECTED_DATA            => 10 ],
    [ E_NO_SCRIPT                  => 11 ],
    [ E_READ_TIMEDOUT              => 12 ],
    [ E_BUSY                       => 13 ],
    [ E_MASTER_DOWN                => 14 ],
    [ E_MISCONF                    => 15 ],
    [ E_READONLY                   => 16 ],
    [ E_OOM                        => 17 ],
    [ E_EXEC_ABORT                 => 18 ],
    [ E_NO_AUTH                    => 19 ],
    [ E_WRONG_TYPE                 => 20 ],
    [ E_NO_REPLICAS                => 21 ],
    [ E_BUSY_KEY                   => 22 ],
    [ E_CROSS_SLOT                 => 23 ],
    [ E_TRY_AGAIN                  => 24 ],
    [ E_ASK                        => 25 ],
    [ E_MOVED                      => 26 ],
    [ E_CLUSTER_DOWN               => 27 ],
  );

  # Flatten the [ name => code ] pairs into the hash.
  %ERROR_CODES = map { @{$_} } @code_pairs;
}

# Exporter configuration: every error code name may be imported on request,
# either individually or all at once through the ":err_codes" tag.
BEGIN {
  our ( @EXPORT_OK, %EXPORT_TAGS );

  @EXPORT_OK   = keys %ERROR_CODES;
  %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}

# Default values
use constant {
  D_HOST     => 'localhost',
  D_PORT     => 6379,
  D_DB_INDEX => 0,
};

# Error codes: one constant per entry of %ERROR_CODES
use constant +{ %ERROR_CODES };

# Operation status
use constant {
  S_NEED_PERFORM => 1,
  S_IN_PROGRESS  => 2,
  S_IS_DONE      => 3,
};

# String terminator and its length
use constant {
  EOL     => "\r\n",
  EOL_LEN => 2,
};

my %SUB_CMDS = (
  subscribe  => 1,

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN


  return $self->{on_error};
}

# Read-only accessor: returns the value stored under the "database" key of
# the object.
sub selected_database {
  my ( $self ) = @_;

  return $self->{database};
}

# Generate additional methods and accessors.
#
# The subs below are installed into this package's symbol table under names
# held in variables, which needs symbolic references; strict "refs" is
# therefore disabled for the whole block.
{
  no strict 'refs';

  # One method per key of %SUBUNSUB_CMDS. Each method takes the same arguments
  # that are handed to _prepare_cmd() and returns nothing.
  foreach my $kwd ( keys %SUBUNSUB_CMDS ) {
    *{$kwd} = sub {
      my $self = shift;
      my $cmd  = $self->_prepare_cmd( $kwd, [ @_ ] );

      # Commands listed in %SUB_CMDS are useless without a message handler,
      # so a missing "on_message" callback is a caller error.
      if ( exists $SUB_CMDS{ $cmd->{kwd} } && !defined $cmd->{on_message} ) {
        croak "\"on_message\" callback must be specified";
      }

      # While the transaction lock is set the command is rejected. The error
      # is reported from a postponed callback, i.e. after this method has
      # returned, rather than synchronously.
      if ( $self->{_txn_lock} ) {
        AE::postpone(
          sub {
            $self->_process_cmd_error( $cmd,
                "Command \"$cmd->{kwd}\" not allowed after \"multi\" command."
                    . ' First, the transaction must be finalized.',
                E_OPRN_ERROR );
          }
        );

        return;
      }

      # The number of expected replies equals the number of command arguments
      # (presumably one reply per channel or pattern - confirm against the
      # reply handling code).
      $cmd->{replies_cnt} = scalar @{ $cmd->{args} };

      # Wrap the user's "on_done" so that it receives the elements of the
      # array reference passed in $_[0] as a flat argument list.
      if ( defined $cmd->{on_done} ) {
        my $on_done = $cmd->{on_done};

        $cmd->{on_done} = sub {
          $on_done->( @{ $_[0] } );
        };
      }

      $self->_execute_cmd( $cmd );

      return;
    };
  }

  # Get/set accessors for the time settings. When a value is passed it must be
  # either undef or a non-negative number, otherwise the accessor croaks.
  # Zero is accepted, although the error message says "positive". The current
  # value is always returned.
  foreach my $name ( qw( connection_timeout read_timeout min_reconnect_interval ) ) {
    *{$name} = sub {
      my $self = shift;

      if ( @_ ) {
        my $seconds = shift;

        if ( defined $seconds
          && ( !looks_like_number($seconds) || $seconds < 0 ) )
        {
          croak "\"$name\" must be a positive number";
        }
        $self->{$name} = $seconds;
      }

      return $self->{$name};
    };
  }

  # Plain get/set accessors without validation. When an argument is passed it
  # is stored (even if it is undef); the current value is always returned.
  foreach my $name ( qw( reconnect on_connect on_disconnect on_connect_error ) ) {
    *{$name} = sub {
      my $self = shift;

      if ( @_ ) {
        $self->{$name} = shift;
      }

      return $self->{$name};
    };
  }
}

# Creates a new AnyEvent::Handle for the configured host and port and stores
# it in $self->{_handle}. All event callbacks are obtained from the
# corresponding _get_* factory methods. Returns nothing.
sub _connect {
  my ( $self ) = @_;

  # The user supplied handle parameters go first and the fixed settings
  # follow them (presumably so that the fixed settings win when a key is
  # given twice - confirm against AnyEvent::Handle).
  my @handle_args = (
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_get_on_prepare(),
    on_connect       => $self->_get_on_connect(),
    on_connect_error => $self->_get_on_connect_error(),
    on_rtimeout      => $self->_get_on_rtimeout(),
    on_eof           => $self->_get_on_eof(),
    on_error         => $self->_get_handle_on_error(),
    on_read          => $self->_get_on_read(),
  );

  $self->{_handle} = AnyEvent::Handle->new( @handle_args );

  return;
}

# Builds the "on_prepare" callback for the handle. The callback returns the
# value of "connection_timeout" when one is defined and nothing otherwise.
sub _get_on_prepare {
  my ( $self ) = @_;

  # The closure must not keep the object alive, so it holds a weak reference.
  weaken( $self );

  return sub {
    return unless defined $self->{connection_timeout};

    return $self->{connection_timeout};
  };
}

sub _get_on_connect {
  my $self = shift;

  weaken( $self );



( run in 0.584 second using v1.01-cache-2.11-cpan-39bf76dae61 )