AnyEvent-MP
view release on metacpan or search on metacpan
MP/Intro.pod view on Meta::CPAN
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.
If the spawn message gets delivered, but the monitoring message is not
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in return monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.
If your head is spinning by now, that's fine - just keep in mind, after
creating a port using C<spawn>, monitor it on the local node, and monitor
"the other side" from the remote node, and all will be cleaned up just
fine.
=head2 Services
Above it was mentioned that C<spawn> automatically loads modules. This can
be exploited in various useful ways.
Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:
aemp run
The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function - without any other
configuration.
Likewise, when you have some service that starts automatically when loaded
(similar to AnyEvent::MP::Global), then you can configure this service
statically:
aemp profile mysrvnode services mymod::service::
aemp run profile mysrvnode
And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.
Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configure nodes. For example, if I wanted to run the
Bummskraut IRC backend on a machine named "ruth", I could do this:
aemp profile ruth addservice BK::Backend::IRC::
And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.
There are plenty of possibilities you can use - it's all up to you how you
structure your application.
=head1 PART 4: Coro::MP - selective receive
Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.
In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.
The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible to C<AnyEvent::MP> ports.
In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.
To illustrate how programing with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file =>, $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.
The server only writes a single file at a time, other requests will stay
in the queue until the current file has been finished.
Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:
my $ioserver = port_async {
while () {
my ($tag, $port, $path) = get_cond;
$tag eq "write_file"
or die "only write_file messages expected";
my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
or die "$path: $!";
while () {
my (undef, undef, $data) = get_cond {
$_[0] eq "data" && $_[1] eq $port
} 5
or die "timeout waiting for data message from $port\n";
length $data or last;
aio_write $fh, undef, undef, $data, 0;
};
}
};
mon $ioserver, sub {
warn "ioserver was killed: @_\n";
};
Let's go through it, section by section.
my $ioserver = port_async {
Ports can be created by attaching a thread to an existing port via
( run in 0.709 second using v1.01-cache-2.11-cpan-39bf76dae61 )