Acme-Sort-Sleep
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
 use IO::Async::Stream;

 use IO::Async::Loop;
 my $loop = IO::Async::Loop->new;

 my $stream = IO::Async::Stream->new(
    read_handle  => \*STDIN,
    write_handle => \*STDOUT,

    on_read => sub {
       my ( $self, $buffref, $eof ) = @_;

       # Consume every complete line currently in the buffer
       while( $$buffref =~ s/^(.*\n)// ) {
          print "Received a line $1";
       }

       if( $eof ) {
          print "EOF; last partial line is $$buffref\n";
       }

       return 0;
    }
 );

 $loop->add( $stream );

 $stream->write( "An initial line here\n" );
=head1 DESCRIPTION
This subclass of L<IO::Async::Handle> contains a filehandle that represents
a byte-stream. It provides buffering for both incoming and outgoing data. It
invokes the C<on_read> handler when new data is read from the filehandle. Data
may be written to the filehandle by calling the C<write> method.
This class is suitable for any kind of filehandle that provides a
possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
datagram or raw message-based sockets (such as UDP) see instead
L<IO::Async::Socket>.
=cut
=head1 EVENTS
The following events are invoked, either using subclass methods or CODE
references in parameters:
=head2 $ret = on_read \$buffer, $eof
Invoked when more data is available in the internal receiving buffer.
The first argument is a reference to a plain perl string. The code should
inspect and remove any data it likes, but is not required to remove all, or
indeed any of the data. Any data remaining in the buffer will be preserved for
the next call, the next time more data is received from the handle.
In this way, it is easy to implement code that reads records of some form when
completed, but ignores partially-received records, until all the data is
present. If the handler is confident no more useful data remains, it should
return C<0>. If not, it should return C<1>, and the handler will be called
again. This makes it easy to implement code that handles multiple incoming
records at the same time. See the examples at the end of this documentation
for more detail.
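As a minimal sketch of the buffer contract described above, the following
self-contained script feeds data to an C<on_read>-style handler in two chunks.
The record format (a one-byte length followed by the payload) and the C<feed>
driver are assumptions made for illustration only; C<feed> is a simplified
stand-in for the stream's read loop, not the module's actual implementation.

```perl
use strict;
use warnings;

my @records;

# Handler following the on_read contract: consume at most one complete
# record per call. Return 1 to be called again, or 0 when only a partial
# record (or nothing) remains in the buffer.
# Assumed record format for this sketch: 1-byte length, then payload.
sub on_read
{
   my ( $self, $buffref, $eof ) = @_;

   return 0 if length $$buffref < 1;

   my $len = unpack "C", $$buffref;
   return 0 if length $$buffref < 1 + $len;   # incomplete; wait for more data

   push @records, substr( $$buffref, 1, $len );
   substr( $$buffref, 0, 1 + $len, "" );      # remove the consumed record
   return 1;
}

# Hypothetical driver: append new data, then call the handler repeatedly
# for as long as it returns true.
sub feed
{
   my ( $buffref, $chunk ) = @_;
   $$buffref .= $chunk;
   1 while on_read( undef, $buffref, 0 );
}

my $buffer = "";
feed( \$buffer, "\x05hel" );            # partial record; nothing consumed yet
feed( \$buffer, "lo\x02hi\x03a" );      # completes two records, leaves a partial one

print "Record: $_\n" for @records;
```

The unfinished third record stays in C<$buffer> until a later chunk completes
it, which is the "preserved for the next call" behaviour.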
The second argument is a scalar indicating whether the stream has reported an
end-of-file (EOF) condition. A reference to the buffer is passed to the
handler in the usual way, so it may inspect data contained in it. Once the
handler returns a false value, it will not be called again, as the handle is
now at EOF and no more data can arrive.
The C<on_read> code may also dynamically replace itself with a new callback
by returning a CODE reference instead of C<0> or C<1>. The original callback
or method that the object first started with may be restored by returning
C<undef>. Whenever the callback is changed in this way, the new code is called
again, even if the read buffer is currently empty. See the examples at the end
of this documentation for more detail.
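The handler-replacement rules can be sketched without a real filehandle. In
the script below, the C<LEN n> header format, the C<read_header> handler and
the C<feed> driver are all hypothetical; the driver only imitates the
documented return values (CODE reference, C<undef>, true, false) and should
not be read as the module's own read loop.

```perl
use strict;
use warnings;

my @bodies;

# "Original" handler: parses a "LEN <n>\n" header, then returns a temporary
# handler that waits until <n> bytes of body have arrived.
sub read_header
{
   my ( $self, $buffref, $eof ) = @_;

   return 0 unless $$buffref =~ s/^LEN (\d+)\n//;
   my $want = $1;

   return sub {
      my ( $self, $buffref, $eof ) = @_;

      return 0 if length $$buffref < $want;
      push @bodies, substr( $$buffref, 0, $want, "" );
      return undef;   # restore the original handler
   };
}

# Hypothetical driver imitating the documented return-value rules.
sub feed
{
   my ( $state, $chunk ) = @_;
   $state->{buffer} .= $chunk;

   while( 1 ) {
      my $handler = $state->{current} || \&read_header;
      my $ret = $handler->( undef, \$state->{buffer}, 0 );

      if( ref $ret eq "CODE" ) {
         $state->{current} = $ret;   # handler replaced; call the new one
         next;
      }
      if( !defined $ret ) {
         last unless $state->{current};
         undef $state->{current};    # back to the original; call it again
         next;
      }
      last unless $ret;              # 0: wait for more data
   }
}

my %state = ( buffer => "" );
feed( \%state, "LEN 5\nhel" );
feed( \%state, "loLEN 2\nok" );

print "Body: $_\n" for @bodies;
```

Because each change of handler triggers another call, the second header is
parsed in the same C<feed> call that completed the first body.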
The C<push_on_read> method can be used to insert new, temporary handlers that
take precedence over the global C<on_read> handler. This event is only used if
there are no further pending handlers created by C<push_on_read>.
=head2 on_read_eof
Optional. Invoked when the read handle indicates an end-of-file (EOF)
condition. If there is any data in the buffer still to be processed, the
C<on_read> event will be invoked first, before this one.
=head2 on_write_eof
Optional. Invoked when the write handle indicates an end-of-file (EOF)
condition. Note that this condition can only be detected after a C<write>
syscall returns the C<EPIPE> error. If there is no data pending to be written
then it will not be detected yet.
=head2 on_read_error $errno
Optional. Invoked when the C<sysread> method on the read handle fails.
=head2 on_write_error $errno
Optional. Invoked when the C<syswrite> method on the write handle fails.
The C<on_read_error> and C<on_write_error> handlers are passed the value of
C<$!> at the time the error occurred. (The C<$!> variable itself, by its
nature, may have changed from the original error by the time this handler
runs, so the handler should always use the value passed in.)
If an error occurs and no corresponding error callback or handler method has
been supplied for it, then the C<close> method is called instead.
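To illustrate why the passed-in value matters, here is a small sketch with no
real I/O. The failure is simulated by assigning to C<$!> directly, and the
C<$on_error> callback is a hypothetical handler in the same shape as
C<on_read_error> or C<on_write_error>.

```perl
use strict;
use warnings;
use Errno qw( ENOENT EPIPE );

my $seen;

# Handler in the shape of on_read_error / on_write_error: the error arrives
# as an argument instead of being read from $! inside the handler.
my $on_error = sub {
   my ( $self, $errno ) = @_;
   $seen = $errno + 0;   # numeric errno value
};

$! = ENOENT;         # simulate a failed read
my $errno = $!;      # capture the value immediately
$! = EPIPE;          # a later operation overwrites $!

$on_error->( undef, $errno );

print "Handler saw errno $seen\n";
```

The handler sees the captured C<ENOENT> value even though C<$!> has since
changed to C<EPIPE>.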
=head2 on_read_high_watermark $length
=head2 on_read_low_watermark $length
Optional. Invoked when the read buffer grows larger than the high watermark
or smaller than the low watermark respectively. These are edge-triggered
events; they will only be triggered once per crossing, not continuously while
the buffer remains above or below the given limit.
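The edge-triggered behaviour can be sketched as a small state machine. The
C<make_tracker> helper and the watermark values are hypothetical, and the
sketch assumes that the low-watermark event only fires after the high
watermark has previously been crossed; it illustrates the idea and is not the
module's code.

```perl
use strict;
use warnings;

my @events;

# Hypothetical tracker: records "high" once when the length rises above
# $high, and "low" once when it later falls below $low.
sub make_tracker
{
   my ( $high, $low ) = @_;
   my $at_high = 0;

   return sub {
      my ( $length ) = @_;

      if( !$at_high and $length > $high ) {
         $at_high = 1;
         push @events, "high:$length";
      }
      elsif( $at_high and $length < $low ) {
         $at_high = 0;
         push @events, "low:$length";
      }
   };
}

my $track = make_tracker( 100, 20 );
$track->( $_ ) for 50, 120, 150, 90, 10, 5, 130;

print "$_\n" for @events;
```

Lengths 150 and 5 produce no event because the buffer was already above or
below the relevant limit when they were observed.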
If these event handlers are not defined, the default behaviour is to disable
read-ready notifications if the read buffer grows larger than the high
watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
and re-enable notifications again once something has read enough to cause it to
drop. If these events are overridden, the overriding code will have to perform
this behaviour if required, by using
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
complete (with no value) when the write operation has been flushed. This may
be used as an alternative to, or combined with, the C<on_flush> callback.
=cut
sub _syswrite
{
   my $self = shift;
   my ( $handle, undef, $len ) = @_;

   # $_[1] aliases the caller's buffer, so the written prefix is removed
   # from it in place
   my $written = $handle->syswrite( $_[1], $len );
   return $written if !$written; # zero or undef

   substr( $_[1], 0, $written ) = "";
   return $written;
}
sub _flush_one_write
{
   my $self = shift;

   my $writequeue = $self->{writequeue};

   # Resolve a CODE or Future at the head of the queue into plain string data
   my $head;
   while( $head = $writequeue->[0] and ref $head->data ) {
      if( ref $head->data eq "CODE" ) {
         my $data = $head->data->( $self );
         if( !defined $data ) {
            # The generator returned undef, so this queue entry is finished
            $head->on_flush->( $self ) if $head->on_flush;
            shift @$writequeue;
            return 1;
         }
         if( !ref $data and my $encoding = $self->{encoding} ) {
            $data = $encoding->encode( $data );
         }
         unshift @$writequeue, my $new = Writer(
            $data, $head->writelen, $head->on_write, undef, undef, 0
         );
         next;
      }
      elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
         my $f = $head->data;
         if( !$f->is_ready ) {
            # Only register the on_ready callback once per Future
            return 0 if $head->watching;
            $f->on_ready( sub { $self->_flush_one_write } );
            $head->watching++;
            return 0;
         }
         my $data = $f->get;
         if( !ref $data and my $encoding = $self->{encoding} ) {
            $data = $encoding->encode( $data );
         }
         $head->data = $data;
         next;
      }
      else {
         die "Unsure what to do with reference ".ref($head->data)." in write queue";
      }
   }

   # Coalesce the next queue entry into the head while both are plain
   # strings with the same writelen, neither has on_write, and the head has
   # no on_flush
   my $second;
   while( $second = $writequeue->[1] and
          !ref $second->data and
          $head->writelen == $second->writelen and
          !$head->on_write and !$second->on_write and
          !$head->on_flush ) {
      $head->data .= $second->data;
      $head->on_write = $second->on_write;
      $head->on_flush = $second->on_flush;
      splice @$writequeue, 1, 1, ();
   }

   die "TODO: head data does not contain a plain string" if ref $head->data;

   if( $IO::Async::Debug::DEBUG > 1 ) {
      my $data = substr $head->data, 0, $head->writelen;
      $self->debug_printf( "WRITE len=%d", length $data );
      IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw};
   }

   my $writer = $self->{writer};
   my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );

   if( !defined $len ) {
      my $errno = $!; # capture before anything else can change $!

      if( $errno == EAGAIN or $errno == EWOULDBLOCK ) {
         $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable};
         $self->{writeable} = 0;
      }

      return 0 if _nonfatal_error( $errno );

      if( $errno == EPIPE ) {
         $self->{write_eof} = 1;
         $self->maybe_invoke_event( on_write_eof => );
      }

      $head->on_error->( $self, $errno ) if $head->on_error;
      $self->maybe_invoke_event( on_write_error => $errno )
         or $self->close_now;

      return 0;
   }

   if( my $on_write = $head->on_write ) {
      $on_write->( $self, $len );
   }

   # The head entry has been written completely
   if( !length $head->data ) {
      $head->on_flush->( $self ) if $head->on_flush;
      shift @{ $self->{writequeue} };
   }

   return 1;
}
sub write
{
   my $self = shift;
   my ( $data, %params ) = @_;

   carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing};

   # Allow writes without a filehandle if we're not yet in a Loop, just don't
   # try to flush them
   my $handle = $self->write_handle;

   croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop;