AnyEvent-MP
view release on metacpan or search on metacpan
use AnyEvent;
use AnyEvent::Util;
use AnyEvent::MP;
use AnyEvent::MP::Config;
# Run an external command through run_cmd and block until it has finished.
#
# All arguments are handed to run_cmd unchanged; the first one is expected to
# be an array reference holding the command and its arguments (it is
# dereferenced for the error message).
#
# Dies when the status delivered by run_cmd's condvar is true (non-zero).
sub my_run_cmd {
   my ($cmd) = @_;

   # "&run_cmd;" (no parentheses) forwards our own @_ as-is, so redirection
   # targets such as scalar references reach run_cmd untouched
   my $cv     = &run_cmd;
   my $status = $cv->recv;

   die "@$cmd: command failed with exit status $status."
      if $status;
}
# Generate a self-signed X.509 certificate plus a fresh, unencrypted
# 2048 bit RSA key by running "openssl req".
#
# openssl writes the certificate to stdout and is told to write the key to
# /dev/fd/3; both streams are captured into scalars. stdin and stderr are
# connected to /dev/null.
#
# Returns the certificate text immediately followed by the key text.
# Dies (via my_run_cmd) if openssl reports a non-zero status.
sub gen_cert {
   my ($pem_cert, $pem_key);

   my @openssl = qw(
      openssl req
      -new -nodes -x509 -days 3650
      -newkey rsa:2048 -keyout /dev/fd/3
      -batch -subj /CN=AnyEvent::MP
   );

   my_run_cmd \@openssl,
      "<" , "/dev/null",
      ">" , \$pem_cert,
      "3>", \$pem_key,
      "2>", "/dev/null";

   "$pem_cert$pem_key"
}
# Configure this process as an AnyEvent::MP node, using the "aemp" profile
# and the node id template "aemp/%n/%u".
#
# Called by every command that needs to talk to other nodes.
sub init {
   configure
      profile => "aemp",
      nodeid  => "aemp/%n/%u";
}
# The configuration as returned by AnyEvent::MP::Config::config. Commands
# that change it bump $cfg->{dirty}.
our $cfg = AnyEvent::MP::Config::config;
# The profile edited by the setnodeid/delnodeid commands. At this point it is
# simply the same reference as $cfg, i.e. settings go into the top level of
# the configuration.
our $profile = $cfg;
# Walk the node graph starting at $seed and print, for every node reached,
# a line of the form "node -> neighbour neighbour ...".
#
# Each newly discovered node is asked (via eval_on) to evaluate
# AnyEvent::MP::Kernel::up_nodes, with the answer addressed to a local
# collector port. Every outstanding request is guarded by a 15 second timer;
# a node that does not answer in time is reported as "node (timeout)" and
# makes the process exit with status 1.
#
# This function does not return: it calls exit once all requests have been
# answered or have timed out.
sub trace {
   my ($seed) = @_;

   my $done = AE::cv;
   my (%visited, %timeout_for);
   my $exit_code;

   init;

   # Messages arriving here have the layout (node, <ignored>, neighbours...).
   my $collector = port {
      my ($from, undef, @peers) = @_;

      # the node answered, so its timeout watcher can go
      delete $timeout_for{$from};

      # our own node is not interesting as a neighbour
      @peers = grep { $_ ne $NODE } @peers;
      print $from, " -> ", (join " ", @peers), "\n";

      for my $peer (@peers) {
         # query every node only once
         next if $visited{$peer}++;

         $done->begin;

         $timeout_for{$peer} = AE::timer 15, 0, sub {
            print "$peer (timeout)\n";
            $exit_code = 1;
            $done->end;
         };

         AnyEvent::MP::Kernel::eval_on $peer, "AnyEvent::MP::Kernel::up_nodes" => $SELF => $peer;
      }

      # matches the begin done when the request for $from was issued
      $done->end;
   };

   # kick things off with a fake reply whose only "neighbour" is the seed
   $done->begin;
   snd $collector, seed => undef, $seed;

   $done->recv;
   exit $exit_code;
}
# Interactive shell: read lines from STDIN and evaluate them on a node.
#
# The target node is taken from the next command line argument, defaulting to
# the local node ($NODE). Input lines are interpreted as follows:
#
#   =            list the known nodes (keys of the "'l" database family)
#   =name        switch the target node to "name"
#   package P    evaluate subsequent lines in package P
#   anything     evaluated on the target node via eval_on; the result (or the
#                error) is printed together with the elapsed time
#
# Inside evaluated code, $ECHO holds the id of a local port that prints
# whatever is sent to it.
#
# The shell runs until STDIN reaches end-of-file, then returns.
sub shell {
   init;

   my $node = shift (@ARGV) || $NODE;

   $| = 1;

   print <<EOF;
Entering interactive shell - no commandline editing of course (use rlfe etc.).
\= display a list of nodes
\=name switch to another node
package P switch to package P when evaluating
\$ECHO contains the name of a port that echos everything sent to it
EOF

   my $json = JSON::XS->new->pretty->ascii;
   my $pkg  = "AnyEvent::MP::Kernel";
   my $cv   = AE::cv;

   my $echo = port {
      print "\nECHO<$AnyEvent::MP::Kernel::SRCNODE> ", $json->encode (\@_), "\n$node $pkg> ";
   };

   print "$node $pkg> ";

   my $t = AE::io *STDIN, 0, sub {
      my $line = <STDIN>;

      # At end-of-file readline returns undef and STDIN stays "readable"
      # forever, so without this check the callback would spin and print
      # prompts endlessly. Leave the shell instead.
      unless (defined $line) {
         print "\n";
         $cv->send;
         return;
      }

      chomp $line;

      if ($line =~ s/^=//) {
         if (length $line) {
            $node = $line;
         } else {
            db_keys "'l" => sub {
               print "\nnodes: ", (join " ", sort @{ $_[0] }), "\n$node $pkg> ";
            };
         }
      } elsif ($line =~ /^\s*package\s+(\S+)\s*;?\s*$/) {
         $pkg = $1;
      } elsif ($line =~ /\S/) {
         my $time = AE::time;

         AnyEvent::MP::Kernel::eval_on
            $node,
            "package $pkg; my \$ECHO = '$echo'; $line",
            port {
               # one-shot reply port: destroy it as soon as the answer is in
               kil $SELF;

               my ($err, @res) = @_;

               $time = AE::time - $time;

               print "\n$node: $line\n";
               printf "%0.3fs\n", $time;

               if (length $err) {
                  print "$err @res";
               } else {
                  print $json->encode(\@res);
               }

               print "\n$node $pkg> ";
            }
         ;
      }

      print "$node $pkg> ";
   };

   $cv->recv;
}
# Evaluate $expr on $node and print the results as JSON (without the
# enclosing array brackets), followed by a newline.
#
# The condvar is completed either by the reply to the eval request or by the
# monitor on $node firing, whichever comes first. If the first value received
# is a non-empty string it is treated as an error and the function dies with
# it. If nothing arrives within 5 seconds the process exits with status 1.
sub node_eval {
   my ($node, $expr) = @_;

   init;

   my $done     = AE::cv;
   my $watchdog = AE::timer 5, 0, sub { exit 1 };

   AnyEvent::MP::Kernel::eval_on $node, $expr, port { $done->(@_) };
   mon $node, $done;

   my ($err, @res) = $done->recv;

   die "$err @res"
      if length $err;

   # encode as a JSON array, then cut off the leading "[" and trailing "]"
   my $encoded = JSON::XS->new->encode (\@res);
   print +(substr $encoded, 1, -1), "\n";
}
# Forward declaration only: lets docmd be called without parentheses before
# its body has been compiled (the definition itself is not in this part of
# the file).
sub docmd;
our %CMD = (
# snd PORT DATA...: send the remaining command line arguments as a message to
# PORT, then wait for confirmation.
#
# After the message itself, a second message is sent to the node of PORT
# asking it to send "message sent successfully" to a local reply port. The
# command prints whatever completes the condvar first: that reply, or the
# arguments passed by the monitor on PORT. If neither happens within
# 5 seconds the process exits with status 1.
snd => sub {
   my $port = shift @ARGV;

   init;

   snd $port, @ARGV; @ARGV = ();

   my $cv = AE::cv;
   my $to = AE::timer 5, 0, sub { exit 1 };

   mon $port, $cv;

   my $reply = port sub { &$cv };
   snd node_of $port, snd => $reply, "message sent successfully";

   # parenthesise the join so the newline is not part of the joined list,
   # which would leave a stray space at the end of the output line
   print +(join " ", $cv->recv), "\n";
},
# cal PORT DATA...: perform a "cal" on PORT with the remaining command line
# arguments and print the reply as JSON (without the enclosing array
# brackets), followed by a newline.
cal => sub {
   my $port = shift @ARGV;

   init;

   my $done = AE::cv;

   cal $port, @ARGV, sub { $done->(@_) };
   @ARGV = ();

   # encode the reply as a JSON array and strip the outer "[" and "]"
   my $encoded = JSON::XS->new->encode ([$done->recv]);
   print +(substr $encoded, 1, -1), "\n";
},
# mon PORT: monitor PORT and block until the monitor fires, then print the
# values it delivered, separated by spaces, on one line.
mon => sub {
   my $port = shift @ARGV;

   init;

   mon $port, my $cv = AE::cv;

   # parenthesise the join so the newline is not part of the joined list,
   # which would leave a stray space at the end of the output line
   print +(join " ", $cv->recv), "\n";
},
# eval NODE EXPR...: join the remaining command line arguments with spaces
# and evaluate the resulting string on the node of NODE (see node_eval).
eval => sub {
   my $node = node_of shift @ARGV;

   # splice without offset empties @ARGV and returns its former contents
   my $expr = join " ", splice @ARGV;

   node_eval $node, $expr;
},
# shell [NODE]: start the interactive shell implemented by the shell sub.
shell => \&shell,
# trace NODE: trace the node graph starting at NODE (see the trace sub).
# Dies if no node id was given on the command line.
trace => sub {
   die "node id missing\n"
      unless @ARGV;

   trace shift @ARGV;
},
# restart NODE: ask the node of NODE to restart itself.
#
# The code evaluated remotely installs an idle watcher that, when it runs,
# removes itself and calls AnyEvent::Watchdog::Util::restart. The remote
# expression itself evaluates to an empty list.
restart => sub {
   my $node = node_of shift @ARGV;

   my $remote_code = join "",
      'my $w; $w = AE::idle sub { ',
      'undef $w; ',
      'use AnyEvent::Watchdog::Util ();',
      'AnyEvent::Watchdog::Util::restart',
      '}; ()';

   node_eval $node, $remote_code;
},
# setnodeid NODEID: store NODEID as the node id of the current profile and
# mark the configuration as modified.
# Dies if no node id was given on the command line.
setnodeid => sub {
   # this command takes a node id; the message used to claim that a
   # "shared secret" was missing, which pointed users at the wrong argument
   @ARGV >= 1
      or die "node id missing\n";

   $profile->{nodeid} = shift @ARGV;
   ++$cfg->{dirty};
},
# delnodeid: remove the node id setting from the current profile and mark the
# configuration as modified.
delnodeid => sub {
   delete $profile->{nodeid};

   ++$cfg->{dirty};
},
( run in 1.038 second using v1.01-cache-2.11-cpan-39bf76dae61 )