AnyEvent-Redis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis.pm  view on Meta::CPAN

package AnyEvent::Redis;

use strict;
use 5.008_001;
our $VERSION = '0.24';

use constant DEBUG => $ENV{ANYEVENT_REDIS_DEBUG};
use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use AnyEvent::Redis::Protocol;
use Carp qw( croak confess );
use Encode ();
use Scalar::Util qw(weaken);

our $AUTOLOAD;

# Constructor.
#
# Accepts a flat list of key/value options. "host" and "port" are pulled
# out and defaulted to 127.0.0.1 and 6379 when absent or false. An
# "encoding" option, when true, is resolved through Encode::find_encoding
# and replaced by the resulting encoding object; an unknown name croaks.
# All remaining options are stored on the object as given and may
# override the default slots (including pending_cvs).
sub new {
    my $class = shift;
    my %args  = @_;

    my $host = delete $args{host};
    $host = '127.0.0.1' unless $host;

    my $port = delete $args{port};
    $port = 6379 unless $port;

    my $encoding = $args{encoding};
    if ($encoding) {
        # Swap the encoding name for its Encode object up front so later
        # code can call ->encode on it directly.
        my $codec = Encode::find_encoding($encoding);
        $args{encoding} = $codec;
        ref $codec
            or croak qq{Encoding "$encoding" not found};
    }

    my $self = {
        host        => $host,
        port        => $port,
        pending_cvs => [],
        %args,
    };

    return bless $self, $class;
}

# Dispatch a command.
#
# When a command callback has been installed in $self->{cmd_cb}, the
# command name and its arguments are handed straight to it. Otherwise
# they are passed to connect(). Either way the callee's return value is
# returned.
sub run_cmd {
    my $self = shift;
    my $cmd  = shift;

    my $dispatch = $self->{cmd_cb};
    return $self->connect($cmd, @_) unless $dispatch;

    return $dispatch->($cmd, @_);
}

# Intentionally empty. Defining DESTROY explicitly keeps object destruction
# from falling through to AUTOLOAD, which would otherwise forward "DESTROY"
# to run_cmd as though it were a Redis command.
sub DESTROY { }

# Catch-all for methods that are not defined on this class.
#
# The package qualifier is stripped from $AUTOLOAD and the remaining
# bare method name is passed to run_cmd as the command, followed by the
# caller's arguments.
sub AUTOLOAD {
    my $invocant = shift;

    my $command = $AUTOLOAD;
    $command =~ s/.*:://;    # keep only what follows the last "::"

    return $invocant->run_cmd($command, @_);
}

# Accessor for the condvar stored in $self->{all_cv}.
#
# Called with an argument, stores that argument first. If the slot is
# then still empty or false, a fresh AE::cv is created and stored.
# Returns the stored value.
sub all_cv {
    my ($self, @replacement) = @_;

    $self->{all_cv} = $replacement[0] if @replacement;

    return $self->{all_cv} ||= AE::cv;
}

# Tear down connection state and fail everything that is still waiting.
#
# Arguments after the invocant are forwarded unchanged to the on_error
# and on_cleanup hooks (when set) and then to ->croak on every queued
# entry of pending_cvs and multi_cvs. Both queues are emptied. An
# exception thrown while croaking one entry is reported with warn and
# does not stop the remaining entries from being processed.
# Returns nothing.
sub cleanup {
    my $self = shift;

    delete $self->{cmd_cb};
    delete $self->{sock};

    # Hooks run in a fixed order; each is looked up only when its turn
    # comes.
    for my $hook (qw(on_error on_cleanup)) {
        my $handler = $self->{$hook} or next;
        $handler->(@_);
    }

    # Drain both queues before croaking anything, so the queues are
    # already empty by the time any callback runs.
    my @waiting = splice @{ $self->{pending_cvs} };
    push @waiting, splice @{ $self->{multi_cvs} || [] };

    foreach my $cv (@waiting) {
        eval { $cv->croak(@_) };
        warn "Exception in cleanup callback (ignored): $@" if $@;
    }

    return;
}

