Async-Redis


lib/Async/Redis/Subscription.pm  view on Meta::CPAN


    # Exclusivity check: callback mode disables iterator mode.
    if ($self->{_on_message}) {
        Carp::croak("Cannot call next() on a callback-driven subscription");
    }

    # In iterator mode the unified reader (_run_reader) feeds the queue.
    # The reader is already running (started by _pubsub_command during
    # subscribe). No separate driver start is needed.

    return await $self->_dequeue;
}

# Read one pub/sub frame from the underlying connection. On transient
# read error, attempt reconnect if enabled and fire on_reconnect on
# success; on unrecoverable failure, propagate the error.
# Returns a Future resolving to the raw frame (arrayref) or undef if
# the connection is gone and no more frames are available.
# Shared by next() and the callback driver loop.
async sub _read_frame_with_reconnect {
    my ($self) = @_;
    my $redis = $self->{redis};

    while (1) {
        my $frame;
        my $ok = eval {
            $frame = await $redis->_read_pubsub_frame;
            1;
        };

        unless ($ok) {
            my $error = $@;
            if ($redis->{reconnect} && $self->channel_count > 0) {
                my $reconnect_ok = eval {
                    await $redis->_reconnect_pubsub;
                    1;
                };
                # Reconnect failed: propagate the reconnect error rather
                # than the original read error.
                die $@ unless $reconnect_ok;

                if ($self->{_on_reconnect}) {
                    $self->{_on_reconnect}->($self);
                }

                next;
            }
            die $error;
        }

        return $frame;
    }
}

# Convert a raw RESP pub/sub frame into a message hashref and queue it
# in _pending_messages, signalling _message_waiter so a blocked consumer
# can wake up. The same path serves iterator mode (next()) and callback
# mode (the driver loop, which invokes _on_message).
#
# Returns undef for non-message frames (subscribe confirmations, etc.);
# the caller simply reads the next frame. When message_queue_depth is
# set and the queue is full, returns a Future that enqueues the message
# once a slot opens.
sub _dispatch_frame {
    my ($self, $frame) = @_;
    return unless $frame && ref $frame eq 'ARRAY';

    my $type = $frame->[0] // '';
    my $msg;

    if ($type eq 'message') {
        $msg = {
            type    => 'message',
            channel => $frame->[1],
            pattern => undef,
            data    => $frame->[2],
        };
    }
    elsif ($type eq 'pmessage') {
        $msg = {
            type    => 'pmessage',
            pattern => $frame->[1],
            channel => $frame->[2],
            data    => $frame->[3],
        };
    }
    elsif ($type eq 'smessage') {
        $msg = {
            type    => 'smessage',
            channel => $frame->[1],
            pattern => undef,
            data    => $frame->[2],
        };
    }
    else {
        return undef;   # non-message frame (subscribe confirmation, etc.)
    }

    # Queue the message for consumption by next() (iterator mode) or the
    # callback driver loop (callback mode). The driver invokes _on_message;
    # _dispatch_frame is intentionally agnostic about the consumption mode.
    # This keeps backpressure uniform: the depth limit applies to both modes.
    return if $self->{_closed};

    my $redis = $self->{redis};
    my $depth = ($redis && $redis->{message_queue_depth})
        ? $redis->{message_queue_depth}
        : 0;  # 0 = unbounded (default)

    if ($depth && scalar(@{$self->{_pending_messages}}) >= $depth) {
        # Queue full. Return a Future that queues the message once a slot
        # opens (signalled by next() calling _slot_waiter->done).
        $self->{_slot_waiter} //= Future->new;
        my $slot = $self->{_slot_waiter};
        weaken(my $weak = $self);
        return $slot->then(sub {
            return Future->done if !$weak || $weak->{_closed};
            push @{$weak->{_pending_messages}}, $msg;
            if (my $w = delete $weak->{_message_waiter}) {
                $w->done unless $w->is_ready;

lib/Async/Redis/Subscription.pm  view on Meta::CPAN


        if (blessed($result) && $result->isa('Future')) {
            my $cb_ok = eval { await $result; 1 };
            unless ($cb_ok) {
                my $err = $@;
                return if $self->{_closed} || $self->{_paused};
                $self->_handle_fatal_error(
                    "on_message callback Future failed: $err"
                );
                return;
            }
        }

        # Periodic yield prevents stack blowup when pre-queued messages
        # resolve await synchronously.
        await Future::IO->sleep(0) if ++$iter % MAX_SYNC_DEPTH == 0;
    }
}

# Start the driver if not already running. Idempotent.
# Only starts when _on_message is set (callback mode). Iterator mode
# consumers call next() directly — no driver loop needed.
#
# Ownership: the driver Future is added to the client's Future::Selector
# ($redis->{_tasks}) and stored in $self->{_driver_step}. The selector
# owns the task; the slot is the dedup signal. on_ready clears the slot
# regardless of outcome. No ->retain.
sub _start_driver {
    my ($self, $force) = @_;
    return if $self->{_driver_step} && !$self->{_driver_step}->is_ready;
    return unless $self->{_on_message};   # only callback mode needs a driver
    return if $self->{_closed};
    return if $self->{_paused};
    return unless $self->channel_count > 0;

    my $redis = $self->{redis} or return;

    my $f = $self->_run_driver;
    $self->{_driver_step} = $f;
    $redis->{_tasks}->add(data => 'subscription-driver', f => $f);
    $f->on_ready(sub { $self->{_driver_step} = undef });
    return;
}

# Backward-compatible wrapper
async sub next_message {
    my ($self) = @_;
    my $msg = await $self->next();
    return undef unless $msg;

    # Convert new format to old format for compatibility
    return {
        channel => $msg->{channel},
        message => $msg->{data},
        pattern => $msg->{pattern},
        type    => $msg->{type},
    };
}

# Intentional teardown: marks the subscription closed and wakes any
# blocked next() with undef. Clears the parent _subscription slot
# with an identity guard so a stale _close cannot evict a newer
# subscription object that reused the same slot.
sub _close {
    my ($self) = @_;
    return if $self->{_closed};
    $self->{_closed} = 1;

    $self->{_pending_messages} = [];

    if (my $w = delete $self->{_message_waiter}) {
        $w->done unless $w->is_ready;
    }
    if (my $w = delete $self->{_slot_waiter}) {
        $w->done unless $w->is_ready;
    }

    # Identity-guarded parent-slot clear.
    my $redis = $self->{redis};
    if ($redis && defined $redis->{_subscription}
        && refaddr($redis->{_subscription}) == refaddr($self)) {
        delete $redis->{_subscription};
    }

    # Cancel any running driver Future. The driver's await on _dequeue
    # also unwinds because we resolved _message_waiter above, so this is
    # belt-and-suspenders; either path exits the driver cleanly.
    if (my $f = delete $self->{_driver_step}) {
        $f->cancel unless $f->is_ready;
    }
}

# Unrecoverable failure: marks the subscription closed with a typed
# error. Any blocked next() will die with that error. The error is
# preserved for callers who call next() after the fact.
# If on_error is registered it is called with the error; otherwise the
# method dies in callback mode, and iterator-mode callers see the error
# through next().
sub _fail_fatal {
    my ($self, $typed_error) = @_;
    return if $self->{_closed};
    $self->{_closed}      = 1;
    $self->{_fatal_error} = $typed_error;

    $self->{_pending_messages} = [];

    if (my $w = delete $self->{_message_waiter}) {
        $w->fail($typed_error) unless $w->is_ready;
    }
    if (my $w = delete $self->{_slot_waiter}) {
        $w->done unless $w->is_ready;
    }

    # Identity-guarded parent-slot clear.
    my $redis = $self->{redis};
    if ($redis && defined $redis->{_subscription}
        && refaddr($redis->{_subscription}) == refaddr($self)) {
        delete $redis->{_subscription};
    }

    # Cancel any running driver Future. _message_waiter was failed with
    # the typed error above, so driver's _dequeue also dies with the
    # typed error; cancel is belt-and-suspenders.
    if (my $f = delete $self->{_driver_step}) {
        $f->cancel unless $f->is_ready;
    }

    # Notify callback-mode consumers of the fatal error. In iterator mode
    # the caller detects it via die from next(). Loud-by-default: if
    # no on_error is registered in callback mode, die so silent death
    # of a listener is impossible.
    if (my $cb = $self->{_on_error}) {
        local $@;
        my $ok = eval { $cb->($self, $typed_error); 1 };
        unless ($ok) {
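            # $@ is always defined after a failed eval, so test for
            # emptiness (||) rather than definedness (//).
            Carp::carp("on_error callback died: " . ($@ || 'unknown error'));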
        }
        return;
    }
    # In iterator mode (no callback), callers discover the error via next().
    # In callback mode with no on_error, die loudly.
    if ($self->{_on_message}) {
        die $typed_error;
    }
}

# Called before a reconnect attempt. Does NOT mark the subscription
# closed — the reader has already exited (connection dropped). Channels
# and patterns remain in their tracking hashes for replay via
# _resume_after_reconnect.
#
# Fixes a latent "two drivers after reconnect" bug from the closure-based
# driver era: clearing the driver slot without cancelling left the old
# driver suspended on _dequeue. After _resume_after_reconnect started a
# new driver, both raced. The fix: cancel the Future explicitly, and
# wake _dequeue via the _paused flag so its await exits cleanly.

lib/Async/Redis/Subscription.pm  view on Meta::CPAN

closed and the driver stops.

B<If C<on_error> is not registered, fatal errors C<die>.> Silent death
of a pub/sub consumer is a debugging nightmare; loud-by-default
prevents it. If you genuinely want to swallow errors, register an
explicit no-op: C<< $sub->on_error(sub { }) >>.

Callback exceptions (dying inside C<on_message>) are also routed to
C<on_error>; the callback-died message is prepended to the error
string.
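
The loud-by-default rule can be sketched with a stand-in object.
C<Demo::Sub> and its C<fail_fatal> method are hypothetical names used
only for illustration and are not part of this module; the sketch
assumes the handler receives the subscription and the error, in that
order.

    use strict;
    use warnings;

    # Hypothetical stand-in: it only mimics the error-routing rule.
    package Demo::Sub {
        sub new {
            my ($class) = @_;
            return bless { on_error => undef }, $class;
        }

        sub on_error {
            my ($self, $cb) = @_;
            $self->{on_error} = $cb;
            return $self;
        }

        # Route a fatal error to the handler if one is registered;
        # otherwise die so the failure cannot pass unnoticed.
        sub fail_fatal {
            my ($self, $err) = @_;
            if (my $cb = $self->{on_error}) {
                $cb->($self, $err);
                return;
            }
            die $err;
        }
    }

    my @seen;
    my $handled = Demo::Sub->new->on_error(sub {
        my ($sub, $err) = @_;
        push @seen, $err;
    });
    $handled->fail_fatal("connection lost\n");   # recorded in @seen

    my $silent = Demo::Sub->new->on_error(sub { });
    $silent->fail_fatal("connection lost\n");    # swallowed on purpose

    my $loud = Demo::Sub->new;                   # no handler registered
    my $died = !eval { $loud->fail_fatal("connection lost\n"); 1 };

Registering even an empty handler is therefore a visible, deliberate
choice, while forgetting to register one fails noisily.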

=head2 Ordering guarantee

Callbacks fire in the order frames arrive on the connection. They are
never invoked concurrently: Perl is single-threaded and the driver runs
on the event loop. After a reconnect, C<on_reconnect> always fires
before any post-reconnect C<on_message>.

=head2 Re-entrancy

Inside an C<on_message> callback you may safely:

=over 4

=item * Call C<< $sub->subscribe(...) >> — the new channel is added
cleanly; messages on it arrive via the same callback.

=item * Call C<< $sub->on_message($new_cb) >> — the current message is
dispatched to the previously-installed handler; the next frame uses
the new handler.

=item * C<die> — routed to C<on_error>.

=back
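
The handler-swap rule in the list above can be illustrated with a small
stand-in dispatcher. This is a sketch, not the module's driver: the
C<dispatch> function and the plain hash playing the subscription are
hypothetical, and the C<($sub, $msg)> callback signature is assumed.

    use strict;
    use warnings;

    my %sub = (on_message => undef);
    my @log;

    # Hypothetical dispatcher: the handler is looked up once per
    # message, before it is invoked.
    sub dispatch {
        my ($msg) = @_;
        my $cb = $sub{on_message};
        $cb->(\%sub, $msg);
        return;
    }

    my $second = sub {
        my ($s, $msg) = @_;
        push @log, "second:$msg->{data}";
    };
    my $first = sub {
        my ($s, $msg) = @_;
        $s->{on_message} = $second;   # takes effect on the next message
        push @log, "first:$msg->{data}";
    };

    $sub{on_message} = $first;
    dispatch({ type => 'message', channel => 'news',
               pattern => undef, data => $_ }) for qw(a b);

    # @log is now ('first:a', 'second:b')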

=head2 Backpressure and Redis server limits

Synchronous callbacks provide backpressure by blocking the driver loop:
while your callback runs, the driver does not read the next frame, so
the TCP buffers fill and Redis's output buffer for this client grows.
Redis, however, enforces C<client-output-buffer-limit pubsub>
(S<32mb 8mb 60> by default in recent versions: a 32 MB hard limit, or
8 MB sustained for 60 seconds). If your subscriber cannot keep up for
sustained periods, B<Redis will disconnect you>. No amount of
client-side buffering changes this: the limit is on the server.

If your processing is genuinely slow, return a Future from your
callback (enabling opt-in backpressure above) AND consider moving the
expensive work to a worker pool so the callback can return quickly.
Long synchronous processing in pub/sub callbacks is an anti-pattern at
scale regardless of client.
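
A minimal sketch of Future-based backpressure, using only the L<Future>
module. The C<pump> function is a hypothetical stand-in for the driver
loop, and the C<($sub, $msg)> callback signature is assumed; a real
worker pool would resolve each Future when its job completes.

    use strict;
    use warnings;
    use Future;
    use Scalar::Util qw(blessed);

    my (@started, @jobs);

    # Callback: hand the work off and return a Future for the driver
    # to wait on.
    my $on_message = sub {
        my ($sub, $msg) = @_;
        push @started, $msg->{data};
        my $done = Future->new;
        push @jobs, $done;            # a worker would resolve this later
        return $done;
    };

    # Hypothetical driver: delivers the next message only after the
    # previous callback's Future has resolved.
    my @queue = map { +{ data => $_ } } qw(a b c);
    sub pump {
        my $msg = shift @queue or return Future->done;
        my $r = $on_message->(undef, $msg);
        $r = Future->done unless blessed($r) && $r->isa('Future');
        return $r->then(sub { pump() });
    }

    my $all = pump();     # only 'a' has been delivered so far
    $jobs[0]->done;       # first job finishes, so 'b' is delivered

While a job is outstanding the stand-in driver takes nothing further
from its queue, which is the behaviour the Future return value opts
into.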

=head1 INTERNAL LIFECYCLE METHODS

The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.

=head2 _close

Intentional teardown. Marks the subscription closed and wakes any
blocked C<next()> with C<undef>. Clears the parent C<_subscription>
slot on the L<Async::Redis> object with an identity guard — a stale
C<_close> call from an earlier subscription object cannot evict a newer
one that has since taken the slot.
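
The identity guard can be sketched in isolation with C<refaddr> from
L<Scalar::Util>. The C<%parent> hash, the two plain hashrefs, and
C<clear_slot> are hypothetical stand-ins for the client object, two
subscription objects, and the guarded delete.

    use strict;
    use warnings;
    use Scalar::Util qw(refaddr);

    my %parent;
    my $old = { name => 'old' };
    my $new = { name => 'new' };

    # Clear the slot only if it still refers to the object being closed.
    sub clear_slot {
        my ($self) = @_;
        if (defined $parent{_subscription}
            && refaddr($parent{_subscription}) == refaddr($self)) {
            delete $parent{_subscription};
        }
        return;
    }

    $parent{_subscription} = $old;
    $parent{_subscription} = $new;   # a newer subscription takes the slot

    clear_slot($old);                # stale close: the slot keeps $new
    my $survived = refaddr($parent{_subscription}) == refaddr($new);

    clear_slot($new);                # close by the owner empties the slot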

=head2 _fail_fatal($typed_error)

Unrecoverable failure. Marks the subscription closed with a typed error
object. Any blocked C<next()> call will C<die> with that error. The
error is preserved for callers who call C<next()> after the fact.
Clears the parent C<_subscription> slot using the same identity guard
as C<_close>. If C<on_error> is registered it is called with the error;
otherwise, in callback mode, the method C<die>s.

=head2 _pause_for_reconnect

Called before a reconnect attempt. Does B<not> mark the subscription
closed — the underlying reader has already exited due to the connection
drop. Channel/pattern tracking hashes are left intact for replay.

=head2 _resume_after_reconnect

Async. Replays all tracked C<SUBSCRIBE>, C<PSUBSCRIBE>, and
C<SSUBSCRIBE> commands on the freshly reconnected socket. Sets
C<in_pubsub=1> before sending replay commands so that racing message
frames classify correctly (mirrors the timing of the initial
subscribe). Fires C<on_reconnect> after replay. Callback-mode subscriptions
restart their callback driver; iterator-mode subscriptions continue to receive
frames through the parent Redis connection's reader.

=cut


