AnyEvent-Memcached

 view release on metacpan or  search on metacpan

lib/AnyEvent/Memcached.pm  view on Meta::CPAN


=cut

# Rebuild the server layout of the client from a server list.
#   $list - passed as-is to the bucket class as its `servers' argument
# Stores the hasher in $self->{hash} and the peers table in $self->{peers},
# then attaches a connection object to every peer. Returns $self.
sub set_servers {
	my ($self, $list) = @_;

	# The bucket object is built first because the hasher is constructed from it.
	my $buckets = $self->{_bucker}->new(servers => $list);
	#warn R::Dump($list, $buckets);
	$self->{hash} = $self->{_hasher}->new(buckets => $buckets);

	# The very same peers structure is kept on the client and decorated below.
	my $peers = $buckets->peers;
	$self->{peers} = $peers;

	for my $peer (values %$peers) {
		# Both connections of a peer are created with identical settings.
		my @peer_args = (
			port    => $peer->{port},
			host    => $peer->{host},
			timeout => $self->{timeout},
			debug   => $self->{debug},
		);

		$peer->{con} = AnyEvent::Memcached::Peer->new(@peer_args);

		# A second, separate connection is created only when the client
		# has its noreply option set.
		$peer->{nrc} = AnyEvent::Memcached::Peer->new(@peer_args)
			if $self->{noreply};
	}

	return $self;
}

=head2 connect

    Establishes connections to all servers and invokes the C<connected> event when ready.

=cut

# Call connect() on the {con} object of every entry in $self->{peers}.
sub connect {
	my ($self) = @_;
	for my $peer (values %{ $self->{peers} }) {
		$peer->{con}->connect;
	}
}

# Emit a warning describing an error or unexpected response line.
#   $peer     - hashref; its {host} and {port} appear in the fallback message
#   $response - the response line to classify
# Three cases are distinguished: the bare string 'ERROR', a line containing
# CLIENT_ERROR or SERVER_ERROR followed by a description, and anything else.
sub _handle_errors {
	my ($self, $peer, $response) = @_;

	my $message;
	if ($response eq 'ERROR') {
		$message = "Error";
	}
	elsif ($response =~ /(CLIENT|SERVER)_ERROR (.*)/) {
		# "CLIENT" / "SERVER" is rendered as "Client" / "Server".
		$message = ucfirst(lc $1)." error: $2";
	}
	else {
		$message = "Bad response from $peer->{host}:$peer->{port}: $response";
	}

	warn $message;
}

sub _do {
	my $self    = shift;
	my $key     = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key);
	my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command);
	my $worker  = shift; # CODE
	my %args    = @_;
	my $servers = $self->{hash}->servers($key);
	my %res;
	my %err;
	my $res;

	if ($key =~ /[\x00-\x20\x7F]/) {
		carp "Invalid characters in key '$key'";
		return $args{cb} ? $args{cb}(undef, "Invalid key") : 0;
	}
	if ($args{noreply} and !$self->{noreply}) {
		if (!$args{cb}) {
			carp "Noreply option not set, but noreply command requested. command ignored";
			return 0;
		} else {
			carp "Noreply option not set, but noreply command requested. fallback to common command";
		}
		delete $args{noreply};
	}
	if ($args{noreply}) {
		for my $srv ( keys %$servers ) {
			for my $real (@{ $servers->{$srv} }) {
				my $cmd = $command.' noreply';
				substr($cmd, index($cmd,'%s'),2) = $real;
				$self->{peers}{$srv}{nrc}->request($cmd);
				$self->{peers}{$srv}{lastnr} = $cmd;
				unless ($self->{peers}{$srv}{nrc}->handles('command')) {
					$self->{peers}{$srv}{nrc}->reg_cb(command => sub { # cb {
						shift;
						warn "Got data from $srv noreply connection (while shouldn't): @_\nLast noreply command was $self->{peers}{$srv}{lastnr}\n";
					});
					$self->{peers}{$srv}{nrc}->want_command();
				}
			}
		}
		$args{cb}(1) if $args{cb};
		return 1;
	}
	$_ and $_->begin for $self->{cv}, $args{cv};
	my $cv = AE::cv {
		#use Data::Dumper;
		#warn Dumper $res,\%res,\%err;
		if ($res != -1) {
			$args{cb}($res);
		}
		elsif (!%err) {
			warn "-1 while not err";
			$args{cb}($res{$key});
		}
		else {
			$args{cb}(undef, dumper($err{$key}));
		}
		#warn "cv end";
		
		$_ and $_->end for $args{cv}, $self->{cv};
	};
	$cv->begin;
	for my $srv ( keys %$servers ) {
		for my $real (@{ $servers->{$srv} }) {



( run in 0.490 second using v1.01-cache-2.11-cpan-39bf76dae61 )