AnyEvent-RipeRedis-Cluster

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN

use warnings;
use base qw( Exporter );

our $VERSION = '0.32';

use AnyEvent::RipeRedis;
use AnyEvent::RipeRedis::Error;

use AnyEvent::Socket;
use List::MoreUtils qw( bsearch );
use Scalar::Util qw( looks_like_number weaken );
use Carp qw( croak );

my %ERROR_CODES;

BEGIN {
  %ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;
  my @err_codes  = keys %ERROR_CODES;
  our @EXPORT_OK = ( @err_codes, qw( crc16 hash_slot ) );
  our %EXPORT_TAGS = ( err_codes => \@err_codes );
}

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN


  return crc16($hashtag) % MAX_SLOTS;
}

sub _init {
  my $self = shift;

  $self->{_init_state} = S_IN_PROGRESS;
  undef $self->{_refresh_timer};

  weaken($self);

  $self->_discover_cluster(
    sub {
      my $err = $_[1];

      if ( defined $err ) {
        $self->{_init_state} = S_NEED_DO;

        $self->{_ready} = 0;
        $self->_abort($err);

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN

      unless ( defined $nodes_pool{$hostport} ) {
        $nodes_pool{$hostport} = $self->_new_node(
            $node_params->{host}, $node_params->{port} );
      }
    }

    $self->{_nodes_pool} = \%nodes_pool;
    $nodes = [ keys %nodes_pool ];
  }

  weaken($self);

  $self->_execute(
    { name => 'cluster_state',
      args => [],

      on_reply => sub {
        my $err = $_[1];

        if ( defined $err ) {
          $cb->( undef, $err );

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN


  return;
}

sub _load_commands {
  my $self = shift;
  my $cb   = shift;

  my $nodes = $self->_nodes( undef, $self->{allow_slaves} );

  weaken($self);

  $self->_execute(
    { name => 'command',
      args => [],

      on_reply => sub {
        my $commands_raw = shift;
        my $err          = shift;

        if ( defined $err ) {

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN

    on_disconnect => $self->_create_on_node_disconnect( $host, $port ),
    on_error      => $self->_create_on_node_error( $host, $port ),
  );
}

sub _create_on_node_connect {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    if ( defined $self->{on_node_connect} ) {
      $self->{on_node_connect}->( $host, $port );
    }
  };
}

sub _create_on_node_disconnect {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    if ( defined $self->{on_node_disconnect} ) {
      $self->{on_node_disconnect}->( $host, $port );
    }
  };
}

sub _create_on_node_error {
  my $self = shift;
  my $host = shift;
  my $port = shift;

  weaken($self);

  return sub {
    my $err = shift;

    if ( defined $self->{on_node_error} ) {
      $self->{on_node_error}->( $err, $host, $port );
    }
  };
}

sub _prepare {
  my $self     = shift;
  my $cmd_name = shift;
  my $args     = shift;

  weaken($self);

  my $cbs;
  if ( ref( $args->[-1] ) eq 'HASH' ) {
    $cbs = pop @{$args};
  }
  else {
    $cbs = {};
    if ( ref( $args->[-1] ) eq 'CODE' ) {
      if ( exists $SUB_CMDS{$cmd_name} ) {
        $cbs->{on_message} = pop @{$args};

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN

  elsif ( $node_index == scalar @{$nodes} ) {
    $node_index = 0;
  }
  my $hostport = $nodes->[$node_index];
  my $node     = $self->{_nodes_pool}{$hostport};

  my $cmd_name = $cmd->{name} eq 'cluster_state'
      ? 'cluster_info'
      : $cmd->{name};

  weaken($self);

  $node->execute( $cmd_name, @{ $cmd->{args} },
    { on_reply => sub {
        my $reply = shift;
        my $err   = shift;

        if ( $cmd->{name} eq 'cluster_state' ) {
          unless ( defined $err ) {
            if ( $reply->{cluster_state} eq 'ok' ) {
              $reply = 1;



( run in 0.732 second using v1.01-cache-2.11-cpan-65fba6d93b7 )