AnyEvent-Memcached
view release on metacpan or search on metacpan
lib/AnyEvent/Memcached.pm view on Meta::CPAN
# _handle_errors($peer, $response)
#
# Reports an unexpected server response via warn(). Three cases are
# distinguished:
#   * the bare string 'ERROR'
#   * a 'CLIENT_ERROR <text>' / 'SERVER_ERROR <text>' line
#   * anything else, reported together with the peer's host and port
#
# $peer is a hash reference; only its {host} and {port} entries are read,
# and only for the "anything else" case.
sub _handle_errors {
    my ($self, $peer, $response) = @_;

    # Generic error reply: nothing more to report than the fact itself.
    return warn "Error" if $response eq 'ERROR';

    # CLIENT_ERROR / SERVER_ERROR carry an explanatory message after the tag.
    if ($response =~ /(CLIENT|SERVER)_ERROR (.*)/) {
        my ($origin, $message) = ($1, $2);
        return warn ucfirst(lc $origin)." error: $message";
    }

    # Unrecognised reply: say which peer produced it.
    return warn "Bad response from $peer->{host}:$peer->{port}: $response";
}
# _do($key, $command, $worker, %args)
#
# Sends one command for $key to every server returned by
# $self->{hash}->servers($key) and reports the combined outcome.
#
# Arguments:
#   $key      - the key the command operates on; it must not contain
#               characters in the range \x00-\x20 or \x7F.
#   $command  - command template; the first '%s' in it is replaced by the
#               per-server key taken from the servers() result.
#               NOTE(review): the template is expected to contain '%s';
#               if it does not, index() returns -1 and the substr()
#               replacement lands on the tail of the string.
#   $worker   - CODE ref called with the raw response; it must return
#               ($ok) on success or (undef, $fail) on failure.
#   %args     - cb      => CODE, optional. Called as cb($result) or
#                          cb(undef, $error).
#               cv      => optional object with begin/end methods, bracketed
#                          around the asynchronous request.
#               noreply => true to send the command on the peer's {nrc}
#                          connection with ' noreply' appended and not
#                          wait for an answer.
#
# Returns:
#   * invalid key: the return value of cb(undef, "Invalid key") when cb was
#     given, otherwise 0;
#   * noreply requested while $self->{noreply} is off and no cb given: 0;
#   * noreply command sent: 1 (cb, if any, is called with 1);
#   * otherwise nothing; the result is delivered through cb.
sub _do {
    my $self = shift;
    # NOTE(review): the decode-xor-encode dance on UTF-8 flagged strings
    # presumably normalises them to octets before they go on the wire --
    # confirm before touching.
    my $key = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key);
    my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command);
    my $worker = shift; # CODE
    my %args = @_;
    my $servers = $self->{hash}->servers($key);

    if ($key =~ /[\x00-\x20\x7F]/) {
        carp "Invalid characters in key '$key'";
        return $args{cb} ? $args{cb}(undef, "Invalid key") : 0;
    }

    # noreply was requested but is not enabled on this client: without a
    # callback there is nobody to notify, so drop the command; with a
    # callback, degrade to an ordinary command that waits for the reply.
    if ($args{noreply} and !$self->{noreply}) {
        if (!$args{cb}) {
            carp "Noreply option not set, but noreply command requested. command ignored";
            return 0;
        } else {
            carp "Noreply option not set, but noreply command requested. fallback to common command";
        }
        delete $args{noreply};
    }

    if ($args{noreply}) {
        for my $srv ( keys %$servers ) {
            for my $real (@{ $servers->{$srv} }) {
                my $cmd = $command.' noreply';
                substr($cmd, index($cmd,'%s'),2) = $real;
                $self->{peers}{$srv}{nrc}->request($cmd);
                # Remembered only so the diagnostic below can name it.
                $self->{peers}{$srv}{lastnr} = $cmd;
                # Nothing should ever arrive on the noreply connection;
                # install a one-time listener that complains if it does.
                unless ($self->{peers}{$srv}{nrc}->handles('command')) {
                    $self->{peers}{$srv}{nrc}->reg_cb(command => sub { # cb {
                        shift;
                        warn "Got data from $srv noreply connection (while shouldn't): @_\nLast noreply command was $self->{peers}{$srv}{lastnr}\n";
                    });
                    $self->{peers}{$srv}{nrc}->want_command();
                }
            }
        }
        $args{cb}(1) if $args{cb};
        return 1;
    }

    # Per-request bookkeeping for the reply-expecting path.
    #   %res / %err : {$real}{$srv} => worker result / failure reason
    #   $res        : the common result while all servers agree, -1 once
    #                 they disagree or any request failed
    # NOTE(review): entries are stored under $real but read back under $key
    # in the completion callback; this only lines up if servers() hands the
    # key back unchanged -- confirm.
    my %res;
    my %err;
    my $res;

    $_ and $_->begin for $self->{cv}, $args{cv};
    my $cv = AE::cv {
        # cb is optional everywhere else in this sub, so it must be optional
        # here too: calling an undefined cb would die before the ->end calls
        # below and leave the caller's condvars un-ended.
        if (my $cb = $args{cb}) {
            if (!defined $res or $res != -1) {
                $cb->($res);
            }
            elsif (!%err) {
                warn "-1 while not err";
                $cb->($res{$key});
            }
            else {
                $cb->(undef, dumper($err{$key}));
            }
        }
        $_ and $_->end for $args{cv}, $self->{cv};
    };

    # The outer begin/end pair keeps the condvar from firing until every
    # request has been issued.
    $cv->begin;
    for my $srv ( keys %$servers ) {
        for my $real (@{ $servers->{$srv} }) {
            $cv->begin;
            my $cmd = $command;
            substr($cmd, index($cmd,'%s'),2) = $real;
            $self->{peers}{$srv}{con}->command(
                $cmd,
                cb => sub { # cb {
                    if (defined( local $_ = shift )) {
                        my ($ok,$fail) = $worker->($_);
                        if (defined $ok) {
                            $res{$real}{$srv} = $ok;
                            # First answer wins; any later differing answer
                            # collapses the summary to -1.
                            $res = (!defined $res ) || $res == $ok ? $ok : -1;
                        } else {
                            $err{$real}{$srv} = $fail;
                            $res = -1;
                        }
                    } else {
                        warn "do failed: @_/$!";
                        # $_ is undef in this branch, so storing it would
                        # throw the failure reason away. Keep whatever the
                        # connection passed after the undef response, or
                        # fall back to the errno text the warning prints.
                        $err{$real}{$srv} = @_ ? "@_" : "$!";
                        $res = -1;
                    }
                    $cv->end;
                }
            );
        }
    }
    $cv->end;
    return;
}
sub _set {
my $self = shift;
my $cmd = shift;
my $key = shift;
my $cas;
if ($cmd eq 'cas') {
$cas = shift;
}
my $val = shift;
my %args = @_;
return $args{cb}(undef, "Readonly") if $self->{readonly};
if ($cas =~ /\D/) {
carp "Invalid characters in cas '$cas'";
( run in 0.680 second using v1.01-cache-2.11-cpan-39bf76dae61 )