Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2006-2015 -- leonerd@leonerd.org.uk
package IO::Async::Stream;
use strict;
use warnings;
our $VERSION = '0.70';
use base qw( IO::Async::Handle );
use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE );
use Carp;
use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL );
use Scalar::Util qw( blessed );
use IO::Async::Debug;
# Tuneable from outside
# Not yet documented
# Both are package globals (declared with 'our'), so code outside this package
# can assign to $IO::Async::Stream::READLEN and $IO::Async::Stream::WRITELEN.
# NOTE(review): presumably the default sizes, in bytes, of a single read and a
# single write - the code that consumes them is not visible here; confirm.
our $READLEN = 8192;
our $WRITELEN = 8192;
use Struct::Dumb;
# Element of the writequeue
# Constructed positionally, in the field order listed below, e.g.
#   Writer( $data, $writelen, $on_write, $on_flush, $on_error, $watching )
# The accessors are assignable; _flush_one_write updates fields in place
# (for example "$head->data = $data").
#   data     - a plain string, a CODE reference, or a Future object
#   writelen - length handed to the writer function together with data
#   on_write - optional; NOTE(review): its invocation is not visible in this
#              part of the file, presumably a per-write progress callback
#   on_flush - optional callback, invoked as $on_flush->( $stream )
#   on_error - optional callback, invoked as $on_error->( $stream, $errno )
#   watching - set nonzero once an on_ready callback has been attached to a
#              not-yet-ready Future held in data, so it is attached only once
struct Writer => [qw( data writelen on_write on_flush on_error watching )];
# Element of the readqueue
# NOTE(review): the code using Reader is not visible here; the field names
# suggest a read callback paired with a Future - confirm against the users.
struct Reader => [qw( on_read future )];
# Bitfields in the want flags
# Each of the four flags below occupies its own bit, so any subset of them can
# be combined into a single integer with | and tested with &.
# NOTE(review): the names read as WANT_<handle event>_FOR_<operation needing
# it>; where the flags are stored and tested is not visible here - confirm.
use constant WANT_READ_FOR_READ => 0x01;
use constant WANT_READ_FOR_WRITE => 0x02;
use constant WANT_WRITE_FOR_READ => 0x04;
use constant WANT_WRITE_FOR_WRITE => 0x08;
# Masks: the union of both WANT_READ_* bits and of both WANT_WRITE_* bits.
use constant WANT_ANY_READ => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE;
use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE;
=head1 NAME
C<IO::Async::Stream> - event callbacks and write buffering for a stream
filehandle
=head1 SYNOPSIS
use IO::Async::Stream;
use IO::Async::Loop;
my $loop = IO::Async::Loop->new;
my $stream = IO::Async::Stream->new(
read_handle => \*STDIN,
write_handle => \*STDOUT,
on_read => sub {
my ( $self, $buffref, $eof ) = @_;
while( $$buffref =~ s/^(.*\n)// ) {
print "Received a line $1";
}
if( $eof ) {
print "EOF; last partial line is $$buffref\n";
}
return 0;
}
);
$loop->add( $stream );
$stream->write( "An initial line here\n" );
=head1 DESCRIPTION
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
performing this write. Invoked as for the C<Stream>'s C<on_write_error> event.
$on_error->( $stream, $errno )
=back
If the object is not yet a member of a loop and doesn't yet have a
C<write_handle>, then calls to the C<write> method will simply queue the data
and return. It will be flushed when the object is added to the loop.
If C<$data> is a defined but empty string, the write is still queued, and the
C<on_flush> continuation will be invoked, if supplied. This can be used to
obtain a marker, to invoke some code once the output queue has been flushed up
to this point.
=head2 write (scalar)
$stream->write( ... )->get
If called in non-void context, this method returns a L<Future> which will
complete (with no value) when the write operation has been flushed. This may
be used as an alternative to, or combined with, the C<on_flush> callback.
=cut
# $written = $self->_syswrite( $handle, $buffer, $len )
#
# Writes up to $len bytes taken from the front of $buffer by calling
# $handle->syswrite, then removes whatever was accepted from $buffer. The
# buffer is modified through its @_ alias, so the caller's own variable is
# shortened.
#
# Returns the result of the syswrite call unchanged: the number of bytes
# written, or a false value (zero or undef), in which case $buffer is left
# untouched.
sub _syswrite
{
   my ( $self, $fh ) = @_;
   my $maxlen = $_[3];

   # $_[2] is used directly rather than copied into a lexical, so that it
   # remains an alias of the caller's buffer
   my $count = $fh->syswrite( $_[2], $maxlen );

   # Only consume the written prefix if something was actually written
   substr( $_[2], 0, $count, "" ) if $count;

   return $count;
}
sub _flush_one_write
{
my $self = shift;
my $writequeue = $self->{writequeue};
my $head;
while( $head = $writequeue->[0] and ref $head->data ) {
if( ref $head->data eq "CODE" ) {
my $data = $head->data->( $self );
if( !defined $data ) {
$head->on_flush->( $self ) if $head->on_flush;
shift @$writequeue;
return 1;
}
if( !ref $data and my $encoding = $self->{encoding} ) {
$data = $encoding->encode( $data );
}
unshift @$writequeue, my $new = Writer(
$data, $head->writelen, $head->on_write, undef, undef, 0
);
next;
}
elsif( blessed $head->data and $head->data->isa( "Future" ) ) {
my $f = $head->data;
if( !$f->is_ready ) {
return 0 if $head->watching;
$f->on_ready( sub { $self->_flush_one_write } );
$head->watching++;
return 0;
}
my $data = $f->get;
if( !ref $data and my $encoding = $self->{encoding} ) {
$data = $encoding->encode( $data );
}
$head->data = $data;
next;
}
else {
die "Unsure what to do with reference ".ref($head->data)." in write queue";
}
}
my $second;
while( $second = $writequeue->[1] and
!ref $second->data and
$head->writelen == $second->writelen and
!$head->on_write and !$second->on_write and
!$head->on_flush ) {
$head->data .= $second->data;
$head->on_write = $second->on_write;
$head->on_flush = $second->on_flush;
splice @$writequeue, 1, 1, ();
}
die "TODO: head data does not contain a plain string" if ref $head->data;
if( $IO::Async::Debug::DEBUG > 1 ) {
my $data = substr $head->data, 0, $head->writelen;
$self->debug_printf( "WRITE len=%d", length $data );
IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw};
}
my $writer = $self->{writer};
my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen );
if( !defined $len ) {
my $errno = $!;
if( $errno == EAGAIN or $errno == EWOULDBLOCK ) {
$self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable};
$self->{writeable} = 0;
}
return 0 if _nonfatal_error( $errno );
if( $errno == EPIPE ) {
$self->{write_eof} = 1;
$self->maybe_invoke_event( on_write_eof => );
}
$head->on_error->( $self, $errno ) if $head->on_error;
$self->maybe_invoke_event( on_write_error => $errno )
or $self->close_now;
( run in 3.044 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )