AnyEvent-MP

MP/Intro.pod  view on Meta::CPAN

=head1 PART 1: Passing Messages Between Processes

=head2 The Receiver

Let's split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

   my $port = port;
   db_set eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

Now, that wasn't too bad, was it? OK, let's go through the new functions
that have been used.

=head3 C<configure> and Joining and Maintaining the Network

First let's have a look at C<configure>:

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

Before we are able to send messages to other nodes we have to configure
the node to become a "networked node". Configuring a node means naming
the node and binding some TCP listeners so that other nodes can contact
it. The choice of whether a process becomes a networked node or not must
be made before doing anything else with AnyEvent::MP.

Additionally, to actually link all nodes in a network together, you should
specify a number of seed addresses, which will be used by the node to
connect itself into an existing network, as we will see shortly.

All of this info (and more) can be passed to the C<configure> function -
later we will see how we can do all this without even passing anything to
C<configure>!

Back to the function call in the program: the first parameter, C<nodeid>,
specifies the node ID, in this case C<eg_receiver/%u>. The default is to
use the node name of the current host plus C</%u>, which gives the node a
name with a random suffix to make it unique. For this example we want
the node to have a bit more personality, so we name it C<eg_receiver> with
a random suffix.

Why the random suffix? Node IDs need to be unique within the network and
appending a random suffix is the easiest way to do that.

The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so explicit IP
addresses are best).

The reason to bind on a TCP port is not just that other nodes can connect
to us: if no binds are specified, the node will still bind on a dynamic
port on all local addresses - but in this case we won't know the port, and
cannot tell other nodes to connect to it as a seed node.

Now, a I<seed> is simply the TCP address of some other node in the
network, often the same string as used for the C<binds> parameter of the
other node. The need for seeds is easy to explain: I<somehow> the nodes
of an aemp network have to find each other, and often this means over the
internet. So broadcasts are out.

Instead, a node usually specifies the addresses of one or a few (for
redundancy) other nodes, some of which should be up. Two nodes can set
each other as seeds without any issues. You could even specify all nodes
as seeds for all nodes, for total redundancy. But the common case is to
have some more or less central, stable servers running seed services for
other nodes.

All you need to do to ensure that an AnyEvent::MP network connects
together is to make sure that all seed nodes are connected together via
their seed connections, i.e., all connections from seed nodes to I<their>
seed nodes form a connected graph.

A node tries to keep connections open to all of its seed nodes at all
times, while other connections are made on demand only.

The simplest way to do that would be for all nodes to use the same seed
nodes: seed nodes would seed each other, and all other nodes would connect
to the seed nodes.
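
As a rough sketch of that layout, here is what the C<configure> calls of a
seed node and of an ordinary node might look like. The addresses
C<10.0.0.1> and C<10.0.0.2> and the node names are made up for this
illustration, and each call belongs in its own program, as a node is
configured only once:

   # program running on 10.0.0.1: a seed node binds a well-known port
   # and uses the other seed node as its own seed
   configure
      nodeid => "eg_seed1",
      binds  => ["10.0.0.1:4040"],
      seeds  => ["10.0.0.2:4040"];

   # program running anywhere else: no fixed port is needed,
   # only the addresses of the (hypothetical) seed nodes
   configure
      nodeid => "eg_worker/%u",
      seeds  => ["10.0.0.1:4040", "10.0.0.2:4040"];

With a matching C<configure> on C<10.0.0.2> that seeds C<10.0.0.1>, the two
seed nodes form a connected graph, and every other node reaches the network
as long as at least one of them is up.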

All of this ensures that the network stays one network - even if all the
nodes in one half of the net are separated from the nodes in the other
half by some network problem, once that is over, they will eventually
become a single network again.

In addition to creating the network, a node also expects the seed nodes to
run the shared database service - if need be, by automatically starting
it, so you don't normally need to configure this explicitly.

The process of joining a network takes time, during which the node
is already running. This means it takes time until the node is
fully connected, and information about services in the network is
available. This is why most AnyEvent::MP programs either just register
themselves in the database and wait to be "found" by others, or they start
to monitor the database until some nodes of the required type show up.

We will see how this is done later, in the sender program.
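
As a small preview, and only as a sketch, this is roughly what "monitoring
the database" can look like with C<db_mon>. It assumes the receiver from
above is running on the same host (hence the seed address
C<127.0.0.1:4040>), and the node name C<eg_watcher> is made up:

   use AnyEvent;
   use AnyEvent::MP;

   # assumption: the receiver listens on port 4040 on this host
   configure nodeid => "eg_watcher/%u", seeds => ["127.0.0.1:4040"];

   # the callback runs whenever the eg_receivers family changes
   my $guard = db_mon eg_receivers => sub {
      my ($family, $added, $changed, $deleted) = @_;

      print "receiver appeared: $_\n" for @$added;
      print "receiver vanished: $_\n" for @$deleted;
   };

   AnyEvent->condvar->recv;

The monitor stays active for as long as C<$guard> is kept around.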

=head3 Registering the Receiver

Coming back to our example, after the node has been configured for network
access, it is time to publish some service, namely the receive service.

For that, let's look at the next lines:

   my $port = port;
   db_set eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new

[...]

the thing is, a "normal" kill does not count as a crash. This way you can
easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done its job properly.

=head3 Port Context

Code runs in the so-called "port context". That means C<$SELF> contains
its own port ID and exceptions that the code throws will be caught.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As an example, assume that the
following port receive handler wants to C<die> a second later, using
C<after>:

  my $port = port {
     after 1, sub { die "oops" };
  };

If you try this out, you will find it does not work - when the C<after>
callback is executed, it does not run in the port context anymore, so
exceptions will not be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works mostly like perl's built-in C<sub>:

  my $port = port {
     after 1, psub { die "oops" };
  };

C<psub> remembers the port context and returns a code reference. When the
code reference is invoked, it will run the code block within the context
that it was created in, so exception handling once more works as expected.

There is even a way to temporarily execute code in the context of some
port, namely C<peval>:

  peval $port, sub {
     # die'ing here will kil $port
  };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

=head3 Network Errors and the AEMP Guarantee

Earlier we mentioned another important source of monitoring failures:
network problems. When a node loses connection to another node, it will
invoke all monitoring actions, just as if the port was killed, I<even if
it is possible that the port is still happily alive on another node> (not
being able to talk to a node means we have no clue what's going on with
it, it could be crashed, but also still running without knowing we lost
the connection).

So another way to view monitors is: "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port: After starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane: Specifically, it means that there will be
no "holes" in the message sequence: all messages sent are delivered in
order, without any of them missing in between, and when some were lost,
you I<will> be notified of that, so you can take recovery action.

And, obviously, the guarantee only works in the presence of
correctly-working hardware, and no relevant bugs inside AEMP itself.
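
In code, relying on the guarantee might look like the following fragment.
It is only a sketch: C<$port> is assumed to hold the ID of a port on some
other node (for example one found in the database), and the C<test>
messages are just placeholders:

   use AnyEvent::MP;

   # assumption: $port is a port ID on another node
   mon $port, sub {
      my @reason = @_;

      if (@reason) {
         # the port was killed with a reason, or we lost the connection
         # to its node: messages sent recently may not have arrived
         warn "lost contact with $port: @reason\n";
      } else {
         # the port was removed normally
      }
   };

   # sent after the monitor was started, so the guarantee applies:
   # either both arrive in order, or the monitor callback tells us
   # that something was lost
   snd $port, test => 1;
   snd $port, test => 2;

The important detail is the order: the monitor is started first, and only
then are messages sent.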

=head3 Supervising

OK, so how is this crashing-everything-stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them
more stable, but to make them more resilient against actual errors
and crashes. And this is not done by crashing I<everything>, but by
crashing everything except a I<supervisor> that then cleans up and starts
everything again.

A supervisor is simply some code that ensures that an application (or a
part of it) is running, and if it crashes, is restarted properly. That is,
it supervises a service by starting and restarting it, as necessary.
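
Reduced to its bare bones, a supervisor can be little more than a C<mon>
callback that starts the service again. The following is a minimal sketch
under that assumption, not the design used later in this tutorial; the
function name C<start_service> and the C<work> message are invented for
the example:

   use AnyEvent;
   use AnyEvent::MP;

   configure;

   sub start_service {
      # the supervised "service": a port that crashes on bad input
      my $service = port;

      rcv $service, work => sub {
         my ($job) = @_;

         die "cannot handle job\n" unless defined $job;
         print "working on $job\n";
      };

      # the supervisor: restart after a crash, do nothing after a
      # normal kil (which invokes the callback without a reason)
      mon $service, sub {
         return unless @_;

         warn "service crashed (@_), restarting\n";
         start_service ();
      };

      $service
   }

   start_service;

   AnyEvent->condvar->recv;

Note that the restarted service gets a new port ID, so a real application
also needs a way for clients to find it again, for example via the
database.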

To show how to do all this we will create a simple chat server that can
handle many chat clients. Both server and clients can be killed and
restarted, and even crash, to some extent, without disturbing the chat
functionality.

=head2 Chatting, the Resilient Way

Without further ado, here is the chat server (to run it, we assume the
set-up explained earlier, with a separate F<aemp run seed> node):

   use common::sense;
   use AnyEvent::MP;

   configure;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   our $server = port;

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };
      msg "$nick (joins)";
   };

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;


