AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Intro.pod  view on Meta::CPAN

network, often the same string as used for the C<binds> parameter of the
other node. The need for seeds is easy to explain: I<somehow> the nodes
of an aemp network have to find each other, and often this means over the
internet. So broadcasts are out.

Instead, a node usually specifies the addresses of one or few (for
redundancy) other nodes, some of which should be up. Two nodes can set
each other as seeds without any issues. You could even specify all nodes
as seeds for all nodes, for total redundancy. But the common case is to
have some more or less central, stable servers running seed services for
other nodes.

All you need to do to ensure that an AnyEvent::MP network connects
together is to make sure that all seed nodes are connected together via
their seed connections, i.e., all connections from seed nodes to I<their>
seed nodes form a connected graph.

A node tries to keep connections open to all of it's seed nodes at all
times, while other connections are made on demand only.

The simplest way to do that would be for all nodes to use the same seed
nodes: seed nodes would seed each other, and all other nodes would connect
to the seed nodes.

All of this ensures that the network stays one network - even if all the
nodes in one half of the net are separated from the nodes in the other
half by some network problem, once that is over, they will eventually
become a single network again.

In addition to creating the network, a node also expects the seed nodes to
run the shared database service - if need be, by automatically starting
it, so you don't normally need to configure this explicitly.

The process of joining a network takes time, during which the node
is already running. This means it takes time until the node is
fully connected, and information about services in the network are
available. This is why most AnyEvent::MP programs either just register
themselves in the database and wait to be "found" by others, or they start
to monitor the database until some nodes of the required type show up.

We will see how this is done later, in the sender program.

=head3 Registering the Receiver

Coming back to our example, after the node has been configured for network
access, it is time to publish some service, namely the receive service.

For that, let's look at the next lines:

   my $port = port;
   db_set eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<db_set> function, however, is
new: The first argument is the name of a I<database family> and the second
argument is the name of a I<subkey> within that family. The third argument
would be the I<value> to be associated with the family and subkey, but,
since it is missing, it will simply be C<undef>.

What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
database. This database runs on so-called "global" nodes, which usually
are the seed nodes of your network. The database structure is "simply" a
hash of hashes of values.

To illustrate this with Perl syntax, assume the database was stored in
C<%DB>, then the C<db_set> function more or less would do this:

   $DB{eg_receivers}{$port} = undef;

So the ominous "family" selects a hash in the database, and the "subkey"
is simply the key in this hash - C<db_set> very much works like this
assignment.

The family namespace is shared by all nodes in a network, so the names
should be reasonably unique, for example, they could start with the name
of your module, or the name of the program, using your port name or node
name as subkey.

The purpose behind adding this key to the database is that the sender can
look it up and find our port. We will shortly see how.

The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.

=head2 The Sender

OK, now let's take a look at the sender code:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_sender/%u", seeds => ["*:4040"];

   my $guard = db_mon eg_receivers => sub {
      my ($family, $a, $c, $d) = @_;
      return unless %$family;

      # now there are some receivers, send them a message
      snd $_ => test => time
         for keys %$family;
   };

   AnyEvent->condvar->recv;

It's even less code. The C<configure> serves the same purpose as in the
receiver, but instead of specifying binds we specify a list of seeds - the
only seed happens to be the same as the bind used by the receiver, which
therefore becomes our seed node.

Remember the part about having to wait till things become available? Well,
after configure returns, nothing has been done yet - the node is not
connected to the network, knows nothing about the database contents, and
it can take ages (for a computer :) for this situation to change.

Therefore, the sender waits, in this case by using the C<db_mon>
function. This function registers an interest in a specific database
family (in this case C<eg_receivers>). Each time something inside the
family changes (a key is added, changed or deleted), it will call our
callback with the family hash as first argument, and the list of keys as
second argument.

The callback only checks whether the C<%$family> hash is empty - if it is,
then it doesn't do anything. But eventually the family will contain the
port subkey we set in the sender. Then it will send a message to it (and
any other receiver in the same family). Likewise, should the receiver go
away and come back, or should another receiver come up, it will again send
a message to all of them.

You can experiment by having multiple receivers - you have to change the
"binds" parameter in the receiver to the seeds used in the sender to start
up additional receivers, but then you can start as many as you like. If
you specify proper IP addresses for the seeds, you can even run them on
different computers.

Each time you start the sender, it will send a message to all receivers it
finds (you have to interrupt it manually afterwards).

Additional experiments you could try include using C<AE_MP_TRACE=1> to see
which messages are exchanged, or starting the sender before the receiver
and see how long it then takes to find the receiver.

=head3 Splitting Network Configuration and Application Code

OK, so far, this works reasonably well. In the real world, however, the
person configuring your application to run on a specific network (the end
user or network administrator) is often different to the person coding the
application.

Or to put it differently: the arguments passed to configure are usually
provided not by the programmer, but by whoever is deploying the program -
even in the example above, we would like to be able to just start senders
and receivers without having to patch the programs.

To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as seed node (this only works when
running on the same host, for multiple machines you would replace the C<*>
by the IP address or hostname of the node running the seed), by changing
the global settings shared between all profiles:

   aemp seeds "*:4040"

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
parameters to C<configure> is that the configuration file (by default)
takes precedence over any arguments passed to C<configure>.

That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).

Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.

Here is a simple example:

  use AnyEvent::MP;

  # create a port, it always dies
  my $port = port { die "oops" };

  # monitor it
  mon $port, sub {
     warn "$port was killed (with reason @_)";
  };

  # now send it some message, causing it to die:
  snd $port;



( run in 0.984 second using v1.01-cache-2.11-cpan-39bf76dae61 )