Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis/Subscription.pm  view on Meta::CPAN

package Async::Redis::Subscription;

use strict;
use warnings;
use 5.018;

use Carp ();
use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(blessed refaddr weaken);


# Number of callback-driver loop iterations between forced event-loop
# yields. According to the driver's own notes, yielding periodically
# prevents stack growth when many messages are already queued and an
# await on an already-ready Future returns synchronously.
# NOTE(review): the code that consults this constant is outside this
# excerpt -- confirm the exact yield mechanism against _run_driver.
use constant MAX_SYNC_DEPTH => 32;

# Constructor.
#
#   my $sub = Async::Redis::Subscription->new(redis => $client);
#
# Only the `redis` argument is read; every other slot starts in its
# idle state. Returns the new blessed hashref.
sub new {
    my ($class, %args) = @_;

    my %state = (
        # Owning client handle, stored exactly as supplied.
        redis => $args{redis},

        # Subscription registries, each a set of name => 1.
        channels         => {},    # regular subscribe
        patterns         => {},    # psubscribe
        sharded_channels => {},    # ssubscribe

        # Delivery queue plus the Futures used to signal across it.
        _pending_messages => [],       # messages awaiting a consumer
        _message_waiter   => undef,    # signalled when a message arrives
        _slot_waiter      => undef,    # signalled when the queue has room again

        # User-supplied hooks.
        _on_reconnect => undef,    # reconnect notification
        _on_message   => undef,    # message-arrived callback (callback mode)
        _on_error     => undef,    # fatal-error callback

        # Lifecycle bookkeeping.
        _fatal_error => undef,    # typed error recorded on fatal failure
        _driver_step => undef,    # Future of the running driver loop
        _closed      => 0,
        _paused      => 0,        # set while a reconnect is in progress
    );

    return bless \%state, $class;
}

# Combined accessor for the reconnect-notification callback.
#
#   $sub->on_reconnect($cb);   # store $cb (any value, including undef)
#   $sub->on_reconnect;        # read the stored value
#
# Always returns the value currently stored.
sub on_reconnect {
    my $self = shift;

    # Any remaining argument -- even an explicit undef -- is a write.
    $self->{_on_reconnect} = $_[0] if @_;

    return $self->{_on_reconnect};
}

# Combined accessor for the message-arrived callback.
#
#   $sub->on_message($cb);   # install $cb and kick the driver
#   $sub->on_message;        # read the installed callback
#
# The callback is sticky: once a true value is installed it may be
# replaced by another true value, but an attempt to clear it croaks.
# Installing a true callback calls _start_driver.
# NOTE(review): the callback is expected to be invoked as
# $cb->($sub, $msg), and next() is expected to refuse to run once a
# callback is set -- both happen outside this block; confirm there.
#
# Returns the value currently stored.
sub on_message {
    my ($self, $cb) = @_;

    # Getter form: nothing beyond the invocant was passed.
    return $self->{_on_message} unless @_ > 1;

    # Refuse to drop back out of callback mode.
    Carp::croak(
        "on_message is sticky; cannot clear once set "
      . "(construct a new Subscription for iterator mode)"
    ) if !$cb && $self->{_on_message};

    $self->{_on_message} = $cb;

    # _start_driver is left to decide whether the subscription is in a
    # state where the driver can actually run.
    $self->_start_driver if $cb;

    return $self->{_on_message};
}

