Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN

off being presumed writeable, so the first of these events to be observed will
be C<on_writeable_stop>.

=cut

sub _init
{
   my $self = shift;

   $self->{writequeue} = []; # Queue of Writers
   $self->{readqueue} = []; # Queue of Readers
   $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
   $self->{readbuff} = "";

   $self->{reader} = "_sysread";
   $self->{writer} = "_syswrite";

   $self->{read_len}  = $READLEN;
   $self->{write_len} = $WRITELEN;

   $self->{want} = WANT_READ_FOR_READ;

   $self->{close_on_read_eof} = 1;
}

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 read_handle => IO

The IO handle to read from. Must implement C<fileno> and C<sysread> methods.

=head2 write_handle => IO

The IO handle to write to. Must implement C<fileno> and C<syswrite> methods.

=head2 handle => IO

Shortcut to specifying the same IO handle for both of the above.

=head2 on_read => CODE

=head2 on_read_error => CODE

=head2 on_outgoing_empty => CODE

=head2 on_write_error => CODE

=head2 on_writeable_start => CODE

=head2 on_writeable_stop => CODE

CODE references for event handlers.

=head2 autoflush => BOOL

Optional. If true, the C<write> method will attempt to write data to the
operating system immediately, without waiting for the loop to indicate the
filehandle is write-ready. This is useful, for example, on streams that should
contain up-to-date logging or console information.

It currently defaults to false for any file handle, but future versions of
L<IO::Async> may enable this by default on STDOUT and STDERR.

=head2 read_len => INT

Optional. Sets the buffer size for C<read> calls. Defaults to 8 KiBytes.

=head2 read_all => BOOL

Optional. If true, attempt to read as much data from the kernel as possible
when the handle becomes readable. By default this is turned off, meaning at
most one fixed-size buffer is read. If there is still more data in the
kernel's buffer, the handle will still be readable, and will be read from
again.

This behaviour allows multiple streams and sockets to be multiplexed
simultaneously, meaning that a large bulk transfer on one cannot starve other
filehandles of processing time. Turning this option on may improve bulk data
transfer rate, at the risk of delaying or stalling processing on other
filehandles.

=head2 write_len => INT

Optional. Sets the buffer size for C<write> calls. Defaults to 8 KiBytes.

=head2 write_all => BOOL

Optional. Analogous to the C<read_all> option, but for writing. When
C<autoflush> is enabled, this option only affects deferred writing if the
initial attempt failed due to buffer space.

=head2 read_high_watermark => INT

=head2 read_low_watermark => INT

Optional. If defined, gives a way to implement flow control or other
behaviours that depend on the size of Stream's read buffer.

If after more data is read from the underlying filehandle the read buffer is
now larger than the high watermark, the C<on_read_high_watermark> event is
triggered (which, by default, will disable read-ready notifications and pause
reading from the filehandle).

If after data is consumed by an C<on_read> handler the read buffer is now
smaller than the low watermark, the C<on_read_low_watermark> event is
triggered (which, by default, will re-enable read-ready notifications and
resume reading from the filehandle). For to be possible, the read handler
would have to be one added by the C<push_on_read> method or one of the
Future-returning C<read_*> methods.

By default these options are not defined, so this behaviour will not happen.
C<read_low_watermark> may not be set to a larger value than
C<read_high_watermark>, but it may be set to a smaller value, creating a
hysteresis region. If either option is defined then both must be.

If these options are used with the default event handlers, be careful not to
cause deadlocks by having a high watermark sufficiently low that a single
C<on_read> invocation might not consider it finished yet.

=head2 reader => STRING|CODE

=head2 writer => STRING|CODE

Optional. If defined, gives the name of a method or a CODE reference to use
to implement the actual reading from or writing to the filehandle. These will
be invoked as

 $stream->reader( $read_handle, $buffer, $len )
 $stream->writer( $write_handle, $buffer, $len )

Each is expected to modify the passed buffer; C<reader> by appending to it,
C<writer> by removing a prefix from it. Each is expected to return a true
value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not
provided, they will be substituted by implenentations using C<sysread> and
C<syswrite> on the underlying handle, respectively.

=head2 close_on_read_eof => BOOL

Optional. Usually true, but if set to a false value then the stream will not
be C<close>d when an EOF condition occurs on read. This is normally not useful
as at that point the underlying stream filehandle is no longer useable, but it
may be useful for reading regular files, or interacting with TTY devices.

=head2 encoding => STRING

If supplied, sets the name of encoding of the underlying stream. If an
encoding is set, then the C<write> method will expect to receive Unicode
strings and encodes them into bytes, and incoming bytes will be decoded into

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN

