Coro

 view release on metacpan or  search on metacpan

Coro/Handle.pm  view on Meta::CPAN

=head1 NAME

Coro::Handle - non-blocking I/O with a blocking interface.

=head1 SYNOPSIS

 use Coro::Handle;

=head1 DESCRIPTION

This module is an L<AnyEvent> user; you need to make sure that you use and
run a supported event loop.

This module implements IO-handles in a coroutine-compatible way, that is,
other coroutines can run while reads or writes block on the handle.

It does so by using L<AnyEvent|AnyEvent> to wait for readable/writable
data, allowing other coroutines to run while one coroutine waits for I/O.

Coro::Handle does NOT inherit from IO::Handle but uses tied objects.

If at all possible, you should I<always> prefer method calls on the handle object over invoking
tied methods, i.e.:

   $fh->print ($str);         # NOT print $fh $str;
   my $line = $fh->readline;  # NOT my $line = <$fh>;

The reason is that perl recurses within the interpreter when invoking tie
magic, forcing the (temporary) allocation of a (big) stack. If you have
lots of socket connections and they happen to wait in e.g. <$fh>, then
they would all have a costly C coroutine associated with them.

=over 4

=cut

package Coro::Handle;

use common::sense;

use Carp ();
use Errno qw(EAGAIN EINTR EINPROGRESS);

use AnyEvent::Util qw(WSAEWOULDBLOCK WSAEINPROGRESS);
use AnyEvent::Socket ();

use base 'Exporter';

our $VERSION = 6.514;
our @EXPORT = qw(unblock);

=item $fh = new_from_fh Coro::Handle $fhandle [, arg => value...]

Create a new non-blocking io-handle using the given
perl-filehandle. Returns C<undef> if no filehandle is given. The only
other supported argument is "timeout", which sets a timeout for each
operation.

=cut

# Constructor: wraps a perl filehandle in a tied glob.
#   $class - class name or an existing object (its class is reused)
#   $fh    - the filehandle to wrap; a false value yields an empty return
#   @_     - further key => value pairs, handed on to the tie class as-is
# Returns a reference to the tied glob, blessed into the class.
sub new_from_fh {
   my $class = shift;
   my $fh    = shift;

   # nothing to wrap
   return unless $fh;

   # a fresh glob that will carry the tie magic
   my $glob = do { local *Coro::Handle };

   tie *$glob, 'Coro::Handle::FH', fh => $fh, @_;

   bless \$glob, ref ($class) || $class
}

=item $fh = unblock $fh

This is a convenience function that just calls C<new_from_fh> on the
given filehandle. Use it to replace a normal perl filehandle by a
non-(coroutine-)blocking equivalent.

=cut

# Exported shortcut: wrap the given filehandle via the new_from_fh
# constructor of Coro::Handle and return the result.
sub unblock($) {
   Coro::Handle->new_from_fh ($_[0])
}

=item $fh->writable, $fh->readable

Wait until the filehandle is readable or writable (and return true) or
until an error condition happens (and return false).

=cut

# Both methods look up the tied object behind the handle and delegate to
# the function of the same name in Coro::Handle::FH.
sub readable {
   Coro::Handle::FH::readable (tied (*${$_[0]}))
}

sub writable {
   Coro::Handle::FH::writable (tied (*${$_[0]}))
}

=item $fh->readline ([$terminator])

Similar to the builtin of the same name, but allows you to specify the
input record separator in a coroutine-safe manner (i.e. not using a global
variable). Paragraph mode is not supported, use "\n\n" to achieve the same
effect.

=cut

# Forwards all remaining arguments (e.g. the terminator) to the READLINE
# method of the tied object; the caller's context is passed through.
sub readline {
   my $self = shift;

   tied (*$$self)->READLINE (@_)
}

=item $fh->autoflush ([...])

Always returns true; arguments are ignored (exists for compatibility
only). Might change in the future.

=cut

# Compatibility stub: ignores its arguments and always returns true.
sub autoflush {
   !!1
}

