AnyEvent-MP
view release on metacpan or search on metacpan
MP/Kernel.pm view on Meta::CPAN
=head1 NAME
AnyEvent::MP::Kernel - the actual message passing kernel
=head1 SYNOPSIS
use AnyEvent::MP::Kernel;
$AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
snd_to_func $node, $func, @args # send msg to function
snd_on $node, @msg # snd message again (relay)
eval_on $node, $string[, @reply] # execute perl code on another node
node_is_up $nodeid # return true if a node is connected
@nodes = up_nodes # return a list of all connected nodes
$guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
=head1 DESCRIPTION
This module implements most of the inner workings of AnyEvent::MP. It
offers mostly lower-level functions that deal with network connectivity
and special requests.
You normally interface with AnyEvent::MP through a higher level interface
such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
with using the functions from this module.
=head1 GLOBALS AND FUNCTIONS
=over 4
=cut
package AnyEvent::MP::Kernel;
use common::sense;
use Carp ();
use AnyEvent ();
use Guard ();
use AnyEvent::MP::Node;
use AnyEvent::MP::Transport;
use base "Exporter";
# for re-export in AnyEvent::MP and Coro::MP
# (the public API names; this list is also appended to @EXPORT_OK below)
our @EXPORT_API = qw(
NODE $NODE
configure
node_of port_is_local
snd kil
db_set db_del
db_mon db_family db_keys db_values
);
# Names that can be imported on explicit request (via Exporter): the
# internal variables/functions listed first, followed by everything in
# @EXPORT_API.
our @EXPORT_OK = (
# these are internal
qw(
%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
add_node load_func
),
@EXPORT_API,
);
our @EXPORT = qw(
snd_to_func snd_on eval_on
port_is_local
MP/Kernel.pm view on Meta::CPAN
# NOTE(review): the visible configure code pushes the formatted bind
# addresses onto @$BINDS and stores the listener objects in %BINDS, so
# @$BINDS appears to hold address strings - confirm.
our $BINDS; # our listeners, as arrayref
# NOTE(review): the POD for $SRCNODE describes it as the origin node ID,
# while the comment here says node object - confirm which is accurate.
our $SRCNODE; # holds the sending node _object_ during _inject
our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
# initialise names for non-networked operation
{
   my $now = AE::now;

   # indices into @alnum, derived from the process id and the current
   # time at three different resolutions
   my @seed = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now) % 62,
      (int $now * 100) % 62,
      (int $now * 10000) % 62,
   );

   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = (join "", @alnum[@seed]) . nonce62 4;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/;

   # no node name yet
   $NODE = "";
}
# Returns the current value of $NODE.
sub NODE() {
   return $NODE;
}
# Extracts the node ID from a port ID: everything before the first "#".
# A string without "#" is returned unchanged; the empty string yields undef.
sub node_of($) {
   my ($port) = @_;

   # limit 2: only the first "#" separates node ID and port name
   my ($nodeid) = split /#/, $port, 2;

   return $nodeid;
}
# Defines the compile-time constant TRACE: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is set to a true value, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
# timer watcher for a scheduled queue run, undef when none is pending
our $DELAY_TIMER;
# callbacks waiting to be run, in the order they were queued
our @DELAY_QUEUE;

# Runs the queued callbacks in order (including any that get queued while
# running) and clears $DELAY_TIMER once the queue is exhausted.
# NOTE(review): if a callback dies, $DELAY_TIMER is not cleared here, so
# delay() would not create a new timer afterwards - confirm callbacks
# never throw.
our $delay_run = sub {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   undef $DELAY_TIMER
};

# Queues the given callback and makes sure a timer is pending that will
# invoke $delay_run.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   # only create a timer if none is pending
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
=item $AnyEvent::MP::Kernel::SRCNODE
During execution of a message callback, this variable contains the node ID
of the origin node.
The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.
=cut
# Dispatches a message: the first argument is looked up in %PORT and, if an
# entry exists, it is called with the remaining arguments. Messages for
# which %PORT has no entry are dropped silently.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n";
   }

   my $portid = shift;

   my $handler = $PORT{$portid}
      or return;

   # deliberately called without an argument list, so the handler sees
   # our own (already shifted) @_
   &$handler;
}
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# Takes a node ID and returns the corresponding entry from %NODE. When
# there is no (true) entry yet, a new AnyEvent::MP::Node::Remote object is
# constructed for that ID and returned instead.
# Croaks when the node ID is undef or the empty string.
sub add_node {
   $NODE{$_[0]} || do {
      my ($node) = @_;

      length $node
         or Carp::croak "'undef' or the empty string are not valid node/port IDs";

      # registers itself in %NODE
      # (explicit method call instead of the parse-ambiguous indirect
      # object syntax "new CLASS ARGS")
      AnyEvent::MP::Node::Remote->new ($node)
   }
}
# Sends a message: the first argument is the destination in the form
# "nodeid#portid", the remaining arguments are the message. The message is
# handed to the {send} callback of the destination's node object.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";
   }

   # use the known node object, or create one on first use
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port ID is stringified before it goes into the message
   $node->{send}->(["$portid", @_]);
}
# Returns true when the node part of the given port ID (everything before
# the first "#") equals $NODE.
sub port_is_local($) {
   my ($port) = @_;

   my ($nodeid) = split /#/, $port, 2;

   return $nodeid eq $NODE;
}
=item snd_to_func $node, $func, @args
Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.
This function can be used to implement C<spawn>-like interfaces.
=cut
# Sends a message with an empty port ID, followed by the function name and
# its arguments, to the given node via that node's {send} callback.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment above talks about delaying on $NODE, but the
   # condition below is true for every node ID *other* than $NODE - confirm
   # whether "ne" is intended.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   # use the known node object, or create one on first use
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["", @call]);
}
MP/Kernel.pm view on Meta::CPAN
for (map _resolve $_, @$binds) {
for my $bind ($_->recv) {
my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
or Carp::croak "$bind: unparsable local bind address";
my $listener = AnyEvent::MP::Transport::mp_server
$host,
$port,
prepare => sub {
my (undef, $host, $port) = @_;
$bind = AnyEvent::Socket::format_hostport $host, $port;
0
},
;
$BINDS{$bind} = $listener;
push @$BINDS, $bind;
}
}
AE::log 9 => "running post config hooks and init.";
# might initialise Global, so need to do it before db_set
post_configure { };
db_set "'l" => $NODE => $BINDS;
AE::log 8 => "node listens on [@$BINDS].";
# connect to all seednodes
set_seeds map $_->recv, map _resolve $_, @$seeds;
master_search;
# save gobs of memory
undef &_resolve;
*configure = sub (@){ };
AE::log 9 => "starting services.";
for (@{ $CONFIG->{services} }) {
if (ref) {
my ($func, @args) = @$_;
(load_func $func)->(@args);
} elsif (s/::$//) {
eval "require $_";
die $@ if $@;
} else {
(load_func $_)->();
}
}
eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
die "$@" if $@;
}
=back
=head1 LOGGING
AnyEvent::MP::Kernel logs high-level information about the current node,
when nodes go up and down, and most runtime errors. It also logs some
debugging and trace messages about network maintenance, such as seed
connections and global node management.
=head1 SEE ALSO
L<AnyEvent::MP>.
=head1 AUTHOR
Marc Lehmann <schmorp@schmorp.de>
http://home.schmorp.de/
=cut
1
( run in 1.291 second using v1.01-cache-2.11-cpan-39bf76dae61 )