sub _flush_one_write
{
   my $self = shift;

   my $writequeue = $self->{writequeue};

   my $head;
   while( $head = $writequeue->[0] and ref $head->data ) {
      if( ref $head->data eq "CODE" ) {
         my $data = $head->data->( $self );
         if( !defined $data ) {
            $head->on_flush->( $self ) if $head->on_flush;
            shift @$writequeue;
            return 1;
         }
         if( !ref $data and my $encoding = $self->{encoding} ) {
            $data = $encoding->encode( $data );
         }
         unshift @$writequeue, my $new = Writer(
            $data, $head->writelen, $head->on_write, undef, undef, 0
         );
         next;
      }
      elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
         my $f = $head->data;
         if( !$f->is_ready ) {
            return 0 if $head->watching;
            $f->on_ready( sub { $self->_flush_one_write } );
            $head->watching++;
            return 0;
         }
         my $data = $f->get;
         if( !ref $data and my $encoding = $self->{encoding} ) {
            $data = $encoding->encode( $data );
         }
         $head->data = $data;
         next;
      }
      else {
         die "Unsure what to do with reference ".ref($head->data)." in write queue";
      }
   }

   my $second;
   while( $second = $writequeue->[1] and
          !ref $second->data and
          $head->writelen == $second->writelen and
          !$head->on_write and !$second->on_write and
          !$head->on_flush ) {
      $head->data .= $second->data;
      $head->on_write = $second->on_write;
      $head->on_flush = $second->on_flush;
      splice @$writequeue, 1, 1, ();
   }

   die "TODO: head data does not contain a plain string" if ref $head->data;

   if( $IO::Async::Debug::DEBUG > 1 ) {
      my $data = substr $head->data, 0, $head->writelen;
      $self->debug_printf( "WRITE len=%d", length $data );
      IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw};
   }

   my $writer = $self->{writer};
   my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );

   if( !defined $len ) {
      my $errno = $!;

      if( $errno == EAGAIN or $errno == EWOULDBLOCK ) {
         $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable};
         $self->{writeable} = 0;
      }

      return 0 if _nonfatal_error( $errno );

      if( $errno == EPIPE ) {
         $self->{write_eof} = 1;
         $self->maybe_invoke_event( on_write_eof => );
      }

      $head->on_error->( $self, $errno ) if $head->on_error;
      $self->maybe_invoke_event( on_write_error => $errno )
         or $self->close_now;

      return 0;
   }

   if( my $on_write = $head->on_write ) {
      $on_write->( $self, $len );
   }

   if( !length $head->data ) {
      $head->on_flush->( $self ) if $head->on_flush;
      shift @{ $self->{writequeue} };
   }

   return 1;
}

sub write
{
   my $self = shift;
   my ( $data, %params ) = @_;

   carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing};

   # Allow writes without a filehandle if we're not yet in a Loop, just don't
   # try to flush them
   my $handle = $self->write_handle;

   croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop;

   if( !ref $data and my $encoding = $self->{encoding} ) {
      $data = $encoding->encode( $data );
   }

   my $on_write = delete $params{on_write};
   my $on_flush = delete $params{on_flush};
   my $on_error = delete $params{on_error};

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN

   }

   if( ref $ret eq "CODE" ) {
      # Replace the top CODE, or add it if there was none
      $readqueue->[0] = Reader( $ret, undef );
      return 1;
   }
   elsif( @$readqueue and !defined $ret ) {
      shift @$readqueue;
      return 1;
   }
   else {
      return $ret && ( length( $self->{readbuff} ) > 0 || $eof );
   }
}

sub _sysread
{
   my $self = shift;
   my ( $handle, undef, $len ) = @_;
   return $handle->sysread( $_[1], $len );
}

sub on_read_ready
{
   my $self = shift;

   $self->_do_read  if $self->{want} & WANT_READ_FOR_READ;
   $self->_do_write if $self->{want} & WANT_READ_FOR_WRITE;
}

