AnyEvent-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

  %SUB_CMDS,
  unsubscribe  => 1,
  punsubscribe => 1,
);

my %MESSAGE_TYPES = (
  message  => 1,
  pmessage => 1,
);

my %NEED_PREPROCESS = (
  multi       => 1,
  exec        => 1,
  discard     => 1,
  eval_cached => 1,
  %SUBUNSUB_CMDS,
);

my %NEED_POSTPROCESS = (
  info         => 1,
  cluster_info => 1,
  select       => 1,
  quit         => 1,
);

my %ERR_PREFS_MAP = (
  LOADING     => E_LOADING_DATASET,
  NOSCRIPT    => E_NO_SCRIPT,
  BUSY        => E_BUSY,
  MASTERDOWN  => E_MASTER_DOWN,
  MISCONF     => E_MISCONF,
  READONLY    => E_READONLY,
  OOM         => E_OOM,
  EXECABORT   => E_EXEC_ABORT,
  NOAUTH      => E_NO_AUTH,
  WRONGTYPE   => E_WRONG_TYPE,
  NOREPLICAS  => E_NO_REPLICAS,
  BUSYKEY     => E_BUSY_KEY,
  CROSSSLOT   => E_CROSS_SLOT,
  TRYAGAIN    => E_TRY_AGAIN,
  ASK         => E_ASK,
  MOVED       => E_MOVED,
  CLUSTERDOWN => E_CLUSTER_DOWN,
  NOTBUSY     => E_NOT_BUSY,
);

my %EVAL_CACHE;


sub new {
  my $class  = shift;
  my %params = @_;

  my $self = bless {}, $class;

  $self->{host} = $params{host} || D_HOST;
  $self->{port} = $params{port} || D_PORT;
  $self->{password} = $params{password};
  $self->{database}
      = defined $params{database} ? $params{database} : D_DB_INDEX;
  $self->{utf8}          = exists $params{utf8} ? $params{utf8} : 1;
  $self->{lazy}          = $params{lazy};
  $self->{reconnect}     = exists $params{reconnect} ? $params{reconnect} : 1;
  $self->{handle_params} = $params{handle_params} || {};
  $self->{on_connect}    = $params{on_connect};
  $self->{on_disconnect} = $params{on_disconnect};

  $self->connection_timeout( $params{connection_timeout} );
  $self->read_timeout( $params{read_timeout} );
  $self->reconnect_interval( $params{reconnect_interval} );
  $self->on_error( $params{on_error} );

  $self->_reset_internals;
  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;
  $self->{_pchannel_cnt}     = 0;

  unless ( $self->{lazy} ) {
    $self->_connect;
  }

  return $self;
}

sub execute {
  my $self     = shift;
  my $cmd_name = shift;

  my $cmd = $self->_prepare( $cmd_name, [@_] );
  $self->_execute($cmd);

  return;
}

sub disconnect {
  my $self = shift;

  $self->_disconnect;

  return;
}

sub on_error {
  my $self = shift;

  if (@_) {
    my $on_error = shift;

    if ( defined $on_error ) {
      $self->{on_error} = $on_error;
    }
    else {
      $self->{on_error} = sub {
        my $err = shift;
        warn $err->message . "\n";
      };
    }
  }

  return $self->{on_error};
}

# Generate accessors
{
  no strict qw( refs );

  foreach my $name ( qw( host port database ) ) {
    *{$name} = sub {
      my $self = shift;
      return $self->{$name};
    }
  }

  foreach my $name ( qw( connection_timeout read_timeout
      reconnect_interval ) )
  {
    *{$name} = sub {
      my $self = shift;

      if (@_) {
        my $seconds = shift;

        if ( defined $seconds
          && ( !looks_like_number($seconds) || $seconds < 0 ) )
        {
          croak qq{"$name" must be a positive number};
        }
        $self->{$name} = $seconds;
      }

      return $self->{$name};
    };
  }

  foreach my $name ( qw( utf8 reconnect on_connect on_disconnect ) ) {
    *{$name} = sub {
      my $self = shift;

      if (@_) {
        $self->{$name} = shift;
      }

      return $self->{$name};
    };
  }
}

