AnyEvent-MP

MP/Intro.pod  view on Meta::CPAN

using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as their seed node by changing the
global settings shared between all profiles (this only works when running
on the same host; for multiple machines you would replace the C<*> with
the IP address or hostname of the node running the seed):

   aemp seeds "*:4040"

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
parameters to C<configure> is that the configuration file (by default)
takes precedence over any arguments passed to C<configure>.

That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).

Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.

Here is a simple example:

  use AnyEvent::MP;

  # create a port, it always dies
  my $port = port { die "oops" };

  # monitor it
  mon $port, sub {
     warn "$port was killed (with reason @_)";
  };

  # now send it some message, causing it to die:
  snd $port;

  AnyEvent->condvar->recv;

It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die, which in turn invokes the monitoring callback:

   anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.

The callback was actually passed two arguments: C<die>, to indicate it
did throw an I<exception> as opposed to, say, a network error, and the
exception message itself.

What happens when a port is killed before we have a chance to monitor
it? Granted, this is highly unlikely in our example, but when you program
in a network this can easily happen due to races between nodes.

  use AnyEvent::MP;

  my $port = port { die "oops" };

  snd $port;

  mon $port, sub {
     warn "$port was killed (with reason @_)";
  };

  AnyEvent->condvar->recv;

This time we will get something else:

   2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
   anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)

The first line is an error message that is printed when a port dies that
isn't being monitored, because that is normally a bug. When a C<mon> is
attempted later, its monitoring callback is invoked immediately, because
the port is already gone. The kill reason is now C<no_such_port> with
some descriptive (we hope) error message.

As you probably suspect from these examples, the kill reason is usually
some identifier as first argument and a human-readable error message as
second argument - all kill reasons by AnyEvent::MP itself follow this
pattern. But the kill reason can be anything: it is simply a list of
values you can choose yourself. It can even be nothing (an empty list) -
this is called a "normal" kill.

Apart from die'ing, you can kill ports manually using the C<kil>
function. Using the C<kil> function will be treated like an error when a
non-empty reason is specified:

   kil $port, custom_error => "don't like your steenking face";

And a I<normal> kill without any reason arguments:

   kil $port;
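
As a minimal sketch of how a monitoring callback might tell these cases
apart (the C<custom_error> tag and its message are made up for this
illustration and are not defined by AnyEvent::MP):

   use AnyEvent::MP;

   my $port = port;   # a port without any receive callback

   mon $port, sub {
      my ($tag, @details) = @_;

      if (!@_) {
         # empty kill reason: a "normal" kill
         warn "$port: normal kill\n";
      } elsif ($tag eq "die") {
         # an exception was thrown in port context
         warn "$port: exception: @details\n";
      } else {
         # any other reason, e.g. no_such_port or our own tag
         warn "$port: killed with $tag: @details\n";
      }
   };

   kil $port, custom_error => "hypothetical reason";

Replacing the last line with a plain C<kil $port> would take the first
branch instead.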

By now you probably wonder what this "normal" kill business is: A common
idiom is to not specify a callback to C<mon>, but another port, such as
C<$SELF>:

   mon $port, $SELF;

This basically means "monitor $port and kill me when it crashes" - and
the thing is, a "normal" kill does not count as a crash. This way you can
easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done its job properly.
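
The following sketch shows such a link between two local ports. The
variable names and the C<custom_error> reason are only illustrative:

   use AnyEvent::MP;

   my $worker = port;
   my $owner  = port;

   # link: kill $owner whenever $worker is killed abnormally
   mon $worker, $owner;

   # observe what happens to $owner
   mon $owner, sub {
      warn @_ ? "owner crashed: @_\n" : "owner finished normally\n";
   };

   # a kill with a non-empty reason propagates to $owner...
   kil $worker, custom_error => "hypothetical failure";

   # ...whereas "kil $worker;" (no reason) would leave $owner alone

Since a normal kill of C<$worker> is not a crash, C<$owner> would simply
stay alive in that case and its monitor would not be invoked.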

=head3 Port Context

Receive callbacks run in the so-called "port context". That means C<$SELF>
contains the callback's own port ID, and exceptions that the code throws
will be caught.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As an example, assume that the
following port receive handler wants to C<die> a second later, using
C<after>:

  my $port = port {
     after 1, sub { die "oops" };
  };

If you tried this out, you would find that it does not work - when the
C<after> callback is executed, it does not run in the port context
anymore, so exceptions will not be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works mostly like perl's built-in C<sub>:

  my $port = port {
     after 1, psub { die "oops" };
  };

C<psub> remembers the port context and returns a code reference. When the
code reference is invoked, it will run the code block within the context
that it was created in, so exception handling once more works as expected.

There is even a way to temporarily execute code in the context of some
port, namely C<peval>:

  peval $port, sub {
     # die'ing here will kil $port
  };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

=head3 Network Errors and the AEMP Guarantee

Earlier we mentioned another important source of monitoring failures:
network problems. When a node loses connection to another node, it will
invoke all monitoring actions, just as if the port was killed, I<even if
it is possible that the port is still happily alive on another node> (not
being able to talk to a node means we have no clue what's going on with
it, it could be crashed, but also still running without knowing we lost
the connection).

So another way to view monitors is: "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port: After starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane: Specifically, it means that there will be
no "holes" in the message sequence: all messages sent are delivered in
order, without any of them missing in between, and when some were lost,
you I<will> be notified of that, so you can take recovery action.

And, obviously, the guarantee only works in the presence of
correctly-working hardware, and no relevant bugs inside AEMP itself.

=head3 Supervising

         &server_connect;
      };

If the client port dies (for whatever reason), the "supervisor" will start
looking for a server again - the semantics of C<db_mon> ensure that it
will immediately find it if there is a server port.

After this, everything is ready: the client will send a C<join> message
with its local port to the server, and start monitoring it:

      $server = (keys %{ $_[0] })[0];

      snd $server, join => $client, $nick;
      mon $server, $client;
   }

This second monitor will ensure that, when the server port crashes or goes
away (e.g. due to network problems), the client port will be killed as
well. This tells the user that the client was disconnected, and the
supervisor will then start connecting to the server again.

The rest of the program deals with the boring details of actually invoking
the supervisor function to start the whole client process and handle the
actual terminal input, sending it to the server.

Now... the "supervisor" in this example is a bit of a cheat - it doesn't
really clean up much (because the cleanup done by AnyEvent::MP suffices),
and there isn't much of a restarting action either - if the server isn't
there because it crashed, well, it isn't there.

In the real world, one would often add a timeout that would trigger when
the server couldn't be found within some time limit, and then complain,
or even try to start a new server. Or the supervisor would have to do
some real cleanups, such as rolling back database transactions when the
database thread crashes. For this simple chat server, however, this simple
supervisor works fine. Hopefully future versions of AnyEvent::MP will
offer some predefined supervisors; for now you will have to code them on
your own.
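
A timeout of this kind could be sketched as follows. This is not part of
the example programs: the family name C<eg_chat_server>, the 10 second
limit and the reaction to the timeout are all assumptions, and the
C<db_mon> callback is assumed to receive the family contents as a hash
reference in C<$_[0]>, as in the client code above:

   use AnyEvent;
   use AnyEvent::MP;

   configure;

   my ($db_mon, $timeout);

   # give up (here: merely complain) after 10 seconds without a server
   $timeout = AE::timer 10, 0, sub {
      undef $db_mon;   # stop looking
      warn "no chat server found within 10 seconds\n";
   };

   $db_mon = db_mon eg_chat_server => sub {
      return unless %{ $_[0] };   # family is still empty

      undef $timeout;             # server found, cancel the timeout
      undef $db_mon;              # stop monitoring the family

      my $server = (keys %{ $_[0] })[0];
      warn "found server $server\n";
   };

   AnyEvent->condvar->recv;

A real supervisor would do something more useful in the timer callback,
such as trying to start a new server.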

You should now try to start the server and one or more clients in different
terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server
   aemp run profile seed

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
who of course should not normally get killed (unless they, too, have a
supervisor).

If you often think in object-oriented terms, then you can think of a port
as an object: C<port> is the constructor, the receive callbacks set by
C<rcv> act as methods, the C<kil> function becomes the explicit destructor
and C<mon> installs a destructor hook. Unlike conventional
object-oriented programming, it can make sense to exchange port IDs more
freely (for example, to monitor one port from another), because it is
cheap to send port IDs over the network, and AnyEvent::MP blurs the
distinction between local and remote ports.
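
To make the analogy concrete, here is a small sketch. The C<greet> and
C<destroy> message tags are invented for this illustration:

   use AnyEvent::MP;

   my $obj = port;                            # "constructor"

   rcv $obj,
      greet   => sub { print "hello, $_[0]\n" },   # a "method"
      destroy => sub { kil $SELF };                # explicit "destructor"

   mon $obj, sub {                            # "destructor hook"
      print "object destroyed (@_)\n";
   };

   snd $obj, greet => "world";                # "method call"
   snd $obj, "destroy";

   AnyEvent->condvar->recv;                   # run the event loop

Because the "object" is addressed only by its port ID, the two C<snd>
calls could just as well be made from another node.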

Lastly, there is ample room for improvement in this example: the server
should probably remember the nickname in the C<join> handler instead of
expecting it in every chat message, it should probably monitor itself, and
the client should not try to send any messages unless a server is actually
connected.

=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": They can run on almost
any node and even talk to copies of themselves on other nodes in case they
are distributed. The L<AnyEvent::MP::Global> service for example monitors
nodes joining the network and sometimes even starts itself on other nodes.

One good way to design such services is to put them into a module and
create "virtual connections" to other nodes. We call this the "bridge
head" method, because you start by I<creating a remote port> (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let us redesign the chat server and
client using this design method.

As usual, we start with the full program - here is the server:

   use common::sense;
   use AnyEvent::MP;

   configure;

   db_set eg_chat_server2 => $NODE;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, psub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

If the spawn message gets delivered, but the monitoring message is not
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in return monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep this in mind:
after creating a port using C<spawn>, monitor it on the local node, and
monitor "the other side" from the remote node, and everything will be
cleaned up just fine.

=head2 Services

Above it was mentioned that C<spawn> automatically loads modules. This can
be exploited in various useful ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function - without any other
configuration.

Likewise, when you have some service that starts automatically when loaded
(similar to AnyEvent::MP::Global), then you can configure this service
statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.
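
Such an automatically starting service could look roughly like this
sketch of a hypothetical F<mymod/service.pm>. The family name
C<mymod_service> and the behaviour of the port are assumptions made for
illustration:

   package mymod::service;

   use common::sense;
   use AnyEvent::MP;

   # this code runs once, when the module is loaded into the node
   my $port = port {
      warn "mymod::service received: @_\n";
   };

   # announce the port so that other nodes can find it
   db_set mymod_service => $port;

   1;

Everything interesting happens at load time, so no init function needs to
be called explicitly.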

Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I wanted to run
the Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

There are plenty of possibilities you can use - it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.

The server only writes a single file at a time, other requests will stay
in the queue until the current file has been finished.
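
Seen from the sending side, this protocol could be driven by something
like the following sketch. C<send_file> is a hypothetical helper name, and
C<$ioserver> is assumed to hold the port ID of such a server:

   use AnyEvent::MP;

   # hypothetical helper: ask $ioserver to store @chunks in $path
   sub send_file {
      my ($ioserver, $path, @chunks) = @_;

      my $source = port;   # identifies this transfer to the server

      snd $ioserver, write_file => $source, $path;

      # an empty chunk would end the file early, so skip those
      snd $ioserver, data => $source, $_
         for grep length, @chunks;

      snd $ioserver, data => $source;   # no data: end of file

      $source
   }

The helper returns the source port so that the caller can C<kil> it once
the transfer is no longer of interest. The server side is the subject of
the rest of this section.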

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

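   use Fcntl;       # O_WRONLY, O_CREAT
   use Coro::AIO;   # aio_open, aio_write
   use Coro::MP;    # port_async, get_cond

   my $ioserver = port_async {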
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   }; 

Let's go through it, section by section.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or as in this example, by calling C<port_async> with the
code to execute as a thread. The C<async> component comes from the fact
that threads are created using the C<Coro::async> function.


