AnyEvent-Redis-Federated
view release on metacpan or search on metacpan
lib/AnyEvent/Redis/Federated.pm view on Meta::CPAN
package AnyEvent::Redis::Federated;
# An AnyEvent-based Redis client which implements timeouts, connection
# retries, multi-machine pool configuration (including consistent
# hashing), and other magic bits.
use strict;
use warnings;
use AnyEvent::Redis;
use AnyEvent;
use Set::ConsistentHash; # for hash ring logic
use Digest::MD5 qw(md5); # for hashing keys
use Scalar::Util qw(weaken);
use List::Util qw(shuffle);
our $VERSION = "0.08";
# keep a global object cache that will contain weak references to
# objects keyed on their tag. this allows for sharing of objects
# within a given process by modules that are otherwise unaware of
# each other provided they use the same tag.
our %object_cache;
# These are all for failure handling (server down or unresponsive).
# If a connection to a given server fails, we'll retry up to
# MAX_HOST_RETRIES and then only retry once in a while. That
# interval is dictated by BASE_RETRY_INTERVAL. If that retry fails,
# we'll multiply that by RETRY_INTERVAL_MULT up to but not exceeding
# MAX_RETRY_INTERVAL.
#
# If we ever get a successful retry, we'll erase any memory of the
# failure and pretend things are just fine.
# Built-in defaults for the failure-handling and timeout knobs described
# above. Each one is copied into %defaults below under its lowercase name,
# and new() only applies a default when the caller did not pass that key.
use constant MAX_HOST_RETRIES => 3; # how many in a row before we pass
use constant BASE_RETRY_INTERVAL => 2; # in seconds
use constant RETRY_INTERVAL_MULT => 2; # multiply this much each retry fail
use constant MAX_RETRY_INTERVAL => 600; # no more than this long
use constant DEFAULT_WEIGHT => 10; # for consistent hashing
use constant COMMAND_TIMEOUT => 1; # used in poll()
use constant QUERY_ALL => 0; # don't query all addresses by default
# Per-object settings. new() walks this hash and fills in every key that is
# missing from the constructor arguments; keys the caller supplied (even
# with a false value) are left untouched.
my %defaults = (
command_timeout => COMMAND_TIMEOUT,
max_host_retries => MAX_HOST_RETRIES,
base_retry_interval => BASE_RETRY_INTERVAL,
retry_interval_mult => RETRY_INTERVAL_MULT,
max_retry_interval => MAX_RETRY_INTERVAL,
query_all => QUERY_ALL,
# taken from the environment when the module is loaded; when true,
# connection on_error warnings are suppressed
quiet => $ENV{QUIET},
);
# Constructor.
#
# Takes a flat list of key/value options. 'config' is mandatory and must
# hold a 'nodes' hash (node name => { address => ..., addresses => [...] }).
# Any option listed in %defaults that the caller omits is filled in.
#
# When a true 'tag' is given and a live object with that tag is already in
# %object_cache, that object is returned instead of building a new one.
#
# Dies if no configuration is supplied.
sub new {
    my ($class, @args) = @_;
    my $self = { @args };

    # tag short circuit: hand back the shared instance if one is alive
    if ($self->{tag} and $object_cache{$self->{tag}}) {
        return $object_cache{$self->{tag}};
    }

    # basic init: apply defaults only for options the caller left out
    for my $option (keys %defaults) {
        $self->{$option} = $defaults{$option} unless exists $self->{$option};
    }

    # condvar for finishing up stuff (used in poll())
    $self->{cv} = undef;

    # per-server up/down bookkeeping
    $self->{server_status} = { };

    # request state
    $self->{request_serial} = 0;
    $self->{request_state}  = { };

    # we must have configuration
    die("No configuration provided. Can't instantiate Redis client")
        unless $self->{config};

    # populate node list
    $self->{nodes} = [keys %{$self->{config}->{nodes}}];
    if ($self->{debug}) {
        print "node list: ", join ', ', @{$self->{nodes}};
        print "\n";
    }

    # For every node that lists a pool of addresses, randomise the pool and
    # target its LAST entry. nextServer() rotates by moving the head of the
    # pool to the tail, so "current server == tail" holds from the start.
    my $node_config = $self->{config}->{nodes};
    for my $node (@{$self->{nodes}}) {
        next unless $node_config->{$node}->{addresses};
        my $pool = $node_config->{$node}->{addresses};
        @$pool = shuffle(@$pool);
        $node_config->{$node}->{address} = $pool->[-1];
    }

    # setup the consistent hash: every node gets the same weight
    my $ring = Set::ConsistentHash->new;
    my @weighted_targets;
    push @weighted_targets, $_, DEFAULT_WEIGHT for @{$self->{nodes}};
    $ring->set_targets(@weighted_targets);
    $ring->set_hash_func(\&_hash);
    $self->{set}     = $ring;
    $self->{buckets} = $self->{set}->buckets;

    $self->{idle_timeout} = 0 unless exists $self->{idle_timeout};

    print "config done.\n" if $self->{debug};
    bless $self, $class;

    # cache it for later use; the cache entry is weak so it does not keep
    # the object alive on its own
    if (my $tag = $self->{tag}) {
        $object_cache{$tag} = $self;
        weaken($object_cache{$tag});
    }

    return $self;
}
# Take a node out of the hash ring by setting its weight to zero, then
# refresh the cached bucket table. The node's entry in the configuration
# is left in place.
sub removeNode {
    my $self = shift;
    my ($node) = @_;

    my $ring = $self->{set};
    $ring->modify_targets($node, 0);
    $self->{buckets} = $ring->buckets;
}
# Register (or replace) a node at runtime and add it to the hash ring with
# the default weight.
#
#   $name - node name, used as the key in config->{nodes} and as the ring
#           target
#   $ref  - node configuration hashref, same shape as the entries given to
#           new(): an 'address' and/or an 'addresses' arrayref
#
# new() chooses a current 'address' for every node that lists a pool of
# 'addresses'; nodes added later used to skip that step, so a node given
# only 'addresses' had no current server and nodeToHost() returned undef.
# The same initialisation is now done here, but only when the caller did
# not provide an 'address', so existing callers see no change.
sub addNode {
    my ($self, $name, $ref) = @_;

    if (ref $ref eq 'HASH' and not defined $ref->{address}) {
        my $pool = $ref->{addresses};
        if (ref $pool eq 'ARRAY' and @$pool) {
            # same scheme as new(): shuffle the pool, target the last entry
            # (nextServer() rotates head -> tail, so the tail is "current")
            @$pool = shuffle(@$pool);
            $ref->{address} = $pool->[-1];
        }
    }

    $self->{config}->{nodes}->{$name} = $ref;
    $self->{set}->modify_targets($name => DEFAULT_WEIGHT);
    $self->{buckets} = $self->{set}->buckets;
}
# Intentionally empty destructor: no cleanup is performed here.
# NOTE(review): presumably defined explicitly so that object destruction is
# not routed through the module's catch-all command dispatcher -- confirm.
sub DESTROY {
}
# Hash function for the consistent-hash ring: the first four bytes of the
# key's MD5 digest, read as a big-endian unsigned 32-bit integer.
sub _hash {
    my ($key) = @_;
    return unpack 'N', md5($key);
}
# Get or set the command timeout used by poll().
# With a defined argument the value is stored first; the current value is
# always returned. Passing 0 is a valid set (it is defined).
sub commandTimeout {
    my $self = shift;
    my ($time) = @_;

    $self->{command_timeout} = $time if defined $time;
    return $self->{command_timeout};
}
# Get or set the query_all flag (send each command to every address of the
# node instead of just the current one).
# With a defined argument the value is stored first; the current value is
# always returned.
sub queryAll {
    my $self = shift;
    my ($val) = @_;

    $self->{query_all} = $val if defined $val;
    return $self->{query_all};
}
# Return the address currently targeted for the given node, as stored in
# the node's 'address' configuration entry (undef when there is none).
sub nodeToHost {
    my $self = shift;
    my $node = shift;

    my $address = $self->{config}{nodes}{$node}{address};
    return $address;
}
# Map a key to a node name: hash the key, reduce it modulo 1024 and look
# the slot up in the cached bucket table taken from the hash ring.
sub keyToNode {
    my $self = shift;
    my ($key) = @_;

    my $slot = _hash($key) % 1024;
    return $self->{buckets}->[$slot];
}
# True (1) when the server is currently flagged as down in server_status,
# otherwise 0.
sub isServerDown {
    my $self   = shift;
    my $server = shift;

    return $self->{server_status}{"$server:down"} ? 1 : 0;
}
# True (1) unless the server is currently flagged as down in server_status,
# in which case 0 is returned.
sub isServerUp {
    my $self   = shift;
    my $server = shift;

    return $self->{server_status}{"$server:down"} ? 0 : 1;
}
# Rotate a node to the next address in its pool and return that address.
#
#   $server - the address currently in use (returned unchanged when there
#             is nothing to rotate to)
#   $node   - node name
#
# The head of the node's 'addresses' list becomes the current 'address' and
# is moved to the tail, so repeated calls cycle through the whole pool.
#
# Nodes without a pool, unknown nodes, and nodes whose pool is an empty
# array all leave the configuration untouched and return $server. An empty
# array used to pass the truthiness check, which set the node's address to
# undef and pushed undef into the pool.
sub nextServer {
    my ($self, $server, $node) = @_;

    # plain fetch: do not create a config entry for an unknown node
    my $node_config = $self->{config}->{nodes}->{$node};
    my $pool = $node_config ? $node_config->{addresses} : undef;
    return $server unless $pool and @$pool;

    my $next = shift @$pool;
    push @$pool, $next;
    $node_config->{address} = $next;

    warn "redis server for $node changed from $server to $next selected\n" if $self->{debug};
    return $next;
}
## returns only the servers of a node that are currently marked up
# Return an arrayref of every server of the given node that is currently
# up (per isServerUp), in pool order.
#
# The candidates are the node's 'addresses' pool when it has one, otherwise
# its single 'address'. Previously only 'addresses' was consulted, so a
# node configured with just 'address' always produced an empty list, and
# dereferencing the missing pool could create an empty 'addresses' array in
# the configuration as a side effect. The configuration is now only read;
# unknown nodes yield an empty list.
sub allServers {
    my ($self, $node) = @_;

    # plain fetches only, so nothing is autovivified in the config
    my $node_config = $self->{config}->{nodes}->{$node};
    return [] unless $node_config;

    my @candidates;
    if ($node_config->{addresses}) {
        @candidates = @{ $node_config->{addresses} };
    }
    elsif (defined $node_config->{address}) {
        @candidates = ($node_config->{address});
    }

    my $hosts = [ grep { $self->isServerUp($_) } @candidates ];
    return $hosts;
}
sub markServerUp {
my ($self, $server) = @_;
if ($self->{server_status}{"$server:down"}) {
my $down_since = localtime($self->{server_status}{"$server:down_since"});
delete $self->{server_status}{"$server:down"};
delete $self->{server_status}{"$server:retries"};
delete $self->{server_status}{"$server:down_since"};
delete $self->{server_status}{"$server:retry_pending"};
delete $self->{server_status}{"$server:retry_interval"};
warn "redis server $server back up (down since $down_since)\n" if $self->{debug};
lib/AnyEvent/Redis/Federated.pm view on Meta::CPAN
my $key = $_[0];
my $hk = $key;
my $cb = sub { };
if (ref $_[-1] eq 'CODE') {
$cb = pop @_;
}
# key group?
if (ref($_[0]) eq 'ARRAY') {
$hk = $_[0]->[0];
$key = $_[0]->[1];
$_[0] = $key;
}
my $node = $self->keyToNode($hk);
my $query_all = $self->{query_all};
if ($call =~ s/_all$//) {
$query_all = 1;
}
## The normal single-server case...
if (not $query_all) {
my $server = $self->nodeToHost($node);
print "server [$server] of node [$node] for key [$key] hashkey [$hk]\n" if $self->{debug};
if ($self->isServerDown($server)) {
# try another if we can
if ($self->{config}->{nodes}->{$node}->{addresses}) {
print "server [$server] seems down\n" if $self->{debug};
$server = $self->nextServer($server, $node);
print "trying next server in line [$server] for node [$node]\n" if $self->{debug};
}
# bail otherwise
else {
print "server $server down. abandoning call.\n" if $self->{debug};
$cb->(undef);
return $self;
}
}
return $self->scheduleCall($server, $call, [@_], $cb);
}
## Need to fire this one at all up servers in the node group...
else {
my $servers = $self->allServers($node);
for my $server (@$servers) {
$self->scheduleCall($server, $call, [@_], $cb);
}
return $self;
}
}
# Block until all outstanding requests have completed or the command
# timeout expires.
#
# Returns immediately when there is no condvar, i.e. nothing is pending.
# Otherwise, if command_timeout is a positive number, a SIGALRM watcher is
# installed and an alarm is armed for that many seconds. When the alarm
# fires, every request still marked pending in request_state is flagged as
# cancelled (0) and the condvar is end()ed once per cancelled request so
# that recv can return.
#
# The alarm is armed with Time::HiRes::alarm rather than the core alarm():
# the core function only takes whole seconds, so a timeout such as 0.5 was
# truncated to alarm(0), which arms nothing and left poll() without any
# timeout at all.
sub poll {
    my ($self) = @_;

    return if not defined $self->{cv};

    require Time::HiRes;

    my $rid     = $self->{request_serial};
    my $timeout = $self->{command_timeout};
    my $w;

    if ($timeout and $timeout > 0) {
        $w = AnyEvent->signal(signal => "ALRM", cb => sub {
            warn "AnyEvent::Redis::Federated::poll alarm timeout! ($rid)\n" if $self->{debug};

            # Mark whatever is still pending as cancelled. Iterate over a
            # snapshot of the keys: end() may run callbacks that touch
            # request_state, which is not safe in the middle of each().
            for my $pending (keys %{$self->{request_state}}) {
                next unless $self->{request_state}->{$pending};
                print "found pending request to cancel: $pending\n" if $self->{debug};
                $self->{request_state}->{$pending} = 0;
                $self->{cv}->end;
            }

            # the alarm is one-shot; the watcher is no longer needed
            undef $w;
        });
        print "scheduling alarm timer in poll() for $timeout\n" if $self->{debug};
        Time::HiRes::alarm($timeout);
    }

    $self->{cv}->recv;
    $self->{cv} = undef;

    # disarm any alarm that has not fired and drop the watcher (this also
    # breaks the reference cycle between $w and its callback)
    Time::HiRes::alarm(0);
    undef $w;
}
sub scheduleCall {
my ($self, $server, $call, $args, $cb) = @_;
# have a non-idle connection already?
my $r;
if ($self->{conn}->{$server}) {
if ($self->{idle_timeout}) {
if ($self->{last_used}->{$server} > time - $self->{idle_timeout}) {
$r = $self->{conn}->{$server};
}
}
else {
$r = $self->{conn}->{$server};
}
}
# otherwise create a new connection
if (not defined $r) {
my ($host, $port) = split /:/, $server;
print "attempting new connection to $server\n" if $self->{debug};
$r = AnyEvent::Redis->new(
host => $host,
port => $port,
on_error => sub {
warn @_ unless $self->{quiet};
$self->markServerDown($server);
$self->{cv}->end;
}
);
$self->{conn}->{$server} = $r;
}
if (not defined $self->{cv}) {
( run in 0.305 second using v1.01-cache-2.11-cpan-d7f47b0818f )