AnyEvent-MP

 view release on metacpan or  search on metacpan

MP.pm  view on Meta::CPAN

Erlang uses processes that selectively receive messages out of order, and
therefore needs a queue. AEMP is event based, queuing messages would serve
no useful purpose. For the same reason the pattern-matching abilities
of AnyEvent::MP are more limited, as there is little need to be able to
filter messages without dequeuing them.

This is not a philosophical difference, but simply stems from AnyEvent::MP
being event-based, while Erlang is process-based.

You can have a look at L<Coro::MP> for a more Erlang-like process model on
top of AEMP and Coro threads.

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in Erlang is synchronous and blocks the process until
a connection has been established and the message sent (and so does not
need a queue that can overflow). AEMP sends return immediately, connection
establishment is handled in the background.

=item * Erlang suffers from silent message loss, AEMP does not.

MP/Intro.pod  view on Meta::CPAN


Now... the "supervisor" in this example is a bit of a cheat - it doesn't
really clean up much (because the cleanup done by AnyEvent::MP suffices),
and there isn't much of a restarting action either - if the server isn't
there because it crashed, well, it isn't there.

In the real world, one would often add a timeout that would trigger when
the server couldn't be found within some time limit, and then complain,
or even try to start a new server. Or the supervisor would have to do
some real cleanups, such as rolling back database transactions when the
database thread crashes. For this simple chat server, however, this simple
supervisor works fine. Hopefully future versions of AnyEvent::MP will
offer some predefined supervisors, for now you will have to code it on
your own.

You should now try to start the server and one or more clients in different
terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server

MP/Intro.pod  view on Meta::CPAN

structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible to C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

MP/Intro.pod  view on Meta::CPAN

   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   }; 

Let's go through it, section by section.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or as in this example, by calling C<port_async> with the
code to execute as a thread. The C<async> component comes from the fact
that threads are created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, it will be C<kil> I<normally>, i.e.
without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it
executes in a loop. The first thing it does is fetch the next message,
using C<get_cond>, the "conditional message get". Without arguments, it
merely fetches the I<next> message from the queue, which I<must> be a
C<write_file> message.

The message contains the C<$path> to the file, which is then created:

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

MP/Intro.pod  view on Meta::CPAN


When an empty C<data> message is received we are done and can close the
file (which is done automatically as C<$fh> goes out of scope):

            length $data or last;

Otherwise we need to write the data:

            aio_write $fh, undef, undef, $data, 0;

And that's basically it. Note that every port thread should have some
kind of supervisor. In our case, the supervisor simply prints any error
message:

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   }; 

Here is a usage example:

   port_async {

MP/Intro.pod  view on Meta::CPAN

source of network-wide unique IDs.

Apart from C<get_cond> as seen above, there are other ways to receive
messages. The C<write_file> message above could also selectively be
received using a C<get> call:

   my ($port, $path) = get "write_file";

This is simpler, but when some other code part sends an unexpected message
to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
every threaded port should have a "fetch next message unconditionally"
somewhere, to avoid filling up the queue.

Finally, it is also possible to use more switch-like C<get_conds>:

  get_cond {
     $_[0] eq "msg1" and return sub {
        my (undef, @msg1_data) = @_;
        ...;
     };

README  view on Meta::CPAN

        order, and therefore needs a queue. AEMP is event based, queuing
        messages would serve no useful purpose. For the same reason the
        pattern-matching abilities of AnyEvent::MP are more limited, as
        there is little need to be able to filter messages without dequeuing
        them.

        This is not a philosophical difference, but simply stems from
        AnyEvent::MP being event-based, while Erlang is process-based.

        You can have a look at Coro::MP for a more Erlang-like process model
        on top of AEMP and Coro threads.

    *   Erlang sends are synchronous, AEMP sends are asynchronous.

        Sending messages in Erlang is synchronous and blocks the process
        until a connection has been established and the message sent (and so
        does not need a queue that can overflow). AEMP sends return
        immediately, connection establishment is handled in the background.

    *   Erlang suffers from silent message loss, AEMP does not.



( run in 0.488 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )