Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Channel.pm  view on Meta::CPAN


# Builds the Storable-based codec.
# Returns a two-element list: ( encoder CODE ref, decoder CODE ref ).
sub _make_codec_Storable
{
   # Loaded at call time, so Storable is only required once this codec is asked for
   require Storable;

   my $freeze = \&Storable::freeze;
   my $thaw   = \&Storable::thaw;

   return ( $freeze, $thaw );
}

# Builds the Sereal-based codec.
# Returns a two-element list: ( encoder CODE ref, decoder CODE ref ).
sub _make_codec_Sereal
{
   require Sereal::Encoder;
   require Sereal::Decoder;

   # Codec objects are created on first use and captured by the closures below
   my ( $encoder, $decoder );

   # "thread safety" to Sereal::{Encoder,Decoder} means that the variables get
   # reset to undef in new threads. We should defend against that, which is
   # why each closure re-creates its object whenever it finds it missing.

   my $encode = sub {
      $encoder ||= Sereal::Encoder->new;
      return $encoder->encode( $_[0] );
   };

   my $decode = sub {
      $decoder ||= Sereal::Decoder->new;
      return $decoder->decode( $_[0] );
   };

   return ( $encode, $decode );
}

=head2 send

   $channel->send( $data )

Pushes the data stored in the given Perl reference into the FIFO of the
Channel, where it can be received by the other end. When called on a
synchronous mode Channel this method may block if a C<write()> call on the
underlying filehandle blocks. When called on an asynchronous mode channel this
method will not block.

=cut

sub send
{
   my ( $self, $data ) = @_;

   # Serialise with this channel's codec, then pass whatever it produced on
   # to send_encoded for framing and transmission
   my @encoded = $self->{encode}->( $data );

   return $self->send_encoded( @encoded );
}

=head2 send_encoded

   $channel->send_encoded( $record )

A variant of the C<send> method; this method pushes the byte record given.
This should be the result of a call to C<encode>.

=cut

sub send_encoded
{
   my ( $self, $record ) = @_;

   # Frame layout: a pack("I") length header followed by the record itself
   my $header = pack( "I", length $record );
   my $bytes  = $header . $record;

   my $mode = $self->{mode};
   die "Cannot ->send without being set up" unless defined $mode;

   # Dispatch on the transport the channel was set up with
   return $self->_send_sync( $bytes )  if $mode eq "sync";
   return $self->_send_async( $bytes ) if $mode eq "async";
}

=head2 encode

   $record = $channel->encode( $data )

Takes a Perl reference and returns a serialised string that can be passed to
C<send_encoded>. The following two forms are equivalent:

 $channel->send( $data )
 $channel->send_encoded( $channel->encode( $data ) )

This is provided for the use-case where data needs to be serialised into a
fixed string to "snapshot it" but not sent yet; the returned string can be
saved and sent at a later time.

   $record = IO::Async::Channel->encode( $data )

This can also be used as a class method, in case it is inconvenient to operate
on a particular object instance, or when one does not exist yet. In this case
it will encode using whatever is the default codec for C<IO::Async::Channel>.

=cut

sub encode
{
   my ( $self, $data ) = @_;

   # Called on an instance: use that channel's own encoder
   return $self->{encode}->( $data ) if ref $self;

   # Called as a class method: no instance codec exists, so freeze with Storable
   require Storable;
   return Storable::freeze( $data );
}

=head2 send_frozen

   $channel->send_frozen( $record )

Legacy name for C<send_encoded>. This is no longer preferred as it expects
the data to be encoded using C<Storable>, which prevents (or at least makes
more awkward) the use of other codecs on a channel by default. This method
should not be used in new code and may be removed in a later version.

=cut

# Install send_frozen in the symbol table as an alias for the send_encoded sub
*send_frozen = \&send_encoded;

=head2 recv

   $data = $channel->recv

When called on a synchronous mode Channel this method will block until a Perl
reference value is available from the other end and then return it. If the
Channel is closed this method will return C<undef>. Since only references may
be passed and all Perl references are true the truth of the result of this

local/lib/perl5/IO/Async/Channel.pm  view on Meta::CPAN


 $on_recv->( $channel, $data )

=item on_eof => CODE

Called if the Channel was closed before a new value was ready. Will be passed
the Channel object.

 $on_eof->( $channel )

=back

=cut

