AnyEvent-Redis-Federated

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/Federated.pm  view on Meta::CPAN

package AnyEvent::Redis::Federated;

# An AnyEvent-based Redis client which implements timeouts, connection
# retries, multi-machine pool configuration (including consistent
# hashing), and other magic bits. 

use strict;
use warnings;
use AnyEvent::Redis;
use AnyEvent;
use Set::ConsistentHash;   # for hash ring logic
use Digest::MD5 qw(md5);   # for hashing keys
use Scalar::Util qw(weaken);
use List::Util qw(shuffle);

our $VERSION = "0.08";

# Process-wide object cache, keyed by tag.  It is meant to contain weak
# references to objects, so that modules which are otherwise unaware of
# each other can share a single object within a given process provided
# they use the same tag.  new() consults this cache first: when it is
# called with a tag that already has a (true) entry here, it returns
# that cached object instead of building a new one.
our %object_cache;

# Failure-handling policy (server down or unresponsive) and a few
# general-purpose settings.
#
# When a connection to a given server fails, we retry up to
# MAX_HOST_RETRIES times and after that only retry once in a while.
# The first such interval is BASE_RETRY_INTERVAL; every time one of
# those retries fails as well, the interval is multiplied by
# RETRY_INTERVAL_MULT, up to but never exceeding MAX_RETRY_INTERVAL.
#
# A single successful retry erases all memory of the failure, and we
# carry on as if nothing had ever gone wrong.

use constant {
	MAX_HOST_RETRIES    => 3,   # how many failures in a row before we pass
	BASE_RETRY_INTERVAL => 2,   # in seconds
	RETRY_INTERVAL_MULT => 2,   # multiply by this much on each failed retry
	MAX_RETRY_INTERVAL  => 600, # never wait longer than this
	DEFAULT_WEIGHT      => 10,  # for consistent hashing
	COMMAND_TIMEOUT     => 1,   # used in poll()
	QUERY_ALL           => 0,   # don't query all addresses by default
};

# Per-object settings that new() fills in for every key the caller did
# not supply explicitly.
my %defaults = (
	# retry / back-off behaviour
	max_host_retries    => MAX_HOST_RETRIES,
	base_retry_interval => BASE_RETRY_INTERVAL,
	retry_interval_mult => RETRY_INTERVAL_MULT,
	max_retry_interval  => MAX_RETRY_INTERVAL,

	# command handling
	command_timeout     => COMMAND_TIMEOUT,
	query_all           => QUERY_ALL,

	# taken from the environment (QUIET) at load time
	quiet               => $ENV{QUIET},
);

sub new {
	my $class = shift;
	my $self = { @_ };

	# tag short circuit
	if ($self->{tag}) {
		if ($object_cache{$self->{tag}}) {
			return $object_cache{$self->{tag}};
		}
	}

	# basic init
	while (my ($k, $v) = each %defaults) {
		next if exists $self->{$k};
		$self->{$k} = $v;
	}

	# condvar for finishing up stuff (used in poll())
	$self->{cv} = undef;

	# setup server_status tracking
	$self->{server_status} = { };

	# request state
	$self->{request_serial} = 0;
	$self->{request_state} = { };



( run in 2.164 seconds using v1.01-cache-2.11-cpan-df04353d9ac )