Acme-Sort-Sleep
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
print "Received a line $1";
}
if( $eof ) {
print "EOF; last partial line is $$buffref\n";
}
return 0;
}
);
$loop->add( $stream );
$stream->write( "An initial line here\n" );
=head1 DESCRIPTION
This subclass of L<IO::Async::Handle> contains a filehandle that represents
a byte-stream. It provides buffering for both incoming and outgoing data. It
invokes the C<on_read> handler when new data is read from the filehandle. Data
may be written to the filehandle by calling the C<write> method.
This class is suitable for any kind of filehandle that provides a
possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
datagram or raw message-based sockets (such as UDP) see instead
L<IO::Async::Socket>.
=cut
=head1 EVENTS
The following events are invoked, either using subclass methods or CODE
references in parameters:
=head2 $ret = on_read \$buffer, $eof
Invoked when more data is available in the internal receiving buffer.
The first argument is a reference to a plain perl string. The code should
inspect and remove any data it likes, but is not required to remove all, or
indeed any of the data. Any data remaining in the buffer will be preserved for
the next call, the next time more data is received from the handle.
In this way, it is easy to implement code that reads records of some form when
completed, but ignores partially-received records, until all the data is
present. If the handler is confident no more useful data remains, it should
return C<0>. If not, it should return C<1>, and the handler will be called
again. This makes it easy to implement code that handles multiple incoming
records at the same time. See the examples at the end of this documentation
for more detail.
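As a minimal sketch of this contract, the handler below removes complete newline-terminated lines from the buffer and leaves any partial line in place for the next call. It is exercised by calling it directly on a plain string rather than through a real stream; the C<@lines> array and the hand-written driver loop are illustrative assumptions, not part of this module.

```perl
use strict;
use warnings;

my @lines;   # hypothetical sink for completed records

# An on_read-style handler, taking ( $self, \$buffer, $eof )
my $on_read = sub {
   my ( $self, $buffref, $eof ) = @_;

   # Remove one complete line from the front of the buffer, if there is one
   if( $$buffref =~ s/^(.*)\n// ) {
      push @lines, $1;
      return 1;   # ask to be called again; more lines may be buffered
   }

   return 0;      # only a partial line (or nothing) remains; wait for more
};

# Simulate two reads arriving from a handle (no real stream is involved)
my $buffer = "";
for my $chunk ( "first\nsec", "ond\nthi" ) {
   $buffer .= $chunk;
   1 while $on_read->( undef, \$buffer, 0 );
}

print "Got: $_\n" for @lines;
print "Left in buffer: '$buffer'\n";
```

The trailing C<thi> stays in the buffer because no newline has arrived for it yet.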
The second argument is a scalar indicating whether the stream has reported an
end-of-file (EOF) condition. A reference to the buffer is passed to the
handler in the usual way, so it may inspect data contained in it. Once the
handler returns a false value, it will not be called again, as the handle is
now at EOF and no more data can arrive.
The C<on_read> code may also dynamically replace itself with a new callback
by returning a CODE reference instead of C<0> or C<1>. The original callback
or method that the object first started with may be restored by returning
C<undef>. Whenever the callback is changed in this way, the new code is called
again, even if the read buffer is currently empty. See the examples at the end
of this documentation for more detail.
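The following sketch illustrates the idea with a made-up wire format: a C<LEN n> header line followed by exactly I<n> bytes of body. The header handler returns a CODE reference that waits for the body, and that temporary handler returns C<undef> to hand control back. The wire format, the C<@bodies> array and the C<feed> driver (which only imitates the return-value rules described above) are assumptions for illustration.

```perl
use strict;
use warnings;

my @bodies;   # hypothetical sink for completed message bodies

# Top-level handler: parse a "LEN <n>\n" header, then hand over to a
# temporary handler that waits for exactly <n> bytes of body.
my $on_read = sub {
   my ( $self, $buffref, $eof ) = @_;

   return 0 unless $$buffref =~ s/^LEN (\d+)\n//;
   my $want = $1;

   return sub {
      my ( $self, $buffref, $eof ) = @_;

      return 0 if length $$buffref < $want;   # body incomplete; wait

      push @bodies, substr( $$buffref, 0, $want, "" );
      return undef;                           # restore the original handler
   };
};

# Illustrative driver imitating the documented return-value rules
my $buffer  = "";
my $current = $on_read;

sub feed {
   my ( $chunk ) = @_;
   $buffer .= $chunk;

   while( 1 ) {
      my $ret = $current->( undef, \$buffer, 0 );

      if( ref $ret eq "CODE" ) {
         $current = $ret;        # swap in the new handler and call it
      }
      elsif( !defined $ret and $current != $on_read ) {
         $current = $on_read;    # undef: go back to the original handler
      }
      elsif( !$ret or !length $buffer ) {
         last;                   # false, or nothing left to process
      }
   }
}

feed( "LEN 5\nhel" );
feed( "loLEN 2\nok" );

print "Body: $_\n" for @bodies;
```

Each call to the header handler creates a fresh closure, so the expected length is kept per message without any state stored on the object.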
The C<push_on_read> method can be used to insert new, temporary handlers that
take precedence over the global C<on_read> handler. This event is only used if
there are no further pending handlers created by C<push_on_read>.
=head2 on_read_eof
Optional. Invoked when the read handle indicates an end-of-file (EOF)
condition. If there is any data in the buffer still to be processed, the
C<on_read> event will be invoked first, before this one.
=head2 on_write_eof
Optional. Invoked when the write handle indicates an end-of-file (EOF)
condition. Note that this condition can only be detected after a C<write>
syscall returns the C<EPIPE> error. If there is no data pending to be written
then it will not be detected yet.
=head2 on_read_error $errno
Optional. Invoked when the C<sysread> method on the read handle fails.
=head2 on_write_error $errno
Optional. Invoked when the C<syswrite> method on the write handle fails.
The C<on_read_error> and C<on_write_error> handlers are passed the value of
C<$!> at the time the error occurred. (The C<$!> variable itself, by its
nature, may have changed from the original error by the time this handler
runs, so it should always use the value passed in).
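A small sketch of why the passed-in value matters: the handler below deliberately clobbers C<$!> before reporting, yet still sees the original error through its argument. It is called by hand after a failing C<open>; the path, the C<@log> array and the direct invocation are assumptions made for the example.

```perl
use strict;
use warnings;
use Errno qw( ENOENT );

my @log;   # hypothetical place to record error reports

# An on_read_error-style handler, taking ( $self, $errno )
my $on_read_error = sub {
   my ( $self, $errno ) = @_;

   $! = 0;   # stand-in for unrelated code that overwrites $!

   # The argument still carries both the number and the message
   push @log, sprintf "read failed: %s (errno %d)", $errno, $errno;
};

# Provoke a failure so that $! is set (the path is assumed not to exist)
open my $fh, "<", "/no/such/dir/io-async-example"
   or $on_read_error->( undef, $! );

print "$_\n" for @log;
```

The message text depends on the platform and locale, so only the numeric value should be relied upon in tests.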
If an error occurs and neither the corresponding error callback nor a handler
method for it is supplied, then the C<close> method is called instead.
=head2 on_read_high_watermark $length
=head2 on_read_low_watermark $length
Optional. Invoked when the read buffer grows larger than the high watermark
or smaller than the low watermark respectively. These are edge-triggered
events; they will only be triggered once per crossing, not continuously while
the buffer remains above or below the given limit.
If these event handlers are not defined, the default behaviour is to disable
read-ready notifications if the read buffer grows larger than the high
watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
and re-enable notifications again once something has read enough to cause it to
drop. If these events are overridden, the overriding code will have to perform
this behaviour if required, by using
$self->want_readready_for_read(...)
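The edge-triggered behaviour can be sketched in isolation as follows. This is a stand-alone simulation that tracks a single flag, in the spirit of the C<at_read_high_watermark> field used by C<_flush_one_read>; the watermark values, the C<check_watermarks> helper and the sequence of buffer lengths are all made up for illustration.

```perl
use strict;
use warnings;

my ( $high, $low ) = ( 8, 3 );   # illustrative watermark values
my $at_high = 0;                 # true once the buffer has crossed above $high
my @events;

# Called whenever the (simulated) read buffer length changes
sub check_watermarks {
   my ( $len ) = @_;

   if( !$at_high and $len > $high ) {
      $at_high = 1;
      push @events, "high($len)";   # fires once per upward crossing
   }
   elsif( $at_high and $len < $low ) {
      $at_high = 0;
      push @events, "low($len)";    # fires once per downward crossing
   }
}

check_watermarks( $_ ) for 5, 9, 12, 10, 4, 2, 1, 9;

print "@events\n";
```

Lengths that stay above the high watermark (12, 10) or below the low one (1) produce no further events until the opposite edge has been crossed.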
=head2 on_outgoing_empty
Optional. Invoked when the writing data buffer becomes empty.
=head2 on_writeable_start
=head2 on_writeable_stop
Optional. These two events inform when the filehandle becomes writeable, and
when it stops being writeable. C<on_writeable_start> is invoked by the
C<on_write_ready> event if previously it was known to be not writeable.
C<on_writeable_stop> is invoked after a C<syswrite> operation fails with
C<EAGAIN> or C<EWOULDBLOCK>. These two events track the writeability state,
and ensure that only state changes cause events to be invoked. A stream starts
off being presumed writeable, so the first of these events to be observed will
be C<on_writeable_stop>.
=cut
sub _init
{
   my $self = shift;
   $self->{writequeue} = []; # Queue of Writers
   $self->{readqueue} = []; # Queue of Readers
   $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
   $self->{readbuff} = "";
   $self->{reader} = "_sysread";
   $self->{writer} = "_syswrite";
   $self->{read_len} = $READLEN;
   $self->{write_len} = $WRITELEN;
   $self->{want} = WANT_READ_FOR_READ;
   $self->{close_on_read_eof} = 1;
}
=head1 PARAMETERS
The following named parameters may be passed to C<new> or C<configure>:
=head2 read_handle => IO
The IO handle to read from. Must implement C<fileno> and C<sysread> methods.
=head2 write_handle => IO
The IO handle to write to. Must implement C<fileno> and C<syswrite> methods.
=head2 handle => IO
Shortcut to specifying the same IO handle for both of the above.
=head2 on_read => CODE
=head2 on_read_error => CODE
=head2 on_outgoing_empty => CODE
=head2 on_write_error => CODE
=head2 on_writeable_start => CODE
=head2 on_writeable_stop => CODE
CODE references for event handlers.
=head2 autoflush => BOOL
Optional. If true, the C<write> method will attempt to write data to the
operating system immediately, without waiting for the loop to indicate the
filehandle is write-ready. This is useful, for example, on streams that should
contain up-to-date logging or console information.
It currently defaults to false for any file handle, but future versions of
L<IO::Async> may enable this by default on STDOUT and STDERR.
=head2 read_len => INT
Optional. Sets the buffer size for C<read> calls. Defaults to 8 KiBytes.
=head2 read_all => BOOL
Optional. If true, attempt to read as much data from the kernel as possible
when the handle becomes readable. By default this is turned off, meaning at
most one fixed-size buffer is read. If there is still more data in the
kernel's buffer, the handle will still be readable, and will be read from
again.
This behaviour allows multiple streams and sockets to be multiplexed
simultaneously, meaning that a large bulk transfer on one cannot starve other
filehandles of processing time. Turning this option on may improve bulk data
transfer rate, at the risk of delaying or stalling processing on other
filehandles.
=head2 write_len => INT
Optional. Sets the buffer size for C<write> calls. Defaults to 8 KiBytes.
=head2 write_all => BOOL
Optional. Analogous to the C<read_all> option, but for writing. When
C<autoflush> is enabled, this option only affects deferred writing if the
initial attempt failed due to buffer space.
=head2 read_high_watermark => INT
=head2 read_low_watermark => INT
Optional. If defined, gives a way to implement flow control or other
behaviours that depend on the size of Stream's read buffer.
If after more data is read from the underlying filehandle the read buffer is
now larger than the high watermark, the C<on_read_high_watermark> event is
triggered (which, by default, will disable read-ready notifications and pause
reading from the filehandle).
If after data is consumed by an C<on_read> handler the read buffer is now
smaller than the low watermark, the C<on_read_low_watermark> event is
triggered (which, by default, will re-enable read-ready notifications and
resume reading from the filehandle). For this to be possible, the read handler
would have to be one added by the C<push_on_read> method or one of the
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
$stream->reader( $read_handle, $buffer, $len )
$stream->writer( $write_handle, $buffer, $len )
Each is expected to modify the passed buffer; C<reader> by appending to it,
C<writer> by removing a prefix from it. Each is expected to return a true
value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not
provided, they will be substituted by implementations using C<sysread> and
C<syswrite> on the underlying handle, respectively.
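A rough sketch of functions following these conventions is shown below, using buffered C<read> and C<print> in place of C<sysread> and C<syswrite> so that they can be tried against in-memory filehandles. The functions are called by hand here; the in-memory handles, chunk sizes and variable names are assumptions for illustration only.

```perl
use strict;
use warnings;

# reader: append up to $len bytes from $fh onto the caller's buffer ($_[2]).
# Returns the byte count (true), 0 at EOF, or undef with $! set on error.
my $reader = sub {
   my ( $stream, $fh, undef, $len ) = @_;
   return read( $fh, $_[2], $len, length $_[2] );
};

# writer: write up to $len bytes, then remove that prefix from the buffer.
# Returns the number of bytes written, or undef on error.
my $writer = sub {
   my ( $stream, $fh, undef, $len ) = @_;
   my $chunk = substr( $_[2], 0, $len );
   print {$fh} $chunk or return undef;
   substr( $_[2], 0, length $chunk, "" );
   return length $chunk;
};

# Exercise both against in-memory filehandles
my $source = "hello world";
open my $in, "<", \$source or die "Cannot open in-memory input: $!";
my $rbuf = "";
1 while $reader->( undef, $in, $rbuf, 4 );

my $sink = "";
open my $out, ">", \$sink or die "Cannot open in-memory output: $!";
my $wbuf = "outgoing";
$writer->( undef, $out, $wbuf, 3 ) while length $wbuf;
close $out;

print "read: '$rbuf', wrote: '$sink'\n";
```

The buffer is accessed through C<$_[2]> rather than a copy, since each function has to modify the caller's string in place.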
=head2 close_on_read_eof => BOOL
Optional. Usually true, but if set to a false value then the stream will not
be C<close>d when an EOF condition occurs on read. This is normally not useful
as at that point the underlying stream filehandle is no longer useable, but it
may be useful for reading regular files, or interacting with TTY devices.
=head2 encoding => STRING
If supplied, sets the name of the encoding of the underlying stream. If an
encoding is set, then the C<write> method will expect to receive Unicode
strings and encode them into bytes, and incoming bytes will be decoded into
Unicode strings for the C<on_read> event.
If an encoding is not supplied then C<write> and C<on_read> will work in byte
strings.
I<IMPORTANT NOTE:> in order to handle reads of UTF-8 content or other
multibyte encodings, the code implementing the C<on_read> event uses a feature
of L<Encode>; the C<STOP_AT_PARTIAL> flag. While this flag has existed for a
while and is used by the C<:encoding> PerlIO layer itself for similar
purposes, the flag is not officially documented by the C<Encode> module. In
principle this undocumented feature could be subject to change; in practice I
believe it to be reasonably stable.
This note applies only to the C<on_read> event; data written using the
C<write> method does not rely on any undocumented features of C<Encode>.
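The effect of the C<STOP_AT_PARTIAL> flag can be seen in a stand-alone sketch like the one below, which decodes UTF-8 that has been split in the middle of a multibyte character. It uses L<Encode> directly; the C<$pending> and C<$text> variables and the two-chunk input are assumptions for illustration, not a copy of this module's internals.

```perl
use strict;
use warnings;
use Encode qw( find_encoding STOP_AT_PARTIAL );

my $encoding = find_encoding( "UTF-8" ) or die "Cannot find UTF-8 encoding";

my $pending = "";   # undecoded bytes carried over between reads
my $text    = "";   # decoded characters so far

# "\xC3\xA9" is the UTF-8 form of U+00E9; split it across two chunks
for my $chunk ( "caf\xC3", "\xA9!" ) {
   my $bytes = $pending . $chunk;

   # Decodes as much as possible and leaves any trailing partial
   # sequence behind in $bytes for the next round
   $text .= $encoding->decode( $bytes, STOP_AT_PARTIAL );

   $pending = $bytes;
}

printf "decoded %d characters, %d bytes pending\n",
   length $text, length $pending;
```

Without the flag, the lone C<\xC3> byte at the end of the first chunk would be treated as malformed input rather than held back.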
If a read handle is given, it is required that either an C<on_read> callback
reference is configured, or that the object provides an C<on_read> method. It
is optional whether either is true for C<on_outgoing_empty>; if neither is
supplied then no action will be taken when the writing buffer becomes empty.
An C<on_read> handler may be supplied even if no read handle is yet given, to
be used when a read handle is eventually provided by the C<set_handles>
method.
This condition is checked at the time the object is added to a Loop; it is
allowed to create an C<IO::Async::Stream> object with a read handle but without
an C<on_read> handler, provided that one is later given using C<configure>
before the stream is added to its containing Loop, either directly or by being
a child of another Notifier already in a Loop, or added to one.
=cut
sub configure
{
   my $self = shift;
   my %params = @_;
   for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
            on_write_error on_writeable_start on_writeable_stop autoflush
            read_len read_all write_len write_all on_read_high_watermark
            on_read_low_watermark reader writer close_on_read_eof )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }
   if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) {
      my $high = delete $params{read_high_watermark};
      defined $high or $high = $self->{read_high_watermark};
      my $low = delete $params{read_low_watermark};
      defined $low or $low = $self->{read_low_watermark};
      croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high;
      croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low;
      croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high;
      $self->{read_high_watermark} = $high;
      $self->{read_low_watermark} = $low;
      # TODO: reassert levels if we've moved them
   }
   if( exists $params{encoding} ) {
      my $encoding = delete $params{encoding};
      my $obj = find_encoding( $encoding );
      defined $obj or croak "Cannot handle an encoding of '$encoding'";
      $self->{encoding} = $obj;
   }
   $self->SUPER::configure( %params );
   if( $self->loop and $self->read_handle ) {
      $self->can_event( "on_read" ) or
         croak 'Expected either an on_read callback or to be able to ->on_read';
   }
}
sub _add_to_loop
{
   my $self = shift;
   if( defined $self->read_handle ) {
      $self->can_event( "on_read" ) or
         croak 'Expected either an on_read callback or to be able to ->on_read';
   }
   $self->SUPER::_add_to_loop( @_ );
   if( !$self->_is_empty ) {
      $self->want_writeready_for_write( 1 );
   }
}
=head1 METHODS
The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.
=cut
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
   if( !ref $data and my $encoding = $self->{encoding} ) {
      $data = $encoding->encode( $data );
   }
   my $on_write = delete $params{on_write};
   my $on_flush = delete $params{on_flush};
   my $on_error = delete $params{on_error};
   my $f;
   if( defined wantarray ) {
      my $orig_on_flush = $on_flush;
      my $orig_on_error = $on_error;
      my $loop = $self->loop or
         croak "Cannot ->write data returning a Future to a Stream not in a Loop";
      $f = $loop->new_future;
      $on_flush = sub {
         $f->done;
         $orig_on_flush->( @_ ) if $orig_on_flush;
      };
      $on_error = sub {
         my $self = shift;
         my ( $errno ) = @_;
         $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready;
         $orig_on_error->( $self, @_ ) if $orig_on_error;
      };
   }
   my $write_len = $params{write_len};
   defined $write_len or $write_len = $self->{write_len};
   push @{ $self->{writequeue} }, Writer(
      $data, $write_len, $on_write, $on_flush, $on_error, 0
   );
   keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params );
   return $f unless $handle;
   if( $self->{autoflush} ) {
      1 while !$self->_is_empty and $self->_flush_one_write;
      if( $self->_is_empty ) {
         $self->want_writeready_for_write( 0 );
         return $f;
      }
   }
   $self->want_writeready_for_write( 1 );
   return $f;
}
sub on_write_ready
{
   my $self = shift;
   if( !$self->{writeable} ) {
      $self->maybe_invoke_event( on_writeable_start => );
      $self->{writeable} = 1;
   }
   $self->_do_write if $self->{want} & WANT_WRITE_FOR_WRITE;
   $self->_do_read if $self->{want} & WANT_WRITE_FOR_READ;
}
sub _do_write
{
   my $self = shift;
   1 while !$self->_is_empty and $self->_flush_one_write and $self->{write_all};
   # All data successfully flushed
   if( $self->_is_empty ) {
      $self->want_writeready_for_write( 0 );
      $self->maybe_invoke_event( on_outgoing_empty => );
      $self->close_now if $self->{stream_closing};
   }
}
sub _flush_one_read
{
   my $self = shift;
   my ( $eof ) = @_;
   local $self->{flushing_read} = 1;
   my $readqueue = $self->{readqueue};
   my $ret;
   if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) {
      $ret = $on_read->( $self, \$self->{readbuff}, $eof );
   }
   else {
      $ret = $self->invoke_event( on_read => \$self->{readbuff}, $eof );
   }
   if( defined $self->{read_low_watermark} and $self->{at_read_high_watermark} and
       length $self->{readbuff} < $self->{read_low_watermark} ) {
      undef $self->{at_read_high_watermark};
      $self->invoke_event( on_read_low_watermark => length $self->{readbuff} );
   }
   if( ref $ret eq "CODE" ) {
      # Replace the top CODE, or add it if there was none
      $readqueue->[0] = Reader( $ret, undef );
      return 1;
   }
   elsif( @$readqueue and !defined $ret ) {
      shift @$readqueue;
      return 1;
   }
   else {
      return $ret && ( length( $self->{readbuff} ) > 0 || $eof );
   }
}