sub recv
{
   my $self = shift;

   my $mode = $self->{mode};
   die "Cannot ->recv without being set up" unless defined $mode;

   # Remaining arguments are forwarded untouched to the mode-specific receiver
   return $self->_recv_sync( @_ )  if $mode eq "sync";
   return $self->_recv_async( @_ ) if $mode eq "async";
}

=head2 close

   $channel->close

Closes the channel. Causes a pending C<recv> on the other end to return
C<undef>, or the queued C<on_eof> callbacks to be invoked.

=cut

sub close
{
   my $self = shift;

   # A channel that was never set up has no mode and therefore nothing to
   # close. Check for that explicitly (send and recv also test
   # "defined $self->{mode}") so the string comparisons below never see undef
   # and emit "uninitialized value" warnings.
   my $mode = $self->{mode};
   return unless defined $mode;

   return $self->_close_sync  if $mode eq "sync";
   return $self->_close_async if $mode eq "async";

   # Any other mode (such as "dead", set once a handle has been extracted)
   # leaves nothing for this object to close
   return;
}

# Leave this undocumented for now
# Puts the channel into synchronous mode on the given filehandle.
sub setup_sync_mode
{
   my ( $self, $fh ) = @_;

   $self->{fh}   = $fh;
   $self->{mode} = "sync";

   # Records are binary structures rather than Unicode text, so the handle
   # has to be in binmode
   binmode $fh;

   $fh->autoflush(1);
}

# _read_exactly( $fh, $buffer, $count )
#
# Keeps calling read() until $buffer holds $count bytes. The buffer is the
# caller's own variable, filled in place through the @_ alias.
# Returns $count on success, "" if read() reports end of file first, or undef
# if read() fails.
sub _read_exactly
{
   # The middle argument is deliberately not copied; it is written via $_[1]
   my ( $fh, undef, $want ) = @_;

   $_[1] = "";

   until( length( $_[1] ) >= $want ) {
      my $have = length( $_[1] );

      # Append at the current end of the buffer, asking only for the shortfall
      my $got = read( $fh, $_[1], $want - $have, $have );

      return undef unless defined $got;
      return ""    unless $got;
   }

   return $want;
}

# Blocking receive: reads one length-prefixed record from $self->{fh} and
# returns the decoded value, or undef if end of file is hit first.
# Dies if a read fails.
sub _recv_sync
{
   my $self = shift;
   my $fh = $self->{fh};

   # Each record is preceded by a 4-byte pack("I") length header
   my $got = _read_exactly( $fh, my $header, 4 );
   die "Cannot read - $!" unless defined $got;
   return undef unless length $got;

   my $len = unpack( "I", $header );

   $got = _read_exactly( $fh, my $record, $len );
   die "Cannot read - $!" unless defined $got;
   return undef unless length $got;

   return $self->{decode}->( $record );
}

# Writes an already-framed record to the synchronous filehandle.
sub _send_sync
{
   my ( $self, $bytes ) = @_;

   return $self->{fh}->print( $bytes );
}

# Closes the synchronous filehandle.
sub _close_sync
{
   my ( $self ) = @_;

   my $fh = $self->{fh};
   return $fh->close;
}

# Leave this undocumented for now
# Puts the channel into asynchronous mode.
# Accepts the named arguments read_handle and write_handle (each optional);
# croaks on any other key.
sub setup_async_mode
{
   my ( $self, %args ) = @_;

   # Move the recognised handles out of %args and onto the object
   foreach my $key (qw( read_handle write_handle )) {
      next unless exists $args{$key};
      $self->{$key} = delete $args{$key};
   }

   # Anything still left was not a recognised argument
   if( my @unknown = keys %args ) {
      croak "Unrecognised keys for setup_async_mode: " . join( ", ", @unknown );
   }

   $self->{mode} = "async";
}

# Returns the IO::Async::Stream used for asynchronous transfer, constructing
# it (and resetting the pending-result queue) the first time it is needed.
sub _build_stream
{
   my $self = shift;

   # Already built by an earlier call
   return $self->{stream} if $self->{stream};

   $self->{on_result_queue} = [];

   my $on_read = $self->_capture_weakself( '_on_stream_read' );

   my $stream = IO::Async::Stream->new(
      read_handle  => $self->{read_handle},
      write_handle => $self->{write_handle},
      autoflush    => 1,
      on_read      => $on_read,
   );

   $self->add_child( $stream );

   # Only remembered once it has been successfully added as a child
   return $self->{stream} = $stream;
}

