Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2006-2015 -- leonerd@leonerd.org.uk
package IO::Async::Stream;
use strict;
use warnings;
our $VERSION = '0.70';
use base qw( IO::Async::Handle );
use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE );
use Carp;
use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL );
use Scalar::Util qw( blessed );
use IO::Async::Debug;
# Tuneable from outside
# Not yet documented
our $READLEN = 8192;
our $WRITELEN = 8192;
use Struct::Dumb;
# Element of the writequeue
struct Writer => [qw( data writelen on_write on_flush on_error watching )];
# Element of the readqueue
struct Reader => [qw( on_read future )];
# Bitfields in the want flags
use constant WANT_READ_FOR_READ => 0x01;
use constant WANT_READ_FOR_WRITE => 0x02;
use constant WANT_WRITE_FOR_READ => 0x04;
use constant WANT_WRITE_FOR_WRITE => 0x08;
use constant WANT_ANY_READ => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE;
use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE;
=head1 NAME
C<IO::Async::Stream> - event callbacks and write bufering for a stream
filehandle
=head1 SYNOPSIS
use IO::Async::Stream;
use IO::Async::Loop;
my $loop = IO::Async::Loop->new;
my $stream = IO::Async::Stream->new(
read_handle => \*STDIN,
write_handle => \*STDOUT,
on_read => sub {
my ( $self, $buffref, $eof ) = @_;
while( $$buffref =~ s/^(.*\n)// ) {
print "Received a line $1";
}
if( $eof ) {
print "EOF; last partial line is $$buffref\n";
}
return 0;
}
);
$loop->add( $stream );
$stream->write( "An initial line here\n" );
=head1 DESCRIPTION
This subclass of L<IO::Async::Handle> contains a filehandle that represents
a byte-stream. It provides buffering for both incoming and outgoing data. It
invokes the C<on_read> handler when new data is read from the filehandle. Data
may be written to the filehandle by calling the C<write> method.
This class is suitable for any kind of filehandle that provides a
possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
datagram or raw message-based sockets (such as UDP) see instead
L<IO::Async::Socket>.
=cut
=head1 EVENTS
The following events are invoked, either using subclass methods or CODE
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
Optional. If defined, gives a way to implement flow control or other
behaviours that depend on the size of Stream's read buffer.
If after more data is read from the underlying filehandle the read buffer is
now larger than the high watermark, the C<on_read_high_watermark> event is
triggered (which, by default, will disable read-ready notifications and pause
reading from the filehandle).
If after data is consumed by an C<on_read> handler the read buffer is now
smaller than the low watermark, the C<on_read_low_watermark> event is
triggered (which, by default, will re-enable read-ready notifications and
resume reading from the filehandle). For to be possible, the read handler
would have to be one added by the C<push_on_read> method or one of the
Future-returning C<read_*> methods.
By default these options are not defined, so this behaviour will not happen.
C<read_low_watermark> may not be set to a larger value than
C<read_high_watermark>, but it may be set to a smaller value, creating a
hysteresis region. If either option is defined then both must be.
If these options are used with the default event handlers, be careful not to
cause deadlocks by having a high watermark sufficiently low that a single
C<on_read> invocation might not consider it finished yet.
=head2 reader => STRING|CODE
=head2 writer => STRING|CODE
Optional. If defined, gives the name of a method or a CODE reference to use
to implement the actual reading from or writing to the filehandle. These will
be invoked as
$stream->reader( $read_handle, $buffer, $len )
$stream->writer( $write_handle, $buffer, $len )
Each is expected to modify the passed buffer; C<reader> by appending to it,
C<writer> by removing a prefix from it. Each is expected to return a true
value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not
provided, they will be substituted by implenentations using C<sysread> and
C<syswrite> on the underlying handle, respectively.
=head2 close_on_read_eof => BOOL
Optional. Usually true, but if set to a false value then the stream will not
be C<close>d when an EOF condition occurs on read. This is normally not useful
as at that point the underlying stream filehandle is no longer useable, but it
may be useful for reading regular files, or interacting with TTY devices.
=head2 encoding => STRING
If supplied, sets the name of encoding of the underlying stream. If an
encoding is set, then the C<write> method will expect to receive Unicode
strings and encodes them into bytes, and incoming bytes will be decoded into
Unicode strings for the C<on_read> event.
If an encoding is not supplied then C<write> and C<on_read> will work in byte
strings.
I<IMPORTANT NOTE:> in order to handle reads of UTF-8 content or other
multibyte encodings, the code implementing the C<on_read> event uses a feature
of L<Encode>; the C<STOP_AT_PARTIAL> flag. While this flag has existed for a
while and is used by the C<:encoding> PerlIO layer itself for similar
purposes, the flag is not officially documented by the C<Encode> module. In
principle this undocumented feature could be subject to change, in practice I
believe it to be reasonably stable.
This note applies only to the C<on_read> event; data written using the
C<write> method does not rely on any undocumented features of C<Encode>.
If a read handle is given, it is required that either an C<on_read> callback
reference is configured, or that the object provides an C<on_read> method. It
is optional whether either is true for C<on_outgoing_empty>; if neither is
supplied then no action will be taken when the writing buffer becomes empty.
An C<on_read> handler may be supplied even if no read handle is yet given, to
be used when a read handle is eventually provided by the C<set_handles>
method.
This condition is checked at the time the object is added to a Loop; it is
allowed to create a C<IO::Async::Stream> object with a read handle but without
a C<on_read> handler, provided that one is later given using C<configure>
before the stream is added to its containing Loop, either directly or by being
a child of another Notifier already in a Loop, or added to one.
=cut
sub configure
{
my $self = shift;
my %params = @_;
for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
on_write_error on_writeable_start on_writeable_stop autoflush
read_len read_all write_len write_all on_read_high_watermark
on_read_low_watermark reader writer close_on_read_eof )) {
$self->{$_} = delete $params{$_} if exists $params{$_};
}
if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) {
my $high = delete $params{read_high_watermark};
defined $high or $high = $self->{read_high_watermark};
my $low = delete $params{read_low_watermark};
defined $low or $low = $self->{read_low_watermark};
croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high;
croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low;
croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high;
$self->{read_high_watermark} = $high;
$self->{read_low_watermark} = $low;
# TODO: reassert levels if we've moved them
}
if( exists $params{encoding} ) {
my $encoding = delete $params{encoding};
my $obj = find_encoding( $encoding );
defined $obj or croak "Cannot handle an encoding of '$encoding'";
$self->{encoding} = $obj;
}
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
sub read_until_eof
{
my $self = shift;
my $f = $self->_read_future;
$self->push_on_read( sub {
my ( undef, $buffref, $eof ) = @_;
return undef if $f->is_cancelled;
return 0 unless $eof;
$f->done( $$buffref, $eof ); $$buffref = "";
return undef;
}, future => $f );
return $f;
}
=head1 UTILITY CONSTRUCTORS
=cut
=head2 new_for_stdin
=head2 new_for_stdout
=head2 new_for_stdio
$stream = IO::Async::Stream->new_for_stdin
$stream = IO::Async::Stream->new_for_stdout
$stream = IO::Async::Stream->new_for_stdio
Return a C<IO::Async::Stream> object preconfigured with the correct
C<read_handle>, C<write_handle> or both.
=cut
sub new_for_stdin { shift->new( read_handle => \*STDIN, @_ ) }
sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) }
sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) }
=head2 connect
$future = $stream->connect( %args )
A convenient wrapper for calling the C<connect> method on the underlying
L<IO::Async::Loop> object, passing the C<socktype> hint as C<stream> if not
otherwise supplied.
=cut
sub connect
{
my $self = shift;
return $self->SUPER::connect( socktype => "stream", @_ );
}
=head1 DEBUGGING FLAGS
The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging:
=over 4
=item C<Sr>
Log byte buffers as data is read from a Stream
=item C<Sw>
Log byte buffers as data is written to a Stream
=back
=cut
=head1 EXAMPLES
=head2 A line-based C<on_read> method
The following C<on_read> method accepts incoming C<\n>-terminated lines and
prints them to the program's C<STDOUT> stream.
sub on_read
{
my $self = shift;
my ( $buffref, $eof ) = @_;
while( $$buffref =~ s/^(.*\n)// ) {
print "Received a line: $1";
}
return 0;
}
Because a reference to the buffer itself is passed, it is simple to use a
C<s///> regular expression on the scalar it points at, to both check if data
is ready (i.e. a whole line), and to remove it from the buffer. If no data is
available then C<0> is returned, to indicate it should not be tried again. If
a line was successfully extracted, then C<1> is returned, to indicate it
should try again in case more lines exist in the buffer.
=head2 Reading binary data
This C<on_read> method accepts incoming records in 16-byte chunks, printing
each one.
sub on_read
{
my ( $self, $buffref, $eof ) = @_;
if( length $$buffref >= 16 ) {
my $record = substr( $$buffref, 0, 16, "" );
print "Received a 16-byte record: $record\n";
return 1;
}
if( $eof and length $$buffref ) {
print "EOF: a partial record still exists\n";
}
( run in 1.199 second using v1.01-cache-2.11-cpan-140bd7fdf52 )