fine.
=head2 Services
Above it was mentioned that C<spawn> automatically loads modules. This can
be exploited in various useful ways.
Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:
   aemp run
The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function - without any other
configuration.
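As a rough sketch, the spawning side could look like the following. The node ID C<chatnode>, the nick and the argument list passed to C<client_connect> are assumptions made for illustration; only C<configure>, C<port> and C<spawn> are taken from L<AnyEvent::MP>.

```perl
use strict;
use warnings;

use AnyEvent;
use AnyEvent::MP;

# join the network using the default profile
configure;

# ASSUMPTION: the node ID of the node that was started with "aemp run"
my $server_node = "chatnode";

# local port that receives whatever the server sends back
my $client_port = port {
   my @msg = @_;
   print "received: @msg\n";
};

# spawn loads mymod::chatserver on the remote node when needed and then
# calls client_connect there, in the context of a newly created port.
# ASSUMPTION: client_connect expects a client port and a nick.
my $server_port = spawn $server_node, "mymod::chatserver::client_connect",
                        $client_port, "somenick";

# run the event loop
AnyEvent->condvar->recv;
```

The remote node needs no entry for C<mymod::chatserver> in its configuration, as long as the module can be found from its module search path.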
Likewise, when you have some service that starts automatically when loaded
(similar to AnyEvent::MP::Global), then you can configure this service
statically:
   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode
And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.
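A minimal sketch of such a self-starting module might look like this. The file name F<mymod/service.pm>, the C<ping> request and its reply are hypothetical; the point is only that the port is created as a side effect of loading the module.

```perl
# file: mymod/service.pm (hypothetical)
package mymod::service;

use strict;
use warnings;

use AnyEvent::MP;

# created when the module is loaded, no init function needs to be called
our $PORT = port;

# ASSUMPTION: requests have the form (ping => $reply_port)
rcv $PORT, ping => sub {
   my ($reply_port) = @_; # the tag has already been removed
   snd $reply_port, pong => $PORT;
};

1;
```

How other nodes learn the ID of C<$PORT> is left out here and depends on how you structure your application.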
Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I wanted to run the
Bummskraut IRC backend on a machine named "ruth", I could do this:
   aemp profile ruth addservice BK::Backend::IRC::
And any C<aemp run> on that host will automatically have the Bummskraut
IRC backend running.
There are plenty of possibilities you can use - it's all up to you how you
structure your application.
=head1 PART 4: Coro::MP - selective receive
Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.
In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.
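The idea of selective receive can be modelled in a few lines of plain Perl. This is a conceptual toy only, not how C<Coro::MP> is implemented: the C<ToyMailbox> class and its method names are made up for this illustration, and a real thread would block instead of returning an empty list.

```perl
use strict;
use warnings;

# A toy mailbox: messages are kept in arrival order.
package ToyMailbox;

sub new { bless { queue => [] }, shift }

sub put {
   my ($self, @msg) = @_;
   push @{ $self->{queue} }, [@msg];
}

# Return (and remove) the first queued message for which $cond is true.
# Messages that do not match stay queued, in their original order.
sub get_matching {
   my ($self, $cond) = @_;
   my $queue = $self->{queue};

   for my $i (0 .. $#$queue) {
      if ($cond->(@{ $queue->[$i] })) {
         my ($msg) = splice @$queue, $i, 1;
         return @$msg;
      }
   }

   return; # nothing suitable is queued
}

sub pending { scalar @{ $_[0]{queue} } }

package main;

my $box = ToyMailbox->new;
$box->put(job    => "first");
$box->put(urgent => "stop");

# pick the urgent message, although it arrived second
my @msg = $box->get_matching(sub { $_[0] eq "urgent" });
print "@msg\n";                            # prints "urgent stop"
print "pending: ", $box->pending, "\n";    # prints "pending: 1"
```

The skipped C<job> message is still in the queue afterwards and can be fetched by a later call.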
The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.
In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.
To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.
The server only writes a single file at a time; other requests stay
in the queue until the current file has been finished.
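Seen from a client, the protocol described above could be driven roughly like this. The helper C<send_file> and its parameters are hypothetical; only C<snd> comes from L<AnyEvent::MP>, and the server port ID is assumed to be known to the caller.

```perl
use strict;
use warnings;

use AnyEvent::MP;

# Send one file to the server, following the message sequence above.
# ASSUMPTIONS: $ioserver is the port ID of the server, $source is a port
# owned by the caller and @chunks holds the file contents.
sub send_file {
   my ($ioserver, $source, $path, @chunks) = @_;

   # announce the transfer
   snd $ioserver, write_file => $source, $path;

   # one data message per chunk; empty chunks are skipped, as an empty
   # data message would end the transfer early
   snd $ioserver, data => $source, $_
      for grep length, @chunks;

   # empty data message: end of file
   snd $ioserver, data => $source;
}
```

A caller would create its own port with C<port> and pass that as C<$source>, so that the server can tell apart data messages belonging to different transfers.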
Here is an example implementation of the server that uses L<Coro::AIO>
and largely ignores error handling:
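   use Fcntl;     # O_WRONLY, O_CREAT
   use Coro::AIO; # aio_open, aio_write
   use Coro::MP;  # port_async, get_cond, mon

   my $ioserver = port_async {
      while () {
         # wait for the next request, whatever it is
         my ($tag, $port, $path) = get_cond;
         $tag eq "write_file"
            or die "only write_file messages expected";
         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";
         while () {
            # only accept data messages from the requesting port,
            # and give up after 5 seconds
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";
            length $data or last; # empty data message: end of file
            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };
   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };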
Let's go through it, section by section.
   my $ioserver = port_async {
Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or, as in this example, by calling C<port_async> with the
code to execute as a thread. The C<async> part of the name comes from the
fact that threads are created using the C<Coro::async> function.
The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be C<kil>ed I<normally>,
i.e. without a reason argument.
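For comparison, the C<rcv_async> variant might look like the sketch below. It assumes that C<rcv_async> takes the port and a code reference, and that C<port> is available because C<Coro::MP> re-exports the C<AnyEvent::MP> functions; check the L<Coro::MP> documentation before relying on the exact calling convention.

```perl
use strict;
use warnings;

use Coro::MP; # also exports the AnyEvent::MP functions

# create a plain port first ...
my $port = port;

# ... then attach a thread to it (ASSUMPTION: port, then code reference)
rcv_async $port, sub {
   while () {
      my @msg = get_cond; # the next message, whatever it is
      print "got: @msg\n";
   }
};
```

This is useful when the port ID has to exist before the thread is started, for example because it was already handed to another node.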
      while () {
         my ($tag, $port, $path) = get_cond;
         $tag eq "write_file"
            or die "only write_file messages expected";
The thread is supposed to serve many file writes, which is why it
executes in a loop. The first thing it does is fetch the next message,
using C<get_cond>, the "conditional message get". Without arguments, it
merely fetches the I<next> message from the queue, which I<must> be a