Cache-Memcached-AnyEvent
view release on metacpan or search on metacpan
lib/Cache/Memcached/AnyEvent.pm view on Meta::CPAN
# Return the serializer helper for this client, building it on first use.
#
# The helper is constructed through _build_helper() from the configured
# "serializer_class" and cached in the "serializer" slot, so subsequent calls
# hand back the same value.
sub serializer {
    my $self = $_[0];

    $self->{serializer} ||= $self->_build_helper(
        'Serializer',
        $self->{serializer_class},
    );
}
# Load a helper class at runtime and instantiate it for this client.
#
#   $prefix - helper family (e.g. 'Serializer'); used to qualify short names
#   $klass  - class name; a leading '+' marks it as fully qualified, otherwise
#             it is resolved under Cache::Memcached::AnyEvent::<prefix>::
#
# Returns whatever $klass->new(memcached => $self) returns. The module name
# is validated and loaded via Module::Runtime before instantiation.
sub _build_helper {
    my ($self, $prefix, $klass) = @_;

    # s/// strips the '+' marker in place and tells us whether it was there.
    my $is_fully_qualified = ($klass =~ s/^\+//);
    unless ($is_fully_qualified) {
        $klass = "Cache::Memcached::AnyEvent::${prefix}::$klass";
    }

    Module::Runtime::check_module_name($klass);
    Module::Runtime::require_module($klass);

    return $klass->new(memcached => $self);
}
# Generate the simple read/write accessors for plain configuration slots.
#
# Each accessor returns the value the slot held *before* the call; when an
# argument is supplied it is stored as the new value (so a setter call yields
# the previous value, not the new one).
#
# The subs are installed as closures through the symbol table rather than
# compiled from a string eval, so there is no generated source to fail and
# no error text to lose.
BEGIN {
    foreach my $attr ( qw(auto_reconnect compress_threshold reconnect_delay servers namespace) ) {
        my $slot = $attr;    # private copy captured by the closure below

        my $accessor = sub {
            my $self = shift;
            my $ret  = $self->{$slot};
            if (@_) {
                $self->{$slot} = shift;
            }
            return $ret;
        };

        no strict 'refs';
        *{ __PACKAGE__ . "::$slot" } = $accessor;
    }
}
# Start an asynchronous TCP connection to a single "host[:port]" server.
#
#   $server - server specification; the port defaults to 11211
#   $cv     - optional condvar; begin() is called now and end() once the
#             connect callback has run
#
# Does nothing if a connection attempt for $server is already recorded in
# _is_connecting. The value returned by tcp_connect is stored there until
# _on_tcp_connect removes it.
sub _connect_one {
    my ($self, $server, $cv) = @_;

    if ($self->{_is_connecting}->{$server}) {
        return;
    }

    if ($cv) {
        $cv->begin;
    }

    my ($host, $port) = split /:/, $server;
    $port = 11211 unless $port;

    my $on_connect = sub {
        $self->_on_tcp_connect($server, @_);
        $cv->end if $cv;
    };

    $self->{_is_connecting}->{$server} = tcp_connect($host, $port, $on_connect);
}
# Completion callback for the tcp_connect started in _connect_one.
#
#   $server       - the server key that was passed to _connect_one
#   $fh           - first value handed to the tcp_connect callback; a false
#                   value is treated as "connect failed"
#   $host, $port  - remaining callback values; not used in this sub
#
# On failure a warning is emitted and, while the number of attempts is below
# auto_reconnect, a one-shot timer is scheduled to retry. On success an
# AnyEvent::Handle is created around $fh, registered as an active server, and
# passed through the protocol's prepare_handle hook.
sub _on_tcp_connect {
my ($self, $server, $fh, $host, $port) = @_;
delete $self->{_is_connecting}->{$server}; # this attempt is over, whatever the outcome
if (! $fh) {
# connect failed
warn("failed to connect to $server [$!]");
# Post-increment: the comparison sees the attempt count *before* this failure.
if ($self->{auto_reconnect} > $self->{_connect_attempts}->{ $server }++) {
# XXX this watcher holds a reference to $self, which means
# it will make your program wait for it to fire until
# auto_reconnect attempts have been made.
# if you need to close immediately, you need to call disconnect
# NOTE(review): the disconnect() visible in this file does not clear
# $self->{_reconnect}; confirm the timer is cancelled elsewhere.
$self->{_reconnect}->{$server} = AE::timer $self->{reconnect_delay}, 0, sub {
delete $self->{_reconnect}->{$server};
$self->_connect_one($server);
};
}
} else {
my $h; $h = AnyEvent::Handle->new(
fh => $fh,
on_drain => sub {
my $h = shift;
# When a buffer is defined but empty, drop the slot and re-create it as a
# fresh empty string (presumably to release memory held by a buffer that
# had grown large -- TODO confirm).
if (defined $h->{wbuf} && $h->{wbuf} eq "") {
delete $h->{wbuf}; $h->{wbuf} = "";
}
if (defined $h->{rbuf} && $h->{rbuf} eq "") {
delete $h->{rbuf}; $h->{rbuf} = "";
}
},
on_eof => sub {
# Unregister the handle for this server and tear it down; no reconnect here.
my $h = delete $self->{_server_handles}->{$server};
$h->destroy();
undef $h;
},
on_error => sub {
# Same teardown as on_eof, then retry if auto_reconnect is enabled.
# NOTE(review): neither callback removes $server from _active_servers or
# decrements _active_server_count within this sub.
my $h = delete $self->{_server_handles}->{$server};
$h->destroy();
$self->_connect_one($server) if $self->{auto_reconnect};
undef $h;
},
);
$self->_add_active_server( $server, $h );
# A successful connect resets the failure counter for this server.
delete $self->{_connect_attempts}->{ $server };
# The protocol hook receives the raw filehandle, not the AnyEvent::Handle.
$self->protocol->prepare_handle( $fh );
}
}
# Record a freshly connected server.
#
# Bumps the active-server counter, appends $server to the _active_servers
# list, and stores its handle in _server_handles keyed by $server.
sub _add_active_server {
    my ($self, $server, $handle) = @_;

    $self->{_active_server_count}++;
    push @{ $self->{_active_servers} }, $server;

    $self->{_server_handles}->{$server} = $handle;
}
# Append one or more servers to the configured server list and hand the
# updated list (the same array reference) to the selector.
sub add_server {
    my $self = shift;

    push @{ $self->{servers} }, @_;

    $self->selector->set_servers( $self->{servers} );
}
sub connect {
my $self = shift;
return if $self->{_is_connecting} || $self->{_is_connected};
$self->disconnect();
delete $self->{_is_disconnecting};
$self->{_is_connecting} = {};
$self->{_active_servers} = [];
$self->{_active_server_count} = 0;
my $connect_cv = AE::cv {
delete $self->{_is_connecting};
lib/Cache/Memcached/AnyEvent.pm view on Meta::CPAN
my ($key, $value, $initial) = @args;
Scalar::Util::weaken($self);
$self->_push_queue( $self->protocol->$method( $self, $key, $value, $initial, $cb ) );
});
}
# Install the storage commands that share the
# (key, value, exptime, noreply [, callback]) calling convention.
#
# A trailing CODE reference or AnyEvent::CondVar is treated as the completion
# callback and removed from the positional arguments. The command built by the
# protocol object is then queued through _push_queue.
foreach my $method ( qw(add append prepend replace set) ) {
    $installer->($method, sub {
        my ($self, @args) = @_;

        # Declare $cb unconditionally: "my $x = ... if COND" has undefined
        # behaviour according to perlsyn.
        my $cb;
        if (ref($args[-1]) eq 'CODE' or ref($args[-1]) eq 'AnyEvent::CondVar') {
            $cb = pop @args;
        }

        my ($key, $value, $exptime, $noreply) = @args;
        Scalar::Util::weaken($self);
        $self->_push_queue( $self->protocol->$method( $self, $key, $value, $exptime, $noreply, $cb ) );
    });
}
}
# Queue a "stats" command.
#
#   $memd->stats([$name], [$cb])
#
# A trailing CODE reference or AnyEvent::CondVar is taken as the completion
# callback; the first remaining argument (if any) is passed to the protocol
# object as the statistics name.
sub stats {
    my ($self, @args) = @_;

    # Declare $cb unconditionally: "my $x = ... if COND" has undefined
    # behaviour according to perlsyn.
    my $cb;
    if (ref($args[-1]) eq 'CODE' or ref($args[-1]) eq 'AnyEvent::CondVar') {
        $cb = pop @args;
    }

    my ($name) = @args;
    $self->_push_queue( $self->protocol->stats($self, $name, $cb) );
}
# Queue a "version" command; $cb is forwarded to the protocol object
# unchanged.
sub version {
    my $self = shift;
    my $cb   = shift;

    $self->_push_queue( $self->protocol->version($self, $cb) );
}
# Queue a "flush_all" command.
#
#   $memd->flush_all([$delay], [$cb])
#
# A trailing CODE reference or AnyEvent::CondVar is taken as the completion
# callback. $delay defaults to 0 when omitted or false.
sub flush_all {
    my ($self, @args) = @_;

    # Declare $cb unconditionally: "my $x = ... if COND" has undefined
    # behaviour according to perlsyn.
    my $cb;
    if (ref($args[-1]) eq 'CODE' or ref($args[-1]) eq 'AnyEvent::CondVar') {
        $cb = pop @args;
    }

    # NOTE(review): noreply is true exactly when a callback WAS supplied,
    # which looks inverted (one would expect "noreply" when nobody is waiting
    # for the answer). Kept as-is because the protocol classes are not
    # visible here -- confirm against their flush_all implementation.
    my $noreply = !!$cb;
    my $delay   = shift(@args) || 0;

    $self->_push_queue( $self->protocol->flush_all($self, $delay, $noreply, $cb) );
}
# Append a command to the pending queue and start draining it unless a
# drain is already in progress (as indicated by _is_draining).
sub _push_queue {
    my ($self, $command) = @_;

    push @{ $self->{queue} }, $command;

    unless ($self->{_is_draining}) {
        $self->_drain_queue;
    }
}
# Run the next queued command, if the client is in a state to do so.
#
# Queue entries are code references; each is invoked with a guard object.
# When that guard is destroyed, a zero-delay timer decrements _is_draining
# and calls _drain_queue again, so commands are processed one at a time.
# (Presumably each command holds the guard until its response has been
# handled -- TODO confirm against the protocol classes.)
sub _drain_queue {
my $self = shift;
# Not connected yet: kick off a connect unless one is already in flight or
# the client is shutting down. Nothing is dequeued in this case.
if (! $self->{_is_connected}) {
if ($self->{_is_connecting} or $self->{_is_disconnecting}) {
return;
}
$self->connect;
return;
}
# A command is already running; its guard will re-enter this sub later.
if ($self->{_is_draining}) {
return;
}
my $cb = shift @{$self->{queue}};
return unless $cb;
# The guard's block runs when the guard goes away. The actual work is
# deferred to a timer callback rather than done inside the guard block itself.
my $guard = AnyEvent::Util::guard {
my $t; $t = AE::timer 0, 0, sub {
$self->{_is_draining}--;
undef $t;
$self->_drain_queue;
};
};
# Mark the queue busy before handing control to the command.
$self->{_is_draining}++;
$cb->($guard);
}
# Tear down every server connection and reset the connection state.
#
# Each live handle is stopped, shut down and destroyed on a best-effort
# basis (errors from an individual handle are ignored). Afterwards the
# connecting/connected/draining flags are cleared, _server_handles is reset
# to an empty hash, and _is_disconnecting is incremented.
sub disconnect {
    my $self = shift;

    my $handles = delete $self->{_server_handles};
    for my $handle ( values %$handles ) {
        next unless $handle;

        # Best effort: a failure on one handle must not stop the others.
        eval {
            $handle->stop_read;
            $handle->push_shutdown();
            $handle->destroy();
        };
    }

    delete @{$self}{qw(_is_connecting _is_connected _is_draining)};

    $self->{_server_handles} = {};
    $self->{_is_disconnecting}++;
}
# Destructor: make sure all server connections are torn down when the
# client object goes away.
sub DESTROY {
    my ($self) = @_;
    $self->disconnect;
}
# Ask the selector for the handle associated with the given argument and
# return whatever it yields.
sub _get_handle_for {
    my ($self, $key) = @_;
    return $self->selector->get_handle($key);
}
# Return $key with the configured namespace prepended. When no namespace is
# set (or it is a false value) the key is returned unchanged.
sub _prepare_key {
    my ($self, $key) = @_;

    my $ns = $self->{namespace};
    return $ns ? $ns . $key : $key;
}
# Strip the configured namespace prefix from a key, in place.
#
#   $key_ref - reference to the key string; modified directly
#
# Does nothing when no namespace is set (or it is a false value), or when
# the key does not start with the namespace. The namespace is matched as a
# literal string: \Q...\E keeps regex metacharacters in it (".", "+", "(",
# "[" ...) from being interpreted as pattern syntax.
sub normalize_key {
    my ($self, $key_ref) = @_;
    if (my $ns = $self->{namespace}) {
        $$key_ref =~ s/\A\Q$ns\E//;
    }
}
sub deserialize {
my ($self, $flags_ref, $data_ref) = @_;
if (defined $$flags_ref && defined $$data_ref) {
if (HAVE_ZLIB && $$flags_ref & F_COMPRESS()) {
( run in 0.620 second using v1.01-cache-2.11-cpan-39bf76dae61 )