PAGI

 view release on metacpan or  search on metacpan

lib/PAGI/Server/Connection.pm  view on Meta::CPAN

package PAGI::Server::Connection;
use strict;
use warnings;
use Future;
use Future::AsyncAwait;
use Scalar::Util qw(weaken refaddr);
use Protocol::WebSocket::Handshake::Server;
use Protocol::WebSocket::Frame;
use Digest::SHA qw(sha1_base64);
use Encode;
use URI::Escape qw(uri_unescape);
use IO::Async::Timer::Countdown;
use IO::Async::Timer::Periodic;
use Time::HiRes qw(gettimeofday tv_interval);
use PAGI::Server::AsyncFile;
use PAGI::Server::ConnectionState;


# Maximum number of bytes requested per read() when streaming a response
# body from a filehandle.
use constant FILE_CHUNK_SIZE => 65536;  # 64KB chunks for file streaming

# Per-second cache for CLF timestamp in access log (same pattern as HTTP1::format_date)
# NOTE(review): neither variable is referenced in this excerpt; presumably
# $_cached_log_timestamp holds the formatted CLF string and $_cached_log_time
# the epoch second it was built for — confirm against the access-log writer.
my $_cached_log_timestamp;
my $_cached_log_time = 0;

# =============================================================================
# Unrecognized Event Type Handler (PAGI spec compliance)
# =============================================================================
# Per main.mkdn: "Servers must raise exceptions if... The type field is unrecognized"

# Throw for an event whose 'type' the current protocol does not handle.
#
# $type     - the offending event type string
# $protocol - protocol name used in the message ('http', 'sse', ...)
#
# Never returns. The message ends in "\n", so Perl does not append
# " at FILE line N." and callers receive the text verbatim.
sub _unrecognized_event_type {
    my $type     = shift;
    my $protocol = shift;

    my $message = "Unrecognized event type '$type' for $protocol protocol\n";
    die $message;
}

# =============================================================================
# Header Validation (CRLF Injection Prevention)
# =============================================================================
# RFC 7230 Section 3.2.6: Field values MUST NOT contain CR or LF

# Return $value unchanged if it is safe to emit as a header field value.
#
# Dies (message ends in "\n") if the value contains CR, LF or NUL. Any of
# these would let the caller terminate the current header line and inject
# extra headers or a body (response splitting). Other characters, including
# TAB, are accepted.
sub _validate_header_value {
    my $value = shift;

    die "Invalid header value: contains CR, LF, or null byte\n"
        if $value =~ m/[\r\n\0]/;

    return $value;
}

# Return $name unchanged if it is safe to emit as a header field name.
#
# Dies (message ends in "\n") if the name is:
#   - undefined or empty;
#   - contains CR, LF or NUL (header injection);
#   - contains any other control character;
#   - contains whitespace or ':'.
# RFC 7230 Section 3.2 requires field-name to be a non-empty token. A name
# such as "Location: http://evil X-Foo" would otherwise be written as a line
# whose real header is Location, carrying an attacker-chosen value.
sub _validate_header_name {
    my ($name) = @_;

    # Checked first so undef never reaches the regexes below (which would warn).
    if (!defined $name || $name eq '') {
        die "Invalid header name: empty\n";
    }
    # Most common injection attempt gets its own, specific message.
    if ($name =~ /[\r\n\0]/) {
        die "Invalid header name: contains CR, LF, or null byte\n";
    }
    if ($name =~ /[[:cntrl:]]/) {
        die "Invalid header name: contains control characters\n";
    }
    # Space and ':' are not token characters and would corrupt the
    # "Name: value" line structure.
    if ($name =~ /[\s:]/) {
        die "Invalid header name: contains whitespace or colon\n";
    }
    return $name;
}

# RFC 6455 Section 11.3.4: Subprotocol must be a token (no whitespace, separators)
#
# Return $value unchanged if it is an acceptable WebSocket subprotocol name.
# This accepts a conservative ASCII subset of token characters: letters,
# digits, '_', '-' and '.'. Dies (message ends in "\n") otherwise.
sub _validate_subprotocol {
    my ($value) = @_;

    if ($value =~ /[\r\n\0\s]/) {
        die "Invalid subprotocol: contains CR, LF, null, or whitespace\n";
    }
    # /a restricts \w to ASCII. Without it, non-ASCII letters in a character
    # string would pass and be echoed into a response header. \A...\z anchors
    # the whole string; unlike ^...$, it does not tolerate a trailing newline.
    if ($value !~ /\A[\w.\-]+\z/a) {
        die "Invalid subprotocol: must be alphanumeric, dash, underscore, or dot\n";
    }
    return $value;
}

