Acme-Sort-Sleep
view release on metacpan or search on metacpan
local/lib/perl5/IO/Async/Channel.pm view on Meta::CPAN
sub _make_codec_Storable
{
require Storable;
return
\&Storable::freeze,
\&Storable::thaw;
}
sub _make_codec_Sereal
{
require Sereal::Encoder;
require Sereal::Decoder;
my $encoder;
my $decoder;
# "thread safety" to Sereal::{Encoder,Decoder} means that the variables get
# reset to undef in new threads. We should defend against that.
return
sub { ( $encoder ||= Sereal::Encoder->new )->encode( $_[0] ) },
sub { ( $decoder ||= Sereal::Decoder->new )->decode( $_[0] ) };
}
=head2 send
$channel->send( $data )
Pushes the data stored in the given Perl reference into the FIFO of the
Channel, where it can be received by the other end. When called on a
synchronous mode Channel this method may block if a C<write()> call on the
underlying filehandle blocks. When called on an asynchronous mode channel this
method will not block.
=cut
sub send
{
my $self = shift;
my ( $data ) = @_;
$self->send_encoded( $self->{encode}->( $data ) );
}
=head2 send_encoded
$channel->send_encoded( $record )
A variant of the C<send> method; this method pushes the byte record given.
This should be the result of a call to C<encode>.
=cut
sub send_encoded
{
my $self = shift;
my ( $record ) = @_;
my $bytes = pack( "I", length $record ) . $record;
defined $self->{mode} or die "Cannot ->send without being set up";
return $self->_send_sync( $bytes ) if $self->{mode} eq "sync";
return $self->_send_async( $bytes ) if $self->{mode} eq "async";
}
=head2 encode
$record = $channel->encode( $data )
Takes a Perl reference and returns a serialised string that can be passed to
C<send_encoded>. The following two forms are equivalent
$channel->send( $data )
$channel->send_encoded( $channel->encode( $data ) )
This is provided for the use-case where data needs to be serialised into a
fixed string to "snapshot it" but not sent yet; the returned string can be
saved and sent at a later time.
$record = IO::Async::Channel->encode( $data )
This can also be used as a class method, in case it is inconvenient to operate
on a particular object instance, or when one does not exist yet. In this case
it will encode using whatever is the default codec for C<IO::Async::Channel>.
=cut
sub encode
{
my $self = shift;
my ( $data ) = @_;
return ref $self ?
$self->{encode}->( $data ) :
do { require Storable; Storable::freeze( $data ) };
}
=head2 send_frozen
$channel->send_frozen( $record )
Legacy name for C<send_encoded>. This is no longer preferred as it expects
the data to be encoded using C<Storable>, which prevents (or at least makes
more awkward) the use of other codecs on a channel by default. This method
should not be used in new code and may be removed in a later version.
=cut
*send_frozen = \&send_encoded;
=head2 recv
$data = $channel->recv
When called on a synchronous mode Channel this method will block until a Perl
reference value is available from the other end and then return it. If the
Channel is closed this method will return C<undef>. Since only references may
be passed and all Perl references are true the truth of the result of this
local/lib/perl5/IO/Async/Channel.pm view on Meta::CPAN
$on_recv->( $channel, $data )
=item on_eof => CODE
Called if the Channel was closed before a new value was ready. Will be passed
the Channel object.
$on_eof->( $channel )
=back
=cut
sub recv
{
my $self = shift;
defined $self->{mode} or die "Cannot ->recv without being set up";
return $self->_recv_sync( @_ ) if $self->{mode} eq "sync";
return $self->_recv_async( @_ ) if $self->{mode} eq "async";
}
=head2 close
$channel->close
Closes the channel. Causes a pending C<recv> on the other end to return undef
or the queued C<on_eof> callbacks to be invoked.
=cut
sub close
{
my $self = shift;
return $self->_close_sync if $self->{mode} eq "sync";
return $self->_close_async if $self->{mode} eq "async";
}
# Leave this undocumented for now
sub setup_sync_mode
{
my $self = shift;
( $self->{fh} ) = @_;
$self->{mode} = "sync";
# Since we're communicating binary structures and not Unicode text we need to
# enable binmode
binmode $self->{fh};
$self->{fh}->autoflush(1);
}
sub _read_exactly
{
$_[1] = "";
while( length $_[1] < $_[2] ) {
my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] );
defined $n or return undef;
$n or return "";
}
return $_[2];
}
sub _recv_sync
{
my $self = shift;
my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 );
defined $n or die "Cannot read - $!";
length $n or return undef;
my $len = unpack( "I", $lenbuffer );
$n = _read_exactly( $self->{fh}, my $record, $len );
defined $n or die "Cannot read - $!";
length $n or return undef;
return $self->{decode}->( $record );
}
sub _send_sync
{
my $self = shift;
my ( $bytes ) = @_;
$self->{fh}->print( $bytes );
}
sub _close_sync
{
my $self = shift;
$self->{fh}->close;
}
# Leave this undocumented for now
sub setup_async_mode
{
my $self = shift;
my %args = @_;
exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle );
keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args );
$self->{mode} = "async";
}
sub _build_stream
{
my $self = shift;
return $self->{stream} ||= do {
$self->{on_result_queue} = [];
my $stream = IO::Async::Stream->new(
read_handle => $self->{read_handle},
write_handle => $self->{write_handle},
autoflush => 1,
on_read => $self->_capture_weakself( '_on_stream_read' )
);
$self->add_child( $stream );
$stream;
};
}
sub _send_async
{
my $self = shift;
my ( $bytes ) = @_;
$self->_build_stream->write( $bytes );
}
sub _recv_async
{
my $self = shift;
my %args = @_;
my $on_recv = $args{on_recv};
my $on_eof = $args{on_eof};
my $stream = $self->_build_stream;
my $f;
$f = $stream->loop->new_future unless !defined wantarray;
push @{ $self->{on_result_queue} }, sub {
my ( $self, $type, $result ) = @_;
if( $type eq "recv" ) {
$f->done( $result ) if $f and !$f->is_cancelled;
$on_recv->( $self, $result ) if $on_recv;
}
else {
$f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled;
$on_eof->( $self ) if $on_eof;
}
};
return $f;
}
sub _close_async
{
my $self = shift;
if( my $stream = $self->{stream} ) {
$stream->close_when_empty;
}
else {
$_ and $_->close for $self->{read_handle}, $self->{write_handle};
}
undef $_ for $self->{read_handle}, $self->{write_handle};
}
sub _on_stream_read
{
my $self = shift or return;
my ( $stream, $buffref, $eof ) = @_;
if( $eof ) {
while( my $on_result = shift @{ $self->{on_result_queue} } ) {
$on_result->( $self, eof => );
}
$self->{on_eof}->( $self ) if $self->{on_eof};
return;
}
return 0 unless length( $$buffref ) >= 4;
my $len = unpack( "I", $$buffref );
return 0 unless length( $$buffref ) >= 4 + $len;
my $record = $self->{decode}->( substr( $$buffref, 4, $len ) );
substr( $$buffref, 0, 4 + $len ) = "";
if( my $on_result = shift @{ $self->{on_result_queue} } ) {
$on_result->( $self, recv => $record );
}
else {
$self->{on_recv}->( $self, $record );
}
return 1;
}
sub _extract_read_handle
{
my $self = shift;
return undef if !$self->{mode};
croak "Cannot extract filehandle" if $self->{mode} ne "async";
$self->{mode} = "dead";
return $self->{read_handle};
}
sub _extract_write_handle
{
my $self = shift;
return undef if !$self->{mode};
croak "Cannot extract filehandle" if $self->{mode} ne "async";
$self->{mode} = "dead";
return $self->{write_handle};
}
=head1 AUTHOR
Paul Evans <leonerd@leonerd.org.uk>
=cut
0x55AA;
( run in 1.788 second using v1.01-cache-2.11-cpan-140bd7fdf52 )