AnyEvent-MP

 view release on metacpan or  search on metacpan

MP/Kernel.pm  view on Meta::CPAN

{
   # ~54 bits, for local port names, lowercase $ID appended
   my $now = AE::now;
   $UNIQ =
      (join "",
         map $alnum[$_],
            $$ / 62 % 62,
            $$ % 62,
            (int $now        ) % 62,
            (int $now *   100) % 62,
            (int $now * 10000) % 62,
      ) . nonce62 4
   ;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/;

   $NODE = "";
}

sub NODE() {
   $NODE
}

sub node_of($) {
   my ($node, undef) = split /#/, $_[0], 2;

   $node
}

BEGIN {
   *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
      ? sub () { 1 }
      : sub () { 0 };
}

our $DELAY_TIMER;
our @DELAY_QUEUE;

our $delay_run = sub {
   (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
};

sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}

=item $AnyEvent::MP::Kernel::SRCNODE

During execution of a message callback, this variable contains the node ID
of the origin node.

The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.

=cut

sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;

   &{ $PORT{+shift} or return };
}

# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
sub add_node {
   $NODE{$_[0]} || do {
      my ($node) = @_;

      length $node
         or Carp::croak "'undef' or the empty string are not valid node/port IDs";

      # registers itself in %NODE
      new AnyEvent::MP::Node::Remote $node
   }
}

sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;

   ($NODE{$nodeid} || add_node $nodeid)
      ->{send} (["$portid", @_]);
}

sub port_is_local($) {
   my ($nodeid, undef) = split /#/, $_[0], 2;

   $nodeid eq $NODE
}

=item snd_to_func $node, $func, @args

Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.

This function can be used to implement C<spawn>-like interfaces.

=cut

sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
}

=item snd_on $node, @msg

Executes C<snd> with the given C<@msg> (which must include the destination
port) on the given node.

=cut

sub snd_on($@) {
   my $node = shift;
   snd $node, snd => @_;
}

=item eval_on $node, $string[, @reply]

Evaluates the given string as Perl expression on the given node. When
@reply is specified, then it is used to construct a reply message with
C<"$@"> and any results from the eval appended.

=cut

sub eval_on($$;@) {
   my $node = shift;
   snd $node, eval => @_;
}

sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

MP/Kernel.pm  view on Meta::CPAN

   global_req_add "g_find $_[0]", [g_find => $_[0]];
}

# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   global_req_del "g_find $_[0]";

   my $node = $NODE{$_[0]} or return;

   $node->connect_to ($_[1]);
};

sub master_set {
   $MASTER = $_[0];
   AE::log 8 => "new master node: $MASTER.";

   $MASTER_MON = mon_nodes sub {
      if ($_[0] eq $MASTER && !$_[1]) {
         undef $MASTER;
         master_search ();
      }
   };

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   snd $MASTER, @$_
      for values %GLOBAL_REQ;
}

sub master_search {
   AE::log 9 => "starting search for master node.";

   #TODO: should also look for other global nodes, but we don't know them
   for (keys %NODE_SEED) {
      if (node_is_up $_) {
         master_set $_;
         return;
      }
   }

   $MASTER_MON = mon_nodes sub {
      return unless $_[1]; # we are only interested in node-ups
      return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes

      master_set $_[0];
   };
}

# other node wants to make us the master, so start the global service
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   require AnyEvent::MP::Global;
   &{ $NODE_REQ{g_slave} }
};

#############################################################################
# local database operations

# canonical probably not needed
our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;

# are the two scalars equal? very very ugly and slow, need better way
sub sv_eq($$) {
   ref $_[0] || ref $_[1]
      ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
      : $_[0] eq $_[1]
        && defined $_[0] == defined $_[1]
}

# local database management

sub db_del($@) {
   my $family = shift;

   my @del = grep exists $LOCAL_DB{$family}{$_}, @_;

   return unless @del;

   delete @{ $LOCAL_DB{$family} }{@del};
   snd $MASTER, g_upd => $family => undef, \@del
      if defined $MASTER;
}

sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
         $LOCAL_DB{$family}{$subkey} = $_[2];
         snd $MASTER, g_upd => $family => { $subkey => $_[2] }
            if defined $MASTER;
      }
#   }

   defined wantarray
      and Guard::guard { db_del $family => $subkey }
}

# database query

sub db_family {
   my ($family, $cb) = @_;
   global_call g_db_family => $family, $cb;
}

sub db_keys {
   my ($family, $cb) = @_;
   global_call g_db_keys   => $family, $cb;
}

sub db_values {
   my ($family, $cb) = @_;
   global_call g_db_values => $family, $cb;
}

# database monitoring

our %LOCAL_MON; # f, reply



( run in 0.624 second using v1.01-cache-2.11-cpan-39bf76dae61 )