AnyEvent-Redis

 view release on metacpan or  search on metacpan

lib/AnyEvent/Redis.pm  view on Meta::CPAN

# Send a Redis command.
#
# $cmd is the command name; any remaining arguments are passed through
# untouched.  When a command dispatcher (cmd_cb) is installed, the command is
# handed to it directly.  Otherwise the command and its arguments are given to
# connect(), and connect()'s result is returned instead.
sub run_cmd {
    my ($self, $cmd, @args) = @_;

    my $dispatch = $self->{cmd_cb};
    return $self->connect($cmd, @args) unless $dispatch;

    return $dispatch->($cmd, @args);
}

# Intentionally empty.  Because this package defines a catch-all AUTOLOAD that
# forwards unknown method names to run_cmd(), an explicit DESTROY is needed so
# that object destruction is not dispatched as if it were a Redis command.
sub DESTROY { }

# Catch-all for undefined methods: the unqualified method name is used as the
# command name and passed, together with the call's arguments, to run_cmd().
sub AUTOLOAD {
    my ($self, @args) = @_;

    # $AUTOLOAD holds the fully qualified name; keep only what follows the
    # last "::".
    my $method = $AUTOLOAD;
    $method =~ s/.*:://;

    return $self->run_cmd($method, @args);
}

# Combined getter/setter for the object's "all_cv" slot.
#
# With an argument, that value is stored first.  If the slot is then still
# false (nothing stored, or a false value was passed), a fresh AE::cv is
# created and stored.  Returns the value held in the slot.
sub all_cv {
    my ($self, @replacement) = @_;

    if (@replacement) {
        $self->{all_cv} = $replacement[0];
    }

    unless ($self->{all_cv}) {
        $self->{all_cv} = AE::cv();
    }

    return $self->{all_cv};
}

# Tear down per-connection state and notify everyone who is waiting.
#
# Arguments after the invocant (the callers visible in this file pass an error
# message) are forwarded unchanged to the optional on_error and on_cleanup
# hooks, in that order, and are then used to croak every condvar still held in
# pending_cvs and multi_cvs.  Both lists are emptied.  An exception raised
# while croaking a condvar is reported with warn and otherwise ignored, so the
# remaining condvars are still processed.  Returns nothing.
sub cleanup {
    my ($self, @reason) = @_;

    # Drop the command dispatcher and the stored connection handle.
    delete $self->{$_} for qw(cmd_cb sock);

    # Hooks are looked up one at a time, right before each call.
    for my $hook (qw(on_error on_cleanup)) {
        my $callback = $self->{$hook} or next;
        $callback->(@reason);
    }

    # Empty both queues up front, then fail each waiting condvar.
    my @waiting = (
        splice(@{$self->{pending_cvs}}),
        splice(@{$self->{multi_cvs} || []}),
    );
    for my $cv (@waiting) {
        eval { $cv->croak(@reason) };
        warn "Exception in cleanup callback (ignored): $@" if $@;
    }

    return;
}

sub connect {
    my $self = shift;

    my $cv;
    if (@_) {
        $cv = pop if UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
        $cv ||= AE::cv;
        push @{$self->{connect_queue}}, [ $cv, @_ ];
    }

    return $cv if $self->{sock};
    weaken $self;

    $self->{sock} = tcp_connect $self->{host}, $self->{port}, sub {
        my $fh = shift
            or do {
              my $err = "Can't connect Redis server: $!";
              $self->cleanup($err);
              eval { $cv->croak($err) };
              warn "Exception in connect failure callback (ignored): $@" if $@;
              return
            };

        binmode $fh; # ensure bytes until we decode

        my $hd = AnyEvent::Handle->new(
            fh => $fh,
            on_error => sub { $_[0]->destroy;
                              $self->cleanup($_[2]) if $_[1];
                          },
            on_eof   => sub { $_[0]->destroy;
                              $self->cleanup('connection closed');
                          },
            encoding => $self->{encoding},
        );

        $self->{cmd_cb} = sub {
            my $command = lc shift;
            my $is_pubsub    = $command =~ /^p?(?:un)?subscribe\z/;
            my $is_subscribe = $command =~ /^p?subscribe\z/;


            # Are we already subscribed to anything?
            if ($self->{sub} && %{$self->{sub}}) {
                croak "Use of non-pubsub command during pubsub session may result in unexpected behaviour"
                  unless $is_pubsub;
            }
            # Are we already in a transaction?
            if ($self->{multi_write}) {
                croak "Use of pubsub or multi command in transaction is not supported"
                  if $is_pubsub || $command eq 'multi';
            } else {
                croak "Can't 'exec' a transaction because none is pending"
                  if $command eq 'exec';
            }

            my ($cv, $cb);
            if (@_) {
                $cv = pop if ref $_[-1] && UNIVERSAL::isa($_[-1], 'AnyEvent::CondVar');
                $cb = pop if ref $_[-1] eq 'CODE';
            }
            $cv ||= AE::cv;
            croak "Must provide a CODE reference for subscriptions" if $is_subscribe && !$cb;

            my $send = join("\r\n",
                  "*" . (1 + @_),
                  map { ('$' . length $_ => $_) }
                        (uc($command), map { $self->{encoding} && length($_)
                                             ? $self->{encoding}->encode($_)
                                             : $_ } @_))
                . "\r\n";

            warn $send if DEBUG;

            # $self is weakened to avoid leaks, hold on to a strong copy
            # controlled via a CV.
            my $cmd_cv = AE::cv;
            $cmd_cv->cb(sub {
                my $strong_self = $self;
              });

            # pubsub is very different - get it out of the way first

            if ($is_pubsub) {



( run in 1.755 second using v1.01-cache-2.11-cpan-39bf76dae61 )