AnyEvent-MP
view release on metacpan or search on metacpan
MP/Kernel.pm view on Meta::CPAN
$KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
$KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
$KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
# Restart the keepalive cycle: reset the retry delay to a randomised
# starting value (at least 0.3, below 0.6) and run keepalive_all right away.
sub keepalive_again {
   $KEEPALIVE_RETRY = 0.3 * (1 + rand);

   keepalive_all ();
}
# Take one keepalive reference on the node given as first argument.
#
# Only the first reference does any work: unless the node is already up,
# it is entered into %KEEPALIVE_DOWN and the keepalive cycle is restarted
# via keepalive_again.
sub keepalive_add {
   my ($node) = @_;

   # post-increment: a true old value means a reference already existed
   $KEEPALIVE{$node}++
      and return;

   node_is_up ($node)
      and return;

   # only the presence of the key matters, the value stays undef
   $KEEPALIVE_DOWN{$node} = undef;

   keepalive_again ();
}
# Drop one keepalive reference on the node given as first argument.
#
# %KEEPALIVE holds a per-node reference count. Only when the last reference
# goes away is the node removed from %KEEPALIVE and %KEEPALIVE_DOWN; the
# retry watcher is cancelled once %KEEPALIVE_DOWN is empty.
#
# Calls for a node that holds no reference are ignored.
sub keepalive_del {
   # Without this guard the pre-decrement below would autovivify a count of
   # -1 for an unknown node. That bogus entry satisfies "exists" checks and
   # makes the next keepalive_add believe the node is already referenced.
   return unless exists $KEEPALIVE{$_[0]};

   # other references remain
   return if --$KEEPALIVE{$_[0]};

   delete $KEEPALIVE     {$_[0]};
   delete $KEEPALIVE_DOWN{$_[0]};

   # nothing left to retry, so stop the timer
   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}
# Track up/down events for nodes we hold keepalive references on. The
# callback gets the node as first argument and a flag as second argument
# that is true when the node is up and false when the connection was lost.
mon_nodes sub {
   my ($node, $is_up) = @_;

   # not one of the nodes we keep alive
   exists $KEEPALIVE{$node}
      or return;

   if ($is_up) {
      # connected, so this node no longer needs retries
      delete $KEEPALIVE_DOWN{$node};

      %KEEPALIVE_DOWN
         or undef $KEEPALIVE_WATCHER;
   } else {
      # lost the connection, try to connect again
      $KEEPALIVE_DOWN{$node} = undef;

      keepalive_again ();
   }
};
#############################################################################
# talk with/to global nodes
# protocol messages:
#
# sent by global nodes
# g_global - global nodes send this to all others
#
# database protocol
# g_slave database - make other global node master of the sender
# g_set database - global node's database to other global nodes
# g_upd family set del - update single family (any to global)
#
# slave <-> global protocol
# g_find node - query addresses for node (slave to global)
# g_found node binds - node addresses (global to slave)
# g_db_family family id - send g_reply with data (global to slave)
# g_db_keys family id - send g_reply with data (global to slave)
# g_db_values family id - send g_reply with data (global to slave)
# g_reply id result - result of any query (global to slave)
# g_mon1 family - start to monitor family, replies with g_chg1
# g_mon0 family - stop monitoring family
# g_chg1 family hash - initial value of family when starting to monitor
# g_chg2 family set del - like g_upd, but for monitoring only
#
# internal database families:
# "'l" -> node -> listeners
# "'g" -> node -> undef
# ...
#
# used on all nodes:
our $MASTER; # the global node we bind ourselves to
# NOTE(review): presumably the monitor guard that belongs to $MASTER - confirm
our $MASTER_MON;
our %LOCAL_DB; # this node database
# value advertised under the "gproto" key of our greeting (see hook below)
our $GPROTO = 1;
# tell everybody who connects our gproto
# (the hook stores it in the local_greeting hash of its first argument;
# NOTE(review): presumably that argument is the transport object - confirm)
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
$_[0]{local_greeting}{gproto} = $GPROTO;
};
#############################################################################
# master selection
# master requests
our %GLOBAL_REQ; # $id => \@req

# Register the request $req (an array reference holding the message to send)
# under $id, unless a request with that id is already registered. When a
# master is currently known, the message is sent to it immediately.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd ($MASTER, @$req);
}
# Forget the request registered under the given id; evaluates to whatever
# was stored under that id.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id}
}
#################################
# master rpc
our %GLOBAL_RES;
our $GLOBAL_RES_ID = "a";