# Set/get fatal-error callback. Fires once per fatal error; default
# (when unset) is to die so silent death is impossible.
# $cb->($sub, $err) receives the Subscription and the error.
sub on_error {
    my ($self, $cb) = @_;

lib/Async/Redis/Subscription.pm  view on Meta::CPAN


# Turn one raw RESP pub/sub frame into a message hashref and enqueue it
# on _pending_messages, waking any consumer parked on _message_waiter.
#
# This sub never invokes the user's on_message callback itself; it only
# queues, so the queue-depth limit applies however the queue is drained.
#
# Recognised frames and the message they produce:
#   [ 'message',  $channel, $data ]           -> pattern is undef
#   [ 'pmessage', $pattern, $channel, $data ]
#   [ 'smessage', $channel, $data ]           -> pattern is undef
#
# Returns:
#   - nothing (bare return) when $frame is not an array ref, or when the
#     subscription is already closed;
#   - undef for any other frame type (subscribe confirmations, etc.) and
#     after a message has been queued immediately;
#   - a Future when the client's message_queue_depth is set and the
#     queue is already at that depth. The Future is chained on
#     _slot_waiter and queues the message once that waiter completes.
#     NOTE(review): _slot_waiter is resolved outside this block
#     (presumably by the consumer when it frees a slot) -- confirm.
sub _dispatch_frame {
    my ($self, $frame) = @_;
    return unless $frame && ref $frame eq 'ARRAY';

    my $kind = $frame->[0] // '';

    # 'message' and 'smessage' share a layout; 'pmessage' carries the
    # matching pattern ahead of the channel.
    my ($channel, $pattern, $data);
    if ($kind eq 'message' || $kind eq 'smessage') {
        ($channel, $data) = @{$frame}[1, 2];
    }
    elsif ($kind eq 'pmessage') {
        ($pattern, $channel, $data) = @{$frame}[1 .. 3];
    }
    else {
        return undef;   # non-message frame (subscribe confirmation, etc.)
    }

    my $msg = {
        type    => $kind,
        channel => $channel,
        pattern => $pattern,
        data    => $data,
    };

    return if $self->{_closed};

    # Append $msg to a subscription's queue and wake a waiting consumer.
    # The target is passed in so the deferred path can hand over its
    # weakened reference instead of capturing $self strongly.
    my $enqueue = sub {
        my ($target) = @_;
        push @{ $target->{_pending_messages} }, $msg;
        my $waiter = delete $target->{_message_waiter};
        $waiter->done if $waiter && !$waiter->is_ready;
        return;
    };

    # A false or missing message_queue_depth means the queue is unbounded.
    my $client = $self->{redis};
    my $limit  = $client ? ($client->{message_queue_depth} || 0) : 0;

    if ($limit && @{ $self->{_pending_messages} } >= $limit) {
        # Queue is full: defer the enqueue until the slot waiter
        # completes, and hand the resulting Future back to the caller.
        my $slot = ($self->{_slot_waiter} //= Future->new);
        weaken(my $weak_self = $self);
        return $slot->then(sub {
            # Drop the message if the subscription went away or closed
            # while it was waiting for room.
            $enqueue->($weak_self) if $weak_self && !$weak_self->{_closed};
            return Future->done;
        });
    }

    $enqueue->($self);
    return undef;
}

# The callback-mode driver loop. Consumes from _pending_messages via
# _dequeue (populated by _run_reader's dispatch path), invokes the
# user's _on_message callback, and awaits its returned Future if any
# for consumer-opted backpressure.
#
# Exits cleanly when _dequeue returns undef (subscription closed or
# paused for reconnect). Dies with the typed error if _dequeue dies
# (fatal); _run_driver's Future failure is visible through the
# client's Future::Selector to any caller using run_until_ready.
#
# Periodic sleep(0) yield every MAX_SYNC_DEPTH iterations prevents
# stack growth when messages are pre-queued and await returns
# synchronously from an already-ready Future.
async sub _run_driver {
    my ($self) = @_;
    my $iter = 0;
    while (!$self->{_closed} && !$self->{_paused}) {
        my $msg;
        my $deq_ok = eval { $msg = await $self->_dequeue(1); 1 };
        unless ($deq_ok) {
            my $err = $@;
            # _fail_fatal already set _closed and fired on_error; don't
            # double-fire. Any other propagation path routes through
            # _handle_fatal_error.
            return if $self->{_closed} || $self->{_paused};
            $self->_handle_fatal_error($err);
            return;
        }
        last unless defined $msg;
        last if $self->{_closed} || $self->{_paused};

        my $cb = $self->{_on_message} or last;
        my $result = $self->_invoke_user_callback($cb, $msg);

        if (blessed($result) && $result->isa('Future')) {
            my $cb_ok = eval { await $result; 1 };
            unless ($cb_ok) {
                my $err = $@;
                return if $self->{_closed} || $self->{_paused};
                $self->_handle_fatal_error(
                    "on_message callback Future failed: $err"
                );
                return;



( run in 0.711 second using v1.01-cache-2.11-cpan-39bf76dae61 )