Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis/Subscription.pm view on Meta::CPAN
$sub->on_reconnect(sub {
my ($sub) = @_;
warn "Reconnected, may have lost messages";
# re-poll state, log, etc.
});
Messages published while the connection was down are lost (Redis pub/sub
has no persistence).
=head1 CALLBACK-DRIVEN DELIVERY
As an alternative to the C<await $sub-E<gt>next> iterator, you can
register a callback to receive messages:
my $sub = await $redis->subscribe('chat');
$sub->on_message(sub {
my ($sub, $msg) = @_;
# $msg has the same shape as next() returns:
# { type => 'message'|'pmessage'|'smessage',
# channel => ...,
# pattern => ..., # defined for pmessage, undef otherwise
# data => ... }
});
Callback mode is designed for fire-and-forget listeners â background
dispatchers, websocket gateways, channel-layer middleware â where the
iterator pattern's requirement to be inside an awaited async sub is
awkward or triggers Future::AsyncAwait "lost its returning future"
warnings.
=head2 Exclusivity
Once C<on_message> is set on a Subscription, it is callback-mode for
the rest of its lifetime. Calls to C<< $sub->next >> will C<croak>.
This is sticky â there is no way to switch back. If you need iterator
mode, construct a new Subscription.
=head2 Signature
$sub->on_message(sub {
my ($subscription, $message) = @_;
...
});
The callback receives the C<$subscription> itself as its first argument
(consistent with C<on_reconnect>), and the message hashref as its
second. The return value is normally ignored; if the return is a
C<Future>, see L</Backpressure>.
=head2 Backpressure
If your callback returns a C<Future>, the driver waits for that Future
to resolve before reading the next frame:
$sub->on_message(async sub {
my ($sub, $msg) = @_;
await store_to_database($msg); # driver waits before next read
});
Synchronous callbacks (or callbacks returning non-Future values) do not
block the driver. This gives consumers opt-in backpressure with no
default overhead.
If the returned Future fails, the failure is routed to C<on_error>.
=head2 Fatal error handling
$sub->on_error(sub {
my ($sub, $err) = @_;
...
});
C<on_error> fires when the underlying read encounters an error that
cannot be recovered by reconnect (e.g., reconnect is disabled, or
reconnect itself failed). After C<on_error> fires, the subscription is
closed and the driver stops.
B<If C<on_error> is not registered, fatal errors C<die>.> Silent death
of a pub/sub consumer is a debugging nightmare; loud-by-default
prevents it. If you genuinely want to swallow errors, register an
explicit no-op: C<< $sub->on_error(sub { }) >>.
Callback exceptions (dying inside C<on_message>) are also routed to
C<on_error>; the callback-died message is prepended to the error
string.
=head2 Ordering guarantee
Callbacks fire in the order frames arrive on the connection. No
concurrent invocation (Perl is single-threaded and the driver runs on
the event loop). After a reconnect, C<on_reconnect> always fires before
any post-reconnect C<on_message>.
=head2 Re-entrancy
Inside an C<on_message> callback you may safely:
=over 4
=item * Call C<< $sub->subscribe(...) >> â the new channel is added
cleanly; messages on it arrive via the same callback.
=item * Call C<< $sub->on_message($new_cb) >> â the current message is
dispatched to the previously-installed handler; the next frame uses
the new handler.
=item * C<die> â routed to C<on_error>.
=back
=head2 Backpressure and Redis server limits
Synchronous callbacks provide backpressure by blocking the driver loop:
while your callback runs, the driver doesn't read the next frame, so
TCP fills, Redis's output buffer grows. But Redis enforces
C<client-output-buffer-limit pubsub> (defaulting to S<32mb 8mb 60>
in recent versions) â if your subscriber cannot keep up for sustained
periods, B<Redis will disconnect you>. There is no amount of
client-side buffering that changes this: the limit is on the server.
If your processing is genuinely slow, return a Future from your
callback (enabling opt-in backpressure above) AND consider moving the
expensive work to a worker pool so the callback can return quickly.
Long synchronous processing in pub/sub callbacks is an anti-pattern at
scale regardless of client.
=head1 INTERNAL LIFECYCLE METHODS
The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.
=head2 _close
Intentional teardown. Marks the subscription closed and wakes any
blocked C<next()> with C<undef>. Clears the parent C<_subscription>
slot on the L<Async::Redis> object with an identity guard â a stale
C<_close> call from an earlier subscription object cannot evict a newer
one that has since taken the slot.
=head2 _fail_fatal($typed_error)
Unrecoverable failure. Marks the subscription closed with a typed error
object. Any blocked C<next()> call will C<die> with that error. The
error is preserved for callers who call C<next()> after the fact.
Routes through C<_close>'s identity guard for parent-slot clearing.
=head2 _pause_for_reconnect
Called before a reconnect attempt. Does B<not> mark the subscription
closed â the underlying reader has already exited due to the connection
drop. Channel/pattern tracking hashes are left intact for replay.
=head2 _resume_after_reconnect
Async. Replays all tracked C<SUBSCRIBE>, C<PSUBSCRIBE>, and
C<SSUBSCRIBE> commands on the freshly reconnected socket. Sets
C<in_pubsub=1> before sending replay commands so that racing message
frames classify correctly (mirrors the timing of the initial
subscribe). Fires C<on_reconnect> after replay. Callback-mode subscriptions
restart their callback driver; iterator-mode subscriptions continue to receive
frames through the parent Redis connection's reader.
=cut
( run in 0.600 second using v1.01-cache-2.11-cpan-140bd7fdf52 )