Acme-Sort-Sleep
local/lib/perl5/IO/Async/Channel.pm
*send_frozen = \&send_encoded;
=head2 recv
   $data = $channel->recv
When called on a synchronous-mode Channel, this method blocks until a Perl
reference value is available from the other end, then returns it. If the
Channel is closed, this method returns C<undef>. Since only references may
be passed and all Perl references are true, the truth of the result can be
used to detect whether the channel is still open.
   $data = $channel->recv->get
When called on an asynchronous-mode Channel, this method returns a future that
will eventually yield the next Perl reference value to become available
from the other end. If the Channel is closed, the future will fail with an
C<eof> failure.
   $channel->recv( %args )
When not returning a future, takes the following named arguments:
=over 8
=item on_recv => CODE
Called when a new Perl reference value is available. Will be passed the
Channel object and the reference data.
   $on_recv->( $channel, $data )
=item on_eof => CODE
Called if the Channel was closed before a new value was ready. Will be passed
the Channel object.
   $on_eof->( $channel )
=back
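
The calling forms above can be combined as in the following sketch. It assumes
the L<IO::Async::Loop> and L<IO::Async::Routine> modules from the same
distribution are installed; the variable names and the summing task are purely
illustrative. Inside the routine the channel is in synchronous mode, so the
truth of the C<recv> result ends the loop; in the parent the channel is in
asynchronous mode, so C<recv> takes callbacks.

```perl
use strict;
use warnings;

use IO::Async::Loop;
use IO::Async::Channel;
use IO::Async::Routine;

my $loop = IO::Async::Loop->new;

# Illustrative names: one channel into the routine, one back out of it
my $in_ch  = IO::Async::Channel->new;
my $out_ch = IO::Async::Channel->new;

my $routine = IO::Async::Routine->new(
   channels_in  => [ $in_ch ],
   channels_out => [ $out_ch ],

   code => sub {
      # Synchronous mode: recv blocks, and returns undef once the channel
      # has been closed, which ends this loop
      while( my $nums = $in_ch->recv ) {
         my $sum = 0;
         $sum += $_ for @$nums;

         # Only references may be sent
         $out_ch->send( \$sum );
      }
   },

   on_finish => sub { $loop->stop },
);

$loop->add( $routine );

$in_ch->send( [ 10, 20, 30 ] );

my $result;
$out_ch->recv(
   # Asynchronous mode: callbacks are invoked with the Channel object
   on_recv => sub {
      my ( $channel, $data ) = @_;
      $result = $$data;
      $in_ch->close;
      $loop->stop;
   },
   on_eof => sub {
      my ( $channel ) = @_;
      warn "Channel closed before a result arrived\n";
      $loop->stop;
   },
);

$loop->run;

print "Total: $result\n" if defined $result;
```

The same asynchronous receive could instead be written with the future form,
C<< $out_ch->recv->get >>, when no callbacks are passed.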
=cut
sub recv
{
my $self = shift;
defined $self->{mode} or die "Cannot ->recv without being set up";
return $self->_recv_sync( @_ ) if $self->{mode} eq "sync";
return $self->_recv_async( @_ ) if $self->{mode} eq "async";
}
=head2 close
   $channel->close
Closes the channel. A pending C<recv> on the other end will then return
C<undef> (synchronous mode) or have its queued C<on_eof> callbacks invoked
(asynchronous mode).
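
To illustrate why closing makes a blocking C<recv> return C<undef>, the sketch
below imitates the synchronous-mode framing on a plain pipe using only core
modules. It assumes each record is a 4-byte native unsigned integer length
followed by the encoded bytes, and it assumes L<Storable> as the codec. The
helpers C<read_exactly>, C<send_record> and C<recv_record> are hypothetical
names for this example and are not part of this module's API.

```perl
use strict;
use warnings;

use IO::Handle;
use Storable qw( freeze thaw );

# Hypothetical helper: read exactly $len bytes from $fh.
# Dies on a read error; returns undef if EOF arrives first.
sub read_exactly
{
   my ( $fh, $len ) = @_;
   my $buf = "";
   while( length( $buf ) < $len ) {
      my $n = read( $fh, $buf, $len - length( $buf ), length( $buf ) );
      defined $n or die "Cannot read - $!";
      $n or return undef;   # EOF: the writer closed its end
   }
   return $buf;
}

# Hypothetical helper: write one length-prefixed, frozen record
sub send_record
{
   my ( $fh, $data ) = @_;
   my $bytes = freeze( $data );
   print {$fh} pack( "I", length( $bytes ) ) . $bytes;
}

# Hypothetical helper: read one record, or return undef at EOF
sub recv_record
{
   my ( $fh ) = @_;
   my $lenbuf = read_exactly( $fh, 4 );
   defined $lenbuf or return undef;
   my $record = read_exactly( $fh, unpack( "I", $lenbuf ) );
   defined $record or return undef;
   return thaw( $record );
}

pipe( my $rd, my $wr ) or die "Cannot pipe - $!";

# Binary structures, not Unicode text
binmode $_ for $rd, $wr;
$wr->autoflush( 1 );

send_record( $wr, [ 1, 2, 3 ] );
send_record( $wr, { name => "example" } );

# Closing the write end is what the reader observes as EOF
close $wr;

my @received;
while( my $data = recv_record( $rd ) ) {
   push @received, $data;
}

printf "Received %d records before EOF\n", scalar @received;
```

Because every decoded value is a reference, and references are true, the
C<while> loop stops only when the reader hits end-of-file.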
=cut
sub close
{
my $self = shift;
return $self->_close_sync if $self->{mode} eq "sync";
return $self->_close_async if $self->{mode} eq "async";
}
# Leave this undocumented for now
sub setup_sync_mode
{
my $self = shift;
( $self->{fh} ) = @_;
$self->{mode} = "sync";
# Since we're communicating binary structures and not Unicode text we need to
# enable binmode
binmode $self->{fh};
$self->{fh}->autoflush(1);
}
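# Reads exactly $_[2] bytes from the filehandle $_[0] into the buffer $_[1],
# which is modified in place through @_ aliasing. Returns undef on a read
# error, "" on EOF, or the requested length on success.
sub _read_exactly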
{
$_[1] = "";
while( length $_[1] < $_[2] ) {
my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
defined $n or return undef;
$n or return "";
}
return $_[2];
}
sub _recv_sync
{
my $self = shift;
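# Each record is framed by a 4-byte native unsigned integer giving its length
my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
defined $n or die "Cannot read - $!";
# An empty string means EOF: the other end has closed the channel
length $n or return undef;
my $len = unpack( "I", $lenbuffer );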
$n = _read_exactly( $self->{fh}, my $record, $len );
defined $n or die "Cannot read - $!";
length $n or return undef;
return $self->{decode}->( $record );
}
sub _send_sync
{
my $self = shift;
my ( $bytes ) = @_;