PAGI
view release on metacpan or search on metacpan
t/19-receive-queue-limit.t view on Meta::CPAN
# PAGI application used as a deliberately slow WebSocket consumer.
#
# Arguments (standard PAGI application triple):
#   $scope   - hashref describing the connection; only {type} is inspected
#   $receive - coderef returning a future for the next inbound event
#   $send    - coderef taking an outbound event hashref
#
# Lifespan scopes are acknowledged and then the sub returns. Any scope other
# than 'websocket' is ignored. For a websocket scope the app accepts, flags
# $app_started, waits half a second without reading, then drains events until
# disconnect and publishes the number of 'websocket.receive' events seen via
# $queue_size_observed.
my $slow_app = async sub {
    my ($scope, $receive, $send) = @_;

    if ($scope->{type} eq 'lifespan') {
        # Acknowledge startup/shutdown; leave the loop once shutdown is acked.
        while (1) {
            my $msg  = await $receive->();
            my $kind = $msg->{type};

            if ($kind eq 'lifespan.shutdown') {
                await $send->({ type => 'lifespan.shutdown.complete' });
                last;
            }
            elsif ($kind eq 'lifespan.startup') {
                await $send->({ type => 'lifespan.startup.complete' });
            }
        }
        return;
    }

    return unless $scope->{type} eq 'websocket';

    # Complete the handshake and let the test body know we are live.
    await $send->({ type => 'websocket.accept' });
    $app_started = 1;

    # Deliberately do not read anything for a while, standing in for a slow
    # consumer or an app doing expensive setup; frames sent by the client
    # during this window have to wait somewhere until we call $receive.
    await $loop->delay_future(after => 0.5);

    # Drain everything that arrived, tallying data frames, until the peer
    # disconnects.
    my $received = 0;
    while (1) {
        my $msg  = await $receive->();
        my $kind = $msg->{type};

        last if $kind eq 'websocket.disconnect';
        next unless $kind eq 'websocket.receive';
        $received++;
    }

    # Expose the tally to the enclosing test.
    $queue_size_observed = $received;
};
# Start a server running the slow consumer on the loopback interface.
# port => 0 is passed and the real port is read back afterwards
# (presumably the OS picks a free one - confirm against PAGI::Server docs).
my $server = PAGI::Server->new(
    app   => $slow_app,
    host  => '127.0.0.1',
    port  => 0,
    quiet => 1,
);
$loop->add($server);
$server->listen->get;
my $port = $server->port;

my $client = Net::Async::WebSocket::Client->new;
$loop->add($client);

# Drive the client side. Any exception is captured rather than discarded so
# that a connect/send failure is reported next to the assertions it breaks,
# while still allowing the cleanup below to run.
my $client_error;
eval {
    $client->connect(url => "ws://127.0.0.1:$port/")->get;

    # Wait (up to ~2s) for the app to accept the connection.
    my $deadline = time + 2;
    while (!$app_started && time < $deadline) {
        $loop->loop_once(0.01);
    }

    # Rapidly send many small messages while the app is "busy".
    # These are expected to queue up on the server side until the app
    # starts calling $receive.
    for my $i (1..100) {
        $client->send_text_frame("msg$i");
        $messages_sent++;
        # Give the loop a brief turn so the frame can actually be written.
        $loop->loop_once(0.001);
    }

    # Close the connection - the app will then consume the queued messages.
    $client->close;

    # Wait (up to ~3s) for the app to publish its count.
    $deadline = time + 3;
    while ($queue_size_observed == 0 && time < $deadline) {
        $loop->loop_once(0.1);
    }

    1;
} or do {
    # Read $@ immediately; it is a global and easily clobbered.
    $client_error = $@ || 'unknown error';
};
diag("Client-side exchange failed: $client_error") if defined $client_error;

# The test demonstrates that messages queue up.
# Without a limit, all 100 messages are queued.
is($messages_sent, 100, "Sent 100 messages");
ok($queue_size_observed > 0, "App observed queued messages: $queue_size_observed");

# This is the actual issue: with no limit, the queue can grow to any size.
# A production fix would cap this (e.g., a max_receive_queue option).
note("Messages sent: $messages_sent, Queue size observed: $queue_size_observed");

# Best-effort teardown: the client/server may already have been removed
# from the loop, so removal errors are intentionally ignored.
eval { $loop->remove($client) };
$server->shutdown->get;
eval { $loop->remove($server) };
};
# Test: Verify max_receive_queue limit is enforced
subtest 'max_receive_queue limit closes connection when exceeded' => sub {
my $app_started = 0;
my $close_sent = 0;
# App that accepts WebSocket but NEVER calls receive (simulating frozen app)
my $non_consuming_app = async sub {
my ($scope, $receive, $send) = @_;
if ($scope->{type} eq 'lifespan') {
while (1) {
my $event = await $receive->();
if ($event->{type} eq 'lifespan.startup') {
await $send->({ type => 'lifespan.startup.complete' });
}
elsif ($event->{type} eq 'lifespan.shutdown') {
await $send->({ type => 'lifespan.shutdown.complete' });
last;
}
}
return;
}
return unless $scope->{type} eq 'websocket';
await $send->({ type => 'websocket.accept' });
$app_started = 1;
# Simulate slow/frozen processing - sleep instead of consuming
( run in 0.895 second using v1.01-cache-2.11-cpan-ceb78f64989 )