AnyEvent-Memcached
view release on metacpan or search on metacpan
lib/AnyEvent/Memcached/Peer.pm view on Meta::CPAN
package #hide
AnyEvent::Memcached::Peer;
use common::sense 2;m{
use strict;
use warnings;
}x;
use base 'AnyEvent::Connection';
use Carp;
use AnyEvent::Connection::Util;
use Scalar::Util qw(weaken);
#use Devel::Leak::Cb;
sub DEBUG () { 0 }
use AnyEvent::Memcached::Conn;
sub new {
my $self = shift->SUPER::new(
rawcon => 'AnyEvent::Memcached::Conn',
reconnect => 1,
@_,
);
$self->{waitingcb} = {};
$self;
}
sub connect {
my $self = shift;
$self->{connecting} and return;
$self->{grd}{con} = $self->reg_cb( connected => sub { $self->{failed} = 0; } );
$self->{grd}{cfl} = $self->reg_cb( connfail => sub { $self->{failed} = 1; } );
$self->{grd}{dis} = $self->reg_cb( disconnect => sub {
shift;shift;
%$self or return;
warn "Peer $self->{host}:$self->{port} disconnected".(@_ ? ": @_" : '')."\n" if $self->{debug};
my $e = @_ ? "@_" : "disconnected";
for ( keys %{$self->{waitingcb}} ) {
if ($self->{waitingcb}{$_}) {
#warn "Cleanup: ",::sub_fullname( $self->{waitingcb}{$_} );
$self->{waitingcb}{$_}(undef,$e);
}
delete $self->{waitingcb}{$_};
}
} );
$self->SUPER::connect(@_);
return;
}
sub conntrack {
my $self = shift;
my ($method,$args,$cb) = @_;
if($self->{connecting} and $self->{failed}) {
warn "Is connecting, have fails => not connected" if DEBUG;
$cb and $cb->(undef, "Not connected");
return;
}
elsif (!$self->{connected}) {
my @args = @$args; # copy to avoid rewriting
warn time()." Not connected, do connect for ".\@args.", ".dumper($args[0]) if DEBUG;
my ($c,$t);
weaken( $self->{waitingcb}{int $cb} = $cb ) if $cb;
weaken( $self );
# This rely on correct event invocation order of Object::Event.
# If this could change, I'll add own queue
$c = $self->reg_cb(
connected => sub {
shift->unreg_me;
#$c or return;
warn "connected cb for ".\@args.", ".dumper($args[0]) if DEBUG;
undef $c;undef $t;
$self or return;
delete $self->{waitingcb}{int $cb} if $cb;
return $self->{con}->$method(@args);
},
);
$t = AnyEvent->timer(
after => $self->{timeout},# + 0.05, # Since there are timers inside connect, we need to delay a bit longer
cb => sub {
#$t or return;
warn time()." timeout $self->{timeout} cb for $args->[0]" if DEBUG;
undef $c;undef $t;
$self or return;
if ($cb){
$self->{waitingcb}{int $cb};
$cb->(undef, "Connect timeout");
}
},
);
$self->connect();
}
else {
Carp::cluck "How do I get here?";
return $self->{con}->$method(@$args);
}
}
sub command {
my $self = shift;
if ($self->{connected}) {
return $self->{con}->command( @_ );
}
else {
my ($cmd,%args) = @_;
$self->conntrack( command => \@_, $args{cb} );
}
}
sub request {
my $self = shift;
if ($self->{connected}) {
return $self->{con}->say(@_);
}
else {
# no cb
$self->conntrack( say => \@_ );
}
}
sub reader {
my $self = shift;
if ($self->{connected}) {
return $self->{con}->reader(@_);
( run in 2.163 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )