Acme-Sort-Sleep
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
          while( $$buffref =~ s/^(.*\n)// ) {
             print "Received a line $1";
          }
          if( $eof ) {
             print "EOF; last partial line is $$buffref\n";
          }
          return 0;
       }
    );
    $loop->add( $stream );
    $stream->write( "An initial line here\n" );
=head1 DESCRIPTION
This subclass of L<IO::Async::Handle> contains a filehandle that represents
a byte-stream. It provides buffering for both incoming and outgoing data. It
invokes the C<on_read> handler when new data is read from the filehandle. Data
may be written to the filehandle by calling the C<write> method.
This class is suitable for any kind of filehandle that provides a
possibly-bidirectional reliable byte stream, such as a pipe, TTY, or
C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For
datagram or raw message-based sockets (such as UDP) see instead
L<IO::Async::Socket>.
=cut
=head1 EVENTS
The following events are invoked, either using subclass methods or CODE
references in parameters:
=head2 $ret = on_read \$buffer, $eof
Invoked when more data is available in the internal receiving buffer.
The first argument is a reference to a plain Perl string. The code should
inspect and remove any data it likes, but is not required to remove all, or
indeed any of the data. Any data remaining in the buffer will be preserved for
the next call, the next time more data is received from the handle.
In this way, it is easy to implement code that reads records of some form when
completed, but ignores partially-received records, until all the data is
present. If the handler is confident no more useful data remains, it should
return C<0>. If not, it should return C<1>, and the handler will be called
again. This makes it easy to implement code that handles multiple incoming
records at the same time. See the examples at the end of this documentation
for more detail.
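The return-value convention can be tried out without a real filehandle. The following is a minimal sketch only: C<feed> is a hypothetical stand-in for the stream's read loop, not the module's actual implementation, and the handler omits the C<$self> argument that a real C<on_read> receives.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the stream's read loop (an assumption made for
# this illustration). It appends new data, then keeps calling the handler
# for as long as the handler returns a true value.
sub feed
{
    my ( $on_read, $buffref, $chunk ) = @_;
    $$buffref .= $chunk;
    1 while $on_read->( $buffref, 0 );
}

my @records;

# Consume exactly one 4-byte record per call.
my $on_read = sub {
    my ( $buffref, $eof ) = @_;
    return 0 unless length $$buffref >= 4;    # partial record: wait for more
    push @records, substr( $$buffref, 0, 4, "" );
    return 1;                                 # more may remain: call again
};

my $buffer = "";
feed( $on_read, \$buffer, "AAAABB" );   # one full record plus half of the next
feed( $on_read, \$buffer, "BBCCCC" );   # completes the second, adds a third

print "records: @records\n";            # records: AAAA BBBB CCCC
print "left over: '$buffer'\n";         # left over: ''
```

The half-received record C<BB> simply stays in the buffer between the two calls to C<feed>, which is the behaviour the paragraph above relies on.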
The second argument is a scalar indicating whether the stream has reported an
end-of-file (EOF) condition. A reference to the buffer is still passed to the
handler in the usual way, so it may inspect any data remaining in it. Once the
handler returns a false value at EOF, it will not be called again, as no more
data can arrive.
The C<on_read> code may also dynamically replace itself with a new callback
by returning a CODE reference instead of C<0> or C<1>. The original callback
or method that the object first started with may be restored by returning
C<undef>. Whenever the callback is changed in this way, the new code is called
again, even if the read buffer is currently empty. See the examples at the end
of this documentation for more detail.
The C<push_on_read> method can be used to insert new, temporary handlers that
take precedence over the global C<on_read> handler. This event is only used if
there are no further pending handlers created by C<push_on_read>.
=head2 on_read_eof
Optional. Invoked when the read handle indicates an end-of-file (EOF)
condition. If there is any data in the buffer still to be processed, the
C<on_read> event will be invoked first, before this one.
=head2 on_write_eof
Optional. Invoked when the write handle indicates an end-of-file (EOF)
condition. Note that this condition can only be detected after a C<write>
syscall returns the C<EPIPE> error. If there is no data pending to be written
then it will not be detected yet.
=head2 on_read_error $errno
Optional. Invoked when the C<sysread> method on the read handle fails.
=head2 on_write_error $errno
Optional. Invoked when the C<syswrite> method on the write handle fails.
The C<on_read_error> and C<on_write_error> handlers are passed the value of
C<$!> at the time the error occurred. (The C<$!> variable itself, by its
nature, may have changed from the original error by the time this handler
runs, so the handler should always use the value passed in.)
If an error occurs and neither the corresponding error callback nor a subclass
method is available to handle it, then the C<close> method is called instead.
=head2 on_read_high_watermark $length
=head2 on_read_low_watermark $length
Optional. Invoked when the read buffer grows larger than the high watermark
or smaller than the low watermark respectively. These are edge-triggered
events; they will only be triggered once per crossing, not continuously while
the buffer remains above or below the given limit.
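The edge-triggered behaviour can be sketched in isolation. This is an illustration under assumptions, not the module's code: the limits, the C<$at_high> flag and the C<check_watermarks> function are hypothetical names, and the sketch assumes a low-watermark event is only reported after a preceding high-watermark crossing.

```perl
use strict;
use warnings;

# Hypothetical limits and state, chosen only for this illustration.
my ( $low, $high ) = ( 4, 10 );
my $at_high = 0;    # true between a high crossing and the next low crossing
my @events;

# Report a crossing only when the state actually changes (edge-triggered),
# rather than on every call while the buffer stays beyond a limit.
sub check_watermarks
{
    my ( $length ) = @_;
    if( !$at_high and $length > $high ) {
        $at_high = 1;
        push @events, "high($length)";
    }
    elsif( $at_high and $length < $low ) {
        $at_high = 0;
        push @events, "low($length)";
    }
}

# Buffer lengths observed over time: grows past 10, stays there, then drains.
check_watermarks( $_ ) for 3, 8, 12, 15, 11, 6, 2, 1, 12;

print "@events\n";    # high(12) low(2) high(12)
```

The lengths 15 and 11 produce no further "high" event, and 1 produces no further "low" event, because no limit is crossed on those calls.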
If these event handlers are not defined, the default behaviour is to disable
read-ready notifications if the read buffer grows larger than the high
watermark (so as to avoid it growing arbitrarily if nothing is consuming it),
and re-enable notifications again once something has read enough to cause it to
drop. If these events are overridden, the overriding code will have to perform
this behaviour if required, by using
    $self->want_readready_for_read(...)
=head2 on_outgoing_empty
Optional. Invoked when the writing data buffer becomes empty.
=head2 on_writeable_start
=head2 on_writeable_stop
Optional. These two events inform when the filehandle becomes writeable, and
when it stops being writeable. C<on_writeable_start> is invoked by the
C<on_write_ready> event if previously it was known to be not writeable.
C<on_writeable_stop> is invoked after a C<syswrite> operation fails with
C<EAGAIN> or C<EWOULDBLOCK>. These two events track the writeability state,
and ensure that only state changes cause events to be invoked. A stream starts
off being presumed writeable, so the first of these events to be observed will
be C<on_writeable_stop>.
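The state tracking described here amounts to a single flag. The following is a rough sketch with simulated inputs rather than real I/O; the function names C<after_syswrite_failure> and C<write_ready> are hypothetical and chosen only for this illustration.

```perl
use strict;
use warnings;
use Errno qw( EAGAIN EWOULDBLOCK );

my $writeable = 1;    # presumed writeable until a write says otherwise
my @events;

# Simulates the handling of a failed syswrite (hypothetical helper).
sub after_syswrite_failure
{
    my ( $errno ) = @_;
    return unless( $errno == EAGAIN || $errno == EWOULDBLOCK );
    return unless $writeable;    # already known to be blocked: no new event
    $writeable = 0;
    push @events, "on_writeable_stop";
}

# Simulates the write-ready notification (hypothetical helper).
sub write_ready
{
    return if $writeable;        # no state change: no event
    $writeable = 1;
    push @events, "on_writeable_start";
}

write_ready();                       # still writeable: nothing reported
after_syswrite_failure( EAGAIN );    # first block: stop event
after_syswrite_failure( EAGAIN );    # still blocked: nothing reported
write_ready();                       # state change: start event
write_ready();                       # already writeable: nothing reported

print join( ", ", @events ), "\n";   # on_writeable_stop, on_writeable_start
```

Because the flag starts out true, the first event reported is the stop event, as the text above states.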
=cut
sub _init
{
   my $self = shift;
   $self->{writequeue} = []; # Queue of Writers
   $self->{readqueue} = []; # Queue of Readers
   $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN)
   $self->{readbuff} = "";
   $self->{reader} = "_sysread";
   $self->{writer} = "_syswrite";
   $self->{read_len} = $READLEN;
   $self->{write_len} = $WRITELEN;
   $self->{want} = WANT_READ_FOR_READ;
   $self->{close_on_read_eof} = 1;
local/lib/perl5/IO/Async/Stream.pm view on Meta::CPAN
The 4-argument form of C<substr()> extracts the 16-byte record from the buffer
and assigns it to the C<$record> variable, if there was enough data in the
buffer to extract it.
A lot of protocols use a fixed-size header, followed by a variable-sized body
of data, whose size is given by one of the fields of the header. The following
C<on_read> method extracts messages in such a protocol.
    sub on_read
    {
       my ( $self, $buffref, $eof ) = @_;
       return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes
       my ( $len, $x, $y ) = unpack "N n n", $$buffref;
       return 0 unless length $$buffref >= 8 + $len;
       substr( $$buffref, 0, 8, "" );
       my $data = substr( $$buffref, 0, $len, "" );
       print "A record with values x=$x y=$y\n";
       return 1;
    }
In this example, the header is C<unpack()>ed first, to extract the body
length, and then the body is extracted. If the buffer does not have enough
data yet for a complete message then C<0> is returned, and the buffer is left
unmodified for next time. Only when there are enough bytes in total does it
use C<substr()> to remove them.
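To see the header-then-body logic cope with data arriving in arbitrary pieces, the same parsing steps can be exercised against a hand-built byte string. This is a sketch under assumptions: C<encode> and C<parse_one> are hypothetical helpers for this illustration, the C<$self> argument is dropped, and the loop that feeds 3-byte chunks merely imitates a slow connection.

```perl
use strict;
use warnings;

my @messages;

# Same parsing steps as the method above, written as a plain function so it
# can run without a stream object.
sub parse_one
{
    my ( $buffref ) = @_;
    return 0 unless length $$buffref >= 8;             # header incomplete
    my ( $len, $x, $y ) = unpack "N n n", $$buffref;
    return 0 unless length $$buffref >= 8 + $len;      # body incomplete
    substr( $$buffref, 0, 8, "" );                     # drop the header
    push @messages, [ $x, $y, substr( $$buffref, 0, $len, "" ) ];
    return 1;
}

# Build one message: 32-bit body length, two 16-bit fields, then the body.
sub encode
{
    my ( $x, $y, $body ) = @_;
    return pack( "N n n", length( $body ), $x, $y ) . $body;
}

my $wire = encode( 1, 2, "hello" ) . encode( 3, 4, "" );

# Deliver the bytes three at a time, parsing after each delivery.
my $buffer = "";
while( length $wire ) {
    $buffer .= substr( $wire, 0, 3, "" );
    1 while parse_one( \$buffer );
}

printf "x=%d y=%d body='%s'\n", @$_ for @messages;
# x=1 y=2 body='hello'
# x=3 y=4 body=''
```

Note that the second message has an empty body; the length check C<8 + $len> handles that case without special treatment.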
=head2 Dynamic replacement of C<on_read>
Consider the following protocol (inspired by IMAP), which consists of
C<\n>-terminated lines that may have an optional data block attached. The
presence of such a data block, as well as its size, is indicated by the line
prefix.
    sub on_read
    {
       my $self = shift;
       my ( $buffref, $eof ) = @_;
       if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
          my $length = $1;
          my $line = $2;
          return sub {
             my $self = shift;
             my ( $buffref, $eof ) = @_;
             return 0 unless length $$buffref >= $length;
             # Take and remove the data from the buffer
             my $data = substr( $$buffref, 0, $length, "" );
             print "Received a line $line with some data ($data)\n";
             return undef; # Restore the original method
          }
       }
       elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
          my $line = $1;
          print "Received a line $line with no data\n";
          return 1;
       }
       else {
          print STDERR "Unrecognised input\n";
          # Handle it somehow
          return 0; # Explicit false value, so the handler is not re-invoked at once
       }
    }
In the case where trailing data is supplied, a new temporary C<on_read>
callback is provided in a closure. This closure captures the C<$length>
variable so it knows how much data to expect. It also captures the C<$line>
variable so it can use it in the event report. When this method has finished
reading the data, it reports the event, then restores the original method by
returning C<undef>.
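The hand-over between the two handlers can be traced with a small stand-alone driver. This is a simplified sketch, not the module's real dispatch code: C<dispatch> and C<$current> are hypothetical, the handlers omit C<$self>, results are collected in C<@log> instead of being printed, and unrecognised or incomplete input simply returns C<0>.

```perl
use strict;
use warnings;

my @log;

# The protocol handler from above, minus $self, logging instead of printing.
sub on_read
{
    my ( $buffref, $eof ) = @_;
    if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) {
        my ( $length, $line ) = ( $1, $2 );
        return sub {
            my ( $buffref, $eof ) = @_;
            return 0 unless length $$buffref >= $length;
            my $data = substr( $$buffref, 0, $length, "" );
            push @log, "$line+[$data]";
            return undef;    # restore the original handler
        };
    }
    elsif( $$buffref =~ s/^LINE:(.*)\n// ) {
        push @log, $1;
        return 1;
    }
    return 0;    # nothing complete yet
}

# Hypothetical, simplified dispatcher (an assumption for this illustration).
my $current;    # temporary handler, if one is installed
sub dispatch
{
    my ( $buffref ) = @_;
    while( 1 ) {
        my $ret = ( $current || \&on_read )->( $buffref, 0 );
        if( ref $ret eq "CODE" ) {
            $current = $ret;         # switch to the temporary handler
        }
        elsif( !defined $ret ) {
            last unless $current;    # nothing to restore
            undef $current;          # back to the original handler
        }
        elsif( !$ret ) {
            last;                    # wait for more data
        }
        # any other true value: call the same handler again
    }
}

my $buffer = "";
for my $chunk ( "LINE:hello\nDATA 5:payload follows\nabc", "deLINE:bye\n" ) {
    $buffer .= $chunk;
    dispatch( \$buffer );
}

print "$_\n" for @log;
# hello
# payload follows+[abcde]
# bye
```

The data block C<abcde> is split across the two chunks; the temporary handler returns C<0> on the first chunk and completes on the second, after which the original handler picks up the following C<LINE:>.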
=head1 SEE ALSO
=over 4
=item *
L<IO::Handle> - Supply object methods for I/O handles
=back
=head1 AUTHOR
Paul Evans <leonerd@leonerd.org.uk>
=cut
0x55AA;