AnyEvent-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

package AnyEvent::RipeRedis;

use 5.008000;
use strict;
use warnings;
use base qw( Exporter );

our $VERSION = '0.48';

use AnyEvent::RipeRedis::Error;

use AnyEvent;
use AnyEvent::Handle;
use Scalar::Util qw( looks_like_number weaken );
use Digest::SHA qw( sha1_hex );
use Carp qw( croak );

my %ERROR_CODES;

BEGIN {
  %ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;
  our @EXPORT_OK   = keys %ERROR_CODES;
  our %EXPORT_TAGS = ( err_codes => \@EXPORT_OK );
}

use constant {
  # Default values
  D_HOST     => 'localhost',
  D_PORT     => 6379,
  D_DB_INDEX => 0,

  %ERROR_CODES,

  # Operation status
  S_NEED_DO     => 1,
  S_IN_PROGRESS => 2,
  S_DONE        => 3,

  # String terminator
  EOL        => "\r\n",
  EOL_LENGTH => 2,
};

my %SUB_CMDS = (
  subscribe  => 1,
  psubscribe => 1,
);

my %SUBUNSUB_CMDS = (
  %SUB_CMDS,
  unsubscribe  => 1,
  punsubscribe => 1,
);

my %MESSAGE_TYPES = (
  message  => 1,
  pmessage => 1,
);

my %NEED_PREPROCESS = (
  multi       => 1,
  exec        => 1,
  discard     => 1,
  eval_cached => 1,
  %SUBUNSUB_CMDS,
);

my %NEED_POSTPROCESS = (
  info         => 1,
  cluster_info => 1,
  select       => 1,
  quit         => 1,
);

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

      my $self = shift;
      return $self->{$name};
    }
  }

  foreach my $name ( qw( connection_timeout read_timeout
      reconnect_interval ) )
  {
    *{$name} = sub {
      my $self = shift;

      if (@_) {
        my $seconds = shift;

        if ( defined $seconds
          && ( !looks_like_number($seconds) || $seconds < 0 ) )
        {
          croak qq{"$name" must be a positive number};
        }
        $self->{$name} = $seconds;
      }

      return $self->{$name};
    };
  }

  foreach my $name ( qw( utf8 reconnect on_connect on_disconnect ) ) {
    *{$name} = sub {
      my $self = shift;

      if (@_) {
        $self->{$name} = shift;
      }

      return $self->{$name};
    };
  }
}

sub _connect {
  my $self = shift;

  $self->{_handle} = AnyEvent::Handle->new(
    %{ $self->{handle_params} },
    connect          => [ $self->{host}, $self->{port} ],
    on_prepare       => $self->_create_on_prepare,
    on_connect       => $self->_create_on_connect,
    on_connect_error => $self->_create_on_connect_error,
    on_rtimeout      => $self->_create_on_rtimeout,
    on_eof           => $self->_create_on_eof,
    on_error         => $self->_create_on_handle_error,
    on_read          => $self->_create_on_read,
  );

  return;
}

sub _create_on_prepare {
  my $self = shift;

  weaken($self);

  return sub {
    if ( defined $self->{connection_timeout} ) {
      return $self->{connection_timeout};
    }

    return;
  };
}

sub _create_on_connect {
  my $self = shift;

  weaken($self);

  return sub {
    $self->{_connected} = 1;

    unless ( defined $self->{password} ) {
      $self->{_auth_state} = S_DONE;
    }
    if ( $self->{database} == 0 ) {
      $self->{_db_selection_state} = S_DONE;
    }

    if ( $self->{_auth_state} == S_NEED_DO ) {
      $self->_auth;
    }
    elsif ( $self->{_db_selection_state} == S_NEED_DO ) {
      $self->_select_database;
    }
    else {
      $self->{_ready} = 1;
      $self->_process_input_queue;
    }

    if ( defined $self->{on_connect} ) {
      $self->{on_connect}->();
    }
  };
}

sub _create_on_connect_error {
  my $self = shift;

  weaken($self);

  return sub {
    my $err_msg = pop;

    my $err = _new_error(
      "Can't connect to $self->{host}:$self->{port}: $err_msg",
      E_CANT_CONN
    );
    $self->_disconnect($err);
  };
}

sub _create_on_rtimeout {
  my $self = shift;

  weaken($self);

  return sub {
    if ( @{ $self->{_processing_queue} } ) {
      my $err = _new_error( 'Read timed out.', E_READ_TIMEDOUT );
      $self->_disconnect($err);
    }
    else {
      $self->{_handle}->rtimeout(undef);
    }
  };
}

