AnyEvent-MP
view release on metacpan or search on metacpan
TODO: maybe disable warnings by default?
TODO: listener-scopes (10.0.0.1:4040@vpn) and connect-scopes ("vpn,public")
TODO: document env-variable usage
TODO: make node objects responsible for keepalive?
faq: can't see anything
faq: all is asynch
faq: how to interface to non-perl nodes?
TODO: check gproto, nproto, on connect
TODO: limiting reconnecting speed when unreachable? somehow use same interval timers as for seeding and keepalive?
TODO: multiple profiles? also some default profiles?
TODO: export keepalive?
TODO: $guard = con $cb->($up)
TODO: aemp readline support
TODO: gleeco re: AE::MP::DataConn -
TODO: version both in MP.pm and MP/Config.pm because of cpan indexer
2.02 Sun Jul 29 04:22:53 CEST 2018
- hardcode version in MP.pm to help the CPAN indexer.
# monitoring
mon $port, $cb->(@msg) # callback is invoked on death
mon $port, $localport # kill localport on abnormal death
mon $port, $localport, @msg # send message on death
# temporarily execute code in port context
peval $port, sub { die "kill the port!" };
# execute callbacks in $SELF port context
my $timer = AE::timer 1, 0, psub {
die "kill the port, delayed";
};
# distributed database - modification
db_set $family => $subkey [=> $value] # add a subkey
db_del $family => $subkey... # delete one or more subkeys
db_reg $family => $port [=> $value] # register a port
# distributed database - queries
db_family $family => $cb->(\%familyhash)
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.
This is useful when you register callbacks from C<rcv> callbacks:
rcv delayed_reply => sub {
my ($delay, @reply) = @_;
my $timer = AE::timer $delay, 0, psub {
snd @reply, $SELF;
};
};
=cut
sub psub(&) {
my $cb = shift;
my $port = $SELF
and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
}
=item $guard = mon_guard $port, $ref, $ref...
Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.
Optionally returns a guard that will stop the monitoring.
This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed (note the use of C<psub>):
$port->rcv (start => sub {
my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
undef $timer if 0.9 < rand;
});
});
=cut
sub mon_guard {
my ($port, @refs) = @_;
#TODO: mon-less form?
This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.
=cut
# after $timeout, $callback   - invoke the code reference after $timeout seconds
# after $timeout, @msg        - otherwise pass @msg to snd after $timeout seconds
#
# The one-shot timer keeps itself alive through the closure and releases
# itself as the first thing it does when it fires.
sub after($@) {
   my ($delay, @what) = @_;

   my $watcher;
   $watcher = AE::timer $delay, 0, sub {
      # drop the self-reference so the watcher can be freed
      undef $watcher;

      if (ref $what[0]) {
         # first element is a reference: treat it as a callback, no arguments
         $what[0]();
      } else {
         # plain list: treat it as a message and send it
         snd @what;
      }
   };
}
#=item $cb2 = timeout $seconds, $cb[, @args]
=item cal $port, @msg, $callback[, $timeout]
my $timeout = ref $_[-1] ? undef : pop;
my $cb = pop;
my $port = port {
undef $timeout;
kil $SELF;
&$cb;
};
if (defined $timeout) {
$timeout = AE::timer $timeout, 0, sub {
undef $timeout;
kil $port;
$cb->();
};
} else {
mon $_[0], sub {
kil $port;
$cb->();
};
}
MP/DataConn.pm view on Meta::CPAN
our $ID = "a";
our %STATE;
# another node tells us to await a connection
sub _expect {
my ($id, $port, $timeout, $initfunc, @initdata) = @_;
$STATE{$id} = {
id => $id,
to => (AE::timer $timeout, 0, sub {
$STATE{$id}{done}(undef);
}),
done => sub {
my ($hdl, $error) = @_;
%{delete $STATE{$id}} = ();
if (defined $hdl) {
(AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
} else {
MP/DataConn.pm view on Meta::CPAN
my $state = $STATE{$id}
or return;
my $addr = $AnyEvent::MP::Global::addr{$node};
@$addr
or return $state->{done}(undef, "$node: no listeners found");
# I love hardcoded constants !
$state->{next} = AE::timer 0, 2, sub {
my $endpoint = shift @$addr
or return delete $state->{next};
my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
or return;
my $transport; $transport = AnyEvent::MP::Transport::mp_connect
$host, $port,
protocol => "aemp-dataconn",
local_greeting => { dataconn_id => $id },
MP/DataConn.pm view on Meta::CPAN
my $port = $SELF
or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";
$node = node_of $node;
my $id = (++$ID) . "\@$NODE";
# damn, why do my simple state hashes resemble objects so quickly
my $state = $STATE{$id} = {
id => (++$ID) . "\@$NODE",
to => (AE::timer $timeout, 0, sub {
$STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
}),
done => sub {
my ($hdl, $error) = @_;
delete $AnyEvent::MP::Global::ON_SETUP{$id};
%{delete $STATE{$id}} = ();
if (defined $hdl) {
$cb->($hdl);
MP/Kernel.pm view on Meta::CPAN
# watcher created by delay; cleared again by $delay_run once the queue is empty
our $DELAY_TIMER;
# callbacks waiting to be invoked by $delay_run, oldest first
our @DELAY_QUEUE;

# Drains @DELAY_QUEUE: callbacks are taken from the front and invoked one
# after another. As soon as the queue yields nothing (or a false entry),
# $DELAY_TIMER is cleared and the sub returns undef.
our $delay_run = sub {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
};
# delay $cb
#
# Appends $cb to @DELAY_QUEUE and, unless a timer is already pending,
# creates a zero-delay timer that will run $delay_run.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   # only one timer is needed no matter how many callbacks are queued
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
=item $AnyEvent::MP::Kernel::SRCNODE
During execution of a message callback, this variable contains the node ID
of the origin node.
The main use of this variable is for debugging output - there are probably
very few other cases where you need to know the source node ID.
MP/Kernel.pm view on Meta::CPAN
if (@seeds) {
# start connection attempt for every seed we are not connected to yet
seed_connect $_
for grep !exists $SEED_CONNECT{$_}, @seeds;
$SEED_RETRY = $SEED_RETRY * 2;
$SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
$SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
} else {
# all seeds connected or connecting, no need to restart timer
undef $SEED_WATCHER;
}
}
# Resets the seed retry interval to a random value in [0.6, 1.2) and,
# unless a seed watcher already exists, schedules seed_all through a
# zero-delay timer.
sub seed_again {
   my $jitter = 1 + rand;

   $SEED_RETRY = $jitter * 0.6;

   # keep an already existing watcher, otherwise start one immediately
   $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}
# sets new seed list, starts connecting
sub set_seeds(@) {
%SEED_NODE = ();
%NODE_SEED = ();
%SEED_CONNECT = ();
@SEED_NODE{@_} = ();
MP/Kernel.pm view on Meta::CPAN
. (join " ", keys %KEEPALIVE_DOWN)
. ".";
(add_node $_)->connect
for keys %KEEPALIVE_DOWN;
$KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
$KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
$KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
# Resets the keepalive retry interval to a random value in [0.3, 0.6)
# and then calls keepalive_all right away.
sub keepalive_again {
   my $jitter = 1 + rand;

   $KEEPALIVE_RETRY = $jitter * 0.3;

   keepalive_all ();
}
sub keepalive_add {
return if $KEEPALIVE{$_[0]}++;
return if $self->{transport};
return if $self->{connect_w};
# we unweaken the node reference, in case it was weakened before
$AnyEvent::MP::Kernel::NODE{$self->{id}}
= $AnyEvent::MP::Kernel::NODE{$self->{id}};
Scalar::Util::weaken $self;
$self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
$self->transport_error (transport_error => $self->{id}, "connect timeout");
};
# maybe @$addresses?
my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
if ($addresses) {
$self->connect_to ($addresses);
} else {
# on global nodes, all bets are off now - we either know the node, or we don't
my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
$interval = ($monitor - $interval) / @$addresses
if ($monitor - $interval) / @$addresses < $interval;
$interval = 0.4 if $interval < 0.4;
my @endpoints = reverse @$addresses;
$self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
my $endpoint = pop @endpoints
or return;
AE::log 9 => "connecting to $self->{id} at $endpoint";
$self->{trial}{$endpoint} ||= do {
my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
or return AE::log critical => "$self->{id}: '$endpoint' is not a resolved node reference.";
AnyEvent::MP::Transport::mp_connect
# $node->transport_reset
#
# Installs the {send} callback on the node. The callback receives one
# message (an array reference) and hands its contents to
# AnyEvent::MP::Kernel::_inject - either at once, or later via the
# @DELAY queue, depending on the $DELAY counter.
sub transport_reset {
   my ($self) = @_;

   Scalar::Util::weaken $self;

   $self->{send} = sub {
      my $msg = $_[0];

      if (++$DELAY <= 0) {
         # counter is not positive after the increment: inject right now,
         # with SRCNODE temporarily set to $AnyEvent::MP::Kernel::NODE
         local $AnyEvent::MP::Kernel::SRCNODE = $AnyEvent::MP::Kernel::NODE;
         return AnyEvent::MP::Kernel::_inject (@$msg);
      }

      # otherwise queue the injection and make sure a zero-delay timer
      # exists that will run $send_delayed
      push @DELAY, sub { AnyEvent::MP::Kernel::_inject (@$msg) };
      $DELAY_W ||= AE::timer 0, 0, $send_delayed;

      return;
   };
}
# $node->transport_connect ($transport)
#
# Does nothing with the transport except log, at level 9, that the
# connection is refused, including the transport's peerhost and peerport.
sub transport_connect {
   my $self = shift;
   my $tp   = shift;

   AE::log 9 => "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})";
}
# $node->kill (@args)
#
# Queues a call to AnyEvent::MP::Kernel::_kill with @args instead of
# calling it directly.
sub kill {
   my @args = @_;
   shift @args; # the first argument is not needed

   # kils are _always_ delayed, so that mon callbacks are never called
   # from anything but the event loop context.
   $DELAY = 1;
   push @DELAY, sub { AnyEvent::MP::Kernel::_kill (@args) };

   # make sure a zero-delay timer exists that will run $send_delayed
   $DELAY_W ||= AE::timer 0, 0, $send_delayed;
}
sub monitor {
# maybe always delay, too?
if ($DELAY_W) {
my @args = @_;
push @DELAY, sub { AnyEvent::MP::Kernel::_monitor (@args) };
return;
}
&AnyEvent::MP::Kernel::_monitor;
# monitoring
mon $port, $cb->(@msg) # callback is invoked on death
mon $port, $localport # kill localport on abnormal death
mon $port, $localport, @msg # send message on death
# temporarily execute code in port context
peval $port, sub { die "kill the port!" };
# execute callbacks in $SELF port context
my $timer = AE::timer 1, 0, psub {
die "kill the port, delayed";
};
# distributed database - modification
db_set $family => $subkey [=> $value] # add a subkey
db_del $family => $subkey... # delete one or more subkeys
db_reg $family => $port [=> $value] # register a port
# distributed database - queries
db_family $family => $cb->(\%familyhash)
"rcv" callbacks, i.e. runtime errors will cause the port to get
"kil"ed.
The effect is basically as if it returned "sub { peval $SELF, sub {
BLOCK }, @_ }".
This is useful when you register callbacks from "rcv" callbacks:
rcv delayed_reply => sub {
my ($delay, @reply) = @_;
my $timer = AE::timer $delay, 0, psub {
snd @reply, $SELF;
};
};
$guard = mon $port, $rcvport # kill $rcvport when $port dies
$guard = mon $port # kill $SELF when $port dies
$guard = mon $port, $cb->(@reason) # call $cb when $port dies
$guard = mon $port, $rcvport, @msg # send a message when $port dies
Monitor the given port and do something when the port is killed or
messages to it were lost, and optionally return a guard that can be
Example: send us a restart message when another $port is killed.
mon $port, $self => "restart";
$guard = mon_guard $port, $ref, $ref...
Monitors the given $port and keeps the passed references. When the
port is killed, the references will be freed.
Optionally returns a guard that will stop the monitoring.
This function is useful when you create e.g. timers or other
watchers and want to free them when the port gets killed (note the
use of "psub"):
$port->rcv (start => sub {
my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
undef $timer if 0.9 < rand;
});
});
kil $port[, @reason]
Kill the specified port with the given @reason.
If no @reason is specified, then the port is killed "normally" -
monitor callback will be invoked, but the kil will not cause linked
ports ("mon $mport, $lport" form) to get killed.
delete $to{$node};
@neigh = grep $_ ne $NODE, @neigh;
print $node, " -> ", (join " ", @neigh), "\n";
for my $neigh (@neigh) {
unless ($seen{$neigh}++) {
$cv->begin;
$to{$neigh} = AE::timer 15, 0, sub {
print "$neigh (timeout)\n";
$exit = 1;
$cv->end;
};
AnyEvent::MP::Kernel::eval_on $neigh, "AnyEvent::MP::Kernel::up_nodes" => $SELF => $neigh;
}
}
$cv->end;
};
};
$cv->recv;
}
# node_eval $node, $expr
#
# Has $expr evaluated on $node via AnyEvent::MP::Kernel::eval_on, waits
# for the reply and prints the results as JSON without the enclosing
# array brackets. Dies when the reply carries a non-empty error, and exits
# with status 1 if the 5 second timer fires first.
sub node_eval {
   my ($node, $expr) = @_;

   init;

   my $done    = AE::cv;
   my $timeout = AE::timer 5, 0, sub { exit 1 };

   # the reply port forwards whatever it receives to the condvar
   AnyEvent::MP::Kernel::eval_on $node, $expr, port { &$done };
   # the condvar is also registered as monitor callback for the node
   mon $node, $done;

   my ($err, @res) = $done->recv;

   if (length $err) {
      die "$err @res";
   }

   # encode as a JSON array, then strip the leading "[" and trailing "]"
   my $json = JSON::XS->new->encode (\@res);
   print +(substr $json, 1, -1), "\n";
}
sub docmd;
our %CMD = (
snd => sub {
my $port = shift @ARGV;
init;
snd $port, @ARGV; @ARGV = ();
my $cv = AE::cv;
my $to = AE::timer 5, 0, sub { exit 1 };
mon $port, $cv;
my $reply = port sub { &$cv };
snd node_of $port, snd => $reply, "message sent successfully";
print join " ", $cv->recv, "\n";
},
cal => sub {
my $port = shift @ARGV;
init;
( run in 1.163 second using v1.01-cache-2.11-cpan-49f99fa48dc )