AnyEvent-MP

 view release on metacpan or  search on metacpan

bin/aemp  view on Meta::CPAN


use AnyEvent;
use AnyEvent::Util;

use AnyEvent::MP;
use AnyEvent::MP::Config;

sub my_run_cmd {
   my ($cmd) = @_;

   my $cv = &run_cmd;
   my $status = $cv->recv;

   $status
      and die "@$cmd: command failed with exit status $status.";
}

sub gen_cert {
   my_run_cmd [qw(openssl req 
                     -new -nodes -x509 -days 3650
                     -newkey rsa:2048 -keyout /dev/fd/3
                     -batch -subj /CN=AnyEvent::MP
              )],
      "<", "/dev/null",
      ">" , \my $cert,
      "3>", \my $key,
      "2>", "/dev/null";

   "$cert$key"
}

sub init {
   configure profile => "aemp", nodeid => "aemp/%n/%u";
}

our $cfg     = AnyEvent::MP::Config::config;
our $profile = $cfg;

sub trace {
   my ($seed) = @_;
   my $cv = AE::cv;
   my %seen;
   my $exit;

   my %to;

   init;

   my $reply = port {
      my ($node, undef, @neigh) = @_;

      delete $to{$node};

      @neigh = grep $_ ne $NODE, @neigh;

      print $node, " -> ", (join " ", @neigh), "\n";

      for my $neigh (@neigh) {
         unless ($seen{$neigh}++) {
            $cv->begin;
            $to{$neigh} = AE::timer 15, 0, sub {
               print "$neigh (timeout)\n";
               $exit = 1;
               $cv->end;
            };
            AnyEvent::MP::Kernel::eval_on $neigh, "AnyEvent::MP::Kernel::up_nodes" => $SELF => $neigh;
         }
      }

      $cv->end;
   };

   $cv->begin;
   snd $reply, seed => undef, $seed;

   $cv->recv;

   exit $exit;
}

sub shell {
   init;

   my $node = shift @ARGV || $NODE;
   $| = 1;

   print <<EOF;
Entering interactive shell - no commandline editing of course (use rlfe etc.).

\=           display a list of nodes
\=name       switch to another node
package P   switch to package P when evaluating
\$ECHO       contains the name of a port that echos everything sent to it

EOF
   my $json = JSON::XS->new->pretty->ascii;
   my $pkg = "AnyEvent::MP::Kernel";
   my $cv = AE::cv;
   my $echo = port {
      print "\nECHO<$AnyEvent::MP::Kernel::SRCNODE> ", $json->encode (\@_), "\n$node $pkg> ";
   };
   print "$node $pkg> ";
   my $t = AE::io *STDIN, 0, sub {
      chomp (my $line = <STDIN>);

      if ($line =~ s/^=//) {
         if (length $line) {
            $node = $line;
         } else {
            db_keys "'l" => sub {
               print "\nnodes: ", (join " ", sort @{ $_[0] }), "\n$node $pkg> ";
            };
         }
      } elsif ($line =~ /^\s*package\s+(\S+)\s*;?\s*$/) {
         $pkg = $1;
      } elsif ($line =~ /\S/) {
         my $time = AE::time;
         AnyEvent::MP::Kernel::eval_on
            $node,
            "package $pkg; my \$ECHO = '$echo'; $line",
            port {
               kil $SELF;

               my ($err, @res) = @_;

               $time = AE::time - $time;

               print "\n$node: $line\n";
               printf "%0.3fs\n", $time;
               if (length $err) {
                  print "$err @res";
               } else {
                  print $json->encode(\@res);
               }
               print "\n$node $pkg> ";
            }
         ;
      }

      print "$node $pkg> ";
   };
   $cv->recv;
}

sub node_eval {
   my ($node, $expr) = @_;

   init;

   my $cv = AE::cv;
   my $to = AE::timer 5, 0, sub { exit 1 };
   AnyEvent::MP::Kernel::eval_on $node, $expr, port { &$cv };
   mon $node, $cv;

   my ($err, @res) = $cv->recv;

   die "$err @res" if length $err;

   print +(substr JSON::XS->new->encode (\@res), 1, -1), "\n";
}

sub docmd;

our %CMD = (
   snd => sub {
      my $port = shift @ARGV;
      init;

      snd $port, @ARGV; @ARGV = ();

      my $cv = AE::cv;
      my $to = AE::timer 5, 0, sub { exit 1 };
      mon $port, $cv;
      my $reply = port sub { &$cv };
      snd node_of $port, snd => $reply, "message sent successfully";

      print join " ", $cv->recv, "\n";
   },

   cal => sub {
      my $port = shift @ARGV;
      init;

      my $cv = AE::cv;
      cal $port, @ARGV, sub { &$cv }; @ARGV = ();

      print +(substr JSON::XS->new->encode ([$cv->recv]), 1, -1), "\n";
   },

   mon => sub {
      my $port = shift @ARGV;
      init;

      mon $port, my $cv = AE::cv;
      print join " ", $cv->recv, "\n";
   },

   eval => sub {
      my $node = node_of shift @ARGV;
      my $expr = join " ", @ARGV; @ARGV = ();
      node_eval $node, $expr;
   },

   shell => \&shell,

   trace => sub {
      @ARGV >= 1
         or die "node id missing\n";

      trace shift @ARGV;
   },
   restart => sub {
      my $node = node_of shift @ARGV;
      node_eval $node, 'my $w; $w = AE::idle sub { '
                       . 'undef $w; '
                       . 'use AnyEvent::Watchdog::Util ();'
                       . 'AnyEvent::Watchdog::Util::restart'
                       . '}; ()';
   },

   setnodeid => sub {
      @ARGV >= 1
         or die "shared secret missing\n";

      $profile->{nodeid} = shift @ARGV;
      ++$cfg->{dirty};
   },
   delnodeid => sub {
      delete $profile->{nodeid};
      ++$cfg->{dirty};
   },



( run in 1.038 second using v1.01-cache-2.11-cpan-39bf76dae61 )