Acme-Sort-Sleep


local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN

principle this undocumented feature could be subject to change, in practice I
believe it to be reasonably stable.

This note applies only to the C<on_read> event; data written using the
C<write> method does not rely on any undocumented features of C<Encode>.

If a read handle is given, it is required that either an C<on_read> callback
reference is configured, or that the object provides an C<on_read> method. It
is optional whether either is true for C<on_outgoing_empty>; if neither is
supplied then no action will be taken when the writing buffer becomes empty.

An C<on_read> handler may be supplied even if no read handle is yet given, to
be used when a read handle is eventually provided by the C<set_handles>
method.

This condition is checked at the time the object is added to a Loop; it is
allowed to create an C<IO::Async::Stream> object with a read handle but without
an C<on_read> handler, provided that one is later given using C<configure>
before the stream is added to its containing Loop, either directly or by being
a child of another Notifier that is already in a Loop or is later added to one.
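
As a minimal sketch of the ordering described above, assuming a plain pipe as
the read handle (the pipe, the variable names and the handler body are
illustrative only), the stream is constructed without an C<on_read> handler,
given one with C<configure>, and only then added to the Loop:

```perl
use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Stream;

# Illustrative handles: a pipe stands in for any readable filehandle.
pipe my ( $rd, $wr ) or die "Cannot pipe - $!";

my $loop = IO::Async::Loop->new;

# A read handle but no on_read handler yet: construction is allowed.
my $stream = IO::Async::Stream->new( read_handle => $rd );
my $had_handler_before = $stream->can_event( "on_read" ) ? 1 : 0;

# Supply the handler before the stream joins a Loop.
my $received = "";
$stream->configure(
   on_read => sub {
      my ( $self, $buffref, $eof ) = @_;
      $received .= $$buffref;
      $$buffref = "";
      return 0;
   },
);
my $has_handler_after = $stream->can_event( "on_read" ) ? 1 : 0;

# The on_read check runs here, and now passes.
$loop->add( $stream );

syswrite $wr, "ping\n";
$loop->loop_once( 1 ) until length $received;
```

C<can_event> is the same test the stream applies when it is added to a Loop,
so it is a convenient way to confirm the handler is in place beforehand.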

=cut

sub configure
{
   my $self = shift;
   my %params = @_;

   for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error
            on_write_error on_writeable_start on_writeable_stop autoflush
            read_len read_all write_len write_all on_read_high_watermark
            on_read_low_watermark reader writer close_on_read_eof )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) {
      my $high = delete $params{read_high_watermark};
      defined $high or $high = $self->{read_high_watermark};

      my $low  = delete $params{read_low_watermark};
      defined $low  or $low  = $self->{read_low_watermark};

      croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high;
      croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low;

      croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high;

      $self->{read_high_watermark} = $high;
      $self->{read_low_watermark}  = $low;

      # TODO: reassert levels if we've moved them
   }

   if( exists $params{encoding} ) {
      my $encoding = delete $params{encoding};
      my $obj = find_encoding( $encoding );
      defined $obj or croak "Cannot handle an encoding of '$encoding'";
      $self->{encoding} = $obj;
   }

   $self->SUPER::configure( %params );

   if( $self->loop and $self->read_handle ) {
      $self->can_event( "on_read" ) or
         croak 'Expected either an on_read callback or to be able to ->on_read';
   }
}

sub _add_to_loop
{
   my $self = shift;

   if( defined $self->read_handle ) {
      $self->can_event( "on_read" ) or
         croak 'Expected either an on_read callback or to be able to ->on_read';
   }

   $self->SUPER::_add_to_loop( @_ );

   if( !$self->_is_empty ) {
      $self->want_writeready_for_write( 1 );
   }
}

=head1 METHODS

The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.

=cut

=head2 want_readready_for_read

=head2 want_readready_for_write

   $stream->want_readready_for_read( $set )

   $stream->want_readready_for_write( $set )

Mutators for the C<want_readready> property on L<IO::Async::Handle>, which
control whether the C<read> or C<write> behaviour should be continued once the
filehandle becomes ready for read.

Normally, C<want_readready_for_read> is always true (though the read watermark
behaviour can modify it), and C<want_readready_for_write> is not used.
However, if a custom C<writer> function is provided, it may find this useful
for being invoked again if it cannot proceed with a write operation until the
filehandle becomes readable (such as during transport negotiation or SSL key
management, for example).
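
The two mutators share one bitmask, and the handle stays interested in
read-readiness while either bit is set. The following is a simplified model of
that bookkeeping, not the module's own code; the constant values and the
C<set_want> helper are assumptions made for illustration.

```perl
use strict;
use warnings;

# Assumed values for illustration; only their distinctness matters here.
use constant {
   WANT_READ_FOR_READ  => 0x01,
   WANT_READ_FOR_WRITE => 0x02,
};
use constant WANT_ANY_READ => WANT_READ_FOR_READ | WANT_READ_FOR_WRITE;

my $want = 0;

# Hypothetical helper: set or clear one bit, then report whether the
# handle should still be watched for read-readiness at all.
sub set_want
{
   my ( $flag, $set ) = @_;
   $set ? ( $want |= $flag ) : ( $want &= ~$flag );
   return ( $want & WANT_ANY_READ ) ? 1 : 0;
}

my $after_read_on   = set_want( WANT_READ_FOR_READ,  1 );  # reading wants it
my $after_write_on  = set_want( WANT_READ_FOR_WRITE, 1 );  # so does a writer
my $after_read_off  = set_want( WANT_READ_FOR_READ,  0 );  # writer still does
my $after_write_off = set_want( WANT_READ_FOR_WRITE, 0 );  # nobody does now
```

Clearing one reason leaves read-readiness enabled as long as the other reason
is still set, which is why a custom C<writer> can request it independently of
the read watermark logic.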

=cut

sub want_readready_for_read
{
   my $self = shift;
   my ( $set ) = @_;
   $set ? ( $self->{want} |= WANT_READ_FOR_READ ) : ( $self->{want} &= ~WANT_READ_FOR_READ );

   $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
}

sub want_readready_for_write
{
   my $self = shift;
   my ( $set ) = @_;
   $set ? ( $self->{want} |= WANT_READ_FOR_WRITE ) : ( $self->{want} &= ~WANT_READ_FOR_WRITE );

   $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle;
}

=head2 want_writeready_for_read

=head2 want_writeready_for_write

   $stream->want_writeready_for_write( $set )

   $stream->want_writeready_for_read( $set )

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN

   my $self = shift;
   my ( $set ) = @_;
   $set ? ( $self->{want} |= WANT_WRITE_FOR_READ ) : ( $self->{want} &= ~WANT_WRITE_FOR_READ );

   $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle;
}

# FUNCTION not method
sub _nonfatal_error
{
   my ( $errno ) = @_;

   return $errno == EAGAIN ||
          $errno == EWOULDBLOCK ||
          $errno == EINTR;
}

sub _is_empty
{
   my $self = shift;
   return !@{ $self->{writequeue} };
}

=head2 close

   $stream->close

A synonym for C<close_when_empty>. This should not be used when the deferred
wait behaviour is required, as the behaviour of C<close> may change in a
future version of L<IO::Async>. Instead, call C<close_when_empty> directly.

=cut

sub close
{
   my $self = shift;
   $self->close_when_empty;
}

=head2 close_when_empty

   $stream->close_when_empty

If the write buffer is empty, this method calls C<close> on the underlying IO
handles, and removes the stream from its containing loop. If the write buffer
still contains data, then this is deferred until the buffer is empty. This is
intended for "write-then-close" one-shot streams.

 $stream->write( "Here is my final data\n" );
 $stream->close_when_empty;

Because of this deferred nature, it may not be suitable for error handling.
See instead the C<close_now> method.
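
A slightly fuller sketch of the write-then-close pattern, assuming a pipe as
the write handle and using the C<on_closed> event inherited from
L<IO::Async::Handle> to detect when the deferred close has happened (the
names C<$rd>, C<$wr> and C<$closed> are illustrative):

```perl
use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Stream;

# Illustrative handles: the stream writes into a pipe we read back from.
pipe my ( $rd, $wr ) or die "Cannot pipe - $!";

my $loop = IO::Async::Loop->new;

my $closed = 0;
my $stream = IO::Async::Stream->new(
   write_handle => $wr,
   on_closed    => sub { $closed = 1 },
);
$loop->add( $stream );

# Without autoflush the data is only queued at this point...
$stream->write( "Here is my final data\n" );

# ...so the close is deferred until the queue has drained.
$stream->close_when_empty;
my $closed_immediately = $closed;

$loop->loop_once( 1 ) until $closed;

# The write end is now closed, so reading to EOF returns everything.
my $received = do { local $/; <$rd> };
```

The data still arrives in full even though C<close_when_empty> was called
straight after C<write>, which is the property that distinguishes it from
C<close_now>.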

=cut

sub close_when_empty
{
   my $self = shift;

   return $self->SUPER::close if $self->_is_empty;

   $self->{stream_closing} = 1;
}

=head2 close_now

   $stream->close_now

This method immediately closes the underlying IO handles and removes the
stream from the containing loop. It will not wait to flush the remaining data
in the write buffer.

=cut

sub close_now
{
   my $self = shift;

   foreach ( @{ $self->{writequeue} } ) {
       $_->on_error->( "stream closing" ) if $_->on_error;
   }

   undef @{ $self->{writequeue} };
   undef $self->{stream_closing};

   $self->SUPER::close;
}

=head2 is_read_eof

=head2 is_write_eof

   $eof = $stream->is_read_eof

   $eof = $stream->is_write_eof

Returns true after an EOF condition is reported on either the read or the
write handle, respectively.

=cut

sub is_read_eof
{
   my $self = shift;
   return $self->{read_eof};
}

sub is_write_eof
{
   my $self = shift;
   return $self->{write_eof};
}

=head2 write

   $stream->write( $data, %params )

This method adds data to the outgoing data queue, or writes it immediately,
according to the C<autoflush> parameter.

If the C<autoflush> option is set, this method will try immediately to write
the data to the underlying filehandle. If this completes successfully then it
will have been written by the time this method returns. If it fails to write
completely, then the data is queued as if C<autoflush> were not set, and will
be flushed as normal.

C<$data> can either be a plain string, a L<Future>, or a CODE reference. If it
is a plain string it is written immediately. If it is not, its value will be
used to generate more C<$data> values, eventually leading to strings to be
written.

If C<$data> is a C<Future>, the Stream will wait until it is ready, and take
the single value it yields.

If C<$data> is a CODE reference, it will be repeatedly invoked to generate new
values. Each time the filehandle is ready to write more data to it, the
function is invoked. Once the function has finished generating data it should
return undef. The function is passed the Stream object as its first argument.

It is allowed that C<Future>s yield CODE references, or CODE references return
C<Future>s, as well as plain strings.

For example, to stream the contents of an existing opened filehandle:

 open my $fileh, "<", $path or die "Cannot open $path - $!";

local/lib/perl5/IO/Async/Stream.pm  view on Meta::CPAN


Completes the C<Future> when the stream is eventually closed at EOF, and
yields all of the data that was available.
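
A minimal usage sketch, assuming a pipe whose write end is closed to produce
the EOF condition (the handles and the no-op C<on_read> handler are
illustrative):

```perl
use strict;
use warnings;
use IO::Async::Loop;
use IO::Async::Stream;

# Illustrative handles: closing $wr is what produces EOF on $rd.
pipe my ( $rd, $wr ) or die "Cannot pipe - $!";

my $loop = IO::Async::Loop->new;

my $stream = IO::Async::Stream->new(
   read_handle => $rd,
   on_read     => sub { 0 },   # required for a read handle; unused here
);
$loop->add( $stream );

syswrite $wr, "part one, ";
syswrite $wr, "part two";
close $wr;

# Blocks, running the Loop, until the stream reaches EOF.
my ( $data, $eof ) = $stream->read_until_eof->get;
```

The Future yields the accumulated buffer together with the EOF flag, in the
same order that the method passes them to C<done>.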

=cut

sub read_until_eof
{
   my $self = shift;

   my $f = $self->_read_future;
   $self->push_on_read( sub {
      my ( undef, $buffref, $eof ) = @_;
      return undef if $f->is_cancelled;
      return 0 unless $eof;
      $f->done( $$buffref, $eof );
      $$buffref = "";
      return undef;
   }, future => $f );
   return $f;
}

=head1 UTILITY CONSTRUCTORS

=cut

=head2 new_for_stdin

=head2 new_for_stdout

=head2 new_for_stdio

   $stream = IO::Async::Stream->new_for_stdin

   $stream = IO::Async::Stream->new_for_stdout

   $stream = IO::Async::Stream->new_for_stdio

Returns an C<IO::Async::Stream> object preconfigured with the correct
C<read_handle>, C<write_handle> or both.

=cut

sub new_for_stdin  { shift->new( read_handle  => \*STDIN, @_ ) }
sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) }

sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) }

=head2 connect

   $future = $stream->connect( %args )

A convenient wrapper for calling the C<connect> method on the underlying
L<IO::Async::Loop> object, passing the C<socktype> hint as C<stream> if not
otherwise supplied.

=cut

sub connect
{
   my $self = shift;
   return $self->SUPER::connect( socktype => "stream", @_ );
}

=head1 DEBUGGING FLAGS

The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging:

=over 4

=item C<Sr>

Log byte buffers as data is read from a Stream

=item C<Sw>

Log byte buffers as data is written to a Stream

=back

=cut

=head1 EXAMPLES

=head2 A line-based C<on_read> method

The following C<on_read> method accepts incoming C<\n>-terminated lines and
prints them to the program's C<STDOUT> stream.

 sub on_read
 {
    my $self = shift;
    my ( $buffref, $eof ) = @_;

    while( $$buffref =~ s/^(.*\n)// ) {
       print "Received a line: $1";
    }

    return 0;
 }

Because a reference to the buffer itself is passed, it is simple to use a
C<s///> regular expression on the scalar it points at, to both check if data
is ready (i.e. a whole line), and to remove it from the buffer. Since the
C<while> loop removes every complete line currently in the buffer, the method
does not need to be invoked again once it has finished, so it always returns
C<0>. Any trailing partial line stays in the buffer until more data arrives.
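
The buffer handling can be tried in isolation, without a Loop. In this sketch
the C<extract_lines> helper and the sample data are hypothetical; it applies
the same C<s///> loop to a buffer that ends in a partial line, then again once
the rest of that line has arrived.

```perl
use strict;
use warnings;

# Hypothetical helper: remove and return every complete line from the
# buffer, leaving any trailing partial line in place.
sub extract_lines
{
   my ( $buffref ) = @_;
   my @lines;
   while( $$buffref =~ s/^(.*\n)// ) {
      push @lines, $1;
   }
   return @lines;
}

my $buffer = "alpha\nbeta\ngam";

my @first      = extract_lines( \$buffer );
my $left_over  = $buffer;          # the partial line is still buffered

$buffer .= "ma\n";                 # the rest of the line arrives
my @second     = extract_lines( \$buffer );
```

Because C<.> does not match a newline, each substitution consumes exactly one
line, and the loop stops as soon as no complete line remains.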

=head2 Reading binary data

This C<on_read> method accepts incoming records in 16-byte chunks, printing
each one.

 sub on_read
 {
    my ( $self, $buffref, $eof ) = @_;

    if( length $$buffref >= 16 ) {
       my $record = substr( $$buffref, 0, 16, "" );
       print "Received a 16-byte record: $record\n";

       return 1;
    }

    return 0;
 }
