Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis/Subscription.pm view on Meta::CPAN
channel => $frame->[2],
data => $frame->[3],
};
}
elsif ($type eq 'smessage') {
$msg = {
type => 'smessage',
channel => $frame->[1],
pattern => undef,
data => $frame->[2],
};
}
else {
return undef; # non-message frame (subscribe confirmation, etc.)
}
# Queue the message for consumption by next() (iterator mode) or the
# callback driver loop (callback mode). The driver invokes _on_message;
# _dispatch_frame is intentionally agnostic about the consumption mode.
# This keeps backpressure uniform: the depth limit applies to both modes.
return if $self->{_closed};
my $redis = $self->{redis};
my $depth = ($redis && $redis->{message_queue_depth})
? $redis->{message_queue_depth}
: 0; # 0 = unbounded (default)
if ($depth && scalar(@{$self->{_pending_messages}}) >= $depth) {
# Queue full. Return a Future that queues the message once a slot
# opens (signalled by next() calling _slot_waiter->done).
$self->{_slot_waiter} //= Future->new;
my $slot = $self->{_slot_waiter};
weaken(my $weak = $self);
return $slot->then(sub {
return Future->done if !$weak || $weak->{_closed};
push @{$weak->{_pending_messages}}, $msg;
if (my $w = delete $weak->{_message_waiter}) {
$w->done unless $w->is_ready;
}
Future->done;
});
}
push @{$self->{_pending_messages}}, $msg;
if (my $w = delete $self->{_message_waiter}) {
$w->done unless $w->is_ready;
}
return undef;
}
# The callback-mode driver loop. Consumes from _pending_messages via
# _dequeue (populated by _run_reader's dispatch path), invokes the
# user's _on_message callback, and awaits its returned Future if any
# for consumer-opted backpressure.
#
# Exits cleanly when _dequeue returns undef (subscription closed or
# paused for reconnect). Dies with the typed error if _dequeue dies
# (fatal); _run_driver's Future failure is visible through the
# client's Future::Selector to any caller using run_until_ready.
#
# Periodic sleep(0) yield every MAX_SYNC_DEPTH iterations prevents
# stack growth when messages are pre-queued and await returns
# synchronously from an already-ready Future.
async sub _run_driver {
my ($self) = @_;
my $iter = 0;
while (!$self->{_closed} && !$self->{_paused}) {
my $msg;
my $deq_ok = eval { $msg = await $self->_dequeue(1); 1 };
unless ($deq_ok) {
my $err = $@;
# _fail_fatal already set _closed and fired on_error; don't
# double-fire. Any other propagation path routes through
# _handle_fatal_error.
return if $self->{_closed} || $self->{_paused};
$self->_handle_fatal_error($err);
return;
}
last unless defined $msg;
last if $self->{_closed} || $self->{_paused};
my $cb = $self->{_on_message} or last;
my $result = $self->_invoke_user_callback($cb, $msg);
if (blessed($result) && $result->isa('Future')) {
my $cb_ok = eval { await $result; 1 };
unless ($cb_ok) {
my $err = $@;
return if $self->{_closed} || $self->{_paused};
$self->_handle_fatal_error(
"on_message callback Future failed: $err"
);
return;
}
}
# Periodic yield prevents stack blowup when pre-queued messages
# resolve await synchronously.
await Future::IO->sleep(0) if ++$iter % MAX_SYNC_DEPTH == 0;
}
}
# Start the driver if not already running. Idempotent.
# Only starts when _on_message is set (callback mode). Iterator mode
# consumers call next() directly â no driver loop needed.
#
# Ownership: the driver Future is added to the client's Future::Selector
# ($redis->{_tasks}) and stored in $self->{_driver_step}. The selector
# owns the task; the slot is the dedup signal. on_ready clears the slot
# regardless of outcome. No ->retain.
sub _start_driver {
my ($self, $force) = @_;
return if $self->{_driver_step} && !$self->{_driver_step}->is_ready;
return unless $self->{_on_message}; # only callback mode needs a driver
return if $self->{_closed};
return if $self->{_paused};
return unless $self->channel_count > 0;
my $redis = $self->{redis} or return;
my $f = $self->_run_driver;
( run in 1.583 second using v1.01-cache-2.11-cpan-71847e10f99 )