PAGI
view release on metacpan or search on metacpan
lib/PAGI/Server/AsyncFile.pm view on Meta::CPAN
package PAGI::Server::AsyncFile;
use strict;
use warnings;
use Future;
use Future::AsyncAwait;
use IO::Async::Function;
use Scalar::Util qw(blessed);
=head1 NAME
PAGI::Server::AsyncFile - Non-blocking file I/O for PAGI::Server internals
=head1 SYNOPSIS
use PAGI::Server::AsyncFile;
use IO::Async::Loop;
# Create or obtain an IO::Async::Loop
my $loop = IO::Async::Loop->new;
# Read entire file
my $content = await PAGI::Server::AsyncFile->read_file($loop, '/path/to/file');
# Read file in chunks (streaming)
await PAGI::Server::AsyncFile->read_file_chunked($loop, '/path/to/file', async sub {
my ($chunk) = @_;
# Process each chunk
}, chunk_size => 65536);
# Write file
await PAGI::Server::AsyncFile->write_file($loop, '/path/to/file', $content);
# Append to file
await PAGI::Server::AsyncFile->append_file($loop, '/path/to/file', $log_line);
=head1 DESCRIPTION
This module provides non-blocking file I/O operations using L<IO::Async::Function>
worker processes. It is used internally by L<PAGI::Server> for efficient file
streaming.
B<Note:> This is a PAGI::Server internal module. PAGI applications are
loop-agnostic and should use synchronous file I/O (which is simple and fast
for typical file sizes) or bring their own async file library if needed.
It uses L<IO::Async::Function> to offload blocking file operations to worker
processes, preventing the main event loop from being blocked during disk I/O.
Regular file I/O in POSIX is always blocking at the kernel level - even
C<select()>/C<poll()>/C<epoll()> report regular files as always "ready".
This module works around this limitation by running file operations in
separate worker processes, similar to how Node.js/libuv handles file I/O.
=head1 CLASS METHODS
=cut
# Singleton function pool per loop (keyed by loop address)
my %_function_pools;
# Get or create the function pool for a given loop
sub _get_function {
my ($class, $loop) = @_;
my $loop_id = blessed($loop) ? "$loop" : 'default';
unless ($_function_pools{$loop_id}) {
my $function = IO::Async::Function->new(
code => sub {
my ($op, @args) = @_;
return _worker_operation($op, @args);
},
min_workers => 1,
max_workers => 4,
idle_timeout => 30,
);
$loop->add($function);
$_function_pools{$loop_id} = $function;
}
return $_function_pools{$loop_id};
}
# Worker process operations
sub _worker_operation {
my ($op, @args) = @_;
if ($op eq 'read_file') {
my ($path) = @args;
open my $fh, '<:raw', $path or die "Cannot open $path: $!";
local $/;
my $content = <$fh>;
close $fh;
return $content;
}
elsif ($op eq 'read_chunk') {
my ($path, $offset, $chunk_size) = @args;
open my $fh, '<:raw', $path or die "Cannot open $path: $!";
seek($fh, $offset, 0) if $offset;
my $bytes_read = read($fh, my $buffer, $chunk_size);
lib/PAGI/Server/AsyncFile.pm view on Meta::CPAN
return length($content);
}
elsif ($op eq 'file_size') {
my ($path) = @args;
return -s $path;
}
elsif ($op eq 'file_exists') {
my ($path) = @args;
return -f $path ? 1 : 0;
}
else {
die "Unknown operation: $op";
}
}
=head2 read_file
my $content = await PAGI::Server::AsyncFile->read_file($loop, $path);
Read the entire contents of a file asynchronously. Returns a Future that
resolves to the file contents.
Parameters:
=over 4
=item * C<$loop> - IO::Async::Loop instance
=item * C<$path> - Path to the file to read
=back
Throws an exception if the file cannot be read.
=cut
async sub read_file {
my ($class, $loop, $path) = @_;
die "File not found: $path" unless -f $path;
die "Cannot read file: $path" unless -r $path;
my $function = $class->_get_function($loop);
return await $function->call(args => ['read_file', $path]);
}
=head2 read_file_chunked
await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, async sub {
my ($chunk) = @_;
# Process chunk
}, chunk_size => 65536);
# For Range requests (partial file):
await PAGI::Server::AsyncFile->read_file_chunked($loop, $path, $callback,
offset => 1000, # Start at byte 1000
length => 5000, # Read 5000 bytes total
);
Read a file in chunks, calling a callback for each chunk. This is suitable
for streaming large files without loading the entire file into memory.
Parameters:
=over 4
=item * C<$loop> - IO::Async::Loop instance
=item * C<$path> - Path to the file to read
=item * C<$callback> - Async callback called with each chunk. Receives the chunk data.
=item * C<%opts> - Options:
=over 4
=item * C<chunk_size> - Size of each chunk in bytes (default: 65536)
=item * C<offset> - Byte offset to start reading from (default: 0)
=item * C<length> - Maximum bytes to read; omit to read to EOF
=back
=back
Returns a Future that resolves to the number of bytes read when complete.
The callback should return/await properly if it needs to do async operations.
=cut
async sub read_file_chunked {
my ($class, $loop, $path, $callback, %opts) = @_;
die "File not found: $path" unless -f $path;
die "Cannot read file: $path" unless -r $path;
my $chunk_size = $opts{chunk_size} // 65536;
my $start_offset = $opts{offset} // 0;
my $max_length = $opts{length}; # undef means read to EOF
my $file_size = -s $path;
my $function = $class->_get_function($loop);
my $offset = $start_offset;
my $bytes_sent = 0;
# Calculate end position
my $end_pos = defined $max_length
? $start_offset + $max_length
: $file_size;
$end_pos = $file_size if $end_pos > $file_size;
while ($offset < $end_pos) {
my $to_read = $chunk_size;
# Don't read past the end position
if ($offset + $to_read > $end_pos) {
$to_read = $end_pos - $offset;
}
( run in 0.495 second using v1.01-cache-2.11-cpan-140bd7fdf52 )