App-Context
view release on metacpan or search on metacpan
lib/App/Context/ClusterNode.pm view on Meta::CPAN
use Date::Format;
use strict;
=head1 NAME
App::Context::ClusterNode - a runtime environment for a Cluster Node that serves a Cluster Controller
=head1 SYNOPSIS
# ... official way to get a Context object ...
use App;
$context = App->context();
$config = $context->config(); # get the configuration
$context->dispatch_events(); # dispatch events
# ... alternative way (used internally) ...
use App::Context::ClusterNode;
$context = App::Context::ClusterNode->new();
=cut
# Initialization step that records the controller's address on the node.
#   $options - hash ref; {controller_host} and {controller_port} are copied
#              into the same-named keys of $self.
# Dies if either value ends up missing or false.
sub _init2a {
    # Called without parentheses, so the tracer sees this sub's own @_.
    &App::sub_entry if $App::trace;
    my ($self, $options) = @_;

    for my $key (qw(controller_host controller_port)) {
        $self->{$key} = $options->{$key};
    }

    # A node cannot operate without knowing where its controller is.
    unless ($self->{controller_host} && $self->{controller_port}) {
        die "Node must have a controller host and port defined (\$context->{options}{controller_host} and {controller_port})";
    }

    &App::sub_exit() if $App::trace;
}
# Placeholder initialization step: apart from the optional trace calls
# (made only when $App::trace is true) it does no work yet.
sub _init2b {
    # Called without parentheses, so the tracer sees this sub's own @_.
    &App::sub_entry if $App::trace;
    my ($self, $options) = @_;

    # intentionally empty for now

    &App::sub_exit() if $App::trace;
}
# Logs the node's host:port at level 2 and schedules the recurring
# "send_node_status" event. The repeat interval is
# $self->{options}{node_heartbeat}, falling back to 60 when that is
# unset or false.
sub dispatch_events_begin {
    # Called without parentheses, so the tracer sees this sub's own @_.
    &App::sub_entry if $App::trace;
    my ($self) = @_;

    $self->log({level=>2},"Starting Cluster Node on $self->{host}:$self->{port}\n");

    # Kept as an ordered list (not a hash) so schedule_event() receives
    # the key/value pairs in a fixed order.
    my @heartbeat_event = (
        method   => "send_node_status",
        time     => time(),                                  # first run: right away
        interval => $self->{options}{node_heartbeat} || 60,  # then every N seconds
    );
    $self->schedule_event(@heartbeat_event);

    &App::sub_exit() if $App::trace;
}
# Logs the shutdown at level 2, closes the listen socket, then sends a
# "NODE-DOWN:<host>:<port>" message to the controller via send_message().
sub dispatch_events_end {
    # Called without parentheses, so the tracer sees this sub's own @_.
    &App::sub_entry if $App::trace;
    my ($self) = @_;

    $self->log({level=>2},"Stopping Cluster Node\n");

    # Read every address before the listen socket is touched.
    my ($ctl_host, $ctl_port, $my_host, $my_port)
        = @{$self}{qw(controller_host controller_port host port)};

    # The listen socket has to be closed ahead of the synchronous
    # connection to the controller in order to avoid deadlock.
    $self->close_listen_socket();

    # This message must be synchronous; otherwise the parent will kill
    # the subprocess during shutdown.
    $self->send_message($ctl_host, $ctl_port, "NODE-DOWN:$my_host:$my_port");

    &App::sub_exit() if $App::trace;
}
# Sends a "NODE-UP:<host>:<port>:<values>" message to the controller via
# send_async_message(), where <values> is whatever system_values() returns.
sub send_node_status {
    # Called without parentheses, so the tracer sees this sub's own @_.
    &App::sub_entry if $App::trace;
    my ($self) = @_;

    # Addresses are read first, then the system values are collected.
    my ($ctl_host, $ctl_port, $my_host, $my_port)
        = @{$self}{qw(controller_host controller_port host port)};
    my $sys_values = $self->system_values();

    $self->send_async_message($ctl_host, $ctl_port, "NODE-UP:$my_host:$my_port:$sys_values");

    &App::sub_exit() if $App::trace;
}
# Builds a comma-separated "key=value" summary from get_sys_info().
# Returns a string of the form
#   load=..,memfree=..,memtotal=..,swapfree=..,swaptotal=..
# Note that the reported memfree is the sum of the memfree, buffers and
# cached figures, not the raw memfree value alone.
sub system_values {
    # Called without parentheses, so the tracer sees this sub's own @_.
    &App::sub_entry if $App::trace;
    my ($self) = @_;

    my $sys = $self->get_sys_info();
    my $memfree_sum = $sys->{memfree} + $sys->{buffers} + $sys->{cached};

    my @fragments = (
        "load=$sys->{load}",
        "memfree=$memfree_sum",
        "memtotal=$sys->{memtotal}",
        "swapfree=$sys->{swapfree}",
        "swaptotal=$sys->{swaptotal}",
    );
    my $values = join(",", @fragments);

    &App::sub_exit($values) if $App::trace;
    return $values;
}
sub process_msg {
&App::sub_entry if ($App::trace);
my ($self, $msg) = @_;
my $verbose = $self->{verbose};
$self->log({level=>3},"process_msg: [$msg]\n");
my $return_value = $self->process_custom_msg($msg);
if (!$return_value) {
if ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
my %event = (
service_type => $1,
name => $2,
method => $3,
);
my $args = $4;
$event{args} = $self->{rpc_serializer}->deserialize($args) if ($args ne "");
my $event_token = $self->send_async_event({method => "process_async_event", args => [\%event],});
$event{event_token} = $event_token;
$return_value = "ASYNC-EVENT-TOKEN:$event_token\n";
}
elsif ($msg =~ /^CONTROLLER-UP:/) {
my $controller_host = $self->{controller_host};
my $controller_port = $self->{controller_port};
my $node_host = $self->{host};
my $node_port = $self->{port};
$self->send_async_event({
method => "send_async_message",
args => [ $controller_host, $controller_port, "NODE-UP:$node_host:$node_port" ],
});
$return_value = "OK";
}
elsif ($msg =~ /^ABORT-ASYNC-EVENT:(.*)/) {
my $event_token = $1;
$self->abort_async_event($event_token);
$return_value = "OK";
( run in 1.486 second using v1.01-cache-2.11-cpan-437f7b0c052 )