Async-Redis

lib/Async/Redis/Subscription.pm


    $sub->on_reconnect(sub {
        my ($sub) = @_;
        warn "Reconnected, may have lost messages";
        # re-poll state, log, etc.
    });

Messages published while the connection was down are lost (Redis pub/sub
has no persistence).

=head1 CALLBACK-DRIVEN DELIVERY

As an alternative to the C<await $sub-E<gt>next> iterator, you can
register a callback to receive messages:

    my $sub = await $redis->subscribe('chat');
    $sub->on_message(sub {
        my ($sub, $msg) = @_;
        # $msg has the same shape as next() returns:
        #   { type => 'message'|'pmessage'|'smessage',
        #     channel => ...,
        #     pattern => ...,  # defined for pmessage, undef otherwise
        #     data    => ... }
    });

Callback mode is designed for fire-and-forget listeners — background
dispatchers, websocket gateways, channel-layer middleware — where the
iterator pattern's requirement to be inside an awaited async sub is
awkward or triggers Future::AsyncAwait "lost its returning future"
warnings.
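
A callback usually branches on the message type. The following is a
minimal sketch of such a handler, using only the hashref fields listed
above. C<make_handler> and the per-type routines are hypothetical names,
and the sample messages are built by hand rather than received from
Redis.

    use strict;
    use warnings;

    # Build an on_message-style callback that routes by $msg->{type}.
    # make_handler() is an illustrative helper, not part of Async::Redis.
    sub make_handler {
        my (%route) = @_;
        return sub {
            my ($sub, $msg) = @_;
            my $code = $route{ $msg->{type} } or return;  # skip unrouted types
            return $code->($msg);
        };
    }

    my @seen;
    my $cb = make_handler(
        message  => sub { my ($m) = @_; push @seen, "chan $m->{channel}: $m->{data}" },
        pmessage => sub { my ($m) = @_; push @seen, "pattern $m->{pattern}: $m->{data}" },
    );

    # Hand-built messages in the documented shape; undef stands in for $sub.
    $cb->(undef, { type => 'message',  channel => 'chat',    pattern => undef,    data => 'hi' });
    $cb->(undef, { type => 'pmessage', channel => 'news.uk', pattern => 'news.*', data => 'x' });
    print "$_\n" for @seen;

In real use the same code reference would be registered with
C<< $sub->on_message($cb) >>.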

=head2 Exclusivity

Once C<on_message> is set on a Subscription, it stays in callback mode
for the rest of its lifetime, and calls to C<< $sub->next >> will
C<croak>. This is sticky: there is no way to switch back. If you need
iterator mode, construct a new Subscription.

=head2 Signature

    $sub->on_message(sub {
        my ($subscription, $message) = @_;
        ...
    });

The callback receives the C<$subscription> itself as its first argument
(consistent with C<on_reconnect>), and the message hashref as its
second. The return value is normally ignored; if the return is a
C<Future>, see L</Backpressure>.

=head2 Backpressure

If your callback returns a C<Future>, the driver waits for that Future
to resolve before reading the next frame:

    $sub->on_message(async sub {
        my ($sub, $msg) = @_;
        await store_to_database($msg);    # driver waits before next read
    });

Synchronous callbacks (or callbacks returning non-Future values) add no
extra wait: the driver reads the next frame as soon as the callback
returns. This gives consumers opt-in backpressure with no default
overhead.

If the returned Future fails, the failure is routed to C<on_error>.
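
To illustrate the rule, here is a rough model of a driver loop that
pauses on a pending Future. It is a sketch, not the module's actual
driver: C<drive> is a hypothetical function, frames come from an
in-memory array instead of a socket, and failure routing to C<on_error>
is left out. It assumes the CPAN C<Future> module is installed.

    use strict;
    use warnings;
    use Future;
    use Scalar::Util qw(blessed);

    # Hypothetical driver: deliver frames one at a time, pausing whenever
    # the callback returns a Future that is not yet complete.
    sub drive {
        my ($frames, $cb) = @_;
        while (@$frames) {
            my $ret = $cb->(shift @$frames);
            next unless blessed($ret) && $ret->isa('Future');
            next if $ret->is_ready;
            # Resume the loop only once the callback's Future completes.
            $ret->on_ready(sub { drive($frames, $cb) });
            return;
        }
    }

    my (@seen, @pending);
    my @frames = map { +{ type => 'message', channel => 'chat', data => $_ } } 1 .. 3;

    drive(\@frames, sub {
        my ($msg) = @_;
        push @seen, $msg->{data};
        my $f = Future->new;           # stands in for slow async work
        push @pending, $f;
        return $f;
    });

    print "delivered so far: @seen\n"; # only the first frame
    (shift @pending)->done;            # the "work" finishes ...
    print "delivered so far: @seen\n"; # ... so the second frame is delivered

A callback that returns a plain value never reaches the C<on_ready>
branch, so the loop simply continues with the next frame.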

=head2 Fatal error handling

    $sub->on_error(sub {
        my ($sub, $err) = @_;
        ...
    });

C<on_error> fires when the underlying read encounters an error that
cannot be recovered by reconnect (e.g., reconnect is disabled, or
reconnect itself failed). After C<on_error> fires, the subscription is
closed and the driver stops.

B<If C<on_error> is not registered, fatal errors C<die>.> Silent death
of a pub/sub consumer is a debugging nightmare; loud-by-default
prevents it. If you genuinely want to swallow errors, register an
explicit no-op: C<< $sub->on_error(sub { }) >>.

Callback exceptions (dying inside C<on_message>) are also routed to
C<on_error>; the callback-died message is prepended to the error
string.
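
The routing rules above can be modelled in a few lines. This is only a
sketch: C<dispatch> is a hypothetical helper, and the exact wording of
the prepended text (C<on_message callback died:> here) is an
assumption, not the module's real message.

    use strict;
    use warnings;

    # Hypothetical model: run the message callback, and route any exception
    # to on_error if one is registered, or die loudly if not.
    sub dispatch {
        my ($on_message, $on_error, $msg) = @_;
        return 1 if eval { $on_message->(undef, $msg); 1 };
        my $err = "on_message callback died: $@";   # assumed prefix wording
        die $err unless $on_error;                  # loud by default
        $on_error->(undef, $err);
        return 0;
    }

    my @errors;
    my $bad = sub { die "boom\n" };

    dispatch($bad, sub { push @errors, $_[1] }, { data => 'x' });
    print "routed: $errors[0]";

    # Without an on_error handler the same failure propagates to the caller.
    my $survived = eval { dispatch($bad, undef, { data => 'x' }); 1 };
    print "died as expected\n" unless $survived;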

=head2 Ordering guarantee

Callbacks fire in the order frames arrive on the connection. They are
never invoked concurrently (Perl is single-threaded and the driver runs
on the event loop). After a reconnect, C<on_reconnect> always fires
before any post-reconnect C<on_message>.

=head2 Re-entrancy

Inside an C<on_message> callback you may safely:

=over 4

=item * Call C<< $sub->subscribe(...) >> — the new channel is added
cleanly; messages on it arrive via the same callback.

=item * Call C<< $sub->on_message($new_cb) >> — the current message is
dispatched to the previously-installed handler; the next frame uses
the new handler.

=item * C<die> — routed to C<on_error>.

=back
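
The handler-swap rule is handy for a one-shot handshake: the first
message is consumed by one handler, which then installs the
steady-state handler for everything that follows. The sketch below
models that with a stand-in object. C<FakeSub> and its C<deliver>
method are hypothetical and only imitate the documented behaviour.

    use strict;
    use warnings;

    # Stand-in for a Subscription: stores one handler and dispatches to it.
    package FakeSub {
        sub new        { bless { cb => undef }, shift }
        sub on_message { $_[0]{cb} = $_[1] }
        sub deliver {
            my ($self, $msg) = @_;
            my $cb = $self->{cb};      # handler is captured before the call,
            $cb->($self, $msg);        # so a swap only affects later frames
        }
    }

    my @log;
    my $steady = sub { my ($sub, $msg) = @_; push @log, "steady: $msg->{data}" };

    my $sub = FakeSub->new;
    $sub->on_message(sub {
        my ($sub, $msg) = @_;
        push @log, "handshake: $msg->{data}";
        $sub->on_message($steady);     # takes effect from the next frame
    });

    $sub->deliver({ type => 'message', channel => 'chat', data => $_ })
        for qw(hello a b);
    print "$_\n" for @log;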

=head2 Backpressure and Redis server limits

Synchronous callbacks provide backpressure by blocking the driver loop:
while your callback runs, the driver doesn't read the next frame, so
the TCP buffers fill and Redis's output buffer for your connection
grows. But Redis enforces C<client-output-buffer-limit pubsub>
(defaulting to S<32mb 8mb 60> in recent versions): if your subscriber
cannot keep up for sustained periods, B<Redis will disconnect you>. No
amount of client-side buffering changes this: the limit is on the
server.

If your processing is genuinely slow, return a Future from your
callback (enabling the opt-in backpressure described in
L</Backpressure>) and consider moving the expensive work to a worker
pool so the callback can return quickly.
Long synchronous processing in pub/sub callbacks is an anti-pattern at
scale regardless of client.
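
As a rough worked example of how quickly the limit is hit, the sketch
below estimates the time until Redis drops a slow subscriber, given the
rate at which its output buffer grows. It assumes the default
S<32mb 8mb 60> limits (hard limit, soft limit, soft seconds) and a
constant growth rate, and it ignores kernel socket buffers, which
absorb some data before Redis's own buffer starts to grow.
C<seconds_until_disconnect> is a hypothetical helper.

    use strict;
    use warnings;

    # Redis config units: 1mb = 1024 * 1024 bytes.
    my $MB = 1024 * 1024;

    # Estimate seconds until disconnect for a backlog growing at a constant
    # $bytes_per_sec. Returns undef if the subscriber keeps up.
    sub seconds_until_disconnect {
        my ($bytes_per_sec, %limit) = @_;
        my $hard = $limit{hard}         // 32 * $MB;
        my $soft = $limit{soft}         // 8 * $MB;
        my $secs = $limit{soft_seconds} // 60;
        return undef if $bytes_per_sec <= 0;

        my $hit_hard = $hard / $bytes_per_sec;          # hard limit: immediate
        my $hit_soft = $soft / $bytes_per_sec + $secs;  # soft limit: sustained
        return $hit_hard < $hit_soft ? $hit_hard : $hit_soft;
    }

    printf "backlog +1 MiB/s:   ~%d s\n", seconds_until_disconnect(1 * $MB);
    printf "backlog +256 KiB/s: ~%d s\n", seconds_until_disconnect(256 * 1024);

At high growth rates the hard limit is reached first; at lower rates
the soft limit plus its 60-second window is what ends the connection.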

=head1 INTERNAL LIFECYCLE METHODS

The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.

=head2 _close

Intentional teardown. Marks the subscription closed and wakes any
blocked C<next()> with C<undef>. Clears the parent C<_subscription>
slot on the L<Async::Redis> object with an identity guard — a stale
C<_close> call from an earlier subscription object cannot evict a newer
one that has since taken the slot.
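
The identity guard amounts to comparing references before clearing the
slot. A minimal sketch follows, with plain hashes standing in for the
real objects. C<close_subscription> and the C<parent> and C<closed>
keys are hypothetical, while C<_subscription> is the slot named above.

    use strict;
    use warnings;
    use Scalar::Util qw(refaddr);

    # Hypothetical teardown: mark closed, then clear the parent's slot only
    # if it still points at this very subscription.
    sub close_subscription {
        my ($sub) = @_;
        $sub->{closed} = 1;
        my $parent  = $sub->{parent};
        my $current = $parent->{_subscription};
        delete $parent->{_subscription}
            if $current && refaddr($current) == refaddr($sub);
    }

    my $redis = {};
    my $old   = { parent => $redis };
    my $new   = { parent => $redis };

    $redis->{_subscription} = $new;    # a newer subscription owns the slot
    close_subscription($old);          # stale close: slot is left alone
    print "slot kept\n" if $redis->{_subscription};

    close_subscription($new);          # the owner closes: slot is cleared
    print "slot cleared\n" unless exists $redis->{_subscription};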

=head2 _fail_fatal($typed_error)

Unrecoverable failure. Marks the subscription closed with a typed error
object. Any blocked C<next()> call will C<die> with that error. The
error is preserved for callers who call C<next()> after the fact.
Routes through C<_close>'s identity guard for parent-slot clearing.

=head2 _pause_for_reconnect

Called before a reconnect attempt. Does B<not> mark the subscription
closed — the underlying reader has already exited due to the connection
drop. Channel/pattern tracking hashes are left intact for replay.

=head2 _resume_after_reconnect

Async. Replays all tracked C<SUBSCRIBE>, C<PSUBSCRIBE>, and
C<SSUBSCRIBE> commands on the freshly reconnected socket. Sets
C<in_pubsub=1> before sending replay commands so that racing message
frames classify correctly (mirrors the timing of the initial
subscribe). Fires C<on_reconnect> after replay. Callback-mode
subscriptions restart their callback driver; iterator-mode
subscriptions continue to receive frames through the parent Redis
connection's reader.

=cut


