PAGI

 view release on metacpan or  search on metacpan

t/19-receive-queue-limit.t  view on Meta::CPAN

    # Test application with a deliberately slow WebSocket consumer.
    #
    # Takes the usual ($scope, $receive, $send) triple and dispatches on
    # $scope->{type}:
    #   - 'lifespan':  answers startup and shutdown events with the matching
    #                  '.complete' message and returns after shutdown.
    #   - 'websocket': accepts, flags $app_started, waits 0.5s before reading
    #                  anything, then counts 'websocket.receive' events until
    #                  'websocket.disconnect' and stores the total in
    #                  $queue_size_observed.
    #   - other:       returns immediately.
    my $slow_app = async sub {
        my ($scope, $receive, $send) = @_;
        my $scope_type = $scope->{type};

        if ($scope_type eq 'lifespan') {
            # Keep answering lifespan events until shutdown has been acknowledged.
            my $shutdown_acked = 0;
            while (!$shutdown_acked) {
                my $message = await $receive->();
                my $kind    = $message->{type};
                if ($kind eq 'lifespan.startup') {
                    await $send->({ type => 'lifespan.startup.complete' });
                }
                elsif ($kind eq 'lifespan.shutdown') {
                    await $send->({ type => 'lifespan.shutdown.complete' });
                    $shutdown_acked = 1;
                }
            }
            return;
        }

        # Only WebSocket scopes are of interest beyond this point.
        return if $scope_type ne 'websocket';

        # Complete the handshake and let the test body know we are live.
        await $send->({ type => 'websocket.accept' });
        $app_started = 1;

        # Stall on purpose before the first receive so that frames sent by the
        # client in the meantime have to wait (slow consumer / expensive setup).
        await $loop->delay_future(after => 0.5);

        # Drain everything up to the disconnect, tallying data frames only.
        my $received     = 0;
        my $disconnected = 0;
        while (!$disconnected) {
            my $message = await $receive->();
            my $kind    = $message->{type};
            if ($kind eq 'websocket.disconnect') {
                $disconnected = 1;
            }
            elsif ($kind eq 'websocket.receive') {
                $received++;
            }
        }
        $queue_size_observed = $received;
    };

    # Start a server around the slow app. Port 0 is presumably "pick any free
    # port" (TODO confirm); the port actually bound is read back below.
    my $server = PAGI::Server->new(
        app   => $slow_app,
        host  => '127.0.0.1',
        port  => 0,
        quiet => 1,
    );

    $loop->add($server);
    $server->listen->get;

    my $port = $server->port;

    my $client = Net::Async::WebSocket::Client->new;
    $loop->add($client);

    # Drive the client inside eval so that a failure still reaches the cleanup
    # code below. The trailing "1" distinguishes "died" from "finished", and
    # the error is copied out of $@ straight away because the cleanup evals
    # further down would overwrite it.
    my $exchange_ok = eval {
        $client->connect(url => "ws://127.0.0.1:$port/")->get;

        # Wait for app to accept the connection
        my $deadline = time + 2;
        while (!$app_started && time < $deadline) {
            $loop->loop_once(0.01);
        }

        # Rapidly send many small messages while app is "busy"
        # These will queue up in receive_queue
        for my $i (1..100) {
            $client->send_text_frame("msg$i");
            $messages_sent++;
            # Small delay to allow frames to be sent
            $loop->loop_once(0.001);
        }

        # Close connection - app will then consume the queued messages
        $client->close;

        # Wait for app to finish processing
        $deadline = time + 3;
        while ($queue_size_observed == 0 && time < $deadline) {
            $loop->loop_once(0.1);
        }

        1;
    };
    my $exchange_error = $@;

    # Report, but do not abort on, an exception from the exchange: without
    # this the assertions below would fail with no hint about the cause.
    diag("WebSocket exchange died: " . ($exchange_error || 'unknown error'))
        unless $exchange_ok;

    # The test demonstrates that messages queue up
    # Without a limit, all 100 messages are queued
    is($messages_sent, 100, "Sent 100 messages");
    ok($queue_size_observed > 0, "App observed queued messages: $queue_size_observed");

    # This is the actual issue: with no limit, queue can grow to any size
    # A production fix would cap this (e.g., max_receive_queue option)
    note("Messages sent: $messages_sent, Queue size observed: $queue_size_observed");

    # Best-effort teardown: removal may fail if the object has already left
    # the loop, so those errors are intentionally ignored.
    eval { $loop->remove($client) };  # May already be removed
    $server->shutdown->get;
    eval { $loop->remove($server) };
};

# Test: Verify max_receive_queue limit is enforced
subtest 'max_receive_queue limit closes connection when exceeded' => sub {
    my $app_started = 0;
    my $close_sent = 0;

    # App that accepts WebSocket but NEVER calls receive (simulating frozen app)
    my $non_consuming_app = async sub  {
        my ($scope, $receive, $send) = @_;
        if ($scope->{type} eq 'lifespan') {
            while (1) {
                my $event = await $receive->();
                if ($event->{type} eq 'lifespan.startup') {
                    await $send->({ type => 'lifespan.startup.complete' });
                }
                elsif ($event->{type} eq 'lifespan.shutdown') {
                    await $send->({ type => 'lifespan.shutdown.complete' });
                    last;
                }
            }
            return;
        }

        return unless $scope->{type} eq 'websocket';

        await $send->({ type => 'websocket.accept' });
        $app_started = 1;

        # Simulate slow/frozen processing - sleep instead of consuming



( run in 0.895 second using v1.01-cache-2.11-cpan-ceb78f64989 )