AnyEvent-Redis
view release on metacpan or search on metacpan
lib/AnyEvent/Redis.pm view on Meta::CPAN
package AnyEvent::Redis;
use strict;
use 5.008_001;
our $VERSION = '0.24';
use constant DEBUG => $ENV{ANYEVENT_REDIS_DEBUG};
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Redis::Protocol;
use Carp qw( croak confess );
use Encode ();
use Scalar::Util qw(weaken);
our $AUTOLOAD;
# Constructor: AnyEvent::Redis->new(%options)
#
# Recognised options:
#   host     - server address; a missing or false value becomes '127.0.0.1'
#   port     - server port; a missing or false value becomes 6379
#   encoding - optional encoding name, resolved via Encode::find_encoding;
#              croaks when the lookup does not yield an object
#
# Every other option is stored on the object unchanged. Because the caller's
# options are merged last, they take precedence over the initial
# pending_cvs list. No connection is attempted here.
sub new {
    my $class   = shift;
    my %options = @_;

    my $host = delete $options{host};
    $host = '127.0.0.1' unless $host;

    my $port = delete $options{port};
    $port = 6379 unless $port;

    my $requested = $options{encoding};
    if ($requested) {
        # Swap the encoding name for the resolved encoding object.
        my $codec = Encode::find_encoding($requested);
        $options{encoding} = $codec;
        croak qq{Encoding "$requested" not found} unless ref $codec;
    }

    my %state = (
        host        => $host,
        port        => $port,
        pending_cvs => [],
        %options,
    );
    return bless \%state, $class;
}
# Send a command: $redis->run_cmd($command, @args)
#
# When a command dispatcher (cmd_cb) is already installed on the object, the
# command and its arguments are handed straight to it. Otherwise the call is
# forwarded to connect(), which receives the same command and arguments.
# Returns whatever the chosen path returns.
sub run_cmd {
    # Peel off invocant and command; the remaining @_ is passed on untouched.
    my ($self, $cmd) = splice @_, 0, 2;

    return $self->connect($cmd, @_) unless $self->{cmd_cb};

    return $self->{cmd_cb}->($cmd, @_);
}
# Intentionally empty destructor. This class defines AUTOLOAD, so without an
# explicit DESTROY Perl would dispatch object destruction through AUTOLOAD
# and treat "DESTROY" as a command to run.
sub DESTROY {
    return;
}
# Catch-all method dispatcher: any method not defined on the class is treated
# as a Redis command of the same name, so $redis->get('key') becomes
# $redis->run_cmd('get', 'key').
#
# Returns whatever run_cmd returns.
# Croaks when invoked without an object (as a class method or as a plain
# function). Previously such calls fell through to run_cmd and died with an
# unrelated "strict refs" / undefined-value error that did not name the
# offending command.
sub AUTOLOAD {
    my $self = shift;

    # $AUTOLOAD holds the fully qualified name; keep only the method part.
    (my $method = $AUTOLOAD) =~ s/.*:://;

    croak qq{Redis command "$method" must be invoked on an object instance}
        unless ref $self;

    return $self->run_cmd($method, @_);
}
# Accessor for the object's all_cv slot.
#
#   $redis->all_cv($cv)  - store $cv, then return the slot's value
#   $redis->all_cv       - return the stored value
#
# In both forms a false or missing stored value is replaced by a fresh
# AE::cv, which is stored and returned.
sub all_cv {
    my ($self, @replacement) = @_;

    if (@replacement) {
        $self->{all_cv} = $replacement[0];
    }

    return $self->{all_cv} ||= AE::cv();
}
# Tear down connection state and report a failure: $redis->cleanup(@reason)
#
# Steps, in order:
#   1. remove the cmd_cb and sock entries from the object;
#   2. call the on_error hook, then the on_cleanup hook, each with @reason
#      (only if set; exceptions from these hooks are NOT trapped);
#   3. empty pending_cvs and multi_cvs and call ->croak(@reason) on every
#      entry, turning any exception raised there into a warning.
#
# Always returns an empty list / undef.
sub cleanup {
    my $self = shift;

    delete $self->{$_} for qw( cmd_cb sock );

    for my $hook (qw( on_error on_cleanup )) {
        my $handler = $self->{$hook} or next;
        $handler->(@_);
    }

    # Take ownership of everything still waiting so the queues are left empty.
    my @waiting = splice @{ $self->{pending_cvs} };
    push @waiting, splice @{ $self->{multi_cvs} || [] };

    for my $cv (@waiting) {
        eval { $cv->croak(@_) };
        warn "Exception in cleanup callback (ignored): $@" if $@;
    }

    return;
}
sub connect {
my $self = shift;
my $cv;
if (@_) {
$cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
$cv ||= AE::cv;
push @{$self->{connect_queue}}, [ $cv, @_ ];
}
return $cv if $self->{sock};
weaken $self;
$self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
my $fh = shift
or do {
my $err = "Can't connect Redis server: $!";
$self->cleanup($err);
eval { $cv->croak($err) };
warn "Exception in connect failure callback (ignored): $@" if $@;
return
};
binmode $fh; # ensure bytes until we decode
my $hd = AnyEvent::Handle->new(
fh => $fh,
on_error => sub { $_[0]->destroy;
$self->cleanup($_[2]) if $_[1];
},
on_eof => sub { $_[0]->destroy;
$self->cleanup('connection closed');
},
encoding => $self->{encoding},
);
$self->{cmd_cb} = sub {
my $command = lc shift;
my $is_pubsub = $command =~ /^p?(?:un)?subscribe\z/;
my $is_subscribe = $command =~ /^p?subscribe\z/;
# Are we already subscribed to anything?
if ($self->{sub} && %{$self->{sub}}) {
croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
unless $is_pubsub;
}
# Are we already in a transaction?
if ($self->{multi_write}) {
croak "Use of pubsub or multi command in transaction is not supported"
if $is_pubsub || $command eq 'multi';
} else {
croak "Can't 'exec' a transaction because none is pending"
if $command eq 'exec';
}
my ($cv, $cb);
if (@_) {
$cv = pop if ref $_[-1] && UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
$cb = pop if ref $_[-1] eq 'CODE';
}
$cv ||= AE::cv;
croak "Must provide a CODE reference for subscriptions" if $is_subscribe && !$cb;
my $send = join("\r\n",
"*" . (1 + @_),
map { ('$' . length $_ => $_) }
(uc($command), map { $self->{encoding} && length($_)
? $self->{encoding}->encode($_)
: $_ } @_))
. "\r\n";
warn $send if DEBUG;
# $self is weakened to avoid leaks, hold on to a strong copy
# controlled via a CV.
my $cmd_cv = AE::cv;
$cmd_cv->cb(sub {
my $strong_self = $self;
});
# pubsub is very different - get it out of the way first
if ($is_pubsub) {
$hd->push_write($send);
my $already = $self->{sub} && %{$self->{sub}};
if ($is_subscribe) {
$self->{sub}->{$_} ||= [$cv, $cb] for @_;
}
if (!$already && @_) {
my $res_cb; $res_cb = sub {
$hd->push_read("AnyEvent::Redis::Protocol" => sub {
my ($res, $err) = @_;
if (ref $res) {
my $action = lc $res->[0];
warn "$action $res->[1]" if DEBUG;
if ($action eq 'message') {
$self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);
} elsif ($action eq 'pmessage') {
$self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);
} elsif ($action eq 'subscribe' || $action eq 'psubscribe') {
$self->{sub_count} = $res->[2];
} elsif ($action eq 'unsubscribe' || $action eq 'punsubscribe') {
$self->{sub_count} = $res->[2];
eval { $self->{sub}->{$res->[1]}[0]->send };
warn "Exception in callback (ignored): $@" if $@;
delete $self->{sub}->{$res->[1]};
$self->all_cv->end;
$cmd_cv->send;
} else {
warn "Unknown pubsub action: $action";
}
}
if ($self->{sub_count} || %{$self->{sub}}) {
# Carry on reading while we are subscribed
$res_cb->();
}
});
};
$res_cb->();
}
return $cv;
( run in 1.557 second using v1.01-cache-2.11-cpan-39bf76dae61 )