AnyEvent-Fork-RPC

README  view on Meta::CPAN

          my $w; $w = AE::timer 1, 1, sub {
             ++$n;

             AnyEvent::Fork::RPC::event "count $n of $count\n";

             if ($n == $count) {
                undef $w;
                $done->();
             }
          };
       }

    The parent part (the one before the "__DATA__" section) isn't very
    different from the earlier examples. It sets async mode, preloads the
    backend module (so the "AnyEvent::Fork::RPC::event" function is
    declared), uses a slightly different "on_event" handler (which we use
    simply for logging purposes) and then, instead of loading a module with
    the actual worker code, it "eval"'s the code from the data section in
    the child process.

    It then starts three jobs that count up to 3, 2 and 1, respectively,
    destroys the rpc object so the example finishes eventually, and then
    just waits for the stuff to trickle in.

    The worker code uses the event function to log some progress messages,
    but mostly just creates a recurring one-second timer.

    The timer callback increments a counter, logs a message, and eventually,
    when the count has been reached, calls the finish callback.

    On my system, this results in the following output. Since all timers
    fire at roughly the same time, the actual order isn't guaranteed, but
    the order shown is very likely what you would get, too.

       starting to count up to 3
       starting to count up to 2
       starting to count up to 1
       count 1 of 3
       count 1 of 2
       count 1 of 1
       job 1 finished
       count 2 of 2
       job 2 finished
       count 2 of 3
       count 3 of 3
       job 3 finished

    While the overall ordering isn't guaranteed, the async backend still
    guarantees that events and responses are delivered to the parent process
    in the exact same ordering as they were generated in the child process.

    And unless your system is *very* busy, it should clearly show that the
    job started last will finish first, as it has the lowest count.

    This concludes the async example. Since AnyEvent::Fork does not actually
    fork, you are free to use just about any module in the child, not just
    AnyEvent, but also IO::AIO, or Tk for example.

  Example 3: Asynchronous backend with Coro
    With Coro you can create a nice asynchronous backend implementation by
    defining an rpc server function that creates a new Coro thread for every
    request that calls a function "normally", i.e. the parameters from the
    parent process are passed to it, and any return values are returned to
    the parent process, e.g.:

       package My::Arith;

       sub add {
          return $_[0] + $_[1];
       }

       sub mul {
          return $_[0] * $_[1];
       }

       sub run {
          my ($done, $func, @arg) = @_;

          Coro::async_pool {
             $done->($func->(@arg));
          };
       }

    The "run" function creates a new thread for every invocation, using the
    first argument as function name, and calls the $done callback on its
    return values. This makes it quite natural to define the "add" and "mul"
    functions to add or multiply two numbers and return the result.

    Since this is the asynchronous backend, it's quite possible to define
    RPC functions that do I/O or wait for external events - their execution
    will overlap as needed.

    The above could be used like this:

       my $rpc = AnyEvent::Fork
          ->new
          ->require ("My::Arith")
          ->AnyEvent::Fork::RPC::run ("My::Arith::run",
             on_error => ..., on_event => ..., on_destroy => ...,
          );

       $rpc->(add => 1, 3, Coro::rouse_cb); say Coro::rouse_wait;
       $rpc->(mul => 3, 2, Coro::rouse_cb); say Coro::rouse_wait;

    The two "say" calls print 4 and 6.

  Example 4: Forward AnyEvent::Log messages using "on_event"
    This partial example shows how to use the "event" function to forward
    AnyEvent::Log messages to the parent.

    For this, the parent needs to provide a suitable "on_event":

       ->AnyEvent::Fork::RPC::run (
          on_event => sub {
             if ($_[0] eq "ae_log") {
                my (undef, $level, $message) = @_;
                AE::log $level, $message;
             } else {
                # other event types
             }
          },
       )

    In the child, as early as possible, the following code should
    reconfigure AnyEvent::Log to log via "AnyEvent::Fork::RPC::event":

       $AnyEvent::Log::LOG->log_cb (sub {
          my ($timestamp, $orig_ctx, $level, $message) = @{+shift};

          if (defined &AnyEvent::Fork::RPC::event) {
             AnyEvent::Fork::RPC::event (ae_log => $level, $message);
          } else {
             warn "[$$ before init] $message\n";
          }
       });

    There is an important twist - the "AnyEvent::Fork::RPC::event" function
    is only defined when the child is fully initialised. If you redirect the
    log messages in your "init" function for example, then the "event"
    function might not yet be available. This is why the log callback checks
    whether the function is there using "defined", and only then uses it to
    log the message.

PARENT PROCESS USAGE

                   (
                      sub {    Storable::freeze \@_ },
                      sub { @{ Storable::thaw shift } }
                   )

            $AnyEvent::Fork::RPC::NSTORABLE_SERIALISER - portable Storable
                This serialiser also uses Storable, but uses its "network"
                format to serialise data, which makes it possible to talk to
                different perl binaries (for example, when talking to a
                process created with AnyEvent::Fork::Remote).

                Implementation:

                   use Storable ();
                   (
                      sub {    Storable::nfreeze \@_ },
                      sub { @{ Storable::thaw shift } }
                   )
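
                As a rough illustration (not part of the module), the
                following sketch round-trips an argument list through the
                same pair of code references, using only the core Storable
                module. The variable names $freeze and $thaw are made up
                for this example.

```perl
use strict;
use warnings;
use Storable ();

# The same pair of code references as in the implementation above;
# the names $freeze and $thaw only exist in this sketch.
my ($freeze, $thaw) = (
   sub {    Storable::nfreeze \@_ },
   sub { @{ Storable::thaw shift } },
);

# Serialise an argument list to an octet string and restore it again.
my $octets = $freeze->("add", 1, [2, 3]);
my @args   = $thaw->($octets);

print scalar @args, " arguments, first is $args[0]\n";
```

                Nested data structures survive the round trip, but
                anything Storable cannot serialise by default (code
                references, for example) would still fail.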

        buflen => $bytes (default: "512 - 16")
            The starting size of the read buffer for request and response
            data.

            "AnyEvent::Fork::RPC" ensures that the buffer for reading
            request and response data is large enough for at least a single
            request or response, and will dynamically enlarge the buffer if
            needed.

            While this ensures that memory is not overly wasted, it
            typically leads to having to do one syscall per request, which
            can be inefficient in some cases. In such cases, it can be
            beneficial to increase the buffer size to hold more than one
            request.

        buflen_req => $bytes (default: same as "buflen")
            Overrides "buflen" for request data (as read by the forked
            process).

        buflen_res => $bytes (default: same as "buflen")
            Overrides "buflen" for response data (replies read by the parent
            process).

        See the examples section earlier in this document for some actual
        examples.

    $rpc->(..., $cb->(...))
        The RPC object returned by "AnyEvent::Fork::RPC::run" is actually a
        code reference. There are two things you can do with it: call it,
        and let it go out of scope (let it get destroyed).

        If "async" was false when $rpc was created (the default), then, if
        you call $rpc, the $function is invoked with all arguments passed to
        $rpc except the last one (the callback). When the function returns,
        the callback will be invoked with all the return values.

        If "async" was true, then the $function receives an additional
        initial argument, the result callback. In this case, returning from
        $function does nothing - the function only counts as "done" when the
        result callback is called, and any arguments passed to it are
        considered the return values. This makes it possible to "return"
        from event handlers or e.g. Coro threads.

        The other thing that can be done with the RPC object is to destroy
        it. In this case, the child process will execute all remaining RPC
        calls, report their results, and then exit.

        See the examples section earlier in this document for some actual
        examples.
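
        To make the difference between the two calling conventions
        concrete, here is a minimal sketch of the same worker function
        written for each mode. The package name "MyWorker" and the
        function names "add_sync" and "add_async" are hypothetical, and
        the functions are called directly here, without any RPC involved,
        purely to show the contract.

```perl
use strict;
use warnings;

package MyWorker;

# "async" false: arguments in, return values out.
sub add_sync {
   my ($x, $y) = @_;
   return $x + $y;
}

# "async" true: the first argument is the result callback. Returning
# does nothing, the job only counts as done once $done has been called.
sub add_async {
   my ($done, $x, $y) = @_;
   $done->($x + $y);
   return;
}

package main;

# Direct calls, only to demonstrate the two conventions.
my $sync_result = MyWorker::add_sync (1, 2);

my $async_result;
MyWorker::add_async (sub { $async_result = $_[0] }, 1, 2);

print "sync: $sync_result, async: $async_result\n";
```

        In the parent, both variants would be invoked the same way, with
        the arguments followed by a callback that receives the results.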

CHILD PROCESS USAGE
    The following functions are not available in this module. They are only
    available in the namespace of this module when the child is running,
    without having to load any extra modules. They are part of the
    child-side API of AnyEvent::Fork::RPC.

    Note that these functions are typically not yet declared when code is
    compiled into the child, because the backend module is only loaded when
    you call "run", which is typically the last method you call on the fork
    object.

    Therefore, you either have to explicitly pre-load the right backend
    module or mark calls to these functions as function calls, e.g.:

       AnyEvent::Fork::RPC::event (0 => "five");
       AnyEvent::Fork::RPC::event->(0 => "five");
       &AnyEvent::Fork::RPC::flush;

    AnyEvent::Fork::RPC::event (...)
        Send an event to the parent. Events are a bit like RPC calls made by
        the child process to the parent, except that there is no notion of
        return values.

        See the examples section earlier in this document for some actual
        examples.

        Note: the event data, like any data sent to the parent, might not be
        sent immediately but queued for later sending, so there is no
        guarantee that the event has been sent to the parent when the call
        returns - when you e.g. exit directly after calling this function,
        the parent might never receive the event. See the next function for
        a remedy.
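
        A minimal sketch of that remedy, assuming worker code that wants
        to report a problem and then exit: send the event, then call
        "flush" before leaving. The helper name "report_fatal" and the
        event name "fatal" are made up for this example. The "defined"
        check mirrors the one used in the logging example, so the helper
        also works before the child is fully initialised.

```perl
use strict;
use warnings;

# Hypothetical helper for worker code: report a problem to the parent
# and try to make sure the event has left the process before returning.
sub report_fatal {
   my ($message) = @_;

   # Outside a fully initialised child the functions do not exist yet,
   # so fall back to a plain warning.
   unless (defined &AnyEvent::Fork::RPC::event) {
      warn "[$$] $message\n";
      return 0;
   }

   # The leading & marks these as function calls, even though the
   # functions were not declared when this code was compiled.
   &AnyEvent::Fork::RPC::event (fatal => $message);

   return &AnyEvent::Fork::RPC::flush () ? 1 : 0;
}
```

        A worker would then typically call "report_fatal" followed by
        "exit", rather than exiting directly after sending the event.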

    $success = AnyEvent::Fork::RPC::flush ()
        Synchronously wait and flush the reply data to the parent. Returns
        true on success and false otherwise (i.e. when the reply data cannot
        be written at all). Ignoring the success status is a common and
        healthy behaviour.

        Only the "async" backend does something on "flush" - the "sync"
        backend is not buffering reply data and always returns true from
        this function.

        Normally, reply data might or might not be written to the parent
        immediately but is buffered. This can greatly improve performance
        and efficiency, but sometimes can get in your way: for example, when
        you want to send an error message just before exiting, or when you
        want to ensure replies reach the parent promptly before starting a
        long blocking operation.

        In these cases, you can call this function to flush any outstanding
        reply data to the parent. This is done blockingly, so no requests

        You can override this by specifying a function name to call via the
        "done" parameter instead.

ADVANCED TOPICS
  Choosing a backend
    So how do you decide which backend to use? Well, that's your problem to
    solve, but here are some thoughts on the matter:

    Synchronous
        The synchronous backend does not rely on any external modules (well,
        except common::sense, which works around a bug in how perl's warning
        system works). This keeps the process very small. For example, on my
        system, an empty perl interpreter uses 1492kB RSS, which becomes
        2020kB after "use warnings; use strict" (for people who grew up with
        C64s around them this is probably shocking every single time they
        see it). The worker process in the first example in this document
        uses 1792kB.

        Since the calls are done synchronously, slow jobs will keep newer
        jobs from executing.

        The synchronous backend also has no overhead due to running an event
        loop - reading requests is therefore very efficient, while writing
        responses is less so, as every response results in a write syscall.

        If the parent process is busy and a bit slow reading responses, the
        child waits instead of processing further requests. This also limits
        the amount of memory needed for buffering, as never more than one
        response has to be buffered.

        The API in the child is simple - you just have to define a function
        that does something and returns something.

        It's hard to use modules or code that relies on an event loop, as
        the child cannot execute anything while it waits for more input.

    Asynchronous
        The asynchronous backend relies on AnyEvent, which tries to be
        small, but still comes at a price: On my system, the worker from
        example 1a uses 3420kB RSS (for AnyEvent, which loads EV, which
        needs XSLoader which in turn loads a lot of other modules such as
        warnings, strict, vars, Exporter...).

        It batches requests and responses reasonably efficiently, doing only
        as few reads and writes as needed, but needs to poll for events via
        the event loop.

        Responses are queued when the parent process is busy. This means the
        child can continue to execute any queued requests. It also means
        that a child might queue a lot of responses in memory when it
        generates them and the parent process is slow accepting them.

        The API is not a straightforward RPC pattern - you have to call a
        "done" callback to pass return values and signal completion. Also,
        more importantly, the API starts jobs as fast as possible - when
        1000 jobs are queued and the jobs are slow, they will all run
        concurrently. The child must implement some queueing/limiting
        mechanism if this causes problems. Alternatively, the parent could
        limit the number of RPC calls that are outstanding.

        Blocking use of condvars is not supported (in the main thread,
        outside of e.g. Coro threads).

        Using event-based modules such as IO::AIO, Gtk2, Tk and so on is
        easy.
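
        As an illustration of the parent-side alternative mentioned
        above, the following sketch wraps an RPC object so that only a
        limited number of calls are outstanding at any time, queueing the
        rest. It is not part of this module: the name "limit_rpc" is made
        up, and the only assumption about $rpc is that it is a code
        reference taking arguments followed by a result callback.

```perl
use strict;
use warnings;

# Wrap an RPC object so that at most $max calls are outstanding at once.
# Further calls are queued and started as earlier ones complete.
sub limit_rpc {
   my ($rpc, $max) = @_;

   my $active = 0;
   my @queue;    # calls waiting for a free slot, each [\@args, $cb]
   my $start;

   $start = sub {
      while ($active < $max && @queue) {
         my ($args, $cb) = @{ shift @queue };
         ++$active;
         $rpc->(@$args, sub {
            --$active;
            $cb->(@_);
            $start->();    # a slot became free, start the next queued call
         });
      }
   };

   return sub {
      my $cb = pop;
      push @queue, [[@_], $cb];
      $start->();
   };
}
```

        It could be used as "my $limited = limit_rpc ($rpc, 10)", after
        which $limited is called exactly like $rpc. Note that the wrapper
        keeps a reference to $rpc (and, in this simple form, to itself),
        so the child will not be told to exit while the wrapper is alive.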

  Passing file descriptors
    Unlike AnyEvent::Fork, this module has no in-built file handle or file
    descriptor passing abilities.

    The reason is that passing file descriptors is extraordinarily tricky
    business, and conflicts with efficient batching of messages.

    There still is a method you can use: Create an
    "AnyEvent::Util::portable_socketpair" and "send_fh" one half of it to
    the process before you pass control to "AnyEvent::Fork::RPC::run".

    Whenever you want to pass a file descriptor, send an rpc request to the
    child process (so it expects the descriptor), then send it over the
    other half of the socketpair. The child should fetch the descriptor from
    the half it was passed earlier.

    Here is some (untested) pseudocode to that effect:

       use AnyEvent::Util;
       use AnyEvent::Fork;
       use AnyEvent::Fork::RPC;
       use IO::FDPass;

       my ($s1, $s2) = AnyEvent::Util::portable_socketpair;

       my $rpc = AnyEvent::Fork
          ->new
          ->send_fh ($s2)
          ->require ("MyWorker")
          ->AnyEvent::Fork::RPC::run ("MyWorker::run",
               init => "MyWorker::init",
            );

       undef $s2; # no need to keep it around

       # pass an fd
       $rpc->("i'll send some fd now, please expect it!", my $cv = AE::cv);

       IO::FDPass::send fileno $s1, fileno $handle_to_pass;

       $cv->recv;

    The MyWorker module could look like this:

       package MyWorker;

       use IO::FDPass;

       my $s2;

       sub init {
          $s2 = $_[0];
       }

       sub run {
          if ($_[0] eq "i'll send some fd now, please expect it!") {


