AnyEvent-Redis-Federated
view release on metacpan or search on metacpan
lib/AnyEvent/Redis/Federated.pm view on Meta::CPAN
package AnyEvent::Redis::Federated;
# An AnyEvent-based Redis client which implements timeouts, connection
# retries, multi-machine pool configuration (including consistent
# hashing), and other magic bits.
use strict;
use warnings;
use AnyEvent::Redis;
use AnyEvent;
use Set::ConsistentHash; # for hash ring logic
use Digest::MD5 qw(md5); # for hashing keys
use Scalar::Util qw(weaken);
use List::Util qw(shuffle);
our $VERSION = "0.08";
# keep a global object cache that will contain weak references to
# objects keyed on their tag. this allows for sharing of objects
# within a given process by modules that are otherwise unaware of
# each other provided they use the same tag.
our %object_cache;
# These are all for failure handling (server down or unresponsive).
# If a connection to a given server fails, we'll retry up to
# MAX_HOST_RETRIES and then only retry once in a while. That
# interval is dictated by BASE_RETRY_INTERVAL. If that retry fails,
# we'll multiply that by RETRY_INTERVAL_MULT up to but not exceeding
# MAX_RETRY_INTERVAL.
#
# If we ever get a successful retry, we'll erase any memory of the
# failure and pretend things are just fine.
# Tunables for failure handling and hashing, declared as one constant list.
use constant {
    MAX_HOST_RETRIES    => 3,   # how many in a row before we pass
    BASE_RETRY_INTERVAL => 2,   # in seconds
    RETRY_INTERVAL_MULT => 2,   # multiply this much each retry fail
    MAX_RETRY_INTERVAL  => 600, # no more than this long
    DEFAULT_WEIGHT      => 10,  # for consistent hashing
    COMMAND_TIMEOUT     => 1,   # used in poll()
    QUERY_ALL           => 0,   # don't query all addresses by default
};

# Per-object settings; each key can be overridden by passing the same
# key to the constructor.
my %defaults = (
    command_timeout     => COMMAND_TIMEOUT,
    max_host_retries    => MAX_HOST_RETRIES,
    base_retry_interval => BASE_RETRY_INTERVAL,
    retry_interval_mult => RETRY_INTERVAL_MULT,
    max_retry_interval  => MAX_RETRY_INTERVAL,
    query_all           => QUERY_ALL,
    quiet               => $ENV{QUIET},
);
# Construct a federated Redis client, or return the live object already
# cached under the same tag.
#
# Arguments are a flat key/value list that becomes the object hash:
#   config       - (required) hashref holding a 'nodes' hash keyed by node
#                  name; a node entry may carry an 'addresses' arrayref
#   tag          - optional; objects built with the same tag are shared
#                  through %object_cache (stored as weak references)
#   debug        - optional; when true, progress is printed
#   idle_timeout - optional; set to 0 when not supplied
#   any key present in %defaults overrides that default
#
# Note: each node's 'addresses' array in the caller's config is shuffled
# in place, and the node's 'address' is set to the first shuffled entry.
#
# Dies if no configuration is supplied.
sub new {
    my $class = shift;
    my $self  = { @_ };

    # tag short circuit
    if ($self->{tag} and $object_cache{$self->{tag}}) {
        return $object_cache{$self->{tag}};
    }

    # basic init: fill in every default the caller did not pass.
    # A keys-based loop avoids depending on the each() iterator state of
    # the shared %defaults hash.
    for my $key (keys %defaults) {
        next if exists $self->{$key};
        $self->{$key} = $defaults{$key};
    }

    # condvar for finishing up stuff (used in poll())
    $self->{cv} = undef;

    # setup server_status tracking
    $self->{server_status} = { };

    # request state
    $self->{request_serial} = 0;
    $self->{request_state}  = { };

    # we must have configuration
    if (not $self->{config}) {
        die("No configuration provided. Can't instantiate Redis client");
    }

    # populate node list
    $self->{nodes} = [keys %{$self->{config}->{nodes}}];
    if ($self->{debug}) {
        print "node list: ", join ', ', @{$self->{nodes}};
        print "\n";
    }

    # setup the addresses array
    foreach my $node (@{$self->{nodes}}) {
        my $node_config = $self->{config}->{nodes}->{$node};
        my $addresses   = $node_config->{addresses};

        # An absent or empty list gives us nothing to choose from; leave
        # any explicitly configured 'address' alone instead of replacing
        # it with undef.
        next unless $addresses and @{$addresses};

        # shuffle the existing addresses array
        @{$addresses} = shuffle(@{$addresses});

        # and set the first to be our targeted server
        $node_config->{address} = $addresses->[0];
    }

    # setup the consistent hash
    my $set     = Set::ConsistentHash->new;
    my @targets = map { $_, DEFAULT_WEIGHT } @{$self->{nodes}};
    $set->set_targets(@targets);
    $set->set_hash_func(\&_hash);
    $self->{set}     = $set;
    $self->{buckets} = $self->{set}->buckets;

    $self->{idle_timeout} = 0 if not exists $self->{idle_timeout};

    print "config done.\n" if $self->{debug};
    bless $self, $class;

    # cache it for later use; the cache entry is weakened so it does not
    # keep the object alive on its own
    if ($self->{tag}) {
        $object_cache{$self->{tag}} = $self;
        weaken($object_cache{$self->{tag}});
    }

    return $self;
}
# Set the weight of $node to 0 in the consistent-hash set, then refresh
# the cached bucket list from that set.
sub removeNode {
    my $self = shift;
    my $node = shift;

    my $ring = $self->{set};
    $ring->modify_targets($node => 0);
    $self->{buckets} = $ring->buckets;
}
# Register node $name with configuration $ref, give it DEFAULT_WEIGHT in
# the consistent-hash set, and refresh the cached bucket list.
sub addNode {
    my $self = shift;
    my ($name, $ref) = @_;

    $self->{config}{nodes}{$name} = $ref;

    my $ring = $self->{set};
    $ring->modify_targets($name => DEFAULT_WEIGHT);
    $self->{buckets} = $ring->buckets;
}
# Destructor with no cleanup work of its own.
# NOTE(review): presumably defined so destruction is never routed through
# an AUTOLOAD handler; confirm against the rest of the file.
sub DESTROY {
    return;
}
# Hash function handed to the consistent-hash set: the first four bytes
# of the key's MD5 digest, read as an unsigned 32-bit big-endian integer.
sub _hash {
    my ($key) = @_;
    my $digest = md5($key);
    return unpack("N", $digest);
}
# Accessor for the command timeout. With a defined argument the stored
# value is replaced first; the current value is always returned.
sub commandTimeout {
    my $self = shift;
    my $time = shift;

    $self->{command_timeout} = $time if defined $time;
    return $self->{command_timeout};
}
# Accessor for the query_all flag. With a defined argument the stored
# value is replaced first; the current value is always returned.
sub queryAll {
    my $self = shift;
    my $val  = shift;

    $self->{query_all} = $val if defined $val;
    return $self->{query_all};
}
# Return the 'address' currently recorded for $node in the configuration.
sub nodeToHost {
    my $self = shift;
    my ($node) = @_;

    return $self->{config}{nodes}{$node}{address};
}
# Map $key to a node name: hash the key, reduce it modulo 1024, and use
# the result as an index into the cached bucket list.
# NOTE(review): assumes the bucket list has 1024 entries; confirm against
# Set::ConsistentHash.
sub keyToNode {
    my ($self, $key) = @_;

    my $slot = _hash($key) % 1024;
    return $self->{buckets}->[$slot];
}
# Return 1 when server_status holds a true "<server>:down" entry for
# $server, and 0 otherwise.
sub isServerDown {
    my ($self, $server) = @_;

    my $down_flag = $self->{server_status}{"$server:down"};
    return $down_flag ? 1 : 0;
}
sub isServerUp {
my ($self, $server) = @_;
( run in 2.413 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )