AnyEvent-RipeRedis

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN


  if ( $self->{_multi_mode}
    && ( exists $SUBUNSUB_CMDS{ $cmd->{name} }
      || exists $NEED_POSTPROCESS{ $cmd->{name} } ) )
  {
    croak qq{Command "$cmd->{name}" not allowed after "multi" command.}
        . ' First, the transaction must be finalized.';
  }

  if ( exists $NEED_PREPROCESS{ $cmd->{name} } ) {
    if ( $cmd->{name} eq 'multi' ) {
      $self->{_multi_mode} = 1;
    }
    elsif ( $cmd->{name} eq 'exec'
      || $cmd->{name} eq 'discard' )
    {
      $self->{_multi_mode} = 0;
    }
    elsif ( $cmd->{name} eq 'eval_cached' ) {
      my $script = $cmd->{args}[0];
      unless ( exists $EVAL_CACHE{$script} ) {
        $EVAL_CACHE{$script} = sha1_hex($script);
      }
      $cmd->{args}[0] = $EVAL_CACHE{$script};
      $cmd->{script}  = $script;
    }
    else {    # subscribe, unsubscribe, psubscribe, punsubscribe
      if ( exists $SUB_CMDS{ $cmd->{name} }
        && !defined $cmd->{on_message} )
      {
        croak '"on_message" callback must be specified';
      }

      if ( @{ $cmd->{args} } ) {
        $cmd->{reply_cnt} = scalar @{ $cmd->{args} };
      }
    }
  }

  unless ( $self->{_ready} ) {
    if ( defined $self->{_handle} ) {
      if ( $self->{_connected} ) {
        if ( $self->{_auth_state} == S_DONE ) {
          if ( $self->{_db_selection_state} == S_NEED_DO ) {
            $self->_select_database;
          }
        }
        elsif ( $self->{_auth_state} == S_NEED_DO ) {
          $self->_auth;
        }
      }
    }
    elsif ( $self->{lazy} ) {
      undef $self->{lazy};
      $self->_connect;
    }
    elsif ( $self->{reconnect} ) {
      if ( defined $self->{reconnect_interval}
        && $self->{reconnect_interval} > 0 )
      {
        unless ( defined $self->{_reconnect_timer} ) {
          weaken($self);

          $self->{_reconnect_timer} = AE::timer(
            $self->{reconnect_interval}, 0,
            sub {
              undef $self->{_reconnect_timer};
              $self->_connect;
            }
          );
        }
      }
      else {
        $self->_connect;
      }
    }
    else {
      AE::postpone {
        my $err = _new_error( qq{Operation "$cmd->{name}" aborted:}
            . ' No connection to the server.', E_NO_CONN );
        $cmd->{on_reply}->( undef, $err );
      };

      return;
    }

    push( @{ $self->{_input_queue} }, $cmd );

    return;
  }

  $self->_push_write($cmd);

  return;
}

# Serializes a command and hands it to the connection handle.
#
# The command's keywords and arguments are encoded as one multi-bulk request:
# a "*<count>" header followed by a "$<length>" + payload pair per token.
# The command is appended to the processing queue before the bytes are
# pushed to the handle.
#
# Arguments:
#   $cmd - command hash; its "kwds" and "args" array refs supply the tokens.
sub _push_write {
  my ( $self, $cmd ) = @_;

  # Work on a copy so that the caller's keywords/arguments stay untouched.
  my @parts   = ( @{ $cmd->{kwds} }, @{ $cmd->{args} } );
  my $request = '*' . scalar(@parts) . EOL;

  for my $part (@parts) {
    if ( !defined $part ) {

      # Undefined tokens are sent as empty strings.
      $part = '';
    }
    elsif ( $self->{utf8} ) {
      utf8::encode($part);
    }

    $request .= '$' . length($part) . EOL . $part . EOL;
  }

  my $handle = $self->{_handle};

  # The read timeout is only (re)armed when no other command is currently
  # awaiting a reply.
  my $is_idle = !@{ $self->{_processing_queue} };
  if ( defined $self->{read_timeout} && $is_idle ) {
    $handle->rtimeout_reset;
    $handle->rtimeout( $self->{read_timeout} );
  }

  push @{ $self->{_processing_queue} }, $cmd;
  $handle->push_write($request);

  return;
}

lib/AnyEvent/RipeRedis.pm  view on Meta::CPAN

  }

  if ( !defined $cmd->{reply_cnt}
    || --$cmd->{reply_cnt} == 0 )
  {
    shift @{ $self->{_processing_queue} };

    if ( exists $NEED_POSTPROCESS{ $cmd->{name} } ) {
      if ( $cmd->{name} eq 'info'
        || $cmd->{name} eq 'cluster_info' )
      {
        $reply = _parse_info($reply);
      }
      elsif ( $cmd->{name} eq 'select' ) {
        $self->{database} = $cmd->{args}[0];
      }
      else {    # quit
        $self->_disconnect;
      }
    }

    $cmd->{on_reply}->($reply);
  }

  return;
}

