Async-Redis
view release on metacpan or search on metacpan
lib/Async/Redis/Subscription.pm view on Meta::CPAN
# The callback-mode driver loop. Consumes from _pending_messages via
# _dequeue (populated by _run_reader's dispatch path), invokes the
# user's _on_message callback, and awaits its returned Future if any
# for consumer-opted backpressure.
#
# Exits cleanly when _dequeue returns undef (subscription closed or
# paused for reconnect). Dies with the typed error if _dequeue dies
# (fatal); _run_driver's Future failure is visible through the
# client's Future::Selector to any caller using run_until_ready.
#
# Periodic sleep(0) yield every MAX_SYNC_DEPTH iterations prevents
# stack growth when messages are pre-queued and await returns
# synchronously from an already-ready Future.
async sub _run_driver {
my ($self) = @_;
my $iter = 0;
while (!$self->{_closed} && !$self->{_paused}) {
my $msg;
my $deq_ok = eval { $msg = await $self->_dequeue(1); 1 };
unless ($deq_ok) {
my $err = $@;
t/93-socket-cleanup/close-with-watchers.t view on Meta::CPAN
# Collect every warning raised while the handler below is installed,
# instead of letting it go to STDERR. The handler is set with `local`,
# so it is removed again when the enclosing scope exits.
# NOTE(review): presumably @warnings is asserted on further down in this
# test -- confirm against the rest of the file.
my @warnings;
local $SIG{__WARN__} = sub { push @warnings, @_ };
# Rapidly connect and disconnect, sometimes with pending operations
for my $i (1..10) {
# A fresh client is built on every iteration; the host falls back to
# 'localhost' when REDIS_HOST is not set in the environment.
my $r = Async::Redis->new(
host => $ENV{REDIS_HOST} // 'localhost',
);
# NOTE(review): run { } looks like a test helper that drives the returned
# Future to completion before continuing -- confirm against its definition.
run { $r->connect };
# On odd iterations, start an operation before disconnect
if ($i % 2) {
# $f is not used again in this branch; it only holds whatever ping
# returned until the branch ends, while disconnect runs in between.
my $f = $r->ping; # Start a command
$r->disconnect; # Disconnect while it's pending
} else {
$r->disconnect; # Clean disconnect
}
}
# Give async cleanup time to complete
run { Future::IO->sleep(0.2) };
( run in 0.588 second using v1.01-cache-2.11-cpan-71847e10f99 )