AnyEvent-Memcached

 view release on metacpan or  search on metacpan

lib/AnyEvent/Memcached.pm  view on Meta::CPAN

package AnyEvent::Memcached;

use 5.8.8;

=head1 NAME

AnyEvent::Memcached - AnyEvent memcached client

=cut

our $VERSION = '0.08';

=head1 SYNOPSIS

    use AnyEvent::Memcached;

    my $memd = AnyEvent::Memcached->new(
        servers => [ "10.0.0.15:11211", "10.0.0.15:11212" ], # same as in Cache::Memcached
        debug   => 1,
        compress_threshold => 10000,
        namespace => 'my-namespace:',

        # May use another hashing algo:
        hasher  => 'AnyEvent::Memcached::Hash::WithNext',

        cv      => $cv, # AnyEvent->condvar: group callback
    );

    $memd->set_servers([ "10.0.0.15:11211", "10.0.0.15:11212" ]);

    # Basic methods are like in Cache::Memcached, but with additional cb => sub { ... };
    # first argument to cb is return value, second is the error(s)

    $memd->set( key => $value, cb => sub {
        shift or warn "Set failed: @_"
    } );

    # Single get
    $memd->get( 'key', cb => sub {
        my ($value,$err) = shift;
        $err and return warn "Get failed: @_";
        warn "Value for key is $value";
    } );

    # Multi-get
    $memd->get( [ 'key1', 'key2' ], cb => sub {
        my ($values,$err) = shift;
        $err and return warn "Get failed: @_";
        warn "Value for key1 is $values->{key1} and value for key2 is $values->{key2}"
    } );

    # Additionally there is rget (see memcachedb-1.2.1-beta)

    $memd->rget( 'fromkey', 'tokey', cb => sub {
        my ($values,$err) = shift;
        $err and warn "Get failed: @_";
        while (my ($key,$value) = each %$values) {
            # ...
        }
    } );

    # Rget with sorted responce values
    $memd->rget( 'fromkey', 'tokey', rv => 'array' cb => sub {
        my ($values,$err) = shift;
        $err and warn "Get failed: @_";
        for (0 .. $#values/2) {
            my ($key,$value) = @$values[$_*2,$_*2+1];
        }
    } );

=head1 DESCRIPTION

Asyncronous C<memcached/memcachedb> client for L<AnyEvent> framework

=head1 NOTICE

There is a notices in L<Cache::Memcached::AnyEvent> related to this module. They all has been fixed

=over 4

lib/AnyEvent/Memcached.pm  view on Meta::CPAN

=item Prerequisites

We no longer need L<Object::Event> and L<Devel::Leak::Cb>. At all, the dependency list is like in L<Cache::Memcached> + L<AnyEvent>

=item Binary protocol

It seems to me, that usage of binary protocol from pure perl gives very little advantage. So for now I don't implement it

=item Unimplemented Methods

There is a note, that get_multi is not implementeted. In fact, it was implemented by method L</get>, but the documentation was wrong.

=back

In general, this module follows the spirit of L<AnyEvent> rather than correspondence to L<Cache::Memcached> interface.

=cut

use common::sense 2;m{
use strict;
use warnings;
}x;

use Carp;
use AnyEvent 5;
#use Devel::Leak::Cb;

use AnyEvent::Socket;
use AnyEvent::Handle;
use AnyEvent::Connection;
use AnyEvent::Connection::Util;
use AnyEvent::Memcached::Conn;
use Storable ();

use AnyEvent::Memcached::Peer;
use AnyEvent::Memcached::Hash;
use AnyEvent::Memcached::Buckets;

# flag definitions
use constant F_STORABLE => 1;
use constant F_COMPRESS => 2;

# size savings required before saving compressed value
use constant COMPRESS_SAVINGS => 0.20; # percent

our $HAVE_ZLIB;
BEGIN {
	$HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
}

=head1 METHODS

=head2 new %args

Currently supported options:

=over 4

=item servers
=item namespace
=item debug
=item cv
=item compress_threshold
=item compress_enable
=item timeout
=item hasher

If set, will use instance of this class for hashing instead of default.
For implementing your own hashing, see sources of L<AnyEvent::Memcached::Hash> and L<AnyEvent::Memcached::Hash::With::Next>

=item noreply

If true, additional connection will established for noreply commands.

=item cas

If true, will enable cas/gets commands (since they are not suppotred in memcachedb)

=back

=cut

sub new {
	my $self = bless {}, shift;
	my %args = @_;
	$self->{namespace} = exists $args{namespace} ? delete $args{namespace} : '';
	for (qw( debug cv compress_threshold compress_enable timeout noreply cas)) {
		$self->{$_} = exists $args{$_} ? delete $args{$_} : 0;
	}
	$self->{timeout} ||= 3;
	$self->{_bucker} = $args{bucker} || 'AnyEvent::Memcached::Buckets';
	$self->{_hasher} = $args{hasher} || 'AnyEvent::Memcached::Hash';

	$self->set_servers(delete $args{servers});
	$self->{compress_enable} and !$HAVE_ZLIB and carp("Have no Compress::Zlib installed, but have compress_enable option");
	carp "@{[ keys %args ]} options are not supported yet" if %args;
	Carp::confess "Invalid characters in 'namespace' option: '$self->{namespace}'" if $self->{namespace} =~ /[\x00-\x20\x7F]/;
	$self;
}

=head2 set_servers

    Setup server list

=cut

sub set_servers {
	my $self = shift;
	my $list = shift;
	my $buckets = $self->{_bucker}->new(servers => $list);
	#warn R::Dump($list, $buckets);
	$self->{hash} = $self->{_hasher}->new(buckets => $buckets);
	$self->{peers} = 
	my $peers = $buckets->peers;
	for my $peer ( values %{ $peers } ) {
		$peer->{con} = AnyEvent::Memcached::Peer->new(
			port      => $peer->{port},
			host      => $peer->{host},
			timeout   => $self->{timeout},
			debug     => $self->{debug},
		);
		# Noreply connection
		if ($self->{noreply}) {
			$peer->{nrc} = AnyEvent::Memcached::Peer->new(
				port      => $peer->{port},
				host      => $peer->{host},
				timeout   => $self->{timeout},
				debug     => $self->{debug},# || 1,
			);
		}
	}
	return $self;
}

=head2 connect

    Establish connection to all servers and invoke event C<connected>, when ready

=cut

sub connect {
	my $self = shift;
	$_->{con}->connect
		for values %{ $self->{peers} };
}

sub _handle_errors {
	my $self = shift;
	my $peer = shift;
	local $_ = shift;
	if ($_ eq 'ERROR') {
		warn "Error";
	}
	elsif (/(CLIENT|SERVER)_ERROR (.*)/) {
		warn ucfirst(lc $1)." error: $2";
	}
	else {
		warn "Bad response from $peer->{host}:$peer->{port}: $_";
	}
}

sub _do {
	my $self    = shift;
	my $key     = shift; utf8::decode($key) xor utf8::encode($key) if utf8::is_utf8($key);
	my $command = shift; utf8::decode($command) xor utf8::encode($command) if utf8::is_utf8($command);
	my $worker  = shift; # CODE
	my %args    = @_;
	my $servers = $self->{hash}->servers($key);
	my %res;
	my %err;
	my $res;

	if ($key =~ /[\x00-\x20\x7F]/) {
		carp "Invalid characters in key '$key'";
		return $args{cb} ? $args{cb}(undef, "Invalid key") : 0;
	}
	if ($args{noreply} and !$self->{noreply}) {
		if (!$args{cb}) {
			carp "Noreply option not set, but noreply command requested. command ignored";
			return 0;
		} else {
			carp "Noreply option not set, but noreply command requested. fallback to common command";
		}
		delete $args{noreply};
	}
	if ($args{noreply}) {
		for my $srv ( keys %$servers ) {
			for my $real (@{ $servers->{$srv} }) {

lib/AnyEvent/Memcached.pm  view on Meta::CPAN

	}
	$cv->end;
	return;
}

=head2 incadd ( $key, $increment, [cv => $cv], [ noreply => 1 ], cb => $cb->( $rc, $err ) )

Increment key, and if it not exists, add it with initial value. If add fails, try again to incr or fail

=cut

sub incadd {
	my $self = shift;
	my $key = shift;
	my $val = shift;
	my %args = @_;
	$self->incr($key => $val, cb => sub {
		if (my $rc = shift or @_) {
			#if (@_) {
			#	warn("incr failed: @_");
			#} else {
			#	warn "incr ok";
			#}
			$args{cb}($rc, @_);
		}
		else {
			$self->add( $key, $val, %args, cb => sub {
				if ( my $rc = shift or @_ ) {
					#if (@_) {
					#	warn("add failed: @_");
					#} else {
					#	warn "add ok";
					#}
					$args{cb}($val, @_);
				}
				else {
					#warn "add failed, try again";
					$self->incadd($key,$val,%args);
				}
			});
		}
	});
}

=head2 destroy

Shutdown object as much, as possible, incl cleaning of incapsulated objects

=cut

sub AnyEvent::Memcached::destroyed::AUTOLOAD {}

sub destroy {
	my $self = shift;
	$self->DESTROY;
	bless $self, "AnyEvent::Memcached::destroyed";
}

sub DESTROY {
	my $self = shift;
	warn "(".int($self).") Destroying AE:MC" if $self->{debug};
	for (values %{$self->{peers}}) {
		$_->{con} and $_->{con}->destroy;
	}
	%$self = ();
}

=head1 BUGS

Feature requests are welcome

Bug reports are welcome

=head1 AUTHOR

Mons Anderson, C<< <mons at cpan.org> >>

=head1 COPYRIGHT & LICENSE

Copyright 2009 Mons Anderson, all rights reserved.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut

1; # End of AnyEvent::Memcached



( run in 1.211 second using v1.01-cache-2.11-cpan-39bf76dae61 )