AnyEvent-Fork-RPC

 view release on metacpan or  search on metacpan

RPC/Async.pm  view on Meta::CPAN

package AnyEvent::Fork::RPC::Async;

use common::sense; # actually required to avoid spurious warnings...

our $VERSION = 2; # protocol version

use Errno ();

use AnyEvent;

# Forward declarations only - no bodies are defined here. The actual
# implementations are closures that run() installs at runtime by assigning
# to the *AnyEvent::Fork::RPC::event and *AnyEvent::Fork::RPC::flush globs.
# NOTE(review): presumably declared up front so that code compiled before
# run() executes can already refer to these names - confirm against callers.
sub AnyEvent::Fork::RPC::event;
sub AnyEvent::Fork::RPC::flush;

# Terminates the current process with a zero exit status.
# Kept as a separate named sub as a workaround for perl 5.14 and below.
sub do_exit {
   exit 0;
}

# Entry point of the asynchronous RPC child: sets up buffered, event-driven
# reading of requests and writing of responses, then enters the event loop.
#
# Arguments, as unpacked below:
#   - first:  the filehandle requests are read from
#   - middle: any further arguments, passed through to the init function
#   - last:   an index into @_; everything from that index up to (but not
#             including) this last element is a key/value list
#
# Keys consumed from the key/value list:
#   function   - name of the sub invoked for every request
#   serialiser - perl source that evaluates to a (serialise, deserialise)
#                pair of code references
#   rlen       - initial read buffer size, grown on demand
#   done       - sub to tail-call once the input has ended, all requests
#                have been answered and the write buffer is empty
#   init       - optional sub called once before any request is handled
#
# Wire format (both directions): 32-bit big-endian id, 32-bit big-endian
# payload length, payload bytes (pack template "NN/a*"). Id 0 is used for
# events emitted via AnyEvent::Fork::RPC::event.
sub run {
   # the last argument is the offset at which the key/value list starts
   my %kv = splice @_, pop;

   my $rfh = shift;
   # a true fileno means the same handle is used for both directions;
   # otherwise (descriptor 0 or none) responses are written to STDOUT
   my $wfh = fileno $rfh ? $rfh : *STDOUT;

   my $function   = delete $kv{function};
   my $serialiser = delete $kv{serialiser};
   my $rlen       = delete $kv{rlen};
   my $done       = delete $kv{done};

   # if the process name starts with digits, keep them and replace the
   # remainder with the name of the RPC function
   $0 =~ s/^(\d+).*$/$1 $function/s;

   {
      # NOTE(review): done inside package main, presumably so that
      # unqualified sub names in init/function resolve there - confirm
      package main;
      my $init = delete $kv{init};
      # the "&$init" form passes the current @_ (the leftover arguments)
      &$init if length $init;
      $function = \&$function; # resolve function early for extra speed
   }

   %kv = (); # save some very small amount of memory

   # one count for the still-open read side plus one per request that has
   # not been answered yet
   my $busy = 1; # exit when == 0

   # $f serialises a list into a string, $t turns a string back into a list
   my ($f, $t) = eval $serialiser; AE::log fatal => $@ if $@;
   # pending output and the write watcher (only defined while output is pending)
   my ($wbuf, $ww);

   # write watcher callback: pushes out as much of $wbuf as the handle accepts
   my $wcb = sub {
      my $len = syswrite $wfh, $wbuf;

      unless (defined $len) {
         # EAGAIN/EWOULDBLOCK just mean "try again later", anything else is fatal
         if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $ww;
            AE::log fatal => "AnyEvent::Fork::RPC: write error ($!), parent gone?";
         }
      }

      # drop what has been written ($len is undef after a retryable error,
      # in which case nothing is removed)
      substr $wbuf, 0, $len, "";

      unless (length $wbuf) {
         # nothing left to write, so stop watching for writability
         undef $ww;
         unless ($busy) {
            # input has ended and every request was answered: close the
            # write direction and hand over to $done with an empty @_
            shutdown $wfh, 1;
            @_ = (); goto &$done;
         }
      }
   };

   # queues data for output and makes sure a write watcher is active
   my $write = sub {
      $wbuf .= $_[0];
      $ww ||= AE::io $wfh, 1, $wcb;
   };

   # synchronously writes out the whole write buffer; returns 1 once it is
   # empty and 0 on a write error that is not EAGAIN/EWOULDBLOCK
   *AnyEvent::Fork::RPC::flush = sub {
      while (length $wbuf) {
         my $len = syswrite $wfh, $wbuf;

         if (defined $len) {
            substr $wbuf, 0, $len, "";
         } elsif ($! == Errno::EAGAIN || $! == Errno::EWOULDBLOCK) {
            # handle is not writable right now: block in select until it is
            my $fdset;
            (vec $fdset, fileno $wfh, 1) = 1;
            # buggy windows often sets exceptfds instead of wfds
            select undef, my $wset = $fdset, my $eset = $fdset, undef;
         } else {
            return 0;
         }
      }

      1
   };

   # serialises its arguments (passed through to $f via "&$f") and queues
   # them as a message with id 0
   *AnyEvent::Fork::RPC::event = sub {
      $write->(pack "NN/a*", 0, &$f);
   };

   # input buffer and the read watcher
   my ($rbuf, $rw);

   # holds the sysread result, and is reused for the payload length while parsing
   my $len;

   $rw = AE::io $rfh, 0, sub {
      # grow the read size when less than 128 bytes of headroom remain
      $rlen = $rlen * 2 + 16 if $rlen - 128 < length $rbuf;
      # append to $rbuf, reading at most up to a total of $rlen buffered bytes
      $len = sysread $rfh, $rbuf, $rlen - length $rbuf, length $rbuf;

      if ($len) {
         # handle every complete message in the buffer (header is 8 bytes)
         while (8 <= length $rbuf) {
            (my $id, $len) = unpack "NN", $rbuf;
            # payload not fully received yet, wait for more data
            8 + $len <= length $rbuf
               or last;

            my @r = $t->(substr $rbuf, 8, $len);
            substr $rbuf, 0, 8 + $len, "";
            
            ++$busy;
            # the function receives a "done" callback as first argument,
            # followed by the deserialised request arguments; calling the
            # callback serialises its arguments as the response for $id
            $function->(sub {
               --$busy;
               $write->(pack "NN/a*", $id, &$f);
            }, @r);
         }
      } elsif (defined $len or $! == Errno::EINVAL) { # EINVAL is reported on windows
         # end of input: stop reading, release the read side's count and
         # start the write watcher so $wcb can notice $busy == 0
         undef $rw;
         --$busy;
         $ww ||= AE::io $wfh, 1, $wcb;
      } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
         # any error other than "try again" is fatal
         undef $rw;
         AE::log fatal => "AnyEvent::Fork::RPC: read error in child: $!";
      }
   };

   # enter the event loop: EV's own loop when EV is the backend, otherwise
   # wait on a condition variable that nothing in this sub ever signals
   $AnyEvent::MODEL eq "AnyEvent::Impl::EV"
      ? EV::run ()
      : AE::cv->recv;
}

1



( run in 1.262 second using v1.01-cache-2.11-cpan-e1769b4cff6 )