Future-Buffer
view release on metacpan or search on metacpan
lib/Future/Buffer.pm view on Meta::CPAN
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2020-2024 -- leonerd@leonerd.org.uk
package Future::Buffer 0.06;
use v5.14;
use warnings;
use Future;
use Scalar::Util qw( weaken );
=head1 NAME
C<Future::Buffer> - a string buffer that uses Futures
=head1 SYNOPSIS
use Future::Buffer;
use Future::AsyncAwait;
use Future::IO;
my $buffer = Future::Buffer->new(
fill => sub { Future::IO->sysread( $socket, 8192 ) }
);
async sub print_lines
{
while(1) {
my $line = await $buffer->read_until( "\n" );
chomp $line;
say "Got a line: $line";
}
}
await print_lines();
=head1 DESCRIPTION
Objects in this class provide a string buffer, on which operations return
L<Future> instances which will complete when data is available. Data can be
inserted into the buffer either in a push-based manner by calling the C<write>
method, or in a pull-based manner by providing it with a C<fill> callback by
which it can request data itself. This flexibility allows the buffer to act as
an adapter between push- and pull-based providers and consumers.
Each C<read>-like method returns a L<Future> which will complete once there
are enough bytes in the buffer to satisfy the required condition. The buffer
behaves somewhat like a pipe, where bytes provided at the writing end (either
by the C<write> method or the C<fill> callback) are eventually consumed at the
reading end by one of the C<read> futures.
Multiple C<read> futures can remain pending at once, and will be completed in
the order they were created when more data is eventually available. Thus, any
call to the C<write> method to provide more data can potentially result in
multiple futures becoming ready.
I<Since version 0.04> the buffer supports an end-of-file condition. The
L</close> method or a C<fill> callback future yielding an empty result will
mark that the buffer is now closed. Once it has exhausted the remaining stored
data any further read futures will yield empty.
=cut
=head1 CONSTRUCTOR
=cut
=head2 new
$buffer = Future::Buffer->new( %args );
Returns a new L<Future::Buffer> instance.
Takes the following named arguments:
=over 4
=item fill => CODE
$data = await $fill->();
Optional callback which the buffer will invoke when it needs more data.
Any read futures which are waiting on the fill future are constructed by using
the fill future as a prototype, ensuring they have the correct type.
If the result is an empty list this will be treated as an end-of-file
notification and the buffer is closed.
=back
=cut
sub new
{
my $class = shift;
my %args = @_;
return bless {
pending => [],
data => "",
fill => $args{fill},
}, $class;
}
=head1 METHODS
=cut
sub _fill
{
my $self = shift;
return $self->{fill_f} if $self->{fill_f};
my $fill = $self->{fill};
# Arm the fill loop
my $f = $self->{fill_f} = $fill->(); # TODO: give it a size hint?
weaken( my $weakself = $self );
$f->on_done( sub {
my $self = $weakself or return;
undef $self->{fill_f};
if( @_ ) {
my ( $data ) = @_;
$self->{data} .= $data;
}
else {
$self->{at_eof} = 1;
}
$self->_invoke_pending;
$self->_fill if @{ $self->{pending} };
});
}
sub _new_read_future
{
my $self = shift;
my ( $code ) = @_;
my $pending = $self->{pending};
# First see if the buffer is already sufficient;
if( !@$pending and
( my @ret = $code->( \$self->{data} ) ) ) {
return Future->done( @ret );
}
my $f;
if( $self->{fill} and my $fill_f = $self->_fill ) {
$f = $fill_f->new;
}
else {
$f = Future->new;
}
push @$pending, [ $code, $f ];
$self->_invoke_pending if length $self->{data};
$f->on_cancel( sub {
shift @$pending while @$pending and $pending->[0]->[1]->is_cancelled;
return if @$pending or !$self->{fill_f};
$self->{fill_f}->cancel;
undef $self->{fill_f};
} );
return $f;
}
sub _invoke_pending
{
my $self = shift;
( run in 1.668 second using v1.01-cache-2.11-cpan-99c4e6809bf )