AnyEvent-Redis-Federated

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis/Federated.pm  view on Meta::CPAN

package AnyEvent::Redis::Federated;

# An AnyEvent-based Redis client which implements timeouts, connection
# retries, multi-machine pool configuration (including consistent
# hashing), and other magic bits. 

use strict;
use warnings;
use AnyEvent::Redis;
use AnyEvent;
use Set::ConsistentHash;   # for hash ring logic
use Digest::MD5 qw(md5);   # for hashing keys
use Scalar::Util qw(weaken);
use List::Util qw(shuffle);

our $VERSION = "0.08";

# keep a global object cache that will contain weak references to
# objects keyed on their tag.  this allows for sharing of objects
# within a given process by modules that are otherwise unaware of
# each other provided they use the same tag.
our %object_cache;

# These are all for failure handling (server down or unresponsive).
# If a connection to a given server fails, we'll retry up to
# MAX_HOST_RETRIES and then only retry once in a while.  That
# interval is dictated by BASE_RETRY_INTERVAL.  If that retry fails,
# we'll multiply that by RETRY_INTERVAL_MULT up to but not exceeding
# MAX_RETRY_INTERVAL.
#
# If we ever get a successful retry, we'll erase any memory of the
# failure and pretend things are just fine.

use constant MAX_HOST_RETRIES      =>   3; # how many in a row before we pass
use constant BASE_RETRY_INTERVAL   =>   2; # in seconds
use constant RETRY_INTERVAL_MULT   =>   2; # multiply this much each retry fail
use constant MAX_RETRY_INTERVAL    => 600; # no more than this long
use constant DEFAULT_WEIGHT        =>  10; # for consistent hashing
use constant COMMAND_TIMEOUT       =>   1; # used in poll()
use constant QUERY_ALL             =>   0; # don't query all addresses by default

# Fallback values for per-object settings.  new() copies each entry into
# the object unless the caller already supplied that key.
# The `quiet` fallback is taken from the QUIET environment variable as it
# is when this assignment runs.
my %defaults = (
	command_timeout     => COMMAND_TIMEOUT,
	max_host_retries    => MAX_HOST_RETRIES,
	base_retry_interval => BASE_RETRY_INTERVAL,
	retry_interval_mult => RETRY_INTERVAL_MULT,
	max_retry_interval  => MAX_RETRY_INTERVAL,
	query_all           => QUERY_ALL,
	quiet               => $ENV{QUIET},
);

# Constructor.  Takes a flat list of key/value options.  `config` is
# required; `tag`, `debug`, `idle_timeout` and every key of %defaults are
# honoured when present.
#
# If a `tag` is given and %object_cache still holds a live object for
# it, that object is returned as-is.  Otherwise the freshly built object
# is registered in the cache through a weakened reference.
#
# Dies when no `config` is supplied.
sub new {
	my ($class, @options) = @_;
	my $self = { @options };

	# hand back an existing object registered under the same tag
	my $tag = $self->{tag};
	return $object_cache{$tag} if $tag and $object_cache{$tag};

	# fill in every setting the caller did not provide
	for my $option (keys %defaults) {
		$self->{$option} = $defaults{$option} unless exists $self->{$option};
	}

	# per-object bookkeeping
	$self->{cv}             = undef;   # condvar that poll() waits on
	$self->{server_status}  = { };
	$self->{request_serial} = 0;
	$self->{request_state}  = { };

	die("No configuration provided. Can't instantiate Redis client")
		unless $self->{config};

	# remember the node names
	$self->{nodes} = [ keys %{ $self->{config}->{nodes} } ];
	my $node_config = $self->{config}->{nodes};

	if ($self->{debug}) {
		print 'node list: ', join(', ', @{ $self->{nodes} });
		print "\n";
	}

	# For nodes with several addresses: randomise the list in place and
	# use its last entry as the current server.  nextServer() takes from
	# the front and appends to the back, so the address in use stays at
	# the tail of the list.
	for my $entry (values %$node_config) {
		my $pool = $entry->{addresses} or next;
		@$pool = shuffle(@$pool);
		$entry->{address} = $pool->[-1];
	}

	# build the hash ring: every node gets the same weight
	my $ring = Set::ConsistentHash->new;
	$ring->set_targets(map { ($_, DEFAULT_WEIGHT) } @{ $self->{nodes} });
	$ring->set_hash_func(\&_hash);
	$self->{set}     = $ring;
	$self->{buckets} = $ring->buckets;

	$self->{idle_timeout} = 0 unless exists $self->{idle_timeout};

	print "config done.\n" if $self->{debug};
	bless $self, $class;

	# register under the tag without keeping the object alive
	if ($tag) {
		$object_cache{$tag} = $self;
		weaken($object_cache{$tag});
	}

	return $self;
}