# Queues an already-framed record for writing on the asynchronous stream.
sub _send_async
{
   my ( $self, $bytes ) = @_;

   my $stream = $self->_build_stream;
   return $stream->write( $bytes );
}

# Asynchronous receive: queues a handler for the next result to arrive.
# Named arguments on_recv and on_eof are optional callbacks. When called in
# non-void context a Future is returned that completes with the received
# value, or fails if EOF arrives first.
sub _recv_async
{
   my ( $self, %args ) = @_;

   my ( $on_recv, $on_eof ) = @args{qw( on_recv on_eof )};

   my $stream = $self->_build_stream;

   # A Future is only created when the caller is going to look at it
   my $f = defined wantarray ? $stream->loop->new_future : undef;

   my $on_result = sub {
      my ( $chan, $type, $result ) = @_;

      if( $type eq "recv" ) {
         if( $f and !$f->is_cancelled ) {
            $f->done( $result );
         }
         $on_recv->( $chan, $result ) if $on_recv;
      }
      else {
         if( $f and !$f->is_cancelled ) {
            $f->fail( "EOF waiting for Channel recv", "eof" );
         }
         $on_eof->( $chan ) if $on_eof;
      }
   };

   push @{ $self->{on_result_queue} }, $on_result;

   return $f;
}

# Closes an asynchronous channel. If a stream has been built it is asked to
# close once empty; otherwise the raw handles are closed directly. Either way
# the object's handle slots are cleared afterwards.
sub _close_async
{
   my $self = shift;

   my $stream = $self->{stream};

   if( $stream ) {
      $stream->close_when_empty;
   }
   else {
      # No stream was ever built, so close whichever handles we were given
      foreach my $fh ( $self->{read_handle}, $self->{write_handle} ) {
         $fh->close if $fh;
      }
   }

   undef $self->{$_} for qw( read_handle write_handle );
}

# Stream on_read handler; its first argument is the channel object.
# Consumes at most one complete length-prefixed record from the buffer per
# call. Returns 1 after consuming a record, 0 when more bytes are needed, and
# nothing at EOF or when no channel object was passed.
sub _on_stream_read
{
   my ( $self, $stream, $buffref, $eof ) = @_;
   return unless $self;

   if( $eof ) {
      # Tell every outstanding recv request, oldest first, that EOF arrived
      while( my $pending = shift @{ $self->{on_result_queue} } ) {
         $pending->( $self, "eof" );
      }
      $self->{on_eof}->( $self ) if $self->{on_eof};
      return;
   }

   # Wait for the whole 4-byte header, then for the whole record it announces
   my $have = length( $$buffref );
   return 0 if $have < 4;

   my $len = unpack( "I", $$buffref );
   my $frame_len = 4 + $len;
   return 0 if $have < $frame_len;

   # Decode first; the frame is only removed from the buffer afterwards
   my $payload = substr( $$buffref, 4, $len );
   my $record = $self->{decode}->( $payload );
   substr( $$buffref, 0, $frame_len, "" );

   # A queued recv request takes priority over the general on_recv handler
   my $on_result = shift @{ $self->{on_result_queue} };
   if( $on_result ) {
      $on_result->( $self, "recv", $record );
   }
   else {
      $self->{on_recv}->( $self, $record );
   }

   return 1;
}

# Hands the read handle over to the caller.
# Returns undef if no mode has been set; croaks unless the mode is "async".
sub _extract_read_handle
{
   my ( $self ) = @_;

   my $mode = $self->{mode};
   return undef unless $mode;

   $mode eq "async" or croak "Cannot extract filehandle";

   # The mode becomes "dead" once a handle has been handed out
   $self->{mode} = "dead";

   return $self->{read_handle};
}

# Hands the write handle over to the caller.
# Returns undef if no mode has been set; croaks unless the mode is "async".
sub _extract_write_handle
{
   my ( $self ) = @_;

   my $mode = $self->{mode};
   return undef unless $mode;

   $mode eq "async" or croak "Cannot extract filehandle";

   # The mode becomes "dead" once a handle has been handed out
   $self->{mode} = "dead";

   return $self->{write_handle};
}

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

# A module file must finish by evaluating to a true value; any non-zero
# constant will do.
0x55AA;



( run in 1.788 second using v1.01-cache-2.11-cpan-140bd7fdf52 )