=head1 NAME

PAGI::Server::Connection - Per-connection state machine

=head1 SYNOPSIS

lib/PAGI/Server/Connection.pm  view on Meta::CPAN

            return { type => 'http.disconnect' } unless $weak_self;

            my $ss = $weak_self->{h2_streams}{$stream_id};
            return { type => 'http.disconnect' } unless $ss;

            # Check queue first
            if (@{$ss->{receive_queue}}) {
                return shift @{$ss->{receive_queue}};
            }

            # If body is already complete, return final body event
            if ($ss->{body_complete}) {
                my $body = $ss->{body};
                $ss->{body} = '';
                return {
                    type => 'http.request',
                    body => $body,
                    more => 0,
                };
            }

            # Wait for body data
            if (!$ss->{body_pending}) {
                $ss->{body_pending} = Future->new;
            }
            await $ss->{body_pending};

            # Re-fetch stream state (may have changed)
            $ss = $weak_self->{h2_streams}{$stream_id};
            return { type => 'http.disconnect' } unless $ss;

            # Check queue after waking
            if (@{$ss->{receive_queue}}) {
                return shift @{$ss->{receive_queue}};
            }

            my $body = $ss->{body};
            $ss->{body} = '';
            return {
                type => 'http.request',
                body => $body,
                more => $ss->{body_complete} ? 0 : 1,
            };
        })->();

        return $future;
    };
}

# Build the PAGI $send coroutine for one HTTP/2 request stream.
#
# The returned async sub accepts http.response.start followed by one or more
# http.response.body events:
#   - a body with more => 0 and no earlier streaming chunk is sent with a
#     single submit_response call;
#   - otherwise the response is streamed. The first more => 1 chunk submits
#     headers with $data_callback. Later chunks are pushed onto @data_queue
#     and the stream is resumed, waiting for the write buffer to drain first
#     when it is at or above write_high_watermark.
# Once a final body (more => 0) has been accepted, further body events are
# ignored. Any other event type dies via _unrecognized_event_type.
#
# $stream_state is accepted for interface parity but not read here.
sub _h2_create_send {
    my ($self, $stream_id, $stream_state) = @_;

    weaken(my $weak_self = $self);

    # Response line state: recorded by http.response.start and used when the
    # first http.response.body submits the response.
    my $status;
    my @response_headers;

    # Streaming state for deferred data provider pattern
    my @data_queue;
    my $eof_pending = 0;
    my $streaming_started = 0;

    # Set as soon as a final body event (more => 0) is accepted. Later body
    # events are dropped instead of calling submit_response a second time on
    # the same stream, or queueing data after end-of-stream.
    my $response_complete = 0;

    # Data callback for nghttp2's streaming response.
    # Returns ($data, $eof) when data is available, or undef to defer.
    my $data_callback = sub {
        my ($cb_stream_id, $max_len) = @_;

        if (@data_queue) {
            my $chunk = shift @data_queue;
            # Respect max_len — XS truncates without preserving remainder
            if (length($chunk) > $max_len) {
                unshift @data_queue, substr($chunk, $max_len);
                $chunk = substr($chunk, 0, $max_len);
            }
            my $eof = (!@data_queue && $eof_pending) ? 1 : 0;
            return ($chunk, $eof);
        }

        # Queue empty but EOF pending — signal end of stream
        if ($eof_pending) {
            return ('', 1);
        }

        # Queue empty, more data expected — defer (NGHTTP2_ERR_DEFERRED in C layer).
        # Explicit undef (not bare return) is kept: the XS layer receives it
        # as the "defer" marker.
        return undef;
    };

    return async sub {
        my ($event) = @_;
        return unless $weak_self;
        return if $weak_self->{closed};

        my $type = $event->{type} // '';

        if ($type eq 'http.response.start') {
            my $ss = $weak_self->{h2_streams}{$stream_id} or return;
            return if $ss->{response_started};

            # Validate headers before recording any state. If a header is
            # rejected, the stream is not marked started, so the app can
            # still send a corrected start event.
            my @validated = map {
                [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
            } @{$event->{headers} // []};

            $ss->{response_started} = 1;
            $status = $event->{status} // 200;
            @response_headers = @validated;
        }
        elsif ($type eq 'http.response.body') {
            my $ss = $weak_self->{h2_streams}{$stream_id} or return;
            return unless $ss->{response_started};
            return if $response_complete;

            my $body = $event->{body} // '';
            my $more = $event->{more} // 0;

            if ($more) {
                if (!$streaming_started) {
                    # First streaming chunk — submit with data callback
                    $streaming_started = 1;
                    push @data_queue, $body if length($body);
                    $weak_self->{h2_session}->submit_response_streaming(
                        $stream_id,
                        status        => $status,
                        headers       => \@response_headers,
                        data_callback => $data_callback,
                    );
                    $weak_self->_h2_write_pending;
                }
                else {
                    # Subsequent chunk — backpressure check then push and resume
                    if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
                        await $weak_self->_wait_for_drain;
                        return unless $weak_self;
                        return if $weak_self->{closed};
                        return unless $weak_self->{h2_streams}{$stream_id};
                    }
                    push @data_queue, $body if length($body);
                    $weak_self->{h2_session}->resume_stream($stream_id);
                    $weak_self->_h2_write_pending;
                }
            }
            else {
                # Mark complete before any await. A duplicate final chunk sent
                # while this one waits for drain is then dropped, not processed twice.
                $response_complete = 1;

                if ($streaming_started) {
                    # Final chunk on an already-streaming response
                    if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
                        await $weak_self->_wait_for_drain;
                        return unless $weak_self;
                        return if $weak_self->{closed};
                        return unless $weak_self->{h2_streams}{$stream_id};
                    }
                    $eof_pending = 1;
                    push @data_queue, $body if length($body);
                    $weak_self->{h2_session}->resume_stream($stream_id);
                    $weak_self->_h2_write_pending;
                }
                else {
                    # Non-streaming: single response (unchanged one-shot path)
                    $weak_self->{h2_session}->submit_response($stream_id,
                        status  => $status,
                        headers => \@response_headers,
                        body    => $body,
                    );
                    $weak_self->_h2_write_pending;
                }
            }
        }
        else {
            _unrecognized_event_type($type, 'http');
        }
    };
}

