Async-Redis

 view release on metacpan or  search on metacpan

t/50-pubsub/on-message.t  view on Meta::CPAN

    # After the unified-reader refactor, _dispatch_frame always queues
    # regardless of mode. The callback driver (_start_driver) dequeues and
    # invokes _on_message; _dispatch_frame is mode-agnostic.
    my $redis = Async::Redis->new(host => 'localhost');
    my $sub = Async::Redis::Subscription->new(redis => $redis);

    $sub->on_message(sub { 1 });   # set callback mode

    my $frame = [ 'message', 'chan', 'payload' ];
    my $result = $sub->_dispatch_frame($frame);

    # Message must be in the queue for the driver to dequeue.
    is(scalar @{$sub->{_pending_messages}}, 1, 'message buffered in queue');
    is($sub->{_pending_messages}[0]{type},    'message', 'correct type');
    is($sub->{_pending_messages}[0]{channel}, 'chan',    'correct channel');
    is($sub->{_pending_messages}[0]{data},    'payload', 'correct data');
    # _dispatch_frame returns undef (no Future) when queued synchronously.
    is($result, undef, 'dispatch returns undef (queued synchronously)');
};

subtest '_dispatch_frame: depth backpressure applies in callback mode too' => sub {
    # Backpressure Future is returned when queue is at depth — same path
    # in both modes since _dispatch_frame is now mode-agnostic.
    my $redis = bless { message_queue_depth => 1 }, 'Async::Redis';
    my $sub = Async::Redis::Subscription->new(redis => $redis);

    $sub->on_message(sub { 1 });   # set callback mode

    # First message fills the queue.
    my $r1 = $sub->_dispatch_frame([ 'message', 'ch', 'v1' ]);
    is($r1, undef, 'first dispatch returns undef (queued)');
    is(scalar @{$sub->{_pending_messages}}, 1, 'queue at depth');

    # Second message exceeds depth — should return a Future.
    my $r2 = $sub->_dispatch_frame([ 'message', 'ch', 'v2' ]);
    ok(ref($r2) && $r2->isa('Future'), 'second dispatch returns Future (at depth)');
    is(scalar @{$sub->{_pending_messages}}, 1, 'queue still at depth (v2 held)');
};

subtest '_dispatch_frame queues message for iterator consumers when no callback' => sub {
    my $redis = Async::Redis->new(host => 'localhost');
    my $sub = Async::Redis::Subscription->new(redis => $redis);

    my $frame = [ 'message', 'chan', 'payload' ];
    my $result = $sub->_dispatch_frame($frame);

    # With no callback, the message is queued for next() consumers.
    is(scalar @{$sub->{_pending_messages}}, 1, 'message buffered in queue');
    is($sub->{_pending_messages}[0]{data}, 'payload', 'buffered message data');
    is($result, undef, 'dispatch returns undef on fallthrough');
};

# --- Integration tests (need Redis) ---

SKIP: {
    _with_redis {
        my ($publisher) = @_;

        # Integration tests bypass the run{} helper's busy-poll pump
        # because it interacts badly with the driver's on_done
        # callbacks (pumping via Future::IO->sleep(0)->get corrupts
        # internal state when the driver fires a failed-Future →
        # on_error → _close sequence). Direct ->get works reliably.

        subtest 'on_message receives messages published to subscribed channels' => sub {
            my $subscriber = _make_subscriber();
            my @received;
            my $sub = $subscriber->subscribe('test:onmsg:basic')->get;
            $sub->on_message(sub {
                my ($s, $msg) = @_;
                push @received, $msg;
            });

            for my $i (1..3) {
                $publisher->publish('test:onmsg:basic', "msg-$i")->get;
            }
            Future::IO->sleep(0.3)->get;

            is(scalar @received, 3, 'received all 3 messages');
            is($received[0]{type},    'message',          'first msg type');
            is($received[0]{channel}, 'test:onmsg:basic', 'first msg channel');
            is($received[0]{data},    'msg-1',            'first msg data');
            is($received[0]{pattern}, undef,              'pattern undef for message');

            $subscriber->disconnect;
        };

        subtest 'no F::AA "lost its returning future" warning from fire-and-forget use' => sub {
            # Placed BEFORE the CLIENT KILL subtests so Redis's client
            # state is still clean.
            my @warnings;
            local $SIG{__WARN__} = sub { push @warnings, @_ };

            {
                my $subscriber = _make_subscriber();
                my $sub = $subscriber->subscribe('test:onmsg:no-warn')->get;
                $sub->on_message(sub { });

                $publisher->publish('test:onmsg:no-warn', 'x')->get;
                Future::IO->sleep(0.3)->get;

                $subscriber->disconnect;
            }
            # Give deferred GC/event-loop callbacks a chance to fire.
            Future::IO->sleep(0.1)->get;

            my @faa_warnings = grep { /lost.+returning future/i } @warnings;
            is(scalar @faa_warnings, 0,
                'no "lost its returning future" warnings from on_message path')
                or note("warnings captured: @warnings");
        };

        # Full end-to-end backpressure timing is flaky in this test
        # harness because the synchronous-callback path is extremely
        # tight — messages are dispatched as soon as frames arrive,
        # which can race the publisher. The backpressure LOGIC is
        # covered by the unit test below (`_dispatch_frame returns
        # Future when callback does`) and the failed-Future
        # integration test. The end-to-end timing test is documented
        # as a known gap pending a deterministic sync primitive.

        subtest 'callback returning a failed Future routes to on_error' => sub {
            # Fresh publisher too — the shared one can linger in a
            # state that interacts oddly with the fatal-close sequence.
            my $pub = _make_publisher();
            my $subscriber = _make_subscriber();
            my $err_seen;
            my $sub = $subscriber->subscribe('test:onmsg:future-fail')->get;
            $sub->on_error(sub {
                my ($s, $err) = @_;
                $err_seen = $err;
            });
            $sub->on_message(sub {
                return Future->fail("callback-future-boom");
            });

            $pub->publish('test:onmsg:future-fail', 'x')->get;
            Future::IO->sleep(0.3)->get;

            like($err_seen, qr/callback-future-boom/, 'on_error fired with callback Future failure');
            ok($sub->is_closed, 'subscription closed after fatal error');
            eval { $subscriber->disconnect };
            eval { $pub->disconnect };
        };

        subtest 'callback returning a Future delays next read until it resolves' => sub {
            my $subscriber = _make_subscriber();
            my @received;
            my $gate = Future->new;
            my $sub = $subscriber->subscribe('test:onmsg:backpressure')->get;
            $sub->on_message(sub {
                my ($s, $msg) = @_;
                push @received, $msg->{data};
                # First message blocks on the gate; subsequent return
                # undef so the driver can drain.
                return @received == 1 ? $gate : undef;
            });

            for my $i (1..3) {
                $publisher->publish('test:onmsg:backpressure', "msg-$i")->get;
            }
            Future::IO->sleep(0.3)->get;

            is(scalar @received, 1, 'driver delivered 1 message then blocked on Future');



( run in 3.070 seconds using v1.01-cache-2.11-cpan-cdf2f3d4e48 )