Acme-Sort-Sleep


local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN


 $stream->write( "An initial line here\n" );

=head1 DESCRIPTION

This subclass of L<IO::Async::Handle> contains a filehandle that represents
a byte-stream. It provides buffering for both incoming and outgoing data. It
invokes the C<on_read> handler when new data is read from the filehandle. Data
may be written to the filehandle by calling the C<write> method.

This class is suitable for any kind of filehandle that provides a
possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
datagram or raw message-based sockets (such as UDP) see instead
L<IO::Async::Socket>.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters:

=head2 $ret = on_read \$buffer, $eof

Invoked when more data is available in the internal receiving buffer.

The first argument is a reference to a plain perl string. The code should
inspect and remove any data it likes, but is not required to remove all, or
indeed any of the data. Any data remaining in the buffer will be preserved for
the next call, the next time more data is received from the handle.

In this way, it is easy to implement code that reads records of some form when
completed, but ignores partially-received records, until all the data is
present. If the handler is confident no more useful data remains, it should
return C<0>. If not, it should return C<1>, and the handler will be called
again. This makes it easy to implement code that handles multiple incoming
records at the same time. See the examples at the end of this documentation
for more detail.
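
As a minimal sketch of such a handler, the following consumes one complete
line per call and leaves any partial line in the buffer. It runs without an
event loop: the C<drain> helper is hypothetical and only imitates the
call-again-while-true behaviour described above; it is not part of this class.

```perl
use strict;
use warnings;

my @lines;

# An on_read-style handler: consume one complete line per invocation.
sub on_read_line
{
   my ( $self, $buffref, $eof ) = @_;

   if( $$buffref =~ s/^(.*)\n// ) {
      push @lines, $1;
      return 1;   # another complete line may be waiting; call again
   }

   return 0;      # only a partial line (or nothing) remains; wait for more
}

# Hypothetical stand-in for the stream: append new data, then keep calling
# the handler until it returns false.
sub drain
{
   my ( $buffref, $data ) = @_;
   $$buffref .= $data;
   1 while on_read_line( undef, $buffref, 0 );
}

my $buffer = "";
drain( \$buffer, "first\nsecond\npart" );   # "part" stays in the buffer
drain( \$buffer, "ial\n" );                 # completes the third line
```

The partial record survives between calls because the handler only removes
what it has fully matched.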

The second argument is a scalar indicating whether the stream has reported an
end-of-file (EOF) condition. A reference to the buffer is passed to the
handler in the usual way, so it may inspect data contained in it. Once the
handler returns a false value, it will not be called again, as the handle is
now at EOF and no more data can arrive.

The C<on_read> code may also dynamically replace itself with a new callback
by returning a CODE reference instead of C<0> or C<1>. The original callback
or method that the object first started with may be restored by returning
C<undef>. Whenever the callback is changed in this way, the new code is called
again, even if the read buffer is currently empty. See the examples at the end
of this documentation for more detail.
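
The return-value protocol can be sketched without an event loop. In the
following, the C<LEN> header format, C<make_body_reader> and C<run_handlers>
are all assumptions made for illustration; C<run_handlers> merely imitates the
dispatch rules described above and is not how this class is implemented.

```perl
use strict;
use warnings;

my @events;

# Build a temporary handler that waits for $len bytes of body.
sub make_body_reader
{
   my ( $len ) = @_;
   return sub {
      my ( $self, $buffref, $eof ) = @_;
      return 0 if length( $$buffref ) < $len;   # not enough data yet
      push @events, "body:" . substr( $$buffref, 0, $len, "" );
      return undef;                             # restore the original handler
   };
}

# Original handler: parse a "LEN <n>\n" header, then replace itself.
my $on_header = sub {
   my ( $self, $buffref, $eof ) = @_;
   return 0 unless $$buffref =~ s/^LEN (\d+)\n//;
   push @events, "header:$1";
   return make_body_reader( $1 );
};

my $current = $on_header;

# Hypothetical driver: CODE replaces the handler, undef restores the
# original, true means call again, false means stop until more data.
sub run_handlers
{
   my ( $buffref ) = @_;
   while( 1 ) {
      my $ret = $current->( undef, $buffref, 0 );
      if( ref( $ret ) eq "CODE" ) { $current = $ret;       next }
      if( !defined $ret )         { $current = $on_header; next }
      last unless $ret;
   }
}

my $buffer = "LEN 5\nhelloLEN 2\nhi";
run_handlers( \$buffer );
```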

The C<push_on_read> method can be used to insert new, temporary handlers that
take precedence over the global C<on_read> handler. This event is only used if
there are no further pending handlers created by C<push_on_read>.

=head2 on_read_eof

Optional. Invoked when the read handle indicates an end-of-file (EOF)
condition. If there is any data in the buffer still to be processed, the
C<on_read> event will be invoked first, before this one.

=head2 on_write_eof

Optional. Invoked when the write handle indicates an end-of-file (EOF)
condition. Note that this condition can only be detected after a C<write>
syscall returns the C<EPIPE> error. If there is no data pending to be written
then it will not be detected yet.

=head2 on_read_error $errno

Optional. Invoked when the C<sysread> method on the read handle fails.

=head2 on_write_error $errno

Optional. Invoked when the C<syswrite> method on the write handle fails.

The C<on_read_error> and C<on_write_error> handlers are passed the value of
C<$!> at the time the error occurred. (The C<$!> variable itself, by its
nature, may have changed from the original error by the time the handler
runs, so the handler should always use the value passed in.)

If an error occurs and neither a callback nor a handler method has been
supplied for it, then the C<close> method is called instead.

=head2 on_read_high_watermark $length

=head2 on_read_low_watermark $length

Optional. Invoked when the read buffer grows larger than the high watermark
or smaller than the low watermark respectively. These are edge-triggered
events; they will only be triggered once per crossing, not continuously while
the buffer remains above or below the given limit.

If these event handlers are not defined, the default behaviour is to disable
read-ready notifications if the read buffer grows larger than the high
watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
and re-enable notifications again once something has read enough to cause it to
drop. If these events are overridden, the overriding code will have to perform
this behaviour if required, by using

 $self->want_readready_for_read(...)
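
The edge-triggered nature of these events can be sketched in isolation. The
watermark values, the C<$at_high> flag and C<check_watermarks> below are
hypothetical names chosen for illustration, and the sketch makes no claim
about how this class tracks the state internally.

```perl
use strict;
use warnings;

my ( $high, $low ) = ( 8, 4 );   # illustrative watermark values
my $at_high = 0;                 # true while the high watermark is crossed
my @fired;

# Hypothetical check, run after every change in buffer length. Each event
# fires once per crossing, not on every call while past the limit.
sub check_watermarks
{
   my ( $len ) = @_;
   if( !$at_high and $len > $high ) {
      $at_high = 1;
      push @fired, "high:$len";
   }
   elsif( $at_high and $len < $low ) {
      $at_high = 0;
      push @fired, "low:$len";
   }
}

# Lengths 12, 6 and 1 cause no event: no new crossing has happened.
check_watermarks( $_ ) for 2, 9, 12, 6, 3, 1, 10;
```

Lengths between the two watermarks trigger nothing in either direction, which
is what gives the pair its hysteresis.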

=head2 on_outgoing_empty

Optional. Invoked when the writing data buffer becomes empty.

=head2 on_writeable_start

=head2 on_writeable_stop

Optional. These two events inform when the filehandle becomes writeable, and
when it stops being writeable. C<on_writeable_start> is invoked by the
C<on_write_ready> event if previously it was known to be not writeable.
C<on_writeable_stop> is invoked after a C<syswrite> operation fails with
C<EAGAIN> or C<EWOULDBLOCK>. These two events track the writeability state,
and ensure that only state changes cause events to be invoked. A stream starts
off being presumed writeable, so the first of these events to be observed will
be C<on_writeable_stop>.

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN


   $self->{want} = WANT_READ_FOR_READ;

   $self->{close_on_read_eof} = 1;
}

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 read_handle => IO

The IO handle to read from. Must implement C<fileno> and C<sysread> methods.

=head2 write_handle => IO

The IO handle to write to. Must implement C<fileno> and C<syswrite> methods.

=head2 handle => IO

Shortcut to specifying the same IO handle for both of the above.

=head2 on_read => CODE

=head2 on_read_error => CODE

=head2 on_outgoing_empty => CODE

=head2 on_write_error => CODE

=head2 on_writeable_start => CODE

=head2 on_writeable_stop => CODE

CODE references for event handlers.

=head2 autoflush => BOOL

Optional. If true, the C<write> method will attempt to write data to the
operating system immediately, without waiting for the loop to indicate the
filehandle is write-ready. This is useful, for example, on streams that should
contain up-to-date logging or console information.

It currently defaults to false for any file handle, but future versions of
L<IO::Async> may enable this by default on STDOUT and STDERR.

=head2 read_len => INT

Optional. Sets the buffer size for C<read> calls. Defaults to 8 KiBytes.

=head2 read_all => BOOL

Optional. If true, attempt to read as much data from the kernel as possible
when the handle becomes readable. By default this is turned off, meaning at
most one fixed-size buffer is read. If there is still more data in the
kernel's buffer, the handle will still be readable, and will be read from
again.

This behaviour allows multiple streams and sockets to be multiplexed
simultaneously, meaning that a large bulk transfer on one cannot starve other
filehandles of processing time. Turning this option on may improve bulk data
transfer rate, at the risk of delaying or stalling processing on other
filehandles.

=head2 write_len => INT

Optional. Sets the buffer size for C<write> calls. Defaults to 8 KiBytes.

=head2 write_all => BOOL

Optional. Analogous to the C<read_all> option, but for writing. When
C<autoflush> is enabled, this option only affects deferred writing if the
initial attempt failed due to buffer space.

=head2 read_high_watermark => INT

=head2 read_low_watermark => INT

Optional. If defined, gives a way to implement flow control or other
behaviours that depend on the size of Stream's read buffer.

If after more data is read from the underlying filehandle the read buffer is
now larger than the high watermark, the C<on_read_high_watermark> event is
triggered (which, by default, will disable read-ready notifications and pause
reading from the filehandle).

If after data is consumed by an C<on_read> handler the read buffer is now
smaller than the low watermark, the C<on_read_low_watermark> event is
triggered (which, by default, will re-enable read-ready notifications and
resume reading from the filehandle). For this to be possible, the read handler
would have to be one added by the C<push_on_read> method or one of the
Future-returning C<read_*> methods.

By default these options are not defined, so this behaviour will not happen.
C<read_low_watermark> may not be set to a larger value than
C<read_high_watermark>, but it may be set to a smaller value, creating a
hysteresis region. If either option is defined then both must be.

If these options are used with the default event handlers, be careful not to
cause deadlocks by having a high watermark sufficiently low that a single
C<on_read> invocation might not consider it finished yet.

=head2 reader => STRING|CODE

=head2 writer => STRING|CODE

Optional. If defined, gives the name of a method or a CODE reference to use
to implement the actual reading from or writing to the filehandle. These will
be invoked as

 $stream->reader( $read_handle, $buffer, $len )
 $stream->writer( $write_handle, $buffer, $len )

Each is expected to modify the passed buffer; C<reader> by appending to it,
C<writer> by removing a prefix from it. Each is expected to return a true
value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not
provided, they will be substituted by implementations using C<sysread> and
C<syswrite> on the underlying handle, respectively.
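
A custom reader following this contract might look like the sketch below. It
assumes the calling convention shown above, with the buffer passed so that it
can be modified in place through C<@_>; C<counting_reader> and C<$bytes_seen>
are hypothetical names, and a pipe stands in for the stream's read handle.

```perl
use strict;
use warnings;

my $bytes_seen = 0;

# Hypothetical custom reader: append to the caller's buffer using sysread,
# keeping a running byte count on the side.
sub counting_reader
{
   my ( $self, $fh, undef, $len ) = @_;

   # $_[2] aliases the caller's buffer, so sysread extends it in place;
   # the offset argument makes sysread append rather than overwrite.
   my $ret = sysread( $fh, $_[2], $len, length( $_[2] ) );
   $bytes_seen += $ret if $ret;

   return $ret;   # true on success, 0 at EOF, undef with $! set on error
}

pipe( my $rd, my $wr ) or die "pipe: $!";
syswrite( $wr, "hello" );
close $wr;

my $buffer = "abc";
my $first  = counting_reader( undef, $rd, $buffer, 8192 );   # reads 5 bytes
my $second = counting_reader( undef, $rd, $buffer, 8192 );   # 0: EOF
```

Such a function could then be passed as the C<reader> parameter, either as a
CODE reference or by method name.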

=head2 close_on_read_eof => BOOL

Optional. Usually true, but if set to a false value then the stream will not

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN

supplied at any time, and they will be used in first-in first-out (FIFO)
order.

As with the main C<on_read> event handler, each can return a (defined) boolean
to indicate whether it wishes to be invoked again, another C<CODE> reference
to replace itself with, or C<undef> to indicate that it is now complete and
should be removed. When a temporary handler returns C<undef> it is shifted
from the queue and the next one, if present, is invoked instead. If there are
no more then the object's main handler is invoked instead.

=cut

sub push_on_read
{
   my $self = shift;
   my ( $on_read, %args ) = @_;
   # %args undocumented for internal use

   push @{ $self->{readqueue} }, Reader( $on_read, $args{future} );

   # TODO: Should this always defer?
   return if $self->{flushing_read};
   1 while length $self->{readbuff} and $self->_flush_one_read( 0 );
}

=head1 FUTURE-RETURNING READ METHODS

The following methods all return a L<Future> which will become ready when
enough data has been read by the Stream into its buffer. At this point, the
data is removed from the buffer and given to the C<Future> object to complete
it.

 my $f = $stream->read_...

 my ( $string ) = $f->get;

Unlike the C<on_read> event handlers, these methods don't allow for access to
"partial" results; they only provide the final result once it is ready.

If a C<Future> is cancelled before it completes it is removed from the read
queue without consuming any data; i.e. each C<Future> atomically either
completes or is cancelled.

Since it is possible to use a readable C<Stream> entirely using these
C<Future>-returning methods instead of the C<on_read> event, it may be useful
to configure a trivial return-false event handler to keep it from consuming
any input, and to allow it to be added to a C<Loop> in the first place.

 my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... );
 $loop->add( $stream );

 my $f = $stream->read_...

If a read EOF or error condition happens while there are read C<Future>s
pending, they are all completed. In the case of a read EOF, they are done with
C<undef>; in the case of a read error they are failed using the C<$!> error
value as the failure.

 $f->fail( $message, sysread => $! )

If a read EOF condition happens to the currently-processing read C<Future>, it
will return a partial result. The calling code can detect this by the fact
that the returned data is not complete according to the specification (too
short in C<read_exactly>'s case, or lacking the ending pattern in
C<read_until>'s case). Additionally, each C<Future> will yield the C<$eof>
value in its results.

 my ( $string, $eof ) = $f->get;

=cut

sub _read_future
{
   my $self = shift;
   my $f = $self->loop->new_future;
   $f->on_cancel( $self->_capture_weakself( sub {
      my $self = shift or return;
      1 while $self->_flush_one_read;
   }));
   return $f;
}

=head2 read_atmost

=head2 read_exactly

   ( $string, $eof ) = $stream->read_atmost( $len )->get

   ( $string, $eof ) = $stream->read_exactly( $len )->get

Completes the C<Future> when the read buffer contains C<$len> or more
characters of input. C<read_atmost> will also complete after the first
invocation of C<on_read>, even if fewer characters are available, whereas
C<read_exactly> will wait until at least C<$len> are available.
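
The difference between the two can be sketched with plain buffer operations.
C<take_atmost> and C<take_exactly> below are hypothetical helpers that imitate
only the buffer handling; they do not involve a C<Future> or the read queue.

```perl
use strict;
use warnings;

# Take up to $len characters from the front of the buffer, however few are
# there (read_atmost-like).
sub take_atmost
{
   my ( $buffref, $len ) = @_;
   return substr( $$buffref, 0, $len, "" );
}

# Take exactly $len characters, or nothing at all until enough have arrived
# (read_exactly-like).
sub take_exactly
{
   my ( $buffref, $len ) = @_;
   return undef if length( $$buffref ) < $len;
   return substr( $$buffref, 0, $len, "" );
}

my $short = "abc";
my $none  = take_exactly( \$short, 5 );   # undef: only 3 characters so far
my $some  = take_atmost( \$short, 5 );    # "abc": takes what is available

my $long  = "abcdefg";
my $exact = take_exactly( \$long, 5 );    # "abcde"; "fg" stays buffered
```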

=cut

sub read_atmost
{
   my $self = shift;
   my ( $len ) = @_;

   my $f = $self->_read_future;
   $self->push_on_read( sub {
      my ( undef, $buffref, $eof ) = @_;
      return undef if $f->is_cancelled;
      $f->done( substr( $$buffref, 0, $len, "" ), $eof );
      return undef;
   }, future => $f );
   return $f;
}

sub read_exactly
{
   my $self = shift;
   my ( $len ) = @_;

   my $f = $self->_read_future;
   $self->push_on_read( sub {
      my ( undef, $buffref, $eof ) = @_;
      return undef if $f->is_cancelled;


