AnyEvent-MP
view release on metacpan or search on metacpan
MP/Kernel.pm view on Meta::CPAN
# Timer watcher slot for the delay queue. It is set by delay() and cleared
# by $delay_run once the queue has been drained.
our $DELAY_TIMER;

# Pending callbacks, executed in the order they were queued.
our @DELAY_QUEUE;

# Runs every queued callback in FIFO order. Callbacks that get queued while
# the run is in progress are picked up by the same run, because the queue is
# re-examined on every iteration. As soon as the queue yields nothing (or a
# false value), $DELAY_TIMER is cleared and the run ends.
#
# NOTE(review): when a callback dies, the exception leaves this sub before
# $DELAY_TIMER is cleared; delay() only arms a timer when $DELAY_TIMER is
# false, so the remaining queue would apparently not be re-scheduled -
# confirm that queued callbacks are guaranteed not to die.
our $delay_run = sub {
    while (1) {
        my $cb = shift @DELAY_QUEUE;

        return undef $DELAY_TIMER
            unless $cb;

        $cb->();
    }
};
# Appends the given callback to @DELAY_QUEUE and makes sure a timer running
# $delay_run is pending: a new AE::timer (after 0, interval 0) is only
# created when $DELAY_TIMER is not already set. Returns the timer watcher.
sub delay($) {
    my ($cb) = @_;

    push @DELAY_QUEUE, $cb;

    return $DELAY_TIMER ||= AE::timer(0, 0, $delay_run);
}
=item $AnyEvent::MP::Kernel::SRCNODE
During execution of a message callback, this variable contains the node ID
of the origin node.
The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.
=cut
# Hands a message to a port handler registered in %PORT. The first element
# of @_ is the port ID, the rest is passed on to the handler. When no
# handler is registered under that ID, the message is dropped and the sub
# returns without doing anything.
sub _inject {
    # optional trace output; the JSON encoding is best-effort (eval)
    if (TRACE && @_) {
        warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode(\@_) } . "\n";
    }

    # the paren-less "&{ ... };" call form re-uses the current @_, so the
    # handler receives the message with the port ID already shifted off
    &{ $PORT{+shift} or return };
}
# Central routing component: returns the node object for the given node ID
# so that messages can be sent to it. An object already present in %NODE is
# returned as-is, otherwise a new AnyEvent::MP::Node::Remote is created.
#
# Croaks when the node ID is undef or the empty string.
sub add_node {
    my ($node) = @_;

    return $NODE{$node} || do {
        length $node
            or Carp::croak "'undef' or the empty string are not valid node/port IDs";

        # the new object registers itself in %NODE
        # (explicit method call instead of indirect object syntax, which is
        # parsed heuristically and unavailable under newer feature bundles)
        AnyEvent::MP::Node::Remote->new($node)
    };
}
# Sends a message to a port. The first argument is the destination in
# "nodeid#portid" form, the remaining arguments are the message. The node
# object is taken from %NODE, or created via add_node when missing, and its
# {send} callback is invoked with [portid, message...].
sub snd(@) {
    my $dest = shift;
    my ($nodeid, $portid) = split /#/, $dest, 2;

    # optional trace output; the JSON encoding is best-effort (eval)
    if (TRACE && @_) {
        warn "SND $nodeid <- " . eval { JSON::XS->new->encode([$portid, @_]) } . "\n";
    }

    # the port ID is always passed on as a string
    ($NODE{$nodeid} || add_node($nodeid))->{send}->(["$portid", @_]);
}
# Returns true when the node part of the given port ID (everything before
# the first "#") is equal to $NODE.
sub port_is_local($) {
    my ($port) = @_;

    # only the node part is of interest, the port part is discarded
    my ($nodeid) = split /#/, $port, 2;

    return $nodeid eq $NODE;
}
=item snd_to_func $node, $func, @args
Expects a node ID and the name of a function. Asynchronously tries to call
this function with the given arguments on that node.
This function can be used to implement C<spawn>-like interfaces.
=cut
# Sends the remaining arguments (function name plus its arguments) to the
# empty port ID ("") of the given node. The node object is taken from %NODE
# or created via add_node when missing.
sub snd_to_func($$;@) {
    my $nodeid = shift;

    # Original rationale: on $NODE, we artificially delay (for spawn). This
    # is very ugly - maybe ALL messages should simply be delayed to avoid
    # deep recursion issues, but that would be slow.
    #
    # NOTE(review): the flag is set when $nodeid is *different* from $NODE,
    # which reads like the opposite of "on $NODE" - confirm whether "ne" is
    # intended here.
    if ($nodeid ne $NODE) {
        $AnyEvent::MP::Node::Self::DELAY = 1;
    }

    ($NODE{$nodeid} || add_node($nodeid))->{send}->(["", @_]);
}
=item snd_on $node, @msg
Executes C<snd> with the given C<@msg> (which must include the destination
port) on the given node.
=cut
# Asks the given node to run "snd" with the given message: the message is
# wrapped into a "snd" request and sent to that node via snd.
sub snd_on($@) {
    my ($node, @msg) = @_;

    snd($node, snd => @msg);
}
=item eval_on $node, $string[, @reply]
Evaluates the given string as a Perl expression on the given node. When
C<@reply> is specified, it is used to construct a reply message with
C<"$@"> and any results from the eval appended.
=cut
# Sends an "eval" request to the given node. The request carries the string
# to evaluate, followed by the optional reply message.
sub eval_on($$;@) {
    my ($node, @request) = @_;

    snd($node, eval => @request);
}
# Kills a port. The first argument is the port in "nodeid#portid" form, any
# further arguments are passed on to the node object's kill method. The
# node object is taken from %NODE or created via add_node when missing.
#
# Croaks when the port part is missing or empty, i.e. when the ID names a
# node port.
sub kil(@) {
    my $port = shift;
    my ($nodeid, $portid) = split /#/, $port, 2;

    Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
        unless length $portid;

    # the port ID is always passed on as a string
    ($NODE{$nodeid} || add_node($nodeid))->kill("$portid", @_);
}
#############################################################################
# node monitoring and info
=item $bool = node_is_up $nodeid
Returns true if the given node is "up", that is, the kernel thinks it has
a working connection to it.
( run in 1.083 second using v1.01-cache-2.11-cpan-39bf76dae61 )