view release on metacpan or search on metacpan
to named path parameters after route definition
- Router: any() multi-method matcher supporting wildcard ('*') and
explicit method lists (e.g., any [qw(GET POST)] => '/path' => $handler)
- Router: named routes and uri_for() now support {name:pattern} syntax
and any() routes
- Router: group() for organizing routes under a common prefix, with three
forms: callback (sub), router-object, and string (auto-require). Supports
middleware, nesting, named route namespacing via as(), and conflict detection
- Router: mount() now accepts a string class name with auto-require and
to_app dispatch
- SSE over HTTP/2 with streaming DATA frames, keepalive comments, and
disconnect handling
- Add PAGI::Utils::Random module for cryptographically secure random bytes
- Rate limiter: periodic cleanup and safety valve for expired buckets
- CORS middleware: warn when configured with wildcard origins and credentials
[Security Fixes]
- Fix XSS in Debug middleware panel by escaping scope values in HTML output
- Add max_chunk_size limit (default 1MB) to HTTP/1.1 chunked transfer
parser to prevent denial-of-service via unbounded chunk sizes
- Use cryptographically secure random bytes (via /dev/urandom with
0.001018 - 2026-02-15
[Bug Fixes]
- Skip HTTP/2 subtests when Net::HTTP2::nghttp2 is not installed, fixing
CPAN tester failures on systems without optional nghttp2 dependency
0.001017 - 2026-02-11
[New Features]
- HTTP/2 support (experimental). Requires Net::HTTP2::nghttp2. Enable with
--http2 flag. Supports ALPN negotiation (h2/http1.1 over TLS), cleartext
h2c via connection preface detection, WebSocket over HTTP/2 (RFC 8441),
streaming responses with backpressure, and configurable protocol settings
(h2_max_concurrent_streams, h2_initial_window_size, etc.)
- Worker heartbeat monitoring for multi-worker mode. Parent process detects
workers with blocked event loops via Unix pipe heartbeat and replaces them
with SIGKILL + respawn. Default 50s timeout (heartbeat_timeout option,
--heartbeat-timeout CLI flag). Only detects event loop starvation â async
handlers using await are unaffected regardless of duration.
- Custom access log format strings (--access-log-format). Supports atoms
like %a (address), %s (status), %D (duration μs), %b (body size).
Enables structured JSON logging via format string.
- Track response body size in access log (%b atom)
docs/specs/index.mkdn
docs/specs/lifespan.mkdn
docs/specs/main.mkdn
docs/specs/tls.mkdn
docs/specs/www.mkdn
docs/superpowers/plans/2026-04-06-response-writer.md
docs/superpowers/plans/2026-04-06-router-head-stripping-and-route-table.md
docs/superpowers/specs/2026-04-06-router-head-stripping-and-route-table-design.md
examples/01-hello-http/README.md
examples/01-hello-http/app.pl
examples/02-streaming-response/README.md
examples/02-streaming-response/app.pl
examples/03-request-body/README.md
examples/03-request-body/app.pl
examples/04-websocket-echo/README.md
examples/04-websocket-echo/app.pl
examples/05-sse-broadcaster/README.md
examples/05-sse-broadcaster/app.pl
examples/06-lifespan-state/README.md
examples/06-lifespan-state/app.pl
examples/07-extension-fullflush/README.md
examples/07-extension-fullflush/app.pl
lib/PAGI/Test/Client.pm
lib/PAGI/Test/Response.pm
lib/PAGI/Test/SSE.pm
lib/PAGI/Test/WebSocket.pm
lib/PAGI/Tutorial.pod
lib/PAGI/Utils.pm
lib/PAGI/Utils/Random.pm
lib/PAGI/WebSocket.pm
t/00-load.t
t/01-hello-http.t
t/02-streaming.t
t/03-request-body.t
t/04-websocket.t
t/05-sse.t
t/06-lifespan.t
t/07-extensions.t
t/08-tls.t
t/09-psgi-bridge.t
t/10-http-compliance.t
t/11-multiworker.t
t/12-fork-loop-isolation.t
t/http2/02-server-config.t
t/http2/03-detection.t
t/http2/04-read-handler.t
t/http2/05-request-lifecycle.t
t/http2/06-integration.t
t/http2/07-websocket.t
t/http2/08-websocket-edge.t
t/http2/09-cli.t
t/http2/09-multiworker.t
t/http2/10-h2c.t
t/http2/11-streaming.t
t/http2/12-error-handling.t
t/http2/13-sse-detection.t
t/http2/14-sse-events.t
t/http2/15-sse-keepalive.t
t/http2/16-sse-cleanup.t
t/integration-endpoint-router-demo.t
t/lib/Test/PAGI/Server.pm
t/lib/TestRoutes/Admin.pm
t/lib/TestRoutes/Users.pm
t/lifespan.t
docs/implementations.mkdn view on Meta::CPAN
# Implementations
Complete or upcoming implementations of PAGI-compatible components â servers, frameworks, and other useful pieces. Contributions should include the project name, repository link, supported protocols, and status (prototype, beta, stable).
## Servers
- **PAGI::Server** â Reference IO::Async-based server with HTTP/1.1, WebSocket, SSE, and TLS support. Features multi-worker pre-fork mode, async file streaming, graceful shutdown, and signal-based worker scaling. Protocols: `http`, `websocket`, `ss...
## Application Frameworks
- **Thunderhorse** â A no-compromises async web framework built on PAGI. Features controller-based architecture, WebSocket and SSE support, Template::Toolkit integration, middleware system, and context-based request handling. Protocols: `http`, `we...
## Middleware & Apps
Bundled with PAGI::Server:
- **PAGI::Middleware::*** â 37 middleware components including GZip, CORS, CSRF, Session, RateLimit, Auth (Basic/Bearer), and more.
docs/specs/main.mkdn view on Meta::CPAN
### Connection Scope
Each incoming connection causes the application to be invoked with a `scope` hashref. Its keys include:
- `type`: Protocol type, e.g., `http`, `websocket`
- `pagi`: A hashref with at least:
- `version => '0.2'`
- `spec_version => '0.2'` (optional; protocol-specific override if it diverges)
- `features` (optional): a hashref of server-reported capabilities such as:
- `supports_streaming` => 1 if streaming responses are supported for this scope
- `max_request_body_size` => maximum accepted request body size (in bytes)
- `max_concurrent_streams` => maximum streams per HTTP/2/HTTP/3 connection
- `supports_trailers` => 1 if trailer frames are accepted for the response
Clarification on PAGI version keys:
- `version`: Signals the core PAGI specification version that governs scopes/events. This value **must** be a string.
- `spec_version`: Indicates the version for the specific protocol (such as HTTP or WebSocket). Servers may omit it, in which case clients should assume it matches `version`.
- `features`: Server-defined key/value pairs that describe optional behaviors for the connection. Values should use the permitted data types from this specification.
- Additional keys may be defined per protocol specification (e.g., HTTP, WebSocket) and must be documented there.
docs/specs/www.mkdn view on Meta::CPAN
**Validation:**
- `offset` MUST be a non-negative integer
- `length` MUST be a non-negative integer if provided
- If `offset` exceeds file size, servers SHOULD send zero bytes
**Examples:**
```perl
# Full file streaming
await $send->({
type => 'http.response.body',
file => '/var/www/static/large-video.mp4',
});
# Range request (bytes 1000-1999)
await $send->({
type => 'http.response.body',
file => '/var/www/static/document.pdf',
offset => 1000,
docs/specs/www.mkdn view on Meta::CPAN
- `QUERY_STRING` -> `query_string`
- `CONTENT_TYPE` -> extracted from `headers`
- `CONTENT_LENGTH` -> extracted from `headers`
- `SERVER_NAME`, `SERVER_PORT` -> `server`
- `REMOTE_ADDR`, `REMOTE_PORT` -> `client`
- `SERVER_PROTOCOL` -> `http_version`
- `psgi.url_scheme` -> `scheme`
- `psgi.version` -> `[1, 1]` (PAGI servers MUST advertise the PSGI version they emulate when bridging)
- `psgi.input` -> constructed from `http.request` events
- `psgi.errors` -> handled by the server as appropriate
- `psgi.streaming`, `psgi.nonblocking`, `psgi.multithread`, `psgi.multiprocess` -> derived from PAGI server capabilities and advertised via PSGI adapter docs
Response mappings:
- `status` and `headers` map directly to `http.response.start`
- Body content from PSGI maps directly to `http.response.body` messages.
## PAGI Encoding Differences
- `path`: Decoded UTF-8 string from percent-encoded input. The server first
percent-decodes `raw_path`, then attempts UTF-8 decoding of the resulting
docs/superpowers/plans/2026-04-06-response-writer.md view on Meta::CPAN
Run: `bash -c 'source ~/perl5/perlbrew/etc/bashrc && perlbrew use perl-5.40.0@default && RELEASE_TESTING=1 prove -l t/response-writer.t'`
Expected: FAIL â `on_close` method doesn't exist, `is_closed` method doesn't exist
- [ ] **Step 3: Implement `on_close`, `is_closed` on Writer**
In `lib/PAGI/Response.pm`, replace the entire `PAGI::Response::Writer` package (lines 1091-1132) with:
```perl
# Writer class for streaming responses
package PAGI::Response::Writer {
use strict;
use warnings;
use Future::AsyncAwait;
use Carp qw(croak);
sub new {
my ($class, $send, %opts) = @_;
my $self = bless {
send => $send,
docs/superpowers/plans/2026-04-06-response-writer.md view on Meta::CPAN
```
- [ ] **Step 4: Run test to verify it passes**
Run: `bash -c 'source ~/perl5/perlbrew/etc/bashrc && perlbrew use perl-5.40.0@default && RELEASE_TESTING=1 prove -l t/response-writer.t'`
Expected: PASS
- [ ] **Step 5: Run all response tests**
Run: `bash -c 'source ~/perl5/perlbrew/etc/bashrc && perlbrew use perl-5.40.0@default && RELEASE_TESTING=1 prove -l t/response.t t/response-writer.t t/02-streaming.t'`
Expected: PASS
- [ ] **Step 6: Commit**
```bash
git add lib/PAGI/Response.pm t/response-writer.t
git commit -m "feat: add push-style writer() method to PAGI::Response"
```
docs/superpowers/plans/2026-04-06-router-head-stripping-and-route-table.md view on Meta::CPAN
is $headers{'x-custom'}, 'head-handler', 'explicit HEAD handler was called';
};
```
- [ ] **Step 2: Run test to verify it passes (explicit HEAD routes are already dispatched directly)**
Run: `bash -c 'source ~/perl5/perlbrew/etc/bashrc && perlbrew use perl-5.40.0@default && RELEASE_TESTING=1 prove -l t/router-head.t'`
Expected: PASS â explicit HEAD routes match before the GET fallback because they have `method => 'HEAD'`, and `$match_method` is set to `'GET'` only for the GET-fallback path. The explicit HEAD route matches because the dispatch also checks `$route...
- [ ] **Step 3: Write failing test â streaming GET handler has all chunks stripped**
Append to `t/router-head.t`:
```perl
subtest 'HEAD strips body from streaming response (multiple chunks)' => sub {
my $router = PAGI::App::Router->new;
$router->get('/stream' => async sub {
my ($scope, $receive, $send) = @_;
await $send->({
type => 'http.response.start',
status => 200,
headers => [['content-type', 'text/plain']],
});
await $send->({ type => 'http.response.body', body => 'chunk1', more => 1 });
await $send->({ type => 'http.response.body', body => 'chunk2', more => 1 });
docs/superpowers/plans/2026-04-06-router-head-stripping-and-route-table.md view on Meta::CPAN
- [ ] **Step 8: Run test to verify it passes**
Run: `bash -c 'source ~/perl5/perlbrew/etc/bashrc && perlbrew use perl-5.40.0@default && RELEASE_TESTING=1 prove -l t/router-head.t'`
Expected: PASS
- [ ] **Step 9: Commit**
```bash
git add t/router-head.t
git commit -m "test: add HEAD edge case tests (explicit head route, streaming, 404, 405)"
```
---
## Task 3: Route Table API â HTTP Routes
### Files
- Create: `t/router-route-table.t`
- Modify: `lib/PAGI/App/Router.pm`
examples/02-streaming-response/README.md view on Meta::CPAN
# 02 â Streaming Response with Disconnect Handling
Shows how to:
- Drain the incoming `http.request` body (if any) before replying.
- Send multiple `http.response.body` chunks with `more => 1`.
- Emit `http.response.trailers` when `trailers => 1` was advertised.
- Watch for `{ type => 'http.disconnect' }` while streaming and stop if the client drops.
## Quick Start
**1. Start the server:**
```bash
pagi-server --app examples/02-streaming-response/app.pl --port 5000
```
**2. Demo with curl:**
```bash
# Watch chunks stream in (one per second)
curl -N http://localhost:5000/
# => Chunk 1 of 5
# => Chunk 2 of 5
# => ...
# Test disconnect handling - press Ctrl+C during streaming
curl -N http://localhost:5000/
# (press Ctrl+C to see server handle disconnect)
```
## Spec References
- HTTP events, trailers, disconnect â `docs/specs/www.mkdn`
- Cancellation semantics â `docs/specs/main.mkdn`
examples/07-extension-fullflush/README.md view on Meta::CPAN
# 07 â Extension-Aware Streaming with FullFlush
Demonstrates how to:
- Check for extension support via `scope->{extensions}{fullflush}`
- Use `http.fullflush` event during streaming to force immediate TCP buffer flush
- Only send extension events when the server advertises support
The fullflush extension is useful for real-time streaming scenarios where you want each chunk delivered to the client immediately rather than waiting for TCP buffer fill or Nagle's algorithm.
## Quick Start
**1. Start the server:**
```bash
pagi-server --app examples/07-extension-fullflush/app.pl --port 5000
```
**2. Demo with curl:**
```bash
# Watch real-time streaming with immediate flush
curl -N http://localhost:5000/
# => Chunk 1 (flushed immediately)
# => Chunk 2 (flushed immediately)
# => ...
# Each chunk appears instantly rather than being buffered
```
**Note:** The difference from regular streaming is most noticeable with small chunks that would normally be buffered by TCP.
## Spec References
- Extensions section â `docs/specs/main.mkdn`
- Fullflush example â `docs/extensions.mkdn`
examples/07-extension-fullflush/app.pl view on Meta::CPAN
use strict;
use warnings;
use Future::AsyncAwait;
# Demonstrates fullflush extension during streaming response.
# The fullflush event forces immediate TCP buffer flush, useful for
# Server-Sent Events or real-time streaming where latency matters.
async sub app {
my ($scope, $receive, $send) = @_;
die "Unsupported scope type: $scope->{type}" if $scope->{type} ne 'http';
# Drain request body if present
while (1) {
my $event = await $receive->();
last if $event->{type} ne 'http.request';
examples/11-job-runner/README.md view on Meta::CPAN
```bash
perl -Ilib -Iexamples/11-job-runner/lib bin/pagi-server \
--app examples/11-job-runner/app.pl --port 5001
```
Then open http://localhost:5001 in your browser.
## Features
- **Real-time job queue** - Create countdown jobs and watch them execute
- **Live progress streaming** - SSE updates show second-by-second progress
- **WebSocket dashboard** - Queue-wide updates pushed to all connected clients
- **Concurrent execution** - Worker processes up to 3 jobs simultaneously
## Architecture
```
âââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ
â Browser (app.js) â
â ââââââââââââââââ ââââââââââââââââ ââââââââââââââââââââ â
â â WebSocket â â SSE â â HTTP (REST) â â
examples/11-job-runner/lib/JobRunner/SSE.pm view on Meta::CPAN
data => $JSON->encode($data),
});
}
1;
__END__
# NAME
JobRunner::SSE - Server-Sent Events for job progress streaming
# DESCRIPTION
Provides real-time progress streaming for individual jobs via SSE.
## Endpoint
GET /api/jobs/:id/progress
## Events
- **status** - Initial job status on connection
- **progress** - Progress update { percent, message }
- **complete** - Job completed successfully { status, result, duration }
examples/README.md view on Meta::CPAN
- For timers/sleeps: `Future::IO` (loop-agnostic)
- Run examples with: `pagi-server examples/01-hello-http/app.pl --port 5000`
Note: Some advanced examples (job-runner, chat) use `IO::Async` directly for
timer and subprocess features. These are PAGI::Server-specific patterns.
Examples assume you understand the core spec (`docs/specs/main.mkdn`) plus the relevant protocol documents.
## Example List
1. `01-hello-http` - minimal HTTP response
2. `02-streaming-response` - chunked body, trailers, disconnect handling
3. `03-request-body` - reads multi-event request bodies
4. `04-websocket-echo` - handshake and echo loop
5. `05-sse-broadcaster` - server-sent events
6. `06-lifespan-state` - lifespan protocol with shared state
7. `07-extension-fullflush` - middleware using the `fullflush` extension
8. `08-tls-introspection` - prints TLS metadata when present
9. `09-psgi-bridge` - wraps a PSGI app for PAGI use (via `PAGI::App::WrapPSGI`)
10. `10-chat-showcase` - WebSocket chat demo with multiple clients
11. `11-job-runner` - background job processing example
12. `12-utf8` - UTF-8 handling demonstration
examples/sse-dashboard/README.md view on Meta::CPAN
# SSE Dashboard Example
Live dashboard using PAGI::SSE for real-time metrics streaming.
## Run
```bash
pagi-server --app examples/sse-dashboard/app.pl --port 5000
```
Visit http://localhost:5000/
## Features
- Real-time server metrics streaming
- Automatic keepalive for proxy compatibility
- Reconnection support via `Last-Event-ID`
- Multiple event types (`connected`, `reconnected`, `metrics`)
- Subscriber tracking
## API
- `SSE /events` - Metrics stream (2-second updates)
- `GET /*` - Static files from `public/`
examples/sse-dashboard/app.pl view on Meta::CPAN
#!/usr/bin/env perl
#
# Live Dashboard using PAGI::SSE
#
# Demonstrates real-time server metrics streaming with:
# - Automatic keepalive for proxy compatibility
# - Reconnection support via Last-Event-ID
# - Multiple event types
#
# Run: pagi-server --app examples/sse-dashboard/app.pl --port 5000
# Open: http://localhost:5000/
#
use strict;
use warnings;
lib/PAGI/App/File.pm view on Meta::CPAN
)->to_app;
=head1 DESCRIPTION
PAGI::App::File serves static files from a configured root directory.
=head2 Features
=over 4
=item * Efficient streaming (no memory bloat for large files)
=item * ETag caching with If-None-Match support (304 Not Modified)
=item * Range requests (HTTP 206 Partial Content)
=item * Automatic MIME type detection for common file types
=item * Index file resolution (index.html, index.htm)
=back
lib/PAGI/App/File.pm view on Meta::CPAN
status => 206,
headers => [
['content-type', $content_type],
['content-length', $length],
['content-range', "bytes $start-$end/$size"],
['accept-ranges', 'bytes'],
['etag', $etag],
],
});
# Use file response with offset/length for efficient streaming
if ($method eq 'HEAD') {
await $send->({ type => 'http.response.body', body => '', more => 0 });
}
else {
await $send->({
type => 'http.response.body',
file => $file_path,
offset => $start,
length => $length,
});
lib/PAGI/App/File.pm view on Meta::CPAN
type => 'http.response.start',
status => 200,
headers => [
['content-type', $content_type],
['content-length', $size],
['accept-ranges', 'bytes'],
['etag', $etag],
],
});
# Use file response for efficient streaming (sendfile or worker pool)
if ($method eq 'HEAD') {
await $send->({ type => 'http.response.body', body => '', more => 0 });
}
else {
await $send->({
type => 'http.response.body',
file => $file_path,
});
}
};
lib/PAGI/App/WrapPSGI.pm view on Meta::CPAN
last unless $event->{more};
}
# Create psgi.input
open my $input, '<', \$body or die $!;
$env->{'psgi.input'} = $input;
# Call PSGI app
my $response = $psgi_app->($env);
# Handle response - could be arrayref or coderef (streaming)
if (ref $response eq 'CODE') {
# Delayed/streaming response
await $self->_handle_streaming_response($send, $response);
} else {
await $self->_send_response($send, $response);
}
};
}
sub _build_env {
my ($self, $scope) = @_;
my %env = (
lib/PAGI/App/WrapPSGI.pm view on Meta::CPAN
SCRIPT_NAME => $scope->{root_path},
PATH_INFO => $scope->{path},
QUERY_STRING => $scope->{query_string},
SERVER_PROTOCOL => 'HTTP/' . $scope->{http_version},
'psgi.version' => [1, 1],
'psgi.url_scheme' => $scope->{scheme},
'psgi.errors' => \*STDERR,
'psgi.multithread' => 0,
'psgi.multiprocess' => 0,
'psgi.run_once' => 0,
'psgi.streaming' => 1,
'psgi.nonblocking' => 1,
);
# Add headers
for my $header (@{$scope->{headers}}) {
my ($name, $value) = @$header;
my $key = uc($name);
$key =~ s/-/_/g;
if ($key eq 'CONTENT_TYPE') {
$env{CONTENT_TYPE} = $value;
lib/PAGI/App/WrapPSGI.pm view on Meta::CPAN
local $/;
my $content = <$body>;
await $send->({
type => 'http.response.body',
body => $content // '',
more => 0,
});
}
}
# Handle PSGI delayed/streaming response pattern
async sub _handle_streaming_response {
my ($self, $send, $responder_callback) = @_;
my @body_chunks;
my $response_started = 0;
my $writer;
# Create a writer object for streaming
my $create_writer = sub {
my ($send_ref, $status, $headers) = @_;
return {
write => sub {
my ($chunk) = @_;
push @body_chunks, $chunk;
},
close => sub {
# Mark as closed - will be handled after responder returns
},
lib/PAGI/App/WrapPSGI.pm view on Meta::CPAN
});
} else {
await $send->({
type => 'http.response.body',
body => $body // '',
more => 0,
});
}
}
# Simple writer class for streaming responses
package PAGI::App::WrapPSGI::Writer;
sub write {
my ($self, $chunk) = @_;
push @{$self->{chunks}}, $chunk;
}
sub close {
my ($self) = @_;
# Nothing special needed - chunks are already collected
lib/PAGI/Cookbook.pod view on Meta::CPAN
$router->sse('/events/:channel' => async sub {
my ($scope, $receive, $send) = @_;
my $sse = PAGI::SSE->new($scope, $receive, $send);
# Access channel parameter
my $channel = $scope->{path_params}{channel};
await $sse->start;
await $sse->send_event("Subscribed to: $channel");
# ... streaming logic ...
});
$router->to_app;
=head2 Route-Level Middleware
Apply middleware to specific routes by passing an arrayref:
use strict;
use warnings;
lib/PAGI/Cookbook.pod view on Meta::CPAN
# SSE with path parameters
async sub handle_channel {
my ($self, $sse) = @_;
# Access path parameter
my $channel = $sse->scope->{'pagi.params'}{channel};
await $sse->start;
await $sse->send_event("Channel: $channel");
# ... streaming logic ...
}
1;
=head3 Lifecycle Hooks
Override C<on_startup> and C<on_shutdown> for application lifecycle management:
package MyApp;
use parent 'PAGI::Endpoint::Router';
lib/PAGI/Middleware/ContentLength.pm view on Meta::CPAN
return async sub {
my ($scope, $receive, $send) = @_;
# Skip for non-HTTP requests
if ($scope->{type} ne 'http') {
await $app->($scope, $receive, $send);
return;
}
my @buffered_events;
my $has_content_length = 0;
my $is_streaming = 0;
my $status;
my @headers;
# Create intercepting send to buffer response
my $wrapped_send = async sub {
my ($event) = @_;
my $type = $event->{type};
if ($type eq 'http.response.start') {
$status = $event->{status};
@headers = @{$event->{headers} // []};
# Check if Content-Length already present
for my $h (@headers) {
if (lc($h->[0]) eq 'content-length') {
$has_content_length = 1;
last;
}
# If Transfer-Encoding is chunked, don't add Content-Length
if (lc($h->[0]) eq 'transfer-encoding' && lc($h->[1]) eq 'chunked') {
$is_streaming = 1;
last;
}
}
# If already has Content-Length or is streaming, pass through
if ($has_content_length || $is_streaming || $self->{auto_chunked}) {
await $send->($event);
return;
}
# Buffer the start event to add Content-Length later
push @buffered_events, $event;
}
elsif ($type eq 'http.response.body') {
# If we're passing through (has Content-Length or streaming)
if ($has_content_length || $is_streaming || $self->{auto_chunked}) {
await $send->($event);
return;
}
# Check if this is a streaming response (more => 1)
if ($event->{more}) {
$is_streaming = 1;
# Flush buffered events and switch to pass-through
for my $buffered (@buffered_events) {
await $send->($buffered);
}
@buffered_events = ();
await $send->($event);
return;
}
lib/PAGI/Middleware/ContentLength.pm view on Meta::CPAN
else {
# Pass through other events (trailers, etc.)
await $send->($event);
}
};
# Run the inner app
await $app->($scope, $receive, $wrapped_send);
# If we have buffered events, calculate Content-Length and send
if (@buffered_events && !$has_content_length && !$is_streaming) {
# Calculate total body length
my $body_length = 0;
for my $event (@buffered_events) {
if ($event->{type} eq 'http.response.body') {
$body_length += length($event->{body} // '');
}
}
# Send start with Content-Length
for my $event (@buffered_events) {
lib/PAGI/Middleware/ContentLength.pm view on Meta::CPAN
}
1;
__END__
=head1 NOTES
=over 4
=item * For streaming responses (multiple body events with more => 1),
this middleware switches to pass-through mode to avoid buffering.
=item * Responses that already have Content-Length are passed through unchanged.
=item * Responses with Transfer-Encoding: chunked are passed through unchanged.
=item * SSE and WebSocket responses should not use this middleware.
=back
lib/PAGI/Middleware/ETag.pm view on Meta::CPAN
use PAGI::Middleware::Builder;
my $app = builder {
enable 'ETag';
$my_app;
};
=head1 DESCRIPTION
PAGI::Middleware::ETag generates ETag headers for responses based on
the response body content. Works best with buffered (non-streaming) responses.
=head1 CONFIGURATION
=over 4
=item * weak (default: 0)
If true, generate weak ETags (W/"...").
=back
lib/PAGI/Middleware/ETag.pm view on Meta::CPAN
return async sub {
my ($scope, $receive, $send) = @_;
if ($scope->{type} ne 'http') {
await $app->($scope, $receive, $send);
return;
}
my @body_parts;
my $original_headers;
my $status;
my $is_streaming = 0;
my $wrapped_send = async sub {
my ($event) = @_;
if ($event->{type} eq 'http.response.start') {
$status = $event->{status};
$original_headers = $event->{headers};
# Check if already has ETag
for my $h (@{$original_headers // []}) {
if (lc($h->[0]) eq 'etag') {
# Already has ETag, pass through
await $send->($event);
$is_streaming = 1; # Flag to pass through body
return;
}
}
}
elsif ($event->{type} eq 'http.response.body') {
if ($is_streaming) {
await $send->($event);
return;
}
push @body_parts, $event->{body} // '';
# If streaming, can't generate ETag
if ($event->{more}) {
$is_streaming = 1;
await $send->({
type => 'http.response.start',
status => $status,
headers => $original_headers,
});
for my $part (@body_parts) {
await $send->({
type => 'http.response.body',
body => $part,
more => 1,
lib/PAGI/Middleware/ETag.pm view on Meta::CPAN
@body_parts = ();
}
}
else {
await $send->($event);
}
};
await $app->($scope, $receive, $wrapped_send);
return if $is_streaming;
# Generate ETag from body
my $body = join('', @body_parts);
my $etag = $self->_generate_etag($body);
# Add ETag to headers
my @new_headers = @{$original_headers // []};
push @new_headers, ['ETag', $etag];
await $send->({
lib/PAGI/Middleware/GZip.pm view on Meta::CPAN
for my $h (@{$event->{headers} // []}) {
if (lc($h->[0]) eq 'content-type') {
$content_type = $h->[1];
last;
}
}
$response_started = 1;
# Don't send yet - buffer to compress
}
elsif ($event->{type} eq 'http.response.body') {
# If we're already in streaming mode, pass through all chunks
if ($headers_sent) {
await $send->($event);
return;
}
push @body_parts, $event->{body} // '';
# If streaming (more => 1), switch to pass-through mode
if ($event->{more}) {
if (!$headers_sent) {
await $send->({
type => 'http.response.start',
status => 200,
headers => $original_headers,
});
$headers_sent = 1;
}
await $send->($event);
}
}
else {
await $send->($event);
}
};
await $app->($scope, $receive, $wrapped_send);
# If headers already sent (streaming), we're done
return if $headers_sent;
# Combine body
my $body = join('', @body_parts);
# Decide whether to compress
my $should_compress = $self->_should_compress($content_type, length($body));
if ($should_compress && length($body) > 0) {
my $compressed;
lib/PAGI/Middleware/Head.pm view on Meta::CPAN
elsif ($type eq 'http.response.body') {
# Suppress body content but preserve the event structure
# Send an empty body with more => 0 to complete the response
if (!$event->{more}) {
await $send->({
type => 'http.response.body',
body => '',
more => 0,
});
}
# Otherwise, skip the event entirely (streaming chunks)
}
elsif ($type eq 'http.response.trailers') {
# Skip trailers for HEAD requests
}
else {
# Pass through other events
await $send->($event);
}
};
lib/PAGI/Middleware/Static.pm view on Meta::CPAN
# For HEAD requests, don't send body
if ($scope->{method} eq 'HEAD') {
await $send->({
type => 'http.response.body',
body => '',
more => 0,
});
return;
}
# Use file response for efficient streaming (sendfile or worker pool)
# This also enables XSendfile middleware to intercept the response
if ($is_range) {
await $send->({
type => 'http.response.body',
file => $file_path,
offset => $start,
length => $body_size,
});
}
else {
lib/PAGI/Middleware/Static.pm view on Meta::CPAN
=head1 CACHING
The middleware generates ETags based on file path, size, and modification
time. Clients can use If-None-Match to receive 304 Not Modified responses
when the file hasn't changed.
=head1 RANGE REQUESTS
The middleware supports HTTP Range requests for partial content, useful
for resumable downloads and media streaming. Only single byte ranges
are supported (not multi-range requests).
=head1 SEE ALSO
L<PAGI::Middleware> - Base class for middleware
=cut
lib/PAGI/Request.pm view on Meta::CPAN
sub scope { shift->{scope} }
# Application state (injected by PAGI::Lifespan, read-only)
sub state {
my $self = shift;
return $self->{scope}{state} // {};
}
# Body streaming - mutually exclusive with buffered body methods
sub body_stream {
my ($self, %opts) = @_;
croak "Body already consumed; streaming not available" if $self->{scope}{'pagi.request.body.read'};
croak "Body streaming already started" if $self->{scope}{'pagi.request.body.stream.created'};
$self->{scope}{'pagi.request.body.stream.created'} = 1;
my $max_bytes = $opts{max_bytes};
my $limit_name = defined $max_bytes ? 'max_bytes' : undef;
if (!defined $max_bytes) {
my $cl = $self->content_length;
if (defined $cl) {
$max_bytes = $cl;
$limit_name = 'content-length';
lib/PAGI/Request.pm view on Meta::CPAN
limit_name => $limit_name,
decode => $opts{decode},
strict => $opts{strict},
);
}
# Read raw body bytes (async, cached in scope)
async sub body {
my $self = shift;
croak "Body streaming already started; buffered helpers unavailable"
if $self->{scope}{'pagi.request.body.stream.created'};
# Return cached body if already read
return $self->{scope}{'pagi.request.body'} if $self->{scope}{'pagi.request.body.read'};
my $receive = $self->{receive};
die "No receive callback provided" unless $receive;
my $body = '';
while (1) {
lib/PAGI/Request.pm view on Meta::CPAN
=head2 body_stream
my $stream = $req->body_stream;
my $stream = $req->body_stream(
max_bytes => 10 * 1024 * 1024, # 10MB limit
decode => 'UTF-8', # Decode to UTF-8
strict => 1, # Strict UTF-8 decoding
);
Returns a L<PAGI::Request::BodyStream> for streaming body consumption. This is
useful for processing large request bodies incrementally without loading them
entirely into memory.
B<Options:>
=over 4
=item * C<max_bytes> - Maximum body size. Defaults to Content-Length header if present.
=item * C<decode> - Encoding to decode chunks to (typically 'UTF-8').
=item * C<strict> - If true, throw on invalid UTF-8. Default: false (use replacement chars).
=back
B<Important:> Body streaming is mutually exclusive with buffered body methods
(C<body>, C<text>, C<json>, C<form_params>). Once you start streaming, you cannot use
those methods, and vice versa.
Example:
# Stream large upload to file
my $stream = $req->body_stream(max_bytes => 100 * 1024 * 1024);
await $stream->stream_to_file('/uploads/data.bin');
See L<PAGI::Request::BodyStream> for full documentation.
lib/PAGI/Request/BodyStream.pm view on Meta::CPAN
=head1 NAME
PAGI::Request::BodyStream - Streaming body consumption for PAGI requests
=head1 SYNOPSIS
use PAGI::Request::BodyStream;
use Future::AsyncAwait;
# Basic streaming
my $stream = PAGI::Request::BodyStream->new(receive => $receive);
while (!$stream->is_done) {
my $chunk = await $stream->next_chunk;
last unless defined $chunk;
print "Got chunk: ", length($chunk), " bytes\n";
}
# With size limit
my $stream = PAGI::Request::BodyStream->new(
lib/PAGI/Request/BodyStream.pm view on Meta::CPAN
await $stream->stream_to_file('/tmp/upload.dat');
# Stream to custom sink
await $stream->stream_to(async sub ($chunk) {
# Process chunk
print STDERR "Processing: ", length($chunk), " bytes\n";
});
=head1 DESCRIPTION
PAGI::Request::BodyStream provides streaming body consumption for large request
bodies. This is useful when you need to process request data incrementally
without loading the entire body into memory.
The stream is pull-based: you call C<next_chunk()> to receive the next chunk
of data. The stream handles:
=over 4
=item * Size limits with customizable error messages
=item * UTF-8 decoding with proper handling of incomplete sequences at chunk boundaries
=item * Client disconnect detection
=item * Convenient file streaming with C<stream_to_file()>
=back
B<Important>: Streaming is mutually exclusive with buffered body methods like
C<body()>, C<json()>, C<form()> in L<PAGI::Request>. Once you start streaming,
you cannot use those methods.
=head1 CONSTRUCTOR
=head2 new
my $stream = PAGI::Request::BodyStream->new(
receive => $receive, # Required: PAGI receive callback
max_bytes => 10485760, # Optional: max body size
decode => 'UTF-8', # Optional: decode to UTF-8
lib/PAGI/Request/BodyStream.pm view on Meta::CPAN
sub is_done {
my ($self) = @_;
return $self->{_done};
}
=head2 error
my $error = $stream->error;
Returns any error that occurred during streaming, or undef.
=cut
sub error {
my ($self) = @_;
return $self->{_error};
}
=head2 stream_to_file
lib/PAGI/Response.pm view on Meta::CPAN
inline => 1,
);
# Partial file (for range requests)
await $res->send_file('/path/to/video.mp4',
offset => 1024, # Start from byte 1024
length => 65536, # Send 64KB
);
Send a file as the response. This method uses the PAGI protocol's C<file>
key for efficient server-side streaming. The file is B<not> read into memory.
For production, use L<PAGI::Middleware::XSendfile> to delegate file serving
to your reverse proxy.
B<Options:>
=over 4
=item * C<filename> - Set Content-Disposition attachment filename
=item * C<inline> - Use Content-Disposition: inline instead of attachment
lib/PAGI/Response.pm view on Meta::CPAN
$self->_mark_sent;
# Send response start
await $self->{send}->({
type => 'http.response.start',
status => $self->status, # uses lazy default of 200
headers => $self->{_headers},
});
# Use PAGI file protocol for efficient server-side streaming
my $body_event = {
type => 'http.response.body',
file => $path,
};
# Add offset/length only if not reading from start or not full file
$body_event->{offset} = $offset if $offset > 0;
$body_event->{length} = $length if $length < $max_length;
await $self->{send}->($body_event);
}
# Writer class for streaming responses
package PAGI::Response::Writer {
use strict;
use warnings;
use Future::AsyncAwait;
use Carp qw(croak);
use Scalar::Util qw(blessed);
sub new {
my ($class, $send, %opts) = @_;
my $self = bless {
lib/PAGI/Server.pm view on Meta::CPAN
=head2 effective_max_connections
my $max = $server->effective_max_connections;
Returns the effective maximum connections limit. If C<max_connections>
was set explicitly, returns that value. Otherwise returns the default
of 1000.
=head1 FILE RESPONSE STREAMING
PAGI::Server supports efficient file streaming via the C<file> and C<fh>
keys in C<http.response.body> events:
# Stream entire file
await $send->({
type => 'http.response.body',
file => '/path/to/file.mp4',
more => 0,
});
# Stream partial file (for Range requests)
lib/PAGI/Server/AsyncFile.pm view on Meta::CPAN
use PAGI::Server::AsyncFile;
use IO::Async::Loop;
# Create or obtain an IO::Async::Loop
my $loop = IO::Async::Loop->new;
# Read entire file
my $content = await PAGI::Server::AsyncFile->read_file($loop, '/path/to/file');
# Read file in chunks (streaming)
await PAGI::Server::AsyncFile->read_file_chunked($loop, '/path/to/file', async sub {
my ($chunk) = @_;
# Process each chunk
}, chunk_size => 65536);
# Write file
await PAGI::Server::AsyncFile->write_file($loop, '/path/to/file', $content);
# Append to file
await PAGI::Server::AsyncFile->append_file($loop, '/path/to/file', $log_line);
=head1 DESCRIPTION
This module provides non-blocking file I/O operations using L<IO::Async::Function>
worker processes. It is used internally by L<PAGI::Server> for efficient file
streaming.
B<Note:> This is a PAGI::Server internal module. PAGI applications are
loop-agnostic and should use synchronous file I/O (which is simple and fast
for typical file sizes) or bring their own async file library if needed.
It uses L<IO::Async::Function> to offload blocking file operations to worker
processes, preventing the main event loop from being blocked during disk I/O.
Regular file I/O in POSIX is always blocking at the kernel level - even
C<select()>/C<poll()>/C<epoll()> report regular files as always "ready".
This module works around this limitation by running file operations in
lib/PAGI/Server/AsyncFile.pm view on Meta::CPAN
# Process chunk
}, chunk_size => 65536);
# For Range requests (partial file):
await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, $callback,
offset => 1000, # Start at byte 1000
length => 5000, # Read 5000 bytes total
);
Read a file in chunks, calling a callback for each chunk. This is suitable
for streaming large files without loading the entire file into memory.
Parameters:
=over 4
=item * C<$loop> - IO::Async::Loop instance
=item * C<$path> - Path to the file to read
=item * C<$callback> - Async callback called with each chunk. Receives the chunk data.
lib/PAGI/Server/Connection.pm view on Meta::CPAN
use Digest::SHA qw(sha1_base64);
use Encode;
use URI::Escape qw(uri_unescape);
use IO::Async::Timer::Countdown;
use IO::Async::Timer::Periodic;
use Time::HiRes qw(gettimeofday tv_interval);
use PAGI::Server::AsyncFile;
use PAGI::Server::ConnectionState;
use constant FILE_CHUNK_SIZE => 65536; # 64KB chunks for file streaming
# Per-second cache for CLF timestamp in access log (same pattern as HTTP1::format_date)
my $_cached_log_timestamp;
my $_cached_log_time = 0;
# =============================================================================
# Unrecognized Event Type Handler (PAGI spec compliance)
# =============================================================================
# Per main.mkdn: "Servers must raise exceptions if... The type field is unrecognized"
lib/PAGI/Server/Connection.pm view on Meta::CPAN
my ($self, $stream_id, $stream_state) = @_;
weaken(my $weak_self = $self);
my $status;
my @response_headers;
# Streaming state for deferred data provider pattern
my @data_queue;
my $eof_pending = 0;
my $streaming_started = 0;
# Data callback for nghttp2's streaming response.
# Returns ($data, $eof) when data is available, or undef to defer.
my $data_callback = sub {
my ($cb_stream_id, $max_len) = @_;
if (@data_queue) {
my $chunk = shift @data_queue;
# Respect max_len â XS truncates without preserving remainder
if (length($chunk) > $max_len) {
unshift @data_queue, substr($chunk, $max_len);
$chunk = substr($chunk, 0, $max_len);
lib/PAGI/Server/Connection.pm view on Meta::CPAN
} @{$event->{headers} // []};
}
elsif ($type eq 'http.response.body') {
my $ss = $weak_self->{h2_streams}{$stream_id} or return;
return unless $ss->{response_started};
my $body = $event->{body} // '';
my $more = $event->{more} // 0;
if ($more) {
if (!$streaming_started) {
# First streaming chunk â submit with data callback
$streaming_started = 1;
push @data_queue, $body if length($body);
$weak_self->{h2_session}->submit_response_streaming(
$stream_id,
status => $status,
headers => \@response_headers,
data_callback => $data_callback,
);
$weak_self->_h2_write_pending;
} else {
# Subsequent chunk â backpressure check then push and resume
if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
await $weak_self->_wait_for_drain;
return unless $weak_self;
return if $weak_self->{closed};
return unless $weak_self->{h2_streams}{$stream_id};
}
push @data_queue, $body if length($body);
$weak_self->{h2_session}->resume_stream($stream_id);
$weak_self->_h2_write_pending;
}
} else {
if ($streaming_started) {
# Final chunk on an already-streaming response
if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
await $weak_self->_wait_for_drain;
return unless $weak_self;
return if $weak_self->{closed};
return unless $weak_self->{h2_streams}{$stream_id};
}
$eof_pending = 1;
push @data_queue, $body if length($body);
$weak_self->{h2_session}->resume_stream($stream_id);
$weak_self->_h2_write_pending;
} else {
# Non-streaming: single response (unchanged one-shot path)
$weak_self->{h2_session}->submit_response($stream_id,
status => $status,
headers => \@response_headers,
body => $body,
);
$weak_self->_h2_write_pending;
}
}
}
else {
lib/PAGI/Server/Connection.pm view on Meta::CPAN
[_validate_header_name($_->[0]), _validate_header_value($_->[1])]
} @$extra;
}
$ss->{ws_accepted} = 1;
$ss->{response_started} = 1;
$ss->{ws_frame} = Protocol::WebSocket::Frame->new(
max_payload_size => $weak_self->{max_ws_frame_size},
);
# Submit 200 response with streaming body that defers
$weak_self->{h2_session}->submit_response($stream_id,
status => 200,
headers => \@headers,
body => sub { return undef }, # defer until submit_data
);
$weak_self->_h2_write_pending;
# Process any data that arrived before accept
if (length($ss->{body}) > 0) {
my $buffered = $ss->{body};
lib/PAGI/Server/Connection.pm view on Meta::CPAN
};
}
sub _h2_create_sse_send {
my ($self, $stream_id, $stream_state) = @_;
weaken(my $weak_self = $self);
# Streaming state for data provider pattern
my @data_queue;
my $streaming_started = 0;
my $data_callback = sub {
my ($cb_stream_id, $max_len) = @_;
if (@data_queue) {
my $chunk = shift @data_queue;
if (length($chunk) > $max_len) {
unshift @data_queue, substr($chunk, $max_len);
$chunk = substr($chunk, 0, $max_len);
}
lib/PAGI/Server/Connection.pm view on Meta::CPAN
my @final_headers;
for my $h (@$headers) {
push @final_headers, [_validate_header_name($h->[0]), _validate_header_value($h->[1])];
}
if (!$has_content_type) {
push @final_headers, ['content-type', 'text/event-stream'];
}
push @final_headers, ['cache-control', 'no-cache'];
$streaming_started = 1;
$weak_self->{h2_session}->submit_response_streaming(
$stream_id,
status => $status,
headers => \@final_headers,
data_callback => $data_callback,
);
$weak_self->_h2_write_pending;
# Set protocol-specific keepalive writer (HTTP/2 DATA frames)
$weak_self->{sse_keepalive_writer} = sub {
my ($text) = @_;
lib/PAGI/Server/Connection.pm view on Meta::CPAN
SSE events (C<sse.start>, C<sse.send>, C<sse.comment>, C<sse.keepalive>)
work transparently over both HTTP/1.1 and HTTP/2. Applications do not need
to change their SSE handling code based on protocol version.
=head2 How It Works
When a request arrives with C<Accept: text/event-stream>, the connection
detects it as SSE regardless of HTTP version. Over HTTP/1.1, SSE data is
sent using chunked Transfer-Encoding. Over HTTP/2, SSE data is sent as
DATA frames via the C<submit_response_streaming>/C<data_callback> mechanism.
This difference is transparent to the application.
The C<http_version> field in the scope hash will be C<'2'> for HTTP/2
connections, allowing applications to distinguish if needed.
=head2 SSE Idle Timeout over HTTP/2
The C<sse_idle_timeout> setting applies at the B<connection level>,
not per-stream. Over HTTP/1.1, this is a non-issue since each SSE
stream occupies its own TCP connection. Over HTTP/2, where multiple
lib/PAGI/Server/Protocol/HTTP2.pm view on Meta::CPAN
=head2 submit_response
$session->submit_response($stream_id,
status => 200,
headers => [['content-type', 'text/html']],
body => $body,
);
Submit a response on a stream. C<body> can be a string (sent as single
response) or a coderef for streaming.
=cut
sub submit_response {
my ($self, $stream_id, %args) = @_;
return $self->{nghttp2}->submit_response($stream_id, %args);
}
=head2 submit_response_streaming
$session->submit_response_streaming($stream_id,
status => 200,
headers => [['content-type', 'text/event-stream']],
data_callback => sub {
my ($stream_id, $max_len) = @_;
return ($chunk, $is_eof);
},
);
Submit a streaming response with a data provider callback.
=cut
sub submit_response_streaming {
my ($self, $stream_id, %args) = @_;
return $self->{nghttp2}->submit_response($stream_id,
status => $args{status},
headers => $args{headers},
data_callback => $args{data_callback},
callback_data => $args{callback_data},
);
}
=head2 resume_stream
lib/PAGI/Server/Protocol/HTTP2.pm view on Meta::CPAN
Key differences that affect PAGI integration:
=over 4
=item * Multiplexing - Multiple concurrent requests on one TCP connection
=item * Binary Framing - nghttp2 handles all framing; PAGI feeds/extracts bytes
=item * Header Compression - HPACK is built into nghttp2
=item * Flow Control - Per-stream and connection-level, via streaming callbacks
=back
=head1 SEE ALSO
L<Net::HTTP2::nghttp2>, L<PAGI::Server::Protocol::HTTP1>
=cut