PAGI

 view release on metacpan or  search on metacpan

lib/PAGI/Server/AsyncFile.pm  view on Meta::CPAN

package PAGI::Server::AsyncFile;

use strict;
use warnings;


use Future;
use Future::AsyncAwait;
use IO::Async::Function;
use Scalar::Util qw(blessed);

=head1 NAME

PAGI::Server::AsyncFile - Non-blocking file I/O for PAGI::Server internals

=head1 SYNOPSIS

    use PAGI::Server::AsyncFile;
    use IO::Async::Loop;

    # Create or obtain an IO::Async::Loop
    my $loop = IO::Async::Loop->new;

    # Read entire file
    my $content = await PAGI::Server::AsyncFile->read_file($loop, '/path/to/file');

    # Read file in chunks (streaming)
    await PAGI::Server::AsyncFile->read_file_chunked($loop, '/path/to/file', async sub {
        my ($chunk) = @_;
        # Process each chunk
    }, chunk_size => 65536);

    # Write file
    await PAGI::Server::AsyncFile->write_file($loop, '/path/to/file', $content);

    # Append to file
    await PAGI::Server::AsyncFile->append_file($loop, '/path/to/file', $log_line);

=head1 DESCRIPTION

This module provides non-blocking file I/O operations using L<IO::Async::Function>
worker processes. It is used internally by L<PAGI::Server> for efficient file
streaming.

B<Note:> This is a PAGI::Server internal module. PAGI applications are
loop-agnostic and should use synchronous file I/O (which is simple and fast
for typical file sizes) or bring their own async file library if needed.

It uses L<IO::Async::Function> to offload blocking file operations to worker
processes, preventing the main event loop from being blocked during disk I/O.
Regular file I/O in POSIX is always blocking at the kernel level - even
C<select()>/C<poll()>/C<epoll()> report regular files as always "ready".
This module works around this limitation by running file operations in
separate worker processes, similar to how Node.js/libuv handles file I/O.

=head1 CLASS METHODS

=cut

# Singleton function pool per loop (keyed by the loop's stringified address)
my %_function_pools;

# Get or create the IO::Async::Function worker pool for a given loop.
#
# Parameters:
#   $class - invocant (not used by this routine)
#   $loop  - IO::Async::Loop instance the pool is (or will be) added to
#
# Returns the IO::Async::Function registered for $loop, creating it and
# adding it to the loop on first use.
#
# The cache key contains only the loop's address, which can be reused by a
# later loop object once the original has been destroyed. A cached function
# is therefore only handed back while it still reports membership of this
# very loop; otherwise the entry is dropped and a fresh pool is built.
sub _get_function {
    my ($class, $loop) = @_;

    my $loop_id = blessed($loop) ? "$loop" : 'default';

    if (my $cached = $_function_pools{$loop_id}) {
        # A notifier knows which loop (if any) it currently belongs to.
        my $owner = $cached->loop;
        my $is_live = defined $owner
            && (!blessed($loop)
                || Scalar::Util::refaddr($owner) == Scalar::Util::refaddr($loop));
        return $cached if $is_live;

        # Stale entry: its loop is gone or it was removed from the loop.
        delete $_function_pools{$loop_id};
    }

    my $function = IO::Async::Function->new(
        code => sub {
            my ($op, @args) = @_;
            return _worker_operation($op, @args);
        },
        min_workers  => 1,
        max_workers  => 4,
        idle_timeout => 30,
    );

    # Only cache after a successful add, so a failed add leaves no entry.
    $loop->add($function);
    $_function_pools{$loop_id} = $function;

    return $function;
}

# Worker process operations
#
# Dispatches one file operation by name. This is the routine called from the
# IO::Async::Function code callback built in _get_function: it receives an
# operation name followed by that operation's arguments.
#
# Operations visible in this listing:
#   read_file   ($path)                       - returns the whole file (raw layer)
#   read_chunk  ($path, $offset, $chunk_size) - reads up to $chunk_size bytes
#   file_size   ($path)                       - returns -s $path
#   file_exists ($path)                       - returns 1 if -f $path, else 0
#
# Dies if a file cannot be opened or if the operation name is not recognised.
sub _worker_operation {
    my ($op, @args) = @_;

    if ($op eq 'read_file') {
        my ($path) = @args;
        open my $fh, '<:raw', $path or die "Cannot open $path: $!";
        # Undefine the input record separator so the readline below slurps
        # the entire file in one go.
        local $/;
        my $content = <$fh>;
        close $fh;
        return $content;
    }
    elsif ($op eq 'read_chunk') {
        my ($path, $offset, $chunk_size) = @args;
        open my $fh, '<:raw', $path or die "Cannot open $path: $!";
        # Whence 0 is an absolute position; the seek is skipped for a false
        # offset because a freshly opened handle already sits at the start.
        seek($fh, $offset, 0) if $offset;
        my $bytes_read = read($fh, my $buffer, $chunk_size);

lib/PAGI/Server/AsyncFile.pm  view on Meta::CPAN

        # NOTE(review): the start of this branch is not visible in this
        # listing; it reports the length of $content back to the caller.
        return length($content);
    }
    elsif ($op eq 'file_size') {
        my ($path) = @args;
        return -s $path;
    }
    elsif ($op eq 'file_exists') {
        my ($path) = @args;
        # Normalise the file test to a plain 1/0.
        return -f $path ? 1 : 0;
    }
    else {
        die "Unknown operation: $op";
    }
}

=head2 read_file

    my $content = await PAGI::Server::AsyncFile->read_file($loop, $path);

Read the entire contents of a file asynchronously. Returns a Future that
resolves to the file contents.

Parameters:

=over 4

=item * C<$loop> - IO::Async::Loop instance

=item * C<$path> - Path to the file to read

=back

Throws an exception if the file cannot be read.

=cut

async sub read_file {
    my ($class, $loop, $path) = @_;

    # Fail fast in the calling process before involving a worker: the path
    # must name a plain file, and that file must be readable.
    if (!-f $path) {
        die "File not found: $path";
    }
    if (!-r $path) {
        die "Cannot read file: $path";
    }

    # Hand the slurp to the loop's worker pool and wait for its result.
    my $pending = $class->_get_function($loop)->call(
        args => ['read_file', $path],
    );
    return await $pending;
}

=head2 read_file_chunked

    await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, async sub {
        my ($chunk) = @_;
        # Process chunk
    }, chunk_size => 65536);

    # For Range requests (partial file):
    await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, $callback,
        offset => 1000,      # Start at byte 1000
        length => 5000,      # Read 5000 bytes total
    );

Read a file in chunks, calling a callback for each chunk. This is suitable
for streaming large files without loading the entire file into memory.

Parameters:

=over 4

=item * C<$loop> - IO::Async::Loop instance

=item * C<$path> - Path to the file to read

=item * C<$callback> - Async callback called with each chunk. Receives the chunk data.

=item * C<%opts> - Options:

=over 4

=item * C<chunk_size> - Size of each chunk in bytes (default: 65536)

=item * C<offset> - Byte offset to start reading from (default: 0)

=item * C<length> - Maximum bytes to read; omit to read to EOF

=back

=back

Returns a Future that resolves to the number of bytes read when complete.
The callback should return/await properly if it needs to do async operations.

=cut

async sub read_file_chunked {
    my ($class, $loop, $path, $callback, %opts) = @_;

    die "File not found: $path" unless -f $path;
    die "Cannot read file: $path" unless -r $path;

    my $chunk_size = $opts{chunk_size} // 65536;
    my $start_offset = $opts{offset} // 0;
    my $max_length = $opts{length};  # undef means read to EOF

    my $file_size = -s $path;
    my $function = $class->_get_function($loop);

    my $offset = $start_offset;
    my $bytes_sent = 0;

    # Calculate end position
    my $end_pos = defined $max_length
        ? $start_offset + $max_length
        : $file_size;
    $end_pos = $file_size if $end_pos > $file_size;

    while ($offset < $end_pos) {
        my $to_read = $chunk_size;

        # Don't read past the end position
        if ($offset + $to_read > $end_pos) {
            $to_read = $end_pos - $offset;
        }



( run in 0.495 second using v1.01-cache-2.11-cpan-140bd7fdf52 )