AnyEvent-RipeRedis-Cluster
view release on metacpan or search on metacpan
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
use warnings;
use base qw( Exporter );
our $VERSION = '0.32';
use AnyEvent::RipeRedis;
use AnyEvent::RipeRedis::Error;
use AnyEvent::Socket;
use List::MoreUtils qw( bsearch );
use Scalar::Util qw( looks_like_number weaken );
use Carp qw( croak );
# File-scoped copy of the error-code table taken from
# AnyEvent::RipeRedis::Error; it is filled in by the BEGIN block below.
my %ERROR_CODES;

# Compile-time export setup: every key of the error-code table becomes an
# optional export (grouped under the "err_codes" tag), alongside the
# crc16 and hash_slot functions.
# NOTE(review): presumably this runs in BEGIN so the table is already
# populated for later compile-time code in this file - confirm.
BEGIN {
  %ERROR_CODES = %AnyEvent::RipeRedis::Error::ERROR_CODES;

  my @code_names = keys %ERROR_CODES;

  # The tag shares the very array whose contents are listed in @EXPORT_OK.
  our %EXPORT_TAGS = ( err_codes => \@code_names );
  our @EXPORT_OK   = ( @code_names, 'crc16', 'hash_slot' );
}
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
return crc16($hashtag) % MAX_SLOTS;
}
sub _init {
my $self = shift;
$self->{_init_state} = S_IN_PROGRESS;
undef $self->{_refresh_timer};
weaken($self);
$self->_discover_cluster(
sub {
my $err = $_[1];
if ( defined $err ) {
$self->{_init_state} = S_NEED_DO;
$self->{_ready} = 0;
$self->_abort($err);
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
unless ( defined $nodes_pool{$hostport} ) {
$nodes_pool{$hostport} = $self->_new_node(
$node_params->{host}, $node_params->{port} );
}
}
$self->{_nodes_pool} = \%nodes_pool;
$nodes = [ keys %nodes_pool ];
}
weaken($self);
$self->_execute(
{ name => 'cluster_state',
args => [],
on_reply => sub {
my $err = $_[1];
if ( defined $err ) {
$cb->( undef, $err );
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
return;
}
sub _load_commands {
my $self = shift;
my $cb = shift;
my $nodes = $self->_nodes( undef, $self->{allow_slaves} );
weaken($self);
$self->_execute(
{ name => 'command',
args => [],
on_reply => sub {
my $commands_raw = shift;
my $err = shift;
if ( defined $err ) {
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
on_disconnect => $self->_create_on_node_disconnect( $host, $port ),
on_error => $self->_create_on_node_error( $host, $port ),
);
}
# Builds a callback bound to one node address.
#
# Arguments: the object itself, then the node's host and port.
# Returns:   a code ref that, when invoked, calls the object's
#            "on_node_connect" handler with ( $host, $port ) if that
#            handler is defined, and otherwise calls nothing.
sub _create_on_node_connect {
  my ( $self, $host, $port ) = @_;

  # The closure keeps only a weak reference to the object.
  # NOTE(review): presumably to avoid a reference cycle between the object
  # and the node clients that store this callback - confirm.
  weaken($self);

  return sub {
    my $on_connect = $self->{on_node_connect};
    $on_connect->( $host, $port ) if defined $on_connect;
  };
}
# Builds a callback bound to one node address.
#
# Arguments: the object itself, then the node's host and port.
# Returns:   a code ref that, when invoked, calls the object's
#            "on_node_disconnect" handler with ( $host, $port ) if that
#            handler is defined, and otherwise calls nothing.
sub _create_on_node_disconnect {
  my ( $self, $host, $port ) = @_;

  # The closure keeps only a weak reference to the object.
  # NOTE(review): presumably to avoid a reference cycle between the object
  # and the node clients that store this callback - confirm.
  weaken($self);

  return sub {
    my $on_disconnect = $self->{on_node_disconnect};
    $on_disconnect->( $host, $port ) if defined $on_disconnect;
  };
}
# Builds an error callback bound to one node address.
#
# Arguments: the object itself, then the node's host and port.
# Returns:   a code ref taking one argument (the error). When invoked it
#            calls the object's "on_node_error" handler with
#            ( $err, $host, $port ) if that handler is defined, and
#            otherwise calls nothing.
sub _create_on_node_error {
  my ( $self, $host, $port ) = @_;

  # The closure keeps only a weak reference to the object.
  # NOTE(review): presumably to avoid a reference cycle between the object
  # and the node clients that store this callback - confirm.
  weaken($self);

  return sub {
    my ($err) = @_;

    my $on_error = $self->{on_node_error};
    $on_error->( $err, $host, $port ) if defined $on_error;
  };
}
sub _prepare {
my $self = shift;
my $cmd_name = shift;
my $args = shift;
weaken($self);
my $cbs;
if ( ref( $args->[-1] ) eq 'HASH' ) {
$cbs = pop @{$args};
}
else {
$cbs = {};
if ( ref( $args->[-1] ) eq 'CODE' ) {
if ( exists $SUB_CMDS{$cmd_name} ) {
$cbs->{on_message} = pop @{$args};
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
elsif ( $node_index == scalar @{$nodes} ) {
$node_index = 0;
}
my $hostport = $nodes->[$node_index];
my $node = $self->{_nodes_pool}{$hostport};
my $cmd_name = $cmd->{name} eq 'cluster_state'
? 'cluster_info'
: $cmd->{name};
weaken($self);
$node->execute( $cmd_name, @{ $cmd->{args} },
{ on_reply => sub {
my $reply = shift;
my $err = shift;
if ( $cmd->{name} eq 'cluster_state' ) {
unless ( defined $err ) {
if ( $reply->{cluster_state} eq 'ok' ) {
$reply = 1;
( run in 0.732 second using v1.01-cache-2.11-cpan-65fba6d93b7 )