AnyEvent-MP
=head1 NAME
AnyEvent::MP - erlang-style multi-processing/message-passing framework
=head1 SYNOPSIS
   use AnyEvent::MP;

   $NODE # contains this node's node ID
   NODE # returns this node's node ID

   $SELF # receiving/own port id in rcv callbacks

   # initialise the node so it can send/receive messages
   configure;

   # ports are message destinations

   # sending messages
   snd $port, type => data...;
   snd $port, @msg;
   snd @msg_with_first_element_being_a_port;
   kil $port, my_error => "everything is broken"; # error kill

   # monitoring
   mon $port, $cb->(@msg) # callback is invoked on death
   mon $port, $localport # kill localport on abnormal death
   mon $port, $localport, @msg # send message on death

   # temporarily execute code in port context
   peval $port, sub { die "kill the port!" };

   # execute callbacks in $SELF port context
   my $timer = AE::timer 1, 0, psub {
      die "kill the port, delayed";
   };

   # distributed database - modification
   db_set $family => $subkey [=> $value] # add a subkey
   db_del $family => $subkey... # delete one or more subkeys
   db_reg $family => $port [=> $value] # register a port

   # distributed database - queries

   configure profile => "seed";

   # or simply use aemp from the shell again:
   # aemp run profile seed

   # or provide a nicer-to-remember nodeid
   # aemp run profile seed nodeid "$(hostname)"
=item $SELF
Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.
=item *SELF, SELF, %SELF, @SELF...
Due to some quirks in how Perl exports variables, it is impossible to
export just C<$SELF>. All the symbols named C<SELF> are therefore exported by
this module, but only C<$SELF> is currently used.
=item snd $port, type => @data
=item snd $port, @msg
Send the given message to the given port, which can identify either ...
The type of data you can transfer depends on the transport protocol. When
JSON is used, only strings, numbers, and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed. It is best to rely only on the common
denominator of these.
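A message can be checked against the common denominator before it is sent. The following is a minimal sketch and is not part of AnyEvent::MP. The helpers C<json_safe> and C<message_json_safe> are hypothetical. They only test for the JSON subset named above, which is plain scalars plus unblessed arrays and hashes. As an assumption, they accept C<undef> because JSON has C<null>.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical helper (NOT part of AnyEvent::MP): true if $data consists
# only of plain scalars and unblessed array/hash references of those.
sub json_safe {
    my ($data) = @_;

    return 1 unless ref $data;      # string or number (undef: JSON null)
    return 0 if blessed $data;      # objects are not allowed

    if (ref $data eq 'ARRAY') {
        for my $elem (@$data) {
            return 0 unless json_safe($elem);
        }
        return 1;
    }

    if (ref $data eq 'HASH') {
        for my $value (values %$data) {
            return 0 unless json_safe($value);
        }
        return 1;
    }

    return 0;                       # code refs, scalar refs, globs, ...
}

# check a whole message, i.e. the list you would pass to snd after the port
sub message_json_safe { json_safe([@_]) }

print message_json_safe(chat => "hello", { nick => "bob" }) ? "ok\n" : "not ok\n";
```

A check like this is most useful in tests or debug builds. The local node accepts anything, so mistakes would otherwise only show up once a message crosses a JSON transport.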
=item $local_port = port
Creates a new local port object and returns its port ID. Initially it has
no callbacks set, and it will throw an error when it receives messages.
=item $local_port = port { my @msg = @_ }
Creates a new local port, and returns its ID. Semantically the same as
creating a port and calling C<rcv $port, $callback> on it.
The block will be called for every message received on the port, with the
global variable C<$SELF> set to the port ID. Runtime errors will cause the
port to be C<kil>ed. The message will be passed as-is, no extra argument
(i.e. no port ID) will be passed to the callback.
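=item rcv $local_port, $callback->(@msg)
Sets the default callback of C<$local_port>. Killing is the only way to
free a port, so better C<kil> the port when it is no longer needed.
The global C<$SELF> (exported by this module) contains C<$local_port> while
executing the callback. Runtime errors during callback execution will
result in the port being C<kil>ed.
The default callback receives all messages not matched by a more specific
C<tag> match.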
=item rcv $local_port, tag => $callback->(@msg_without_tag), ...
Registers (or replaces) callbacks to be called on messages starting with
the given tag on the given port, or unregisters a tag's callback (when
C<$callback> is C<undef> or missing). Returns the port. There can only be
one callback registered for each tag.
The original message will be passed to the callback, after the first
element (the tag) has been removed. The callback will use the same
environment as the default callback (see above).
Example: create a port and bind receivers on it in one go.
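The dispatch rules for C<port> and C<rcv> can be illustrated with a small toy model. This is a single-process sketch under stated assumptions, not AnyEvent::MP's implementation. The C<ToyMP> package and all its functions are hypothetical. Messages are delivered synchronously, and a message that finds no callback is treated as an error that kills the port. The main part binds receivers in one go, relying on C<rcv> returning the port.

```perl
use strict;
use warnings;

# Toy, single-process model of the port/rcv rules described above.
# NOT AnyEvent::MP's implementation: ToyMP and its functions are hypothetical.
package ToyMP;

our $SELF;        # like $SELF: the port whose callback is running
my %PORT;         # port id => { default => $cb, tag => { $tag => $cb } }
my $next_id = 0;

# port / port { ... }: create a port, optionally with a default callback
sub port {
    my ($default) = @_;
    my $id = "port" . ++$next_id;
    $PORT{$id} = { default => $default, tag => {} };
    return $id;
}

# rcv $port, tag => $cb, ...: register, or unregister with undef; returns the port
sub rcv {
    my ($id, %cb) = @_;
    while (my ($tag, $cb) = each %cb) {
        if (defined $cb) { $PORT{$id}{tag}{$tag} = $cb }
        else             { delete $PORT{$id}{tag}{$tag} }
    }
    return $id;
}

sub alive { exists $PORT{ $_[0] } }
sub kil   { delete $PORT{ $_[0] } }

# deliver synchronously: a tag callback gets the message minus the tag, the
# default callback gets it as-is, and any error (including "no callback")
# kills the port
sub snd {
    my ($id, @msg) = @_;
    my $port = $PORT{$id} or return;     # unknown or dead port: drop message
    local $SELF = $id;
    my $tagcb = defined $msg[0] ? $port->{tag}{ $msg[0] } : undef;
    eval {
        if    ($tagcb)           { $tagcb->(@msg[1 .. $#msg]) }
        elsif ($port->{default}) { $port->{default}->(@msg) }
        else                     { die "no callback for message\n" }
        1;
    } or kil($id);
}

package main;

my @log;

# create a port and bind receivers on it in one go
my $port = ToyMP::rcv(
    ToyMP::port(sub { push @log, "default: @_" }),
    greet => sub { push @log, "greet: @_ (self=$ToyMP::SELF)" },
    crash => sub { die "oops\n" },
);

ToyMP::snd($port, greet => "bob");     # matched by the greet tag
ToyMP::snd($port, other => 1, 2);      # unmatched: default callback
ToyMP::snd($port, "crash");            # callback dies: port is killed

my $bare = ToyMP::port();              # no callbacks at all
ToyMP::snd($bare, "hello");            # error: port is killed

print "$_\n" for @log;
print ToyMP::alive($port) ? "port alive\n" : "port killed\n";
```

The tag callback sees only C<bob>, while the default callback sees the whole message, including its first element. This matches the "tag removed" and "passed as-is" rules above.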
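=item peval $port, sub { ... }
Temporarily executes the code in the context of C<$port>, in the same
environment as a C<rcv> callback. C<$SELF> is set to C<$port>, and an
exception thrown by the code will cause the port to be C<kil>ed.
Example: create a port and run some initialisation code in its context, so
that a failing C<init> (a placeholder function) kills the port:
   my $port = port { ... };

   peval $port, sub {
      init
         or die "unable to init";
   };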
=item $closure = psub { BLOCK }
Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, it sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.
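This is useful when you register callbacks from C<rcv> callbacks. The
watcher returned by C<AE::timer> has to stay alive until the timer fires,
because destroying it cancels the timer. For that reason, the callback below
keeps a reference to it and releases it once it has run:
   rcv $port, delayed_reply => sub {
      my ($delay, @reply) = @_;

      my $timer;
      $timer = AE::timer $delay, 0, psub {
         undef $timer; # release the one-shot timer
         snd @reply, $SELF;
      };
   };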
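A toy model of the context capture shows why C<psub> is needed. In this sketch, the names are hypothetical and the code is not AnyEvent::MP's. C<toy_psub> remembers C<$SELF> and restores it with C<local> when the closure runs. A plain closure, run later from the "event loop" (here just an array of pending callbacks), no longer sees the port. Unlike the real C<psub>, the toy always evaluates the block in list context, and it records errors in C<@killed> instead of killing a port.

```perl
use strict;
use warnings;

# Toy model of psub's context capture (hypothetical, NOT AnyEvent::MP code).
our $SELF;
my @killed;     # ports the toy would have killed
my @later;      # stand-in for callbacks the event loop runs later

sub toy_psub {
    my ($cb) = @_;
    my $port = $SELF
        or die "toy_psub called outside port context\n";
    return sub {
        local $SELF = $port;               # restore the remembered port
        my @res = eval { $cb->(@_) };      # trap errors like a rcv callback
        push @killed, $port if $@;         # the real psub would kil the port
        return @res;
    };
}

# a "receive callback" running in port context schedules three callbacks
{
    local $SELF = "port1";
    push @later, sub { "plain closure sees: " . ($SELF // "undef") };
    push @later, toy_psub(sub { "psub closure sees: $SELF" });
    push @later, toy_psub(sub { die "delayed failure\n" });
}

# later, outside any port context, the "event loop" runs them
my @seen = map { $_->() } @later;
print "$_\n" for @seen;
print "killed: @killed\n";
```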
=cut
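sub psub(&) {
   my $cb = shift;

   # remember the port context psub was called in
   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      # re-establish the port context for the duration of the call
      local $SELF = $port;

      # &$cb passes this closure's @_ through unchanged; _self_die (defined
      # elsewhere in the module) kills $SELF, reporting die => $@
      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}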
=item $guard = mon $port, $rcvport # kill $rcvport when $port dies
=item $guard = mon $port # kill $SELF when $port dies
=item kil $port[, @reason]
Kills the specified port with the given C<@reason>.
If no C<@reason> is specified, then the port is killed "normally": monitor
callbacks will be invoked, but the kil will not cause linked ports
(C<mon $mport, $lport> form) to get killed.
If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
form) get killed with the same reason.
Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.
Transport/communication errors are reported as C<< transport_error =>
$message >>.
Common idioms:
   # silently remove yourself, do not kill linked ports
   kil $SELF;
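The difference between a normal and an abnormal kill can be modelled in a few lines. The sketch below is hypothetical. C<new_port>, C<mon_cb>, C<mon_link> and this C<kil> are toy stand-ins, not the module's functions. In the model, monitor callbacks run on every kill, while linked ports die only when a reason is given and then inherit that reason. It also sets up a two-way link between two ports, as in the two-way monitoring idiom described for C<spawn>.

```perl
use strict;
use warnings;

# Toy model of kil/mon semantics (hypothetical, NOT AnyEvent::MP code).
my %alive;      # port => 1 while the port exists
my %mon_cb;     # port => [ callbacks invoked on any death ]
my %linked;     # port => [ ports killed on abnormal death ]

sub new_port { $alive{ $_[0] } = 1 }
sub mon_cb   { push @{ $mon_cb{ $_[0] } }, $_[1] }
sub mon_link { push @{ $linked{ $_[0] } }, $_[1] }

sub kil {
    my ($port, @reason) = @_;
    return unless delete $alive{$port};            # already dead
    $_->(@reason) for @{ delete $mon_cb{$port} || [] };
    my @links = @{ delete $linked{$port} || [] };
    if (@reason) {
        kil($_, @reason) for @links;               # propagate abnormal death
    }
}

my @events;

new_port($_) for qw(client server worker);
mon_link(server => 'client');       # two-way link between client and server
mon_link(client => 'server');
mon_link(worker => 'client');       # the client is linked to the worker, too
mon_cb(worker => sub { push @events, "worker died" . (@_ ? ": @_" : " normally") });

kil('worker');                                         # normal kill
push @events, "client alive: " . ($alive{client} ? "yes" : "no");

kil('server', transport_error => "connection lost");   # abnormal kill
push @events, "client alive: " . ($alive{client} ? "yes" : "no");

print "$_\n" for @events;
```

The worker's normal kill runs its monitor but leaves the linked client alone. The server's transport error takes the client down with it. Through the reverse link, the client's death then finds the server already dead, so the recursion stops.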
fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
specify a function in the main program, use C<::name>.
If the function doesn't exist, then the node tries to C<require>
the package, then its parent package and so on (e.g.
C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
exists or it runs out of package names.
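The package search order can be sketched as follows. The helper C<candidate_packages> is hypothetical. It only computes the list of packages that would be tried, most specific first, and does not call C<require> itself. In this sketch, a name starting with C<::> (a function in the main program) yields an empty list.

```perl
use strict;
use warnings;

# Hypothetical helper (NOT AnyEvent::MP code): the packages that would be
# tried with require for an init function name, most specific first.
sub candidate_packages {
    my ($funcname) = @_;
    my @parts = grep { length } split /::/, $funcname;
    pop @parts;                         # drop the function name itself
    my @packages;
    while (@parts) {
        push @packages, join '::', @parts;
        pop @parts;                     # move up to the parent package
    }
    return @packages;
}

print join(", ", candidate_packages("MyApp::Chat::Server::init")), "\n";
```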
The init function is then called with the newly-created port as context
object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
the port might not get created.
A common idiom is to pass a local port, immediately monitor the spawned
port, and in the remote init function, immediately monitor the passed
local port. This two-way monitoring ensures that both ports get cleaned up
when there is a problem.
C<spawn> guarantees that the C<$initfunc> has no visible effects on the
caller before C<spawn> returns (by delaying invocation when spawn is
called for the local node).
MP/Intro.pod view on Meta::CPAN
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.
AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.
=head2 Exceptions, Port Context, Network Errors and Monitors
=head3 Exceptions
Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).
Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.
Here is a simple example:
the thing is, a "normal" kill does not count as a crash. This way you can
easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done its job properly.
=head3 Port Context
Receive callbacks run in the so-called "port context". That means C<$SELF>
contains their own port ID, and exceptions that they throw will be caught.
Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As an example, assume that the
following port receive handler wants to C<die> a second later, using
C<after>:
   my $port = port {
      after 1, sub { die "oops" };
   };
If you try this out, you would find it does not work - when the C<after>
callback is executed, it does not run in the port context anymore, so
exceptions will not be caught.
The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions to this rule are "servers" that
serve multiple clients at once and might only wish to clean up, and
supervisors, which of course should not normally get killed (unless they,
too, have a supervisor).
If you often think in object-oriented terms, then you can think of a port
as an object: C<port> is the constructor, the receive callbacks set by
C<rcv> act as methods, the C<kil> function becomes the explicit destructor
and C<mon> installs a destructor hook. Unlike conventional object-oriented
programming, it can make sense to exchange port IDs more freely (for
example, to monitor one port from another), because it is cheap to send
port IDs over the network, and AnyEvent::MP blurs the distinction between
local and remote ports.
Lastly, there is ample room for improvement in this example: the server
should probably remember the nickname in the C<join> handler instead of
expecting it in every chat message, it should probably monitor itself, and
There are plenty of possibilities you can use - it's all up to you how you
structure your application.
=head1 PART 4: Coro::MP - selective receive
Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.
In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.
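Selective receive can be illustrated without threads. The following toy mailbox is hypothetical and is not Coro::MP's API. Where a Coro thread would wait for a matching message, this C<receive> simply returns an empty list. Messages stay queued in arrival order, and C<receive> removes only the first message with the requested tag, leaving all others for later.

```perl
use strict;
use warnings;

# Toy model of selective receive (hypothetical, NOT Coro::MP's API).
my @mailbox;                       # queued messages, in arrival order

sub deliver { push @mailbox, [@_] }

sub receive {
    my ($tag) = @_;
    for my $i (0 .. $#mailbox) {
        next unless $mailbox[$i][0] eq $tag;
        my ($msg) = splice @mailbox, $i, 1;   # remove just this message
        return @$msg[1 .. $#$msg];            # payload without the tag
    }
    return;                                    # nothing matching (yet)
}

deliver(data  => "chunk 1");
deliver(hello => "client 42");
deliver(data  => "chunk 2");

# handle the handshake first, regardless of arrival order
my ($client) = receive("hello");
my @chunks   = (receive("data"), receive("data"));
print "client=$client chunks=@chunks queued=", scalar(@mailbox), "\n";
```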
The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP>: the
ports it creates are fully compatible with C<AnyEvent::MP> ports.
In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.
sub transport_connect {
   my ($self, $tp) = @_;

   AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
}

sub kill {
   my (undef, @args) = @_;

   # we _always_ delay kil's, to avoid calling mon callbacks
   # from anything but the event loop context.
   $DELAY = 1;
   push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };
   $DELAY_W ||= AE::timer 0, 0, $send_delayed;
}

sub monitor {
   # maybe always delay, too?
   if ($DELAY_W) {
      my @args = @_;