# =============================================================================
# HTTP/2 WebSocket over HTTP/2 (RFC 8441)
# =============================================================================

sub _h2_create_websocket_scope {
    my ($self, $stream_id, $stream_state) = @_;

    my $pseudo  = $stream_state->{pseudo};
    my $headers = $stream_state->{headers};

    my $full_path = $pseudo->{':path'} // '/';
    my ($path, $query_string) = split(/\?/, $full_path, 2);
    $query_string //= '';

    # Match HTTP/1.1 pipeline: URI::Escape + UTF-8 decode with fallback
    my $unescaped = uri_unescape($path);
    my $decoded_path = eval { decode('UTF-8', $unescaped, Encode::FB_CROAK) }
                       // $unescaped;

    # Extract subprotocols from headers
    my @subprotocols;
    for my $header (@$headers) {
        my ($name, $value) = @$header;
        if ($name eq 'sec-websocket-protocol') {
            push @subprotocols, map { s/^\s+|\s+$//gr } split /,/, $value;
        }
    }

    my $connection_state = PAGI::Server::ConnectionState->new(
        connection => $self,
    );

    return {
        type         => 'websocket',
        pagi         => {
            version      => '0.2',
            spec_version => '0.2',
            features     => {},
        },
        http_version => '2',
        scheme       => $self->_get_ws_scheme,
        path         => $decoded_path,
        raw_path     => $path,
        query_string => $query_string,
        root_path    => '',

lib/PAGI/Server/Connection.pm  view on Meta::CPAN

            # Wait for events
            while (1) {
                if (@{$ss->{receive_queue}}) {
                    return shift @{$ss->{receive_queue}};
                }

                return { type => 'websocket.disconnect', code => 1006, reason => '' }
                    if $weak_self->{closed};

                if (!$ss->{body_pending}) {
                    $ss->{body_pending} = Future->new;
                }
                await $ss->{body_pending};

                $ss = $weak_self->{h2_streams}{$stream_id};
                return { type => 'websocket.disconnect', code => 1006, reason => '' }
                    unless $ss;
            }
        })->();

        return $future;
    };
}

