PAGI
view release on metacpan or search on metacpan
examples/worker-pool-prototype.pl view on Meta::CPAN
close $child_wr;
# Make pipes non-blocking for async I/O
$parent_rd->blocking(0);
$parent_wr->blocking(0);
push @{$self->{worker_pids}}, $pid;
push @{$self->{pipes}}, {
to_worker => $parent_wr,
from_worker => $parent_rd,
busy => 0,
id => $id,
};
say "Started worker $id (PID $pid)";
}
sub _get_idle_worker ($self) {
for my $w (@{$self->{pipes}}) {
return $w unless $w->{busy};
}
return undef;
}
async sub call ($self, $handler_name, %opts) {
my $args = $opts{args} // [];
# Wait for an idle worker
my $worker;
while (!($worker = $self->_get_idle_worker())) {
await Future::IO->sleep(0.01);
}
$worker->{busy} = 1;
my $job = freeze({ handler => $handler_name, args => $args });
my $len = pack('N', length($job));
# Send job to worker using Future::IO
await Future::IO->syswrite($worker->{to_worker}, $len . $job);
# Read response length (4 bytes)
my $resp_len_buf = await Future::IO->sysread($worker->{from_worker}, 4);
my $resp_len = unpack('N', $resp_len_buf);
examples/worker-pool-prototype.pl view on Meta::CPAN
# Read response data
my $resp_data = '';
while (length($resp_data) < $resp_len) {
my $chunk = await Future::IO->sysread(
$worker->{from_worker},
$resp_len - length($resp_data)
);
$resp_data .= $chunk;
}
$worker->{busy} = 0;
my $response = thaw($resp_data);
if ($response->{error}) {
die $response->{error};
}
return $response->{result};
}
t/19-receive-queue-limit.t view on Meta::CPAN
eval {
$client->connect(url => "ws://127.0.0.1:$port/")->get;
# Wait for app to accept the connection
my $deadline = time + 2;
while (!$app_started && time < $deadline) {
$loop->loop_once(0.01);
}
# Rapidly send many small messages while app is "busy"
# These will queue up in receive_queue
for my $i (1..100) {
$client->send_text_frame("msg$i");
$messages_sent++;
# Small delay to allow frames to be sent
$loop->loop_once(0.001);
}
# Close connection - app will then consume the queued messages
$client->close;
( run in 1.162 second using v1.01-cache-2.11-cpan-39bf76dae61 )