=item $fh->fileno, $fh->close, $fh->read, $fh->sysread, $fh->syswrite, $fh->print, $fh->printf

Work like their function equivalents (except C<read>, which works like
C<sysread>). You should not use the read function with Coro::Handles: it
will work, but it is not efficient.

=cut

# Thin method wrappers around the tie-class functions. Each one looks up
# the tied object behind the handle and passes the arguments straight on.

# read and sysread are identical: both go to READ. The buffer ($_[1]) is
# passed by alias so that READ can store into the caller's variable.
sub read {
   Coro::Handle::FH::READ (tied (*${$_[0]}), $_[1], $_[2], $_[3])
}

sub sysread {
   Coro::Handle::FH::READ (tied (*${$_[0]}), $_[1], $_[2], $_[3])
}

# data, length and offset are handed to WRITE unchanged
sub syswrite {
   Coro::Handle::FH::WRITE (tied (*${$_[0]}), $_[1], $_[2], $_[3])
}

# all arguments are concatenated (with no separator) into one string
sub print {
   my $self = shift;

   Coro::Handle::FH::WRITE (tied (*$$self), join "", @_)
}

sub printf {
   my $self = shift;

   Coro::Handle::FH::PRINTF (tied (*$$self), @_)
}

sub fileno {
   Coro::Handle::FH::FILENO (tied (*${$_[0]}))
}

sub close {
   Coro::Handle::FH::CLOSE (tied (*${$_[0]}))
}

# this handler always blocks the caller
sub blocking {
   !!1
}

# Combined getter/setter for slot 8 of the tied object.
#   $fh->partial          - returns the current value
#   $fh->partial ($value) - stores $value, returns the previous value
sub partial {
   my $state    = tied *${$_[0]};
   my $previous = $state->[8];

   # only store when a value was actually passed (it may be undef)
   $state->[8] = $_[1] if @_ > 1;

   $previous
}

=item connect, listen, bind, getsockopt, setsockopt,
send, recv, peername, sockname, shutdown, peerport, peerhost

Do the same thing as the perl builtins or IO::Socket methods (but return
true on EINPROGRESS). Remember that these must be method calls.

=cut

# All of these operate directly on the underlying perl filehandle, which is
# stored in slot 0 of the tied object, and return whatever the builtin
# returns (with the exceptions noted below).

# A connect that fails with EINPROGRESS, EAGAIN or WSAEWOULDBLOCK is
# reported as success (true).
sub connect	{ connect     tied(*${$_[0]})->[0], $_[1] or $! == EINPROGRESS or $! == EAGAIN or $! == WSAEWOULDBLOCK }
sub bind	{ bind        tied(*${$_[0]})->[0], $_[1] }
sub listen	{ listen      tied(*${$_[0]})->[0], $_[1] }
sub getsockopt	{ getsockopt  tied(*${$_[0]})->[0], $_[1], $_[2] }
sub setsockopt	{ setsockopt  tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] }

# $fh->send ($msg [, $flags [, $to]])
# The builtin distinguishes send from sendto by its argument count, and each
# of its argument slots is evaluated in scalar context, so an "optional"
# fourth argument cannot be expressed with a ternary yielding an empty
# list. The destination is therefore only passed when the caller supplied
# a defined one; otherwise the three-argument form is used. Missing flags
# default to 0.
sub send {
   my $fh = tied(*${$_[0]})->[0];

   defined $_[3]
      ? CORE::send ($fh, $_[1], $_[2] || 0, $_[3])
      : CORE::send ($fh, $_[1], $_[2] || 0)
}

# $fh->recv ($buffer, $length [, $flags])
# The buffer is passed by alias so the builtin can fill the caller's
# variable; missing flags default to 0.
sub recv {
   CORE::recv (tied(*${$_[0]})->[0], $_[1], $_[2], $_[3] || 0)
}

sub sockname	{ getsockname tied(*${$_[0]})->[0] }
sub peername	{ getpeername tied(*${$_[0]})->[0] }
sub shutdown	{ shutdown    tied(*${$_[0]})->[0], $_[1] }