sub _create_on_eof {
  my $self = shift;

  weaken($self);

  return sub {
    my $err = _new_error( 'Connection closed by remote host.',
        E_CONN_CLOSED_BY_REMOTE_HOST );
    $self->_disconnect($err);
  };
}

sub _create_on_handle_error {
  my $self = shift;

  weaken($self);

  return sub {
    my $err_msg = pop;

    my $err = _new_error( $err_msg, E_IO );
    $self->_disconnect($err);
  };
}

sub _create_on_read {
  my $self = shift;

  weaken($self);

  my $str_len;
  my @bufs;
  my $bufs_num = 0;

  return sub {
    my $handle = shift;

    MAIN: while (1) {
      return if $handle->destroyed;

      my $reply;
      my $err_code;

      if ( defined $str_len ) {
        if ( length( $handle->{rbuf} ) < $str_len + EOL_LENGTH ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $str_len, '' );
        substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );
        if ( $self->{utf8} ) {
          utf8::decode($reply);
        }

        undef $str_len;
      }
      else {
        my $eol_pos = index( $handle->{rbuf}, EOL );

        if ( $eol_pos < 0 ) {
          return;
        }

        $reply = substr( $handle->{rbuf}, 0, $eol_pos, '' );
        my $type = substr( $reply, 0, 1, '' );
        substr( $handle->{rbuf}, 0, EOL_LENGTH, '' );

        if ( $type ne '+' && $type ne ':' ) {
          if ( $type eq '$' ) {
            if ( $reply >= 0 ) {
              $str_len = $reply;
              next;
            }

            undef $reply;
          }
          elsif ( $type eq '*' ) {
            if ( $reply > 0 ) {
              push( @bufs,
                { reply      => [],
                  err_code   => undef,
                  chunks_cnt => $reply,
                }
              );
              $bufs_num++;

              next;
            }
            elsif ( $reply == 0 ) {

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

      while ( $bufs_num > 0 ) {
        my $curr_buf = $bufs[-1];
        if ( defined $err_code ) {
          unless ( ref($reply) ) {
            $reply = _new_error( $reply, $err_code );
          }
          $curr_buf->{err_code} = E_OPRN_ERROR;
        }
        push( @{ $curr_buf->{reply} }, $reply );
        if ( --$curr_buf->{chunks_cnt} > 0 ) {
          next MAIN;
        }

        $reply    = $curr_buf->{reply};
        $err_code = $curr_buf->{err_code};
        pop @bufs;
        $bufs_num--;
      }

      $self->_process_reply( $reply, $err_code );
    }

    return;
  };
}

sub _prepare {
  my $self     = shift;
  my $cmd_name = shift;
  my $args     = shift;

  my $cbs;
  if ( ref( $args->[-1] ) eq 'HASH' ) {
    $cbs = pop @{$args};
  }
  else {
    $cbs = {};
    if ( ref( $args->[-1] ) eq 'CODE' ) {
      if ( exists $SUB_CMDS{$cmd_name} ) {
        $cbs->{on_message} = pop @{$args};
      }
      else {
        $cbs->{on_reply} = pop @{$args};
      }
    }
  }

  my @kwds
      = $cmd_name eq 'eval_cached'
      ? ('evalsha')
      : split( m/_/, lc($cmd_name) );

  my $cmd = {
    name => $cmd_name,
    kwds => \@kwds,
    args => $args,
    %{$cbs},
  };

  unless ( defined $cmd->{on_reply} ) {
    weaken($self);

    $cmd->{on_reply} = sub {
      my $err = $_[1];

      if ( defined $err ) {
        $self->{on_error}->($err);
        return;
      }
    };
  }

  return $cmd;
}

sub _execute {
  my $self = shift;
  my $cmd  = shift;

  if ( $self->{_multi_mode}
    && ( exists $SUBUNSUB_CMDS{ $cmd->{name} }
      || exists $NEED_POSTPROCESS{ $cmd->{name} } ) )
  {
    croak qq{Command "$cmd->{name}" not allowed after "multi" command.}
        . ' First, the transaction must be finalized.';
  }

  if ( exists $NEED_PREPROCESS{ $cmd->{name} } ) {
    if ( $cmd->{name} eq 'multi' ) {
      $self->{_multi_mode} = 1;
    }
    elsif ( $cmd->{name} eq 'exec'
      || $cmd->{name} eq 'discard' )
    {
      $self->{_multi_mode} = 0;
    }
    elsif ( $cmd->{name} eq 'eval_cached' ) {
      my $script = $cmd->{args}[0];
      unless ( exists $EVAL_CACHE{$script} ) {
        $EVAL_CACHE{$script} = sha1_hex($script);
      }
      $cmd->{args}[0] = $EVAL_CACHE{$script};
      $cmd->{script}  = $script;
    }
    else {    # subscribe, unsubscribe, psubscribe, punsubscribe
      if ( exists $SUB_CMDS{ $cmd->{name} }
        && !defined $cmd->{on_message} )
      {
        croak '"on_message" callback must be specified';
      }

      if ( @{ $cmd->{args} } ) {
        $cmd->{reply_cnt} = scalar @{ $cmd->{args} };
      }
    }
  }

  unless ( $self->{_ready} ) {
    if ( defined $self->{_handle} ) {
      if ( $self->{_connected} ) {
        if ( $self->{_auth_state} == S_DONE ) {
          if ( $self->{_db_selection_state} == S_NEED_DO ) {
            $self->_select_database;
          }
        }
        elsif ( $self->{_auth_state} == S_NEED_DO ) {
          $self->_auth;
        }
      }
    }
    elsif ( $self->{lazy} ) {
      undef $self->{lazy};
      $self->_connect;
    }
    elsif ( $self->{reconnect} ) {
      if ( defined $self->{reconnect_interval}
        && $self->{reconnect_interval} > 0 )
      {
        unless ( defined $self->{_reconnect_timer} ) {
          weaken($self);

          $self->{_reconnect_timer} = AE::timer(
            $self->{reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect;
            }
          );
        }
      }
      else {
        $self->_connect;
      }
    }
    else {
      AE::postpone {
        my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
            . ' No connection to the server.', E_NO_CONN );
        $cmd->{on_reply}->( undef, $err );
      };

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write($cmd);

  return;
}

sub _push_write {
  my $self = shift;
  my $cmd  = shift;

  my $cmd_str = '';
  my @tokens  = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );
  foreach my $token (@tokens) {
    unless ( defined $token ) {
      $token = '';
    }
    elsif ( $self->{utf8} ) {
      utf8::encode($token);
    }
    $cmd_str .= '$' . length($token) . EOL . $token . EOL;
  }
  $cmd_str = '*' . scalar(@tokens) . EOL . $cmd_str;

  my $handle = $self->{_handle};

  if ( defined $self->{read_timeout}
    && !@{ $self->{_processing_queue} } )
  {
    $handle->rtimeout_reset;
    $handle->rtimeout( $self->{read_timeout} );
  }

  push( @{ $self->{_processing_queue} }, $cmd );
  $handle->push_write($cmd_str);

  return;
}

sub _auth {
  my $self = shift;

  weaken($self);
  $self->{_auth_state} = S_IN_PROGRESS;

  $self->_push_write(
    { name => 'auth',
      kwds => ['auth'],
      args => [ $self->{password} ],

      on_reply => sub {
        my $err = $_[1];

        if ( defined $err
          && $err->message ne 'ERR Client sent AUTH, but no password is set' )
        {
          $self->{_auth_state} = S_NEED_DO;
          $self->_abort($err);

          return;
        }

        $self->{_auth_state} = S_DONE;

        if ( $self->{_db_selection_state} == S_NEED_DO ) {
          $self->_select_database;
        }
        else {
          $self->{_ready} = 1;
          $self->_process_input_queue;
        }
      },
    }
  );

  return;
}

sub _select_database {
  my $self = shift;

  weaken($self);
  $self->{_db_selection_state} = S_IN_PROGRESS;

  $self->_push_write(
    { name => 'select',
      kwds => ['select'],
      args => [ $self->{database} ],

      on_reply => sub {
        my $err = $_[1];

        if ( defined $err ) {
          $self->{_db_selection_state} = S_NEED_DO;
          $self->_abort($err);

          return;
        }

        $self->{_db_selection_state} = S_DONE;

        $self->{_ready} = 1;
        $self->_process_input_queue;
      },
    }
  );

  return;
}

sub _process_input_queue {
  my $self = shift;

  $self->{_temp_queue}  = $self->{_input_queue};
  $self->{_input_queue} = [];

  while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
    $self->_push_write($cmd);
  }

  return;
}

sub _process_reply {
  my $self     = shift;
  my $reply    = shift;
  my $err_code = shift;

  if ( defined $err_code ) {
    $self->_process_error( $reply, $err_code );
  }
  elsif ( $self->{_channel_cnt} + $self->{_pchannel_cnt} > 0
    && ref($reply) && exists $MESSAGE_TYPES{ $reply->[0] } )
  {
    $self->_process_message($reply);
  }
  else {
    $self->_process_success($reply);
  }

  return;
}



( run in 0.631 second using v1.01-cache-2.11-cpan-39bf76dae61 )