AnyEvent-Memcached
view release on metacpan or search on metacpan
lib/AnyEvent/Memcached.pm view on Meta::CPAN
=cut
# Rebuild the server layout from $list and create connection objects.
#
# A bucket object is built from the server list via $self->{_bucker}, a hash
# object is built over those buckets via $self->{_hasher} and stored in
# $self->{hash}, and the peer table returned by the bucket object is stored
# in $self->{peers}. Every peer entry gets a {con} connection object and,
# when $self->{noreply} is true, a second {nrc} object built with the same
# arguments. Returns the invocant.
sub set_servers {
    my ($self, $list) = @_;

    my $buckets = $self->{_bucker}->new(servers => $list);
    $self->{hash} = $self->{_hasher}->new(buckets => $buckets);

    my $peers = $buckets->peers;
    $self->{peers} = $peers;

    foreach my $peer (values %$peers) {
        # Both connections of a peer are constructed from the same settings;
        # an ordered list keeps the argument order of the constructor call.
        my @conn_args = (
            port    => $peer->{port},
            host    => $peer->{host},
            timeout => $self->{timeout},
            debug   => $self->{debug},
        );

        $peer->{con} = AnyEvent::Memcached::Peer->new(@conn_args);

        # Separate connection used for noreply commands
        $peer->{nrc} = AnyEvent::Memcached::Peer->new(@conn_args)
            if $self->{noreply};
    }

    return $self;
}
=head2 connect
Establishes a connection to every configured server and invokes the C<connected> event when ready.
=cut
# Calls connect() on the {con} object of every entry in $self->{peers}.
sub connect {
    my ($self) = @_;

    foreach my $peer (values %{ $self->{peers} }) {
        $peer->{con}->connect;
    }
}
# Report an error or unexpected reply received from a peer.
#
# Arguments (positional):
#   $self     - invocant (not used in the body)
#   $peer     - hash reference; its {host} and {port} entries appear in the
#               fallback message
#   $response - the reply text to classify; undef is treated as an empty
#               string
#
# Emits exactly one warning through warn():
#   "Error"                      when the reply is exactly 'ERROR'
#   "Client error: <text>" or
#   "Server error: <text>"       when the reply contains CLIENT_ERROR or
#                                SERVER_ERROR followed by a space and text
#   "Bad response from <host>:<port>: <reply>"   for anything else
#
# The value of warn() is the sub's implicit return value.
sub _handle_errors {
    my ($self, $peer, $response) = @_;

    # The reply is kept in a named lexical rather than a localised global $_.
    # An undefined reply is normalised so that the comparisons and the
    # interpolation below do not emit "uninitialized" warnings on top of the
    # intended diagnostic.
    $response = '' unless defined $response;

    my $message;
    if ($response eq 'ERROR') {
        $message = 'Error';
    }
    elsif ($response =~ /(CLIENT|SERVER)_ERROR (.*)/) {
        # "CLIENT" / "SERVER" become "Client" / "Server" in the message.
        $message = ucfirst(lc $1) . " error: $2";
    }
    else {
        $message = "Bad response from $peer->{host}:$peer->{port}: $response";
    }

    warn $message;
}
sub _do {
my $self = shift;
my $key = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key);
my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command);
my $worker = shift; # CODE
my %args = @_;
my $servers = $self->{hash}->servers($key);
my %res;
my %err;
my $res;
if ($key =~ /[\x00-\x20\x7F]/) {
carp "Invalid characters in key '$key'";
return $args{cb} ? $args{cb}(undef, "Invalid key") : 0;
}
if ($args{noreply} and !$self->{noreply}) {
if (!$args{cb}) {
carp "Noreply option not set, but noreply command requested. command ignored";
return 0;
} else {
carp "Noreply option not set, but noreply command requested. fallback to common command";
}
delete $args{noreply};
}
if ($args{noreply}) {
for my $srv ( keys %$servers ) {
for my $real (@{ $servers->{$srv} }) {
my $cmd = $command.' noreply';
substr($cmd, index($cmd,'%s'),2) = $real;
$self->{peers}{$srv}{nrc}->request($cmd);
$self->{peers}{$srv}{lastnr} = $cmd;
unless ($self->{peers}{$srv}{nrc}->handles('command')) {
$self->{peers}{$srv}{nrc}->reg_cb(command => sub { # cb {
shift;
warn "Got data from $srv noreply connection (while shouldn't): @_\nLast noreply command was $self->{peers}{$srv}{lastnr}\n";
});
$self->{peers}{$srv}{nrc}->want_command();
}
}
}
$args{cb}(1) if $args{cb};
return 1;
}
$_ and $_->begin for $self->{cv}, $args{cv};
my $cv = AE::cv {
#use Data::Dumper;
#warn Dumper $res,\%res,\%err;
if ($res != -1) {
$args{cb}($res);
}
elsif (!%err) {
warn "-1 while not err";
$args{cb}($res{$key});
}
else {
$args{cb}(undef, dumper($err{$key}));
}
#warn "cv end";
$_ and $_->end for $args{cv}, $self->{cv};
};
$cv->begin;
for my $srv ( keys %$servers ) {
for my $real (@{ $servers->{$srv} }) {
( run in 0.490 second using v1.01-cache-2.11-cpan-39bf76dae61 )