AnyEvent-Fork-RPC

RPC.pm  view on Meta::CPAN

         ++$n;

         AnyEvent::Fork::RPC::event "count $n of $count\n";

         if ($n == $count) {
            undef $w;
            $done->();
         }
      };
   }

The parent part (the one before the C<__DATA__> section) isn't very
different from the earlier examples. It sets async mode, preloads
the backend module (so the C<AnyEvent::Fork::RPC::event> function is
declared), uses a slightly different C<on_event> handler (which we use
simply for logging purposes) and then, instead of loading a module with
the actual worker code, it C<eval>'s the code from the data section in the
child process.

It then starts three countdowns, of 3, 2 and 1 seconds respectively,
destroys the rpc object so the example finishes eventually, and then just
waits for the events and results to trickle in.

The worker code uses the event function to log some progress messages, but
mostly just creates a recurring one-second timer.

The timer callback increments a counter, logs a message, and eventually,
when the count has been reached, calls the finish callback.

On my system, this results in the following output. Since all timers fire
at roughly the same time, the actual order isn't guaranteed, but the order
shown is very likely what you would get, too.

   starting to count up to 3
   starting to count up to 2
   starting to count up to 1
   count 1 of 3
   count 1 of 2
   count 1 of 1
   job 1 finished
   count 2 of 2
   job 2 finished
   count 2 of 3
   count 3 of 3
   job 3 finished

While the overall ordering isn't guaranteed, the async backend still
guarantees that events and responses are delivered to the parent process
in the exact same ordering as they were generated in the child process.

And unless your system is I<very> busy, it should clearly show that the
job started last will finish first, as it has the lowest count.

This concludes the async example. Since L<AnyEvent::Fork> does not
actually fork, you are free to use just about any module in the child, not
just L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.

=head2 Example 3: Asynchronous backend with Coro

With L<Coro> you can create a nice asynchronous backend implementation by
defining an rpc server function that creates a new Coro thread for every
request that calls a function "normally", i.e. the parameters from the
parent process are passed to it, and any return values are returned to the
parent process, e.g.:

   package My::Arith;

   sub add {
      return $_[0] + $_[1];
   }

   sub mul {
      return $_[0] * $_[1];
   }

   sub run {
      my ($done, $func, @arg) = @_;

      Coro::async_pool {
         $done->($func->(@arg));
      };
   }

The C<run> function creates a new thread for every invocation, using the
first argument as function name, and calls the C<$done> callback on its
return values. This makes it quite natural to define the C<add> and C<mul>
functions to add or multiply two numbers and return the result.

Since this is the asynchronous backend, it's quite possible to define RPC
functions that do I/O or wait for external events - their execution will
overlap as needed.

The above could be used like this:

   my $rpc = AnyEvent::Fork
      ->new
      ->require ("My::Arith")
      ->AnyEvent::Fork::RPC::run ("My::Arith::run",
         on_error => ..., on_event => ..., on_destroy => ...,
      );

   $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
   $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;

The C<say>'s will print C<4> and C<6>.

=head2 Example 4: Forward AnyEvent::Log messages using C<on_event>

This partial example shows how to use the C<event> function to forward
L<AnyEvent::Log> messages to the parent.

For this, the parent needs to provide a suitable C<on_event>:

   ->AnyEvent::Fork::RPC::run (
      on_event => sub {
         if ($_[0] eq "ae_log") {
            my (undef, $level, $message) = @_;
            AE::log $level, $message;
         } else {
            # other event types
         }
      },
   )

In the child, as early as possible, the following code should reconfigure
L<AnyEvent::Log> to log via C<AnyEvent::Fork::RPC::event>:

   $AnyEvent::Log::LOG->log_cb (sub {
      my ($timestamp, $orig_ctx, $level, $message) = @{+shift};

      if (defined &AnyEvent::Fork::RPC::event) {
         AnyEvent::Fork::RPC::event (ae_log => $level, $message);
      } else {
         warn "[$$ before init] $message\n";
      }
   });

There is an important twist - the C<AnyEvent::Fork::RPC::event> function
is only defined when the child is fully initialised. If you redirect the
log messages in your C<init> function for example, then the C<event>
function might not yet be available. This is why the log callback checks
whether the function is there using C<defined>, and only then uses it to
log the message.

               $on_error->("unexpected eof");
            } else {
               $on_destroy->()
                  if $on_destroy;
            }
         } elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
            undef $rw; undef $ww; # it ends here
            $on_error->("read: $!");
         }
      };

      $ww ||= AE::io $fh, 1, $wcb;
   });

   my $guard = Guard::guard {
      $shutdown = 1;

      shutdown $fh, 1 if $fh && !$ww;
   };

   my $id;

   $arg{async}
      ? sub {
           $id = ($id == 0xffffffff ? 0 : $id) + 1;
           $id = ($id == 0xffffffff ? 0 : $id) + 1 while exists $rcb{$id}; # rarely loops

           $rcb{$id} = pop;

           $guard if 0; # keep it alive

           $wbuf .= pack "NN/a*", $id, &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
      : sub {
           push @rcb, pop;

           $guard; # keep it alive

           $wbuf .= pack "N/a*", &$f;
           $ww ||= $fh && AE::io $fh, 1, $wcb;
        }
}

=item $rpc->(..., $cb->(...))

The RPC object returned by C<AnyEvent::Fork::RPC::run> is actually a code
reference. There are two things you can do with it: call it, and let it go
out of scope (let it get destroyed).

If C<async> was false when C<$rpc> was created (the default), then, if you
call C<$rpc>, the C<$function> is invoked with all arguments passed to
C<$rpc> except the last one (the callback). When the function returns, the
callback will be invoked with all the return values.

If C<async> was true, then the C<$function> receives an additional
initial argument, the result callback. In this case, returning from
C<$function> does nothing - the function only counts as "done" when the
result callback is called, and any arguments passed to it are considered
the return values. This makes it possible to "return" from event handlers
or e.g. Coro threads.
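
To make the two calling conventions concrete, here is a minimal sketch of
the same worker function written once for each mode. The names C<sync_add>
and C<async_add> are hypothetical, and the functions are called directly
here, without any RPC layer, purely to show where the result callback goes.

```perl
use strict;
use warnings;

# sync style: arguments in, return values out
sub sync_add {
   my ($x, $y) = @_;
   return $x + $y;
}

# async style: the first argument is the result callback, and the
# return value of the function itself is ignored
sub async_add {
   my ($done, $x, $y) = @_;
   $done->($x + $y);
   return;
}

my $sync_result = sync_add (1, 3);

my $async_result;
async_add (sub { $async_result = $_[0] }, 1, 3);

print "$sync_result $async_result\n";
```

In a real async worker, C<$done> would usually be stored and called later,
e.g. from a timer or I/O callback, rather than immediately as above.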

The other thing that can be done with the RPC object is to destroy it. In
this case, the child process will execute all remaining RPC calls, report
their results, and then exit.

See the examples section earlier in this document for some actual
examples.

=back

=head1 CHILD PROCESS USAGE

The following functions are not available in this module. They are only
available in the namespace of this module when the child is running,
without having to load any extra modules. They are part of the child-side
API of L<AnyEvent::Fork::RPC>.

Note that these functions are typically not yet declared when code is
compiled into the child, because the backend module is only loaded when
you call C<run>, which is typically the last method you call on the fork
object.

Therefore, you either have to explicitly pre-load the right backend module
or mark calls to these functions as function calls, e.g.:

   AnyEvent::Fork::RPC::event (0 => "five");
   AnyEvent::Fork::RPC::event->(0 => "five");
   &AnyEvent::Fork::RPC::flush;

=over 4

=item AnyEvent::Fork::RPC::event (...)

Send an event to the parent. Events are a bit like RPC calls made by the
child process to the parent, except that there is no notion of return
values.

See the examples section earlier in this document for some actual
examples.

Note: the event data, like any data sent to the parent, might not be sent
immediately but queued for later sending, so there is no guarantee that
the event has been sent to the parent when the call returns - when you
e.g. exit directly after calling this function, the parent might never
receive the event. See the next function for a remedy.

=item $success = AnyEvent::Fork::RPC::flush ()

Synchronously wait and flush the reply data to the parent. Returns true on
success and false otherwise (i.e. when the reply data cannot be written at
all). Ignoring the success status is a common and healthy behaviour.

Only the "async" backend does something on C<flush> - the "sync" backend
does not buffer reply data and always returns true from this function.

Normally, reply data is buffered and might or might not be written to the
parent immediately. This can greatly improve performance and efficiency,
but sometimes can get in your way: for example, when you want to send an
error message just before exiting, or when you want to ensure replies
reach the parent in a timely manner before starting a long blocking
operation.
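
The "error message just before exiting" case could look roughly like the
following sketch. The helper name C<report_and_exit> and the C<fatal> event
type are assumptions made for illustration (the parent's C<on_event> handler
would have to know about them); only C<event> and C<flush> come from this
module.

```perl
use strict;
use warnings;

# hypothetical child-side helper; the AnyEvent::Fork::RPC functions only
# exist once the backend runs in the child, so they are called with
# parentheses to compile without a prior declaration
sub report_and_exit {
   my ($message) = @_;

   # "fatal" is an assumed event type, not something the module defines
   AnyEvent::Fork::RPC::event (fatal => $message);

   # without the flush, the queued event might be lost on exit
   AnyEvent::Fork::RPC::flush ();

   exit 1;
}
```

The return value of C<flush> is deliberately ignored here, as the process
exits either way.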

=head2 Choosing a backend

So how do you decide which backend to use? Well, that's your problem to
solve, but here are some thoughts on the matter:

=over 4

=item Synchronous

The synchronous backend does not rely on any external modules (well,
except L<common::sense>, which works around a bug in how perl's warning
system works). This keeps the process very small, for example, on my
system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
after C<use warnings; use strict> (for people who grew up with C64s around
them this is probably shocking every single time they see it). The worker
process in the first example in this document uses 1792kB.

Since the calls are done synchronously, slow jobs will keep newer jobs
from executing.

The synchronous backend also has no overhead due to running an event loop
- reading requests is therefore very efficient, while writing responses is
less so, as every response results in a write syscall.

If the parent process is busy and a bit slow reading responses, the child
waits instead of processing further requests. This also limits the amount
of memory needed for buffering, as at most one response ever has to be
buffered.

The API in the child is simple - you just have to define a function that
does something and returns something.

It's hard to use modules or code that relies on an event loop, as the
child cannot execute anything while it waits for more input.

=item Asynchronous

The asynchronous backend relies on L<AnyEvent>, which tries to be small,
but still comes at a price: On my system, the worker from example 1a uses
3420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
which in turn loads a lot of other modules such as L<warnings>, L<strict>,
L<vars>, L<Exporter>...).

It batches requests and responses reasonably efficiently, doing only as
few reads and writes as needed, but needs to poll for events via the event
loop.

Responses are queued when the parent process is busy. This means the child
can continue to execute any queued requests. It also means that a child
might queue a lot of responses in memory when it generates them and the
parent process is slow accepting them.

The API is not a straightforward RPC pattern - you have to call a
"done" callback to pass return values and signal completion. Also, more
importantly, the API starts jobs as fast as possible - when 1000 jobs
are queued and the jobs are slow, they will all run concurrently. The
child must implement some queueing/limiting mechanism if this causes
problems. Alternatively, the parent could limit the number of RPC calls
that are outstanding.

Blocking use of condvars is not supported (in the main thread, outside of
e.g. L<Coro> threads).

Using event-based modules such as L<IO::AIO>, L<Gtk2>, L<Tk> and so on is
easy.

=back
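
The parent-side limiting mentioned for the asynchronous backend could be
sketched as a small wrapper around the C<$rpc> code reference. Everything
here is an assumption for illustration: C<limit_outstanding> and C<_pump>
are hypothetical helpers, not part of this module, and the example uses a
stand-in code reference instead of a real child process so that it runs on
its own.

```perl
use strict;
use warnings;

# start queued calls while fewer than {max} are outstanding
sub _pump {
   my ($st) = @_;

   while ($st->{active} < $st->{max} && @{ $st->{queue} }) {
      my @args = @{ shift @{ $st->{queue} } };
      my $cb   = pop @args;

      ++$st->{active};
      $st->{rpc}->(@args, sub {
         --$st->{active};
         $cb->(@_);
         _pump ($st); # a slot is free again
      });
   }
}

# wrap an rpc-like code reference so at most $max calls are in flight
sub limit_outstanding {
   my ($rpc, $max) = @_;

   my $st = { rpc => $rpc, max => $max, active => 0, queue => [] };

   return sub {
      push @{ $st->{queue} }, [@_];
      _pump ($st);
   };
}

# stand-in for a real $rpc: records calls, completes them on demand
my (@pending, $peak);
my $fake_rpc = sub {
   my $cb = pop;
   push @pending, [$cb, @_];
   $peak = @pending if @pending > ($peak // 0);
};

my $limited = limit_outstanding ($fake_rpc, 2);

my @results;
$limited->($_, sub { push @results, @_ }) for 1 .. 5;

# complete the calls one by one, as a child would
while (my $job = shift @pending) {
   my ($cb, $n) = @$job;
   $cb->($n * 2);
}

print "peak outstanding: $peak, results: @results\n";
```

With a real C<$rpc> object, the wrapper would be called exactly like the
object itself, with the result callback as the last argument.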

=head2 Passing file descriptors

Unlike L<AnyEvent::Fork>, this module has no in-built file handle or file
descriptor passing abilities.

The reason is that passing file descriptors is extraordinarily tricky
business, and conflicts with efficient batching of messages.

There still is a method you can use: Create a
C<AnyEvent::Util::portable_socketpair> and C<send_fh> one half of it to
the process before you pass control to C<AnyEvent::Fork::RPC::run>.

Whenever you want to pass a file descriptor, send an rpc request to the
child process (so it expects the descriptor), then send it over the other
half of the socketpair. The child should fetch the descriptor from the
half it was passed earlier.

Here is some (untested) pseudocode to that effect:

   use AnyEvent::Util;
   use AnyEvent::Fork;
   use AnyEvent::Fork::RPC;
   use IO::FDPass;

   my ($s1, $s2) = AnyEvent::Util::portable_socketpair;

   my $rpc = AnyEvent::Fork
      ->new
      ->send_fh ($s2)
      ->require ("MyWorker")
      ->AnyEvent::Fork::RPC::run ("MyWorker::run",
           init => "MyWorker::init",
        );

   undef $s2; # no need to keep it around

   # pass an fd
   $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);

   IO::FDPass::send fileno $s1, fileno $handle_to_pass;

   $cv->recv;

The MyWorker module could look like this:

   package MyWorker;

   use IO::FDPass;

   my $s2;

   sub init {
      $s2 = $_[0];
   }