sub _do_read
{
   my $self = shift;

   my $handle = $self->read_handle;
   my $reader = $self->{reader};

   while(1) {
      my $data;
      my $len = $self->$reader( $handle, $data, $self->{read_len} );

      if( !defined $len ) {
         my $errno = $!;

         return if _nonfatal_error( $errno );

         $self->maybe_invoke_event( on_read_error => $errno )
            or $self->close_now;

         foreach ( @{ $self->{readqueue} } ) {
            $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future;
         }
         undef @{ $self->{readqueue} };

         return;
      }

      if( $IO::Async::Debug::DEBUG > 1 ) {
         $self->debug_printf( "READ len=%d", $len );
         IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr};
      }

      my $eof = $self->{read_eof} = ( $len == 0 );

      if( my $encoding = $self->{encoding} ) {
         my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data;
         $data = $encoding->decode( $bytes, STOP_AT_PARTIAL );
         $self->{bytes_remaining} = $bytes;
      }

      $self->{readbuff} .= $data if !$eof;

      1 while $self->_flush_one_read( $eof );

      if( $eof ) {
         $self->maybe_invoke_event( on_read_eof => );
         $self->close_now if $self->{close_on_read_eof};
         foreach ( @{ $self->{readqueue} } ) {
            $_->future->done( undef ) if $_->future;
         }
         undef @{ $self->{readqueue} };
         return;
      }

      last unless $self->{read_all};
   }

   if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) {
      $self->{at_read_high_watermark} or
         $self->invoke_event( on_read_high_watermark => length $self->{readbuff} );

      $self->{at_read_high_watermark} = 1;
   }
}

sub on_read_high_watermark
{
   my $self = shift;
   $self->want_readready_for_read( 0 );
}

sub on_read_low_watermark
{
   my $self = shift;
   $self->want_readready_for_read( 1 );
}

=head2 push_on_read

   $stream->push_on_read( $on_read )

Pushes a new temporary C<on_read> handler to the end of the queue. This queue,
if non-empty, is used to provide C<on_read> event handling code in preference
to using the object's main event handler or method. New handlers can be
supplied at any time, and they will be used in first-in first-out (FIFO)
order.

As with the main C<on_read> event handler, each can return a (defined) boolean
to indicate if they wish to be invoked again or not, another C<CODE> reference
to replace themself with, or C<undef> to indicate it is now complete and

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN


sub read_until_eof
{
   my $self = shift;

   my $f = $self->_read_future;
   $self->push_on_read( sub {
      my ( undef, $buffref, $eof ) = @_;
      return undef if $f->is_cancelled;
      return 0 unless $eof;
      $f->done( $$buffref, $eof ); $$buffref = "";
      return undef;
   }, future => $f );
   return $f;
}

=head1 UTILITY CONSTRUCTORS

=cut

=head2 new_for_stdin

=head2 new_for_stdout

=head2 new_for_stdio

   $stream = IO::Async::Stream->new_for_stdin

   $stream = IO::Async::Stream->new_for_stdout

   $stream = IO::Async::Stream->new_for_stdio

Return a C<IO::Async::Stream> object preconfigured with the correct
C<read_handle>, C<write_handle> or both.

=cut

sub new_for_stdin  { shift->new( read_handle  => \*STDIN, @_ ) }
sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) }

sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) }

=head2 connect

   $future = $stream->connect( %args )

A convenient wrapper for calling the C<connect> method on the underlying
L<IO::Async::Loop> object, passing the C<socktype> hint as C<stream> if not
otherwise supplied.

=cut

sub connect
{
   my $self = shift;
   return $self->SUPER::connect( socktype => "stream", @_ );
}

=head1 DEBUGGING FLAGS

The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging:

=over 4

=item C<Sr>

Log byte buffers as data is read from a Stream

=item C<Sw>

Log byte buffers as data is written to a Stream

=back

=cut

=head1 EXAMPLES

=head2 A line-based C<on_read> method

The following C<on_read> method accepts incoming C<\n>-terminated lines and
prints them to the program's C<STDOUT> stream.

 sub on_read
 {
    my $self = shift;
    my ( $buffref, $eof ) = @_;

    while( $$buffref =~ s/^(.*\n)// ) {
       print "Received a line: $1";
    }

    return 0;
 }

Because a reference to the buffer itself is passed, it is simple to use a
C<s///> regular expression on the scalar it points at, to both check if data
is ready (i.e. a whole line), and to remove it from the buffer. If no data is
available then C<0> is returned, to indicate it should not be tried again. If
a line was successfully extracted, then C<1> is returned, to indicate it
should try again in case more lines exist in the buffer.

=head2 Reading binary data

This C<on_read> method accepts incoming records in 16-byte chunks, printing
each one.

 sub on_read
 {
    my ( $self, $buffref, $eof ) = @_;

    if( length $$buffref >= 16 ) {
       my $record = substr( $$buffref, 0, 16, "" );
       print "Received a 16-byte record: $record\n";

       return 1;
    }

    if( $eof and length $$buffref ) {
       print "EOF: a partial record still exists\n";
    }



( run in 0.425 second using v1.01-cache-2.11-cpan-39bf76dae61 )