PAGI

 view release on metacpan or  search on metacpan

lib/PAGI/Request/BodyStream.pm  view on Meta::CPAN

package PAGI::Request::BodyStream;

use strict;
use warnings;

use Future::AsyncAwait;
use Encode qw(decode FB_CROAK FB_DEFAULT LEAVE_SRC);
use Carp qw(croak);


=head1 NAME

PAGI::Request::BodyStream - Streaming body consumption for PAGI requests

=head1 SYNOPSIS

    use PAGI::Request::BodyStream;
    use Future::AsyncAwait;

    # Basic streaming
    my $stream = PAGI::Request::BodyStream->new(receive => $receive);

    while (!$stream->is_done) {
        my $chunk = await $stream->next_chunk;
        last unless defined $chunk;
        print "Got chunk: ", length($chunk), " bytes\n";
    }

    # With size limit
    my $stream = PAGI::Request::BodyStream->new(
        receive   => $receive,
        max_bytes => 1024 * 1024,  # 1MB limit
    );

    # With UTF-8 decoding
    my $stream = PAGI::Request::BodyStream->new(
        receive => $receive,
        decode  => 'UTF-8',
    );

    # Stream to file
    await $stream->stream_to_file('/tmp/upload.dat');

    # Stream to custom sink
    await $stream->stream_to(async sub {
        my ($chunk) = @_;
        # Process chunk
        print STDERR "Processing: ", length($chunk), " bytes\n";
    });

=head1 DESCRIPTION

PAGI::Request::BodyStream provides streaming body consumption for large request
bodies. This is useful when you need to process request data incrementally
without loading the entire body into memory.

The stream is pull-based: you call C<next_chunk()> to receive the next chunk
of data. The stream handles:

=over 4

=item * Size limits with customizable error messages

=item * UTF-8 decoding with proper handling of incomplete sequences at chunk boundaries

=item * Client disconnect detection

=item * Convenient file streaming with C<stream_to_file()>

=back

B<Important>: Streaming is mutually exclusive with buffered body methods like
C<body()>, C<json()>, C<form()> in L<PAGI::Request>. Once you start streaming,
you cannot use those methods.

=head1 CONSTRUCTOR

=head2 new

    my $stream = PAGI::Request::BodyStream->new(
        receive    => $receive,      # Required: PAGI receive callback
        max_bytes  => 10485760,      # Optional: max body size
        decode     => 'UTF-8',       # Optional: decode chunks from UTF-8
        strict     => 1,             # Optional: strict UTF-8 (croak on invalid)
        limit_name => 'body_size',   # Optional: name for limit error message
    );

Creates a new body stream.

=over 4

=item * C<receive> - Required. The PAGI receive callback.

=item * C<max_bytes> - Optional. Maximum number of raw bytes to read. C<next_chunk()> croaks if the running total would exceed this limit.

=item * C<decode> - Optional. Encoding to decode raw chunks from (typically 'UTF-8').

=item * C<strict> - Optional. If true, throw on invalid UTF-8. If false (default),
use replacement characters.

=item * C<limit_name> - Optional. Name to use in error message when max_bytes
is exceeded (default: 'max_bytes').

=back

=cut

# Constructor. Accepts a flat list of named options, requires a defined
# 'receive' callback, fills in defaults for optional settings, and
# initialises the internal streaming state.
sub new {
    my $class = shift;
    my %opt   = @_;

    # Only definedness is checked here; the callback is invoked later.
    croak("receive is required") unless defined $opt{receive};

    my %state = (
        # Caller-supplied configuration
        receive    => $opt{receive},
        max_bytes  => $opt{max_bytes},
        decode     => $opt{decode},
        strict     => $opt{strict}     // 0,
        limit_name => $opt{limit_name} // 'max_bytes',

        # Internal bookkeeping
        _bytes_read => 0,
        _done       => 0,
        _error      => undef,
        _buffer     => '',    # carries an incomplete UTF-8 sequence between chunks
    );

    return bless \%state, $class;
}

=head1 METHODS

=head2 next_chunk

    my $chunk = await $stream->next_chunk;

Returns a Future that resolves to the next chunk of data, or undef when the

lib/PAGI/Request/BodyStream.pm  view on Meta::CPAN

        return undef;
    }

    # Extract body chunk
    my $chunk = $message->{body} // '';
    my $more = $message->{more} // 0;

    # Check size limit before processing
    if (defined $self->{max_bytes}) {
        my $new_total = $self->{_bytes_read} + length($chunk);
        if ($new_total > $self->{max_bytes}) {
            $self->{_error} = "Request body $self->{limit_name} exceeded";
            $self->{_done} = 1;
            croak($self->{_error});
        }
    }

    $self->{_bytes_read} += length($chunk);

    # Mark done if no more chunks
    $self->{_done} = 1 unless $more;

    # Decode if requested
    if ($self->{decode}) {
        $chunk = $self->_decode_chunk($chunk, !$more);
    }

    return $chunk;
}

=head2 bytes_read

    my $total = $stream->bytes_read;

Returns the total number of raw bytes read so far (before any decoding).

=cut

# Accessor: total raw bytes consumed from the receive callback so far,
# counted before any decoding is applied.
sub bytes_read {
    my $stream    = shift;
    my $raw_total = $stream->{_bytes_read};
    return $raw_total;
}

=head2 is_done

    if ($stream->is_done) { ... }

Returns true if the stream has been exhausted (no more chunks available). It also
becomes true after the C<max_bytes> limit has been exceeded.

=cut

# Accessor: the stream's completion flag, exactly as stored in the object.
sub is_done {
    return shift->{_done};
}

=head2 error

    my $error = $stream->error;

Returns any error that occurred during streaming, or undef.

=cut

# Accessor: the last recorded error message, or undef if none was recorded.
sub error {
    my ($stream)   = @_;
    my $last_error = $stream->{_error};
    return $last_error;
}

=head2 stream_to_file

    await $stream->stream_to_file($path);

Streams the entire request body to a file. Returns a Future that resolves
to the number of bytes written.

This is efficient for large uploads as it doesn't load the entire body into
memory - chunks are written incrementally as they arrive from the network.

B<Note:> File writes are B<blocking> (synchronous I/O). Since chunks are
typically small (e.g., 64KB), each write completes quickly. The method
remains async overall because it awaits network chunks between writes.

B<Note:> Cannot be used with the C<decode> option as that would corrupt binary
data. Use C<stream_to()> with a custom handler if you need decoded chunks
written to a file.

For fully non-blocking file I/O, use C<stream_to()> with your preferred
async file library:

    # Non-blocking alternative (bring your own async file library)
    await $stream->stream_to(async sub {
        my ($chunk) = @_;
        await $my_async_file_writer->write($chunk);
    });

=cut

async sub stream_to_file {
    my ($self, $path) = @_;
    croak("path is required") unless defined $path;
    croak("stream_to_file() cannot be used with decode option - use stream_to() instead")
        if $self->{decode};

    my $bytes_written = 0;
    my $fh;

    while (!$self->is_done) {
        my $chunk = await $self->next_chunk;
        last unless defined $chunk;
        next unless length $chunk;

        # Open file on first chunk (truncate mode)
        unless ($fh) {
            open $fh, '>:raw', $path
                or croak("Cannot open $path for writing: $!");
        }

        # Blocking write - typically fast for small chunks
        print $fh $chunk
            or croak("Cannot write to $path: $!");



( run in 0.826 second using v1.01-cache-2.11-cpan-140bd7fdf52 )