sub _connect {
  my $self = shift;

  $self->{_handle} = AnyEvent::Handle->new(
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_create_on_prepare,
    on_connect       => $self->_create_on_connect,
    on_connect_error => $self->_create_on_connect_error,
    on_rtimeout      => $self->_create_on_rtimeout,
    on_eof           => $self->_create_on_eof,
    on_error         => $self->_create_on_handle_error,
    on_read          => $self->_create_on_read,
  );

  return;
}

sub _create_on_prepare {
  my $self = shift;

  weaken($self);

  return sub {
    if ( defined $self->{connection_timeout} ) {
      return $self->{connection_timeout};
    }

    return;
  };
}

sub _create_on_connect {
  my $self = shift;

  weaken($self);

  return sub {
    $self->{_connected} = 1;

    unless ( defined $self->{password} ) {
      $self->{_auth_state} = S_DONE;
    }
    if ( $self->{database} == 0 ) {
      $self->{_db_selection_state} = S_DONE;
    }

    if ( $self->{_auth_state} == S_NEED_DO ) {

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

    if ( @{ $self->{_processing_queue} } ) {
      my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
      $self->_disconnect($err);
    }
    else {
      $self->{_handle}->rtimeout(undef);
    }
  };
}

sub _create_on_eof {
  my $self = shift;

  weaken($self);

  return sub {
    my $err = _new_error( 'Connection closed by remote host.',
        E_CONN_CLOSED_BY_REMOTE_HOST );
    $self->_disconnect($err);
  };
}

sub _create_on_handle_error {
  my $self = shift;

  weaken($self);

  return sub {
    my $err_msg = pop;

    my $err = _new_error( $err_msg, E_IO );
    $self->_disconnect($err);
  };
}

sub _create_on_read {
  my $self = shift;

  weaken($self);

  my $str_len;
  my @bufs;
  my $bufs_num = 0;

  return sub {
    my $handle = shift;

    MAIN: while (1) {
      return if $handle->destroyed;

      my $reply;
      my $err_code;

      if ( defined $str_len ) {
        if ( length( $handle->{rbuf} ) < $str_len + EOL_LENGTH ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
        substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
        if ( $self->{utf8} ) {
          utf8::decode($reply);
        }

        undef $str_len;
      }
      else {
        my $eol_pos = index( $handle->{rbuf}, EOL );

        if ( $eol_pos < 0 ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
        my $type = substr( $reply, 0, 1, '' );
        substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );

        if ( $type ne '+' && $type ne ':' ) {
          if ( $type eq '$' ) {
            if ( $reply >= 0 ) {
              $str_len = $reply;
              next;
            }

            undef $reply;
          }
          elsif ( $type eq '*' ) {
            if ( $reply > 0 ) {
              push( @bufs,
                { reply      => [],
                  err_code   => undef,
                  chunks_cnt => $reply,
                }
              );
              $bufs_num++;

              next;
            }
            elsif ( $reply == 0 ) {
              $reply = [];
            }
            else {
              undef $reply;
            }
          }
          elsif ( $type eq '-' ) {
            $err_code = E_OPRN_ERROR;
            if ( $reply =~ m/^([A-Z]{3,}) / ) {
              if ( exists $ERR_PREFS_MAP{$1} ) {
                $err_code = $ERR_PREFS_MAP{$1};
              }
            }
          }
          else {
            my $err = _new_error( 'Unexpected reply received.',
                E_UNEXPECTED_DATA );
            $self->_disconnect($err);

            return;
          }
        }
      }

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

        }
        elsif ( $self->{_auth_state} == S_NEED_DO ) {
          $self->_auth;
        }
      }
    }
    elsif ( $self->{lazy} ) {
      undef $self->{lazy};
      $self->_connect;
    }
    elsif ( $self->{reconnect} ) {
      if ( defined $self->{reconnect_interval}
        && $self->{reconnect_interval} > 0 )
      {
        unless ( defined $self->{_reconnect_timer} ) {
          weaken($self);

          $self->{_reconnect_timer} = AE::timer(
            $self->{reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect;
            }
          );
        }
      }
      else {
        $self->_connect;
      }
    }
    else {
      AE::postpone {
        my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
            . ' No connection to the server.', E_NO_CONN );
        $cmd->{on_reply}->( undef, $err );
      };

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write($cmd);

  return;
}

sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $cmd_str = '';
  my @tokens  = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );
  foreach my $token (@tokens) {
    unless ( defined $token ) {
      $token = '';
    }
    elsif ( $self->{utf8} ) {
      utf8::encode($token);
    }
    $cmd_str .= '$' . length($token) . EOL . $token . EOL;
  }
  $cmd_str = '*' . scalar(@tokens) . EOL . $cmd_str;

  my $handle = $self->{_handle};

  if ( defined $self->{read_timeout}
    && !@{ $self->{_processing_queue} } )
  {
    $handle->rtimeout_reset;
    $handle->rtimeout( $self->{read_timeout} );
  }

  push( @{ $self->{_processing_queue} }, $cmd );
  $handle->push_write($cmd_str);

  return;
}