sub _h2_create_websocket_send {
    my ($self, $stream_id, $stream_state) = @_;

    weaken(my $weak_self = $self);

    return async sub {
        my ($event) = @_;
        return unless $weak_self;
        return if $weak_self->{closed};

        my $ss = $weak_self->{h2_streams}{$stream_id};
        return unless $ss;

        my $type = $event->{type} // '';

        if ($type eq 'websocket.accept') {
            return if $ss->{ws_accepted};

            # HTTP/2 WebSocket: respond with 200 (not 101)
            my @headers;
            if (my $subprotocol = $event->{subprotocol}) {
                $subprotocol = _validate_subprotocol($subprotocol);
                push @headers, ['sec-websocket-protocol', $subprotocol];
            }
            if (my $extra = $event->{headers}) {
                push @headers, map {
                    [_validate_header_name($_->[0]), _validate_header_value($_->[1])]
                } @$extra;
            }

            $ss->{ws_accepted} = 1;
            $ss->{response_started} = 1;
            $ss->{ws_frame} = Protocol::WebSocket::Frame->new(
                max_payload_size => $weak_self->{max_ws_frame_size},
            );

            # Submit 200 response with streaming body that defers
            $weak_self->{h2_session}->submit_response($stream_id,
                status  => 200,
                headers => \@headers,
                body    => sub { return undef },  # defer until submit_data
            );
            $weak_self->_h2_write_pending;

            # Process any data that arrived before accept
            if (length($ss->{body}) > 0) {
                my $buffered = $ss->{body};
                $ss->{body} = '';
                $weak_self->_h2_process_ws_frames($stream_id, $ss, $buffered);
            }
        }
        elsif ($type eq 'websocket.send') {
            return unless $ss->{ws_accepted};

            my $frame;
            if (defined $event->{text}) {
                $frame = Protocol::WebSocket::Frame->new(
                    buffer => $event->{text},
                    type   => 'text',
                );
            }
            elsif (defined $event->{bytes}) {
                $frame = Protocol::WebSocket::Frame->new(
                    buffer => $event->{bytes},
                    type   => 'binary',
                );
            }
            else {
                return;
            }

            my $bytes = $frame->to_bytes;
            $weak_self->{h2_session}->submit_data($stream_id, $bytes, 0);
            $weak_self->_h2_write_pending;
        }
        elsif ($type eq 'websocket.close') {
            if (!$ss->{ws_accepted}) {
                # Reject: send 403
                $weak_self->{h2_session}->submit_response($stream_id,
                    status  => 403,
                    headers => [['content-type', 'text/plain']],
                    body    => 'Forbidden',
                );
                $weak_self->_h2_write_pending;
                return;
            }

            my $code = $event->{code} // 1000;
            my $reason = $event->{reason} // '';

            my $frame = Protocol::WebSocket::Frame->new(
                type   => 'close',
                buffer => pack('n', $code) . $reason,
            );

            # Send close frame + END_STREAM
            $weak_self->{h2_session}->submit_data($stream_id, $frame->to_bytes, 1);

lib/PAGI/Server/Connection.pm  view on Meta::CPAN

    };

    return sub {
        return Future->done($sse_disconnect->()) unless $weak_self;
        return Future->done($sse_disconnect->()) if $weak_self->{closed};

        my $ss = $weak_self->{h2_streams}{$stream_id};
        return Future->done($sse_disconnect->()) unless $ss;

        my $future = (async sub {
            return $sse_disconnect->() unless $weak_self;

            my $ss = $weak_self->{h2_streams}{$stream_id};
            return $sse_disconnect->() unless $ss;

            # Check queue first
            if (@{$ss->{receive_queue}}) {
                return shift @{$ss->{receive_queue}};
            }

            # First call returns sse.request with body
            if (!$ss->{sse_request_sent}) {
                $ss->{sse_request_sent} = 1;
                return {
                    type => 'sse.request',
                    body => $ss->{body},
                    more => 0,
                };
            }

            # Wait for disconnect
            while (1) {
                if (@{$ss->{receive_queue}}) {
                    return shift @{$ss->{receive_queue}};
                }

                return $sse_disconnect->()
                    if $weak_self->{closed};

                if (!$ss->{body_pending}) {
                    $ss->{body_pending} = Future->new;
                }
                await $ss->{body_pending};

                $ss = $weak_self->{h2_streams}{$stream_id};
                return $sse_disconnect->() unless $ss;
            }
        })->();

        return $future;
    };
}

sub _h2_create_sse_send {
    my ($self, $stream_id, $stream_state) = @_;

    weaken(my $weak_self = $self);

    # Streaming state for data provider pattern
    my @data_queue;
    my $streaming_started = 0;

    my $data_callback = sub {
        my ($cb_stream_id, $max_len) = @_;

        if (@data_queue) {
            my $chunk = shift @data_queue;
            if (length($chunk) > $max_len) {
                unshift @data_queue, substr($chunk, $max_len);
                $chunk = substr($chunk, 0, $max_len);
            }
            return ($chunk, 0);  # SSE streams never EOF via data_callback
        }

        # Queue empty — defer
        return undef;
    };

    return async sub {
        my ($event) = @_;
        return unless $weak_self;
        return if $weak_self->{closed};

        my $ss = $weak_self->{h2_streams}{$stream_id};
        return unless $ss;

        # Reset SSE idle timer on send activity
        $weak_self->_reset_sse_idle_timer;

        my $type = $event->{type} // '';

        # Dev-mode event validation (PAGI spec compliance)
        if ($weak_self->{validate_events}) {
            require PAGI::Server::EventValidator;
            PAGI::Server::EventValidator::validate_sse_send($event);
        }

        if ($type eq 'sse.start') {
            return if $ss->{response_started};
            $ss->{response_started} = 1;

            my $status = $event->{status} // 200;
            my $headers = $event->{headers} // [];

            # Ensure Content-Type is text/event-stream
            my $has_content_type = 0;
            for my $h (@$headers) {
                if (lc($h->[0]) eq 'content-type') {
                    $has_content_type = 1;
                    last;
                }
            }

            my @final_headers;
            for my $h (@$headers) {
                push @final_headers, [_validate_header_name($h->[0]), _validate_header_value($h->[1])];
            }
            if (!$has_content_type) {
                push @final_headers, ['content-type', 'text/event-stream'];
            }
            push @final_headers, ['cache-control', 'no-cache'];

            $streaming_started = 1;
            $weak_self->{h2_session}->submit_response_streaming(
                $stream_id,
                status        => $status,
                headers       => \@final_headers,
                data_callback => $data_callback,
            );
            $weak_self->_h2_write_pending;

            # Set protocol-specific keepalive writer (HTTP/2 DATA frames)
            $weak_self->{sse_keepalive_writer} = sub {
                my ($text) = @_;
                return unless $weak_self;
                return if $weak_self->{closed};
                return unless $weak_self->{h2_streams}{$stream_id};
                push @data_queue, $text;
                $weak_self->{h2_session}->resume_stream($stream_id);
                $weak_self->_h2_write_pending;
            };

            # Start SSE idle timer if configured
            $weak_self->_start_sse_idle_timer;
        }
        elsif ($type eq 'sse.send') {
            return unless $ss->{response_started};

            # Backpressure check
            if ($weak_self->_get_write_buffer_size >= $weak_self->{write_high_watermark}) {
                await $weak_self->_wait_for_drain;
                return Future->done unless $weak_self;
                return Future->done if $weak_self->{closed};
                return unless $weak_self->{h2_streams}{$stream_id};
            }

            my $sse_data = _format_sse_event($event);
            push @data_queue, $sse_data;
            $weak_self->{h2_session}->resume_stream($stream_id);
            $weak_self->_h2_write_pending;
        }
        elsif ($type eq 'sse.comment') {
            return unless $ss->{response_started};

            my $comment = _format_sse_comment($event);
            push @data_queue, $comment;
            $weak_self->{h2_session}->resume_stream($stream_id);
            $weak_self->_h2_write_pending;
        }
        elsif ($type eq 'sse.keepalive') {
            my $interval = $event->{interval} // 0;
            my $comment = $event->{comment};

            if ($interval > 0) {
                $weak_self->_start_sse_keepalive($interval, $comment);
            }
            else {
                $weak_self->_stop_sse_keepalive;
            }
        }
        else {
            _unrecognized_event_type($type, 'sse');
        }

lib/PAGI/Server/Connection.pm  view on Meta::CPAN

    # Seek to offset if specified
    if ($offset && $offset > 0) {
        seek($fh, $offset, 0) or die "Cannot seek: $!";
    }

    # For filehandles, we can't easily use the worker pool (can't pass fh across fork).
    # Use blocking reads in small chunks - not ideal but practical.
    # TODO: Consider IO::Async::FileStream for better event loop integration.

    my $remaining = $length;  # undef means read to EOF
    my $stream = $self->{stream};

    while (1) {
        my $to_read = FILE_CHUNK_SIZE;
        if (defined $remaining) {
            $to_read = $remaining if $remaining < $to_read;
            last if $to_read <= 0;
        }

        my $bytes_read = read($fh, my $chunk, $to_read);

        last if !defined $bytes_read;  # Error
        last if $bytes_read == 0;      # EOF

        $self->{_response_size} += $bytes_read;

        if ($chunked) {
            my $len = sprintf("%x", length($chunk));
            $stream->write("$len\r\n$chunk\r\n");
        }
        else {
            $stream->write($chunk);
        }

        if (defined $remaining) {
            $remaining -= $bytes_read;
        }
    }

    # Send final chunk if chunked encoding
    if ($chunked) {
        $stream->write("0\r\n\r\n");
    }
}

1;

__END__

=head1 SSE OVER HTTP/2

SSE events (C<sse.start>, C<sse.send>, C<sse.comment>, C<sse.keepalive>)
work transparently over both HTTP/1.1 and HTTP/2. Applications do not need
to change their SSE handling code based on protocol version.

=head2 How It Works

When a request arrives with C<Accept: text/event-stream>, the connection
detects it as SSE regardless of HTTP version. Over HTTP/1.1, SSE data is
sent using chunked Transfer-Encoding. Over HTTP/2, SSE data is sent as
DATA frames via the C<submit_response_streaming>/C<data_callback> mechanism.
This difference is transparent to the application.

The C<http_version> field in the scope hash will be C<'2'> for HTTP/2
connections, allowing applications to distinguish if needed.

=head2 SSE Idle Timeout over HTTP/2

The C<sse_idle_timeout> setting applies at the B<connection level>,
not per-stream. Over HTTP/1.1 this is a non-issue, since each SSE
stream occupies its own TCP connection. Over HTTP/2, where multiple
streams share a single connection, the idle timeout firing for one SSE
stream closes the B<entire connection>, terminating every active stream
on it.

=head3 Trade-offs

B<Pros:>

=over 4

=item * Simple, consistent behavior across protocols

=item * Matches the approach used by Go (net/http2), Rust (hyper/Axum),
Java (Netty/Reactor Netty/Vert.x), Python (Hypercorn), and gRPC

=item * No additional complexity in stream lifecycle management

=back

B<Cons:>

=over 4

=item * Closing the connection affects all multiplexed HTTP/2 streams,
not just the idle SSE stream

=item * Clients multiplexing SSE + REST on one HTTP/2 connection may
see unexpected disconnects on their REST requests

=back

B<Recommendation:> Use SSE keepalive comments (C<sse.keepalive> event)
with an interval shorter than C<sse_idle_timeout> to prevent the timer
from firing. This is the standard approach across major frameworks.
For production deployments behind reverse proxies (Envoy, Nginx,
HAProxy), also keep the keepalive interval shorter than the proxy's
stream idle timeout, so the proxy does not close the idle stream first.

B<Note:> Per-stream idle timeout (using HTTP/2 RST_STREAM to close
only the idle SSE stream) is a future enhancement. Only Node.js
(http2stream.setTimeout) and Envoy (stream_idle_timeout) implement
this among mainstream servers.

=head1 SEE ALSO

L<PAGI::Server>, L<PAGI::Server::Protocol::HTTP1>

=head1 AUTHOR

John Napiorkowski E<lt>jjnapiork@cpan.orgE<gt>



( run in 1.000 second using v1.01-cache-2.11-cpan-140bd7fdf52 )