Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis/Subscription.pm view on Meta::CPAN
package Async::Redis::Subscription;
use strict;
use warnings;
use 5.018;
use Carp ();
use Future;
use Future::AsyncAwait;
use Future::IO;
use Scalar::Util qw(blessed refaddr weaken);
# Threshold for periodic event-loop yield inside the callback driver
# loop. Prevents stack growth when many messages are pre-queued and
# await on an already-ready Future returns synchronously.
use constant MAX_SYNC_DEPTH => 32;
sub new {
    my ($class, %args) = @_;

    # Constructor. The only argument read is `redis`; every other slot is
    # initialised to its empty/idle value.
    my %state = (
        # Client handle, stored exactly as supplied by the caller.
        redis => $args{redis},

        # Subscription bookkeeping, one hash per subscribe flavour.
        channels         => {},    # channel => 1 (for regular subscribe)
        patterns         => {},    # pattern => 1 (for psubscribe)
        sharded_channels => {},    # channel => 1 (for ssubscribe)

        # Message queue plus the Futures used to signal its consumers
        # and producers.
        _pending_messages => [],      # Queued messages for iterator consumers
        _message_waiter   => undef,   # Future signalled when a message arrives
        _slot_waiter      => undef,   # Future signalled when queue drains below depth

        # Error state and user-supplied callbacks.
        _fatal_error  => undef,   # Typed error set by _fail_fatal
        _on_reconnect => undef,   # Callback for reconnect notification
        _on_message   => undef,   # Message-arrived callback (callback mode)
        _on_error     => undef,   # Fatal-error callback

        # Driver loop and lifecycle flags.
        _driver_step => undef,    # Running driver loop Future (owned by _tasks selector)
        _closed      => 0,
        _paused      => 0,        # Set during reconnect; clears in _resume_after_reconnect
    );

    return bless \%state, $class;
}
# Set/get reconnect callback
sub on_reconnect {
    my $self = shift;

    # Combined accessor: any argument after the invocant (even undef)
    # replaces the stored reconnect callback; with no argument the stored
    # value is left alone. Either way the current value is returned.
    $self->{_on_reconnect} = shift if @_;

    return $self->{_on_reconnect};
}
# Set/get message-arrived callback. Once set, next() croaks — the
# subscription is in callback mode for the rest of its lifetime.
# $cb->($sub, $msg) receives the Subscription and the message hashref.
sub on_message {
    my ($self, $cb) = @_;

    # Getter form: nothing beyond the invocant was passed.
    return $self->{_on_message} unless @_ > 1;

    # Setter form. A false $cb is only rejected when a callback is already
    # installed; clearing an unset callback is a harmless no-op.
    Carp::croak(
        "on_message is sticky; cannot clear once set "
        . "(construct a new Subscription for iterator mode)"
    ) if !$cb && $self->{_on_message};

    $self->{_on_message} = $cb;

    # Kick the driver whenever a callback is installed; _start_driver is
    # responsible for deciding whether there is anything to drive yet.
    $self->_start_driver if $cb;

    return $self->{_on_message};
}
# Set/get fatal-error callback. Fires once per fatal error; default
# (when unset) is to die so silent death is impossible.
# $cb->($sub, $err) receives the Subscription and the error.
sub on_error {
my ($self, $cb) = @_;
lib/Async/Redis/Subscription.pm view on Meta::CPAN
# Convert a raw RESP pub/sub frame into a message hashref and queue it.
# In both callback and iterator mode the message is pushed onto
# _pending_messages and _message_waiter is signalled so a blocked consumer
# can wake up; the callback itself is invoked by the driver loop, not here.
# When the queue is already at message_queue_depth, returns a Future that
# completes once the message has been queued (backpressure); otherwise
# returns undef.
#
# Non-message frames (subscribe confirmations, etc.) return undef and
# take no action — the driver loop will read the next frame.
sub _dispatch_frame {
    my ($self, $frame) = @_;

    # Turn one pub/sub frame (array ref) into a message hashref and queue
    # it on _pending_messages.
    #
    # Returns:
    #   - empty list / undef for a non-array frame or a closed subscription
    #   - undef for a non-message frame, or once the message is queued
    #   - a Future when the queue is at message_queue_depth; the message is
    #     queued when _slot_waiter completes

    # Anything that is not an array reference is ignored outright.
    return unless $frame && ref $frame eq 'ARRAY';

    # Element 0 names the frame kind; the remaining elements are positional
    # and laid out differently for pattern messages.
    my $kind = $frame->[0] // '';

    my $msg;
    if ($kind eq 'pmessage') {
        # [ 'pmessage', pattern, channel, data ]
        $msg = {
            type    => 'pmessage',
            pattern => $frame->[1],
            channel => $frame->[2],
            data    => $frame->[3],
        };
    }
    elsif ($kind eq 'message' || $kind eq 'smessage') {
        # [ kind, channel, data ] -- no pattern for plain or sharded messages.
        $msg = {
            type    => ($kind eq 'smessage' ? 'smessage' : 'message'),
            channel => $frame->[1],
            pattern => undef,
            data    => $frame->[2],
        };
    }
    else {
        return undef;    # non-message frame (subscribe confirmation, etc.)
    }

    # The message is queued regardless of consumption mode, so the depth
    # limit below applies uniformly. Nothing is queued once closed.
    return if $self->{_closed};

    my $redis = $self->{redis};
    my $limit = ($redis && $redis->{message_queue_depth}) || 0;    # 0 = unbounded

    # Delivery step shared by both paths. It takes the subscription as an
    # argument rather than closing over $self, so the deferred path below
    # holds only a weak reference to the subscription.
    my $enqueue = sub {
        my ($sub) = @_;
        push @{ $sub->{_pending_messages} }, $msg;
        my $waiter = delete $sub->{_message_waiter};
        $waiter->done if $waiter && !$waiter->is_ready;
        return;
    };

    my $queue_full = $limit && scalar(@{ $self->{_pending_messages} }) >= $limit;
    if (!$queue_full) {
        $enqueue->($self);
        return undef;
    }

    # Queue full: defer delivery until _slot_waiter completes (it is
    # resolved outside this sub -- presumably by the consumer once a slot
    # frees up). An existing waiter is reused.
    $self->{_slot_waiter} //= Future->new;
    my $slot = $self->{_slot_waiter};
    weaken(my $weak_self = $self);

    return $slot->then(sub {
        # The subscription may have been destroyed or closed while waiting;
        # in that case the message is dropped.
        if ($weak_self && !$weak_self->{_closed}) {
            $enqueue->($weak_self);
        }
        return Future->done;
    });
}
# The callback-mode driver loop. Consumes from _pending_messages via
# _dequeue (populated by _run_reader's dispatch path), invokes the
# user's _on_message callback, and awaits its returned Future if any
# for consumer-opted backpressure.
#
# Exits cleanly when _dequeue returns undef (subscription closed or
# paused for reconnect). Dies with the typed error if _dequeue dies
# (fatal); _run_driver's Future failure is visible through the
# client's Future::Selector to any caller using run_until_ready.
#
# Periodic sleep(0) yield every MAX_SYNC_DEPTH iterations prevents
# stack growth when messages are pre-queued and await returns
# synchronously from an already-ready Future.
async sub _run_driver {
my ($self) = @_;
my $iter = 0;
while (!$self->{_closed} && !$self->{_paused}) {
my $msg;
my $deq_ok = eval { $msg = await $self->_dequeue(1); 1 };
unless ($deq_ok) {
my $err = $@;
# _fail_fatal already set _closed and fired on_error; don't
# double-fire. Any other propagation path routes through
# _handle_fatal_error.
return if $self->{_closed} || $self->{_paused};
$self->_handle_fatal_error($err);
return;
}
last unless defined $msg;
last if $self->{_closed} || $self->{_paused};
my $cb = $self->{_on_message} or last;
my $result = $self->_invoke_user_callback($cb, $msg);
if (blessed($result) && $result->isa('Future')) {
my $cb_ok = eval { await $result; 1 };
unless ($cb_ok) {
my $err = $@;
return if $self->{_closed} || $self->{_paused};
$self->_handle_fatal_error(
"on_message callback Future failed: $err"
);
return;
( run in 0.711 second using v1.01-cache-2.11-cpan-39bf76dae61 )