AnyEvent-Redis
view release on metacpan or search on metacpan
lib/AnyEvent/Redis.pm view on Meta::CPAN
# Send one Redis command.
#
# Arguments: the command name followed by whatever arguments the caller
# supplied (passed through untouched).
#
# If the per-connection dispatcher ($self->{cmd_cb}) has not been installed
# yet, the command and its arguments are handed to connect() instead and
# connect()'s return value is returned. Otherwise the dispatcher is invoked
# and its result is returned.
sub run_cmd {
    my ($self, $cmd) = splice @_, 0, 2;

    my $dispatcher = $self->{cmd_cb};
    unless ($dispatcher) {
        # No dispatcher yet: let connect() take the command.
        return $self->connect($cmd, @_);
    }

    return $dispatcher->($cmd, @_);
}
# Deliberately a no-op. Declaring DESTROY explicitly keeps object
# destruction from falling through to this package's AUTOLOAD, which
# would otherwise treat "DESTROY" as a Redis command name.
sub DESTROY {
    return;
}
# Catch-all for undefined methods: the unqualified method name is used as
# the command name and forwarded, together with the caller's arguments,
# to run_cmd(). Returns whatever run_cmd() returns.
sub AUTOLOAD {
    my $self = shift;

    # $AUTOLOAD holds the fully qualified name; drop the package prefix.
    my $method = $AUTOLOAD;
    $method =~ s/.*:://;

    return $self->run_cmd($method, @_);
}
# Combined getter/setter for the object's "all_cv" slot.
#
# With an argument, stores that value in the slot first. If the slot is
# then still false, a new condvar from AE::cv is stored. Returns the
# slot's value.
sub all_cv {
    my $self = shift;

    if (@_) {
        $self->{all_cv} = $_[0];
    }

    unless ($self->{all_cv}) {
        $self->{all_cv} = AE::cv;
    }

    return $self->{all_cv};
}
# Tear down connection state and report a failure.
#
# Arguments: the error information, passed unchanged to every callback.
#
# Steps, in order:
#   1. Remove the command dispatcher and the socket entry from the object.
#   2. Invoke the on_error and then the on_cleanup callback, when set.
#      Exceptions thrown by these callbacks propagate to the caller.
#   3. Drain pending_cvs and multi_cvs and call croak() on every entry
#      with the same arguments; an exception raised there is reported via
#      warn and otherwise ignored.
#
# Returns nothing.
sub cleanup {
    my $self = shift;

    delete $self->{cmd_cb};
    delete $self->{sock};

    for my $hook (qw(on_error on_cleanup)) {
        my $callback = $self->{$hook} or next;
        $callback->(@_);
    }

    # Both queues are emptied before the first croak() runs.
    for my $cv (
        splice(@{ $self->{pending_cvs} }),
        splice(@{ $self->{multi_cvs} || [] })
    ) {
        eval { $cv->croak(@_) };
        warn "Exception in cleanup callback (ignored): $@" if $@;
    }

    return;
}
sub connect {
my $self = shift;
my $cv;
if (@_) {
$cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
$cv ||= AE::cv;
push @{$self->{connect_queue}}, [ $cv, @_ ];
}
return $cv if $self->{sock};
weaken $self;
$self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
my $fh = shift
or do {
my $err = "Can't connect Redis server: $!";
$self->cleanup($err);
eval { $cv->croak($err) };
warn "Exception in connect failure callback (ignored): $@" if $@;
return
};
binmode $fh; # ensure bytes until we decode
my $hd = AnyEvent::Handle->new(
fh => $fh,
on_error => sub { $_[0]->destroy;
$self->cleanup($_[2]) if $_[1];
},
on_eof => sub { $_[0]->destroy;
$self->cleanup('connection closed');
},
encoding => $self->{encoding},
);
$self->{cmd_cb} = sub {
my $command = lc shift;
my $is_pubsub = $command =~ /^p?(?:un)?subscribe\z/;
my $is_subscribe = $command =~ /^p?subscribe\z/;
# Are we already subscribed to anything?
if ($self->{sub} && %{$self->{sub}}) {
croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
unless $is_pubsub;
}
# Are we already in a transaction?
if ($self->{multi_write}) {
croak "Use of pubsub or multi command in transaction is not supported"
if $is_pubsub || $command eq 'multi';
} else {
croak "Can't 'exec' a transaction because none is pending"
if $command eq 'exec';
}
my ($cv, $cb);
if (@_) {
$cv = pop if ref $_[-1] && UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
$cb = pop if ref $_[-1] eq 'CODE';
}
$cv ||= AE::cv;
croak "Must provide a CODE reference for subscriptions" if $is_subscribe && !$cb;
my $send = join("\r\n",
"*" . (1 + @_),
map { ('$' . length $_ => $_) }
(uc($command), map { $self->{encoding} && length($_)
? $self->{encoding}->encode($_)
: $_ } @_))
. "\r\n";
warn $send if DEBUG;
# $self is weakened to avoid leaks, hold on to a strong copy
# controlled via a CV.
my $cmd_cv = AE::cv;
$cmd_cv->cb(sub {
my $strong_self = $self;
});
# pubsub is very different - get it out of the way first
if ($is_pubsub) {
( run in 1.755 second using v1.01-cache-2.11-cpan-39bf76dae61 )