AnyEvent-Redis-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

  READONLY    => E_READONLY,
  OOM         => E_OOM,
  EXECABORT   => E_EXEC_ABORT,
  NOAUTH      => E_NO_AUTH,
  WRONGTYPE   => E_WRONG_TYPE,
  NOREPLICAS  => E_NO_REPLICAS,
  BUSYKEY     => E_BUSY_KEY,
  CROSSSLOT   => E_CROSS_SLOT,
  TRYAGAIN    => E_TRY_AGAIN,
  ASK         => E_ASK,
  MOVED       => E_MOVED,
  CLUSTERDOWN => E_CLUSTER_DOWN,
);

my %EVAL_CACHE;


# Constructor
sub new {
  my $proto  = shift;
  my %params = @_;

  my $self = ref( $proto ) ? $proto : bless {}, $proto;

  $self->{host} = $params{host} || D_HOST;
  $self->{port} = $params{port} || D_PORT;
  $self->{password} = $params{password};
  $self->{database}
      = defined $params{database} ? $params{database} : D_DB_INDEX;
  $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
  $self->{on_connect}       = $params{on_connect};
  $self->{on_disconnect}    = $params{on_disconnect};
  $self->{on_connect_error} = $params{on_connect_error};

  $self->encoding( $params{encoding} );
  $self->connection_timeout( $params{connection_timeout} );
  $self->read_timeout( $params{read_timeout} );
  $self->min_reconnect_interval( $params{min_reconnect_interval} );
  $self->on_error( $params{on_error} );

  my $hdl_params = $params{handle_params} || {};
  foreach my $name ( qw( linger autocork ) ) {
    if ( !exists $hdl_params->{$name} && defined $params{$name} ) {
      $hdl_params->{$name} = $params{$name};
    }
  }
  $self->{handle_params} = $hdl_params;

  $self->{_handle}           = undef;
  $self->{_connected}        = 0;
  $self->{_lazy_conn_st}     = $params{lazy};
  $self->{_auth_st}          = S_NEED_PERFORM;
  $self->{_select_db_st}     = S_NEED_PERFORM;
  $self->{_ready_to_write}   = 0;
  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_txn_lock}         = 0;
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;
  $self->{_reconnect_timer}  = undef;

  unless ( $self->{_lazy_conn_st} ) {
    $self->_connect();
  }

  return $self;
}

sub multi {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'multi', [ @_ ] );

  $self->{_txn_lock} = 1;
  $self->_execute_cmd( $cmd );

  return;
}

sub exec {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'exec', [ @_ ] );

  $self->{_txn_lock} = 0;
  $self->_execute_cmd( $cmd );

  return;
}

sub eval_cached {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'evalsha', [ @_ ] );

  $cmd->{script} = $cmd->{args}[0];
  unless ( exists $EVAL_CACHE{ $cmd->{script} } ) {
    $EVAL_CACHE{ $cmd->{script} } = sha1_hex( $cmd->{script} );
  }
  $cmd->{args}[0] = $EVAL_CACHE{ $cmd->{script} };

  $self->_execute_cmd( $cmd );

  return;
}

sub disconnect {
  my $self = shift;

  $self->_disconnect();

  return;
}

sub encoding {
  my $self = shift;

  if ( @_ ) {
    my $enc = shift;

    if ( defined $enc ) {
      $self->{encoding} = find_encoding( $enc );

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

      }

      $self->_process_reply( $reply, $err_code );
    }

    return;
  };
}

sub _prepare_cmd {
  my $self = shift;
  my $kwd  = shift;
  my $args = shift;

  my $cmd;
  if ( ref( $args->[-1] ) eq 'HASH' ) {
    $cmd = pop @{$args};
  }
  else {
    $cmd = {};
    if ( ref( $args->[-1] ) eq 'CODE' ) {
      if ( exists $SUB_CMDS{$kwd} ) {
        $cmd->{on_message} = pop @{$args};
      }
      else {
        $cmd->{on_reply} = pop @{$args};
      }
    }
  }
  $cmd->{kwd}  = $kwd;
  $cmd->{args} = $args;

  return $cmd;
}

sub _execute_cmd {
  my $self = shift;
  my $cmd  = shift;

  unless ( $self->{_ready_to_write} ) {
    if ( defined $self->{_handle} ) {
      if ( $self->{_connected} ) {
        if ( $self->{_auth_st} == S_IS_DONE ) {
          if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
            $self->_select_db();
          }
        }
        elsif ( $self->{_auth_st} == S_NEED_PERFORM ) {
          $self->_auth();
        }
      }
    }
    elsif ( $self->{_lazy_conn_st} ) {
      $self->{_lazy_conn_st} = 0;
      $self->_connect();
    }
    elsif ( $self->{reconnect} ) {
      if ( defined $self->{min_reconnect_interval}
        && $self->{min_reconnect_interval} > 0 )
      {
        unless ( defined $self->{_reconnect_timer} ) {
          $self->{_reconnect_timer} = AE::timer( $self->{min_reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect();
            }
          );
        }
      }
      else {
        $self->_connect();
      }
    }
    else {
      AE::postpone(
        sub {
          $self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
              . ' No connection to the server.', E_NO_CONN );
        }
      );

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write( $cmd );

  return;
}

sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $cmd_str = '';
  foreach my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
    unless ( defined $token ) {
      $token = '';
    }
    elsif ( defined $self->{encoding} && is_utf8( $token ) ) {
      $token = $self->{encoding}->encode( $token );
    }
    $cmd_str .= '$' . length( $token ) . EOL . $token . EOL;
  }
  $cmd_str = '*' . ( scalar( @{ $cmd->{args} } ) + 1 ) . EOL . $cmd_str;

  my $handle = $self->{_handle};
  if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
    $handle->rtimeout_reset();
    $handle->rtimeout( $self->{read_timeout} );
  }
  push( @{ $self->{_processing_queue} }, $cmd );

  $handle->push_write( $cmd_str );

  return;
}

sub _auth {
  my $self = shift;



( run in 0.742 second using v1.01-cache-2.11-cpan-3782747c604 )