Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Protocol/Stream.pm  view on Meta::CPAN

been configured with an C<on_read> callback handler.

=cut

=head1 EVENTS

The following events are invoked, either using subclass methods or CODE
references in parameters:

=head2 $ret = on_read \$buffer, $eof

=head2 on_read_eof

=head2 on_write_eof

The event handlers are invoked identically to L<IO::Async::Stream>.

=head2 on_closed

The C<on_closed> handler is optional, but if provided, will be invoked after
the stream is closed by either side (either because the C<close()> method has
been invoked on it, or on an incoming EOF).

=cut

=head1 PARAMETERS

The following named parameters may be passed to C<new> or C<configure>:

=head2 on_read => CODE

=head2 on_read_eof => CODE

=head2 on_write_eof => CODE

CODE references for the events.

=head2 handle => IO

A shortcut for the common case where the transport only needs to be a plain
L<IO::Async::Stream> object. If this argument is provided without a
C<transport> object, a new L<IO::Async::Stream> object will be built around
the given IO handle, and used as the transport.

=cut

sub configure
{
   my $self = shift;
   my %params = @_;

   for (qw( on_read on_read_eof on_write_eof )) {
      $self->{$_} = delete $params{$_} if exists $params{$_};
   }

   if( !exists $params{transport} and my $handle = delete $params{handle} ) {
      require IO::Async::Stream;
      $params{transport} = IO::Async::Stream->new( handle => $handle );
   }

   $self->SUPER::configure( %params );

   if( $self->loop ) {
      $self->can_event( "on_read" ) or
         croak 'Expected either an on_read callback or to be able to ->on_read';
   }
}

sub _add_to_loop
{
   my $self = shift;

   $self->can_event( "on_read" ) or
      croak 'Expected either an on_read callback or to be able to ->on_read';
}

sub setup_transport
{
   my $self = shift;
   my ( $transport ) = @_;

   $self->SUPER::setup_transport( $transport );

   $transport->configure( 
      on_read => $self->_replace_weakself( sub {
         my $self = shift or return;
         $self->invoke_event( on_read => @_ );
      } ),
      on_read_eof => $self->_replace_weakself( sub {
         my $self = shift or return;
         $self->maybe_invoke_event( on_read_eof => @_ );
      } ),
      on_write_eof => $self->_replace_weakself( sub {
         my $self = shift or return;
         $self->maybe_invoke_event( on_write_eof => @_ );
      } ),
   );
}

sub teardown_transport
{
   my $self = shift;
   my ( $transport ) = @_;

   $transport->configure(
      on_read => undef,
   );

   $self->SUPER::teardown_transport( $transport );
}

=head1 METHODS

=cut

=head2 write

   $protocol->write( $data )

Writes the given data by calling the C<write> method on the contained
transport stream.

=cut

sub write
{
   my $self = shift;
   my ( $data, %args ) = @_;

   if( ref $data eq "CODE" ) {
      $data = $self->_replace_weakself( $data );
   }

   if( $args{on_flush} ) {
      $args{on_flush} = $self->_replace_weakself( $args{on_flush} );
   }

   my $transport = $self->transport or croak "Attempted to ->write to a ".ref($self)." with no transport";
   $transport->write( $data, %args );
}

=head2 connect

   $protocol->connect( %args )

Sets up a connection to a peer, and configures the underlying C<transport> for
the Protocol. Calls L<IO::Async::Protocol> C<connect> with C<socktype> set to
C<"stream">.

=cut

sub connect
{
   my $self = shift;
   $self->SUPER::connect(
      @_,
      socktype => "stream",
   );
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;



( run in 0.450 second using v1.01-cache-2.11-cpan-fe3c2283af0 )