Future-Buffer

 view release on metacpan or  search on metacpan

lib/Future/Buffer.pm  view on Meta::CPAN

#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2020-2024 -- leonerd@leonerd.org.uk

package Future::Buffer 0.06;

use v5.14;
use warnings;

use Future;

use Scalar::Util qw( weaken );

=head1 NAME

C<Future::Buffer> - a string buffer that uses Futures

=head1 SYNOPSIS

   use Future::Buffer;

   use Future::AsyncAwait;
   use Future::IO;

   my $buffer = Future::Buffer->new(
      fill => sub { Future::IO->sysread( $socket, 8192 ) }
   );

   async sub print_lines
   {
      while(1) {
         my $line = await $buffer->read_until( "\n" );
         chomp $line;

         say "Got a line: $line";
      }
   }

   await print_lines();

=head1 DESCRIPTION

Objects in this class provide a string buffer, on which operations return
L<Future> instances which will complete when data is available. Data can be
inserted into the buffer either in a push-based manner by calling the C<write>
method, or in a pull-based manner by providing it with a C<fill> callback by
which it can request data itself. This flexibility allows the buffer to act as
an adapter between push- and pull-based providers and consumers.

Each C<read>-like method returns a L<Future> which will complete once there
are enough bytes in the buffer to satisfy the required condition. The buffer
behaves somewhat like a pipe, where bytes provided at the writing end (either
by the C<write> method or the C<fill> callback) are eventually consumed at the
reading end by one of the C<read> futures.

Multiple C<read> futures can remain pending at once, and will be completed in
the order they were created when more data is eventually available. Thus, any
call to the C<write> method to provide more data can potentially result in
multiple futures becoming ready.

I<Since version 0.04> the buffer supports an end-of-file condition. The
L</close> method or a C<fill> callback future yielding an empty result will
mark that the buffer is now closed. Once it has exhausted the remaining stored
data any further read futures will yield empty.

=cut

=head1 CONSTRUCTOR

=cut

=head2 new

   $buffer = Future::Buffer->new( %args );

Returns a new L<Future::Buffer> instance.

Takes the following named arguments:

=over 4

=item fill => CODE

   $data = await $fill->();

Optional callback which the buffer will invoke when it needs more data.

Any read futures which are waiting on the fill future are constructed by using
the fill future as a prototype, ensuring they have the correct type.

If the result is an empty list this will be treated as an end-of-file
notification and the buffer is closed.

=back

=cut

sub new
{
   my $class = shift;
   my %args = @_;

   return bless {
      pending => [],
      data    => "",
      fill    => $args{fill},
   }, $class;
}

=head1 METHODS

=cut

sub _fill
{
   my $self = shift;

   return $self->{fill_f} if $self->{fill_f};

   my $fill = $self->{fill};

   # Arm the fill loop
   my $f = $self->{fill_f} = $fill->(); # TODO: give it a size hint?

   weaken( my $weakself = $self );

   $f->on_done( sub {
      my $self = $weakself or return;

      undef $self->{fill_f};

      if( @_ ) {
         my ( $data ) = @_;
         $self->{data} .= $data;
      }
      else {
         $self->{at_eof} = 1;
      }

      $self->_invoke_pending;

      $self->_fill if @{ $self->{pending} };
   });
}

sub _new_read_future
{
   my $self = shift;
   my ( $code ) = @_;

   my $pending = $self->{pending};

   # First see if the buffer is already sufficient;
   if( !@$pending and
         ( my @ret = $code->( \$self->{data} ) ) ) {
      return Future->done( @ret );
   }

   my $f;
   if( $self->{fill} and my $fill_f = $self->_fill ) {
      $f = $fill_f->new;
   }
   else {
      $f = Future->new;
   }

   push @$pending, [ $code, $f ];

   $self->_invoke_pending if length $self->{data};

   $f->on_cancel( sub {
      shift @$pending while @$pending and $pending->[0]->[1]->is_cancelled;
      return if @$pending or !$self->{fill_f};

      $self->{fill_f}->cancel;
      undef $self->{fill_f};
   } );

   return $f;
}

sub _invoke_pending
{
   my $self = shift;



( run in 1.668 second using v1.01-cache-2.11-cpan-99c4e6809bf )