AnyEvent-MP
view release on metacpan or search on metacpan
MP/Kernel.pm view on Meta::CPAN
# Log a node state change and pass it on to every callback in %MON_NODES.
#
# Arguments:
#   $node   - node object; only its {id} member is used here
#   $up     - true when the node came up, false when it went down
#   @reason - reason list, logged when the node is down and handed to
#             the callbacks unchanged
#
# Each callback is invoked as ($node_id, $up, @reason). An exception thrown
# by a callback is reported through AE::log at the "die" level.
sub _inject_nodeevent($$;@) {
   my ($node, $is_up, @why) = @_;

   my $state = $is_up ? "up." : "down (@why).";
   AE::log 7 => "$node->{id} is " . $state;

   for my $watcher (values %MON_NODES) {
      # the trailing 1 makes the eval true unless the callback threw
      next if eval { $watcher->($node->{id}, $is_up, @why); 1 };

      AE::log die => $@;
   }
}
#############################################################################
# self node code
# Remove a local port and notify everything that monitors it.
#
# Arguments: the port id, followed by the (possibly empty) kill reason.
# The port is dropped from %PORT, %PORT_DATA and %LMON; every monitor
# callback registered in %LMON for the port is then called with the reason.
# A port that dies with a non-empty reason while nobody monitors it is
# reported through AE::log at the "die" level.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # no monitors at all: only complain when there actually is a reason
   if (!$mon && @_) {
      AE::log die => "unmonitored local port $port died with reason: @_";
   }

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
# Register a monitor callback for a local port.
#
# Arguments: (ignored, $portid, $cb). The first argument is not used here.
# If the port does not exist in %PORT, $cb is called immediately with a
# "no_such_port" reason and its result is returned. Otherwise the callback
# is stored in %LMON under the port id, keyed by the numeric value of the
# code reference (so the same reference can later be found again).
sub _monitor {
   my (undef, $portid, $cb) = @_;

   exists $PORT{$portid}
      or return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");

   $LMON{$portid}{$cb+0} = $cb;
}
# Remove a monitor callback previously stored for a local port.
#
# Arguments: (ignored, $portid, $cb). The entry is looked up in %LMON by
# the numeric value of the code reference. Nothing happens when the port
# has no entry in %LMON at all (and no entry is created in that case).
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   exists $LMON{$portid}
      and delete $LMON{$portid}{$cb+0};
}
# Refuse remote code execution while $SECURE is true.
#
# Dies with "remote execution not allowed\n" (the trailing newline keeps
# file/line information out of the message) when $SECURE is set, and does
# nothing otherwise.
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
# Dispatch table for messages sent to the node port: maps the request tag
# (first message element) to its handler. Handlers receive the remaining
# message elements in @_; $SRCNODE names the node the message came from.
our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      # the if exists should not be needed, but there is apparently a bug
      # elsewhere, and this works around that, silently suppressing that bug. sigh.
      _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}
         if exists $NODE{$SRCNODE};
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;

      # weak copy: the callback below is stored inside the node's own rmon
      # hash, so a strong reference here would form a reference cycle
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});

      _monitor undef, $portid, $node->{rmon}{$portid} = sub {
         # the weak reference is cleared once the node object is gone;
         # bail out before touching it, otherwise the delete below would
         # autovivify a fresh hash into $node
         $node
            or return;

         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };
   },
   # another node has killed a monitored port
   kil0 => sub {
      # look the node up first: a nested delete on a missing
      # $NODE{$SRCNODE} would autovivify a plain hash entry into %NODE
      # (same defensive check as in mon0)
      my $node = $NODE{$SRCNODE}
         or return;

      my $cbs = delete $node->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },
   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      &snd # called without parens on purpose: passes our @_ through as-is
   },

   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my $method = shift;
      snd @_, exists $NODE_REQ{$method};
   },

   # random utilities
   eval => sub {
      # SECURITY: this string-evals code supplied by the sender; the only
      # gate is _secure_check, which dies when $SECURE is set
      &_secure_check;
      my @res = do { package main; eval shift };
      # reply (error text first, then the results) only if a reply
      # destination was supplied
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
# the node port
AnyEvent::MP::Node::Self->new ($NODE); # registers itself in %NODE

# Handler for messages sent to the node port (the port with the empty id).
# The first message element is the request tag, the rest are its arguments.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      # tags without a fixed handler are resolved via load_func (subject to
      # _secure_check) and cached in %NODE_REQ for subsequent requests
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag };

      # no parens: the handler sees our remaining @_ directly
      &$handler;
   };

   $@ and AE::log die => "error processing node message from $SRCNODE: $@";
};
# value announced as "mproto" in our greeting
# NOTE(review): presumably the monitoring protocol version - confirm
our $MPROTO = 1;

# tell everybody who connects our mproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($greeter) = @_; # first hook argument, carries the local_greeting hash

   $greeter->{local_greeting}{mproto} = $MPROTO;
};
( run in 2.394 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )