AnyEvent-Memcached
view release on metacpan or search on metacpan
lib/AnyEvent/Memcached.pm view on Meta::CPAN
package AnyEvent::Memcached;
use 5.8.8;
=head1 NAME
AnyEvent::Memcached - AnyEvent memcached client
=cut
our $VERSION = '0.08';
=head1 SYNOPSIS
use AnyEvent::Memcached;
my $memd = AnyEvent::Memcached->new(
servers => [ "10.0.0.15:11211", "10.0.0.15:11212" ], # same as in Cache::Memcached
debug => 1,
compress_threshold => 10000,
namespace => 'my-namespace:',
# May use another hashing algo:
hasher => 'AnyEvent::Memcached::Hash::WithNext',
cv => $cv, # AnyEvent->condvar: group callback
);
$memd->set_servers([ "10.0.0.15:11211", "10.0.0.15:11212" ]);
# Basic methods are like in Cache::Memcached, but with additional cb => sub { ... };
# first argument to cb is return value, second is the error(s)
$memd->set( key => $value, cb => sub {
shift or warn "Set failed: @_"
} );
# Single get
$memd->get( 'key', cb => sub {
my ($value,$err) = @_;
$err and return warn "Get failed: @_";
warn "Value for key is $value";
} );
# Multi-get
$memd->get( [ 'key1', 'key2' ], cb => sub {
my ($values,$err) = @_;
$err and return warn "Get failed: @_";
warn "Value for key1 is $values->{key1} and value for key2 is $values->{key2}"
} );
# Additionally there is rget (see memcachedb-1.2.1-beta)
$memd->rget( 'fromkey', 'tokey', cb => sub {
my ($values,$err) = @_;
$err and warn "Get failed: @_";
while (my ($key,$value) = each %$values) {
# ...
}
} );
# Rget with sorted response values
$memd->rget( 'fromkey', 'tokey', rv => 'array', cb => sub {
my ($values,$err) = @_;
$err and warn "Get failed: @_";
for (0 .. $#$values/2) {
my ($key,$value) = @$values[$_*2,$_*2+1];
}
} );
=head1 DESCRIPTION
Asynchronous C<memcached/memcachedb> client for the L<AnyEvent> framework
=head1 NOTICE
There are notices in L<Cache::Memcached::AnyEvent> related to this module. They have all been addressed:
=over 4
lib/AnyEvent/Memcached.pm view on Meta::CPAN
=item Prerequisites
We no longer need L<Object::Event> and L<Devel::Leak::Cb>. Overall, the dependency list is the same as that of L<Cache::Memcached>, plus L<AnyEvent>
=item Binary protocol
It seems to me, that usage of binary protocol from pure perl gives very little advantage. So for now I don't implement it
=item Unimplemented Methods
There is a note that get_multi is not implemented. In fact, it was implemented by the method L</get>, but the documentation was wrong.
=back
In general, this module follows the spirit of L<AnyEvent> rather than correspondence to L<Cache::Memcached> interface.
=cut
use common::sense 2;m{
use strict;
use warnings;
}x;
use Carp;
use AnyEvent 5;
#use Devel::Leak::Cb;
use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::Connection;
use AnyEvent::Connection::Util;
use AnyEvent::Memcached::Conn;
use Storable ();
use AnyEvent::Memcached::Peer;
use AnyEvent::Memcached::Hash;
use AnyEvent::Memcached::Buckets;
# Flag definitions.
use constant {
    F_STORABLE => 1,
    F_COMPRESS => 2,
};

# Minimum size saving required before a compressed value is stored,
# expressed as a fraction (0.20 means 20%).
use constant COMPRESS_SAVINGS => 0.20;

# True when Compress::Zlib could be loaded; undef otherwise.
our $HAVE_ZLIB;

BEGIN {
    # Probe for Compress::Zlib at compile time without importing anything
    # from it; a failed load leaves the flag false instead of dying.
    $HAVE_ZLIB = eval { require Compress::Zlib; 1 };
}
=head1 METHODS
=head2 new %args
Currently supported options:
=over 4
=item servers
=item namespace
=item debug
=item cv
=item compress_threshold
=item compress_enable
=item timeout
=item hasher
If set, will use instance of this class for hashing instead of default.
To implement your own hashing, see the sources of L<AnyEvent::Memcached::Hash> and L<AnyEvent::Memcached::Hash::WithNext>
=item noreply
If true, an additional connection will be established for noreply commands.
=item cas
If true, will enable cas/gets commands (since they are not supported in memcachedb)
=back
=cut
# Constructor: AnyEvent::Memcached->new(%args).
#
# Recognised options are consumed from %args as they are read:
#   namespace  - key prefix; defaults to ''. Must not contain control
#                characters, spaces or DEL (confesses otherwise).
#   debug, cv, compress_threshold, compress_enable, timeout, noreply, cas
#              - stored on the object; each defaults to 0.
#                A false timeout falls back to 3.
#   bucker     - class used to build the bucket list
#                (default AnyEvent::Memcached::Buckets).
#   hasher     - class used to map keys to servers
#                (default AnyEvent::Memcached::Hash).
#   servers    - passed to set_servers().
#
# Anything left in %args afterwards is reported with carp as unsupported.
# Returns the new object.
sub new {
    my $self = bless {}, shift;
    my %args = @_;

    $self->{namespace} = exists $args{namespace} ? delete $args{namespace} : '';
    for (qw( debug cv compress_threshold compress_enable timeout noreply cas )) {
        $self->{$_} = exists $args{$_} ? delete $args{$_} : 0;
    }
    $self->{timeout} ||= 3;

    # 'bucker' and 'hasher' are supported options, so they must be removed
    # from %args; otherwise the leftover check below would report them as
    # unsupported every time the caller supplies one.
    $self->{_bucker} = delete($args{bucker}) || 'AnyEvent::Memcached::Buckets';
    $self->{_hasher} = delete($args{hasher}) || 'AnyEvent::Memcached::Hash';

    $self->set_servers(delete $args{servers});

    $self->{compress_enable} and !$HAVE_ZLIB
        and carp("Have no Compress::Zlib installed, but have compress_enable option");

    # Sorted so the warning text does not depend on hash ordering.
    carp "@{[ sort keys %args ]} options are not supported yet" if %args;

    Carp::confess "Invalid characters in 'namespace' option: '$self->{namespace}'"
        if $self->{namespace} =~ /[\x00-\x20\x7F]/;

    $self;
}
=head2 set_servers
Setup server list
=cut
# Replace the server list.
#
# Builds a bucket object from $list using the configured bucker class,
# hands it to a new hasher instance, and creates a Peer connection object
# ('con') for every peer reported by the buckets. When the 'noreply' option
# is set, each peer also gets a second Peer object ('nrc') built with the
# same parameters. Returns $self.
sub set_servers {
    my ($self, $list) = @_;

    my $buckets = $self->{_bucker}->new(servers => $list);
    $self->{hash} = $self->{_hasher}->new(buckets => $buckets);

    my $peers = $buckets->peers;
    $self->{peers} = $peers;

    foreach my $peer (values %$peers) {
        # Both connections are created with identical settings.
        my @conn_args = (
            port    => $peer->{port},
            host    => $peer->{host},
            timeout => $self->{timeout},
            debug   => $self->{debug},
        );

        $peer->{con} = AnyEvent::Memcached::Peer->new(@conn_args);

        # Noreply connection
        $peer->{nrc} = AnyEvent::Memcached::Peer->new(@conn_args)
            if $self->{noreply};
    }

    return $self;
}
=head2 connect
Establish connection to all servers and invoke event C<connected>, when ready
=cut
# Ask the main ('con') connection object of every known peer to connect.
sub connect {
    my ($self) = @_;
    foreach my $peer (values %{ $self->{peers} }) {
        $peer->{con}->connect;
    }
}
# Report an error line received from a server as a warning.
#
#   $peer     - hash with 'host' and 'port', used only in the fallback message
#   $response - the response line to classify
#
# A bare 'ERROR' and the CLIENT_ERROR / SERVER_ERROR forms get dedicated
# messages; anything else is reported as a bad response from the peer.
sub _handle_errors {
    my ($self, $peer, $response) = @_;

    my $message;
    if ($response eq 'ERROR') {
        $message = "Error";
    }
    elsif (my ($origin, $detail) = $response =~ /(CLIENT|SERVER)_ERROR (.*)/) {
        $message = ucfirst(lc $origin)." error: $detail";
    }
    else {
        $message = "Bad response from $peer->{host}:$peer->{port}: $response";
    }

    warn $message;
}
sub _do {
my $self = shift;
my $key = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key);
my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command);
my $worker = shift; # CODE
my %args = @_;
my $servers = $self->{hash}->servers($key);
my %res;
my %err;
my $res;
if ($key =~ /[\x00-\x20\x7F]/) {
carp "Invalid characters in key '$key'";
return $args{cb} ? $args{cb}(undef, "Invalid key") : 0;
}
if ($args{noreply} and !$self->{noreply}) {
if (!$args{cb}) {
carp "Noreply option not set, but noreply command requested. command ignored";
return 0;
} else {
carp "Noreply option not set, but noreply command requested. fallback to common command";
}
delete $args{noreply};
}
if ($args{noreply}) {
for my $srv ( keys %$servers ) {
for my $real (@{ $servers->{$srv} }) {
lib/AnyEvent/Memcached.pm view on Meta::CPAN
}
$cv->end;
return;
}
=head2 incadd ( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )
Increment key, and if it does not exist, add it with the initial value. If add fails, try to incr again, or fail
=cut
# incadd( $key, $val, %args )
#
# Increment $key by $val; when the increment yields a false result without
# any error, fall back to add()ing the key with $val as its initial value.
#
# %args must contain cb => sub { my ($rc, @errors) = @_ }. The remaining
# %args are forwarded to add(). The callback receives:
#   - the incr result (plus any errors) when incr gave a result or errors;
#   - $val when the add succeeded;
#   - undef plus the errors when the add failed with an error;
# When add fails without an error, the whole incr/add cycle is retried.
#
# NOTE(review): the retry is unbounded; confirm that a persistent
# "incr false / add false, no error" state cannot occur in practice.
sub incadd {
    my ($self, $key, $val, %args) = @_;

    return $self->incr($key => $val, cb => sub {
        my ($rc, @errors) = @_;

        if ($rc or @errors) {
            $args{cb}($rc, @errors);
            return;
        }

        $self->add($key, $val, %args, cb => sub {
            my ($added, @add_errors) = @_;

            if ($added) {
                $args{cb}($val, @add_errors);
            }
            elsif (@add_errors) {
                # The add failed: report a false result so that callers
                # testing the first argument do not mistake it for success.
                $args{cb}(undef, @add_errors);
            }
            else {
                # No result and no error from add: start over with incr.
                $self->incadd($key, $val, %args);
            }
        });
    });
}
=head2 destroy
Shut down the object as much as possible, including cleanup of encapsulated objects
=cut
# Catch-all for the "destroyed" class: every method call on an object
# reblessed into it resolves to this empty sub.
sub AnyEvent::Memcached::destroyed::AUTOLOAD {}

# Run the destructor explicitly, then rebless the object into the
# "destroyed" class. Returns the reblessed object.
sub destroy {
    my ($self) = @_;
    $self->DESTROY;
    return bless $self, 'AnyEvent::Memcached::destroyed';
}
# Destructor: destroy every peer connection object and empty the object.
#
# Both the main connection ('con') and the noreply connection ('nrc', which
# set_servers creates when the 'noreply' option is on) are destroyed, so no
# connection object set up by this client is left behind.
sub DESTROY {
    my $self = shift;
    warn "(".int($self).") Destroying AE:MC" if $self->{debug};

    # '|| {}' keeps a never-configured object from autovivifying 'peers'.
    for my $peer (values %{ $self->{peers} || {} }) {
        for my $slot (qw(con nrc)) {
            $peer->{$slot} and $peer->{$slot}->destroy;
        }
    }

    %$self = ();
}
=head1 BUGS
Feature requests are welcome
Bug reports are welcome
=head1 AUTHOR
Mons Anderson, C<< <mons at cpan.org> >>
=head1 COPYRIGHT & LICENSE
Copyright 2009 Mons Anderson, all rights reserved.
This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.
=cut
1; # End of AnyEvent::Memcached
( run in 1.211 second using v1.01-cache-2.11-cpan-39bf76dae61 )