sub connect {
    my $self = shift;

    my $cv;
    if (@_) {
        $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
        $cv ||= AE::cv;
        push @{$self->{connect_queue}}, [ $cv, @_ ];
    }

    return $cv if $self->{sock};
    weaken $self;

    $self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
        my $fh = shift
            or do {
              my $err = "Can't connect Redis server: $!";
              $self->cleanup($err);
              eval { $cv->croak($err) };
              warn "Exception in connect failure callback (ignored): $@" if $@;
              return
            };

        binmode $fh; # ensure bytes until we decode

        my $hd = AnyEvent::Handle->new(
            fh => $fh,
            on_error => sub { $_[0]->destroy;
                              $self->cleanup($_[2]) if $_[1];
                          },
            on_eof   => sub { $_[0]->destroy;
                              $self->cleanup('connection closed');
                          },
            encoding => $self->{encoding},
        );

        $self->{cmd_cb} = sub {
            my $command = lc shift;
            my $is_pubsub    = $command =~ /^p?(?:un)?subscribe\z/;
            my $is_subscribe = $command =~ /^p?subscribe\z/;


            # Are we already subscribed to anything?
            if ($self->{sub} && %{$self->{sub}}) {
                croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
                  unless $is_pubsub;
            }
            # Are we already in a transaction?
            if ($self->{multi_write}) {
                croak "Use of pubsub or multi command in transaction is not supported"
                  if $is_pubsub || $command eq 'multi';
            } else {
                croak "Can't 'exec' a transaction because none is pending"
                  if $command eq 'exec';
            }

            my ($cv, $cb);
            if (@_) {
                $cv = pop if ref $_[-1] && UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
                $cb = pop if ref $_[-1] eq 'CODE';
            }
            $cv ||= AE::cv;
            croak "Must provide a CODE reference for subscriptions" if $is_subscribe && !$cb;

            my $send = join("\r\n",
                  "*" . (1 + @_),
                  map { ('$' . length $_ => $_) }
                        (uc($command), map { $self->{encoding} && length($_)
                                             ? $self->{encoding}->encode($_)
                                             : $_ } @_))
                . "\r\n";

            warn $send if DEBUG;

            # $self is weakened to avoid leaks, hold on to a strong copy
            # controlled via a CV.
            my $cmd_cv = AE::cv;
            $cmd_cv->cb(sub {
                my $strong_self = $self;
              });

            # pubsub is very different - get it out of the way first

            if ($is_pubsub) {

                $hd->push_write($send);

                my $already = $self->{sub} && %{$self->{sub}};

                if ($is_subscribe) {
                    $self->{sub}->{$_} ||= [$cv, $cb] for @_;
                }

                if (!$already && @_) {
                    my $res_cb; $res_cb = sub {
                        $hd->push_read("AnyEvent::Redis::Protocol" => sub {
                            my ($res, $err) = @_;

                            if (ref $res) {
                                my $action = lc $res->[0];
                                warn "$action $res->[1]" if DEBUG;

                                if ($action eq 'message') {
                                    $self->{sub}->{$res->[1]}[1]->($res->[2], $res->[1]);

                                } elsif ($action eq 'pmessage') {
                                    $self->{sub}->{$res->[1]}[1]->($res->[3], $res->[2], $res->[1]);

                                } elsif ($action eq 'subscribe' || $action eq 'psubscribe') {
                                    $self->{sub_count} = $res->[2];

                                } elsif ($action eq 'unsubscribe' || $action eq 'punsubscribe') {
                                    $self->{sub_count} = $res->[2];
                                    eval { $self->{sub}->{$res->[1]}[0]->send };
                                    warn "Exception in callback (ignored): $@" if $@;
                                    delete $self->{sub}->{$res->[1]};
                                    $self->all_cv->end;
                                    $cmd_cv->send;

                                } else {
                                    warn "Unknown pubsub action: $action";
                                }
                            }

                            if ($self->{sub_count} || %{$self->{sub}}) {
                                # Carry on reading while we are subscribed
                                $res_cb->();
                            }
                        });
                    };

                    $res_cb->();
                }

                return $cv;



( run in 1.557 second using v1.01-cache-2.11-cpan-39bf76dae61 )