Acme-Sort-Sleep

 view release on metacpan or  search on metacpan

local/lib/perl5/IO/Async/Channel.pm  view on Meta::CPAN

   $channel->recv( %args )

When not returning a future, takes the following named arguments:

=over 8

=item on_recv => CODE

Called when a new Perl reference value is available. Will be passed the
Channel object and the reference data.

 $on_recv->( $channel, $data )

=item on_eof => CODE

Called if the Channel was closed before a new value was ready. Will be passed
the Channel object.

 $on_eof->( $channel )

=back

=cut

sub recv
{
   my $self = shift;

   defined $self->{mode} or die "Cannot ->recv without being set up";

   return $self->_recv_sync( @_ )  if $self->{mode} eq "sync";
   return $self->_recv_async( @_ ) if $self->{mode} eq "async";
}

=head2 close

   $channel->close

Closes the channel. Causes a pending C<recv> on the other end to return undef
or the queued C<on_eof> callbacks to be invoked.

=cut

sub close
{
   my $self = shift;

   return $self->_close_sync  if $self->{mode} eq "sync";
   return $self->_close_async if $self->{mode} eq "async";
}

# Leave this undocumented for now
sub setup_sync_mode
{
   my $self = shift;
   ( $self->{fh} ) = @_;

   $self->{mode} = "sync";

   # Since we're communicating binary structures and not Unicode text we need to
   # enable binmode
   binmode $self->{fh};

   $self->{fh}->autoflush(1);
}

sub _read_exactly
{
   $_[1] = "";

   while( length $_[1] < $_[2] ) {
      my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
      defined $n or return undef;
      $n or return "";
   }

   return $_[2];
}

sub _recv_sync
{
   my $self = shift;

   my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
   defined $n or die "Cannot read - $!";
   length $n or return undef;

   my $len = unpack( "I", $lenbuffer );

   $n = _read_exactly( $self->{fh}, my $record, $len );
   defined $n or die "Cannot read - $!";
   length $n or return undef;

   return $self->{decode}->( $record );
}

sub _send_sync
{
   my $self = shift;
   my ( $bytes ) = @_;
   $self->{fh}->print( $bytes );
}

sub _close_sync
{
   my $self = shift;
   $self->{fh}->close;
}

# Leave this undocumented for now
sub setup_async_mode
{
   my $self = shift;
   my %args = @_;

   exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );

   keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );

   $self->{mode} = "async";
}



( run in 1.288 second using v1.01-cache-2.11-cpan-39bf76dae61 )