- new "norc" and "force" options for MP::configure to ignore
or override the rc file.
- a shitload of minor and major fixes all reported and debugged
by Felix Ostmann.
- move seed code to Kernel.
- non-json receive framing was probably broken.
- fixed small typo in MP::Transport.
- fix (unused) provider-kv (version sometimes missing).
- reduce the default export list of MP::Kernel.
- reduced various random delays to zero, as they
are hopefully no longer needed with the new global
protocol.
- implement a secure mode that can suppress (some) code execution
requests.
- do not use GMP for generating alphanumeric strings even when
available, use a more tricky algorithm instead.
- anonymous node names and remote port names have been shortened
considerably.
- $AnyEvent::MP::Kernel::SRCNODE contains the node ID of the origin
node during message receives.
- aemp shell now supports "package" selection and offers an
- bin/aemp now loads AnyEvent::Watchdog::Util before trying
to call restart.
- bin/aemp setcert didn't properly handle the filename argument.
- removed empty and unused timeout= parameter from protocol greeting.
1.23 Fri Nov 6 18:46:26 CET 2009
- listener-less nodes were misinformed by their masters about
the location of new nodes, and therefore didn't connect
properly in all cases.
- messages sent during node-up processing could sometimes get lost,
which would hamper global's ability to mesh the network.
- fixed AnyEvent::MP::Global::grp_reg to return a guard also
in scalar context.
- fixed AnyEvent::MP::Kernel::mon_nodes to return a guard also
in scalar context.
- try to improve error reporting when automatically loading
a function - being unable to load a module will now
stop the process and report the error.
1.22 Sat Oct 17 03:41:47 CEST 2009
- fix two rcv-bugs: after adding a tagged rcv, the default
complete graph, so that the network cannot split into separate subnets
forever.
Seed nodes are represented by seed IDs.
=item seed IDs - C<host:port>
Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
TCP port) of nodes that should be used as seed nodes.
=item global nodes
An AEMP network needs a discovery service - nodes need to know how to
connect to other nodes they only know by name. In addition, AEMP offers a
distributed "group database", which maps group names to a list of strings
- for example, to register worker ports.
A network needs at least one global node to work, and allows every node to
be a global node.
Any node that loads the L<AnyEvent::MP::Global> module becomes a global
node and tries to keep connections to all other nodes. So while it can
make sense to make every node "global" in small networks, it usually makes
sense to only make seed nodes into global nodes in large networks (nodes
keep connections to seed nodes and global nodes, so making them the same
reduces overhead).
=back
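A seed ID is just a C<host:port> string. The sketch below splits one into host and port using core Perl only, to show the shape of the data. C<split_seed_id> is a hypothetical helper; the real code (C<seed_connect> in MP/Kernel.pm) uses C<AnyEvent::Socket::parse_hostport>, which accepts more address forms than this simplified version.

```perl
use strict;
use warnings;

# Hypothetical helper: splits a seed ID ("host:port") into its parts.
# Simplified; AnyEvent::MP itself uses AnyEvent::Socket::parse_hostport.
sub split_seed_id {
   my ($seed) = @_;
   return ($1, $2) if $seed =~ /\A\[([^\]]+)\]:(\d+)\z/;   # [IPv6 address]:port
   return ($1, $2) if $seed =~ /\A([^:\[\]]+):(\d+)\z/;    # hostname or IPv4 address:port
   return;                                                  # not a form this sketch understands
}

my ($host, $port) = split_seed_id("seed1.example.org:4040");
print "host=$host port=$port\n";
```

The wildcard form used elsewhere in the documentation (C<*:4040>) also passes the second pattern, with C<*> as the host part.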
=head1 VARIABLES/FUNCTIONS
=over 4
=cut
The function first looks up a profile in the aemp configuration (see the
L<aemp> commandline utility). The profile name can be specified via the
named C<profile> parameter or can simply be the first parameter. If it is
missing, then the nodename (F<uname -n>) will be used as profile name.
The profile data is then gathered as follows:
First, all remaining key => value pairs (all of which are conveniently
undocumented at the moment) will be interpreted as configuration
data. Then they will be overwritten by any values specified in the global
default configuration (see the F<aemp> utility), then the chain of
profiles chosen by the profile name (and any C<parent> attributes).
That means that the values specified in the profile have highest priority
and the values specified directly via C<configure> have lowest priority,
and can only be used to specify defaults.
If the profile specifies a node ID, then this will become the node ID of
this process. If not, then the profile name will be used as node ID, with
a unique random string (C</%u>) appended.
=item $local_port = port
Creates a new local port object and returns its port ID. Initially it has
no callbacks set and will throw an error when it receives messages.
=item $local_port = port { my @msg = @_ }
Creates a new local port, and returns its ID. Semantically the same as
creating a port and calling C<rcv $port, $callback> on it.
The block will be called for every message received on the port, with the
global variable C<$SELF> set to the port ID. Runtime errors will cause the
port to be C<kil>ed. The message will be passed as-is, no extra argument
(i.e. no port ID) will be passed to the callback.
If you want to stop/destroy the port, simply C<kil> it:
my $port = port {
my @msg = @_;
...
kil $SELF;
};
$port
}
=item rcv $local_port, $callback->(@msg)
Replaces the default callback on the specified port. There is no way to
remove the default callback: use C<sub { }> to disable it, or better
C<kil> the port when it is no longer needed.
The global C<$SELF> (exported by this module) contains C<$port> while
executing the callback. Runtime errors during callback execution will
result in the port being C<kil>ed.
The default callback receives all messages not matched by a more specific
C<tag> match.
=item rcv $local_port, tag => $callback->(@msg_without_tag), ...
Register (or replace) callbacks to be called on messages starting with the
given tag on the given port (and return the port), or unregister it (when
&snd;
$port
}
=back
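The port and C<rcv> rules above (a default callback, tag callbacks that receive the message without its tag, C<$SELF> set while a callback runs, an error when nothing can handle a message) can be modelled in a few lines of plain Perl. This is a hypothetical, in-process sketch for illustration only: C<sim_port>, C<sim_rcv> and C<sim_snd> are invented names, there is no networking, and the real module's reaction to callback errors (the port is C<kil>ed) is not modelled.

```perl
use strict;
use warnings;

# Hypothetical in-process model of the port semantics described above;
# these sim_* functions are NOT part of AnyEvent::MP.
our $SELF;           # set to the receiving port ID while a callback runs
my %PORT;            # port ID => { default => $cb, tag => { $tag => $cb } }
my $next_id = "a";

sub sim_port {
   my ($default) = @_;                      # optional default callback
   my $id = "local#" . $next_id++;
   $PORT{$id} = { default => $default, tag => {} };
   return $id;
}

sub sim_rcv {
   my ($id, @args) = @_;
   if (@args == 1) {                        # rcv $port, $callback: replace default
      $PORT{$id}{default} = $args[0];
   } else {                                 # rcv $port, tag => $callback, ...
      my %cb = @args;
      $PORT{$id}{tag}{$_} = $cb{$_} for keys %cb;
   }
   return $id;
}

sub sim_snd {
   my ($id, @msg) = @_;
   my $port = $PORT{$id} or die "$id: no such port\n";
   local $SELF = $id;

   # a matching tag callback gets the message without its tag ...
   if (@msg && defined $msg[0] && (my $tag_cb = $port->{tag}{$msg[0]})) {
      shift @msg;
      return $tag_cb->(@msg);
   }

   # ... everything else goes, unchanged, to the default callback
   my $default_cb = $port->{default} or die "$id: no callback for message\n";
   return $default_cb->(@msg);
}

my @log;
my $p = sim_port(sub { push @log, "default: @_" });
sim_rcv($p, ping => sub { push @log, "ping to $SELF: @_" });
sim_snd($p, ping  => 1, 2);   # handled by the "ping" callback
sim_snd($p, other => 3);      # falls through to the default callback
print "$_\n" for @log;
```

Keeping a port as little more than a code reference in a hash mirrors the design rationale given in the FAQ: a port with just a default receiver is about as cheap as it can get.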
=head1 DISTRIBUTED DATABASE
AnyEvent::MP comes with a simple distributed database. The database will
be mirrored asynchronously on all global nodes. Other nodes bind to one
of the global nodes for their needs. Every node has a "local database"
which contains all the values that are set locally. All local databases
are merged together to form the global database, which can be queried.
The database structure is that of a two-level hash - the database hash
contains hashes which contain values, similarly to a perl hash of hashes,
i.e.:
$DATABASE{$family}{$subkey} = $value
The top level hash key is called "family", and the second-level hash key
is called "subkey" or simply "key".
The family must be alphanumeric, i.e. start with a letter and consist
of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>),
pretty much like Perl module names.
As the family namespace is global, it is recommended to prefix family names
with the name of the application or module using it.
The subkeys must be non-empty strings, with no further restrictions.
The values should preferably be strings, but other perl scalars should
work as well (such as C<undef>, arrays and hashes).
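The naming rules above can be checked mechanically. The sketch below is a hypothetical local model of the two-level layout, not the module's C<db_set>: C<sim_db_set> and C<valid_family> are invented names, and the sketch only validates names and stores values in an ordinary hash of hashes, with no distribution or merging.

```perl
use strict;
use warnings;

# Hypothetical local model of the layout above; sim_db_set is NOT the
# AnyEvent::MP db_set, it only checks the documented naming rules.
my %DATABASE;        # $DATABASE{$family}{$subkey} = $value

sub valid_family {
   my ($family) = @_;
   return defined $family && $family =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/;
}

sub sim_db_set {
   my ($family, $subkey, $value) = @_;   # a missing value becomes undef
   die "invalid family name\n"      unless valid_family($family);
   die "subkey must be non-empty\n" unless defined $subkey && length $subkey;
   $DATABASE{$family}{$subkey} = $value;
}

# family names prefixed with the application name, as recommended
sim_db_set(myapp_workers => "node1#port7");      # value is undef
sim_db_set("MyApp::Config" => timeout => 30);
```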
Every database entry is owned by one node - adding the same family/subkey
combination on multiple nodes will not cause discomfort for AnyEvent::MP,
but the result might be nondeterministic, i.e. the key might have
We considered "objects", but found that the actual number of methods
that can be called is quite low. Since port and node IDs travel over
the network frequently, the serialising/deserialising would add lots of
overhead, as well as having to keep a proxy object everywhere.
Strings can easily be printed, easily serialised etc. and need no special
procedures to be "valid".
And as a result, a port with just a default receiver consists of a single
code reference stored in a global hash - it can't become much cheaper.
=item Why favour JSON, why not a real serialising format such as Storable?
In fact, any AnyEvent::MP node will happily accept Storable as framing
format, but currently there is no way to make a node use Storable by
default (although all nodes will accept it).
The default framing protocol is JSON because a) JSON::XS is many times
faster for small messages and b) most importantly, after years of
experience we found that object serialisation is causing more problems
# were originally passed by grp_mon.
...
};
=item Nodes no longer connect to all other nodes.
In AEMP 1.x, every node automatically loads the L<AnyEvent::MP::Global>
module, which in turn would create connections to all other nodes in the
network (helped by the seed nodes).
In version 2.x, global nodes still connect to all other global nodes, but
other nodes don't - now every node either is a global node itself, or
attaches itself to another global node.
If a node isn't a global node itself, then it attaches itself to one
of its seed nodes. If that seed node isn't a global node yet, it will
automatically be upgraded to a global node.
So in many cases, nothing needs to be changed - one just has to make sure
that all seed nodes are meshed together with the other seed nodes (as with
AEMP 1.x), and other nodes specify them as seed nodes. This is most easily
achieved by specifying the same set of seed nodes for all nodes in the
network.
Not opening a connection to every other node is usually an advantage,
except when you need the lower latency of an already established
connection. To ensure a node establishes a connection to another node,
=item Listener-less nodes (nodes without binds) are gone.
And are not coming back, at least not in their old form. If no C<binds>
are specified for a node, AnyEvent::MP assumes a default of C<*:*>.
There are vague plans to implement some form of routing domains, which
might or might not bring back listener-less nodes, but don't count on it.
The fact that most connections are now optional somewhat mitigates this,
as a node can be effectively unreachable from the outside without any
problems, as long as it isn't a global node and only reaches out to other
nodes (as opposed to being contacted from other nodes).
=item $AnyEvent::MP::Kernel::WARN has gone.
AnyEvent has acquired a logging framework (L<AnyEvent::Log>), and AEMP now
uses this, and so should your programs.
Every module now documents what kinds of messages it generates, with
AnyEvent::MP acting as a catch-all.
MP/DataConn.pm
# continue connect
if (@$addr) {
# node has listeners, so connect
_connect $id, $node;
} else {
# no listeners, ask it to connect to us
AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE;
}
} else {
# wait for the next global setup handshake
# due to the round-trip at the beginning, this should never be necessary
$AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
};
};
# we actually have to make sure that the connection arrives after the expect message, and
# the easiest way to do this is to use an rpc call.
AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() };
}
}
MP/Global.pm
=head1 SYNOPSIS
use AnyEvent::MP::Global;
=head1 DESCRIPTION
This module is usually run on (or started on) seed nodes and provides a
variety of services to connected nodes, such as the distributed database.
The global nodes form a fully-meshed network, that is, all global nodes
currently maintain connections to all other global nodes.
Loading this module (e.g. as a service) transforms the local node into a
global node. There are no user-serviceable parts inside.
For a limited time, this module also exports some AEMP 1.x compatibility
functions (C<grp_reg>, C<grp_get> and C<grp_mon>).
=cut
package AnyEvent::MP::Global;
use common::sense;
use Carp ();
use List::Util ();
use AnyEvent ();
use AnyEvent::MP;
use AnyEvent::MP::Kernel;
AE::log 7 => "starting global service.";
#############################################################################
# node protocol parts for global nodes
package AnyEvent::MP::Kernel;
use JSON::XS ();
# TODO: this is ugly (classical use vars vs. our),
# maybe this should go into MP::Kernel
# "import" from Kernel
our %NODE;
our $NODE;
#our $GLOBAL;
our $SRCNODE; # the origin node id
our %NODE_REQ;
our %GLOBAL_NODE;
our $GLOBAL;
# only in global code
our %GLOBAL_SLAVE;
our %GLOBAL_MON; # monitors {family}
our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
our %LOCAL_DBS; # local databases of other nodes (global and slave)
our %LOCAL_DB; # this node database
# broadcasts a message to all other global nodes
sub g_broadcast {
snd $_, @_
for keys %GLOBAL_NODE;
}
# add/replace/del inside a family in the database
# @$del must not contain any key in %$set
sub g_upd {
my ($node, $family, $set, $del) = @_;
MP/Global.pm
# add/replace keys
while (my ($k, $v) = each %$set) {
#TODO# optimize duplicate gdb-set's, to some extent, maybe
# but is probably difficult and slow, so don't for the time being.
$ldb->{$k} =
$gdb->{$k} = $v;
}
my (@del_local, @del_global); # actual deletes for other global nodes / our slaves
# take care of deletes
for my $k (@$del) {
delete $ldb->{$k};
if (my @other = grep exists $LOCAL_DBS{$_}{$family}{$k}, keys %LOCAL_DBS) {
# key exists in some other db shard(s)
# if there is a local one, we have to update
# otherwise, we update and delete on other globals
if (my $local = List::Util::first { exists $GLOBAL_SLAVE{$_} } @other) {
$set->{$k} =
$gdb->{$k} = $LOCAL_DBS{$local}{$family}{$k}
unless sv_eq $gdb->{$k}, $LOCAL_DBS{$local}{$family}{$k};
} else {
# must be in a global one then
my $global = List::Util::first { !exists $GLOBAL_SLAVE{$_} } @other;
push @del_global, $k;
$local_set{$k} =
$gdb->{$k} = $LOCAL_DBS{$global}{$family}{$k}
unless sv_eq $gdb->{$k}, $LOCAL_DBS{$global}{$family}{$k};
}
} else {
delete $gdb->{$k};
# this was the only one, so delete locally
push @del_local, $k;
# and globally, if it's a local key
push @del_global, $k if exists $GLOBAL_SLAVE{$node};
}
}
# family could be empty now
delete $GLOBAL_DB {$family} unless %$gdb;
delete $LOCAL_DBS{$node}{$family} unless %$ldb;
# tell other global nodes any changes in our database
g_broadcast g_upd => $family, $set, \@del_global
if exists $GLOBAL_SLAVE{$node} && (%$set || @del_global);
# tell subscribers we have changed the family
if (%$set || %local_set || @del_local) {
@$set{keys %local_set} = values %local_set;
snd $_ => g_chg2 => $family, $set, \@del_local
for keys %{ $GLOBAL_MON{$family} };
}
}
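The delete handling in C<g_upd> is the subtle part: when one node withdraws a key that another node still holds, the merged view falls back to the surviving value instead of dropping the key. The hypothetical sketch below isolates just that rule. It ignores the slave/global distinction, broadcasting and monitoring, and it picks the fallback node in sorted order, which is an arbitrary choice of this sketch (the code above prefers a slave's value over another global node's).

```perl
use strict;
use warnings;

# Hypothetical, simplified model of how a global node merges per-node
# databases (cf. %LOCAL_DBS and %GLOBAL_DB above); not the real g_upd.
my %LOCAL_DBS;   # node => family => key => value
my %GLOBAL_DB;   # family => key => value (merged view)

sub sim_upd {
   my ($node, $family, $set, $del) = @_;
   my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
   my $gdb = $GLOBAL_DB{$family}        ||= {};

   while (my ($k, $v) = each %$set) {
      $ldb->{$k} = $gdb->{$k} = $v;       # last writer wins in the merged view
   }

   for my $k (@$del) {
      delete $ldb->{$k};
      # does any node still hold this key? (no autovivification of families)
      my ($other) = grep { my $f = $LOCAL_DBS{$_}{$family}; $f && exists $f->{$k} }
                    sort keys %LOCAL_DBS;
      if (defined $other) {
         $gdb->{$k} = $LOCAL_DBS{$other}{$family}{$k};   # fall back to surviving value
      } else {
         delete $gdb->{$k};                               # nobody holds it any more
      }
   }

   delete $GLOBAL_DB{$family}        unless %$gdb;   # family could be empty now
   delete $LOCAL_DBS{$node}{$family} unless %$ldb;
}

sim_upd(nodeA => workers => { w1 => "a" }, []);
sim_upd(nodeB => workers => { w1 => "b" }, []);
sim_upd(nodeB => workers => {}, ["w1"]);   # nodeB withdraws w1; nodeA's value remains
print "w1 = $GLOBAL_DB{workers}{w1}\n";
```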
MP/Global.pm
}
delete $LOCAL_DBS{$node};
}
# gather node databases from slaves
# other node wants to make us the master and sends us their db
$NODE_REQ{g_slave} = sub {
my ($db) = @_
or return; # empty g_slave is used to start global service
my $node = $SRCNODE;
undef $GLOBAL_SLAVE{$node};
g_set $node, $db;
};
# other global node sends us their database
$NODE_REQ{g_set} = sub {
my ($db) = @_;
# need to get it here, because g_set destroys it
my $binds = $db->{"'l"}{$SRCNODE};
g_set $SRCNODE, $db;
# a remote node always has to provide their listeners. for global
# nodes, we mirror their 'l locally, just as we also set 'g.
# that's not very efficient, but ensures that global nodes
# find each other.
db_set "'l" => $SRCNODE => $binds;
};
# other node (global and slave) sends us a family update
$NODE_REQ{g_upd} = sub {
&g_upd ($SRCNODE, @_);
};
# slave node wants to know the listeners of a node
$NODE_REQ{g_find} = sub {
my ($node) = @_;
snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
};
MP/Global.pm
# g_mon1 family key - start monitoring
$NODE_REQ{g_mon1} = sub {
undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
undef $GLOBAL_MON{$_[0]}{$SRCNODE};
snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
};
#############################################################################
# switch to global mode
# connect from a global node
sub g_global_connect {
my ($node) = @_;
# each node puts the set of connected global nodes into
# 'g - this causes a big duplication and mergefest, but
# is the easiest way to ensure global nodes have a list
# of all other global nodes.
# we also mirror 'l as soon as we receive it, causing
# even more overhead.
db_set "'g" => $node;
# global nodes send all local databases of their slaves, merged,
# as their database to other global nodes
my %db;
while (my ($k, $v) = each %LOCAL_DBS) {
next unless exists $GLOBAL_SLAVE{$k};
while (my ($f, $fv) = each %$v) {
while (my ($k, $kv) = each %$fv) {
$db{$f}{$k} = $kv;
}
}
}
snd $node => g_set => \%db;
}
# overrides request in Kernel
$NODE_REQ{g_global} = sub {
g_disconnect $SRCNODE; # usually a nop, but not when a normal node becomes global
undef $GLOBAL_NODE{$SRCNODE}; # same as in Kernel.pm
g_global_connect $SRCNODE;
};
# delete data from other nodes on node-down
mon_nodes sub {
if ($_[1]) {
snd $_[0] => "g_global"; # tell everybody that we are a global node
} else {
g_disconnect $_[0];
}
};
# now, this is messy
AnyEvent::MP::Kernel::post_configure {
# enable global mode
$GLOBAL = 1;
# global nodes are their own masters - this
# resends global requests and sets the local database.
master_set $NODE;
# now add us to the set of global nodes
db_set "'g" => $NODE;
# tell other nodes that we are global now
for (up_nodes) {
snd $_, "g_global";
# if the node is global, connect
g_global_connect $_
if exists $GLOBAL_NODE{$_};
}
# from here on we should be able to act "normally"
# maintain connections to all global nodes that we know of
db_mon "'g" => sub {
keepalive_add $_ for @{ $_[1] };
keepalive_del $_ for @{ $_[3] };
};
};
#############################################################################
# compatibility functions for aemp 1.0
package AnyEvent::MP::Global;
MP/Intro.pod
db_set eg_receivers => $port;
The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<db_set> function, however, is
new: The first argument is the name of a I<database family> and the second
argument is the name of a I<subkey> within that family. The third argument
would be the I<value> to be associated with the family and subkey, but,
since it is missing, it will simply be C<undef>.
What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
database. This database runs on so-called "global" nodes, which usually
are the seed nodes of your network. The database structure is "simply" a
hash of hashes of values.
To illustrate this with Perl syntax, assume the database was stored in
C<%DB>, then the C<db_set> function more or less would do this:
$DB{eg_receivers}{$port} = undef;
So the ominous "family" selects a hash in the database, and the "subkey"
is simply the key in this hash - C<db_set> very much works like this
MP/Intro.pod
To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).
When you change both programs above to simply call
configure;
then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.
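The C<configure> documentation gives the priority order of these settings: values passed to C<configure> are lowest, the global defaults override them, and the chosen profile chain overrides both. As a rough, hypothetical sketch, that order is plain hash merging. C<merge_config> is an invented name, and placing parent profiles before the profiles that inherit from them is an assumption of this sketch.

```perl
use strict;
use warnings;

# Hypothetical sketch of the documented priority order; not the actual
# AnyEvent::MP::configure implementation. Later hashes override earlier ones.
sub merge_config {
   my ($args, $global_default, @profile_chain) = @_;   # chain: parents first (assumed)
   my %cfg = %$args;                                    # lowest: arguments to configure
   %cfg = (%cfg, %$global_default);                     # then the global defaults
   %cfg = (%cfg, %$_) for @profile_chain;               # then the profiles, most specific last
   return \%cfg;
}

my $cfg = merge_config(
   { binds => ["*:*"], seeds => ["localhost:4040"] },  # passed to configure
   { seeds => ["*:4040"] },                             # set via "aemp seeds"
   { binds => ["*:4040"] },                             # set via "aemp profile seed binds"
);

print "binds=@{ $cfg->{binds} } seeds=@{ $cfg->{seeds} }\n";
```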
You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.
We bind the seed node to port 4040 on all interfaces:
aemp profile seed binds "*:4040"
And we configure all nodes to use this as seed node (this only works when
running on the same host, for multiple machines you would replace the C<*>
by the IP address or hostname of the node running the seed), by changing
the global settings shared between all profiles:
aemp seeds "*:4040"
Then we run the seed node:
aemp run profile seed
After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
MP/Kernel.pm
our $UNIQ; # per-process/node unique cookie
our $NODE;
our $ID = "a";
our %NODE; # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports
our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports
#our $GLOBAL; # true if node is a global ("directory") node
our %BINDS;
our $BINDS; # our listeners, as arrayref
our $SRCNODE; # holds the sending node _object_ during _inject
our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
# initialise names for non-networked operation
{
# ~54 bits, for local port names, lowercase $ID appended
my $now = AE::now;
$UNIQ =
(join "",
map $alnum[$_],
$$ / 62 % 62,
$$ % 62,
MP/Kernel.pm
};
#############################################################################
# seed management, try to keep connections to all seeds at all times
our %SEED_NODE; # seed ID => node ID|undef
our %NODE_SEED; # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;
our %GLOBAL_NODE; # global => undef
sub seed_connect {
my ($seed) = @_;
my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
or Carp::croak "$seed: unparsable seed address";
AE::log 9 => "trying connect to seed node $seed.";
$SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
$host, $port,
on_greeted => sub {
# called after receiving remote greeting, learn remote node name
# we rely on untrusted data here (the remote node name) this is
# hopefully ok, as this can at most be used for DOSing, which is easy
# when you can do MITM anyway.
# if we connect to ourselves, nuke this seed, but make sure we act like a seed
if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
require AnyEvent::MP::Global; # every seed becomes a global node currently
delete $SEED_NODE{$seed};
} else {
$SEED_NODE{$seed} = $_[0]{remote_node};
$NODE_SEED{$_[0]{remote_node}} = $seed;
# also start global service, in case it isn't running
# since we probably switch connections, maybe we don't need to do this here?
snd $_[0]{remote_node}, "g_slave";
}
},
sub {
delete $SEED_CONNECT{$seed};
}
;
}
MP/Kernel.pm
sub set_seeds(@) {
%SEED_NODE = ();
%NODE_SEED = ();
%SEED_CONNECT = ();
@SEED_NODE{@_} = ();
seed_again;
}
# normal nodes only record global node connections
$NODE_REQ{g_global} = sub {
undef $GLOBAL_NODE{$SRCNODE};
};
mon_nodes sub {
delete $GLOBAL_NODE{$_[0]}
unless $_[1];
return unless exists $NODE_SEED{$_[0]};
if ($_[1]) {
# each time a connection to a seed node goes up, make
# sure it runs the global service.
snd $_[0], "g_slave";
} else {
# if we lost the connection to a seed node, make sure we are seeding
seed_again;
}
};
#############################################################################
# keepalive code - used to keep connections to certain nodes alive
# only used by global code atm., but ought to be exposed somehow.
#TODO: should probably be done directly by node objects
our $KEEPALIVE_RETRY;
our $KEEPALIVE_WATCHER;
our %KEEPALIVE; # we want to keep these nodes alive
our %KEEPALIVE_DOWN; # nodes that are down currently
sub keepalive_all {
AE::log 9 => "keepalive: trying to establish connections with: "
. (join " ", keys %KEEPALIVE_DOWN)
MP/Kernel.pm
undef $KEEPALIVE_WATCHER
unless %KEEPALIVE_DOWN;
} else {
# lost the connection, try to connect again
undef $KEEPALIVE_DOWN{$_[0]};
keepalive_again;
}
};
#############################################################################
# talk with/to global nodes
# protocol messages:
#
# sent by global nodes
# g_global - global nodes send this to all others
#
# database protocol
# g_slave database - make other global node master of the sender
# g_set database - global node's database to other global nodes
# g_upd family set del - update single family (any to global)
#
# slave <-> global protocol
# g_find node - query addresses for node (slave to global)
# g_found node binds - node addresses (global to slave)
# g_db_family family id - send g_reply with data (global to slave)
# g_db_keys family id - send g_reply with data (global to slave)
# g_db_values family id - send g_reply with data (global to slave)
# g_reply id result - result of any query (global to slave)
# g_mon1 family - start to monitor family, replies with g_chg1
# g_mon0 family - stop monitoring family
# g_chg1 family hash - initial value of family when starting to monitor
# g_chg2 family set del - like g_upd, but for monitoring only
#
# internal database families:
# "'l" -> node -> listeners
# "'g" -> node -> undef
# ...
#
# used on all nodes:
our $MASTER; # the global node we bind ourselves to
our $MASTER_MON;
our %LOCAL_DB; # this node database
our $GPROTO = 1;
# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
$_[0]{local_greeting}{gproto} = $GPROTO;
};
#############################################################################
# master selection
# master requests
our %GLOBAL_REQ; # $id => \@req
sub global_req_add {
my ($id, $req) = @_;
return if exists $GLOBAL_REQ{$id};
$GLOBAL_REQ{$id} = $req;
snd $MASTER, @$req
if $MASTER;
}
sub global_req_del {
delete $GLOBAL_REQ{$_[0]};
}
#################################
# master rpc
our %GLOBAL_RES;
our $GLOBAL_RES_ID = "a";
sub global_call {
my $id = ++$GLOBAL_RES_ID;
$GLOBAL_RES{$id} = pop;
global_req_add $id, [@_, $id];
}
$NODE_REQ{g_reply} = sub {
my $id = shift;
global_req_del $id;
my $cb = delete $GLOBAL_RES{$id}
or return;
&$cb
};
#################################
sub g_find {
global_req_add "g_find $_[0]", [g_find => $_[0]];
}
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
global_req_del "g_find $_[0]";
my $node = $NODE{$_[0]} or return;
$node->connect_to ($_[1]);
};
sub master_set {
$MASTER = $_[0];
AE::log 8 => "new master node: $MASTER.";
MP/Kernel.pm
snd $MASTER, g_slave => \%LOCAL_DB;
# (re-)send queued requests
snd $MASTER, @$_
for values %GLOBAL_REQ;
}
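The master request machinery above boils down to a queue keyed by request ID: requests go out immediately when a master is known, are replayed by C<master_set>, and are dropped when the matching C<g_reply> arrives. The sketch below is a hypothetical model of that pattern only. C<req_add>, C<call>, C<reply> and C<set_master> are invented names, C<$send> stands in for C<snd>, and the C<g_slave> database upload that C<master_set> also performs is omitted.

```perl
use strict;
use warnings;

# Hypothetical model of the master request queue above (cf. global_req_add,
# global_call, g_reply, master_set). $send stands in for snd.
my ($MASTER, %REQ, %RES, @sent);
my $res_id = "a";
my $send   = sub { push @sent, join " ", @_ };

sub req_add {
   my ($id, $req) = @_;
   return if exists $REQ{$id};          # each request ID is queued only once
   $REQ{$id} = $req;
   $send->($MASTER, @$req) if $MASTER;  # send now if a master is known
}

sub req_del { delete $REQ{$_[0]} }

sub call {
   my $cb = pop;                        # last argument is the result callback
   my $id = ++$res_id;
   $RES{$id} = $cb;
   req_add($id, [@_, $id]);             # the request carries its reply ID
   return $id;
}

sub reply {
   my $id = shift;
   req_del($id);
   my $cb = delete $RES{$id} or return;
   $cb->(@_);
}

sub set_master {
   $MASTER = shift;
   $send->($MASTER, @$_) for values %REQ;   # replay everything still pending
}

my $result;
my $id = call(g_db_keys => "workers", sub { $result = shift });  # no master yet: only queued
set_master("seed1");                                              # pending request is sent now
reply($id, ["w1", "w2"]);                                         # the master answers
```

Because pending requests live in a hash until answered, switching to a new master after a seed node goes down needs no extra bookkeeping: everything unanswered is simply sent again.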
sub master_search {
AE::log 9 => "starting search for master node.";
#TODO: should also look for other global nodes, but we don't know them
for (keys %NODE_SEED) {
if (node_is_up $_) {
master_set $_;
return;
}
}
$MASTER_MON = mon_nodes sub {
return unless $_[1]; # we are only interested in node-ups
return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
master_set $_[0];
};
}
# other node wants to make us the master, so start the global service
$NODE_REQ{g_slave} = sub {
# load global module and redo the request
require AnyEvent::MP::Global;
&{ $NODE_REQ{g_slave} }
};
#############################################################################
# local database operations
# canonical probably not needed
our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
MP/Kernel.pm
# }
defined wantarray
and Guard::guard { db_del $family => $subkey }
}
# database query
sub db_family {
my ($family, $cb) = @_;
global_call g_db_family => $family, $cb;
}
sub db_keys {
my ($family, $cb) = @_;
global_call g_db_keys => $family, $cb;
}
sub db_values {
my ($family, $cb) = @_;
global_call g_db_values => $family, $cb;
}
# database monitoring
our %LOCAL_MON; # f, reply
our %MON_DB; # f, k, value
sub db_mon($@) {
my ($family, $cb) = @_;
MP/Kernel.pm
AE::postpone {
return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
# set actual callback
$LOCAL_MON{$family}{$cb+0} = $cb;
$cb->($db, [keys %$db]);
};
} else {
# new monitor, request chg1 from upstream
$LOCAL_MON{$family}{$cb+0} = $cb;
global_req_add "mon1 $family" => [g_mon1 => $family];
$MON_DB{$family} = {};
}
defined wantarray
and Guard::guard {
my $mon = $LOCAL_MON{$family};
delete $mon->{$cb+0};
unless (%$mon) {
global_req_del "mon1 $family";
# no global_req, because we don't care if we are not connected
snd $MASTER, g_mon0 => $family
if $MASTER;
delete $LOCAL_MON{$family};
delete $MON_DB{$family};
}
}
}
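C<db_mon> follows a reference-counting pattern: the first monitor for a family sends C<g_mon1> upstream, each monitor returns a guard when called in non-void context, and destroying the last guard sends C<g_mon0>. The hypothetical sketch below reproduces that pattern in core Perl. C<SimGuard> is an invented stand-in for C<Guard::guard>, C<sim_db_mon> is an invented name, and the cached family data (C<%MON_DB>) is left out.

```perl
use strict;
use warnings;

{
   # minimal guard object: runs its callback when the last reference goes away
   # (a core-Perl stand-in for Guard::guard)
   package SimGuard;
   sub new     { my ($class, $cb) = @_; return bless { cb => $cb }, $class }
   sub DESTROY { my ($self) = @_; $self->{cb}->() if $self->{cb} }
}

my %LOCAL_MON;   # family => { callback address => callback }
my @sent;        # stand-in for messages sent to the master

sub sim_db_mon {
   my ($family, $cb) = @_;
   push @sent, "g_mon1 $family" unless $LOCAL_MON{$family};   # first monitor only
   $LOCAL_MON{$family}{$cb + 0} = $cb;

   return unless defined wantarray;      # void context: no guard, monitor stays
   return SimGuard->new(sub {
      my $mon = $LOCAL_MON{$family} or return;
      delete $mon->{$cb + 0};
      unless (%$mon) {                   # last monitor gone: stop upstream
         push @sent, "g_mon0 $family";
         delete $LOCAL_MON{$family};
      }
   });
}

my $g1 = sim_db_mon(workers => sub { });
my $g2 = sim_db_mon(workers => sub { });
undef $g1;   # one monitor left, nothing sent
undef $g2;   # last monitor removed, "g_mon0 workers" is sent
print "$_\n" for @sent;
```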
# full update
MP/Kernel.pm
alarm 2;
for my $if (Net::Interface->interfaces) {
# we statically lower-prioritise ipv6 here, TODO :()
for $_ ($if->address (Net::Interface::AF_INET ())) {
next if /^\x7f/; # skip localhost etc.
push @addr, $_;
}
for ($if->address (Net::Interface::AF_INET6 ())) {
#next if $if->scope ($_) <= 2;
next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast (2000::/3), unique local unicast (fc00::/7)
push @addr, $_;
}
}
alarm 0;
@addr
};
my @addr;
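The IPv6 filter above looks only at the first octet of the packed address: C<0x20>-C<0x3f> covers the global unicast range 2000::/3, and C<0xfc>/C<0xfd> cover unique local addresses (fc00::/7), so link-local (fe80::/10) and loopback addresses are skipped. A small sketch of the same test on textual addresses, using the core C<Socket> module's C<inet_pton>; C<keep_ipv6> is a hypothetical name.

```perl
use strict;
use warnings;
use Socket qw(AF_INET6 inet_pton);

# Same first-octet test as the interface scan above, applied to a text address.
sub keep_ipv6 {
   my ($text) = @_;
   my $packed = inet_pton(AF_INET6, $text) or return 0;   # unparsable: reject
   return $packed =~ /^[\x20-\x3f\xfc\xfd]/ ? 1 : 0;
}

printf "%-12s %s\n", $_, keep_ipv6($_) ? "kept" : "skipped"
   for "2001:db8::1", "fd00::1", "fe80::1", "::1";
```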
MP/Kernel.pm
die "$@" if $@;
}
=back
=head1 LOGGING
AnyEvent::MP::Kernel logs high-level information about the current node,
when nodes go up and down, and most runtime errors. It also logs some
debugging and trace messages about network maintenance, such as seed
connections and global node management.
=head1 SEE ALSO
L<AnyEvent::MP>.
=head1 AUTHOR
Marc Lehmann <schmorp@schmorp.de>
http://home.schmorp.de/
$self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
$self->transport_error (transport_error => $self->{id}, "connect timeout");
};
# maybe @$addresses?
my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
if ($addresses) {
$self->connect_to ($addresses);
} else {
# on global nodes, all bets are off now - we either know the node, or we don't
if ($AnyEvent::MP::Kernel::GLOBAL) {
$self->transport_error (transport_error => $self->{id}, "no known address");
} else {
AnyEvent::MP::Kernel::g_find ($self->{id});
}
}
}
sub connect_to {
my ($self, $addresses) = @_;
MP/Transport.pm
Informs the other side of the node protocol implemented by this
node. Major version mismatches are fatal. If this key is missing, then it
is assumed that the node doesn't support the node protocol.
The node protocol is currently undocumented, but includes port
monitoring, spawning and informational requests.
=item gproto=<major>.<fractional>
Informs the other side of the global protocol implemented by this
node. Major version mismatches are fatal. If this key is missing, then it
is assumed that the node doesn't support the global protocol.
The global protocol is currently undocumented, but includes node address
lookup and shared database operations.
=back
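The version rule for C<gproto> (and likewise the node protocol) compares only the major part of C<< <major>.<fractional> >>. A hypothetical sketch of that comparison: C<gproto_major> and C<gproto_compatible> are invented names, and treating a missing or malformed value as "not compatible" is a simplification of this sketch; the text above only says a missing key means the node does not support the global protocol.

```perl
use strict;
use warnings;

# Extract the major part of "<major>.<fractional>"; undef if absent or malformed.
sub gproto_major {
   my ($version) = @_;
   return unless defined $version && $version =~ /\A(\d+)(?:\.\d+)?\z/;
   return $1;
}

# Major versions must match; fractional differences are tolerated.
sub gproto_compatible {
   my ($ours, $theirs) = @_;
   my $our_major   = gproto_major($ours);     # scalar context: undef on failure
   my $their_major = gproto_major($theirs);
   return defined $our_major && defined $their_major && $our_major == $their_major;
}
```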
=head3 Second Greeting Line
After this greeting line there will be a second line containing a
cryptographic nonce, i.e. random data of high quality. To keep the
protocol text-only, these are usually 32 base64-encoded octets, but
they could be anything that doesn't contain any ASCII CR or ASCII LF
MP/Transport.pm
hexencoded secret would be the shared secret, in lowercase hex (e.g. if
the secret is "geheim", the hex-encoded version would be "67656865696d").
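In Perl, the lowercase hex form of a secret is a single C<unpack>. This sketch assumes the secret is already a byte string; a secret containing non-ASCII characters would first need an agreed byte encoding, which is not covered here.

```perl
use strict;
use warnings;

# Lowercase hex encoding of a shared secret, as in the example above.
my $secret = "geheim";
my $hex    = unpack "H*", $secret;   # "H*": high nybble first, lowercase digits
print "$hex\n";
```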
Note that apart from the low-level handshake and framing protocol, there
is a high-level protocol, e.g. for monitoring, building the mesh or
spawning. All these messages are sent to the node port (the empty string)
and can safely be ignored if you do not need the relevant functionality.
=head3 USEFUL HINTS
Since taking part in the global protocol to find port groups is
nontrivial, hardcoding port names should be considered as well, i.e. the
non-Perl node could simply listen to messages for a few well-known ports.
Alternatively, the non-Perl node could call an (already loaded) function
in the Perl node by sending it a special message:
["", "Some::Function::name", "myownport", 1, 2, 3]
This would call the function C<Some::Function::name> with the string
C<myownport> and some additional arguments.
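For reference, the message above is an ordinary JSON array whose first element is the node port (the empty string). The sketch builds it with the core C<JSON::PP> module purely for illustration (the FAQ mentions that AnyEvent::MP itself uses JSON::XS); how the encoded message is framed and sent over the connection is not shown.

```perl
use strict;
use warnings;
use JSON::PP ();

# Node port (""), function name, then the arguments passed to that function.
my $msg  = ["", "Some::Function::name", "myownport", 1, 2, 3];
my $json = JSON::PP->new->encode($msg);
print "$json\n";
```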
nodes as seed nodes for each other. What's important is that all
seed nodes connections form a complete graph, so that the network
cannot split into separate subnets forever.
Seed nodes are represented by seed IDs.
seed IDs - "host:port"
Seed IDs are transport endpoint(s) (usually a hostname/IP address
and a TCP port) of nodes that should be used as seed nodes.
global nodes
An AEMP network needs a discovery service - nodes need to know how
to connect to other nodes they only know by name. In addition, AEMP
offers a distributed "group database", which maps group names to a
list of strings - for example, to register worker ports.
A network needs at least one global node to work, and allows every
node to be a global node.
Any node that loads the AnyEvent::MP::Global module becomes a global
node and tries to keep connections to all other nodes. So while it
can make sense to make every node "global" in small networks, it
usually makes sense to only make seed nodes into global nodes in
large networks (nodes keep connections to seed nodes and global
nodes, so making them the same reduces overhead).
VARIABLES/FUNCTIONS
$thisnode = NODE / $NODE
The "NODE" function returns, and the $NODE variable contains, the
node ID of the node running in the current process. This value is
initialised by a call to "configure".
$nodeid = node_of $port
Extracts and returns the node ID from a port ID or a node ID.
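This excerpt does not spell out the port ID format. The sketch below assumes the form "NODEID#PORTNAME" (node ID, a "#" separator, then the port name) and that node IDs contain no "#"; under that assumption a node ID passes through unchanged, matching the "from a port ID or a node ID" wording. `my_node_of` is a hypothetical stand-in, not the module's own code.

```perl
use strict;
use warnings;

# Hypothetical re-implementation of node_of, assuming port IDs look
# like "NODEID#PORTNAME". Everything before the first "#" is the node.
sub my_node_of {
    my ($id) = @_;
    my ($node) = split /#/, $id, 2;
    return $node;
}

print my_node_of("server/1#abc"), "\n";    # prints server/1
print my_node_of("server/1"), "\n";        # prints server/1
```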
(see the aemp command-line utility). The profile name can be
specified via the named "profile" parameter or can simply be the
first parameter. If it is missing, then the nodename (uname -n)
will be used as profile name.
The profile data is then gathered as follows:
First, all remaining key => value pairs (all of which are
conveniently undocumented at the moment) will be interpreted as
configuration data. Then they will be overwritten by any values
specified in the global default configuration (see the aemp
utility), then the chain of profiles chosen by the profile name
(and any "parent" attributes).
That means that the values specified in the profile have highest
priority and the values specified directly via "configure" have
lowest priority, and can only be used to specify defaults.
If the profile specifies a node ID, then this will become the
node ID of this process. If not, then the profile name will be
used as node ID, with a unique random string ("/%u") appended.
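The precedence rules can be modelled as a series of hash merges, lowest priority first. The following toy model is not how AnyEvent::MP stores its configuration: the `$store` layout (a "global" hash plus a "profile" hash of named profiles, each with an optional "parent" key) and the `effective_config` name are hypothetical. It only shows the documented order: direct `configure` arguments, then the global default, then the parent chain, then the named profile itself. The `uname -n` fallback for a missing profile name is not modelled.

```perl
use strict;
use warnings;

# Toy model of the documented precedence, lowest to highest:
#   1. key => value pairs passed directly to configure
#   2. the global default configuration
#   3. the profile's parent chain (most distant ancestor first)
#   4. the named profile itself
sub effective_config {
    my ($store, $profile_name, %direct) = @_;

    # Walk from the named profile up through its parents; %seen stops
    # circular parent chains from looping forever.
    my (@chain, %seen);
    for (my $name = $profile_name; defined $name && !$seen{$name}++; ) {
        my $profile = $store->{profile}{$name} or last;
        push @chain, $profile;
        $name = $profile->{parent};
    }

    my %cfg = (%direct, %{ $store->{global} || {} });
    %cfg = (%cfg, %$_) for reverse @chain;    # closer profiles win
    delete $cfg{parent};
    return \%cfg;
}

my $store = {
    global  => { binds => "*:*", seeds => "a:1" },
    profile => {
        server => { seeds => "b:2" },
        doomed => { parent => "server", nodeid => "doomed" },
    },
};
my $cfg = effective_config($store, "doomed", binds => "ignored");
print "$cfg->{seeds} $cfg->{binds}\n";    # prints b:2 *:*
```

The direct `binds` value loses to the global default, and the `server` parent's `seeds` overrides the global one, which is why `configure` arguments can only supply defaults.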
$local_port = port
Creates a new local port object and returns its port ID. Initially it
has no callbacks set and will throw an error when it receives
messages.
$local_port = port { my @msg = @_ }
Creates a new local port, and returns its ID. Semantically the same
as creating a port and calling "rcv $port, $callback" on it.
The block will be called for every message received on the port,
with the global variable $SELF set to the port ID. Runtime errors
will cause the port to be "kil"ed. The message will be passed as-is,
no extra argument (i.e. no port ID) will be passed to the callback.
If you want to stop/destroy the port, simply "kil" it:
my $port = port {
my @msg = @_;
...
kil $SELF;
};
rcv $local_port, $callback->(@msg)
Replaces the default callback on the specified port. There is no way
to remove the default callback: use "sub { }" to disable it, or
better "kil" the port when it is no longer needed.
The global $SELF (exported by this module) contains $port while
executing the callback. Runtime errors during callback execution
will result in the port being "kil"ed.
The default callback receives all messages not matched by a more
specific "tag" match.
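The split between the default callback and tag callbacks (the tag form of "rcv" is documented next) can be illustrated with a small, self-contained model. This is a toy written for illustration, not AnyEvent::MP code: `toy_rcv_tag` and `toy_deliver` are hypothetical names, and $SELF, "kil" and error handling are not modelled. It shows the documented rules: a message whose first element is a registered tag goes to that tag's callback with the tag removed, passing "undef" unregisters a tag, and everything else reaches the default callback unchanged.

```perl
use strict;
use warnings;

# Toy port: one default callback plus a hash of tag callbacks.
my %port = (
    default => sub { "default got @_" },
    tags    => {},
);

# Register a tag callback, or unregister it when $cb is undef.
sub toy_rcv_tag {
    my ($port, $tag, $cb) = @_;
    if (defined $cb) { $port->{tags}{$tag} = $cb }
    else             { delete $port->{tags}{$tag} }
}

# Route a message: tag callbacks get the message without its tag.
sub toy_deliver {
    my ($port, @msg) = @_;
    if (@msg && defined $msg[0] && (my $cb = $port->{tags}{$msg[0]})) {
        shift @msg;    # strip the tag
        return $cb->(@msg);
    }
    return $port->{default}->(@msg);
}

toy_rcv_tag(\%port, ping => sub { "ping got @_" });
print toy_deliver(\%port, ping => 1, 2), "\n";    # prints ping got 1 2
print toy_deliver(\%port, pong => 3), "\n";       # prints default got pong 3
```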
rcv $local_port, tag => $callback->(@msg_without_tag), ...
Registers (or replaces) callbacks to be called on messages starting
with the given tag on the given port (and returns the port), or
unregisters a callback (when $callback is "undef" or missing). There can only
If no time-out is given (or it is "undef"), then the local port will
monitor the remote port instead, so it eventually gets cleaned-up.
Currently this function returns the temporary port, but this
"feature" might go in future versions unless you can make a
convincing case that this is indeed useful for something.
DISTRIBUTED DATABASE
AnyEvent::MP comes with a simple distributed database. The database will
be mirrored asynchronously on all global nodes. Other nodes bind to one
of the global nodes for their needs. Every node has a "local database"
which contains all the values that are set locally. All local databases
are merged together to form the global database, which can be queried.
The database structure is that of a two-level hash - the database hash
contains hashes which contain values, similarly to a perl hash of
hashes, i.e.:
$DATABASE{$family}{$subkey} = $value
The top level hash key is called "family", and the second-level hash key
is called "subkey" or simply "key".
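The relationship between local databases and the global database can be pictured as a union of two-level hashes. The sketch below is only a model of the data shape: real nodes mirror the data asynchronously on global nodes, and `merge_local_dbs` is a hypothetical name. When two nodes set the same family/subkey, this sketch lets the node whose ID sorts last win, whereas AnyEvent::MP makes no such guarantee (see the ownership rules below).

```perl
use strict;
use warnings;

# Toy model: each node's local database is
#   $local{$node}{$family}{$subkey} = $value
# and the global view is the union of all local databases.
sub merge_local_dbs {
    my (%local) = @_;
    my %global;
    for my $node (sort keys %local) {
        for my $family (keys %{ $local{$node} }) {
            my $entries = $local{$node}{$family};
            $global{$family}{$_} = $entries->{$_} for keys %$entries;
        }
    }
    return \%global;
}

my $global = merge_local_dbs(
    "node-a" => { myapp_workers => { worker1 => "port-a" } },
    "node-b" => { myapp_workers => { worker2 => "port-b" } },
);
print join(",", sort keys %{ $global->{myapp_workers} }), "\n";    # prints worker1,worker2
```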
The family must be alphanumeric, i.e. start with a letter and consist of
letters, digits, underscores and colons ("[A-Za-z][A-Za-z0-9_:]*"),
pretty much like Perl module names.
As the family namespace is global, it is recommended to prefix family
names with the name of the application or module using it.
The subkeys must be non-empty strings, with no further restrictions.
The values should preferably be strings, but other perl scalars should
work as well (such as "undef", arrays and hashes).
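The naming rules translate directly into two checks. These helpers are hypothetical and only restate the rules above; the family regex is the documented one, anchored so that the whole name must match.

```perl
use strict;
use warnings;

# Family: a letter, then letters, digits, underscores or colons.
sub valid_family { defined $_[0] && $_[0] =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/ }

# Subkey: any non-empty string.
sub valid_subkey { defined $_[0] && length($_[0]) > 0 }

for my $family ("MyApp::workers", "1abc", "my-app") {
    print "$family: ", (valid_family($family) ? "ok" : "invalid"), "\n";
}
```

Prefixing the family with the application or module name, as recommended above, fits naturally into this pattern (e.g. "MyApp::workers").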
Every database entry is owned by one node - adding the same
family/subkey combination on multiple nodes will not cause discomfort
for AnyEvent::MP, but the result might be nondeterministic, i.e. the key
We considered "objects", but found that the actual number of methods
that can be called is quite low. Since port and node IDs travel
over the network frequently, the serialising/deserialising would add
lots of overhead, as well as having to keep a proxy object
everywhere.
Strings can easily be printed, easily serialised etc. and need no
special procedures to be "valid".
And as a result, a port with just a default receiver consists of a
single code reference stored in a global hash - it can't become much
cheaper.
Why favour JSON, why not a real serialising format such as Storable?
In fact, any AnyEvent::MP node will happily accept Storable as
framing format, but currently there is no way to make a node use
Storable by default (although all nodes will accept it).
The default framing protocol is JSON because a) JSON::XS is many
times faster for small messages and b) most importantly, after years
of experience we found that object serialisation is causing more
# now $ports, $add and $del are the same as
# were originally passed by grp_mon.
...
};
Nodes no longer connect to all other nodes.
In AEMP 1.x, every node automatically loaded the AnyEvent::MP::Global
module, which in turn created connections to all other nodes in
the network (helped by the seed nodes).
In version 2.x, global nodes still connect to all other global
nodes, but other nodes don't - now every node either is a global
node itself, or attaches itself to another global node.
If a node isn't a global node itself, then it attaches itself to one
of its seed nodes. If that seed node isn't a global node yet, it
will automatically be upgraded to a global node.
So in many cases, nothing needs to be changed - one just has to make
sure that all seed nodes are meshed together with the other seed
nodes (as with AEMP 1.x), and other nodes specify them as seed
nodes. This is most easily achieved by specifying the same set of
seed nodes for all nodes in the network.
Not opening a connection to every other node is usually an
advantage, except when you need the lower latency of an already
established connection. To ensure a node establishes a connection to
And are not coming back, at least not in their old form. If no
"binds" are specified for a node, AnyEvent::MP assumes a default of
"*:*".
There are vague plans to implement some form of routing domains,
which might or might not bring back listener-less nodes, but don't
count on it.
The fact that most connections are now optional somewhat mitigates
this, as a node can be effectively unreachable from the outside
without any problems, as long as it isn't a global node and only
reaches out to other nodes (as opposed to being contacted from other
nodes).
$AnyEvent::MP::Kernel::WARN has gone.
AnyEvent has acquired a logging framework (AnyEvent::Log), and AEMP
now uses this, and so should your programs.
Every module now documents what kinds of messages it generates, with
AnyEvent::MP acting as a catch-all.
Each string entry in the list is interpreted as either a module name to
load (when it ends with C<::>) or a function to call (all other cases).
Each entry which is an array itself (you need to use JSON format to
specify those) is interpreted as a function name and the arguments to
pass.
The algorithm to find the function is the same as used for C<<
L<AnyEvent::MP>::spawn >>.
Example: run the global service.
aemp setservices AnyEvent::MP::Global::
Example: call the mymod::myfun function with arguments 1, 2 and 3.
aemp setservices '[["mymod::myfun", 1,2,3]]'
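The interpretation rules for service entries can be expressed as a small classifier. This sketch is hypothetical and only classifies entries; it does not load modules or look up functions (that lookup follows the C<spawn> rules mentioned above). It uses the core JSON::PP module to decode the same JSON list as the second example.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);

# Hypothetical classifier for service list entries:
#   "Some::Module::"          -> [load => "Some::Module::"]
#   "Some::func"              -> [call => "Some::func"]
#   ["Some::func", @args]     -> [call => "Some::func", @args]
sub classify_service {
    my ($entry) = @_;
    if (ref($entry) eq "ARRAY") {
        my ($func, @args) = @$entry;
        return [call => $func, @args];
    }
    return $entry =~ /::\z/ ? [load => $entry] : [call => $entry];
}

my $services = decode_json '[["mymod::myfun", 1,2,3]]';
my @plan = map { classify_service($_) } @$services, "AnyEvent::MP::Global::";
print join(" ", @$_), "\n" for @plan;
```

For the two examples above, this yields a call of C<mymod::myfun> with arguments 1, 2 and 3, and a module load of C<AnyEvent::MP::Global>.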
=item delservices
Removes the service list again, which means it is inherited again from
configured.
=item deleval
Delete any eval string set with seteval.
=back
=head2 CONFIGURATION/PROFILE MANAGEMENT
All the above configuration functions by default affect the I<global
default configuration>, which is basically used to augment every profile
and node configuration.
=over 4
=item profile <name> ...
This subcommand makes the following subcommands act only on a specific
named profile, instead of on the global default. The profile is created if
necessary.
Example: create a C<server> profile, give it a random node name, some seed
nodes and bind it on an unspecified port on all local interfaces. You
should then add some services and run the node.
aemp profile server nodeid anon/ seeds doomed,10.0.0.2:5000 binds "*:*"
=item delprofile <name>
Deletes the profile of the given name.
=item setparent <name>
Sets the parent profile to use - values not specified in a profile will be
taken from the parent profile (even recursively, with the global default
config being the default parent). This is useful to configure profile
I<classes> and then to inherit from them for individual nodes.
Note that you can specify circular parent chains and even a parent for the
global configuration. Neither will do you any good, however.
Example: inherit all values not specified in the C<doomed> profile from
the C<server> profile.
aemp profile doomed setparent server
=item delparent
Removes the parent again from the profile, if any was set, so the profile
inherits directly from the global default config again.
=item showprofile <name>
Shows the values of the given profile, and only those, no inherited
values.
=item showconfig <name> <key value...>
Shows the I<effective> config, i.e. the values as used by a node started
with the given profile name. Any additional key-value pairs specified
augment the configuration, just as with C<configure>.
If all arguments are omitted, show the global default config.
=back
=head2 LOW-LEVEL TRANSPORT PROTOCOL
The low-level transport protocol between two nodes also has a number of
configurable options, most of which should not be touched unless you know
what you are doing.
=over 4