AnyEvent-MP


MP.pm  view on Meta::CPAN


AnyEvent::MP - erlang-style multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE      # contains this node's node ID
   NODE       # returns this node's node ID

   $SELF      # receiving/own port id in rcv callbacks

   # initialise the node so it can send/receive messages
   configure;

   # ports are message destinations

   # sending messages
   snd $port, type => data...;
   snd $port, @msg;
   snd @msg_with_first_element_being_a_port;

MP.pm  view on Meta::CPAN

   kil $port, my_error => "everything is broken"; # error kill

   # monitoring
   mon $port, $cb->(@msg)      # callback is invoked on death
   mon $port, $localport       # kill localport on abnormal death
   mon $port, $localport, @msg # send message on death

   # temporarily execute code in port context
   peval $port, sub { die "kill the port!" };

   # execute callbacks in $SELF port context
   my $timer = AE::timer 1, 0, psub {
      die "kill the port, delayed";
   };

   # distributed database - modification
   db_set $family => $subkey [=> $value]  # add a subkey
   db_del $family => $subkey...           # delete one or more subkeys
   db_reg $family => $port [=> $value]    # register a port

   # distributed database - queries

MP.pm  view on Meta::CPAN

   configure profile => "seed";

   # or simply use aemp from the shell again:
   # aemp run profile seed

   # or provide a nicer-to-remember nodeid
   # aemp run profile seed nodeid "$(hostname)"

=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item *SELF, SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
export just C<$SELF>; all the symbols named C<SELF> are exported by this
module, but only C<$SELF> is currently used.

=item snd $port, type => @data

MP.pm  view on Meta::CPAN

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed. Best rely only on the common denominator of
these.
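
The JSON restriction can be seen with any JSON encoder. The following is a minimal sketch that uses the core module C<JSON::PP> purely as a stand-in (it is an assumption that the transport's own encoder behaves the same way in its default configuration); the class name C<MyApp::User> is hypothetical.

```perl
use strict;
use warnings;
use JSON::PP;   # core module, used here only as a stand-in JSON encoder

my $json = JSON::PP->new->canonical;

# Plain strings, numbers, arrays and hashes survive a round trip.
my @msg     = (chat => "hello", { room => "perl", users => [1, 2, 3] });
my $wire    = $json->encode (\@msg);
my $decoded = $json->decode ($wire);

# A blessed object is rejected by the encoder in its default configuration.
my $object = bless { nick => "alice" }, "MyApp::User";   # hypothetical class
my $ok     = eval { $json->encode ([chat => $object]); 1 };

print "plain message:  $wire\n";
print "object message: ", ($ok ? "encoded" : "rejected"), "\n";
```

Sticking to plain data structures in messages keeps them valid for every transport.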

=item $local_port = port

Creates a new local port object and returns its port ID. Initially it has
no callbacks set and will throw an error when it receives messages.

=item $local_port = port { my @msg = @_ }

Creates a new local port, and returns its ID. Semantically the same as
creating a port and calling C<rcv $port, $callback> on it.

The block will be called for every message received on the port, with the
global variable C<$SELF> set to the port ID. Runtime errors will cause the
port to be C<kil>ed. The message will be passed as-is, no extra argument
(i.e. no port ID) will be passed to the callback.

MP.pm  view on Meta::CPAN


The global C<$SELF> (exported by this module) contains C<$port> while
executing the callback. Runtime errors during callback execution will
result in the port being C<kil>ed.

The default callback receives all messages not matched by a more specific
C<tag> match.

=item rcv $local_port, tag => $callback->(@msg_without_tag), ...

Register (or replace) callbacks to be called on messages starting with the
given tag on the given port (and return the port), or unregister it (when
C<$callback> is C<undef> or missing). There can only be one callback
registered for each tag.

The original message will be passed to the callback, after the first
element (the tag) has been removed. The callback will use the same
environment as the default callback (see above).
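
The dispatch rule described above (tagged callbacks get the message without its tag, everything else goes to the default callback) can be sketched in plain Perl. This is a toy model and not AnyEvent::MP's internal code; C<%tagged>, C<$default> and C<dispatch> are hypothetical names.

```perl
use strict;
use warnings;

# Hypothetical stand-in for one port's callback table.
my @log;
my %tagged = (
   ping => sub { push @log, "ping(@_)" },
);
my $default = sub { push @log, "default(@_)" };

sub dispatch {
   my @msg = @_;

   if (@msg && defined $msg[0] && !ref $msg[0] && (my $cb = $tagged{$msg[0]})) {
      shift @msg;          # the tag itself is not passed on
      return $cb->(@msg);
   }

   $default->(@msg);       # unmatched messages arrive unchanged
}

dispatch ping => 1, 2;     # handled by the "ping" callback
dispatch pong => 3;        # no "pong" callback, so the default one runs

print "$_\n" for @log;
```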

Example: create a port and bind receivers on it in one go.

MP.pm  view on Meta::CPAN

      my $res = eval { &$cb };
      _self_die if $@;
      $res
   }
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer = AE::timer $delay, 0, psub {
         snd @reply, $SELF;
      };
   };

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };

MP.pm  view on Meta::CPAN


Kill the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally" -
monitor callbacks will be invoked, but the kil will not cause linked ports
(C<mon $mport, $lport> form) to get killed.

If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
form) get killed with the same reason.

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.
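
The difference between a normal and an abnormal kill can be illustrated with a toy model in plain Perl. It is only a sketch of the rules stated above, not the module's implementation; the port names and the helper C<kil_sketch> are hypothetical.

```perl
use strict;
use warnings;

# Toy model: %alive holds live port names, %linked maps a port to the
# ports linked to it (the "mon $mport, $lport" form).
my %alive  = map { $_ => 1 } qw(a b c d);
my %linked = (a => ["b"], c => ["d"]);
my @events;

sub kil_sketch {
   my ($port, @reason) = @_;

   return unless delete $alive{$port};          # already dead: nothing to do
   push @events, "$port: " . (@reason ? "@reason" : "normal");

   my $links = delete $linked{$port} || [];
   return unless @reason;                       # normal kill: linked ports survive

   kil_sketch ($_, @reason) for @$links;        # abnormal kill: same reason propagates
}

kil_sketch "a";                                       # b stays alive
kil_sketch "c", my_error => "everything is broken";   # d is killed as well

print "$_\n" for @events;
print "still alive: ", join (", ", sort keys %alive), "\n";
```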

Common idioms:

   # silently remove yourself, do not kill linked ports
   kil $SELF;

MP.pm  view on Meta::CPAN

fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
specify a function in the main program, use C<::name>.

If the function doesn't exist, then the node tries to C<require>
the package, then the package above the package and so on (e.g.
C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
exists or it runs out of package names.
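
The order in which packages are tried can be sketched as follows. C<candidate_packages> is a hypothetical helper written for illustration; it only computes the list of names and does not C<require> anything.

```perl
use strict;
use warnings;

# Hypothetical helper: list the packages that would be tried, in order,
# for a fully-qualified init function name.
sub candidate_packages {
   my ($func) = @_;

   my @parts = split /::/, $func;
   pop @parts;                       # drop the function name itself

   my @candidates;
   while (@parts) {
      push @candidates, join "::", @parts;
      pop @parts;                    # move to the package above
   }

   @candidates
}

my @order = candidate_packages "MyApp::Chat::Server::init";
print "$_\n" for @order;   # MyApp::Chat::Server, MyApp::Chat, MyApp
```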

The init function is then called with the newly-created port as context
object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
the port might not get created.

A common idiom is to pass a local port, immediately monitor the spawned
port, and in the remote init function, immediately monitor the passed
local port. This two-way monitoring ensures that both ports get cleaned up
when there is a problem.

C<spawn> guarantees that the C<$initfunc> has no visible effects on the
caller before C<spawn> returns (by delaying invocation when spawn is
called for the local node).

MP/Intro.pod  view on Meta::CPAN

want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).

Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.

Here is a simple example:

MP/Intro.pod  view on Meta::CPAN

the thing is, a "normal" kill does not count as a crash. This way you can
easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done its job properly.

=head3 Port Context

Code runs in the so-called "port context". That means C<$SELF> contains
its own port ID and exceptions that the code throws will be caught.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As an example, assume that the
following port receive handler wants to C<die> a second later, using
C<after>:

  my $port = port {
     after 1, sub { die "oops" };
  };

If you try this out, you will find that it does not work - when the C<after>
callback is executed, it does not run in the port context anymore, so
exceptions will not be caught.
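
The reason can be reproduced without AnyEvent::MP at all. The sketch below assumes only what the C<psub> source shows, namely that the port context is established with C<local $SELF>; C<deliver>, C<psub_sketch> and C<@queue> are hypothetical stand-ins for message delivery, C<psub> and the event loop.

```perl
use strict;
use warnings;

our $SELF;      # stand-in for the exported $SELF
my @queue;      # stand-in for callbacks the event loop runs later
my @seen;

# Run a receive callback with $SELF localised, as a port would.
sub deliver {
   my ($port, $cb) = @_;
   local $SELF = $port;
   $cb->();
}

# Remember $SELF now and restore it whenever the closure runs later.
sub psub_sketch {
   my ($cb) = @_;
   my $port = $SELF;
   sub { local $SELF = $port; $cb->(@_) }
}

deliver "port1", sub {
   # plain closure: $SELF is no longer set once deliver has returned
   push @queue, sub { push @seen, defined $SELF ? $SELF : "undef" };
   # wrapped closure: the port context is re-established on invocation
   push @queue, psub_sketch (sub { push @seen, defined $SELF ? $SELF : "undef" });
};

$_->() for @queue;   # the "event loop" runs after the handler has returned
print "@seen\n";     # prints "undef port1"
```

The plain closure runs after the C<local> scope has ended, so it sees no port; the wrapped one restores the remembered port first.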

MP/Intro.pod  view on Meta::CPAN

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
who of course should not normally get killed (unless they, too, have a
supervisor).

If you often think in object-oriented terms, then you can think of a port
as an object: C<port> is the constructor, the receive callbacks set by
C<rcv> act as methods, the C<kil> function becomes the explicit destructor
and C<mon> installs a destructor hook. Unlike conventional object-oriented
programming, it can make sense to exchange port IDs more freely (for
example, to monitor one port from another), because it is cheap to send
port IDs over the network, and AnyEvent::MP blurs the distinction between
local and remote ports.

Lastly, there is ample room for improvement in this example: the server
should probably remember the nickname in the C<join> handler instead of
expecting it in every chat message, it should probably monitor itself, and

MP/Intro.pod  view on Meta::CPAN

There are plenty of possibilities you can use - it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

MP/Node.pm  view on Meta::CPAN


sub transport_connect {
   my ($self, $tp) = @_;

   AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
}

sub kill {
   my (undef, @args) = @_;

   # we _always_ delay kil's, to avoid calling mon callbacks
   # from anything but the event loop context.
   $DELAY = 1;
   push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
   $DELAY_W ||= AE::timer 0, 0, $send_delayed;
}

sub monitor {
   # maybe always delay, too?
   if ($DELAY_W) {
      my @args = @_;

README  view on Meta::CPAN

NAME
    AnyEvent::MP - erlang-style multi-processing/message-passing framework

SYNOPSIS
       use AnyEvent::MP;

       $NODE      # contains this node's node ID
       NODE       # returns this node's node ID

       $SELF      # receiving/own port id in rcv callbacks

       # initialise the node so it can send/receive messages
       configure;

       # ports are message destinations

       # sending messages
       snd $port, type => data...;
       snd $port, @msg;
       snd @msg_with_first_element_being_a_port;

README  view on Meta::CPAN

       kil $port, my_error => "everything is broken"; # error kill

       # monitoring
       mon $port, $cb->(@msg)      # callback is invoked on death
       mon $port, $localport       # kill localport on abnormal death
       mon $port, $localport, @msg # send message on death

       # temporarily execute code in port context
       peval $port, sub { die "kill the port!" };

       # execute callbacks in $SELF port context
       my $timer = AE::timer 1, 0, psub {
          die "kill the port, delayed";
       };

       # distributed database - modification
       db_set $family => $subkey [=> $value]  # add a subkey
       db_del $family => $subkey...           # delete one or more subkeys
       db_reg $family => $port [=> $value]    # register a port

       # distributed database - queries

README  view on Meta::CPAN

           # then use it
           configure profile => "seed";

           # or simply use aemp from the shell again:
           # aemp run profile seed

           # or provide a nicer-to-remember nodeid
           # aemp run profile seed nodeid "$(hostname)"

    $SELF
        Contains the current port id while executing "rcv" callbacks or
        "psub" blocks.

    *SELF, SELF, %SELF, @SELF...
        Due to some quirks in how perl exports variables, it is impossible
        to export just $SELF; all the symbols named "SELF" are exported by
        this module, but only $SELF is currently used.

    snd $port, type => @data
    snd $port, @msg
        Send the given message to the given port, which can identify either

README  view on Meta::CPAN


        The type of data you can transfer depends on the transport protocol:
        when JSON is used, then only strings, numbers and arrays and hashes
        consisting of those are allowed (no objects). When Storable is used,
        then anything that Storable can serialise and deserialise is
        allowed, and for the local node, anything can be passed. Best rely
        only on the common denominator of these.

    $local_port = port
        Creates a new local port object and returns its port ID. Initially it
        has no callbacks set and will throw an error when it receives
        messages.

    $local_port = port { my @msg = @_ }
        Creates a new local port, and returns its ID. Semantically the same
        as creating a port and calling "rcv $port, $callback" on it.

        The block will be called for every message received on the port,
        with the global variable $SELF set to the port ID. Runtime errors
        will cause the port to be "kil"ed. The message will be passed as-is,
        no extra argument (i.e. no port ID) will be passed to the callback.

README  view on Meta::CPAN

        better "kil" the port when it is no longer needed.

        The global $SELF (exported by this module) contains $port while
        executing the callback. Runtime errors during callback execution
        will result in the port being "kil"ed.

        The default callback receives all messages not matched by a more
        specific "tag" match.

    rcv $local_port, tag => $callback->(@msg_without_tag), ...
        Register (or replace) callbacks to be called on messages starting
        with the given tag on the given port (and return the port), or
        unregister it (when $callback is undef or missing). There can only
        be one callback registered for each tag.

        The original message will be passed to the callback, after the first
        element (the tag) has been removed. The callback will use the same
        environment as the default callback (see above).

        Example: create a port and bind receivers on it in one go.

README  view on Meta::CPAN

           my $port = port { ... };

           peval $port, sub {
              init
                 or die "unable to init";
           };

    $closure = psub { BLOCK }
        Remembers $SELF and creates a closure out of the BLOCK. When the
        closure is executed, sets up the environment in the same way as in
        "rcv" callbacks, i.e. runtime errors will cause the port to get
        "kil"ed.

        The effect is basically as if it returned "sub { peval $SELF, sub {
        BLOCK }, @_ }".

        This is useful when you register callbacks from "rcv" callbacks:

           rcv delayed_reply => sub {
              my ($delay, @reply) = @_;
              my $timer = AE::timer $delay, 0, psub {
                 snd @reply, $SELF;
              };
           };

    $guard = mon $port, $rcvport # kill $rcvport when $port dies
    $guard = mon $port # kill $SELF when $port dies

README  view on Meta::CPAN

    kil $port[, @reason]
        Kill the specified port with the given @reason.

        If no @reason is specified, then the port is killed "normally" -
        monitor callbacks will be invoked, but the kil will not cause linked
        ports ("mon $mport, $lport" form) to get killed.

        If a @reason is specified, then linked ports ("mon $mport, $lport"
        form) get killed with the same reason.

        Runtime errors while evaluating "rcv" callbacks or inside "psub"
        blocks will be reported as reason "die => $@".

        Transport/communication errors are reported as "transport_error =>
        $message".

        Common idioms:

           # silently remove yourself, do not kill linked ports
           kil $SELF;

README  view on Meta::CPAN

        "MyApp::Chat::Server::init"). To specify a function in the main
        program, use "::name".

        If the function doesn't exist, then the node tries to "require" the
        package, then the package above the package and so on (e.g.
        "MyApp::Chat::Server", "MyApp::Chat", "MyApp") until the function
        exists or it runs out of package names.

        The init function is then called with the newly-created port as
        context object ($SELF) and the @initdata values as arguments. It
        *must* call one of the "rcv" functions to set callbacks on $SELF,
        otherwise the port might not get created.

        A common idiom is to pass a local port, immediately monitor the
        spawned port, and in the remote init function, immediately monitor
        the passed local port. This two-way monitoring ensures that both
        ports get cleaned up when there is a problem.

        "spawn" guarantees that the $initfunc has no visible effects on the
        caller before "spawn" returns (by delaying invocation when spawn is
        called for the local node).


