AnyEvent-MP


fine.

=head2 Services

Above it was mentioned that C<spawn> automatically loads modules. This can
be exploited in various useful ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function - without any other
configuration.

Likewise, when you have some service that starts automatically when loaded
(similar to AnyEvent::MP::Global), then you can configure this service
statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.

Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I wanted to run the
Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any C<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

There are plenty of possibilities you can use - it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.
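
The idea of selective receive can be sketched in plain Perl, without any
message passing library. This is only an illustration of the concept, not
how C<Coro::MP> is implemented: the C<take_first> helper, the array used as
a queue and the sample messages are all made up for this sketch.

   use strict;
   use warnings;

   # a plain array stands in for the message queue of a port
   my @queue = (
      [data       => "portA", "chunk 1"],
      [write_file => "portB", "/tmp/b"],
      [data       => "portA", ""],
   );

   # return the first queued message for which $cond is true and remove
   # it from the queue; all other messages stay queued in their order
   sub take_first {
      my ($cond) = @_;

      for my $i (0 .. $#queue) {
         next unless $cond->(@{ $queue[$i] });

         my ($msg) = splice @queue, $i, 1;
         return @$msg;
      }

      return; # nothing matched, a real receiver would wait here
   }

   # pick the write_file message, although a data message was sent first
   my @msg = take_first (sub { $_[0] eq "write_file" });

   print "received: @msg\n";
   print "still queued: ", scalar @queue, " message(s)\n";

The two C<data> messages are not lost: they remain in the queue, in their
original order, and can be fetched by a later call with a different
condition.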

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.

The server only writes a single file at a time; other requests stay
in the queue until the current file has been finished.
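
Seen from the client's side, the message sequence for one file could be
built as in the following plain Perl sketch. The C<file_messages> helper,
its C<$chunk_size> parameter and the sample port name and path are
assumptions made for illustration; the protocol itself does not prescribe
any chunk size.

   use strict;
   use warnings;

   # build the list of messages needed to transfer one file
   sub file_messages {
      my ($port, $path, $content, $chunk_size) = @_;

      # the request that starts the transfer
      my @msgs = [write_file => $port, $path];

      # "(a$n)*" splits the content into pieces of at most $n bytes,
      # an empty content yields no data messages at all
      push @msgs, [data => $port, $_]
         for unpack "(a$chunk_size)*", $content;

      # a data message without payload ends the transfer
      push @msgs, [data => $port];

      @msgs
   }

   my @msgs = file_messages ("client-port", "/tmp/test.txt", "hello, world", 5);

   print "@$_\n" for @msgs;

In a real client, each element would then be sent to the server port, for
example with C<< snd $ioserver, @$_ >>, with the client's own port ID in
place of the made-up C<client-port> string.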

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

   my $ioserver = port_async {
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Let's go through it, section by section.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or as in this example, by calling C<port_async> with the
code to execute as a thread. The C<async> component comes from the fact
that threads are created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be C<kil>ed I<normally>,
i.e. without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it
executes in a loop. The first thing it does is fetch the next message,
using C<get_cond>, the "conditional message get". Without arguments, it
merely fetches the I<next> message from the queue, which I<must> be a