# Take $node out of the hash ring by setting its weight to 0, then
# refresh the cached bucket list from the ring.
sub removeNode {
	my $self = shift;
	my $node = shift;

	my $ring = $self->{set};
	$ring->modify_targets($node, 0);
	$self->{buckets} = $ring->buckets;
}

# Register a node under $name with the configuration hash $ref, add it
# to the hash ring with DEFAULT_WEIGHT and refresh the cached buckets.
#
# $ref is stored as given.  When it carries an `addresses` list but no
# `address`, the list is shuffled in place and its last entry becomes
# the current `address`, mirroring what new() does for the initial
# configuration.  Without this, nodeToHost() would return undef for
# such a node.  An explicitly supplied `address` is left untouched.
sub addNode {
	my ($self, $name, $ref) = @_;

	if (ref($ref) eq 'HASH'
		and ref($ref->{addresses}) eq 'ARRAY'
		and @{$ref->{addresses}}
		and not defined $ref->{address}) {
		@{$ref->{addresses}} = shuffle(@{$ref->{addresses}});
		$ref->{address} = $ref->{addresses}->[-1];
	}

	$self->{config}->{nodes}->{$name} = $ref;
	$self->{set}->modify_targets($name => DEFAULT_WEIGHT);
	$self->{buckets} = $self->{set}->buckets;
}

# Empty destructor: performs no explicit cleanup.
# NOTE(review): presumably defined so that object destruction is not
# dispatched to an AUTOLOAD handler -- confirm against the rest of the file.
sub DESTROY {
}

# Map a key to an unsigned 32-bit integer: the first four bytes of its
# MD5 digest, read big-endian.
#
# md5() croaks on strings containing code points above 255, so such keys
# are UTF-8 encoded first.  Strings without wide characters are hashed
# exactly as before, so existing key placement is unaffected.
sub _hash {
	my ($key) = @_;
	utf8::encode($key) if defined $key and $key =~ /[^\x00-\xFF]/;
	return unpack("N", md5($key));
}

# Combined getter/setter for the `command_timeout` setting.  A defined
# argument replaces the stored value; the current value is returned
# either way.
sub commandTimeout {
	my ($self, $seconds) = @_;
	$self->{command_timeout} = $seconds if defined $seconds;
	return $self->{command_timeout};
}

# Combined getter/setter for the `query_all` setting.  A defined
# argument replaces the stored value; the current value is returned
# either way.
sub queryAll {
	my ($self, $flag) = @_;
	$self->{query_all} = $flag if defined $flag;
	return $self->{query_all};
}

# Return the `address` currently recorded in the configuration for
# $node (undef when none is set).
sub nodeToHost {
	my $self = shift;
	my $node = shift;
	return $self->{config}{nodes}{$node}{address};
}

# Pick the node responsible for $key: hash the key, reduce it modulo
# 1024 and look that slot up in the cached bucket list.
sub keyToNode {
	my ($self, $key) = @_;
	my $slot = _hash($key) % 1024;
	return $self->{buckets}->[$slot];
}

# Returns 1 when $server carries a true "down" flag in server_status,
# 0 otherwise.
sub isServerDown {
	my ($self, $server) = @_;
	return $self->{server_status}{"$server:down"} ? 1 : 0;
}

# Returns 0 when $server carries a true "down" flag in server_status,
# 1 otherwise.
sub isServerUp {
	my ($self, $server) = @_;
	return $self->{server_status}{"$server:down"} ? 0 : 1;
}

# Rotate $node to its next address and return it.
#
# The front entry of the node's `addresses` list becomes the new current
# `address` and is re-appended at the back of the list.  Nodes without
# an `addresses` list cannot rotate, so $server is returned unchanged.
sub nextServer {
	my ($self, $server, $node) = @_;

	# nothing to rotate through
	return $server unless $self->{config}->{nodes}->{$node}->{addresses};

	my $entry = $self->{config}->{nodes}->{$node};
	my $pool  = $entry->{addresses};

	my $picked = shift @$pool;
	push @$pool, $picked;
	$entry->{address} = $picked;

	warn "redis server for $node changed from $server to $picked selected\n" if $self->{debug};
	return $picked;
}

# Return an array reference holding every address of $node that
# isServerUp() reports as up.
#
# The candidates are the node's `addresses` list when it has one.  A
# node configured with just a single `address` is treated as a group of
# one, so "query all" calls still reach it instead of finding no server
# at all.  An unknown node yields an empty list.
sub allServers {
	my ($self, $node) = @_;
	my $entry = $self->{config}->{nodes}->{$node} || {};

	my @candidates;
	if ($entry->{addresses}) {
		@candidates = @{$entry->{addresses}};
	}
	elsif (defined $entry->{address}) {
		@candidates = ($entry->{address});
	}

	my $hosts = [ grep { $self->isServerUp($_) } @candidates ];
	return $hosts;
}

