AnyEvent-Fork-RPC
/tmp/somepath/4: No such file or directory
/tmp/somepath/5: No such file or directory
6 requests handled
/tmp/somepath/6: No such file or directory
Obviously, none of the directories I am trying to delete even exist.
Also, the events and responses are processed in exactly the same order
as they were created in the child, which is true for both synchronous
and asynchronous backends.
Note that the parentheses in the call to "AnyEvent::Fork::RPC::event"
are not optional. That is because the function isn't defined when the
code is compiled. You can make sure it is visible by pre-loading the
correct backend module in the call to "require":
->require ("AnyEvent::Fork::RPC::Sync", "MyWorker")
Since the backend module declares the "event" function, loading it first
ensures that perl will correctly interpret calls to it.
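The parsing rule behind this can be tried out without any backend at all. The following is a minimal sketch in core Perl only; "Demo::event" and "Demo::later" are hypothetical stand-ins (not part of AnyEvent::Fork::RPC) for a function that becomes known only once some module has been loaded:

```perl
use strict;
use warnings;

# Demo::event and Demo::later are hypothetical stand-ins for a function
# that is only defined once some module has been loaded.

# 1. Not yet declared: a call without parentheses fails to compile.
my $without       = eval q{ sub { Demo::event "hello" } };
my $compile_error = $@;

# 2. Define the function, which is the effect preloading a module has.
eval q{ sub Demo::event { return "event: @_" } 1 } or die $@;

# 3. The same source text now compiles, because perl knows the name.
my $with = eval q{ sub { Demo::event "hello" } } or die $@;

# 4. With parentheses the call compiles whether or not the function
#    has been declared yet (Demo::later is defined only afterwards).
my $parens = eval q{ sub { Demo::later ("hello") } } or die $@;
eval q{ sub Demo::later { return "later: @_" } 1 } or die $@;

print defined $without ? "compiled\n" : "compile error without parentheses\n";
print $with->(), "\n";
print $parens->(), "\n";
```

Step 1 may also print a parser diagnostic to STDERR; the point is only that the first "eval" returns nothing, while steps 3 and 4 both yield callable code.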
And as a final remark, there is a fine module on CPAN that can
asynchronously "rmdir" and "unlink" and a lot more, and more efficiently
than this example, namely IO::AIO.
Example 1a: the same with the asynchronous backend
This example only shows what needs to be changed to use the async
backend instead. Doing this is not very useful; the purpose of this
example is to show the minimum amount of change that is required to go
from the synchronous to the asynchronous backend.
To use the async backend in the previous example, you need to add the
"async" parameter to the "AnyEvent::Fork::RPC::run" call:
->AnyEvent::Fork::RPC::run ("MyWorker::run",
async => 1,
...
And since the function call protocol is now changed, you need to adapt
"MyWorker::run" to the async API.
First, you need to accept the extra initial $done callback:
sub run {
my ($done, $cmd, $path) = @_;
And since a response is now generated when $done is called, as opposed
to when the function returns, we need to call the $done function with
the status:
$done->($status or (0, "$!"));
A few remarks are in order. First, it's quite pointless to use the async
backend for this example - but it *is* possible. Second, you can call
$done before or after returning from the function. Third, once the
function has returned and the $done callback has been called, the
child process may exit at any time, so you should call $done only when
you really *are* done.
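Put together, the adapted worker function might look roughly like the sketch below. The command handling ("rmdir" and "unlink", chosen by a plain if/elsif chain) is an assumption made for illustration and not a copy of the original worker. To keep the sketch free of dependencies, the function is called directly with a hand-written callback in place of the $done callback the backend would supply:

```perl
use strict;
use warnings;

package MyWorker;

# Async calling convention: the first argument is the $done callback,
# the remaining arguments are whatever the parent passed to the call.
sub run {
   my ($done, $cmd, $path) = @_;

   # Assumed command set, for illustration only.
   my $status;
   if    ($cmd eq "rmdir")  { $status = rmdir $path }
   elsif ($cmd eq "unlink") { $status = unlink $path }
   else                     { die "illegal command '$cmd'" }

   # The response is sent when $done is called, not when run returns.
   $done->($status or (0, "$!"));
   return;
}

package main;

# Stand-in for the backend: collect whatever is passed to $done.
my @result;
MyWorker::run (sub { @result = @_ }, "rmdir", "/tmp/somepath/does-not-exist");
print "status=$result[0] error=$result[1]\n";
```

Because the response is now decoupled from the return value, the explicit "return" at the end only makes clear that whatever the function returns is not what the parent receives.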
Example 2: Asynchronous Backend
This example implements multiple count-downs in the child, using
AnyEvent timers. While this is a bit silly (one could use timers in the
parent just as well), it illustrates the ability to use AnyEvent in the
child and the fact that responses can arrive in a different order than
the requests.
It also shows how to embed the actual child code into a "__DATA__"
section, so it doesn't need any external files at all.
And when your parent process is often busy, and you have stricter timing
requirements, then running timers in a child process suddenly doesn't
look so silly anymore.
Without further ado, here is the code:
use AnyEvent;
use AnyEvent::Fork;
use AnyEvent::Fork::RPC;
my $done = AE::cv;
my $rpc = AnyEvent::Fork
   ->new
   ->require ("AnyEvent::Fork::RPC::Async")
   ->eval (do { local $/; <DATA> })
   ->AnyEvent::Fork::RPC::run ("run",
      async      => 1,
      on_error   => sub { warn "ERROR: $_[0]"; exit 1 },
      on_event   => sub { print $_[0] },
      on_destroy => $done,
   );
for my $count (3, 2, 1) {
   $rpc->($count, sub {
      warn "job $count finished\n";
   });
}
undef $rpc;
$done->recv;
__DATA__
# this ends up in main, as we don't use a package declaration
use AnyEvent;
sub run {
   my ($done, $count) = @_;
   my $n;
   AnyEvent::Fork::RPC::event "starting to count up to $count\n";
   my $w; $w = AE::timer 1, 1, sub {
      ++$n;
      AnyEvent::Fork::RPC::event "count $n of $count\n";
      if ($n == $count) {
         undef $w;
         $done->();
      }
   };
}
The parent part (the one before the "__DATA__" section) isn't very
different from the earlier examples. It sets async mode, preloads the
backend module (so the "AnyEvent::Fork::RPC::event" function is
declared), uses a slightly different "on_event" handler (which we use
simply for logging purposes) and then, instead of loading a module with
the actual worker code, it "eval"'s the code from the data section in
the child process.
It then starts three countdowns (of 3, 2 and 1 seconds, in that order),
destroys the rpc object so the example finishes eventually, and then
just waits for the stuff to trickle in.
The worker code uses the event function to log some progress messages,
but mostly just creates a recurring one-second timer.
The timer callback increments a counter, logs a message, and eventually,
when the count has been reached, calls the $done callback.
On my system, this results in the following output. Since all timers
fire at roughly the same time, the actual order isn't guaranteed, but
the order shown is very likely what you would get, too.
starting to count up to 3
starting to count up to 2
starting to count up to 1
count 1 of 3
count 1 of 2
count 1 of 1
job 1 finished
count 2 of 2
job 2 finished
count 2 of 3
count 3 of 3
job 3 finished
While the overall ordering isn't guaranteed, the async backend still
guarantees that events and responses are delivered to the parent process
in the exact same ordering as they were generated in the child process.
And unless your system is *very* busy, it should clearly show that the
job started last will finish first, as it has the lowest count.
This concludes the async example. Since AnyEvent::Fork does not actually
fork, you are free to use just about any module in the child, not just
AnyEvent, but also IO::AIO, or Tk for example.
Example 3: Asynchronous backend with Coro
With Coro you can create a nice asynchronous backend implementation by
defining an rpc server function that creates a new Coro thread for every
request that calls a function "normally", i.e. the parameters from the
parent process are passed to it, and any return values are returned to
the parent process, e.g.:
package My::Arith;
use Coro; # so Coro::async_pool is known when this code is compiled
sub add {
   return $_[0] + $_[1];
}
sub mul {
   return $_[0] * $_[1];
}
sub run {
   my ($done, $func, @arg) = @_;
   Coro::async_pool {
      $done->($func->(@arg));
   };
}
The "run" function creates a new thread for every invocation, using the
first argument as function name, and calls the $done callback on its
return values. This makes it quite natural to define the "add" and "mul"
functions to add or multiply two numbers and return the result.
Since this is the asynchronous backend, it's quite possible to define
RPC functions that do I/O or wait for external events - their execution
will overlap as needed.
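Calling a function through a name stored in a string, as "run" does above, is a symbolic reference, which perl rejects under "use strict". One possible variant, sketched here as an alternative rather than as the module author's approach, looks the name up in a dispatch table instead. The %dispatch hash is a hypothetical addition, and the Coro thread is left out so the sketch runs with core Perl only:

```perl
use strict;
use warnings;

package My::Arith;

sub add { return $_[0] + $_[1] }
sub mul { return $_[0] * $_[1] }

# Hypothetical dispatch table: only the functions listed here can be
# called by name from the parent.
my %dispatch = (
   add => \&add,
   mul => \&mul,
);

sub run {
   my ($done, $func, @arg) = @_;

   my $code = $dispatch{$func}
      or die "unknown function '$func'";

   # In a worker like the one above, this call would sit inside
   # Coro::async_pool { ... }; it is made directly here so that the
   # sketch has no dependencies.
   $done->($code->(@arg));
   return;
}

package main;

# Stand-ins for the backend: print whatever is passed to $done.
My::Arith::run (sub { print "1 + 2 = $_[0]\n" }, "add", 1, 2);
My::Arith::run (sub { print "3 * 4 = $_[0]\n" }, "mul", 3, 4);
```

A side effect of the table is that the parent can only reach functions that were listed on purpose, at the cost of having to register each new function.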