Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Channel.pm  view on Meta::CPAN

=head1 CONSTRUCTOR

=cut

=head2 new

   $channel = IO::Async::Channel->new

Returns a new C<IO::Async::Channel> object. This object reference itself
should be shared by both sides of a C<fork()>ed process. After C<fork()> the
two C<setup_*> methods may be used to configure the object for operation on
either end.

While this object does in fact inherit from L<IO::Async::Notifier>, it should
not be added to a Loop object directly; event management will be handled by
its containing L<IO::Async::Routine> object.

=cut

=head1 METHODS

The following methods documented with a trailing call to C<< ->get >> return
L<Future> instances.

=cut

=head2 configure

   $channel->configure( %params )

Similar to the standard C<configure> method on L<IO::Async::Notifier>, this is
used to change details of the Channel's operation.

=over 4

=item on_recv => CODE

May only be set on an async mode channel. If present, will be invoked whenever
a new value is received, rather than using the C<recv> method.

 $on_recv->( $channel, $data )

=item on_eof => CODE

May only be set on an async mode channel. If present, will be invoked when the
channel gets closed by the peer.

 $on_eof->( $channel )

=back

=cut

# Initialisation hook, chained to the parent class's _init.
#
# Takes the hash reference of construction parameters and makes sure a codec
# is always selected: a defined, caller-supplied 'codec' is left untouched,
# while a missing or undef one is replaced by the "Storable" default. The
# (possibly amended) parameters are then handed on to the parent.
sub _init
{
   my ( $self, $params ) = @_;

   $params->{codec} = "Storable" unless defined $params->{codec};

   return $self->SUPER::_init( $params );
}

# Applies configuration parameters to the Channel.
#
#   on_recv, on_eof - stored on the object. Only accepted when the channel's
#                     mode is "async"; otherwise croaks. Each one supplied
#                     also triggers a call to _build_stream.
#   codec           - codec name, resolved to a _make_codec_NAME method whose
#                     result supplies the 'encode' and 'decode' functions.
#                     Croaks if no such method exists. A false value is
#                     removed from the parameters without further effect.
#
# Whatever parameters remain are passed on to the parent class's configure.
sub configure
{
   my ( $self, %params ) = @_;

   foreach my $event (qw( on_recv on_eof )) {
      exists $params{$event} or next;

      croak "Can only configure $event in async mode"
         unless $self->{mode} and $self->{mode} eq "async";

      $self->{$event} = delete $params{$event};
      $self->_build_stream;
   }

   if( my $codec = delete $params{codec} ) {
      my $make_codec = $self->can( "_make_codec_$codec" )
         or croak "Unrecognised codec name '$codec'";

      # The factory is invoked as a plain function, without an invocant, and
      # in list context so that it yields the ( encode, decode ) pair.
      @{ $self }{qw( encode decode )} = $make_codec->();
   }

   return $self->SUPER::configure( %params );
}

# Codec factory for the "Storable" codec.
#
# Loads Storable on demand and yields the ( encoder, decoder ) pair as code
# references to its freeze and thaw functions.
sub _make_codec_Storable
{
   require Storable;

   return ( \&Storable::freeze, \&Storable::thaw );
}

# Codec factory for the "Sereal" codec.
#
# Loads the Sereal encoder and decoder classes on demand and yields an
# ( encoder, decoder ) pair of closures, each taking the value to convert as
# its first argument.
sub _make_codec_Sereal
{
   require Sereal::Encoder;
   require Sereal::Decoder;

   # The Sereal objects are created lazily and re-checked on every call.
   # Sereal's notion of "thread safety" is that these variables get reset to
   # undef in new threads, so each closure must be prepared to build a fresh
   # object rather than assume one already exists.
   my ( $enc_obj, $dec_obj );

   my $encode = sub {
      $enc_obj = Sereal::Encoder->new unless $enc_obj;
      # $_[0] is used directly so the payload is not copied first.
      return $enc_obj->encode( $_[0] );
   };

   my $decode = sub {
      $dec_obj = Sereal::Decoder->new unless $dec_obj;
      return $dec_obj->decode( $_[0] );
   };

   return ( $encode, $decode );
}

=head2 send

   $channel->send( $data )

Pushes the data stored in the given Perl reference into the FIFO of the
Channel, where it can be received by the other end. When called on a
synchronous mode Channel this method may block if a C<write()> call on the
underlying filehandle blocks. When called on an asynchronous mode channel this
method will not block.

=cut

# Encodes $data with the channel's configured 'encode' function and passes
# the outcome to send_encoded, whose result is returned.
sub send
{
   my ( $self, $data ) = @_;

   my $encode = $self->{encode};

   # The encoder is called inside the argument list, so whatever it returns
   # is forwarded to send_encoded unchanged.
   return $self->send_encoded( $encode->( $data ) );
}

=head2 send_encoded

   $channel->send_encoded( $record )

A variant of the C<send> method; this method pushes the byte record given.
This should be the result of a call to C<encode>.

=cut

sub send_encoded
{
   my $self = shift;
   my ( $record ) = @_;



( run in 2.539 seconds using v1.01-cache-2.11-cpan-98e64b0badf )