AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Kernel.pm  view on Meta::CPAN

      and die "remote execution not allowed\n";
}

our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      # the if exists should not be needed, but there is apparently a bug
      # elsewhere, and this works around that, silently suppressing that bug. sigh.
      _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}
         if exists $NODE{$SRCNODE};
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
      _monitor undef, $portid, $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },
   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      &snd
   },
   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my $method = shift;
      snd @_, exists $NODE_REQ{$method};
   },

   # random utilities
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);

# the node port
new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE

$PORT{""} = sub {
   my $tag = shift;
   eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};

our $MPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   $_[0]{local_greeting}{mproto} = $MPROTO;
};

#############################################################################
# seed management, try to keep connections to all seeds at all times

our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;
our %GLOBAL_NODE;  # global => undef

sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
         } else {
            $SEED_NODE{$seed} = $_[0]{remote_node};
            $NODE_SEED{$_[0]{remote_node}} = $seed;

            # also start global service, in case it isn't running
            # since we probably switch conenctions, maybe we don't need to do this here?
            snd $_[0]{remote_node}, "g_slave";
         }
      },
      sub {

MP/Kernel.pm  view on Meta::CPAN

sub seed_all {
   my @seeds = grep
      !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
      keys %SEED_NODE;

   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for grep !exists $SEED_CONNECT{$_}, @seeds;

      $SEED_RETRY = $SEED_RETRY * 2;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}

sub seed_again {
   $SEED_RETRY = (1 + rand) * 0.6;
   $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}

# sets new seed list, starts connecting
sub set_seeds(@) {
   %SEED_NODE     = ();
   %NODE_SEED     = ();
   %SEED_CONNECT  = ();

   @SEED_NODE{@_} = ();

   seed_again;
}

# normal nodes only record global node connections
$NODE_REQ{g_global} = sub {
   undef $GLOBAL_NODE{$SRCNODE};
};

mon_nodes sub {
   delete $GLOBAL_NODE{$_[0]}
     unless $_[1];

   return unless exists $NODE_SEED{$_[0]};

   if ($_[1]) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $_[0], "g_slave";
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   }
};

#############################################################################
# keepalive code - used to kepe conenctions to certain nodes alive
# only used by global code atm., but ought to be exposed somehow.
#TODO: should probbaly be done directly by node objects

our $KEEPALIVE_RETRY;
our $KEEPALIVE_WATCHER;
our %KEEPALIVE; # we want to keep these nodes alive
our %KEEPALIVE_DOWN; # nodes that are down currently

sub keepalive_all {
   AE::log 9 => "keepalive: trying to establish connections with: "
                . (join " ", keys %KEEPALIVE_DOWN)
                . ".";

   (add_node $_)->connect
      for keys %KEEPALIVE_DOWN;

   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
   $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
      if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}

sub keepalive_again {
   $KEEPALIVE_RETRY = (1 + rand) * 0.3;
   keepalive_all;
}

sub keepalive_add {
   return if $KEEPALIVE{$_[0]}++;

   return if node_is_up $_[0];
   undef $KEEPALIVE_DOWN{$_[0]};
   keepalive_again;
}

sub keepalive_del {
   return if --$KEEPALIVE{$_[0]};

   delete $KEEPALIVE     {$_[0]};
   delete $KEEPALIVE_DOWN{$_[0]};

   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}

mon_nodes sub {
   return unless exists $KEEPALIVE{$_[0]};

   if ($_[1]) {
      delete $KEEPALIVE_DOWN{$_[0]};

      undef $KEEPALIVE_WATCHER
         unless %KEEPALIVE_DOWN;
   } else {
      # lost the conenction, try to connect again
      undef $KEEPALIVE_DOWN{$_[0]};
      keepalive_again;
   }
};

#############################################################################
# talk with/to global nodes

# protocol messages:
#
# sent by global nodes
# g_global                  - global nodes send this to all others
#
# database protocol
# g_slave database          - make other global node master of the sender
# g_set database            - global node's database to other global nodes
# g_upd family set del      - update single family (any to global)
#
# slave <-> global protocol
# g_find node               - query addresses for node (slave to global)
# g_found node binds        - node addresses (global to slave)
# g_db_family family id     - send g_reply with data (global to slave)
# g_db_keys   family id     - send g_reply with data (global to slave)
# g_db_values family id     - send g_reply with data (global to slave)
# g_reply id result         - result of any query (global to slave)
# g_mon1 family             - start to monitor family, replies with g_chg1
# g_mon0 family             - stop monitoring family
# g_chg1 family hash        - initial value of family when starting to monitor
# g_chg2 family set del     - like g_upd, but for monitoring only
#
# internal database families:
# "'l" -> node -> listeners
# "'g" -> node -> undef
# ...
#

# used on all nodes:
our $MASTER;       # the global node we bind ourselves to
our $MASTER_MON;
our %LOCAL_DB;     # this node database

our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   $_[0]{local_greeting}{gproto} = $GPROTO;
};

#############################################################################
# master selection

# master requests
our %GLOBAL_REQ; # $id => \@req

sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   snd $MASTER, @$req



( run in 1.647 second using v1.01-cache-2.11-cpan-df04353d9ac )