AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Kernel.pm  view on Meta::CPAN

=head1 NAME

AnyEvent::MP::Kernel - the actual message passing kernel

=head1 SYNOPSIS

   use AnyEvent::MP::Kernel;

   $AnyEvent::MP::Kernel::SRCNODE   # contains msg origin node id, for debugging

   snd_to_func $node, $func, @args  # send msg to function
   snd_on $node, @msg               # snd message again (relay)
   eval_on $node, $string[, @reply] # execute perl code on another node

   node_is_up $nodeid               # return true if a node is connected
   @nodes = up_nodes                # return a list of all connected nodes
   $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs

=head1 DESCRIPTION

This module implements most of the inner workings of AnyEvent::MP. It
offers mostly lower-level functions that deal with network connectivity
and special requests.

You normally interface with AnyEvent::MP through a higher level interface
such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
with using the functions from this module.

=head1 GLOBALS AND FUNCTIONS

=over 4

=cut

package AnyEvent::MP::Kernel;

use common::sense;
use Carp ();

use AnyEvent ();
use Guard ();

use AnyEvent::MP::Node;
use AnyEvent::MP::Transport;

use base "Exporter";

# for re-export in AnyEvent::MP and Coro::MP
# (this list is also appended to @EXPORT_OK below, so every name in it can
# be imported from this module on request as well)
our @EXPORT_API = qw(
   NODE $NODE
   configure
   node_of port_is_local
   snd kil
   db_set db_del
   db_mon db_family db_keys db_values
);

# names that are only exported when explicitly asked for (Exporter's
# @EXPORT_OK): the internal symbols listed here plus the whole of @EXPORT_API
our @EXPORT_OK = (
   # these are internal
   qw(
      %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
      add_node load_func
   ),
   @EXPORT_API,
);

our @EXPORT = qw(
   snd_to_func snd_on eval_on
   port_is_local

MP/Kernel.pm  view on Meta::CPAN

our $BINDS; # our listeners, as arrayref

our $SRCNODE; # holds the sending node _object_ during _inject
our $GLOBAL;  # true when this is a global node (only set by AnyEvent::MP::Global)

# initialise names for non-networked operation
#
# Runs once when the module is loaded and fills in $UNIQ, $RUNIQ and $NODE.
# @alnum and nonce62 are defined elsewhere in this file - presumably a
# 62-character alphabet and a generator for random base-62 strings of the
# requested length; verify against their definitions.
{
   # ~54 bits, for local port names, lowercase $ID appended
   # five characters derived from the process id and the current time,
   # followed by the result of nonce62 4
   my $now = AE::now;
   $UNIQ =
      (join "",
         map $alnum[$_],
            $$ / 62 % 62,                # process id, second base-62 digit
            $$ % 62,                     # process id, lowest base-62 digit
            (int $now        ) % 62,     # whole seconds
            (int $now *   100) % 62,     # hundredths of a second
            (int $now * 10000) % 62,     # ten-thousandths of a second
      ) . nonce62 4
   ;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/; # uppercase the final character

   # start out with the empty string as node ID
   $NODE = "";
}

# Accessor for the package variable $NODE (presumably the ID of the local
# node - port_is_local compares the node part of a port ID against it).
# The empty prototype lets callers write a bare NODE without parentheses.
sub NODE() {
   return $NODE;
}

# Returns the node part of a port ID, i.e. everything in front of the
# first "#". A string without "#" is returned unchanged; the empty string
# yields undef.
sub node_of($) {
   # limit of 2: only the first "#" separates node ID and port name
   my @parts = split /#/, $_[0], 2;

   return $parts[0];
}

# Define TRACE at compile time as a constant sub: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise. Both variants are
# literal constants with an empty prototype, so "... if TRACE" guards can
# be folded away by perl.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}

# State for deferred callbacks: the queue of pending callbacks and the
# timer watcher that will eventually run them.
our @DELAY_QUEUE;
our $DELAY_TIMER;

# Runs the queued callbacks in order, including any that get queued while
# it is running. Stops at the first false queue entry or when the queue is
# empty, then drops the timer and returns undef.
our $delay_run = sub {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   return undef $DELAY_TIMER;
};

# Queues a callback for deferred execution: it is appended to @DELAY_QUEUE
# and, unless a timer is already pending, a zero-delay timer is created
# that invokes $delay_run.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer (0, 0, $delay_run);
}

=item $AnyEvent::MP::Kernel::SRCNODE

During execution of a message callback, this variable contains the node ID
of the origin node.

The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.

=cut

# Delivers a message to a local port. The first argument is the port name,
# the remaining arguments are the message. Messages for names that have no
# true entry in %PORT are silently dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n";
   }

   # removes the port name from @_
   my $handler = $PORT{+shift}
      or return;

   # &-call without parentheses: the handler sees the current @_, which
   # now holds just the message
   &$handler;
}

# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
# Central routing helper: returns the node object stored in %NODE for the
# given node ID, creating a remote node object when there is none yet, so
# that messages can be sent to it.
#
#   $_[0] - node ID
#
# Croaks when no entry exists and the ID is undef or the empty string.
sub add_node {
   my ($node) = @_;

   my $known = $NODE{$node};
   return $known if $known;

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   # the constructor registers the new object in %NODE itself
   return AnyEvent::MP::Node::Remote->new ($node);
}

# Sends a message to a port. The first argument is the port ID in
# "nodeid#portname" form, the remaining arguments are the message. The
# message is handed to the {send} callback of the node object for that
# node ID, which is created via add_node when not yet known.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port name is stringified before it goes into the message
   $node->{send}->(["$portid", @_]);
}

# Returns true when the node part of the given port ID (everything before
# the first "#") is string-equal to $NODE.
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   return $parts[0] eq $NODE;
}

=item snd_to_func $node, $func, @args

Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.

This function can be used to implement C<spawn>-like interfaces.

=cut

# Sends a function-call request to a node: the message is addressed to the
# empty port name and consists of the function name followed by its
# arguments.
#
#   $nodeid - ID of the node that should run the function
#   $func   - name of the function
#   @args   - arguments for the function
sub snd_to_func($$;@) {
   my ($target, @call) = @_;

   # artificial delay (for spawn). this is very ugly - maybe ALL messages
   # should simply be delayed, to avoid deep recursion issues, but that
   # would be slow.
   # NOTE(review): the historical comment here spoke of delaying messages
   # "on $NODE", yet the flag is set when the target is NOT $NODE - confirm
   # which of the two is intended.
   unless ($target eq $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$target} || add_node ($target);

   $node->{send}->(["", @call]);
}

MP/Kernel.pm  view on Meta::CPAN

   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   AE::log 9 => "running post config hooks and init.";

   # might initialise Global, so need to do it before db_set
   post_configure { };

   db_set "'l" => $NODE => $BINDS;

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;
   master_search;

   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   AE::log 9 => "starting services.";

   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }

   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}

=back

=head1 LOGGING

AnyEvent::MP::Kernel logs high-level information about the current node,
when nodes go up and down, and most runtime errors. It also logs some
debugging and trace messages about network maintenance, such as seed
connections and global node management.

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1



( run in 1.291 second using v1.01-cache-2.11-cpan-39bf76dae61 )