AnyEvent-RipeRedis-Cluster

 view release on metacpan or  search on metacpan

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN

    else {
      $self->{refresh_interval} = D_REFRESH_INTERVAL;
    }
  }

  return $self->{refresh_interval};
}

# Accessor for the error callback.
#
# Called with no argument, returns the currently stored callback.
# Called with an argument, stores it first: a defined value is stored as is,
# while an explicit undef installs a default handler that warns with the
# error's message followed by a newline.
sub on_error {
  my ( $self, @args ) = @_;

  if (@args) {
    my ($handler) = @args;

    $self->{on_error}
        = defined $handler
        ? $handler
        : sub {
            my ($error) = @_;
            warn $error->message . "\n";
          };
  }

  return $self->{on_error};
}

# Computes a 16-bit checksum of the given string using the @CRC16_TAB lookup
# table, processing one byte at a time, most significant byte first.
#
# The input is first reduced to a byte string: if it cannot be downgraded
# from Perl's internal UTF-8 representation (it contains wide characters),
# it is encoded to UTF-8 octets instead. The caller's value is not modified,
# because the work happens on a copy.
#
# Returns the checksum as an integer in the range 0..0xffff.
sub crc16 {
  my ($data) = @_;

  # utf8::downgrade() with a true second argument returns false instead of
  # dying when the string holds characters above 0xff.
  utf8::encode($data) unless utf8::downgrade( $data, 1 );

  my $crc = 0;

  for my $byte ( unpack 'C*', $data ) {
    my $index = ( ( $crc >> 8 ) ^ $byte ) & 0x00ff;
    $crc = ( ( $crc << 8 ) & 0xff00 ) ^ $CRC16_TAB[$index];
  }

  return $crc;
}

# Maps a key to its slot number: crc16() of the effective key, modulo
# MAX_SLOTS.
#
# If the key contains a "{...}" section, the text between the first "{" and
# the first "}" that follows it is hashed instead of the whole key, provided
# that text is not empty. Otherwise the whole key is hashed.
sub hash_slot {
  my ($key) = @_;

  my ($tag) = $key =~ m/\{([^}]*?)\}/;

  # An empty tag ("{}") does not count; fall back to the full key.
  my $hashed = ( defined $tag && length($tag) > 0 ) ? $tag : $key;

  return crc16($hashed) % MAX_SLOTS;
}

# Starts cluster discovery and updates the client state when it finishes.
#
# Marks initialization as in progress, drops any existing refresh timer and
# calls _discover_cluster() with a completion callback:
#   - on error, the state goes back to S_NEED_DO, the client is marked not
#     ready and _abort() is called with the error;
#   - on success, the state becomes S_DONE, the client is marked ready, the
#     input queue is processed and, if refresh_interval is positive, a
#     one-shot timer is armed that flips the state back to S_NEED_DO and
#     clears the ready flag when it fires.
sub _init {
  my ($self) = @_;

  undef $self->{_refresh_timer};
  $self->{_init_state} = S_IN_PROGRESS;

  # The callbacks below close over $self; keep that reference weak.
  weaken($self);

  my $on_discovered = sub {
    my ( undef, $err ) = @_;

    if ( defined $err ) {
      $self->{_ready}      = 0;
      $self->{_init_state} = S_NEED_DO;
      $self->_abort($err);

      return;
    }

    $self->{_ready}      = 1;
    $self->{_init_state} = S_DONE;
    $self->_process_input_queue;

    my $interval = $self->{refresh_interval};

    if ( $interval > 0 ) {
      # Second argument 0: the timer does not repeat.
      $self->{_refresh_timer} = AE::timer(
        $interval, 0,
        sub {
          $self->{_ready}      = 0;
          $self->{_init_state} = S_NEED_DO;
        }
      );
    }
  };

  $self->_discover_cluster($on_discovered);

  return;
}

