AnyEvent-RipeRedis-Cluster
view release on metacpan or search on metacpan
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
0xff9f, 0xefbe, 0xdfdd, 0xcffc, 0xbf1b, 0xaf3a, 0x9f59, 0x8f78,
0x9188, 0x81a9, 0xb1ca, 0xa1eb, 0xd10c, 0xc12d, 0xf14e, 0xe16f,
0x1080, 0x00a1, 0x30c2, 0x20e3, 0x5004, 0x4025, 0x7046, 0x6067,
0x83b9, 0x9398, 0xa3fb, 0xb3da, 0xc33d, 0xd31c, 0xe37f, 0xf35e,
0x02b1, 0x1290, 0x22f3, 0x32d2, 0x4235, 0x5214, 0x6277, 0x7256,
0xb5ea, 0xa5cb, 0x95a8, 0x8589, 0xf56e, 0xe54f, 0xd52c, 0xc50d,
0x34e2, 0x24c3, 0x14a0, 0x0481, 0x7466, 0x6447, 0x5424, 0x4405,
0xa7db, 0xb7fa, 0x8799, 0x97b8, 0xe75f, 0xf77e, 0xc71d, 0xd73c,
0x26d3, 0x36f2, 0x0691, 0x16b0, 0x6657, 0x7676, 0x4615, 0x5634,
0xd94c, 0xc96d, 0xf90e, 0xe92f, 0x99c8, 0x89e9, 0xb98a, 0xa9ab,
0x5844, 0x4865, 0x7806, 0x6827, 0x18c0, 0x08e1, 0x3882, 0x28a3,
0xcb7d, 0xdb5c, 0xeb3f, 0xfb1e, 0x8bf9, 0x9bd8, 0xabbb, 0xbb9a,
0x4a75, 0x5a54, 0x6a37, 0x7a16, 0x0af1, 0x1ad0, 0x2ab3, 0x3a92,
0xfd2e, 0xed0f, 0xdd6c, 0xcd4d, 0xbdaa, 0xad8b, 0x9de8, 0x8dc9,
0x7c26, 0x6c07, 0x5c64, 0x4c45, 0x3ca2, 0x2c83, 0x1ce0, 0x0cc1,
0xef1f, 0xff3e, 0xcf5d, 0xdf7c, 0xaf9b, 0xbfba, 0x8fd9, 0x9ff8,
0x6e17, 0x7e36, 0x4e55, 0x5e74, 0x2e93, 0x3eb2, 0x0ed1, 0x1ef0,
);
my %PREDEFINED_CMDS = (
sort => { readonly => 0, key_pos => 1 },
zunionstore => { readonly => 0, key_pos => 1 },
zinterstore => { readonly => 0, key_pos => 1 },
eval => { readonly => 0, movablekeys => 1, key_pos => 0 },
evalsha => { readonly => 0, movablekeys => 1, key_pos => 0 },
);
my %SUB_CMDS = (
subscribe => 1,
psubscribe => 1,
);
sub new {
my $class = shift;
my %params = @_;
my $self = bless {}, $class;
unless ( defined $params{startup_nodes} ) {
croak 'Startup nodes not specified';
}
unless ( ref( $params{startup_nodes} ) eq 'ARRAY' ) {
croak 'Startup nodes must be specified as array reference';
}
unless ( @{ $params{startup_nodes} } ) {
croak 'Specified empty list of startup nodes';
}
$self->{startup_nodes} = $params{startup_nodes};
$self->{allow_slaves} = $params{allow_slaves};
$self->{lazy} = $params{lazy};
$self->refresh_interval( $params{refresh_interval} );
$self->{on_node_connect} = $params{on_node_connect};
$self->{on_node_disconnect} = $params{on_node_disconnect};
$self->{on_node_error} = $params{on_node_error};
$self->on_error( $params{on_error} );
my %node_params;
foreach my $name ( qw( password utf8 connection_timeout read_timeout
reconnect reconnect_interval handle_params ) )
{
next unless defined $params{$name};
$node_params{$name} = $params{$name};
}
$self->{_node_params} = \%node_params;
$self->_reset_internals;
$self->{_input_queue} = [];
$self->{_temp_queue} = [];
unless ( $self->{lazy} ) {
$self->_init;
}
return $self;
}
sub execute {
my $self = shift;
my $cmd_name = shift;
my $cmd = $self->_prepare( $cmd_name, [@_] );
$self->_route($cmd);
return;
}
sub disconnect {
my $self = shift;
foreach my $node ( values %{ $self->{_nodes_pool} } ) {
$node->disconnect;
}
$self->_reset_internals;
$self->_abort;
return;
}
sub nodes {
my $self = shift;
my $key = shift;
my $allow_slaves = shift;
return unless defined $self->{_slots};
my $slot;
if ( defined $key ) {
$slot = hash_slot($key);
}
my $nodes = $self->_nodes( $slot, $allow_slaves );
return unless defined $nodes;
return wantarray
? @{ $self->{_nodes_pool} }{ @{$nodes} }
: $self->{_nodes_pool}{ $nodes->[0] };
}
sub refresh_interval {
my $self = shift;
if (@_) {
my $seconds = shift;
if ( defined $seconds ) {
if ( !looks_like_number($seconds) || $seconds < 0 ) {
croak qq{"refresh_interval" must be a positive number};
}
$self->{refresh_interval} = $seconds;
}
else {
$self->{refresh_interval} = D_REFRESH_INTERVAL;
}
}
return $self->{refresh_interval};
}
sub on_error {
my $self = shift;
if ( @_ ) {
my $on_error = shift;
if ( defined $on_error ) {
$self->{on_error} = $on_error;
}
else {
$self->{on_error} = sub {
my $err = shift;
warn $err->message . "\n";
};
}
}
return $self->{on_error};
}
sub crc16 {
my $data = shift;
unless ( utf8::downgrade( $data, 1 ) ) {
utf8::encode($data);
}
my $crc = 0;
foreach my $char ( split //, $data ) {
$crc = ( $crc << 8 & 0xff00 )
^ $CRC16_TAB[ ( ( $crc >> 8 ) ^ ord($char) ) & 0x00ff ];
}
return $crc;
}
sub hash_slot {
my $hashtag = shift;
if ( $hashtag =~ m/\{([^}]*?)\}/ ) {
if ( length $1 > 0 ) {
$hashtag = $1;
}
}
return crc16($hashtag) % MAX_SLOTS;
}
sub _init {
my $self = shift;
$self->{_init_state} = S_IN_PROGRESS;
undef $self->{_refresh_timer};
weaken($self);
$self->_discover_cluster(
sub {
my $err = $_[1];
if ( defined $err ) {
$self->{_init_state} = S_NEED_DO;
$self->{_ready} = 0;
$self->_abort($err);
return;
}
$self->{_init_state} = S_DONE;
$self->{_ready} = 1;
$self->_process_input_queue;
if ( $self->{refresh_interval} > 0 ) {
$self->{_refresh_timer} = AE::timer(
$self->{refresh_interval}, 0,
sub {
$self->{_init_state} = S_NEED_DO;
$self->{_ready} = 0;
}
);
}
}
);
lib/AnyEvent/RipeRedis/Cluster.pm view on Meta::CPAN
{ host => 'localhost', port => 7002 },
],
password => 'yourpass',
connection_timeout => 5,
read_timeout => 5,
refresh_interval => 5,
lazy => 1,
reconnect_interval => 5,
on_node_connect => sub {
my $host = shift;
my $port = shift;
# handling...
},
on_node_disconnect => sub {
my $host = shift;
my $port = shift;
# handling...
},
on_node_error => sub {
my $err = shift;
my $host = shift;
my $port = shift;
# error handling...
},
on_error => sub {
my $err = shift;
# error handling...
},
);
=over
=item startup_nodes => \@nodes
Specifies the list of startup nodes. Parameter should contain the array of
hashes that contains addresses of some nodes in the cluster. Each hash should
contain C<host> and C<port> elements. The client will try to connect to random
node from the list to retrieve information about all cluster nodes and slots
mapping. If the client could not connect to first selected node, it will try
to connect to another random node from the list.
=item password => $password
If the password is specified, the C<AUTH> command is sent to all nodes
of the cluster after connection.
=item allow_slaves => $boolean
If enabled, the client will try to send read-only commands to slave nodes.
Disabled by default.
=item utf8 => $boolean
If enabled, all strings will be converted to UTF-8 before sending to nodes,
and all results will be decoded from UTF-8.
Enabled by default.
=item connection_timeout => $fractional_seconds
Specifies connection timeout. If the client could not connect to the node
after specified timeout, the C<on_node_error> callback is called with the
C<E_CANT_CONN> error. The timeout specifies in seconds and can contain a
fractional part.
connection_timeout => 10.5,
By default the client use kernel's connection timeout.
=item read_timeout => $fractional_seconds
Specifies read timeout. If the client could not receive a reply from the node
after specified timeout, the client close connection and call the
C<on_node_error> callback with the C<E_READ_TIMEDOUT> error. The timeout is
specifies in seconds and can contain a fractional part.
read_timeout => 3.5,
Not set by default.
=item lazy => $boolean
If enabled, the initial connection to the startup node establishes at time when
you will send the first command to the cluster. By default the initial
connection establishes after calling of the C<new> method.
Disabled by default.
=item reconnect => $boolean
If the connection to the node was lost and the parameter C<reconnect> is
TRUE (default), the client will try to restore the connection when you execute
next command. The client will try to reconnect only once and, if attempt fails,
the error object is passed to command callback. If you need several attempts of
the reconnection, you must retry a command from the callback as many times, as
you need. Such behavior allows to control reconnection procedure.
Enabled by default.
=item reconnect_interval => $fractional_seconds
If the parameter is specified, the client will try to reconnect only after
this interval. Commands executed between reconnections will be queued.
reconnect_interval => 5,
Not set by default.
=item refresh_interval => $fractional_seconds
Cluster state refresh interval. If set to zero, cluster state will be updated
only on MOVED redirect.
( run in 0.956 second using v1.01-cache-2.11-cpan-ceb78f64989 )