AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Kernel.pm  view on Meta::CPAN

   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
   $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
      if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}

# Restart the reconnect back-off: choose a fresh randomised retry interval
# in the range [0.3, 0.6) seconds and run keepalive_all right away.
sub keepalive_again {
   $KEEPALIVE_RETRY = 0.3 * (1 + rand);
   keepalive_all ();
}

# Add one keepalive reference for the node id given as first argument.
# Only the first reference does any work: if the node is not up at that
# point it is recorded in %KEEPALIVE_DOWN and the retry cycle is restarted.
sub keepalive_add {
   my ($node) = @_;

   # somebody already holds a reference, only bump the count
   return if $KEEPALIVE{$node}++;

   # the node is connected, so there is nothing to retry
   return if node_is_up $node;

   # only the key matters in %KEEPALIVE_DOWN, the value stays undef
   $KEEPALIVE_DOWN{$node} = undef;
   keepalive_again;
}

# Drop one keepalive reference for the node id given as first argument.
# When the count reaches zero the node is forgotten, and the retry timer is
# cancelled if no down node remains.
#
# NOTE(review): a call without a matching keepalive_add decrements a missing
# entry to -1, which is true, so the sub returns and leaves that entry behind.
sub keepalive_del {
   my ($node) = @_;

   # other references remain
   return if --$KEEPALIVE{$node};

   delete $KEEPALIVE{$node};
   delete $KEEPALIVE_DOWN{$node};

   # no node left to reconnect to, so release the timer
   unless (%KEEPALIVE_DOWN) {
      undef $KEEPALIVE_WATCHER;
   }
}

# Node monitor callback: the first argument is the node id, the second is
# true when the node came up and false when its connection was lost.
# Nodes without a keepalive reference are ignored.
mon_nodes sub {
   my ($node, $is_up) = @_;

   return unless exists $KEEPALIVE{$node};

   if ($is_up) {
      # connected again, no further retries needed for this node
      delete $KEEPALIVE_DOWN{$node};

      # stop the retry timer once no down node is left
      unless (%KEEPALIVE_DOWN) {
         undef $KEEPALIVE_WATCHER;
      }
   } else {
      # lost the connection, try to connect again
      $KEEPALIVE_DOWN{$node} = undef;
      keepalive_again;
   }
};

#############################################################################
# talk with/to global nodes

# protocol messages:
#
# sent by global nodes
# g_global                  - global nodes send this to all others
#
# database protocol
# g_slave database          - make other global node master of the sender
# g_set database            - global node's database to other global nodes
# g_upd family set del      - update single family (any to global)
#
# slave <-> global protocol
# g_find node               - query addresses for node (slave to global)
# g_found node binds        - node addresses (global to slave)
# g_db_family family id     - send g_reply with data (global to slave)
# g_db_keys   family id     - send g_reply with data (global to slave)
# g_db_values family id     - send g_reply with data (global to slave)
# g_reply id result         - result of any query (global to slave)
# g_mon1 family             - start to monitor family, replies with g_chg1
# g_mon0 family             - stop monitoring family
# g_chg1 family hash        - initial value of family when starting to monitor
# g_chg2 family set del     - like g_upd, but for monitoring only
#
# internal database families:
# "'l" -> node -> listeners
# "'g" -> node -> undef
# ...
#

# state used on every node:
our $MASTER;       # the global node this node binds itself to
our $MASTER_MON;   # NOTE(review): presumably the monitor belonging to $MASTER - confirm
our %LOCAL_DB;     # the database of this node

# gproto value announced to peers (presumably the global protocol version)
our $GPROTO = 1;

# greeting hook: store our gproto in the local greeting of the object passed
# as first argument, so everybody who connects gets told about it
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};

#############################################################################
# master selection

# master requests
# pending requests for the master, $id => \@req
our %GLOBAL_REQ;

# Register request $req (an array reference holding the message) under $id.
# An id that is already pending is left untouched. If a master is currently
# known, the message is sent to it immediately.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd $MASTER, @$req;
}

# Remove the pending request with the given id from %GLOBAL_REQ and
# return whatever was stored under it (the result of delete).
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}

#################################
# master rpc

our %GLOBAL_RES;           # $id => callback waiting for the g_reply
our $GLOBAL_RES_ID = "a";  # last id handed out, advanced with string increment

