AnyEvent-Fork-RPC
- add $CBOR_XS_SERIALISER.
1.21 Tue Oct 15 10:52:09 CEST 2013
- work around perl 5.14 and earlier bug causing
"Goto undefined subroutine &CORE::exit" messages.
1.2 Wed Sep 25 13:05:46 CEST 2013
- INCOMPATIBLE CHANGE: remove dependency on, and autoloading
of, AnyEvent::Fork.
- provide an eof option to specify an alternative to exiting
in the asynchronous backend.
- add $NSTORABLE_SERIALISER.
- use "AE::log die" instead of die to report uncaught errors.
- ->run now provides a better $0 (as seen in some ps outputs).
- add a nice async Coro RPC server example.
- add an example of how to use AnyEvent::Log and on_event to
forward log messages to the parent.
- work around bugs in the TCP/IP stack on windows, aborting
connections instead of properly closing them on exit.
- log any errors via AE::log fatal instead of die'ing in the
async backend.
});
$cv->recv;
DESCRIPTION
This module implements a simple RPC protocol and backend for processes
created via AnyEvent::Fork or AnyEvent::Fork::Remote, allowing you to
call a function in the child process and receive its return values (up
to 4GB serialised).
It implements two different backends: a synchronous one that works like
a normal function call, and an asynchronous one that can run multiple
jobs concurrently in the child, using AnyEvent.
It also implements an asynchronous event mechanism from the child to the
parent, that could be used for progress indications or other
information.
EXAMPLES
Example 1: Synchronous Backend
Here is a simple example that implements a backend that executes
"unlink" and "rmdir" calls, and reports their status back. It also
reports the number of requests it has processed every three requests,
which is clearly silly, but illustrates the use of events.
/tmp/somepath/2: No such file or directory
3 requests handled
/tmp/somepath/3: No such file or directory
/tmp/somepath/4: No such file or directory
/tmp/somepath/5: No such file or directory
6 requests handled
/tmp/somepath/6: No such file or directory
Obviously, none of the directories I am trying to delete even exist.
Also, the events and responses are processed in exactly the same order
as they were created in the child, which is true for both synchronous
and asynchronous backends.
Note that the parentheses in the call to "AnyEvent::Fork::RPC::event"
are not optional. That is because the function isn't defined when the
code is compiled. You can make sure it is visible by pre-loading the
correct backend module in the call to "require":
->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
Since the backend module declares the "event" function, loading it first
ensures that perl will correctly interpret calls to it.
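The parsing rule behind this can be tried out with core perl alone. The following is a minimal sketch, not code from the module: "My::Demo::event" is a hypothetical stand-in for a function that is not yet defined at compile time, and "compiles" is a helper invented for this illustration.

```perl
use strict;
use warnings;

# Compile a snippet at run time and report whether perl accepted it.
sub compiles {
   my ($code) = @_;
   local $SIG{__WARN__} = sub { };   # silence parser warnings for the demo
   local $@;
   my $sub = eval "sub { $code }";   # compile only, never call it
   return ref $sub ? 1 : 0;
}

# My::Demo::event is hypothetical and not defined at this point.
my $with_parens    = compiles ('My::Demo::event ("x")');
my $without_parens = compiles ('My::Demo::event "x"');

# Defining the function (as loading a module that declares it would)
# lets perl parse the call without parentheses.
{ no strict 'refs'; *{"My::Demo::event"} = sub { @_ }; }
my $after_declare  = compiles ('My::Demo::event "x"');

print "with parens: $with_parens, without: $without_parens, ",
      "after declaration: $after_declare\n";
```

The snippets are only compiled, never run, so the sketch says nothing about what the real "event" function does; it only shows why the order of modules in the "require" call matters to the parser.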
And as a final remark, there is a fine module on CPAN that can
asynchronously "rmdir" and "unlink" and a lot more, and more efficiently
than this example, namely IO::AIO.
Example 1a: the same with the asynchronous backend
This example only shows what needs to be changed to use the async
backend instead. Doing this is not very useful; the purpose of this
example is to show the minimum amount of change that is required to go
from the synchronous to the asynchronous backend.
To use the async backend in the previous example, you need to add the
"async" parameter to the "AnyEvent::Fork::RPC::run" call:
->AnyEvent::Fork::RPC::run ("MyWorker::run",
async => 1,
...
And since the function call protocol is now changed, you need to adapt
"MyWorker::run" to the async API.
$done->($status or (0, "$!"));
A few remarks are in order. First, it's quite pointless to use the async
backend for this example - but it *is* possible. Second, you can call
$done before or after returning from the function. Third, having both
returned from the function and having called the $done callback, the
child process may exit at any time, so you should call $done only when
you really *are* done.
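As a minimal sketch of the calling convention described here, the worker below takes the $done callback as its first argument and reports its result through it instead of returning it. It is called directly with a plain closure, which only stands in for the RPC layer; the path is made up and assumed not to exist.

```perl
use strict;
use warnings;

package MyWorker;

# Async-style worker: the first argument is the completion callback,
# the remaining arguments are the request parameters.
sub run {
   my ($done, $cmd, $path) = @_;

   my $status = $cmd eq "rmdir"  ? rmdir  $path
              : $cmd eq "unlink" ? unlink $path
              : die "illegal command '$cmd'";

   # Report either a true status or (0, error message).
   $done->($status or (0, "$!"));
}

package main;

# Stand-in for the RPC layer: call the worker directly with a callback.
my @result;
MyWorker::run (sub { @result = @_ }, "rmdir", "/tmp/somepath/does-not-exist");
print "status=$result[0] error=$result[1]\n";
```

Here $done is called before "run" returns, which is the simplest case; the text above notes that it may just as well be called later, for example from an event watcher.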
Example 2: Asynchronous Backend
This example implements multiple count-downs in the child, using
AnyEvent timers. While this is a bit silly (one could use timers in the
parent just as well), it illustrates the ability to use AnyEvent in the
child and the fact that responses can arrive in a different order than
the requests.
It also shows how to embed the actual child code into a "__DATA__"
section, so it doesn't need any external files at all.
And when your parent process is often busy, and you have stricter timing
guarantees that events and responses are delivered to the parent process
in the exact same ordering as they were generated in the child process.
And unless your system is *very* busy, it should clearly show that the
job started last will finish first, as it has the lowest count.
This concludes the async example. Since AnyEvent::Fork does not actually
fork, you are free to use about any module in the child, not just
AnyEvent, but also IO::AIO, or Tk for example.
Example 3: Asynchronous backend with Coro
With Coro you can create a nice asynchronous backend implementation by
defining an rpc server function that creates a new Coro thread for every
request that calls a function "normally", i.e. the parameters from the
parent process are passed to it, and any return values are returned to
the parent process, e.g.:
package My::Arith;
sub add {
return $_[0] + $_[1];
}
Coro::async_pool {
$done->($func->(@arg));
};
}
The "run" function creates a new thread for every invocation, using the
first argument as function name, and calls the $done callback on its
return values. This makes it quite natural to define the "add" and "mul"
functions to add or multiply two numbers and return the result.
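The dispatch idea can be sketched without Coro. This is an illustration under assumptions rather than the module's example: the thread creation is left out so that it runs with core perl only, the "mul" function is written out by analogy with "add", and the lookup via "can" is a substitution chosen here to stay within "use strict".

```perl
use strict;
use warnings;

package My::Arith;

sub add { return $_[0] + $_[1] }
sub mul { return $_[0] * $_[1] }

# Look up the requested function by name and hand its results to $done.
# The Coro thread from the text is left out in this sketch.
sub run {
   my ($done, $func, @arg) = @_;

   my $code = __PACKAGE__->can ($func)
      or die "unknown function '$func'";

   $done->($code->(@arg));
}

package main;

# Call the dispatcher directly; the closures stand in for the RPC layer.
my @sum;     My::Arith::run (sub { @sum     = @_ }, add => 3, 4);
my @product; My::Arith::run (sub { @product = @_ }, mul => 3, 4);
print "add: @sum, mul: @product\n";
```

Without a thread per request the calls run one after another, so this only shows the name-to-function dispatch, not the overlapping execution.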
Since this is the asynchronous backend, it's quite possible to define
RPC functions that do I/O or wait for external events - their execution
will overlap as needed.
The above could be used like this:
my $rpc = AnyEvent::Fork
->new
->require ("MyWorker")
->AnyEvent::Fork::RPC::run ("My::Arith::run",
on_error => ..., on_event => ..., on_destroy => ...,
It can be used to do one-time things in the child such as
storing passed parameters or opening database connections.
It is called very early - before the serialisers are created or
the $function name is resolved into a function reference, so it
could be used to load any modules that provide the serialiser or
function. It can not, however, create events.
done => $function (default: "CORE::exit")
The function to call when the asynchronous backend detects an
end of file condition when reading from the communications
socket *and* there are no outstanding requests. It is ignored by
the synchronous backend.
By overriding this you can prolong the life of an RPC process
after e.g. the parent has exited by running the event loop in
the provided function (or simply calling it, for example, when
your child process uses EV you could provide EV::run as "done"
function).
Of course, in that case you are responsible for exiting at the
appropriate time and not returning from
Setting "async" to a true value switches to another
implementation that uses AnyEvent in the child and allows
multiple concurrent RPC calls (it does not support recursion in
the event loop however, blocking condvar calls will fail).
The actual API in the child is documented in the section that
describes the calling semantics of the returned $rpc function.
If you want to pre-load the actual back-end modules to enable
memory sharing, then you should load "AnyEvent::Fork::RPC::Sync"
for synchronous, and "AnyEvent::Fork::RPC::Async" for
asynchronous mode.
If you use a template process and want to fork both sync and
async children, then it is permissible to load both modules.
serialiser => $string (default:
$AnyEvent::Fork::RPC::STRING_SERIALISER)
All arguments, result data and event data have to be serialised
to be transferred between the processes. For this, they have to
be frozen and thawed in both parent and child processes.
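To make "frozen and thawed" concrete, here is a minimal sketch of a freeze/thaw pair for lists of octet strings, using only core "pack" and "unpack". The names $freeze and $thaw are illustrative, and how such code has to be supplied to the "serialiser" parameter is not shown in this excerpt, so treat it as an illustration of the round trip only.

```perl
use strict;
use warnings;

# Illustrative freeze/thaw pair for lists of octet strings: every string
# is stored with a length prefix, so the list can be split again later.
my $freeze = sub { pack   "(w/a*)*", @_ };
my $thaw   = sub { unpack "(w/a*)*", shift };

my @request = ("unlink", "/tmp/somepath/1");
my $wire    = $freeze->(@request);   # one octet string, ready to be sent
my @copy    = $thaw->($wire);        # the original list, restored

print join (", ", @copy), "\n";
```

A format like this can only carry flat lists of strings; nested data structures would need a different serialiser.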
...
}
PROCESS EXIT
If and when the child process exits depends on the backend and
configuration. Apart from explicit exits (e.g. by calling "exit") or
runtime conditions (uncaught exceptions, signals etc.), the backends
exit under these conditions:
Synchronous Backend
The synchronous backend is very simple: when the process waits for
another request to arrive and the writing side (usually in the
parent) is closed, it will exit normally, i.e. as if your main
program reached the end of the file.
That means that if your parent process exits, the RPC process will
usually exit as well, either because it is idle anyway, or because
it executes a request. In the latter case, you will likely get an
error when the RPC process tries to send the results to the parent
(because arguably, you shouldn't exit your parent while there are
still outstanding requests).
The process is usually quiescent when it happens, so it should
rarely be a problem, and "END" handlers can be used to clean up.
Asynchronous Backend
For the asynchronous backend, things are more complicated: Whenever
it listens for another request by the parent, it might detect that
the socket was closed (e.g. because the parent exited). It will stop
listening for new requests and instead try to write out any
remaining data (if any) or simply check whether the socket can be
written to. After this, the RPC process is effectively done - no new
requests are incoming, no outstanding request data can be written
back.
Since chances are high that there are event watchers that the RPC
server knows nothing about (why else would one use the async backend
if not for the ability to register watchers?), the event loop would
often happily continue.
This is why the asynchronous backend explicitly calls "CORE::exit"
when it is done (under other circumstances, such as when there is an
I/O error and there is outstanding data to write, it will log a
fatal message via AnyEvent::Log, also causing the program to exit).
You can override this by specifying a function name to call via the
"done" parameter instead.
ADVANCED TOPICS
Choosing a backend
So how do you decide which backend to use? Well, that's your problem to
solve, but here are some thoughts on the matter:
Synchronous
The synchronous backend does not rely on any external modules (well,
except common::sense, which works around a bug in how perl's warning
system works). This keeps the process very small, for example, on my
system, an empty perl interpreter uses 1492kB RSS, which becomes
2020kB after "use warnings; use strict" (for people who grew up with
C64s around them this is probably shocking every single time they
see it). The worker process in the first example in this document
uses 1792kB.
Since the calls are done synchronously, slow jobs will keep newer
jobs from executing.
The synchronous backend also has no overhead due to running an event
loop - reading requests is therefore very efficient, while writing
responses is less so, as every response results in a write syscall.
If the parent process is busy and a bit slow reading responses, the
child waits instead of processing further requests. This also limits
the amount of memory needed for buffering, as never more than one
response has to be buffered.
The API in the child is simple - you just have to define a function
that does something and returns something.
It's hard to use modules or code that relies on an event loop, as
the child cannot execute anything while it waits for more input.
Asynchronous
The asynchronous backend relies on AnyEvent, which tries to be
small, but still comes at a price: On my system, the worker from
example 1a uses 3420kB RSS (for AnyEvent, which loads EV, which
needs XSLoader which in turn loads a lot of other modules such as
warnings, strict, vars, Exporter...).
It batches requests and responses reasonably efficiently, doing only
as few reads and writes as needed, but needs to poll for events via
the event loop.
Responses are queued when the parent process is busy. This means the
$cv->recv;
=head1 DESCRIPTION
This module implements a simple RPC protocol and backend for processes
created via L<AnyEvent::Fork> or L<AnyEvent::Fork::Remote>, allowing you
to call a function in the child process and receive its return values (up
to 4GB serialised).
It implements two different backends: a synchronous one that works like a
normal function call, and an asynchronous one that can run multiple jobs
concurrently in the child, using AnyEvent.
It also implements an asynchronous event mechanism from the child to the
parent, that could be used for progress indications or other information.
=head1 EXAMPLES
=head2 Example 1: Synchronous Backend
Here is a simple example that implements a backend that executes C<unlink>
and C<rmdir> calls, and reports their status back. It also reports the
number of requests it has processed every three requests, which is clearly
silly, but illustrates the use of events.
/tmp/somepath/2: No such file or directory
3 requests handled
/tmp/somepath/3: No such file or directory
/tmp/somepath/4: No such file or directory
/tmp/somepath/5: No such file or directory
6 requests handled
/tmp/somepath/6: No such file or directory
Obviously, none of the directories I am trying to delete even exist. Also,
the events and responses are processed in exactly the same order as
they were created in the child, which is true for both synchronous and
asynchronous backends.
Note that the parentheses in the call to C<AnyEvent::Fork::RPC::event> are
not optional. That is because the function isn't defined when the code is
compiled. You can make sure it is visible by pre-loading the correct
backend module in the call to C<require>:
->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
Since the backend module declares the C<event> function, loading it first
ensures that perl will correctly interpret calls to it.
And as a final remark, there is a fine module on CPAN that can
asynchronously C<rmdir> and C<unlink> and a lot more, and more efficiently
than this example, namely L<IO::AIO>.
=head3 Example 1a: the same with the asynchronous backend
This example only shows what needs to be changed to use the async backend
instead. Doing this is not very useful; the purpose of this example is
to show the minimum amount of change that is required to go from the
synchronous to the asynchronous backend.
To use the async backend in the previous example, you need to add the
C<async> parameter to the C<AnyEvent::Fork::RPC::run> call:
->AnyEvent::Fork::RPC::run ("MyWorker::run",
async => 1,
...
And since the function call protocol is now changed, you need to adapt
C<MyWorker::run> to the async API.
$done->($status or (0, "$!"));
A few remarks are in order. First, it's quite pointless to use the async
backend for this example - but it I<is> possible. Second, you can call
C<$done> before or after returning from the function. Third, having both
returned from the function and having called the C<$done> callback, the
child process may exit at any time, so you should call C<$done> only when
you really I<are> done.
=head2 Example 2: Asynchronous Backend
This example implements multiple count-downs in the child, using
L<AnyEvent> timers. While this is a bit silly (one could use timers in the
parent just as well), it illustrates the ability to use AnyEvent in the
child and the fact that responses can arrive in a different order than the
requests.
It also shows how to embed the actual child code into a C<__DATA__>
section, so it doesn't need any external files at all.
guarantees that events and responses are delivered to the parent process
in the exact same ordering as they were generated in the child process.
And unless your system is I<very> busy, it should clearly show that the
job started last will finish first, as it has the lowest count.
This concludes the async example. Since L<AnyEvent::Fork> does not
actually fork, you are free to use about any module in the child, not just
L<AnyEvent>, but also L<IO::AIO>, or L<Tk> for example.
=head2 Example 3: Asynchronous backend with Coro
With L<Coro> you can create a nice asynchronous backend implementation by
defining an rpc server function that creates a new Coro thread for every
request that calls a function "normally", i.e. the parameters from the
parent process are passed to it, and any return values are returned to the
parent process, e.g.:
package My::Arith;
sub add {
return $_[0] + $_[1];
}
Coro::async_pool {
$done->($func->(@arg));
};
}
The C<run> function creates a new thread for every invocation, using the
first argument as function name, and calls the C<$done> callback on its
return values. This makes it quite natural to define the C<add> and C<mul>
functions to add or multiply two numbers and return the result.
Since this is the asynchronous backend, it's quite possible to define RPC
functions that do I/O or wait for external events - their execution will
overlap as needed.
The above could be used like this:
my $rpc = AnyEvent::Fork
->new
->require ("MyWorker")
->AnyEvent::Fork::RPC::run ("My::Arith::run",
on_error => ..., on_event => ..., on_destroy => ...,
It can be used to do one-time things in the child such as storing passed
parameters or opening database connections.
It is called very early - before the serialisers are created or the
C<$function> name is resolved into a function reference, so it could be
used to load any modules that provide the serialiser or function. It can
not, however, create events.
=item done => $function (default: C<CORE::exit>)
The function to call when the asynchronous backend detects an end of file
condition when reading from the communications socket I<and> there are no
outstanding requests. It is ignored by the synchronous backend.
By overriding this you can prolong the life of an RPC process after e.g.
the parent has exited by running the event loop in the provided function
(or simply calling it, for example, when your child process uses L<EV> you
could provide L<EV::run> as C<done> function).
Of course, in that case you are responsible for exiting at the appropriate
time and not returning from
=item async => $boolean (default: C<0>)
Setting C<async> to a true value switches to another implementation that
uses L<AnyEvent> in the child and allows multiple concurrent RPC calls (it
does not support recursion in the event loop however, blocking condvar
calls will fail).
The actual API in the child is documented in the section that describes
the calling semantics of the returned C<$rpc> function.
If you want to pre-load the actual back-end modules to enable memory
sharing, then you should load C<AnyEvent::Fork::RPC::Sync> for
synchronous, and C<AnyEvent::Fork::RPC::Async> for asynchronous mode.
If you use a template process and want to fork both sync and async
children, then it is permissible to load both modules.
=item serialiser => $string (default: C<$AnyEvent::Fork::RPC::STRING_SERIALISER>)
All arguments, result data and event data have to be serialised to be
transferred between the processes. For this, they have to be frozen and
thawed in both parent and child processes.
If and when the child process exits depends on the backend and
configuration. Apart from explicit exits (e.g. by calling C<exit>) or
runtime conditions (uncaught exceptions, signals etc.), the backends exit
under these conditions:
=over 4
=item Synchronous Backend
The synchronous backend is very simple: when the process waits for another
request to arrive and the writing side (usually in the parent) is closed,
it will exit normally, i.e. as if your main program reached the end of the
file.
That means that if your parent process exits, the RPC process will usually
exit as well, either because it is idle anyway, or because it executes a
request. In the latter case, you will likely get an error when the RPC
process tries to send the results to the parent (because arguably, you
shouldn't exit your parent while there are still outstanding requests).
The process is usually quiescent when it happens, so it should rarely be a
problem, and C<END> handlers can be used to clean up.
=item Asynchronous Backend
For the asynchronous backend, things are more complicated: Whenever it
listens for another request by the parent, it might detect that the socket
was closed (e.g. because the parent exited). It will stop listening for
new requests and instead try to write out any remaining data (if any) or
simply check whether the socket can be written to. After this, the RPC
process is effectively done - no new requests are incoming, no outstanding
request data can be written back.
Since chances are high that there are event watchers that the RPC server
knows nothing about (why else would one use the async backend if not for
the ability to register watchers?), the event loop would often happily
continue.
This is why the asynchronous backend explicitly calls C<CORE::exit> when
it is done (under other circumstances, such as when there is an I/O error
and there is outstanding data to write, it will log a fatal message via
L<AnyEvent::Log>, also causing the program to exit).
You can override this by specifying a function name to call via the C<done>
parameter instead.
=back
=head1 ADVANCED TOPICS
=head2 Choosing a backend
So how do you decide which backend to use? Well, that's your problem to
solve, but here are some thoughts on the matter:
=over 4
=item Synchronous
The synchronous backend does not rely on any external modules (well,
except L<common::sense>, which works around a bug in how perl's warning
system works). This keeps the process very small, for example, on my
system, an empty perl interpreter uses 1492kB RSS, which becomes 2020kB
after C<use warnings; use strict> (for people who grew up with C64s around
them this is probably shocking every single time they see it). The worker
process in the first example in this document uses 1792kB.
Since the calls are done synchronously, slow jobs will keep newer jobs
from executing.
The synchronous backend also has no overhead due to running an event loop
- reading requests is therefore very efficient, while writing responses is
less so, as every response results in a write syscall.
If the parent process is busy and a bit slow reading responses, the child
waits instead of processing further requests. This also limits the amount
of memory needed for buffering, as never more than one response has to be
buffered.
The API in the child is simple - you just have to define a function that
does something and returns something.
It's hard to use modules or code that relies on an event loop, as the
child cannot execute anything while it waits for more input.
=item Asynchronous
The asynchronous backend relies on L<AnyEvent>, which tries to be small,
but still comes at a price: On my system, the worker from example 1a uses
3420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
which in turn loads a lot of other modules such as L<warnings>, L<strict>,
L<vars>, L<Exporter>...).
It batches requests and responses reasonably efficiently, doing only as
few reads and writes as needed, but needs to poll for events via the event
loop.
Responses are queued when the parent process is busy. This means the child