AnyEvent-MP
view release on metacpan or search on metacpan
MP/Kernel.pm view on Meta::CPAN
and die "remote execution not allowed\n";
}
our %NODE_REQ;
%NODE_REQ = (
# "mproto" - monitoring protocol
# monitoring
mon0 => sub { # stop monitoring a port for another node
my $portid = shift;
# the if exists should not be needed, but there is apparently a bug
# elsewhere, and this works around that, silently suppressing that bug. sigh.
_unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}
if exists $NODE{$SRCNODE};
},
mon1 => sub { # start monitoring a port for another node
my $portid = shift;
Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
_monitor undef, $portid, $node->{rmon}{$portid} = sub {
delete $node->{rmon}{$portid};
$node->send (["", kil0 => $portid, @_])
if $node && $node->{transport};
};
},
# another node has killed a monitored port
kil0 => sub {
my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
or return;
$_->(@_) for @$cbs;
},
# another node wants to kill a local port
kil1 => \&_kill,
# "public" services - not actually public
# relay message to another node / generic echo
snd => sub {
&snd
},
# ask if a node supports the given request, only works for fixed tags
can => sub {
my $method = shift;
snd @_, exists $NODE_REQ{$method};
},
# random utilities
eval => sub {
&_secure_check;
my @res = do { package main; eval shift };
snd @_, "$@", @res if @_;
},
time => sub {
snd @_, AE::now;
},
devnull => sub {
#
},
"" => sub {
# empty messages are keepalives or similar devnull-applications
},
);
# the node port
new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
$PORT{""} = sub {
my $tag = shift;
eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};
our $MPROTO = 1;
# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
$_[0]{local_greeting}{mproto} = $MPROTO;
};
#############################################################################
# seed management, try to keep connections to all seeds at all times
our %SEED_NODE; # seed ID => node ID|undef
our %NODE_SEED; # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;
our %GLOBAL_NODE; # global => undef
sub seed_connect {
my ($seed) = @_;
my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
or Carp::croak "$seed: unparsable seed address";
AE::log 9 => "trying connect to seed node $seed.";
$SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
$host, $port,
on_greeted => sub {
# called after receiving remote greeting, learn remote node name
# we rely on untrusted data here (the remote node name) this is
# hopefully ok, as this can at most be used for DOSing, which is easy
# when you can do MITM anyway.
# if we connect to ourselves, nuke this seed, but make sure we act like a seed
if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
require AnyEvent::MP::Global; # every seed becomes a global node currently
delete $SEED_NODE{$seed};
} else {
$SEED_NODE{$seed} = $_[0]{remote_node};
$NODE_SEED{$_[0]{remote_node}} = $seed;
# also start global service, in case it isn't running
# since we probably switch conenctions, maybe we don't need to do this here?
snd $_[0]{remote_node}, "g_slave";
}
},
sub {
MP/Kernel.pm view on Meta::CPAN
sub seed_all {
my @seeds = grep
!(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
keys %SEED_NODE;
if (@seeds) {
# start connection attempt for every seed we are not connected to yet
seed_connect $_
for grep !exists $SEED_CONNECT{$_}, @seeds;
$SEED_RETRY = $SEED_RETRY * 2;
$SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
$SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
} else {
# all seeds connected or connecting, no need to restart timer
undef $SEED_WATCHER;
}
}
sub seed_again {
$SEED_RETRY = (1 + rand) * 0.6;
$SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}
# sets new seed list, starts connecting
sub set_seeds(@) {
%SEED_NODE = ();
%NODE_SEED = ();
%SEED_CONNECT = ();
@SEED_NODE{@_} = ();
seed_again;
}
# normal nodes only record global node connections
$NODE_REQ{g_global} = sub {
undef $GLOBAL_NODE{$SRCNODE};
};
mon_nodes sub {
delete $GLOBAL_NODE{$_[0]}
unless $_[1];
return unless exists $NODE_SEED{$_[0]};
if ($_[1]) {
# each time a connection to a seed node goes up, make
# sure it runs the global service.
snd $_[0], "g_slave";
} else {
# if we lost the connection to a seed node, make sure we are seeding
seed_again;
}
};
#############################################################################
# keepalive code - used to kepe conenctions to certain nodes alive
# only used by global code atm., but ought to be exposed somehow.
#TODO: should probbaly be done directly by node objects
our $KEEPALIVE_RETRY;
our $KEEPALIVE_WATCHER;
our %KEEPALIVE; # we want to keep these nodes alive
our %KEEPALIVE_DOWN; # nodes that are down currently
sub keepalive_all {
AE::log 9 => "keepalive: trying to establish connections with: "
. (join " ", keys %KEEPALIVE_DOWN)
. ".";
(add_node $_)->connect
for keys %KEEPALIVE_DOWN;
$KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
$KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
$KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
sub keepalive_again {
$KEEPALIVE_RETRY = (1 + rand) * 0.3;
keepalive_all;
}
sub keepalive_add {
return if $KEEPALIVE{$_[0]}++;
return if node_is_up $_[0];
undef $KEEPALIVE_DOWN{$_[0]};
keepalive_again;
}
sub keepalive_del {
return if --$KEEPALIVE{$_[0]};
delete $KEEPALIVE {$_[0]};
delete $KEEPALIVE_DOWN{$_[0]};
undef $KEEPALIVE_WATCHER
unless %KEEPALIVE_DOWN;
}
mon_nodes sub {
return unless exists $KEEPALIVE{$_[0]};
if ($_[1]) {
delete $KEEPALIVE_DOWN{$_[0]};
undef $KEEPALIVE_WATCHER
unless %KEEPALIVE_DOWN;
} else {
# lost the conenction, try to connect again
undef $KEEPALIVE_DOWN{$_[0]};
keepalive_again;
}
};
#############################################################################
# talk with/to global nodes
# protocol messages:
#
# sent by global nodes
# g_global - global nodes send this to all others
#
# database protocol
# g_slave database - make other global node master of the sender
# g_set database - global node's database to other global nodes
# g_upd family set del - update single family (any to global)
#
# slave <-> global protocol
# g_find node - query addresses for node (slave to global)
# g_found node binds - node addresses (global to slave)
# g_db_family family id - send g_reply with data (global to slave)
# g_db_keys family id - send g_reply with data (global to slave)
# g_db_values family id - send g_reply with data (global to slave)
# g_reply id result - result of any query (global to slave)
# g_mon1 family - start to monitor family, replies with g_chg1
# g_mon0 family - stop monitoring family
# g_chg1 family hash - initial value of family when starting to monitor
# g_chg2 family set del - like g_upd, but for monitoring only
#
# internal database families:
# "'l" -> node -> listeners
# "'g" -> node -> undef
# ...
#
# used on all nodes:
our $MASTER; # the global node we bind ourselves to
our $MASTER_MON;
our %LOCAL_DB; # this node database
our $GPROTO = 1;
# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
$_[0]{local_greeting}{gproto} = $GPROTO;
};
#############################################################################
# master selection
# master requests
our %GLOBAL_REQ; # $id => \@req
sub global_req_add {
my ($id, $req) = @_;
return if exists $GLOBAL_REQ{$id};
$GLOBAL_REQ{$id} = $req;
snd $MASTER, @$req
( run in 1.647 second using v1.01-cache-2.11-cpan-df04353d9ac )