PAGI
view release on metacpan or search on metacpan
lib/PAGI/Server/Connection.pm view on Meta::CPAN
package PAGI::Server::Connection;
use strict;
use warnings;
use Future;
use Future::AsyncAwait;
use Scalar::Util qw(weaken refaddr);
use Protocol::WebSocket::Handshake::Server;
use Protocol::WebSocket::Frame;
use Digest::SHA qw(sha1_base64);
use Encode;
use URI::Escape qw(uri_unescape);
use IO::Async::Timer::Countdown;
use IO::Async::Timer::Periodic;
use Time::HiRes qw(gettimeofday tv_interval);
use PAGI::Server::AsyncFile;
use PAGI::Server::ConnectionState;
use constant FILE_CHUNK_SIZE => 65536; # 64KB chunks for file streaming
# Per-second cache for CLF timestamp in access log (same pattern as HTTP1::format_date)
my $_cached_log_timestamp;
my $_cached_log_time = 0;
# =============================================================================
# Unrecognized Event Type Handler (PAGI spec compliance)
# =============================================================================
# Per main.mkdn: "Servers must raise exceptions if... The type field is unrecognized"
# Abort handling of an event whose 'type' field is not known for the
# protocol in use.
#
#   $type     - the event type string that was not recognized
#   $protocol - protocol label used in the message (e.g. 'http', 'sse')
#
# Never returns: always dies with a newline-terminated message, so Perl does
# not append file/line information.
sub _unrecognized_event_type {
    my $type     = shift;
    my $protocol = shift;

    die "Unrecognized event type '$type' for $protocol protocol\n";
}
# =============================================================================
# Header Validation (CRLF Injection Prevention)
# =============================================================================
# Validate a response header value before it is sent.
#
# RFC 7230 Section 3.2.6: field values MUST NOT contain CR or LF. NUL is
# rejected here as well.
#
#   $value - header value supplied by the application
#
# Returns $value unchanged when it is acceptable; dies with a
# newline-terminated message otherwise.
sub _validate_header_value {
    my $value = shift;

    die "Invalid header value: contains CR, LF, or null byte\n"
        if $value =~ /[\r\n\0]/;

    return $value;
}
# Validate a response header name before it is sent.
#
# RFC 7230 Section 3.2 defines field-name as a token, which is at least one
# character long, so an undefined or empty name is rejected in addition to
# names carrying control characters.
#
#   $name - header name supplied by the application
#
# Returns $name unchanged when it is acceptable.
# Dies with a newline-terminated message when the name is undefined/empty,
# contains CR, LF or NUL, or contains any other control character.
sub _validate_header_name {
    my ($name) = @_;

    # The character checks below only look for forbidden characters, so
    # without this guard '' would pass and undef would merely warn.
    if (!defined $name || $name eq '') {
        die "Invalid header name: must not be empty\n";
    }

    # CR/LF/NUL are reported separately from other control characters so the
    # header-injection case has its own message.
    if ($name =~ /[\r\n\0]/) {
        die "Invalid header name: contains CR, LF, or null byte\n";
    }
    if ($name =~ /[[:cntrl:]]/) {
        die "Invalid header name: contains control characters\n";
    }

    return $name;
}
# Validate a WebSocket subprotocol name before echoing it back in the
# sec-websocket-protocol response header.
#
# RFC 6455 Section 11.3.4: a subprotocol must be a token (no whitespace, no
# separators). This check is deliberately narrower than the full token
# grammar: only ASCII letters, digits, underscore, dash and dot are allowed.
#
#   $value - subprotocol chosen by the application
#
# Returns $value unchanged when it is acceptable; dies with a
# newline-terminated message otherwise (including for an empty string).
sub _validate_subprotocol {
    my ($value) = @_;

    if ($value =~ /[\r\n\0\s]/) {
        die "Invalid subprotocol: contains CR, LF, null, or whitespace\n";
    }

    # Use an explicit ASCII class rather than \w: \w also matches non-ASCII
    # Unicode word characters, which are not valid in a header token.
    # \A and \z anchor the whole string (unlike $, \z does not tolerate a
    # trailing newline).
    if ($value !~ /\A[A-Za-z0-9_.\-]+\z/) {
        die "Invalid subprotocol: must be alphanumeric, dash, underscore, or dot\n";
    }

    return $value;
}
=head1 NAME
PAGI::Server::Connection - Per-connection state machine
=head1 SYNOPSIS
lib/PAGI/Server/Connection.pm view on Meta::CPAN
return { type => 'http.disconnect' } unless $weak_self;
my $ss = $weak_self->{h2_streams}{$stream_id};
return { type => 'http.disconnect' } unless $ss;
# Check queue first
if (@{$ss->{receive_queue}}) {
return shift @{$ss->{receive_queue}};
}
# If body is already complete, return final body event
if ($ss->{body_complete}) {
my $body = $ss->{body};
$ss->{body} = '';
return {
type => 'http.request',
body => $body,
more => 0,
};
}
# Wait for body data
if (!$ss->{body_pending}) {
$ss->{body_pending} = Future->new;
}
await $ss->{body_pending};
# Re-fetch stream state (may have changed)
$ss = $weak_self->{h2_streams}{$stream_id};
return { type => 'http.disconnect' } unless $ss;
# Check queue after waking
if (@{$ss->{receive_queue}}) {
return shift @{$ss->{receive_queue}};
}
my $body = $ss->{body};
$ss->{body} = '';
return {
type => 'http.request',
body => $body,
more => $ss->{body_complete} ? 0 : 1,
};
})->();
return $future;
};
}
# Build the "send" callback for one HTTP/2 response stream.
#
# Arguments:
#   $self         - the connection object; the closures created here hold it
#                   only through a weakened reference
#   $stream_id    - HTTP/2 stream id, used as the key into $self->{h2_streams}
#   $stream_state - accepted but not referenced in this sub; per-stream state
#                   is looked up from $self->{h2_streams}{$stream_id} per event
#
# Returns an async sub that takes one event hashref. It handles
# 'http.response.start' and 'http.response.body'; any other type dies via
# _unrecognized_event_type(). An event is silently ignored when the
# connection object is gone, the connection is flagged closed, or the stream
# is no longer present in h2_streams.
sub _h2_create_send {
my ($self, $stream_id, $stream_state) = @_;
# Weakened copy so the returned closures do not hold a strong reference
# to the connection.
weaken(my $weak_self = $self);
# Captured by 'http.response.start' and submitted later, together with the
# first 'http.response.body' event.
my $status;
my @response_headers;
# Streaming state for deferred data provider pattern
my @data_queue;
# Set once the final body event (more => 0) arrives on a streaming response.
my $eof_pending = 0;
# True after submit_response_streaming has been called for this stream.
my $streaming_started = 0;
# Data callback for nghttp2's streaming response.
# Returns ($data, $eof) when data is available, or undef to defer.
my $data_callback = sub {
my ($cb_stream_id, $max_len) = @_;
if (@data_queue) {
my $chunk = shift @data_queue;
# Respect max_len - XS truncates without preserving remainder
if (length($chunk) > $max_len) {
# Put the tail back at the front of the queue so it goes out next.
unshift @data_queue, substr($chunk, $max_len);
$chunk = substr($chunk, 0, $max_len);
}
# Signal EOF together with the last queued chunk when the final body
# event has already been seen.
my $eof = (!@data_queue && $eof_pending) ? 1 : 0;
return ($chunk, $eof);
}
# Queue empty but EOF pending - signal end of stream
if ($eof_pending) {
return ('', 1);
}
# Queue empty, more data expected - defer (NGHTTP2_ERR_DEFERRED in C layer)
return undef;
};
return async sub {
my ($event) = @_;
return unless $weak_self;
return if $weak_self->{closed};
my $type = $event->{type} // '';
if ($type eq 'http.response.start') {
my $ss = $weak_self->{h2_streams}{$stream_id} or return;
# A second 'http.response.start' for the same stream is ignored.
return if $ss->{response_started};
$ss->{response_started} = 1;
# Status defaults to 200. Header names and values are validated here
# (the validators die on CR/LF/NUL); nothing is submitted to the
# session until a body event arrives.
$status = $event->{status} // 200;
@response_headers = map {
[_validate_header_name($_->[0]), _validate_header_value($_->[1])]
} @{$event->{headers} // []};
}
elsif ($type eq 'http.response.body') {
my $ss = $weak_self->{h2_streams}{$stream_id} or return;
# Body events that arrive before 'http.response.start' are ignored.
return unless $ss->{response_started};
my $body = $event->{body} // '';
my $more = $event->{more} // 0;
if ($more) {
if (!$streaming_started) {
# First streaming chunk - submit with data callback
$streaming_started = 1;
# Empty bodies are not queued; the callback then defers until data arrives.
push @data_queue, $body if length($body);
$weak_self->{h2_session}->submit_response_streaming(
$stream_id,
status => $status,
headers => \@response_headers,
data_callback => $data_callback,
);
$weak_self->_h2_write_pending;
} else {
# Subsequent chunk - backpressure check then push and resume
if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
await $weak_self->_wait_for_drain;
# Connection or stream may have gone away while waiting; re-check
# before touching the session.
return unless $weak_self;
return if $weak_self->{closed};
return unless $weak_self->{h2_streams}{$stream_id};
}
push @data_queue, $body if length($body);
# Resume the stream so the data callback is consulted again.
$weak_self->{h2_session}->resume_stream($stream_id);
$weak_self->_h2_write_pending;
}
} else {
if ($streaming_started) {
# Final chunk on an already-streaming response
if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
await $weak_self->_wait_for_drain;
# Same post-await re-checks as the streaming branch above.
return unless $weak_self;
return if $weak_self->{closed};
return unless $weak_self->{h2_streams}{$stream_id};
}
# Mark EOF so the data callback reports end-of-stream with the last
# queued chunk (or with an empty chunk if nothing is queued).
$eof_pending = 1;
push @data_queue, $body if length($body);
$weak_self->{h2_session}->resume_stream($stream_id);
$weak_self->_h2_write_pending;
} else {
# Non-streaming: single response (unchanged one-shot path)
$weak_self->{h2_session}->submit_response($stream_id,
status => $status,
headers => \@response_headers,
body => $body,
);
$weak_self->_h2_write_pending;
}
}
}
else {
# Any other event type is an error: this dies.
_unrecognized_event_type($type, 'http');
}
};
}
# =============================================================================
# HTTP/2 WebSocket over HTTP/2 (RFC 8441)
# =============================================================================
sub _h2_create_websocket_scope {
my ($self, $stream_id, $stream_state) = @_;
my $pseudo = $stream_state->{pseudo};
my $headers = $stream_state->{headers};
my $full_path = $pseudo->{':path'} // '/';
my ($path, $query_string) = split(/\?/, $full_path, 2);
$query_string //= '';
# Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
my $unescaped = uri_unescape($path);
my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
// $unescaped;
# Extract subprotocols from headers
my @subprotocols;
for my $header (@$headers) {
my ($name, $value) = @$header;
if ($name eq 'sec-websocket-protocol') {
push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
}
}
my $connection_state = PAGI::Server::ConnectionState->new(
connection => $self,
);
return {
type => 'websocket',
pagi => {
version => '0.2',
spec_version => '0.2',
features => {},
},
http_version => '2',
scheme => $self->_get_ws_scheme,
path => $decoded_path,
raw_path => $path,
query_string => $query_string,
root_path => '',
lib/PAGI/Server/Connection.pm view on Meta::CPAN
# Wait for events
while (1) {
if (@{$ss->{receive_queue}}) {
return shift @{$ss->{receive_queue}};
}
return { type => 'websocket.disconnect', code => 1006, reason => '' }
if $weak_self->{closed};
if (!$ss->{body_pending}) {
$ss->{body_pending} = Future->new;
}
await $ss->{body_pending};
$ss = $weak_self->{h2_streams}{$stream_id};
return { type => 'websocket.disconnect', code => 1006, reason => '' }
unless $ss;
}
})->();
return $future;
};
}
sub _h2_create_websocket_send {
my ($self, $stream_id, $stream_state) = @_;
weaken(my $weak_self = $self);
return async sub {
my ($event) = @_;
return unless $weak_self;
return if $weak_self->{closed};
my $ss = $weak_self->{h2_streams}{$stream_id};
return unless $ss;
my $type = $event->{type} // '';
if ($type eq 'websocket.accept') {
return if $ss->{ws_accepted};
# HTTP/2 WebSocket: respond with 200 (not 101)
my @headers;
if (my $subprotocol = $event->{subprotocol}) {
$subprotocol = _validate_subprotocol($subprotocol);
push @headers, ['sec-websocket-protocol', $subprotocol];
}
if (my $extra = $event->{headers}) {
push @headers, map {
[_validate_header_name($_->[0]), _validate_header_value($_->[1])]
} @$extra;
}
$ss->{ws_accepted} = 1;
$ss->{response_started} = 1;
$ss->{ws_frame} = Protocol::WebSocket::Frame->new(
max_payload_size => $weak_self->{max_ws_frame_size},
);
# Submit 200 response with streaming body that defers
$weak_self->{h2_session}->submit_response($stream_id,
status => 200,
headers => \@headers,
body => sub { return undef }, # defer until submit_data
);
$weak_self->_h2_write_pending;
# Process any data that arrived before accept
if (length($ss->{body}) > 0) {
my $buffered = $ss->{body};
$ss->{body} = '';
$weak_self->_h2_process_ws_frames($stream_id, $ss, $buffered);
}
}
elsif ($type eq 'websocket.send') {
return unless $ss->{ws_accepted};
my $frame;
if (defined $event->{text}) {
$frame = Protocol::WebSocket::Frame->new(
buffer => $event->{text},
type => 'text',
);
}
elsif (defined $event->{bytes}) {
$frame = Protocol::WebSocket::Frame->new(
buffer => $event->{bytes},
type => 'binary',
);
}
else {
return;
}
my $bytes = $frame->to_bytes;
$weak_self->{h2_session}->submit_data($stream_id, $bytes, 0);
$weak_self->_h2_write_pending;
}
elsif ($type eq 'websocket.close') {
if (!$ss->{ws_accepted}) {
# Reject: send 403
$weak_self->{h2_session}->submit_response($stream_id,
status => 403,
headers => [['content-type', 'text/plain']],
body => 'Forbidden',
);
$weak_self->_h2_write_pending;
return;
}
my $code = $event->{code} // 1000;
my $reason = $event->{reason} // '';
my $frame = Protocol::WebSocket::Frame->new(
type => 'close',
buffer => pack('n', $code) . $reason,
);
# Send close frame + END_STREAM
$weak_self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);
lib/PAGI/Server/Connection.pm view on Meta::CPAN
};
return sub {
return Future->done($sse_disconnect->()) unless $weak_self;
return Future->done($sse_disconnect->()) if $weak_self->{closed};
my $ss = $weak_self->{h2_streams}{$stream_id};
return Future->done($sse_disconnect->()) unless $ss;
my $future = (async sub {
return $sse_disconnect->() unless $weak_self;
my $ss = $weak_self->{h2_streams}{$stream_id};
return $sse_disconnect->() unless $ss;
# Check queue first
if (@{$ss->{receive_queue}}) {
return shift @{$ss->{receive_queue}};
}
# First call returns sse.request with body
if (!$ss->{sse_request_sent}) {
$ss->{sse_request_sent} = 1;
return {
type => 'sse.request',
body => $ss->{body},
more => 0,
};
}
# Wait for disconnect
while (1) {
if (@{$ss->{receive_queue}}) {
return shift @{$ss->{receive_queue}};
}
return $sse_disconnect->()
if $weak_self->{closed};
if (!$ss->{body_pending}) {
$ss->{body_pending} = Future->new;
}
await $ss->{body_pending};
$ss = $weak_self->{h2_streams}{$stream_id};
return $sse_disconnect->() unless $ss;
}
})->();
return $future;
};
}
sub _h2_create_sse_send {
my ($self, $stream_id, $stream_state) = @_;
weaken(my $weak_self = $self);
# Streaming state for data provider pattern
my @data_queue;
my $streaming_started = 0;
my $data_callback = sub {
my ($cb_stream_id, $max_len) = @_;
if (@data_queue) {
my $chunk = shift @data_queue;
if (length($chunk) > $max_len) {
unshift @data_queue, substr($chunk, $max_len);
$chunk = substr($chunk, 0, $max_len);
}
return ($chunk, 0); # SSE streams never EOF via data_callback
}
# Queue empty -- defer
return undef;
};
return async sub {
my ($event) = @_;
return unless $weak_self;
return if $weak_self->{closed};
my $ss = $weak_self->{h2_streams}{$stream_id};
return unless $ss;
# Reset SSE idle timer on send activity
$weak_self->_reset_sse_idle_timer;
my $type = $event->{type} // '';
# Dev-mode event validation (PAGI spec compliance)
if ($weak_self->{validate_events}) {
require PAGI::Server::EventValidator;
PAGI::Server::EventValidator::validate_sse_send($event);
}
if ($type eq 'sse.start') {
return if $ss->{response_started};
$ss->{response_started} = 1;
my $status = $event->{status} // 200;
my $headers = $event->{headers} // [];
# Ensure Content-Type is text/event-stream
my $has_content_type = 0;
for my $h (@$headers) {
if (lc($h->[0]) eq 'content-type') {
$has_content_type = 1;
last;
}
}
my @final_headers;
for my $h (@$headers) {
push @final_headers, [_validate_header_name($h->[0]), _validate_header_value($h->[1])];
}
if (!$has_content_type) {
push @final_headers, ['content-type', 'text/event-stream'];
}
push @final_headers, ['cache-control', 'no-cache'];
$streaming_started = 1;
$weak_self->{h2_session}->submit_response_streaming(
$stream_id,
status => $status,
headers => \@final_headers,
data_callback => $data_callback,
);
$weak_self->_h2_write_pending;
# Set protocol-specific keepalive writer (HTTP/2 DATA frames)
$weak_self->{sse_keepalive_writer} = sub {
my ($text) = @_;
return unless $weak_self;
return if $weak_self->{closed};
return unless $weak_self->{h2_streams}{$stream_id};
push @data_queue, $text;
$weak_self->{h2_session}->resume_stream($stream_id);
$weak_self->_h2_write_pending;
};
# Start SSE idle timer if configured
$weak_self->_start_sse_idle_timer;
}
elsif ($type eq 'sse.send') {
return unless $ss->{response_started};
# Backpressure check
if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
await $weak_self->_wait_for_drain;
return Future->done unless $weak_self;
return Future->done if $weak_self->{closed};
return unless $weak_self->{h2_streams}{$stream_id};
}
my $sse_data = _format_sse_event($event);
push @data_queue, $sse_data;
$weak_self->{h2_session}->resume_stream($stream_id);
$weak_self->_h2_write_pending;
}
elsif ($type eq 'sse.comment') {
return unless $ss->{response_started};
my $comment = _format_sse_comment($event);
push @data_queue, $comment;
$weak_self->{h2_session}->resume_stream($stream_id);
$weak_self->_h2_write_pending;
}
elsif ($type eq 'sse.keepalive') {
my $interval = $event->{interval} // 0;
my $comment = $event->{comment};
if ($interval > 0) {
$weak_self->_start_sse_keepalive($interval, $comment);
}
else {
$weak_self->_stop_sse_keepalive;
}
}
else {
_unrecognized_event_type($type, 'sse');
}
lib/PAGI/Server/Connection.pm view on Meta::CPAN
# Seek to offset if specified
if ($offset && $offset > 0) {
seek($fh, $offset, 0) or die "Cannot seek: $!";
}
# For filehandles, we can't easily use the worker pool (can't pass fh across fork).
# Use blocking reads in small chunks - not ideal but practical.
# TODO: Consider IO::Async::FileStream for better event loop integration.
my $remaining = $length; # undef means read to EOF
my $stream = $self->{stream};
while (1) {
my $to_read = FILE_CHUNK_SIZE;
if (defined $remaining) {
$to_read = $remaining if $remaining < $to_read;
last if $to_read <= 0;
}
my $bytes_read = read($fh, my $chunk, $to_read);
last if !defined $bytes_read; # Error
last if $bytes_read == 0; # EOF
$self->{_response_size} += $bytes_read;
if ($chunked) {
my $len = sprintf("%x", length($chunk));
$stream->write("$len\r\n$chunk\r\n");
}
else {
$stream->write($chunk);
}
if (defined $remaining) {
$remaining -= $bytes_read;
}
}
# Send final chunk if chunked encoding
if ($chunked) {
$stream->write("0\r\n\r\n");
}
}
1;
__END__
=head1 SSE OVER HTTP/2
SSE events (C<sse.start>, C<sse.send>, C<sse.comment>, C<sse.keepalive>)
work transparently over both HTTP/1.1 and HTTP/2. Applications do not need
to change their SSE handling code based on protocol version.
=head2 How It Works
When a request arrives with C<Accept: text/event-stream>, the connection
detects it as SSE regardless of HTTP version. Over HTTP/1.1, SSE data is
sent using chunked Transfer-Encoding. Over HTTP/2, SSE data is sent as
DATA frames via the C<submit_response_streaming>/C<data_callback> mechanism.
This difference is transparent to the application.
The C<http_version> field in the scope hash will be C<'2'> for HTTP/2
connections, allowing applications to distinguish if needed.
=head2 SSE Idle Timeout over HTTP/2
The C<sse_idle_timeout> setting applies at the B<connection level>,
not per-stream. Over HTTP/1.1, this is a non-issue since each SSE
stream occupies its own TCP connection. Over HTTP/2, where multiple
streams share a single connection, an idle SSE stream timeout will
close the B<entire connection>, terminating all active streams.
=head3 Trade-offs
B<Pros:>
=over 4
=item * Simple, consistent behavior across protocols
=item * Matches the approach used by Go (net/http2), Rust (hyper/Axum),
Java (Netty/Reactor Netty/Vert.x), Python (Hypercorn), and gRPC
=item * No additional complexity in stream lifecycle management
=back
B<Cons:>
=over 4
=item * Closing the connection affects all multiplexed HTTP/2 streams,
not just the idle SSE stream
=item * Clients multiplexing SSE + REST on one HTTP/2 connection may
see unexpected disconnects on their REST requests
=back
B<Recommendation:> Use SSE keepalive comments (C<sse.keepalive> event)
with an interval shorter than C<sse_idle_timeout> to prevent the timer
from firing. This is the industry-standard approach used across all
major frameworks. For production deployments behind reverse proxies
(Envoy, Nginx, HAProxy), align your keepalive interval with the
proxy's stream idle timeout.
B<Note:> Per-stream idle timeout (using HTTP/2 RST_STREAM to close
only the idle SSE stream) is a future enhancement. Only Node.js
(http2stream.setTimeout) and Envoy (stream_idle_timeout) implement
this among mainstream servers.
=head1 SEE ALSO
L<PAGI::Server>, L<PAGI::Server::Protocol::HTTP1>
=head1 AUTHOR
John Napiorkowski E<lt>jjnapiork@cpan.orgE<gt>
( run in 1.000 second using v1.01-cache-2.11-cpan-140bd7fdf52 )