sub markServerUp {
	my ($self, $server) = @_;
	if ($self->{server_status}{"$server:down"}) {
		my $down_since = localtime($self->{server_status}{"$server:down_since"});
		delete $self->{server_status}{"$server:down"};
		delete $self->{server_status}{"$server:retries"};
		delete $self->{server_status}{"$server:down_since"};
		delete $self->{server_status}{"$server:retry_pending"};
		delete $self->{server_status}{"$server:retry_interval"};
 		warn "redis server $server back up (down since $down_since)\n" if $self->{debug};

lib/AnyEvent/Redis/Federated.pm  view on Meta::CPAN

	my $key = $_[0];
	my $hk  = $key;
	my $cb  = sub { };

	if (ref $_[-1] eq 'CODE') {
		$cb = pop @_;
	}

	# key group?
	if (ref($_[0]) eq 'ARRAY') {
		$hk  = $_[0]->[0];
		$key = $_[0]->[1];
		$_[0] = $key;
	}

	my $node = $self->keyToNode($hk);
	my $query_all = $self->{query_all};

	if ($call =~ s/_all$//) {
		$query_all = 1;
	}

	## The normal single-server case...
	if (not $query_all) {
		my $server = $self->nodeToHost($node);
		print "server [$server] of node [$node] for key [$key] hashkey [$hk]\n" if $self->{debug};

		if ($self->isServerDown($server)) {
			# try another if we can
			if ($self->{config}->{nodes}->{$node}->{addresses}) {
				print "server [$server] seems down\n" if $self->{debug};
				$server = $self->nextServer($server, $node);
				print "trying next server in line [$server] for node [$node]\n" if $self->{debug};
			}
			# bail otherwise
			else {
				print "server $server down.  abandoning call.\n" if $self->{debug};
				$cb->(undef);
				return $self;
			}
		}

		return $self->scheduleCall($server, $call, [@_], $cb);
	}

	## Need to fire this one at all up servers in the node group...
	else {
		my $servers = $self->allServers($node);
		for my $server (@$servers) {
			$self->scheduleCall($server, $call, [@_], $cb);
		}
		return $self;
	}
}

# Block until the outstanding requests tracked by $self->{cv} have
# finished, or until the command timeout fires.
#
# Returns immediately when no condvar is set.  When `command_timeout` is
# true, a SIGALRM watcher is installed and alarm() is armed; if the
# alarm goes off, every request still marked true in request_state is
# flagged 0 and the condvar is end()ed once per such request.
# Afterwards the condvar is cleared and the alarm cancelled.
sub poll {
	my ($self) = @_;
	#return if $self->{pending_requests} < 1;
	return if not defined $self->{cv};
	my $rid = $self->{request_serial};
	my $timeout = $self->{command_timeout};

	my $w;
	if ($timeout) {
		$w = AnyEvent->signal(signal => "ALRM", cb => sub {
			warn "AnyEvent::Redis::Federated::poll alarm timeout! ($rid)\n" if $self->{debug};

			# check the state of requests, marking remaining as cancelled.
			# iterate over a snapshot of the keys so the hash can be
			# updated safely while we walk it
			for my $pending (keys %{$self->{request_state}}) {
				next unless $self->{request_state}->{$pending};
				print "found pending request to cancel: $pending\n" if $self->{debug};
				$self->{request_state}->{$pending} = 0;
				$self->{cv}->end;
				undef $w;
			}
		});

		# alarm() works in whole seconds; a timeout below one second would
		# truncate to alarm(0), which arms nothing and leaves recv below
		# without any timeout.  Use at least one second.
		my $alarm_secs = int($timeout);
		$alarm_secs = 1 if $alarm_secs < 1;

		print "scheduling alarm timer in poll() for $timeout\n" if $self->{debug};
		alarm($alarm_secs);
	}

	$self->{cv}->recv;
	$self->{cv} = undef;
	alarm(0);
	undef $w;
}

sub scheduleCall {
	my ($self, $server, $call, $args, $cb) = @_;

	# have a non-idle connection already?
	my $r;
	if ($self->{conn}->{$server}) {
		if ($self->{idle_timeout}) {
			if ($self->{last_used}->{$server} > time - $self->{idle_timeout}) {
				$r = $self->{conn}->{$server};
			}
		}
		else {
			$r = $self->{conn}->{$server};
		}
	}

	# otherwise create a new connection
	if (not defined $r) {
		my ($host, $port) = split /:/, $server;
		print "attempting new connection to $server\n" if $self->{debug};
		$r = AnyEvent::Redis->new(
			host => $host,
			port => $port,
			on_error => sub {
				warn @_ unless $self->{quiet};
				$self->markServerDown($server);
				$self->{cv}->end;
			}
		);

		$self->{conn}->{$server} = $r;
	}

	if (not defined $self->{cv}) {



( run in 0.305 second using v1.01-cache-2.11-cpan-d7f47b0818f )