AnyEvent-Fork-RPC
Example 2: Asynchronous Backend
This example implements multiple count-downs in the child, using
AnyEvent timers. While this is a bit silly (one could use timers in the
parent just as well), it illustrates the ability to use AnyEvent in the
child and the fact that responses can arrive in a different order than
the requests.
It also shows how to embed the actual child code into a "__DATA__"
section, so it doesn't need any external files at all.
And when your parent process is often busy, and you have stricter timing
requirements, then running timers in a child process suddenly doesn't
look so silly anymore.
Without further ado, here is the code:
use AnyEvent;
use AnyEvent::Fork;
use AnyEvent::Fork::RPC;
my $done = AE::cv;
count 2 of 2
job 2 finished
count 2 of 3
count 3 of 3
job 3 finished
While the overall ordering isn't guaranteed, the async backend still
guarantees that events and responses are delivered to the parent process
in the exact same ordering as they were generated in the child process.
And unless your system is *very* busy, it should clearly show that the
job started last will finish first, as it has the lowest count.
This concludes the async example. Since AnyEvent::Fork does not actually
fork, you are free to use just about any module in the child, not just
AnyEvent, but also IO::AIO or Tk, for example.
Example 3: Asynchronous backend with Coro
With Coro you can create a nice asynchronous backend implementation by
defining an rpc server function that creates a new Coro thread for every
request that calls a function "normally", i.e. the parameters from the
see it). The worker process in the first example in this document
uses 1792kB.
Since the calls are done synchronously, slow jobs will keep newer
jobs from executing.
The synchronous backend also has no overhead due to running an event
loop - reading requests is therefore very efficient, while writing
responses is less so, as every response results in a write syscall.
If the parent process is busy and a bit slow reading responses, the
child waits instead of processing further requests. This also limits
the amount of memory needed for buffering, as never more than one
response has to be buffered.
The API in the child is simple - you just have to define a function
that does something and returns something.
It's hard to use modules or code that relies on an event loop, as
the child cannot execute anything while it waits for more input.
The asynchronous backend relies on AnyEvent, which tries to be
small, but still comes at a price: On my system, the worker from
example 1a uses 3420kB RSS (for AnyEvent, which loads EV, which
needs XSLoader which in turn loads a lot of other modules such as
warnings, strict, vars, Exporter...).
It batches requests and responses reasonably efficiently, doing only
as few reads and writes as needed, but needs to poll for events via
the event loop.
Responses are queued when the parent process is busy. This means the
child can continue to execute any queued requests. It also means
that a child might queue a lot of responses in memory when it
generates them and the parent process is slow accepting them.
The API is not a straightforward RPC pattern - you have to call a
"done" callback to pass return values and signal completion. Also,
more importantly, the API starts jobs as fast as possible - when
1000 jobs are queued and the jobs are slow, they will all run
concurrently. The child must implement some queueing/limiting
mechanism if this causes problems. Alternatively, the parent could
This example implements multiple count-downs in the child, using
L<AnyEvent> timers. While this is a bit silly (one could use timers in the
parent just as well), it illustrates the ability to use AnyEvent in the
child and the fact that responses can arrive in a different order than the
requests.
It also shows how to embed the actual child code into a C<__DATA__>
section, so it doesn't need any external files at all.
And when your parent process is often busy, and you have stricter timing
requirements, then running timers in a child process suddenly doesn't look
so silly anymore.
Without further ado, here is the code:
use AnyEvent;
use AnyEvent::Fork;
use AnyEvent::Fork::RPC;
my $done = AE::cv;
count 2 of 2
job 2 finished
count 2 of 3
count 3 of 3
job 3 finished
While the overall ordering isn't guaranteed, the async backend still
guarantees that events and responses are delivered to the parent process
in the exact same ordering as they were generated in the child process.
And unless your system is I<very> busy, it should clearly show that the
job started last will finish first, as it has the lowest count.
This concludes the async example. Since L<AnyEvent::Fork> does not
actually fork, you are free to use just about any module in the child, not just
L<AnyEvent>, but also L<IO::AIO> or L<Tk>, for example.
=head2 Example 3: Asynchronous backend with Coro
With L<Coro> you can create a nice asynchronous backend implementation by
defining an rpc server function that creates a new Coro thread for every
them this is probably shocking every single time they see it). The worker
process in the first example in this document uses 1792kB.
Since the calls are done synchronously, slow jobs will keep newer jobs
from executing.
The synchronous backend also has no overhead due to running an event loop
- reading requests is therefore very efficient, while writing responses is
less so, as every response results in a write syscall.
If the parent process is busy and a bit slow reading responses, the child
waits instead of processing further requests. This also limits the amount
of memory needed for buffering, as never more than one response has to be
buffered.
The API in the child is simple - you just have to define a function that
does something and returns something.
It's hard to use modules or code that relies on an event loop, as the
child cannot execute anything while it waits for more input.
The asynchronous backend relies on L<AnyEvent>, which tries to be small,
but still comes at a price: On my system, the worker from example 1a uses
3420kB RSS (for L<AnyEvent>, which loads L<EV>, which needs L<XSLoader>
which in turn loads a lot of other modules such as L<warnings>, L<strict>,
L<vars>, L<Exporter>...).
It batches requests and responses reasonably efficiently, doing only as
few reads and writes as needed, but needs to poll for events via the event
loop.
Responses are queued when the parent process is busy. This means the child
can continue to execute any queued requests. It also means that a child
might queue a lot of responses in memory when it generates them and the
parent process is slow accepting them.
The API is not a straightforward RPC pattern - you have to call a
"done" callback to pass return values and signal completion. Also, more
importantly, the API starts jobs as fast as possible - when 1000 jobs
are queued and the jobs are slow, they will all run concurrently. The
child must implement some queueing/limiting mechanism if this causes
problems. Alternatively, the parent could limit the number of RPC calls
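The child-side queueing/limiting mentioned above could look roughly like the following. This is only a minimal sketch, not part of AnyEvent::Fork::RPC: C<make_limited> is a hypothetical helper name, and it assumes the worker function receives the "done" callback as its first argument, followed by the request arguments. It wraps such a worker so that at most C<$max> jobs are active at once, while further requests wait in an in-memory queue.

```perl
use strict;
use warnings;

# Hypothetical helper (an assumption, not a module API): wrap an async
# worker so that at most $max invocations are active at the same time.
sub make_limited {
    my ($max, $worker) = @_;

    my $active = 0;   # number of jobs currently running
    my @queue;        # pending requests, each stored as [$done, @args]

    my $start_next;
    $start_next = sub {
        while ($active < $max && @queue) {
            my ($done, @args) = @{ shift @queue };
            ++$active;
            $worker->(sub {
                --$active;
                $done->(@_);       # hand the results to the real done callback
                $start_next->();   # a slot became free, start a queued job
            }, @args);
        }
    };

    # The returned function has the same calling convention as the worker.
    return sub {
        push @queue, [@_];
        $start_next->();
    };
}
```

The wrapped function can be used wherever the original worker would be used, since it takes the same arguments. Note that the queue itself is unbounded, so this limits concurrency but not memory use.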
RPC/Async.pm
{
package main;
my $init = delete $kv{init};
&$init if length $init;
$function = \&$function; # resolve function early for extra speed
}
%kv = (); # save some very small amount of memory
my $busy = 1; # exit when == 0
my ($f, $t) = eval $serialiser; AE::log fatal => $@ if $@;
my ($wbuf, $ww);
my $wcb = sub {
my $len = syswrite $wfh, $wbuf;
unless (defined $len) {
if ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
undef $ww;
AE::log fatal => "AnyEvent::Fork::RPC: write error ($!), parent gone?";
}
}
substr $wbuf, 0, $len, "";
unless (length $wbuf) {
undef $ww;
unless ($busy) {
shutdown $wfh, 1;
@_ = (); goto &$done;
}
}
};
my $write = sub {
$wbuf .= $_[0];
$ww ||= AE::io $wfh, 1, $wcb;
};
RPC/Async.pm
if ($len) {
while (8 <= length $rbuf) {
(my $id, $len) = unpack "NN", $rbuf;
8 + $len <= length $rbuf
or last;
my @r = $t->(substr $rbuf, 8, $len);
substr $rbuf, 0, 8 + $len, "";
++$busy;
$function->(sub {
--$busy;
$write->(pack "NN/a*", $id, &$f);
}, @r);
}
} elsif (defined $len or $! == Errno::EINVAL) { # EINVAL is for microshit windoze
undef $rw;
--$busy;
$ww ||= AE::io $wfh, 1, $wcb;
} elsif ($! != Errno::EAGAIN && $! != Errno::EWOULDBLOCK) {
undef $rw;
AE::log fatal => "AnyEvent::Fork::RPC: read error in child: $!";
}
};
$AnyEvent::MODEL eq "AnyEvent::Impl::EV"
? EV::run ()
: AE::cv->recv;
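The read loop above parses frames consisting of a 32-bit big-endian id, a 32-bit big-endian payload length and the payload itself, and the response is packed the same way with "NN/a*". The framing can be tried out in isolation with the following minimal sketch. The names C<encode_frame> and C<extract_frames> are hypothetical, and the payload is treated as an opaque byte string here, whereas the real code passes it through the configured serialiser.

```perl
use strict;
use warnings;

# Build one frame: 32-bit id, 32-bit payload length, then the payload
# (both integers in big-endian order, as produced by "NN/a*").
sub encode_frame {
    my ($id, $payload) = @_;
    return pack "NN/a*", $id, $payload;
}

# Remove all complete frames from the front of the buffer referenced by
# $bufref and return them as [$id, $payload] pairs. An incomplete
# trailing frame is left in the buffer until more data arrives.
sub extract_frames {
    my ($bufref) = @_;
    my @frames;

    while (8 <= length $$bufref) {
        my ($id, $len) = unpack "NN", $$bufref;
        last unless 8 + $len <= length $$bufref;   # payload not complete yet

        push @frames, [$id, substr($$bufref, 8, $len)];
        substr($$bufref, 0, 8 + $len, "");          # consume the frame
    }

    return @frames;
}
```

Because partial frames stay in the buffer, the caller can simply append whatever a read returns and call C<extract_frames> again.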