PAGI

 view release on metacpan or  search on metacpan

examples/worker-pool-prototype.pl  view on Meta::CPAN

        close $child_wr;

        # Make pipes non-blocking for async I/O
        $parent_rd->blocking(0);
        $parent_wr->blocking(0);

        push @{$self->{worker_pids}}, $pid;
        push @{$self->{pipes}}, {
            to_worker   => $parent_wr,
            from_worker => $parent_rd,
            busy        => 0,
            id          => $id,
        };

        say "Started worker $id (PID $pid)";
    }

    # Locate a worker slot that is free to take a job.
    #
    # Walks the entries of $self->{pipes} in order and hands back the first
    # one whose `busy` flag is false. Returns undef when every entry is busy
    # or the list is empty.
    sub _get_idle_worker ($self) {
        foreach my $slot (@{ $self->{pipes} }) {
            next if $slot->{busy};
            return $slot;
        }

        # Explicit undef: callers use this in scalar context as a
        # "nothing available" marker.
        return undef;
    }

    # Run the handler named $handler_name on an idle worker and return its
    # result.
    #
    # Options (%opts):
    #   args - arrayref of arguments sent along with the job (default: []).
    #
    # Wire format in both directions: a 4-byte big-endian length (pack 'N')
    # followed by that many bytes of Storable-frozen data.
    #
    # Returns the `result` field of the worker's response.
    # Dies with the response's `error` field when that field is true, and
    # with a descriptive message when the pipe hits end-of-file mid-frame,
    # a write makes no progress, or the response cannot be thawed.
    async sub call ($self, $handler_name, %opts) {
        my $args = $opts{args} // [];

        # Poll until a worker is free. There is no await between finding the
        # worker and claiming it, so two callers cannot claim the same one.
        my $worker;
        while (!($worker = $self->_get_idle_worker())) {
            await Future::IO->sleep(0.01);
        }

        $worker->{busy} = 1;

        my $job   = freeze({ handler => $handler_name, args => $args });
        my $frame = pack('N', length($job)) . $job;

        # A non-blocking pipe may accept only part of a large frame per
        # syswrite, so keep writing until the whole frame has gone out.
        await $self->_write_all($worker->{to_worker}, $frame);

        # The length header itself can arrive in pieces; read all 4 bytes
        # before decoding it.
        my $resp_len_buf = await $self->_read_exactly($worker->{from_worker}, 4);
        my $resp_len     = unpack('N', $resp_len_buf);

        my $resp_data = await $self->_read_exactly($worker->{from_worker}, $resp_len);

        # busy is cleared only once a complete response has been read. If the
        # exchange dies part-way the worker stays claimed, rather than handing
        # a pipe in an unknown protocol state to the next caller.
        $worker->{busy} = 0;

        my $response = thaw($resp_data);

        # thaw returns undef for data it cannot decode; without this check
        # the hash lookup below would autovivify and silently return undef.
        die "Worker $worker->{id} sent a response that could not be decoded"
            unless ref $response;

        if ($response->{error}) {
            die $response->{error};
        }

        return $response->{result};
    }

    # Read exactly $want bytes from $fh, looping over short reads.
    # Returns the bytes read ('' when $want is 0). Dies if end-of-file is
    # reached before $want bytes have arrived.
    async sub _read_exactly ($self, $fh, $want) {
        my $buf = '';
        while (length($buf) < $want) {
            my $chunk = await Future::IO->sysread($fh, $want - length($buf));

            # Future::IO->sysread yields undef at EOF; looping again would
            # spin forever because the buffer can no longer grow.
            die "Worker pipe closed after " . length($buf) . " of $want bytes"
                unless defined $chunk && length $chunk;

            $buf .= $chunk;
        }
        return $buf;
    }

    # Write all of $data to $fh, looping over partial writes.
    # Dies if a write reports no progress.
    async sub _write_all ($self, $fh, $data) {
        my $total  = length $data;
        my $offset = 0;
        while ($offset < $total) {
            my $written = await Future::IO->syswrite($fh, substr($data, $offset));
            die "Write to worker pipe made no progress at byte $offset of $total"
                unless $written;
            $offset += $written;
        }
        return;
    }

t/19-receive-queue-limit.t  view on Meta::CPAN


    eval {
        $client->connect(url => "ws://127.0.0.1:$port/")->get;

        # Wait for app to accept the connection
        my $deadline = time + 2;
        while (!$app_started && time < $deadline) {
            $loop->loop_once(0.01);
        }

        # Rapidly send many small messages while app is "busy"
        # These will queue up in receive_queue
        for my $i (1..100) {
            $client->send_text_frame("msg$i");
            $messages_sent++;
            # Small delay to allow frames to be sent
            $loop->loop_once(0.001);
        }

        # Close connection - app will then consume the queued messages
        $client->close;



( run in 1.162 second using v1.01-cache-2.11-cpan-39bf76dae61 )