# Issue a request whose reply is delivered to a callback.
#
# The last argument is the callback, everything before it is the request
# message. A fresh id is generated by incrementing $GLOBAL_RES_ID; the
# callback is remembered in %GLOBAL_RES under that id, and the message,
# with the id appended, is registered via global_req_add.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
# Handler for g_reply messages: the first argument is the request id, the
# remaining arguments are the result. The request is dropped and the
# callback stored in %GLOBAL_RES under that id (if any) is invoked.
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id};
   return unless $cb;

   # & without parentheses: the callback shares our remaining @_ (the result)
   &$cb
};
#################################
# Register a g_find request for the given node, keyed "g_find <node>" so
# each node has at most one such request registered at a time.
sub g_find {
   my ($node) = @_;

   global_req_add ("g_find $node", [g_find => $node]);
}
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
global_req_del "g_find $_[0]";
MP/Kernel.pm view on Meta::CPAN
AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
for (@_) {
my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
push @res, [
$pri += 1e-5,
AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
];
}
$cv->end;
};
}
}
$cv->end;
$cv
}
our @POST_CONFIGURE;

# not yet documented
#
# Queue a code block to run once the node is configured. While $NODE is
# still false the block just stays queued in @POST_CONFIGURE; as soon as
# $NODE is true, every queued block is run in the order it was queued.
# Dies unless called in void context.
sub post_configure(&) {
   defined wantarray
      and die "AnyEvent::MP::Kernel::post_configure must be called in void context";

   push @POST_CONFIGURE, @_;

   # each hook is removed from the queue before it runs
   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}
# Configure and start the local node.
#
# Arguments are key => value pairs; with an odd number of arguments the
# first one is taken as the profile name. The sub looks up the profile,
# works out the node ID, creates the listeners, runs the post-configure
# hooks, publishes the listeners, connects to the seeds, starts the
# configured services and finally evaluates the "eval" config string.
#
# Croaks on an illegal node ID or an unparsable bind address, and dies when
# a service module fails to load or the "eval" string fails.
sub configure(@) {
# odd argument count: the leading argument is the profile name
unshift @_, "profile" if @_ & 1;
my (%kv) = @_;
my $profile = delete $kv{profile};
# no profile given: fall back to whatever nodename returns
$profile = nodename
unless defined $profile;
$CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
$SECURE = $CONFIG->{secure};
# an explicitly configured nodeid wins, otherwise "<profile>/" is used
my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
$node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
# re-register the node object that was stored under the old $NODE under the new id
my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
$NODE = $node;
# %n expands to the result of nodename
$NODE =~ s/%n/nodename/ge;
# $RUNIQ is inserted after a trailing "/" and replaces every %u
if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
# nodes with randomised node names do not need randomised port names
$UNIQ = "";
}
$node_obj->{id} = $NODE;
$NODE{$NODE} = $node_obj;
my $seeds = $CONFIG->{seeds};
my $binds = $CONFIG->{binds};
# default bind specification when none is configured
$binds ||= ["*"];
AE::log 8 => "node $NODE starting up.";
$BINDS = [];
%BINDS = ();
# all _resolve calls are issued first (map), only then are the results
# collected with ->recv, one bind specification at a time
for (map _resolve $_, @$binds) {
for my $bind ($_->recv) {
my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
or Carp::croak "$bind: unparsable local bind address";
my $listener = AnyEvent::MP::Transport::mp_server
$host,
$port,
prepare => sub {
# NOTE(review): presumably called with the host and port actually bound,
# as with the prepare callback of AnyEvent::Socket::tcp_server - confirm
my (undef, $host, $port) = @_;
# overwrite $bind so the address recorded below is the one reported here
$bind = AnyEvent::Socket::format_hostport $host, $port;
# NOTE(review): presumably the listen backlog (0 = default) - confirm
0
},
;
# remember the listener by address and keep the addresses in order
$BINDS{$bind} = $listener;
push @$BINDS, $bind;
}
}
AE::log 9 => "running post config hooks and init.";
# might initialise Global, so need to do it before db_set
# ($NODE is set by now, so the empty block makes post_configure run all queued hooks)
post_configure { };
# publish our listeners in the "'l" database family under our node id
db_set "'l" => $NODE => $BINDS;
AE::log 8 => "node listens on [@$BINDS].";
# connect to all seednodes
set_seeds map $_->recv, map _resolve $_, @$seeds;
master_search;
# save gobs of memory
# (_resolve is undefined and configure itself replaced by an empty sub,
# so neither can be used again after this point)
undef &_resolve;
*configure = sub (@){ };
AE::log 9 => "starting services.";
# a service entry is either [function, args...], a module name with a
# trailing "::" that is loaded, or a function name called without arguments
for (@{ $CONFIG->{services} }) {
if (ref) {
my ($func, @args) = @$_;
(load_func $func)->(@args);
} elsif (s/::$//) {
eval "require $_";
die $@ if $@;
} else {
(load_func $_)->();
}
}
# run the "eval" config string; the #line directive makes error messages
# refer to "(eval configure parameter)"
eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
die "$@" if $@;
}
=back
=head1 LOGGING
AnyEvent::MP::Kernel logs high-level information about the current node,
when nodes go up and down, and most runtime errors. It also logs some
debugging and trace messages about network maintenance, such as seed
connections and global node management.
=head1 SEE ALSO
L<AnyEvent::MP>.
=head1 AUTHOR
Marc Lehmann <schmorp@schmorp.de>
http://home.schmorp.de/
=cut
1
( run in 0.742 second using v1.01-cache-2.11-cpan-2398b32b56e )