AnyEvent-MP
view release on metacpan or search on metacpan
your nodes have a lot of transports, you might have to set this to a low
value so that they will actually all be tried within the monitor timeout
interval.
C<2> is usually a good value, unless you live in new zealand.
=item [set|del]framing_format [array]
Configures the list of framing formats offered to the other side. This is
simply a list of formatted read/write types used with L<AnyEvent::Handle>,
in order of decreasing preference.
Nodes support C<cbor>, C<json> and C<storable> framing formats for data
packets out of the box, and usually choose C<cbor> because it is first in
the list.
Example: prefer the C<My::Personal::Format> framing format over JSON over
Storable.
aemp setframing_format '["My::Personal::Format", "json", "storable"]'
=item [set|del]auth_offer [array]
Configures the list of authentication types that the node offers to the
other side as acceptable, in order of decreasing preference. Only auth
methods that the node can actually support will be offered.
The default is '["tls_sha3_512", "hmac_sha3_512"]' and is usually good
enough.
=item [set|del]auth_accept [array]
Configures the list of authentication types that remote nodes can use to
authenticate, in order of decreasing preference.
The default is '["tls_sha3_512", "hmac_sha3_512", "tls_anon",
"cleartext"]' and is usually good enough.
=item [set|del]autocork <boolean>
Sets the default C<autocork> option value for the L<AnyEvent::Handle>
object used by transports. By default, autocorking is off, potentially
reducing latency.
=item [set|del]nodelay <boolean>
Sets the default C<nodelay> option value for the L<AnyEvent::Handle>
object used by transports. By default, nodelay is on, potentially reducing
latency.
=back
=cut
use common::sense;
# should come before anything else, so all modules
# will be loaded on each restart
BEGIN {
if (@ARGV == 1 && $ARGV[0] =~ /^\[/) {
require JSON::XS;
@ARGV = @{ JSON::XS->new->utf8->decode (shift) };
} else {
for (@ARGV) {
if (/^[\[\{\"]/) {
require JSON::XS;
$_ = JSON::XS->new->utf8->allow_nonref->decode ($_);
}
}
}
if ($ARGV[0] eq "run") {
shift;
# d'oh
require AnyEvent::Watchdog;
# only now can we load additional modules
require AnyEvent;
require AnyEvent::Watchdog::Util;
AnyEvent::Watchdog::Util::autorestart (1);
AnyEvent::Watchdog::Util::heartbeat (300);
require AnyEvent::MP::Kernel;
AnyEvent::MP::Kernel::configure (@ARGV);
AnyEvent::detect () eq "AnyEvent::Impl::EV"
? EV::loop ()
: AE::cv ()->recv;
}
}
use Carp ();
use JSON::XS;
use AnyEvent;
use AnyEvent::Util;
use AnyEvent::MP;
use AnyEvent::MP::Config;
sub my_run_cmd {
my ($cmd) = @_;
my $cv = &run_cmd;
my $status = $cv->recv;
$status
and die "@$cmd: command failed with exit status $status.";
}
sub gen_cert {
my_run_cmd [qw(openssl req
-new -nodes -x509 -days 3650
-newkey rsa:2048 -keyout /dev/fd/3
-batch -subj /CN=AnyEvent::MP
)],
"<", "/dev/null",
">" , \my $cert,
"3>", \my $key,
"2>", "/dev/null";
"$cert$key"
}
sub init {
configure profile => "aemp", nodeid => "aemp/%n/%u";
}
our $cfg = AnyEvent::MP::Config::config;
our $profile = $cfg;
sub trace {
my ($seed) = @_;
my $cv = AE::cv;
my %seen;
my $exit;
my %to;
init;
my $reply = port {
my ($node, undef, @neigh) = @_;
delete $to{$node};
@neigh = grep $_ ne $NODE, @neigh;
print $node, " -> ", (join " ", @neigh), "\n";
for my $neigh (@neigh) {
unless ($seen{$neigh}++) {
$cv->begin;
$to{$neigh} = AE::timer 15, 0, sub {
print "$neigh (timeout)\n";
$exit = 1;
$cv->end;
};
AnyEvent::MP::Kernel::eval_on $neigh, "AnyEvent::MP::Kernel::up_nodes" => $SELF => $neigh;
}
}
$cv->end;
};
$cv->begin;
snd $reply, seed => undef, $seed;
$cv->recv;
exit $exit;
}
sub shell {
init;
my $node = shift @ARGV || $NODE;
$| = 1;
print <<EOF;
Entering interactive shell - no commandline editing of course (use rlfe etc.).
\= display a list of nodes
\=name switch to another node
package P switch to package P when evaluating
\$ECHO contains the name of a port that echos everything sent to it
EOF
my $json = JSON::XS->new->pretty->ascii;
my $pkg = "AnyEvent::MP::Kernel";
my $cv = AE::cv;
my $echo = port {
print "\nECHO<$AnyEvent::MP::Kernel::SRCNODE> ", $json->encode (\@_), "\n$node $pkg> ";
};
print "$node $pkg> ";
my $t = AE::io *STDIN, 0, sub {
chomp (my $line = <STDIN>);
if ($line =~ s/^=//) {
if (length $line) {
$node = $line;
} else {
db_keys "'l" => sub {
print "\nnodes: ", (join " ", sort @{ $_[0] }), "\n$node $pkg> ";
};
}
} elsif ($line =~ /^\s*package\s+(\S+)\s*;?\s*$/) {
$pkg = $1;
} elsif ($line =~ /\S/) {
my $time = AE::time;
AnyEvent::MP::Kernel::eval_on
$node,
"package $pkg; my \$ECHO = '$echo'; $line",
port {
kil $SELF;
my ($err, @res) = @_;
$time = AE::time - $time;
print "\n$node: $line\n";
printf "%0.3fs\n", $time;
if (length $err) {
print "$err @res";
} else {
print $json->encode(\@res);
}
print "\n$node $pkg> ";
}
;
}
print "$node $pkg> ";
};
$cv->recv;
}
sub node_eval {
my ($node, $expr) = @_;
init;
my $cv = AE::cv;
my $to = AE::timer 5, 0, sub { exit 1 };
AnyEvent::MP::Kernel::eval_on $node, $expr, port { &$cv };
mon $node, $cv;
my ($err, @res) = $cv->recv;
die "$err @res" if length $err;
print +(substr JSON::XS->new->encode (\@res), 1, -1), "\n";
}
sub docmd;
our %CMD = (
snd => sub {
my $port = shift @ARGV;
init;
snd $port, @ARGV; @ARGV = ();
my $cv = AE::cv;
my $to = AE::timer 5, 0, sub { exit 1 };
mon $port, $cv;
my $reply = port sub { &$cv };
snd node_of $port, snd => $reply, "message sent successfully";
print join " ", $cv->recv, "\n";
},
cal => sub {
my $port = shift @ARGV;
init;
my $cv = AE::cv;
cal $port, @ARGV, sub { &$cv }; @ARGV = ();
print +(substr JSON::XS->new->encode ([$cv->recv]), 1, -1), "\n";
},
mon => sub {
my $port = shift @ARGV;
init;
mon $port, my $cv = AE::cv;
print join " ", $cv->recv, "\n";
},
eval => sub {
my $node = node_of shift @ARGV;
my $expr = join " ", @ARGV; @ARGV = ();
node_eval $node, $expr;
},
shell => \&shell,
trace => sub {
@ARGV >= 1
or die "node id missing\n";
trace shift @ARGV;
},
restart => sub {
my $node = node_of shift @ARGV;
node_eval $node, 'my $w; $w = AE::idle sub { '
. 'undef $w; '
. 'use AnyEvent::Watchdog::Util ();'
. 'AnyEvent::Watchdog::Util::restart'
. '}; ()';
},
setnodeid => sub {
@ARGV >= 1
or die "shared secret missing\n";
$profile->{nodeid} = shift @ARGV;
++$cfg->{dirty};
},
delnodeid => sub {
delete $profile->{nodeid};
++$cfg->{dirty};
},
setsecret => sub {
@ARGV >= 1
or die "shared secret missing\n";
$profile->{secret} = shift @ARGV;
++$cfg->{dirty};
},
gensecret => sub {
$profile->{secret} = AnyEvent::MP::Kernel::nonce62 180; # ~1071 bits
++$cfg->{dirty};
},
delsecret => sub {
delete $profile->{secret};
++$cfg->{dirty};
},
or die "service specification missing\n";
my $service = shift @ARGV;
push @{ $profile->{services} }, $service;
++$cfg->{dirty};
},
delservice => sub {
@ARGV >= 1
or die "service specification missing\n";
my $service = shift @ARGV;
for (0 .. $#{ $profile->{services} }) {
next unless $profile->{services}[$_] eq $service;
splice @{ $profile->{services} }, $_, 1;
last;
}
++$cfg->{dirty};
},
seteval => sub {
@ARGV >= 1
or die "eval string missing\n";
$profile->{eval} = shift @ARGV;
++$cfg->{dirty};
},
deleval => sub {
delete $profile->{eval};
++$cfg->{dirty};
},
profile => sub {
@ARGV >= 1
or die "profile name is missing\n";
my $name = shift @ARGV;
$profile = $cfg->{profile}{$name} ||= {};
++$cfg->{dirty};
},
delprofile => sub {
@ARGV >= 1
or die "profile name is missing\n";
my $name = shift @ARGV;
delete $cfg->{profile}{$name};
++$cfg->{dirty};
},
setparent => sub {
@ARGV >= 1
or die "profile name is missing\n";
$profile->{parent} = shift @ARGV;
++$cfg->{dirty};
},
delparent => sub {
delete $profile->{parent};
++$cfg->{dirty};
},
showprofile => sub {
@ARGV >= 1
or die "profile name is missing\n";
my $name = shift @ARGV;
print JSON::XS->new->pretty->encode ($cfg->{profile}{$name} || {});
},
showconfig => sub {
my $name = @ARGV ? shift @ARGV : AnyEvent::MP::Kernel::nodename;
my $profile = AnyEvent::MP::Config::find_profile $name, @ARGV;
@ARGV = ();
# make it look nicer:
delete $profile->{profile};
delete $profile->{parent};
print JSON::XS->new->pretty->encode ($profile);
},
);
for my $attr (qw(
monitor_timeout connect_interval framing_format auth_offer
auth_accept autocork nodelay secure
)) {
$CMD{"set$attr"} = sub {
@ARGV >= 1
or die "$attr value is missing\n";
$profile->{$attr} = shift @ARGV;
++$cfg->{dirty};
};
$CMD{"del$attr"} = sub {
delete $profile->{$attr};
++$cfg->{dirty};
};
}
for (keys %CMD) {
$CMD{$1} ||= $CMD{$_} if /^set(.*)$/;
}
sub docmd {
my $cmd = shift @ARGV;
$CMD{$cmd}
or die "$cmd: no such aemp command (try perldoc aemp, or man aemp)";
$CMD{$cmd}();
}
@ARGV
or die "Usage: aemp subcommand ... (try perldoc aemp, or man aemp)\n";
docmd while @ARGV;
( run in 0.554 second using v1.01-cache-2.11-cpan-39bf76dae61 )