Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis/Subscription.pm view on Meta::CPAN
# Exclusivity check: callback mode disables iterator mode.
if ($self->{_on_message}) {
Carp::croak("Cannot call next() on a callback-driven subscription");
}
# In iterator mode the unified reader (_run_reader) feeds the queue.
# The reader is already running (started by _pubsub_command during
# subscribe). No separate driver start is needed.
return await $self->_dequeue;
}
# Pull the next raw pub/sub frame off the connection, recovering from a
# failed read when the client permits it.
#
# A failed read leads to a reconnect attempt only when the client's
# `reconnect` option is true and channel_count is positive. After a
# successful reconnect the on_reconnect handler (if one is registered) is
# called with the subscription, and the read is retried. If reconnecting
# is not permitted, the original read error is rethrown; if the reconnect
# itself fails, the reconnect error is rethrown instead.
#
# Returns a Future resolving to whatever _read_pubsub_frame produced
# (presumably an arrayref frame, or undef once the connection is gone --
# confirm against the client implementation).
async sub _read_frame_with_reconnect {
    my ($self) = @_;
    my $client = $self->{redis};

    while (1) {
        my $frame;
        my $got_frame = eval {
            $frame = await $client->_read_pubsub_frame;
            1;
        };
        return $frame if $got_frame;

        # Snapshot $@ right away; later calls may overwrite it.
        my $read_error = $@;

        my $may_reconnect = $client->{reconnect} && $self->channel_count > 0;
        die $read_error unless $may_reconnect;

        my $reconnected = eval {
            await $client->_reconnect_pubsub;
            1;
        };
        unless ($reconnected) {
            my $reconnect_error = $@;
            die $reconnect_error;
        }

        my $notify = $self->{_on_reconnect};
        $notify->($self) if $notify;

        # Fall through to the top of the loop and retry the read.
    }
}
# Convert a raw RESP pub/sub frame into a message hashref and deliver it.
# In callback mode, invokes _on_message via _invoke_user_callback and
# returns its result (which may be a Future for consumer-side backpressure).
# In iterator mode, queues the message into _pending_messages and signals
# _message_waiter so a blocked next() can wake up.
#
# Non-message frames (subscribe confirmations, etc.) return undef and
# take no action — the driver loop will read the next frame.
sub _dispatch_frame {
my ($self, $frame) = @_;
return unless $frame && ref $frame eq 'ARRAY';
my $type = $frame->[0] // '';
my $msg;
if ($type eq 'message') {
$msg = {
type => 'message',
channel => $frame->[1],
pattern => undef,
data => $frame->[2],
};
}
elsif ($type eq 'pmessage') {
$msg = {
type => 'pmessage',
pattern => $frame->[1],
channel => $frame->[2],
data => $frame->[3],
};
}
elsif ($type eq 'smessage') {
$msg = {
type => 'smessage',
channel => $frame->[1],
pattern => undef,
data => $frame->[2],
};
}
else {
return undef; # non-message frame (subscribe confirmation, etc.)
}
# Queue the message for consumption by next() (iterator mode) or the
# callback driver loop (callback mode). The driver invokes _on_message;
# _dispatch_frame is intentionally agnostic about the consumption mode.
# This keeps backpressure uniform: the depth limit applies to both modes.
return if $self->{_closed};
my $redis = $self->{redis};
my $depth = ($redis && $redis->{message_queue_depth})
? $redis->{message_queue_depth}
: 0; # 0 = unbounded (default)
if ($depth && scalar(@{$self->{_pending_messages}}) >= $depth) {
# Queue full. Return a Future that queues the message once a slot
# opens (signalled by next() calling _slot_waiter->done).
$self->{_slot_waiter} //= Future->new;
my $slot = $self->{_slot_waiter};
weaken(my $weak = $self);
return $slot->then(sub {
return Future->done if !$weak || $weak->{_closed};
push @{$weak->{_pending_messages}}, $msg;
if (my $w = delete $weak->{_message_waiter}) {
$w->done unless $w->is_ready;
lib/Async/Redis/Subscription.pm view on Meta::CPAN
if (blessed($result) && $result->isa('Future')) {
my $cb_ok = eval { await $result; 1 };
unless ($cb_ok) {
my $err = $@;
return if $self->{_closed} || $self->{_paused};
$self->_handle_fatal_error(
"on_message callback Future failed: $err"
);
return;
}
}
# Periodic yield prevents stack blowup when pre-queued messages
# resolve await synchronously.
await Future::IO->sleep(0) if ++$iter % MAX_SYNC_DEPTH == 0;
}
}
# Launch the callback driver unless one is already in flight. Safe to
# call repeatedly.
#
# Nothing is started when: a previous driver Future is still pending, no
# on_message handler is installed (iterator-mode consumers call next()
# themselves), the subscription is closed or paused, channel_count is not
# positive, or there is no parent client.
#
# Ownership: the Future returned by _run_driver is handed to the client's
# Future::Selector ($redis->{_tasks}) and also recorded in
# $self->{_driver_step}, which serves purely as the "already running"
# marker. The on_ready hook resets that marker whatever the outcome.
# No ->retain is used.
#
# A second positional argument may be passed by callers; it is not
# consulted here. Always returns nothing.
sub _start_driver {
    my ($self) = @_;

    my $in_flight = $self->{_driver_step};
    return if $in_flight && !$in_flight->is_ready;

    # Checked in this order so channel_count is only called for an open,
    # unpaused, callback-mode subscription.
    return if !$self->{_on_message} || $self->{_closed} || $self->{_paused};
    return if !($self->channel_count > 0);

    my $client = $self->{redis};
    return unless $client;

    my $driver = $self->_run_driver;
    $self->{_driver_step} = $driver;
    $client->{_tasks}->add(data => 'subscription-driver', f => $driver);
    $driver->on_ready(sub { $self->{_driver_step} = undef });

    return;
}
# Legacy-format wrapper around next().
#
# Awaits next() and, when it yields a true value, re-keys that message
# into the older layout: the payload moves from `data` to `message`, while
# `channel`, `pattern` and `type` are copied across unchanged. A false
# result from next() is reported as undef.
async sub next_message {
    my ($self) = @_;

    my $received = await $self->next();

    if ($received) {
        my %legacy;
        @legacy{qw(channel message pattern type)}
            = @{$received}{qw(channel data pattern type)};
        return \%legacy;
    }

    return undef;
}
# Deliberate shutdown of the subscription. A no-op when already closed.
#
# Steps, in order:
#   1. set _closed and drop any queued messages;
#   2. release a pending _message_waiter, then a pending _slot_waiter, by
#      completing each one that is not already ready;
#   3. remove this object from the parent's _subscription slot, but only
#      if the slot still holds this very object, so a stale _close cannot
#      evict a newer subscription that took over the slot;
#   4. cancel a still-pending driver Future. Step 2 already released the
#      waiter, so this is a second line of defence.
sub _close {
    my ($self) = @_;
    return if $self->{_closed};

    $self->{_closed}           = 1;
    $self->{_pending_messages} = [];

    for my $slot_name (qw(_message_waiter _slot_waiter)) {
        my $waiter = delete $self->{$slot_name} or next;
        $waiter->done unless $waiter->is_ready;
    }

    # Identity check: compare addresses, not just "is something there".
    my $parent = $self->{redis};
    if ($parent) {
        my $occupant = $parent->{_subscription};
        delete $parent->{_subscription}
            if defined $occupant && refaddr($occupant) == refaddr($self);
    }

    if (my $driver = delete $self->{_driver_step}) {
        $driver->cancel unless $driver->is_ready;
    }
}
# Terminal failure path. A no-op when the subscription is already closed.
#
# Marks the subscription closed, stores $typed_error in _fatal_error and
# discards queued messages. A pending _message_waiter is failed with
# $typed_error; a pending _slot_waiter is merely completed. The parent's
# _subscription slot is cleared only if it still holds this object, and a
# still-pending driver Future is cancelled.
#
# Error reporting afterwards:
#   * on_error registered  -> it is called with ($self, $typed_error); an
#     exception thrown by the handler is downgraded to a carp warning.
#   * no on_error, but an on_message handler (callback mode) -> dies with
#     $typed_error, so a listener cannot fail silently.
#   * neither (iterator mode) -> returns quietly; per the stored
#     _fatal_error, callers are expected to find out through next().
sub _fail_fatal {
    my ($self, $typed_error) = @_;
    return if $self->{_closed};

    $self->{_closed}           = 1;
    $self->{_fatal_error}      = $typed_error;
    $self->{_pending_messages} = [];

    my $reader = delete $self->{_message_waiter};
    $reader->fail($typed_error) if $reader && !$reader->is_ready;

    my $producer = delete $self->{_slot_waiter};
    $producer->done if $producer && !$producer->is_ready;

    # Clear the parent slot only when it still refers to this object.
    my $parent = $self->{redis};
    if ($parent) {
        my $occupant = $parent->{_subscription};
        delete $parent->{_subscription}
            if defined $occupant && refaddr($occupant) == refaddr($self);
    }

    my $driver = delete $self->{_driver_step};
    $driver->cancel if $driver && !$driver->is_ready;

    if (my $on_error = $self->{_on_error}) {
        local $@;
        my $handled = eval { $on_error->($self, $typed_error); 1 };
        Carp::carp("on_error callback died: " . ($@ // 'unknown error'))
            unless $handled;
        return;
    }

    # No on_error handler: be loud in callback mode, quiet in iterator mode.
    die $typed_error if $self->{_on_message};
}
# Called before a reconnect attempt. Does NOT mark the subscription
# closed — the reader has already exited (connection dropped). Channels
# and patterns remain in their tracking hashes for replay via
# _resume_after_reconnect.
#
# Fixes a latent "two drivers after reconnect" bug from the closure-based
# driver era: clearing the driver slot without cancelling left the old
# driver suspended on _dequeue. After _resume_after_reconnect started a
# new driver, both raced. The fix: cancel the Future explicitly, and
# wake _dequeue via the _paused flag so its await exits cleanly.
lib/Async/Redis/Subscription.pm view on Meta::CPAN
closed and the driver stops.
B<If C<on_error> is not registered, fatal errors C<die>.> Silent death
of a pub/sub consumer is a debugging nightmare; loud-by-default
prevents it. If you genuinely want to swallow errors, register an
explicit no-op: C<< $sub->on_error(sub { }) >>.
Callback exceptions (dying inside C<on_message>) are also routed to
C<on_error>; the callback-died message is prepended to the error
string.
=head2 Ordering guarantee
Callbacks fire in the order frames arrive on the connection. They are
never invoked concurrently (Perl is single-threaded and the driver runs on
the event loop). After a reconnect, C<on_reconnect> always fires before
any post-reconnect C<on_message>.
=head2 Re-entrancy
Inside an C<on_message> callback you may safely:
=over 4
=item * Call C<< $sub->subscribe(...) >> — the new channel is added
cleanly; messages on it arrive via the same callback.
=item * Call C<< $sub->on_message($new_cb) >> — the current message is
dispatched to the previously-installed handler; the next frame uses
the new handler.
=item * C<die> — routed to C<on_error>.
=back
=head2 Backpressure and Redis server limits
Synchronous callbacks provide backpressure by blocking the driver loop:
while your callback runs, the driver doesn't read the next frame, so
the TCP receive buffer fills and Redis's output buffer grows. But Redis enforces
C<client-output-buffer-limit pubsub> (defaulting to S<32mb 8mb 60>
in recent versions) — if your subscriber cannot keep up for sustained
periods, B<Redis will disconnect you>. There is no amount of
client-side buffering that changes this: the limit is on the server.
If your processing is genuinely slow, return a Future from your
callback (enabling opt-in backpressure above) AND consider moving the
expensive work to a worker pool so the callback can return quickly.
Long synchronous processing in pub/sub callbacks is an anti-pattern at
scale regardless of client.
=head1 INTERNAL LIFECYCLE METHODS
The following methods are used by L<Async::Redis> to manage subscription
state. They are not part of the public API for end consumers, but are
documented here for maintainers.
=head2 _close
Intentional teardown. Marks the subscription closed and wakes any
blocked C<next()> with C<undef>. Clears the parent C<_subscription>
slot on the L<Async::Redis> object with an identity guard — a stale
C<_close> call from an earlier subscription object cannot evict a newer
one that has since taken the slot.
=head2 _fail_fatal($typed_error)
Unrecoverable failure. Marks the subscription closed with a typed error
object. Any blocked C<next()> call will C<die> with that error. The
error is preserved for callers who call C<next()> after the fact.
Applies the same identity guard as C<_close> when clearing the parent
slot. If an C<on_error> handler is registered it is called with the
error; otherwise, in callback mode, the method C<die>s with it.
=head2 _pause_for_reconnect
Called before a reconnect attempt. Does B<not> mark the subscription
closed — the underlying reader has already exited due to the connection
drop. Channel/pattern tracking hashes are left intact for replay.
=head2 _resume_after_reconnect
Async. Replays all tracked C<SUBSCRIBE>, C<PSUBSCRIBE>, and
C<SSUBSCRIBE> commands on the freshly reconnected socket. Sets
C<in_pubsub=1> before sending replay commands so that racing message
frames classify correctly (mirrors the timing of the initial
subscribe). Fires C<on_reconnect> after replay. Callback-mode subscriptions
restart their callback driver; iterator-mode subscriptions continue to receive
frames through the parent Redis connection's reader.
=cut
( run in 0.444 second using v1.01-cache-2.11-cpan-df04353d9ac )