AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Kernel.pm  view on Meta::CPAN

sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");

   for my $cb (values %MON_NODES) {
      eval { $cb->($node->{id}, $up, @reason); 1 }
         or AE::log die => $@;
   }
}

#############################################################################
# self node code

sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port}
      or !@_
      or AE::log die => "unmonitored local port $port died with reason: @_";

   $_->(@_) for values %$mon;
}

sub _monitor {
   return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
      unless exists $PORT{$_[1]};

   $LMON{$_[1]}{$_[2]+0} = $_[2];
}

sub _unmonitor {
   delete $LMON{$_[1]}{$_[2]+0}
      if exists $LMON{$_[1]};
}

sub _secure_check {
   $SECURE
      and die "remote execution not allowed\n";
}

our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      # the if exists should not be needed, but there is apparently a bug
      # elsewhere, and this works around that, silently suppressing that bug. sigh.
      _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}
         if exists $NODE{$SRCNODE};
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
      _monitor undef, $portid, $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },
   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      &snd
   },
   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my $method = shift;
      snd @_, exists $NODE_REQ{$method};
   },

   # random utilities
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);

# the node port
new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE

$PORT{""} = sub {
   my $tag = shift;
   eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};

our $MPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   $_[0]{local_greeting}{mproto} = $MPROTO;
};



( run in 2.394 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )