AnyEvent-Redis-Federated

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/Federated.pm  view on Meta::CPAN

package AnyEvent::Redis::Federated;

# An AnyEvent-based Redis client which implements timeouts, connection
# retries, multi-machine pool configuration (including consistent
# hashing), and other magic bits. 

use strict;
use warnings;
use AnyEvent::Redis;
use AnyEvent;
use Set::ConsistentHash;   # for hash ring logic
use Digest::MD5 qw(md5);   # for hashing keys
use Scalar::Util qw(weaken);
use List::Util qw(shuffle);

our $VERSION = "0.08";

# keep a global object cache that will contain weak references to
# objects keyed on their tag.  this allows for sharing of objects
# within a given process by modules that are otherwise unaware of
# each other provided they use the same tag.
our %object_cache;

# These are all for failure handling (server down or unresponsive).
# If a connection to a given server fails, we'll retry up to
# MAX_HOST_RETRIES and then only retry once in a while.  That
# interval is dictated by BASE_RETRY_INTERVAL.  If that retry fails,
# we'll multiply that by RETRY_INTERVAL_MULT up to but not exceeding
# MAX_RETRY_INTERVAL.
#
# If we ever get a successful retry, we'll erase any memory of the
# failure and pretend things are just fine.

# Tunables for failure handling and hashing, declared as one constant set.
use constant {
	MAX_HOST_RETRIES    => 3,    # consecutive failures before we pass
	BASE_RETRY_INTERVAL => 2,    # seconds
	RETRY_INTERVAL_MULT => 2,    # factor applied after each failed retry
	MAX_RETRY_INTERVAL  => 600,  # the retry interval never exceeds this
	DEFAULT_WEIGHT      => 10,   # weight used for consistent hashing
	COMMAND_TIMEOUT     => 1,    # used in poll()
	QUERY_ALL           => 0,    # don't query all addresses by default
};

# Per-object option defaults; new() copies in every key the caller did
# not supply explicitly.
my %defaults = (
	quiet               => $ENV{QUIET},
	query_all           => QUERY_ALL,
	command_timeout     => COMMAND_TIMEOUT,
	max_host_retries    => MAX_HOST_RETRIES,
	base_retry_interval => BASE_RETRY_INTERVAL,
	retry_interval_mult => RETRY_INTERVAL_MULT,
	max_retry_interval  => MAX_RETRY_INTERVAL,
);

# Constructor.  Takes a flat list of key/value options which become the
# object's hash.  Options read here:
#
#   config       - required; hash ref whose {nodes} hash maps node names
#                  to per-node hash refs.  A node may carry an
#                  {addresses} array ref; if so it is shuffled in place
#                  and its first element is stored as the node's
#                  {address}.
#   tag          - optional; objects are cached (weakly) under this tag
#                  and a later new() with the same tag returns the cached
#                  object as-is, ignoring any other options passed.
#   debug        - optional; when true, progress is printed to the
#                  currently selected output handle.
#   idle_timeout - optional; set to 0 when the key is not supplied.
#
# Every key of %defaults not supplied by the caller is filled in from
# %defaults.  Dies if no config is given.  Returns the blessed object.
sub new {
	my $class = shift;
	my $self = { @_ };

	# tag short circuit: hand back the live object already registered
	# under this tag, if there is one
	if ($self->{tag} and $object_cache{$self->{tag}}) {
		return $object_cache{$self->{tag}};
	}

	# basic init.  keys() is used instead of each() so the iterator of
	# the shared %defaults hash is never left part-way through.
	foreach my $k (keys %defaults) {
		next if exists $self->{$k};
		$self->{$k} = $defaults{$k};
	}

	# condvar for finishing up stuff (used in poll())
	$self->{cv} = undef;

	# setup server_status tracking
	$self->{server_status} = { };

	# request state
	$self->{request_serial} = 0;
	$self->{request_state} = { };

	# we must have configuration
	if (not $self->{config}) {
		die("No configuration provided. Can't instantiate Redis client");
	}

	# populate node list
	$self->{nodes} = [keys %{$self->{config}->{nodes}}];

	if ($self->{debug}) {
		print "node list: ", join(', ', @{$self->{nodes}}), "\n";
	}

	# setup the addresses array
	my $node_config = $self->{config}->{nodes};
	foreach my $node (keys %$node_config) {
		my $addresses = $node_config->{$node}->{addresses};
		next if not $addresses;

		# shuffle the existing addresses array (in place, so the
		# caller's config structure sees the new order too)
		@$addresses = shuffle(@$addresses);

		# and set the first to be our targeted server.  This used to
		# read index -1 (the last element), contradicting the intent
		# stated here.
		$node_config->{$node}->{address} = $addresses->[0];
	}

	# setup the consistent hash: every node gets the same weight
	my $set = Set::ConsistentHash->new;
	my @targets = map { $_, DEFAULT_WEIGHT } @{$self->{nodes}};
	$set->set_targets(@targets);
	$set->set_hash_func(\&_hash);
	$self->{set} = $set;
	$self->{buckets} = $self->{set}->buckets;

	$self->{idle_timeout} = 0 if not exists $self->{idle_timeout};

	print "config done.\n" if $self->{debug};
	bless $self, $class;

	# cache it for later use.  The cache entry is weakened so that it
	# does not by itself keep the object alive.
	if ($self->{tag}) {
		$object_cache{$self->{tag}} = $self;
		weaken($object_cache{$self->{tag}});
	}

	return $self;
}

# Take a node out of the hash ring by setting its target weight to 0,
# then refresh the cached bucket list from the ring.
# Returns the refreshed bucket list as stored in $self->{buckets}.
sub removeNode {
	my $self = shift;
	my $node = shift;

	my $ring = $self->{set};
	$ring->modify_targets($node, 0);

	return $self->{buckets} = $ring->buckets;
}

# Register a node: store its config entry ($ref) under $name, add it to
# the hash ring with DEFAULT_WEIGHT, and refresh the cached bucket list.
# Returns the refreshed bucket list as stored in $self->{buckets}.
sub addNode {
	my $self = shift;
	my ($name, $ref) = @_;

	$self->{config}->{nodes}->{$name} = $ref;

	my $ring = $self->{set};
	$ring->modify_targets($name, DEFAULT_WEIGHT);

	return $self->{buckets} = $ring->buckets;
}

# Intentionally does nothing.
# NOTE(review): presumably defined explicitly so object destruction is
# not routed to an AUTOLOAD handler, if this package has one -- confirm.
sub DESTROY {
	return;
}

# Hash function for the consistent-hash ring: the first four bytes of
# the key's MD5 digest, read as a big-endian unsigned 32-bit integer.
sub _hash {
	my ($key) = @_;
	my $digest = md5($key);
	return unpack 'N', $digest;
}

# Combined getter/setter for the command timeout.
# With a defined $time argument the stored value is replaced first; the
# current value is returned either way.
sub commandTimeout {
	my $self = shift;
	my ($time) = @_;

	$self->{command_timeout} = $time if defined $time;

	return $self->{command_timeout};
}

# Combined getter/setter for the query_all flag.
# A defined $val overwrites the stored flag; the current flag is always
# returned.
sub queryAll {
	my ($self, $val) = @_;

	return $self->{query_all} unless defined $val;

	$self->{query_all} = $val;
	return $self->{query_all};
}

# Look up the address currently stored for a node.
#
#   $node - node name, i.e. a key of $self->{config}->{nodes}
#
# Returns the node's {address} value, or undef when the node (or the
# configuration) is not present.  The lookup is done one level at a time
# so that asking about an unknown node does not autovivify an empty
# entry in the node configuration as a side effect.
sub nodeToHost {
	my ($self, $node) = @_;

	my $config = $self->{config};
	my $nodes  = $config ? $config->{nodes} : undef;

	# explicit scalar undef on purpose: callers have always received
	# exactly one value back, even in list context
	return undef unless $nodes and exists $nodes->{$node};

	my $entry = $nodes->{$node};
	return $entry ? $entry->{address} : undef;
}

# Map a key to a node name: hash the key, reduce it modulo 1024, and
# use the result as an index into the cached bucket list.
# NOTE(review): 1024 presumably matches the number of buckets the hash
# ring produces -- confirm against Set::ConsistentHash.
sub keyToNode {
	my ($self, $key) = @_;
	my $slot = _hash($key) % 1024;
	return $self->{buckets}->[$slot];
}

# Report whether a server is flagged as down.
# Returns 1 when the "<server>:down" entry in server_status is true,
# otherwise 0.
sub isServerDown {
	my ($self, $server) = @_;
	my $flag = $self->{server_status}{"$server:down"};
	return $flag ? 1 : 0;
}

sub isServerUp {
	my ($self, $server) = @_;



( run in 2.413 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )