Cache-Memcached-AnyEvent
view release on metacpan or search on metacpan
lib/Cache/Memcached/AnyEvent.pm view on Meta::CPAN
$self->{_active_server_count}++;
$self->{_server_handles}->{ $server } = $h;
}
# Append one or more servers to this client's server list and hand the
# updated list to the selector.
#
# Arguments: the invocant followed by any number of server entries.
# Returns:   whatever the selector's set_servers() call returns.
sub add_server {
    my $self = shift;

    # Everything left in @_ is a server entry.
    push @{ $self->{servers} }, @_;

    return $self->selector->set_servers( $self->{servers} );
}
# Start connecting to every configured server.
#
# Does nothing when a connection attempt is already in progress or the
# client is already connected. Otherwise it calls disconnect(), resets the
# connection bookkeeping, and asks _connect_one() to connect each entry in
# $self->{servers}, passing along a shared condvar.
#
# When the condvar's callback runs (presumably once every _connect_one()
# attempt has reported back -- confirm against _connect_one) it:
#   * clears the "connecting" state,
#   * dies if no server became active,
#   * marks the client connected, invokes the optional on_connect callback
#     with the client as its only argument, and drains the command queue.
sub connect {
    my $self = shift;

    return if $self->{_is_connecting} || $self->{_is_connected};

    $self->disconnect();
    delete $self->{_is_disconnecting};

    # Fresh bookkeeping for this connection attempt.
    @{$self}{qw( _is_connecting _active_servers _active_server_count )}
        = ( {}, [], 0 );

    my $connect_cv = AE::cv {
        delete $self->{_is_connecting};

        die "Failed to connect to any memcached servers"
            unless $self->{_active_server_count};

        $self->{_is_connected} = 1;

        my $on_connect = $self->{on_connect};
        $on_connect->($self) if $on_connect;

        $self->_drain_queue;
    };

    for my $server ( @{ $self->{servers} } ) {
        $self->_connect_one( $server, $connect_cv );
    }
}
# Queue a "delete" command.
#
# Arguments: the invocant, the arguments to forward to the protocol's
# delete() method, and optionally a trailing callback (a CODE reference or
# an AnyEvent::CondVar object).
#
# When no callback is supplied the command is issued with the noreply flag
# set. Returns whatever _push_queue() returns.
sub delete {
    my ($self, @args) = @_;

    # Declare first, assign conditionally: "my $x = ... if COND" has
    # undefined behaviour in Perl (see perlsyn, "Statement Modifiers").
    my $cb;
    if (@args) {
        my $type = ref $args[-1];
        $cb = pop @args if $type eq 'CODE' || $type eq 'AnyEvent::CondVar';
    }

    my $noreply = !defined $cb;
    $self->_push_queue( $self->protocol->delete($self, @args, $noreply, $cb) );
}
# Look up the handle stored for a server.
#
# Arguments: the invocant and the server key under which the handle was
# registered in $self->{_server_handles}.
# Returns:   the stored handle, or undef when there is none for that key.
sub get_handle {
    my ($self, $server) = @_;
    return $self->{_server_handles}{$server};
}
# Generate the command methods that share a calling convention:
#
#   get, get_multi                     ($keys, $cb)
#   decr, incr                         ($key, $value, $initial, [$cb])
#   add, append, prepend, replace, set ($key, $value, $exptime, $noreply, [$cb])
#
# Each generated method asks the protocol object for the matching command
# and hands the result to _push_queue(). For the last two groups a trailing
# CODE reference or AnyEvent::CondVar object is taken as the callback.
{
    # Install $code as a sub named $name in the current package.
    my $installer = sub {
        my ($name, $code) = @_;
        {
            no strict 'refs';
            *{$name} = $code;
        }
    };

    # Remove and return a trailing callback from the referenced argument
    # list; returns undef (and leaves the list alone) when the last
    # argument is not a callback. Used instead of "my $cb = pop ... if ...",
    # whose behaviour is undefined in Perl (see perlsyn).
    my $pop_callback = sub {
        my ($args) = @_;
        return unless @{$args};

        my $type = ref $args->[-1];
        return pop @{$args}
            if $type eq 'CODE' || $type eq 'AnyEvent::CondVar';
        return;
    };

    foreach my $method ( qw( get get_multi ) ) {
        $installer->( $method, sub {
            my ($self, $keys, $cb) = @_;
            # NOTE(review): this only weakens the local copy of the
            # invocant; kept as in the original.
            Scalar::Util::weaken($self);
            $self->_push_queue( $self->protocol->$method($self, $keys, $cb) );
        } );
    }

    foreach my $method ( qw( decr incr ) ) {
        $installer->( $method, sub {
            my ($self, @args) = @_;
            my $cb = $pop_callback->(\@args);
            my ($key, $value, $initial) = @args;
            Scalar::Util::weaken($self);
            $self->_push_queue( $self->protocol->$method( $self, $key, $value, $initial, $cb ) );
        } );
    }

    foreach my $method ( qw(add append prepend replace set) ) {
        $installer->( $method, sub {
            my ($self, @args) = @_;
            my $cb = $pop_callback->(\@args);
            my ($key, $value, $exptime, $noreply) = @args;
            Scalar::Util::weaken($self);
            $self->_push_queue( $self->protocol->$method( $self, $key, $value, $exptime, $noreply, $cb ) );
        } );
    }
}
# Queue a "stats" command.
#
# Arguments: the invocant, an optional statistics name, and an optional
# trailing callback (a CODE reference or an AnyEvent::CondVar object).
# Returns whatever _push_queue() returns.
sub stats {
    my ($self, @args) = @_;

    # Declare first, assign conditionally: "my $x = ... if COND" has
    # undefined behaviour in Perl (see perlsyn, "Statement Modifiers").
    my $cb;
    if (@args) {
        my $type = ref $args[-1];
        $cb = pop @args if $type eq 'CODE' || $type eq 'AnyEvent::CondVar';
    }

    my ($name) = @args;
    $self->_push_queue( $self->protocol->stats($self, $name, $cb) );
}
# Queue a "version" command.
#
# Arguments: the invocant and a callback, which is forwarded unchanged to
# the protocol's version() method.
# Returns whatever _push_queue() returns.
sub version {
    my $self = shift;
    my ($cb) = @_;

    my $command = $self->protocol->version( $self, $cb );
    $self->_push_queue($command);
}
# Queue a "flush_all" command.
#
# Arguments: the invocant, an optional delay (defaults to 0), and an
# optional trailing callback (a CODE reference or an AnyEvent::CondVar
# object).
#
# The noreply flag is set only when no callback was supplied, matching
# delete(): a caller that passes a callback wants to hear back from the
# server. Returns whatever _push_queue() returns.
sub flush_all {
    my ($self, @args) = @_;

    # Declare first, assign conditionally: "my $x = ... if COND" has
    # undefined behaviour in Perl (see perlsyn, "Statement Modifiers").
    my $cb;
    if (@args) {
        my $type = ref $args[-1];
        $cb = pop @args if $type eq 'CODE' || $type eq 'AnyEvent::CondVar';
    }

    # Previously "!!$cb", which requested noreply exactly when a callback
    # was present -- the opposite of what delete() does.
    my $noreply = !defined $cb;
    my $delay   = shift(@args) || 0;

    $self->_push_queue( $self->protocol->flush_all($self, $delay, $noreply, $cb) );
}
# Append a pending command to the queue and start draining the queue
# unless a drain is already under way.
#
# Arguments: the invocant and the queue entry (the value produced by a
# protocol method).
sub _push_queue {
    my $self  = shift;
    my $entry = shift;

    push @{ $self->{queue} }, $entry;

    # A drain in progress picks up the new entry by itself.
    $self->{_is_draining} or $self->_drain_queue;
}
# Run the next queued command, if the client is ready for it.
#
# * Not connected: start connecting unless a connect or disconnect is
#   already in progress, then return; connect() drains the queue itself
#   once the connection is up.
# * Already draining, or nothing queued: return.
# * Otherwise: take the first queue entry and call it with a guard object.
#   When that guard is destroyed (presumably when the command finishes and
#   drops it -- see the AnyEvent::Util guard documentation) a zero-delay
#   timer decrements the draining counter and drains the queue again.
sub _drain_queue {
    my $self = shift;

    unless ( $self->{_is_connected} ) {
        $self->connect
            unless $self->{_is_connecting} || $self->{_is_disconnecting};
        return;
    }

    return if $self->{_is_draining};

    my $job = shift @{ $self->{queue} };
    return unless $job;

    my $guard = AnyEvent::Util::guard {
        # The timer callback holds $timer alive until it has fired;
        # clearing it there releases the watcher.
        my $timer;
        $timer = AE::timer 0, 0, sub {
            $self->{_is_draining}--;
            undef $timer;
            $self->_drain_queue;
        };
    };

    $self->{_is_draining}++;
    $job->($guard);
}
sub disconnect {
my $self = shift;
( run in 2.106 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )