AnyEvent-Redis-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN


  # String terminator
  EOL     => "\r\n",
  EOL_LEN => 2,
};

my %SUB_CMDS = (
  subscribe  => 1,
  psubscribe => 1,
);
my %SUBUNSUB_CMDS = (
  %SUB_CMDS,
  unsubscribe  => 1,
  punsubscribe => 1,
);

my %NEED_POSTPROCESS = (
  %SUBUNSUB_CMDS,
  info   => 1,
  select => 1,
  quit   => 1,
);

my %MSG_TYPES = (
  message  => 1,
  pmessage => 1,
);

my %ERR_PREFS_MAP = (
  LOADING     => E_LOADING_DATASET,
  NOSCRIPT    => E_NO_SCRIPT,
  BUSY        => E_BUSY,
  MASTERDOWN  => E_MASTER_DOWN,
  MISCONF     => E_MISCONF,
  READONLY    => E_READONLY,
  OOM         => E_OOM,
  EXECABORT   => E_EXEC_ABORT,
  NOAUTH      => E_NO_AUTH,
  WRONGTYPE   => E_WRONG_TYPE,
  NOREPLICAS  => E_NO_REPLICAS,
  BUSYKEY     => E_BUSY_KEY,
  CROSSSLOT   => E_CROSS_SLOT,
  TRYAGAIN    => E_TRY_AGAIN,
  ASK         => E_ASK,
  MOVED       => E_MOVED,
  CLUSTERDOWN => E_CLUSTER_DOWN,
);

my %EVAL_CACHE;


# Constructor
sub new {
  my $proto  = shift;
  my %params = @_;

  my $self = ref( $proto ) ? $proto : bless {}, $proto;

  $self->{host} = $params{host} || D_HOST;
  $self->{port} = $params{port} || D_PORT;
  $self->{password} = $params{password};
  $self->{database}
      = defined $params{database} ? $params{database} : D_DB_INDEX;
  $self->{reconnect} = exists $params{reconnect} ? $params{reconnect} : 1;
  $self->{on_connect}       = $params{on_connect};
  $self->{on_disconnect}    = $params{on_disconnect};
  $self->{on_connect_error} = $params{on_connect_error};

  $self->encoding( $params{encoding} );
  $self->connection_timeout( $params{connection_timeout} );
  $self->read_timeout( $params{read_timeout} );
  $self->min_reconnect_interval( $params{min_reconnect_interval} );
  $self->on_error( $params{on_error} );

  my $hdl_params = $params{handle_params} || {};
  foreach my $name ( qw( linger autocork ) ) {
    if ( !exists $hdl_params->{$name} && defined $params{$name} ) {
      $hdl_params->{$name} = $params{$name};
    }
  }
  $self->{handle_params} = $hdl_params;

  $self->{_handle}           = undef;
  $self->{_connected}        = 0;
  $self->{_lazy_conn_st}     = $params{lazy};
  $self->{_auth_st}          = S_NEED_PERFORM;
  $self->{_select_db_st}     = S_NEED_PERFORM;
  $self->{_ready_to_write}   = 0;
  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_txn_lock}         = 0;
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;
  $self->{_reconnect_timer}  = undef;

  unless ( $self->{_lazy_conn_st} ) {
    $self->_connect();
  }

  return $self;
}

sub multi {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'multi', [ @_ ] );

  $self->{_txn_lock} = 1;
  $self->_execute_cmd( $cmd );

  return;
}

sub exec {
  my $self = shift;
  my $cmd  = $self->_prepare_cmd( 'exec', [ @_ ] );

  $self->{_txn_lock} = 0;
  $self->_execute_cmd( $cmd );

  return;

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

        $self->{$name} = $seconds;
      }

      return $self->{$name};
    }
  }

  foreach my $name ( qw( reconnect on_connect on_disconnect on_connect_error ) ) {
    *{$name} = sub {
      my $self = shift;

      if ( @_ ) {
        $self->{$name} = shift;
      }

      return $self->{$name};
    }
  }
}

sub _connect {
  my $self = shift;

  $self->{_handle} = AnyEvent::Handle->new(
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_get_on_prepare(),
    on_connect       => $self->_get_on_connect(),
    on_connect_error => $self->_get_on_connect_error(),
    on_rtimeout      => $self->_get_on_rtimeout(),
    on_eof           => $self->_get_on_eof(),
    on_error         => $self->_get_handle_on_error(),
    on_read          => $self->_get_on_read(),
  );

  return;
}

sub _get_on_prepare {
  my $self = shift;

  weaken( $self );

  return sub {
    if ( defined $self->{connection_timeout} ) {
      return $self->{connection_timeout};
    }

    return;
  };
}