sub _discover_cluster {
  my $self = shift;
  my $cb   = shift;

  my $nodes;

  if ( defined $self->{_slots} ) {
    $nodes = $self->_nodes( undef, $self->{allow_slaves} );
  }
  else {
    my %nodes_pool;

    foreach my $node_params ( @{ $self->{startup_nodes} } ) {
      my $hostport = "$node_params->{host}:$node_params->{port}";

      unless ( defined $nodes_pool{$hostport} ) {
        $nodes_pool{$hostport} = $self->_new_node(
            $node_params->{host}, $node_params->{port} );
      }
    }

    $self->{_nodes_pool} = \%nodes_pool;
    $nodes = [ keys %nodes_pool ];
  }

  weaken($self);

  $self->_execute(
    { name => 'cluster_state',
      args => [],

      on_reply => sub {
        my $err = $_[1];

        if ( defined $err ) {
          $cb->( undef, $err );
          return;
        }

        $self->_execute(
          { name => 'cluster_slots',
            args => [],

            on_reply => sub {
              my $slots = shift;
              my $err   = shift;

lib/AnyEvent/RipeRedis/Cluster.pm  view on Meta::CPAN

          return;
        }

        $cmd->{on_reply}->($reply);
      },

      defined $cmd->{on_message}
      ? ( on_message => $cmd->{on_message} )
      : (),
    }
  );

  return;
}

# Returns an array reference of nodes.
#
# Arguments: slot number (or undef) and an "allow slaves" flag.
#
# Without a slot, returns $self->{_nodes} when slaves are allowed and
# $self->{_master_nodes} otherwise.
#
# With a slot, binary-searches $self->{_slots} for the range containing it.
# Each range is [ lower bound, upper bound, node list ]. Returns nothing if no
# range matches; otherwise returns the range's whole node list when slaves
# are allowed, or a new one-element list holding only its first node
# (presumably the master -- confirm against the code that builds _slots).
sub _nodes {
  my ( $self, $slot, $allow_slaves ) = @_;

  unless ( defined $slot ) {
    return $self->{_nodes} if $allow_slaves;
    return $self->{_master_nodes};
  }

  my ($range) = bsearch {
      $slot > $_->[1] ? -1
    : $slot < $_->[0] ? 1
    :                   0;
  }
  @{ $self->{_slots} };

  return unless defined $range;

  return $range->[2] if $allow_slaves;
  return [ $range->[2][0] ];
}

# Routes every command currently waiting in the input queue.
#
# The input queue is moved to _temp_queue and replaced by a fresh empty
# array, then commands are shifted off _temp_queue one by one and passed to
# _route(). _temp_queue is looked up again on every iteration, so a
# replacement of that queue made while routing is honoured.
sub _process_input_queue {
  my ($self) = @_;

  ( $self->{_temp_queue}, $self->{_input_queue} )
      = ( $self->{_input_queue}, [] );

  while (1) {
    my $cmd = shift @{ $self->{_temp_queue} };
    last unless $cmd;

    $self->_route($cmd);
  }

  return;
}

# Puts the object back into its "not initialized" state.
#
# Sets _nodes_pool, _nodes, _master_nodes, _slots, _commands and
# _refresh_timer to undef, clears the ready flag and marks initialization as
# still to be done (S_NEED_DO).
sub _reset_internals {
  my ($self) = @_;

  $self->{$_} = undef
      for qw( _nodes_pool _nodes _master_nodes _slots _commands
      _refresh_timer );

  $self->{_ready}      = 0;
  $self->{_init_state} = S_NEED_DO;

  return;
}

# Drops all queued commands and reports the failure.
#
# Takes an optional error object. Both queues are emptied first. If no error
# was given but commands were queued, a "Connection closed by client
# prematurely" error with code E_CONN_CLOSED_BY_CLIENT is created. If no
# error was given and nothing was queued, nothing else happens.
#
# With an error in hand, the on_error callback is called once with it, and
# then each dropped command's on_reply callback is called with
# ( undef, <error> ), where the error is a new object carrying the same code
# and a message naming the aborted operation.
sub _abort {
  my ( $self, $err ) = @_;

  my @pending = $self->_queued_commands;

  $self->{_temp_queue}  = [];
  $self->{_input_queue} = [];

  unless ( defined $err ) {
    return unless @pending;

    $err = _new_error( 'Connection closed by client prematurely',
        E_CONN_CLOSED_BY_CLIENT );
  }

  my $reason = $err->message;
  my $code   = $err->code;

  $self->{on_error}->($err);

  for my $cmd (@pending) {
    my $cmd_err
        = _new_error( qq{Operation "$cmd->{name}" aborted: $reason}, $code );

    $cmd->{on_reply}->( undef, $cmd_err );
  }

  return;
}

# Returns the list of commands that are still waiting: the contents of
# _temp_queue followed by the contents of _input_queue.
sub _queued_commands {
  my ($self) = @_;

  my ( $temp_queue, $input_queue )
      = @{$self}{qw( _temp_queue _input_queue )};

  return ( @{$temp_queue}, @{$input_queue} );
}

# Builds an AnyEvent::RipeRedis::Error object, forwarding all arguments
# unchanged to its constructor.
sub _new_error {
  my $error_class = 'AnyEvent::RipeRedis::Error';

  return $error_class->new(@_);
}

sub AUTOLOAD {
  our $AUTOLOAD;
  my $cmd_name = $AUTOLOAD;
  $cmd_name =~ s/^.+:://;

  my $sub = sub {
    my $self = shift;

    my $cmd = $self->_prepare( $cmd_name, [@_] );
    $self->_route($cmd);



( run in 2.149 seconds using v1.01-cache-2.11-cpan-97f6503c9c8 )