# Converts the raw text of an INFO-style reply into a hash reference.
#
# The text (first argument) is split on EOL. Empty lines and lines starting
# with "#" are dropped, and every remaining line is split on its first ":"
# into a field name and a value (the value may itself contain colons).
#
# Lines that contain no ":" at all are skipped: split() would return a single
# element for them, which would shift every following key/value pair and
# leave the hash with an odd number of elements.
#
# Returns a hash reference mapping field names to values.
sub _parse_info {
  my %info = map { split( m/:/, $_, 2 ) }
      grep { m/^[^#]/ && m/:/ } split( EOL, $_[0] );

  return \%info;
}

# Tears down the current connection.
#
# Destroys the handle (if there is one), resets the internal state, aborts
# the queued commands via _abort() and finally invokes the "on_disconnect"
# callback, but only when the client was connected on entry.
#
# Arguments:
#   $err - optional error object, passed through to _abort().
sub _disconnect {
  my ( $self, $err ) = @_;

  # Must be captured now: _reset_internals() clears the flag.
  my $had_connection = $self->{_connected};

  $self->{_handle}->destroy if defined $self->{_handle};

  $self->_reset_internals;
  $self->_abort($err);

  return unless $had_connection;
  return unless defined $self->{on_disconnect};

  $self->{on_disconnect}->();

  return;
}

# Resets the per-connection state of the client.
#
# Drops the handle and the reconnect timer, clears the connected, ready and
# multi-mode flags, and sets both the authentication and the database
# selection state to S_NEED_DO.
sub _reset_internals {
  my ($self) = @_;

  # Field/value pairs, applied in the listed order.
  my @resets = (
    _handle             => undef,
    _connected          => 0,
    _auth_state         => S_NEED_DO,
    _db_selection_state => S_NEED_DO,
    _ready              => 0,
    _multi_mode         => 0,
    _reconnect_timer    => undef,
  );

  while ( my ( $field, $value ) = splice( @resets, 0, 2 ) ) {
    $self->{$field} = $value;
  }

  return;
}

# Empties all command queues and channel bookkeeping and reports the failure.
#
# When no error is given but commands were still queued, an
# E_CONN_CLOSED_BY_CLIENT error is created. If there is an error to report:
#   1. "on_error" is called with it;
#   2. unless its code is E_CONN_CLOSED_BY_CLIENT, the "on_reply" callback of
#      every subscribed channel gets a "Subscription ... lost" error;
#   3. the "on_reply" callback of every queued command gets an
#      "Operation ... aborted" error.
#
# Arguments:
#   $err - optional error object providing message() and code().
sub _abort {
  my ( $self, $err ) = @_;

  # Take snapshots first; the state is wiped before any callback runs.
  my @pending    = $self->_queued_commands;
  my %subscribed = %{ $self->{_channels} };

  @{$self}{qw( _input_queue _temp_queue _processing_queue )} = ( [], [], [] );
  $self->{_channels} = {};
  @{$self}{qw( _channel_cnt _pchannel_cnt )} = ( 0, 0 );

  unless ( defined $err ) {

    # Nothing was pending and nothing went wrong: nobody to notify.
    return unless @pending;

    $err = _new_error( 'Connection closed by client prematurely.',
        E_CONN_CLOSED_BY_CLIENT );
  }

  my $reason = $err->message;
  my $code   = $err->code;

  $self->{on_error}->($err);

  if ( %subscribed && $code != E_CONN_CLOSED_BY_CLIENT ) {
    foreach my $channel ( keys %subscribed ) {
      my $sub_err = _new_error(
        qq{Subscription to channel "$channel" lost: $reason},
        $code
      );

      $subscribed{$channel}{on_reply}->( undef, $sub_err );
    }
  }

  foreach my $queued (@pending) {
    my $cmd_err = _new_error(
      qq{Operation "$queued->{name}" aborted: $reason},
      $code
    );

    $queued->{on_reply}->( undef, $cmd_err );
  }

  return;
}

# Returns every command that has not been completed yet, as a flat list:
# first the processing queue, then the temporary queue, then the input queue.
sub _queued_commands {
  my ($self) = @_;

  my ( $processing, $temp, $input )
      = @{$self}{qw( _processing_queue _temp_queue _input_queue )};

  return ( @{$processing}, @{$temp}, @{$input} );
}



( run in 1.883 second using v1.01-cache-2.11-cpan-385001e3568 )