sub _get_on_connect {
  my $self = shift;

  weaken( $self );

  return sub {
    $self->{_connected} = 1;

    unless ( defined $self->{password} ) {
      $self->{_auth_st} = S_IS_DONE;
    }
    if ( $self->{database} == 0 ) {
      $self->{_select_db_st} = S_IS_DONE;
    }

    if ( $self->{_auth_st} == S_NEED_PERFORM ) {
      $self->_auth();
    }
    elsif ( $self->{_select_db_st} == S_NEED_PERFORM ) {
      $self->_select_db();
    }
    else {
      $self->{_ready_to_write} = 1;
      $self->_flush_input_queue();
    }

    if ( defined $self->{on_connect} ) {
      $self->{on_connect}->();
    }
  };
}

sub _get_on_connect_error {
  my $self = shift;

  weaken( $self );

  return sub {
    my $err_msg = pop;

    $self->_disconnect(
      "Can't connect to $self->{host}:$self->{port}: $err_msg",
      E_CANT_CONN
    );
  };
}

sub _get_on_rtimeout {
  my $self = shift;

  weaken( $self );

  return sub {
    if ( @{ $self->{_processing_queue} } ) {
      $self->_disconnect( 'Read timed out.', E_READ_TIMEDOUT );
    }
    else {
      $self->{_handle}->rtimeout( undef );
    }
  };
}

sub _get_on_eof {
  my $self = shift;

  weaken( $self );

  return sub {
    $self->_disconnect( 'Connection closed by remote host.',

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

      }
    }
    else {
      AE::postpone(
        sub {
          $self->_process_cmd_error( $cmd, "Operation \"$cmd->{kwd}\" aborted:"
              . ' No connection to the server.', E_NO_CONN );
        }
      );

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write( $cmd );

  return;
}

sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $cmd_str = '';
  foreach my $token ( $cmd->{kwd}, @{ $cmd->{args} } ) {
    unless ( defined $token ) {
      $token = '';
    }
    elsif ( defined $self->{encoding} && is_utf8( $token ) ) {
      $token = $self->{encoding}->encode( $token );
    }
    $cmd_str .= '$' . length( $token ) . EOL . $token . EOL;
  }
  $cmd_str = '*' . ( scalar( @{ $cmd->{args} } ) + 1 ) . EOL . $cmd_str;

  my $handle = $self->{_handle};
  if ( defined $self->{read_timeout} && !@{ $self->{_processing_queue} } ) {
    $handle->rtimeout_reset();
    $handle->rtimeout( $self->{read_timeout} );
  }
  push( @{ $self->{_processing_queue} }, $cmd );

  $handle->push_write( $cmd_str );

  return;
}

sub _auth {
  my $self = shift;

  weaken( $self );

  $self->{_auth_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd  => 'auth',
      args => [ $self->{password} ],

      on_done => sub {
        $self->{_auth_st} = S_IS_DONE;

        if ( $self->{_select_db_st} == S_NEED_PERFORM ) {
          $self->_select_db();
        }
        else {
          $self->{_ready_to_write} = 1;
          $self->_flush_input_queue();
        }
      },

      on_error => sub {
        $self->{_auth_st} = S_NEED_PERFORM;
        $self->_abort_all( @_ );
      },
    }
  );

  return;
}

sub _select_db {
  my $self = shift;

  weaken( $self );

  $self->{_select_db_st} = S_IN_PROGRESS;

  $self->_push_write(
    { kwd  => 'select',
      args => [ $self->{database} ],

      on_done => sub {
        $self->{_select_db_st}   = S_IS_DONE;
        $self->{_ready_to_write} = 1;
        $self->_flush_input_queue();
      },

      on_error => sub {
        $self->{_select_db_st} = S_NEED_PERFORM;
        $self->_abort_all( @_ );
      },
    }
  );

  return;
}

sub _flush_input_queue {
  my $self = shift;

  $self->{_temp_queue}  = $self->{_input_queue};
  $self->{_input_queue} = [];

  while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
    $self->_push_write( $cmd );
  }

lib/AnyEvent/Redis/RipeRedis.pm  view on Meta::CPAN

      @{ $self->{_processing_queue} },
      @{ $self->{_temp_queue} },
      @{ $self->{_input_queue} },
    );

    foreach my $cmd ( @unfin_cmds ) {
      warn "Operation \"$cmd->{kwd}\" aborted:"
          . " Client object destroyed prematurely.\n";
    }
  }

  return;
}


package AnyEvent::Redis::RipeRedis::Error;

