Async-Redis

 view release on metacpan or  search on metacpan

lib/Async/Redis/Subscription.pm  view on Meta::CPAN

# The callback-mode driver loop. Consumes from _pending_messages via
# _dequeue (populated by _run_reader's dispatch path), invokes the
# user's _on_message callback, and awaits its returned Future if any
# for consumer-opted backpressure.
#
# Exits cleanly when _dequeue returns undef (subscription closed or
# paused for reconnect). Dies with the typed error if _dequeue dies
# (fatal); _run_driver's Future failure is visible through the
# client's Future::Selector to any caller using run_until_ready.
#
# Periodic sleep(0) yield every MAX_SYNC_DEPTH iterations prevents
# stack growth when messages are pre-queued and await returns
# synchronously from an already-ready Future.
async sub _run_driver {
    my ($self) = @_;
    my $iter = 0;
    while (!$self->{_closed} && !$self->{_paused}) {
        my $msg;
        my $deq_ok = eval { $msg = await $self->_dequeue(1); 1 };
        unless ($deq_ok) {
            my $err = $@;

t/93-socket-cleanup/close-with-watchers.t  view on Meta::CPAN

        my @warnings;
        local $SIG{__WARN__} = sub { push @warnings, @_ };

        # Rapidly connect and disconnect, sometimes with pending operations
        for my $i (1..10) {
            my $r = Async::Redis->new(
                host => $ENV{REDIS_HOST} // 'localhost',
            );
            run { $r->connect };

            # On odd iterations, start an operation before disconnect
            if ($i % 2) {
                my $f = $r->ping;  # Start a command
                $r->disconnect;   # Disconnect while it's pending
            } else {
                $r->disconnect;   # Clean disconnect
            }
        }

        # Give async cleanup time to complete
        run { Future::IO->sleep(0.2) };



( run in 0.588 second using v1.01-cache-2.11-cpan-71847e10f99 )