AnyEvent-RipeRedis-Cluster
view release on metacpan or search on metacpan
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
else {
$self->{refresh_interval} = D_REFRESH_INTERVAL;
}
}
return $self->{refresh_interval};
}
sub on_error {
my $self = shift;
if ( @_ ) {
my $on_error = shift;
if ( defined $on_error ) {
$self->{on_error} = $on_error;
}
else {
$self->{on_error} = sub {
my $err = shift;
warn $err->message . "\n";
};
}
}
return $self->{on_error};
}
sub crc16 {
my $data = shift;
unless ( utf8::downgrade( $data, 1 ) ) {
utf8::encode($data);
}
my $crc = 0;
foreach my $char ( split //, $data ) {
$crc = ( $crc << 8 & 0xff00 )
^ $CRC16_TAB[ ( ( $crc >> 8 ) ^ ord($char) ) & 0x00ff ];
}
return $crc;
}
sub hash_slot {
my $hashtag = shift;
if ( $hashtag =~ m/\{([^}]*?)\}/ ) {
if ( length $1 > 0 ) {
$hashtag = $1;
}
}
return crc16($hashtag) % MAX_SLOTS;
}
sub _init {
my $self = shift;
$self->{_init_state} = S_IN_PROGRESS;
undef $self->{_refresh_timer};
weaken($self);
$self->_discover_cluster(
sub {
my $err = $_[1];
if ( defined $err ) {
$self->{_init_state} = S_NEED_DO;
$self->{_ready} = 0;
$self->_abort($err);
return;
}
$self->{_init_state} = S_DONE;
$self->{_ready} = 1;
$self->_process_input_queue;
if ( $self->{refresh_interval} > 0 ) {
$self->{_refresh_timer} = AE::timer(
$self->{refresh_interval}, 0,
sub {
$self->{_init_state} = S_NEED_DO;
$self->{_ready} = 0;
}
);
}
}
);
return;
}
sub _discover_cluster {
my $self = shift;
my $cb = shift;
my $nodes;
if ( defined $self->{_slots} ) {
$nodes = $self->_nodes( undef, $self->{allow_slaves} );
}
else {
my %nodes_pool;
foreach my $node_params ( @{ $self->{startup_nodes} } ) {
my $hostport = "$node_params->{host}:$node_params->{port}";
unless ( defined $nodes_pool{$hostport} ) {
$nodes_pool{$hostport} = $self->_new_node(
$node_params->{host}, $node_params->{port} );
}
}
$self->{_nodes_pool} = \%nodes_pool;
$nodes = [ keys %nodes_pool ];
}
weaken($self);
$self->_execute(
{ name => 'cluster_state',
args => [],
on_reply => sub {
my $err = $_[1];
if ( defined $err ) {
$cb->( undef, $err );
return;
}
$self->_execute(
{ name => 'cluster_slots',
args => [],
on_reply => sub {
my $slots = shift;
my $err = shift;
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
return;
}
$cmd->{on_reply}->($reply);
},
defined $cmd->{on_message}
? ( on_message => $cmd->{on_message} )
: (),
}
);
return;
}
sub _nodes {
my $self = shift;
my $slot = shift;
my $allow_slaves = shift;
if ( defined $slot ) {
my ($range) = bsearch {
$slot > $_->[1] ? -1 : $slot < $_->[0] ? 1 : 0;
}
@{ $self->{_slots} };
return unless defined $range;
return $allow_slaves
? $range->[2]
: [ $range->[2][0] ];
}
return $allow_slaves
? $self->{_nodes}
: $self->{_master_nodes};
}
sub _process_input_queue {
my $self = shift;
$self->{_temp_queue} = $self->{_input_queue};
$self->{_input_queue} = [];
while ( my $cmd = shift @{ $self->{_temp_queue} } ) {
$self->_route($cmd);
}
return;
}
sub _reset_internals {
my $self = shift;
$self->{_nodes_pool} = undef;
$self->{_nodes} = undef;
$self->{_master_nodes} = undef;
$self->{_slots} = undef;
$self->{_commands} = undef;
$self->{_init_state} = S_NEED_DO;
$self->{_refresh_timer} = undef;
$self->{_ready} = 0;
return;
}
sub _abort {
my $self = shift;
my $err = shift;
my @queued_commands = $self->_queued_commands;
$self->{_input_queue} = [];
$self->{_temp_queue} = [];
if ( !defined $err && @queued_commands ) {
$err = _new_error( 'Connection closed by client prematurely',
E_CONN_CLOSED_BY_CLIENT );
}
if ( defined $err ) {
my $err_msg = $err->message;
my $err_code = $err->code;
$self->{on_error}->($err);
foreach my $cmd (@queued_commands) {
my $err = _new_error( qq{Operation "$cmd->{name}" aborted: $err_msg},
$err_code );
$cmd->{on_reply}->( undef, $err );
}
}
return;
}
sub _queued_commands {
my $self = shift;
return (
@{ $self->{_temp_queue} },
@{ $self->{_input_queue} },
);
}
sub _new_error {
return AnyEvent::RipeRedis::Error->new(@_);
}
sub AUTOLOAD {
our $AUTOLOAD;
my $cmd_name = $AUTOLOAD;
$cmd_name =~ s/^.+:://;
my $sub = sub {
my $self = shift;
my $cmd = $self->_prepare( $cmd_name, [@_] );
$self->_route($cmd);
( run in 2.149 seconds using v1.01-cache-2.11-cpan-97f6503c9c8 )