# global_call @request, $cb
#
# Queue @request for the master with a freshly allocated id appended, and
# remember $cb (the last argument) under that id in %GLOBAL_RES.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add $id, [@_, $id];
}

# Handler for "g_reply id result": the answer to a request issued via
# global_call. Replies with an unknown id are ignored.
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   # answered, so drop it from the pending requests
   global_req_del $id;

   my $cb = delete $GLOBAL_RES{$id};
   return unless $cb;

   # called with the current @_, i.e. everything that followed the id
   &$cb
};

#################################

# Queue a "g_find" request for the given node id. The request is keyed as
# "g_find <node>", so a node that is already being looked up is not queued
# a second time.
sub g_find {
   my ($node) = @_;

   global_req_add "g_find $node", [g_find => $node];
}

# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   global_req_del "g_find $_[0]";

MP/Kernel.pm  view on Meta::CPAN

         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}

# hooks waiting to be run once $NODE is set
our @POST_CONFIGURE;

# not yet documented
#
# post_configure { ... }
#
# Queue the given code block and then run queued hooks in order for as long
# as $NODE is true. Must be called in void context, otherwise it dies.
sub post_configure(&) {
   defined wantarray
      and die "AnyEvent::MP::Kernel::post_configure must be called in void context";

   push @POST_CONFIGURE, @_;

   # $NODE is re-checked before every hook
   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}

# configure $profile, key => value...
# configure key => value...
#
# Configure and start the local node: look up the profile, determine the
# final node ID, create the listeners, run the queued post_configure hooks,
# publish the listeners, set the seeds, start the master search, start the
# configured services and finally evaluate the "eval" configuration string.
#
# Croaks on an empty/false node ID and on unparsable bind addresses, and dies
# when loading a service module or the eval string fails.
#
# At the end of the setup phase the sub replaces itself with an empty sub.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   # no explicit profile given: use whatever nodename returns
   $profile = nodename
      unless defined $profile;

   # the remaining key/value pairs are handed to the profile lookup
   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   $SECURE = $CONFIG->{secure};

   # a nodeid key in the configuration wins, otherwise use "<profile>/"
   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # take the node object out of %NODE, it is stored again under the new id below
   my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure

   $NODE = $node;

   # replace every %n by the result of calling nodename
   $NODE =~ s/%n/nodename/ge;

   # insert $RUNIQ after a trailing "/" and in place of every %u
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # register the node object under its final id
   $node_obj->{id} = $NODE;
   $NODE{$NODE} = $node_obj;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   # without configured binds, use the single bind "*"
   $binds ||= ["*"];

   AE::log 8 => "node $NODE starting up.";

   $BINDS = [];
   %BINDS = ();

   # _resolve is called for every bind first, then each result is
   # waited for (->recv) in turn and yields the addresses to bind to
   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # overwrite $bind with the host/port passed to this callback,
               # so the address recorded below is the one reported here
               # (assumes mp_server calls prepare before it returns - TODO confirm)
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               # NOTE(review): presumably the listen backlog, as for the prepare
               # callback of AnyEvent::Socket::tcp_server - confirm
               0
            },
         ;
         # remember the listener by address, and the addresses in order
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   AE::log 9 => "running post config hooks and init.";

   # $NODE is set now, so queueing an empty hook runs all hooks queued so far
   # might initialise Global, so need to do it before db_set
   post_configure { };

   # family "'l" maps node ids to their listeners
   db_set "'l" => $NODE => $BINDS;

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;
   master_search;

   # _resolve is not used after this point, and any further call to
   # configure ends up in the empty replacement sub
   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   AE::log 9 => "starting services.";

   # a service is either a reference ([function name, arguments...]),
   # a string ending in "::" (a module to load) or a function name
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         # the trailing "::" has been stripped, the rest is the module name
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }

   # evaluate the "eval" configuration string; the leading line directive makes
   # errors refer to "(eval configure parameter)" instead of this file
   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}

=back

=head1 LOGGING

AnyEvent::MP::Kernel logs high-level information about the current node,
when nodes go up and down, and most runtime errors. It also logs some
debugging and trace messages about network maintenance, such as seed
connections and global node management.

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1



( run in 0.742 second using v1.01-cache-2.11-cpan-2398b32b56e )