AnyEvent-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

);

# Set of subscription-management command names: everything already in
# %SUB_CMDS (declared before this point) plus the two unsubscribe variants.
my %SUBUNSUB_CMDS = (
  %SUB_CMDS,
  unsubscribe  => 1,
  punsubscribe => 1,
);

# Set of message type names "message" and "pmessage".
# NOTE(review): presumably the types of published messages pushed by the
# server to a subscribed client -- confirm against the reply handling code.
my %MESSAGE_TYPES = (
  message  => 1,
  pmessage => 1,
);

# Set of command names flagged as needing preprocessing. It also includes
# every command from %SUBUNSUB_CMDS. The code that consults this table is
# not part of this section.
my %NEED_PREPROCESS = (
  multi       => 1,
  exec        => 1,
  discard     => 1,
  eval_cached => 1,
  %SUBUNSUB_CMDS,
);

# Set of command names flagged as needing postprocessing. The code that
# consults this table is not part of this section.
my %NEED_POSTPROCESS = (
  info         => 1,
  cluster_info => 1,
  select       => 1,
  quit         => 1,
);

# Maps an error prefix word to the matching E_* error code constant.
# NOTE(review): presumably the prefix is the leading word of an error reply
# received from the server -- confirm against the error parsing code.
my %ERR_PREFS_MAP = (
  LOADING     => E_LOADING_DATASET,
  NOSCRIPT    => E_NO_SCRIPT,
  BUSY        => E_BUSY,
  MASTERDOWN  => E_MASTER_DOWN,
  MISCONF     => E_MISCONF,
  READONLY    => E_READONLY,
  OOM         => E_OOM,
  EXECABORT   => E_EXEC_ABORT,
  NOAUTH      => E_NO_AUTH,
  WRONGTYPE   => E_WRONG_TYPE,
  NOREPLICAS  => E_NO_REPLICAS,
  BUSYKEY     => E_BUSY_KEY,
  CROSSSLOT   => E_CROSS_SLOT,
  TRYAGAIN    => E_TRY_AGAIN,
  ASK         => E_ASK,
  MOVED       => E_MOVED,
  CLUSTERDOWN => E_CLUSTER_DOWN,
  NOTBUSY     => E_NOT_BUSY,
);

# File-scoped cache shared by all client objects.
# NOTE(review): judging by the name and the "eval_cached" command above, this
# presumably caches data for cached EVAL scripts -- confirm where it is used.
my %EVAL_CACHE;


# Constructor. Builds a client object from named parameters and, unless the
# "lazy" parameter is true, starts connecting immediately.
#
# Parameters read here: host, port, password, database, utf8, lazy,
# reconnect, handle_params, on_connect, on_disconnect, connection_timeout,
# read_timeout, reconnect_interval, on_error.
#
# Returns the new object.
sub new {
  my ( $class, %params ) = @_;

  my $self = bless {}, $class;

  # Endpoint: any false value (undef, '', 0) falls back to the default.
  $self->{host} = $params{host} || D_HOST;
  $self->{port} = $params{port} || D_PORT;

  # Only an undefined database index falls back to the default.
  $self->{database} = $params{database};
  unless ( defined $self->{database} ) {
    $self->{database} = D_DB_INDEX;
  }

  # Stored exactly as passed (undef when the key is absent).
  foreach my $key ( qw( password lazy on_connect on_disconnect ) ) {
    $self->{$key} = $params{$key};
  }

  # Boolean switches that are on unless the caller passes the key explicitly.
  foreach my $flag ( qw( utf8 reconnect ) ) {
    $self->{$flag} = exists $params{$flag} ? $params{$flag} : 1;
  }

  $self->{handle_params} = $params{handle_params} || {};

  # These go through their accessor methods rather than being stored
  # directly; the accessors are always called, even with an undefined value.
  foreach my $setter (
    qw( connection_timeout read_timeout reconnect_interval on_error ) )
  {
    $self->$setter( $params{$setter} );
  }

  # Internal state: connection flags first, then empty queues and counters.
  $self->_reset_internals;
  $self->{_input_queue}      = [];
  $self->{_temp_queue}       = [];
  $self->{_processing_queue} = [];
  $self->{_channels}         = {};
  $self->{_channel_cnt}      = 0;
  $self->{_pchannel_cnt}     = 0;

  if ( !$self->{lazy} ) {
    $self->_connect;
  }

  return $self;
}

# Runs an arbitrary command given by name.
#
#   $cmd_name - command name, passed to _prepare()
#   @args     - remaining arguments, handed to _prepare() as a fresh array
#               reference
#
# The prepared command is passed to _execute(). Returns nothing.
sub execute {
  my ( $self, $cmd_name, @args ) = @_;

  my $prepared = $self->_prepare( $cmd_name, \@args );
  $self->_execute($prepared);

  return;
}

# Public entry point for closing the connection: delegates to _disconnect()
# without passing any arguments. Returns nothing.
sub disconnect {
  my ($self) = @_;

  $self->_disconnect;

  return;
}

sub on_error {
  my $self = shift;

  if (@_) {
    my $on_error = shift;

    if ( defined $on_error ) {
      $self->{on_error} = $on_error;
    }
    else {
      $self->{on_error} = sub {
        my $err = shift;
        warn $err->message . "\n";

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

        $self->{$name} = $seconds;
      }

      return $self->{$name};
    };
  }

  # Generate a plain read/write accessor for each listed attribute: when
  # called with an argument the value is stored as is (no validation), and
  # the current value is returned in either case.
  foreach my $name ( qw( utf8 reconnect on_connect on_disconnect ) ) {
    *{$name} = sub {
      my $self = shift;

      if (@_) {
        $self->{$name} = shift;
      }

      return $self->{$name};
    };
  }
}

# Creates the AnyEvent::Handle for the configured host and port and stores it
# in $self->{_handle}. Returns nothing.
sub _connect {
  my ($self) = @_;

  # User-supplied handle parameters come first, so the options set by the
  # client below take precedence over any duplicates among them.
  my @handle_args = (
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_create_on_prepare,
    on_connect       => $self->_create_on_connect,
    on_connect_error => $self->_create_on_connect_error,
    on_rtimeout      => $self->_create_on_rtimeout,
    on_eof           => $self->_create_on_eof,
    on_error         => $self->_create_on_handle_error,
    on_read          => $self->_create_on_read,
  );

  $self->{_handle} = AnyEvent::Handle->new(@handle_args);

  return;
}

# Builds the "on_prepare" callback for the handle. The callback returns the
# configured connection timeout, or nothing when no timeout is set.
sub _create_on_prepare {
  my ($self) = @_;

  # The callback must not keep the client object alive.
  weaken($self);

  return sub {
    my $timeout = $self->{connection_timeout};

    return $timeout if defined $timeout;

    return;
  };
}

# Builds the "on_connect" callback for the handle. Once the socket is
# connected the callback starts whichever setup step is still pending
# (authentication, then database selection) or, when none is needed, marks
# the client ready and flushes the input queue. The user's on_connect
# callback, if any, is invoked last.
sub _create_on_connect {
  my ($self) = @_;

  # The callback must not keep the client object alive.
  weaken($self);

  return sub {
    $self->{_connected} = 1;

    # Steps that are not required count as already done.
    if ( !defined $self->{password} ) {
      $self->{_auth_state} = S_DONE;
    }
    if ( $self->{database} == 0 ) {
      $self->{_db_selection_state} = S_DONE;
    }

    if ( $self->{_auth_state} == S_NEED_DO ) {
      $self->_auth;
    }
    elsif ( $self->{_db_selection_state} == S_NEED_DO ) {
      $self->_select_database;
    }
    else {
      $self->{_ready} = 1;
      $self->_process_input_queue;
    }

    if ( defined $self->{on_connect} ) {
      my $user_cb = $self->{on_connect};
      $user_cb->();
    }
  };
}

# Builds the "on_connect_error" callback for the handle. The callback wraps
# the failure reason (its last argument) in an E_CANT_CONN error object and
# passes that object to _disconnect().
sub _create_on_connect_error {
  my ($self) = @_;

  # The callback must not keep the client object alive.
  weaken($self);

  return sub {
    my $err_msg = pop @_;

    my $text      = "Can't connect to $self->{host}:$self->{port}: $err_msg";
    my $conn_fail = _new_error( $text, E_CANT_CONN );

    $self->_disconnect($conn_fail);
  };
}

# Builds the "on_rtimeout" callback for the handle. When the timer fires
# with nothing in the processing queue, the read timeout on the handle is
# switched off. Otherwise the connection is dropped with an E_READ_TIMEDOUT
# error.
sub _create_on_rtimeout {
  my ($self) = @_;

  # The callback must not keep the client object alive.
  weaken($self);

  return sub {
    if ( !@{ $self->{_processing_queue} } ) {
      $self->{_handle}->rtimeout(undef);
    }
    else {
      my $timed_out = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
      $self->_disconnect($timed_out);
    }
  };
}

sub _create_on_eof {
  my $self = shift;

  weaken($self);

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

      AE::postpone {
        my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
            . ' No connection to the server.', E_NO_CONN );
        $cmd->{on_reply}->( undef, $err );
      };

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write($cmd);

  return;
}