sub _auth {
  my $self = shift;

  weaken($self);
  $self->{_auth_state} = S_IN_PROGRESS;

  $self->_push_write(
    { name => 'auth',
      kwds => ['auth'],
      args => [ $self->{password} ],

      on_reply => sub {
        my $err = $_[1];

        if ( defined $err
          && $err->message ne 'ERR Client sent AUTH, but no password is set' )
        {
          $self->{_auth_state} = S_NEED_DO;
          $self->_abort($err);

          return;
        }

        $self->{_auth_state} = S_DONE;

        if ( $self->{_db_selection_state} == S_NEED_DO ) {
          $self->_select_database;
        }
        else {
          $self->{_ready} = 1;
          $self->_process_input_queue;
        }
      },
    }
  );

  return;
}

sub _select_database {

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN


AnyEvent::RipeRedis is flexible non-blocking Redis client. Supports
subscriptions, transactions and can automaticaly restore connection after
failure.

Requires Redis 1.2 or higher, and any supported event loop.

=head1 CONSTRUCTOR

=head2 new( %params )

  my $redis = AnyEvent::RipeRedis->new(
    host               => 'localhost',
    port               => 6379,
    password           => 'yourpass',
    database           => 7,
    connection_timeout => 5,
    read_timeout       => 5,
    lazy               => 1,
    reconnect_interval => 5,

    on_connect => sub {
      # handling...
    },

    on_disconnect => sub {
      # handling...
    },

    on_error => sub {
      my $err = shift;

      # error handling...
    },
  );

=over

=item host => $host

Server hostname (default: 127.0.0.1)

=item port => $port

Server port (default: 6379)

=item password => $password

If the password is specified, the C<AUTH> command is sent to the server
after connection.

=item database => $index

Database index. If the index is specified, the client switches to the specified
database after connection. You can also switch to another database after
connection by using C<SELECT> command. The client remembers last selected
database after reconnection and switches to it automaticaly.

The default database index is C<0>.

=item utf8 => $boolean

If enabled, all strings will be converted to UTF-8 before sending to
the server, and all results will be decoded from UTF-8.

Enabled by default.

=item connection_timeout => $fractional_seconds

Specifies connection timeout. If the client could not connect to the server
after specified timeout, the C<on_error> callback is called with the
C<E_CANT_CONN> error. The timeout specifies in seconds and can contain a
fractional part.

  connection_timeout => 10.5,

By default the client use kernel's connection timeout.

=item read_timeout => $fractional_seconds

Specifies read timeout. If the client could not receive a reply from the server
after specified timeout, the client close connection and call the C<on_error>
callback with the C<E_READ_TIMEDOUT> error. The timeout is specifies in seconds
and can contain a fractional part.

  read_timeout => 3.5,

Not set by default.

=item lazy => $boolean

If enabled, the connection establishes at time when you will send the first
command to the server. By default the connection establishes after calling of
the C<new> method.

Disabled by default.

=item reconnect => $boolean

If the connection to the server was lost and the parameter C<reconnect> is
TRUE (default), the client will try to restore the connection when you execute
next command. The client will try to reconnect only once and, if attempt fails,
the error object is passed to command callback. If you need several attempts of
the reconnection, you must retry a command from the callback as many times, as
you need. Such behavior allows to control reconnection procedure.

Enabled by default.

=item reconnect_interval => $fractional_seconds

If the parameter is specified, the client will try to reconnect only after
this interval. Commands executed between reconnections will be queued.

  reconnect_interval => 5,

Not set by default.

=item handle_params => \%params

Specifies L<AnyEvent::Handle> parameters.

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN


=item E_ASK

Redirection required. For more information see:
L<http://redis.io/topics/cluster-spec>

=item E_MOVED

Redirection required. For more information see:
L<http://redis.io/topics/cluster-spec>

=item E_CLUSTER_DOWN

The cluster is down or hash slot not served.

=back

=head1 DISCONNECTION

When the connection to the server is no longer needed you can close it in three
ways: call the method C<disconnect()>, send the C<QUIT> command or you can just
"forget" any references to an AnyEvent::RipeRedis object, but in this
case the client object is destroyed without calling any callbacks, including
the C<on_disconnect> callback, to avoid an unexpected behavior.

=head2 disconnect()

The method for synchronous disconnection. All uncompleted operations will be
aborted.

=head2 quit( [ $cb->( $reply, $err ) ] )

The method for asynchronous disconnection.

=head1 OTHER METHODS

=head2 info( [ $section ] [, $cb->( $reply, $err ) ] )

Gets and parses information and statistics about the server. The result
is passed to callback as a hash reference.

More information about C<INFO> command can be found here:
L<http://redis.io/commands/info>

=head2 host()

Gets current host of the client.

=head2 port()

Gets current port of the client.

=head2 select( $index, [, $cb->( $reply, $err ) ] )

Selects the database by numeric index.

=head2 database()

Gets selected database index.

=head2 utf8( [ $boolean ] )

Enables or disables UTF-8 mode.

=head2 connection_timeout( [ $fractional_seconds ] )

Gets or sets the C<connection_timeout> of the client. The C<undef> value resets
the C<connection_timeout> to default value.

=head2 read_timeout( [ $fractional_seconds ] )

Gets or sets the C<read_timeout> of the client.

=head2 reconnect( [ $boolean ] )

Enables or disables reconnection mode of the client.

=head2 reconnect_interval( [ $fractional_seconds ] )

Gets or sets C<reconnect_interval> of the client.

=head2 on_connect( [ $callback ] )

Gets or sets the C<on_connect> callback.

=head2 on_disconnect( [ $callback ] )

Gets or sets the C<on_disconnect> callback.

=head2 on_error( [ $callback ] )

Gets or sets the C<on_error> callback.

=head1 SEE ALSO

L<AnyEvent::RipeRedis::Cluster>, L<AnyEvent>, L<Redis::hiredis>, L<Redis>,
L<RedisDB>

=head1 AUTHOR

Eugene Ponizovsky, E<lt>ponizovsky@gmail.comE<gt>

Sponsored by SMS Online, E<lt>dev.opensource@sms-online.comE<gt>

=head2 Special thanks

=over

=item *

Alexey Shrub

=item *

Vadim Vlasov

=item *

Konstantin Uvarin

=item *



( run in 2.644 seconds using v1.01-cache-2.11-cpan-97f6503c9c8 )