Coro/Handle.pm  view on Meta::CPAN

# Tie hook for getc: asks READ for a single character and returns
# whatever READ stored in the buffer.
sub GETC {
   READ ($_[0], my $char, 1);

   $char
}

# Tie hook for binmode: applies a plain binmode to the underlying
# filehandle (slot 0). Any further argument passed to the hook is not
# forwarded.
sub BINMODE {
   my ($self) = @_;

   binmode $self->[0]
}

# tell, seek and eof are not supported on these handles: each tie hook
# unconditionally croaks with a message naming the operation.

sub TELL {
   Carp::croak ("Coro::Handle's don't support tell()");
}

sub SEEK {
   Carp::croak ("Coro::Handle's don't support seek()");
}

sub EOF {
   Carp::croak ("Coro::Handle's don't support eof()");
}

# Tie hook for close: runs cleanup on the object, then closes the
# underlying filehandle and returns the result of close.
sub CLOSE {
   # fetch the filehandle before cleanup runs; the cleanup variant
   # installed for Coro::Event in this file empties the object
   my $handle = $_[0][0];

   &cleanup;

   close $handle
}

# Destructor of the tied object: hands the object to cleanup.
sub DESTROY {
   &cleanup; # ampersand call without parens: cleanup sees the current @_
}

# Tie hook for fileno: reports the descriptor number of the underlying
# filehandle (slot 0).
sub FILENO {
   my ($self) = @_;

   fileno $self->[0]
}

# Apparently invoked when the handle is stringified (odd as that is), at
# least when DumpValue::dumpValue is used to print it.
# Returns the stringified object followed by slot 1 in angle brackets.
sub FETCH {
   my ($self) = @_;

   sprintf "%s<%s>", $self, $self->[1]
}

# Generic AnyEvent implementation of "wait until readable".
# Registers a read watcher on the filehandle (slot 0) and, if a timeout
# (slot 2) is defined, a timer; then waits for whichever fires first.
# The rouse callback is invoked with 1 by the I/O watcher and with 0 by
# the timer.
sub _readable_anyevent {
   my $self  = $_[0];
   my $rouse = Coro::rouse_cb ();

   # both watchers are held in lexicals so they live until the wait returns
   my $io_watcher = AE::io ($self->[0], 0, sub { $rouse->(1) });
   my $timer      = (defined $self->[2])
      && AE::timer ($self->[2], 0, sub { $rouse->(0) });

   Coro::rouse_wait ()
}

# Generic AnyEvent implementation of "wait until writable".
# Registers a write watcher on the filehandle (slot 0) and, if a timeout
# (slot 2) is defined, a timer; then waits for whichever fires first.
# The rouse callback is invoked with 1 by the I/O watcher and with 0 by
# the timer.
sub _writable_anyevent {
   my $self  = $_[0];
   my $rouse = Coro::rouse_cb ();

   # both watchers are held in lexicals so they live until the wait returns
   my $io_watcher = AE::io ($self->[0], 1, sub { $rouse->(1) });
   my $timer      = (defined $self->[2])
      && AE::timer ($self->[2], 0, sub { $rouse->(0) });

   Coro::rouse_wait ()
}

# Coro::Event implementation of "wait until readable".
# The watcher is created on first use and cached in slot 5 of the object;
# every call waits for its next event and tests element 4 of that event
# against the R bit (presumably the received event mask - confirm against
# Coro::Event).
sub _readable_coro {
   my $watcher = $_[0][5] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] read watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::R + &Event::Watcher::E + &Event::Watcher::T,
   );

   my $event = $watcher->next;

   $event->[4] & &Event::Watcher::R
}

# Coro::Event implementation of "wait until writable".
# The watcher is created on first use and cached in slot 6 of the object;
# every call waits for its next event and tests element 4 of that event
# against the W bit (presumably the received event mask - confirm against
# Coro::Event).
sub _writable_coro {
   my $watcher = $_[0][6] ||= "Coro::Event"->io (
      fd      => $_[0][0],
      desc    => "fh $_[0][1] write watcher",
      timeout => $_[0][2],
      poll    => &Event::Watcher::W + &Event::Watcher::E + &Event::Watcher::T,
   );

   my $event = $watcher->next;

   $event->[4] & &Event::Watcher::W
}

#sub _readable_ev {
#   &EV::READ  == Coro::EV::timed_io_once (fileno $_[0][0], &EV::READ , $_[0][2])
#}
#
#sub _writable_ev {
#   &EV::WRITE == Coro::EV::timed_io_once (fileno $_[0][0], &EV::WRITE, $_[0][2])
#}

# decide on event model at runtime
#
# For both "readable" and "writable" a stub is installed that, on its first
# call, runs AnyEvent::detect, replaces itself in the symbol table with a
# backend-specific implementation and then dispatches to that
# implementation with the arguments it was called with.
for my $rw (qw(readable writable)) {
   *$rw = sub {
      AnyEvent::detect;
      if ($AnyEvent::MODEL eq "AnyEvent::Impl::Event" and eval { require Coro::Event }) {
         # Event backend and Coro::Event loads: use the _*_coro variants
         *$rw = \&{"_$rw\_coro"};
         # this backend also replaces cleanup: cancel the watchers cached in
         # slots 5 and 6 (any error is swallowed by the eval), then empty
         # the object
         *cleanup = sub {
            eval {
               $_[0][5]->cancel if $_[0][5];
               $_[0][6]->cancel if $_[0][6];
            };
            @{$_[0]} = ();
         };

      } elsif ($AnyEvent::MODEL eq "AnyEvent::Impl::EV" and eval { require Coro::EV }) {
         # EV backend and Coro::EV loads: use the implementation from Coro::EV
         *$rw = \&{"Coro::EV::_$rw\_ev"};
         return &$rw; # Coro 5.0+ doesn't support goto &SLF, and this line is executed once only

      } else {
         # every other case: the generic AnyEvent watcher implementation
         *$rw = \&{"_$rw\_anyevent"};
      }
      # re-dispatch to the implementation just installed, keeping @_
      goto &$rw
   };
};

# Tie hook for syswrite/print: writes the data in $_[1] to the underlying
# filehandle, waiting via writable whenever the write has to be retried.
#   $_[0] - the tied object (filehandle in slot 0)
#   $_[1] - the data (used by alias, never copied)
#   $_[2] - number of bytes to write, defaults to the length of the data
#   $_[3] - offset into the data
# Returns the total number of bytes written, or undef if nothing could be
# written at all.
sub WRITE {
   my $remaining = defined $_[2] ? $_[2] : length $_[1];
   my $offset    = $_[3];
   my $total;

   while () {
      my $written = syswrite ($_[0][0], $_[1], $remaining, $offset);

      if (defined $written) {
         # (partial) success: account for it and stop once all is out
         $total     += $written;
         $offset    += $written;
         $remaining -= $written;
         last if !$remaining;
      } else {
         # only "try again" style errors are retried, anything else ends
         # the loop
         last unless $! == EAGAIN || $! == EINTR || $! == WSAEWOULDBLOCK;
      }

      # wait for the handle to become writable; give up if that fails
      last unless &writable;
   }

   $total
}

sub READ {
   my $len = $_[2];
   my $ofs = $_[3];
   my $res;

   # first deplete the read buffer
   if (length $_[0][3]) {
      my $l = length $_[0][3];
      if ($l <= $len) {
         substr ($_[1], $ofs) = $_[0][3]; $_[0][3] = "";
         $len -= $l;
         $ofs += $l;
         $res += $l;
         return $res unless $len;
      } else {
         substr ($_[1], $ofs) = substr ($_[0][3], 0, $len);
         substr ($_[0][3], 0, $len) = "";
         return $len;
      }
   }



( run in 0.517 second using v1.01-cache-2.11-cpan-98e64b0badf )