# Serializes a command and writes it to the handle.
#
#   $cmd - hash reference; its "kwds" and "args" array references together
#          form the list of tokens to send
#
# The request consists of a "*<token count>" header followed, for every
# token, by "$<length>" and the token itself, each terminated with EOL.
# The command is appended to the processing queue before the write.
# Returns nothing.
sub _push_write {
  my ( $self, $cmd ) = @_;

  # Work on a copy so the caller's kwds/args are never modified in place.
  my @parts = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );

  my $request = '*' . scalar(@parts) . EOL;

  foreach my $part (@parts) {
    if ( !defined $part ) {

      # Undefined tokens are sent as empty strings.
      $part = '';
    }
    elsif ( $self->{utf8} ) {

      # Encode to UTF-8 first so length() below counts bytes.
      utf8::encode($part);
    }

    $request .= '$' . length($part) . EOL . $part . EOL;
  }

  my $handle = $self->{_handle};

  # Arm the read timer only when this command is the first one in flight.
  my $arm_timer = defined $self->{read_timeout}
      && !@{ $self->{_processing_queue} };

  if ($arm_timer) {
    $handle->rtimeout_reset;
    $handle->rtimeout( $self->{read_timeout} );
  }

  push( @{ $self->{_processing_queue} }, $cmd );
  $handle->push_write($request);

  return;
}

# Sends the AUTH command with the configured password and continues the
# connection setup from the reply callback.
#
# On success the client proceeds to database selection when that is still
# pending; otherwise it is marked ready and the input queue is processed.
# On failure the authentication state is reset to S_NEED_DO and the error is
# passed to _abort().
#
# An error reply that only says the server has no password configured is
# not treated as a failure. Both wordings of that reply are recognized: the
# one used by servers before Redis 6.0 and the one used since Redis 6.0.
# Previously only the older wording was accepted, so a client configured
# with a password could not use a password-less Redis 6+ server.
#
# Returns nothing.
sub _auth {
  my $self = shift;

  weaken($self);
  $self->{_auth_state} = S_IN_PROGRESS;

  # Leading text of the "no password configured" replies (old, then new).
  my @no_password_replies = (
    'ERR Client sent AUTH, but no password is set',
    'ERR AUTH <password> called without any password configured',
  );

  $self->_push_write(
    { name => 'auth',
      kwds => ['auth'],
      args => [ $self->{password} ],

      on_reply => sub {
        my $err = $_[1];

        if ( defined $err ) {
          my $err_msg = $err->message;
          my $is_harmless
              = grep { index( $err_msg, $_ ) == 0 } @no_password_replies;

          unless ($is_harmless) {
            $self->{_auth_state} = S_NEED_DO;
            $self->_abort($err);

            return;
          }
        }

        $self->{_auth_state} = S_DONE;

        if ( $self->{_db_selection_state} == S_NEED_DO ) {
          $self->_select_database;
        }
        else {
          $self->{_ready} = 1;
          $self->_process_input_queue;
        }
      },
    }
  );

  return;
}

# Sends the SELECT command for the configured database index. On success the
# client is marked ready and the input queue is processed. On failure the
# selection state is reset to S_NEED_DO and the error is passed to _abort().
# Returns nothing.
sub _select_database {
  my ($self) = @_;

  # The reply callback must not keep the client object alive.
  weaken($self);
  $self->{_db_selection_state} = S_IN_PROGRESS;

  my $on_reply = sub {
    my ( undef, $err ) = @_;

    if ( defined $err ) {
      $self->{_db_selection_state} = S_NEED_DO;
      $self->_abort($err);

      return;
    }
    else {
      $self->{_db_selection_state} = S_DONE;

      $self->{_ready} = 1;
      $self->_process_input_queue;
    }
  };

  $self->_push_write(
    { name     => 'select',
      kwds     => ['select'],
      args     => [ $self->{database} ],
      on_reply => $on_reply,
    }
  );

  return;
}

