Async-Redis

 view release on metacpan or  search on metacpan

t/50-pubsub/on-message.t  view on Meta::CPAN

            # Give deferred GC/event-loop callbacks a chance to fire.
            Future::IO->sleep(0.1)->get;

            my @faa_warnings = grep { /lost.+returning future/i } @warnings;
            is(scalar @faa_warnings, 0,
                'no "lost its returning future" warnings from on_message path')
                or note("warnings captured: @warnings");
        };

        # Full end-to-end backpressure timing is flaky in this test
        # harness because the synchronous-callback path is extremely
        # tight — messages are dispatched as soon as frames arrive,
        # which can race the publisher. The backpressure LOGIC is
        # covered by the unit test below (`_dispatch_frame returns
        # Future when callback does`) and the failed-Future
        # integration test. The end-to-end timing test is documented
        # as a known gap pending a deterministic sync primitive.

        # Scenario: the on_message handler hands back an already-failed
        # Future. The test expects the failure text to reach the on_error
        # handler and the subscription to report itself closed afterwards.
        subtest 'callback returning a failed Future routes to on_error' => sub {
            # A dedicated publisher is created for this subtest; the
            # shared one can linger in a state that interacts oddly with
            # the fatal-close sequence.
            my $pub        = _make_publisher();
            my $subscriber = _make_subscriber();
            my $channel    = 'test:onmsg:future-fail';

            my $sub = $subscriber->subscribe($channel)->get;

            # Capture the error argument (second positional) handed to
            # the on_error handler so it can be inspected below.
            my $err_seen;
            $sub->on_error(sub { $err_seen = $_[1] });

            # Every delivered message yields an immediately-failed Future.
            $sub->on_message(sub { Future->fail("callback-future-boom") });

            $pub->publish($channel, 'x')->get;

            # Leave time for the message to arrive and the failure to be
            # routed before asserting.
            Future::IO->sleep(0.3)->get;

            like($err_seen, qr/callback-future-boom/, 'on_error fired with callback Future failure');
            ok($sub->is_closed, 'subscription closed after fatal error');

            # Best-effort teardown: any exception from disconnect is
            # deliberately swallowed.
            for my $conn ($subscriber, $pub) {
                eval { $conn->disconnect };
            }
        };

        # Scenario: the first on_message invocation returns a pending
        # Future ("gate"). The test expects no further messages to be
        # handed to the callback until that Future is completed.
        subtest 'callback returning a Future delays next read until it resolves' => sub {
            my $subscriber = _make_subscriber();
            my $channel    = 'test:onmsg:backpressure';
            my $gate       = Future->new;
            my @received;

            my $sub = $subscriber->subscribe($channel)->get;
            $sub->on_message(sub {
                my (undef, $msg) = @_;
                push @received, $msg->{data};

                # Only the very first message is held behind the gate.
                return $gate if @received == 1;

                # Later messages return undef so the driver can drain.
                return undef;
            });

            # Publish three messages via the shared publisher, waiting
            # for each publish to complete before sending the next.
            foreach my $seq (1 .. 3) {
                my $payload = "msg-$seq";
                $publisher->publish($channel, $payload)->get;
            }
            Future::IO->sleep(0.3)->get;

            # While the gate is pending only the first message should
            # have been seen by the callback.
            is(scalar @received, 1, 'driver delivered 1 message then blocked on Future');
            is($received[0], 'msg-1', 'first message delivered');

            # Release the gate and give the remaining messages time to
            # be delivered.
            $gate->done;
            Future::IO->sleep(0.3)->get;
            is(scalar @received, 3, 'remaining messages delivered after gate released');

            $subscriber->disconnect;
        };

        # Scenario: the on_message handler itself issues a subscribe to
        # a second channel. The test expects a message published to that
        # second channel afterwards to reach the same handler.
        subtest 'subscribe from inside callback takes effect' => sub {
            my $primary   = 'test:onmsg:reent:primary';
            my $secondary = 'test:onmsg:reent:secondary';

            my $subscriber = _make_subscriber();
            my $sub        = $subscriber->subscribe($primary)->get;

            my @received;
            $sub->on_message(sub {
                my ($s, $msg) = @_;
                push @received, $msg;

                # The first message, when it is from the primary channel,
                # triggers a subscribe to the secondary channel.
                # NOTE(review): this `if` is the final statement, so the
                # retained Future is presumably the callback's return
                # value on that first message — confirm how the driver
                # treats it.
                if ($msg->{channel} eq $primary && @received == 1) {
                    $s->{redis}->subscribe($secondary)->retain;
                }
            });

            # Publish to the primary channel, pause so the in-callback
            # subscribe has a chance to be issued, then publish to the
            # secondary channel.
            $publisher->publish($primary, 'p-1')->get;
            Future::IO->sleep(0.2)->get;
            $publisher->publish($secondary, 's-1')->get;
            Future::IO->sleep(0.3)->get;

            ok(@received >= 2, 'received at least 2 messages');

            # Count the messages that arrived on the secondary channel.
            my $secondary_hits = grep { $_->{channel} eq $secondary } @received;
            ok($secondary_hits,
                'received message from channel subscribed inside callback');

            $subscriber->disconnect;
        };

        subtest 'handler swap inside callback uses new handler for next frame' => sub {
            my $subscriber = _make_subscriber();
            my @received_by_a;
            my @received_by_b;
            my $sub = $subscriber->subscribe('test:onmsg:swap')->get;

            my $handler_b = sub {
                my ($s, $msg) = @_;
                push @received_by_b, $msg->{data};
            };
            my $handler_a = sub {
                my ($s, $msg) = @_;
                push @received_by_a, $msg->{data};
                # Swap handler mid-stream
                $s->on_message($handler_b);
            };
            $sub->on_message($handler_a);

            for my $i (1..3) {
                $publisher->publish('test:onmsg:swap', "m-$i")->get;
            }
            Future::IO->sleep(0.3)->get;

            is(scalar @received_by_a, 1, 'handler A received exactly one message');
            is($received_by_a[0], 'm-1', 'handler A got first message');
            is(scalar @received_by_b, 2, 'handler B received the next two');



( run in 1.272 second using v1.01-cache-2.11-cpan-df04353d9ac )