AnyEvent-MP
view release on metacpan or search on metacpan
MP/Intro.pod view on Meta::CPAN
The callback is a typical AnyEvent idiom: the callback just passes
that number on to the I<condition variable> C<$end_cv> which will then
pass the value to the print. Condition variables are out of the scope
of this tutorial and not often used with ports, so please consult the
L<AnyEvent::Intro> about them.
Passing messages inside just one process is boring. Before we can move on
and do interprocess message passing we first have to make sure some things
have been set up correctly for our nodes to talk to each other.
=head2 System Requirements and System Setup
Before we can start with real IPC we have to make sure some things work on
your system.
First we have to setup a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to setup the same I<shared secret> for both of them, so they can
prove their trustworthyness to each other.
The easiest way is to set this up is to use the F<aemp> utility:
aemp gensecret
This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
random shared secret. You can copy this file to any other system and
then communicate over the network (via TCP) with it. You can also select
your own shared secret (F<aemp setsecret>) and for increased security
requirements you can even create (or configure) a TLS certificate (F<aemp
gencert>), causing connections to not just be securely authenticated, but
also to be encrypted and protected against tinkering.
Connections will only be successfully established when the I<nodes>
that want to connect to each other have the same I<shared secret> (or
successfully verify the TLS certificate of the other side, in which case
no shared secret is required).
B<If something does not work as expected, and for example tcpdump shows
that the connections are closed almost immediately, you should make sure
that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
you try to connect with each other!>
Thats is all for now, you will find some more advanced fiddling with the
C<aemp> utility later.
=head2 Shooting the Trouble
Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.
To help troubleshooting any issues, there are two environment variables
that you can set. The first, C<AE_VERBOSE> sets the logging level of
L<AnyEvent::Log>, which AnyEvent::MP uses. The default is C<4>, which
means nothing much is printed. You can increase it to C<8> or C<9> to get
more verbose output. This is example output when starting a node (somewhat
abridged to get shorter lines):
2012-03-22 01:41:43.59 debug AE::Util: using Guard module to implement guards.
2012-03-22 01:41:43.62 debug AE::MP::Kernel: node cerebro/slwK2LEq7O starting up.
2012-03-22 01:41:43.62 debug AE::MP::Kernel: node listens on [10.0.0.1:52110].
2012-03-22 01:41:43.62 trace AE::MP::Kernel: trying connect to seed node 10.0.0.19:4040.
2012-03-22 01:41:43.66 trace AE::MP::Transport: 10.0.0.19:4040 connected as rain.
2012-03-22 01:41:43.66 info AE::MP::Kernel: rain is up.
A lot of info, but at least you can see that it does something. To only
get info about AnyEvent::MP, you can use C<AE_LOG=AnyEvent::MP=+log> in
your environment.
The other environment variable that can be useful is
C<AE_MP_TRACE>, which, when set to a true value, will cause
most messages that are sent or received to be printed. For example, F<aemp
restart rijk> might output these message exchanges:
SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
SND rain <- [null,"g_find","rijk"]
RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
RCV rijk -> ["b",""]
=head1 PART 1: Passing Messages Between Processes
=head2 The Receiver
Lets split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:
use AnyEvent;
use AnyEvent::MP;
configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
my $port = port;
db_set eg_receivers => $port;
rcv $port, test => sub {
my ($data, $reply_port) = @_;
print "Received data: " . $data . "\n";
};
AnyEvent->condvar->recv;
Now, that wasn't too bad, was it? OK, let's go through the new functions
that have been used.
=head3 C<configure> and Joining and Maintaining the Network
First let's have a look at C<configure>:
configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
Before we are able to send messages to other nodes we have to configure
the node to become a "networked node". Configuring a node means naming
the node and binding some TCP listeners so that other nodes can contact
it. The choice on whether a process becomes a networked node or not must
be done before doing anything else with AnyEvent::MP.
Additionally, to actually link all nodes in a network together, you should
specify a number of seed addresses, which will be used by the node to
connect itself into an existing network, as we will see shortly.
All of this info (and more) can be passed to the C<configure> function -
later we will see how we can do all this without even passing anything to
C<configure>!
Back to the function call in the program: the first parameter, C<nodeid>,
specified the node ID (in this case C<eg_receiver/%u> - the default is to
use the node name of the current host plus C</%u>, which gives the node a
name with a random suffix to make it unique, but for this example we want
the node to have a bit more personality, and name it C<eg_receiver> with a
random suffix.
Why the random suffix? Node IDs need to be unique within the network and
appending a random suffix is the easiest way to do that.
The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so explicit IP
addresses are best).
The reason to bind on a TCP port is not just that other nodes can connect
to us: if no binds are specified, the node will still bind on a dynamic
port on all local addresses - but in this case we won't know the port, and
cannot tell other nodes to connect to it as seed node.
Now, a I<seed> is simply the TCP address of some other node in the
network, often the same string as used for the C<binds> parameter of the
other node. The need for seeds is easy to explain: I<somehow> the nodes
of an aemp network have to find each other, and often this means over the
internet. So broadcasts are out.
Instead, a node usually specifies the addresses of one or few (for
redundancy) other nodes, some of which should be up. Two nodes can set
each other as seeds without any issues. You could even specify all nodes
as seeds for all nodes, for total redundancy. But the common case is to
have some more or less central, stable servers running seed services for
other nodes.
All you need to do to ensure that an AnyEvent::MP network connects
together is to make sure that all seed nodes are connected together via
their seed connections, i.e., all connections from seed nodes to I<their>
seed nodes form a connected graph.
A node tries to keep connections open to all of it's seed nodes at all
times, while other connections are made on demand only.
The simplest way to do that would be for all nodes to use the same seed
nodes: seed nodes would seed each other, and all other nodes would connect
to the seed nodes.
MP/Intro.pod view on Meta::CPAN
is already running. This means it takes time until the node is
fully connected, and information about services in the network are
available. This is why most AnyEvent::MP programs either just register
themselves in the database and wait to be "found" by others, or they start
to monitor the database until some nodes of the required type show up.
We will see how this is done later, in the sender program.
=head3 Registering the Receiver
Coming back to our example, after the node has been configured for network
access, it is time to publish some service, namely the receive service.
For that, let's look at the next lines:
my $port = port;
db_set eg_receivers => $port;
The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<db_set> function, however, is
new: The first argument is the name of a I<database family> and the second
argument is the name of a I<subkey> within that family. The third argument
would be the I<value> to be associated with the family and subkey, but,
since it is missing, it will simply be C<undef>.
What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
database. This database runs on so-called "global" nodes, which usually
are the seed nodes of your network. The database structure is "simply" a
hash of hashes of values.
To illustrate this with Perl syntax, assume the database was stored in
C<%DB>, then the C<db_set> function more or less would do this:
$DB{eg_receivers}{$port} = undef;
So the ominous "family" selects a hash in the database, and the "subkey"
is simply the key in this hash - C<db_set> very much works like this
assignment.
The family namespace is shared by all nodes in a network, so the names
should be reasonably unique, for example, they could start with the name
of your module, or the name of the program, using your port name or node
name as subkey.
The purpose behind adding this key to the database is that the sender can
look it up and find our port. We will shortly see how.
The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.
=head2 The Sender
OK, now let's take a look at the sender code:
use AnyEvent;
use AnyEvent::MP;
configure nodeid => "eg_sender/%u", seeds => ["*:4040"];
my $guard = db_mon eg_receivers => sub {
my ($family, $a, $c, $d) = @_;
return unless %$family;
# now there are some receivers, send them a message
snd $_ => test => time
for keys %$family;
};
AnyEvent->condvar->recv;
It's even less code. The C<configure> serves the same purpose as in the
receiver, but instead of specifying binds we specify a list of seeds - the
only seed happens to be the same as the bind used by the receiver, which
therefore becomes our seed node.
Remember the part about having to wait till things become available? Well,
after configure returns, nothing has been done yet - the node is not
connected to the network, knows nothing about the database contents, and
it can take ages (for a computer :) for this situation to change.
Therefore, the sender waits, in this case by using the C<db_mon>
function. This function registers an interest in a specific database
family (in this case C<eg_receivers>). Each time something inside the
family changes (a key is added, changed or deleted), it will call our
callback with the family hash as first argument, and the list of keys as
second argument.
The callback only checks whether the C<%$family> hash is empty - if it is,
then it doesn't do anything. But eventually the family will contain the
port subkey we set in the sender. Then it will send a message to it (and
any other receiver in the same family). Likewise, should the receiver go
away and come back, or should another receiver come up, it will again send
a message to all of them.
You can experiment by having multiple receivers - you have to change the
"binds" parameter in the receiver to the seeds used in the sender to start
up additional receivers, but then you can start as many as you like. If
you specify proper IP addresses for the seeds, you can even run them on
different computers.
Each time you start the sender, it will send a message to all receivers it
finds (you have to interrupt it manually afterwards).
Additional experiments you could try include using C<AE_MP_TRACE=1> to see
which messages are exchanged, or starting the sender before the receiver
and see how long it then takes to find the receiver.
=head3 Splitting Network Configuration and Application Code
OK, so far, this works reasonably well. In the real world, however, the
person configuring your application to run on a specific network (the end
user or network administrator) is often different to the person coding the
application.
Or to put it differently: the arguments passed to configure are usually
provided not by the programmer, but by whoever is deploying the program -
even in the example above, we would like to be able to just start senders
and receivers without having to patch the programs.
To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).
When you change both programs above to simply call
configure;
then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.
You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.
We bind the seed node to port 4040 on all interfaces:
aemp profile seed binds "*:4040"
And we configure all nodes to use this as seed node (this only works when
running on the same host, for multiple machines you would replace the C<*>
by the IP address or hostname of the node running the seed), by changing
the global settings shared between all profiles:
aemp seeds "*:4040"
Then we run the seed node:
aemp run profile seed
After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
parameters to C<configure> is that the configuration file (by default)
takes precedence over any arguments passed to C<configure>.
That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)
=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.
This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.
AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.
=head2 Exceptions, Port Context, Network Errors and Monitors
=head3 Exceptions
Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).
Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.
Here is a simple example:
use AnyEvent::MP;
# create a port, it always dies
my $port = port { die "oops" };
# monitor it
mon $port, sub {
warn "$port was killed (with reason @_)";
};
# now send it some message, causing it to die:
snd $port;
AnyEvent->condvar->recv;
( run in 0.801 second using v1.01-cache-2.11-cpan-39bf76dae61 )