sub _process_input_queue {
  my $self = shift;

  $self->{_temp_queue}  = $self->{_input_queue};
  $self->{_input_queue} = [];

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

# Shortcut for constructing an AnyEvent::RipeRedis::Error object; all
# arguments are forwarded unchanged to its constructor. Callers in this file
# pass an error message followed by an E_* error code.
sub _new_error {
  return AnyEvent::RipeRedis::Error->new(@_);
}

# Fallback for method calls that have no explicit definition: the method
# name is used as the command name. On first use a method is generated that
# prepares and executes the command, installed under that name so later
# calls bypass AUTOLOAD, and then entered with the original arguments.
sub AUTOLOAD {
  our $AUTOLOAD;

  # Strip the package qualifier, keeping only the bare method name.
  ( my $cmd_name = $AUTOLOAD ) =~ s/^.+:://;

  my $method = sub {
    my ( $self, @args ) = @_;

    my $prepared = $self->_prepare( $cmd_name, \@args );
    $self->_execute($prepared);

    return;
  };

  {
    no strict 'refs';
    *{$cmd_name} = $method;
  }

  # Enter the new method with the current @_, replacing this stack frame.
  goto &{$method};
}

# Destructor. Destroys the handle, if there is one, and emits a warning for
# every command reported by _queued_commands(), since those commands are
# abandoned together with the object.
sub DESTROY {
  my ($self) = @_;

  my $handle = $self->{_handle};
  if ( defined $handle ) {
    $handle->destroy;
  }

  # Without a processing queue there is nothing to report.
  return unless defined $self->{_processing_queue};

  foreach my $abandoned ( $self->_queued_commands ) {
    warn qq{Operation "$abandoned->{name}" aborted:}
        . " Client object destroyed prematurely.\n";
  }

  return;
}

1;
__END__

=head1 NAME

AnyEvent::RipeRedis - Flexible non-blocking Redis client

=head1 SYNOPSIS

  use AnyEvent;
  use AnyEvent::RipeRedis;

  my $redis = AnyEvent::RipeRedis->new(
    host     => 'localhost',
    port     => 6379,
    password => 'yourpass',
  );

  my $cv = AE::cv;

  $redis->set( 'foo', 'bar',
    sub {
      my $err = $_[1];

      if ( defined $err ) {
        warn $err->message . "\n";
        $cv->send;

        return;
      }

      $redis->get( 'foo',
        sub {
          my $reply = shift;
          my $err   = shift;

          if ( defined $err ) {
            warn $err->message . "\n";
            $cv->send;

            return;
          }

          print "$reply\n";
          $cv->send;
        }
      );
    }
  );

  $cv->recv;

=head1 DESCRIPTION

AnyEvent::RipeRedis is a flexible non-blocking Redis client. It supports
subscriptions and transactions, and can automatically restore the connection
after a failure.

Requires Redis 1.2 or higher, and any supported event loop.

=head1 CONSTRUCTOR

=head2 new( %params )

  my $redis = AnyEvent::RipeRedis->new(
    host               => 'localhost',
    port               => 6379,
    password           => 'yourpass',
    database           => 7,
    connection_timeout => 5,
    read_timeout       => 5,
    lazy               => 1,
    reconnect_interval => 5,

    on_connect => sub {
      # handling...
    },

    on_disconnect => sub {
      # handling...
    },

    on_error => sub {
      my $err = shift;

      # error handling...
    },
  );

=over

=item host => $host

Server hostname (default: 127.0.0.1)

=item port => $port

Server port (default: 6379)

=item password => $password

If the password is specified, the C<AUTH> command is sent to the server
after connection.

=item database => $index

Database index. If the index is specified, the client switches to the specified
database after connection. You can also switch to another database after
connection by using the C<SELECT> command. The client remembers the last
selected database and switches to it automatically after reconnection.

The default database index is C<0>.

=item utf8 => $boolean

If enabled, all strings will be converted to UTF-8 before sending to
the server, and all results will be decoded from UTF-8.

Enabled by default.

=item connection_timeout => $fractional_seconds

Specifies the connection timeout. If the client cannot connect to the server
within the specified timeout, the C<on_error> callback is called with the
C<E_CANT_CONN> error. The timeout is specified in seconds and can contain a
fractional part.

  connection_timeout => 10.5,

By default the client uses the kernel's connection timeout.

=item read_timeout => $fractional_seconds

Specifies the read timeout. If the client does not receive a reply from the
server within the specified timeout, the client closes the connection and calls
the C<on_error> callback with the C<E_READ_TIMEDOUT> error. The timeout is
specified in seconds and can contain a fractional part.

  read_timeout => 3.5,

Not set by default.

=item lazy => $boolean

If enabled, the connection is established when you send the first command to
the server. By default the connection is established when the C<new> method
is called.

Disabled by default.

=item reconnect => $boolean

If the connection to the server was lost and the parameter C<reconnect> is
TRUE (default), the client will try to restore the connection when you execute
the next command. The client will try to reconnect only once and, if the attempt
fails, the error object is passed to the command callback. If you need several
reconnection attempts, you must retry the command from the callback as many
times as you need. This behavior lets you control the reconnection procedure.

Enabled by default.

=item reconnect_interval => $fractional_seconds



( run in 1.684 second using v1.01-cache-2.11-cpan-e1769b4cff6 )