# Constructor
sub new {
  my $class    = shift;
  my $err_msg  = shift;
  my $err_code = shift;

  my $self = bless {}, $class;

  $self->{message} = $err_msg;
  $self->{code}    = $err_code;

  return $self;
}

sub message {
  my $self = shift;

  return $self->{message};
}

sub code {
  my $self = shift;

  return $self->{code};
}

1;
__END__

=head1 NAME

AnyEvent::Redis::RipeRedis - DEPRECATED. Use AnyEvent::RipeRedis instead

=head1 SYNOPSIS

  use AnyEvent;
  use AnyEvent::Redis::RipeRedis;

  my $cv = AE::cv();

  my $redis = AnyEvent::Redis::RipeRedis->new(
    host     => 'localhost',
    port     => 6379,
    password => 'yourpass',
  );

  $redis->incr( 'foo',
    sub {
      my $reply = shift;

      if (@_) {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";

        return;
      }

      print "$reply\n";
    }
  );

  $redis->set( 'bar', 'string',
    { on_error => sub {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      }
    }
  );

  $redis->get( 'bar',
    { on_done => sub {
        my $reply = shift;

        print "$reply\n";
      },

      on_error => sub {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      },
    }
  );

  $redis->quit(
    sub {
      my $reply = shift;

      if (@_) {
        my $err_msg  = shift;
        my $err_code = shift;

        warn "[$err_code] $err_msg\n";
      }

      $cv->send();
    }
  );

  $cv->recv();

=head1 DESCRIPTION

MODULE IS DEPRECATED. Use L<AnyEvent::RipeRedis> instead. The interface of
L<AnyEvent::RipeRedis> has several differences from interface of
AnyEvent::Redis::RipeRedis. For more information see documentation.

AnyEvent::Redis::RipeRedis is the flexible non-blocking Redis client with
reconnect feature. The client supports subscriptions, transactions and connection
via UNIX-socket.

Requires Redis 1.2 or higher, and any supported event loop.

=head1 CONSTRUCTOR

=head2 new( %params )

  my $redis = AnyEvent::Redis::RipeRedis->new(
    host                   => 'localhost',
    port                   => 6379,
    password               => 'yourpass',
    database               => 7,
    lazy                   => 1,
    connection_timeout     => 5,
    read_timeout           => 5,
    reconnect              => 1,
    min_reconnect_interval => 5,
    encoding               => 'utf8',

    on_connect => sub {
      # handling...
    },

    on_disconnect => sub {
      # handling...
    },

    on_connect_error => sub {
      my $err_msg = shift;

      # error handling...
    },

    on_error => sub {
      my $err_msg  = shift;
      my $err_code = shift;

      # error handling...
    },
  );

=over

=item host => $host

Server hostname (default: 127.0.0.1)

=item port => $port

Server port (default: 6379)

=item password => $password

If the password is specified, the C<AUTH> command is sent to the server
after connection.

=item database => $index

Database index. If the index is specified, the client is switched to
the specified database after connection. You can also switch to another database
after connection by using C<SELECT> command. The client remembers last selected
database after reconnection.

The default database index is C<0>.

=item encoding => $encoding_name

Used for encode/decode strings at time of input/output operations.

Not set by default.

=item connection_timeout => $fractional_seconds

Specifies connection timeout. If the client could not connect to the server
after specified timeout, the C<on_error> or C<on_connect_error> callback is
called. In case when C<on_error> callback is called, C<E_CANT_CONN> error code
is passed to callback in the second argument. The timeout specifies in seconds
and can contain a fractional part.

  connection_timeout => 10.5,

By default the client use kernel's connection timeout.

=item read_timeout => $fractional_seconds

Specifies read timeout. If the client could not receive a reply from the
server after specified timeout, the client close connection and call the
C<on_error> callback with the C<E_READ_TIMEDOUT> error code. The timeout is
specifies in seconds and can contain a fractional part.

  read_timeout => 3.5,

Not set by default.

=item lazy => $boolean

If enabled, the connection establishes at time when you will send the first
command to the server. By default the connection establishes after calling of
the C<new> method.

Disabled by default.

=item reconnect => $boolean

If the connection to the Redis server was lost and the parameter C<reconnect> is
TRUE, the client try to restore the connection on a next command executuion
unless C<min_reconnect_interval> is specified. The client try to reconnect only
once and if it fails, is called the C<on_error> callback. If you need several
attempts of the reconnection, just retry a command from the C<on_error>
callback as many times, as you need. This feature made the client more
responsive.

Enabled by default.



( run in 0.934 second using v1.01-cache-2.11-cpan-5a3173703d6 )