App-Context

 view release on metacpan or  search on metacpan

lib/App/Context/ClusterNode.pm  view on Meta::CPAN

use Date::Format;

use strict;

=head1 NAME

App::Context::ClusterNode - a runtime environment for a Cluster Node that serves a Cluster Controller

=head1 SYNOPSIS

   # ... official way to get a Context object ...
   use App;
   $context = App->context();
   $config = $context->config();   # get the configuration
   $context->dispatch_events();    # dispatch events

   # ... alternative way (used internally) ...
   use App::Context::ClusterNode;
   $context = App::Context::ClusterNode->new();

=cut

# First-stage initialization hook.
# Copies controller_host and controller_port from $options onto the context
# and dies when either of them is missing (or otherwise false).
sub _init2a {
    &App::sub_entry if $App::trace;
    my $self    = shift;
    my $options = shift;
    for my $key (qw(controller_host controller_port)) {
        $self->{$key} = $options->{$key};
    }
    # A node is useless without a controller to report to.
    my $has_controller = ($self->{controller_host} && $self->{controller_port});
    if (!$has_controller) {
        die "Node must have a controller host and port defined (\$context->{options}{controller_host} and {controller_port})";
    }
    &App::sub_exit() if $App::trace;
}

# Second-stage initialization hook.
# Receives the context and its options but currently does no work.
sub _init2b {
    &App::sub_entry if $App::trace;
    my $self    = shift;
    my $options = shift;
    # intentionally empty for now
    &App::sub_exit() if $App::trace;
}

# Startup hook: logs that the node is starting and schedules the
# "send_node_status" event using the configured heartbeat interval
# ($self->{options}{node_heartbeat}, falling back to 60 when unset or false).
sub dispatch_events_begin {
    &App::sub_entry if $App::trace;
    my $self = shift;

    my $banner = "Starting Cluster Node on $self->{host}:$self->{port}\n";
    $self->log({ level => 2 }, $banner);

    my $heartbeat_secs = $self->{options}{node_heartbeat};
    $heartbeat_secs = 60 if (!$heartbeat_secs);

    # Kept as an ordered list so the arguments reach schedule_event
    # in a fixed order.
    my @heartbeat_event = (
        method   => "send_node_status",
        time     => time(),            # first run: immediately ...
        interval => $heartbeat_secs,   # ... then every X seconds hereafter
    );
    $self->schedule_event(@heartbeat_event);
    &App::sub_exit() if $App::trace;
}

# Shutdown hook: logs that the node is stopping, closes the listen socket
# and then sends a "NODE-DOWN:<host>:<port>" message to the controller.
sub dispatch_events_end {
    &App::sub_entry if $App::trace;
    my $self = shift;
    $self->log({ level => 2 }, "Stopping Cluster Node\n");

    # Read the addresses up front, before the listen socket goes away.
    my ($ctl_host, $ctl_port, $host, $port) =
        @{$self}{qw(controller_host controller_port host port)};

    # The listen socket has to be closed before the synchronous connection
    # to the controller is made, in order to avoid deadlock.
    $self->close_listen_socket();

    # Sent synchronously on purpose: otherwise the parent will kill the
    # subprocess during shutdown.
    my $farewell = "NODE-DOWN:$host:$port";
    $self->send_message($ctl_host, $ctl_port, $farewell);
    &App::sub_exit() if $App::trace;
}

# Sends a "NODE-UP:<host>:<port>:<values>" message to the controller via
# send_async_message, where <values> is whatever system_values() returns.
sub send_node_status {
    &App::sub_entry if $App::trace;
    my $self = shift;

    # Addresses are read before system_values() is called.
    my ($ctl_host, $ctl_port, $host, $port) =
        @{$self}{qw(controller_host controller_port host port)};
    my $sys_values = $self->system_values();

    my $status_msg = "NODE-UP:$host:$port:$sys_values";
    $self->send_async_message($ctl_host, $ctl_port, $status_msg);
    &App::sub_exit() if $App::trace;
}

# Builds a comma-separated "key=value" summary from get_sys_info():
# load, memfree, memtotal, swapfree and swaptotal.
# The reported memfree is the sum of the memfree, buffers and cached figures.
# Returns the summary string.
sub system_values {
    &App::sub_entry if $App::trace;
    my $self = shift;
    my $sys  = $self->get_sys_info();

    # Add the three figures in the order memfree, buffers, cached.
    my $available = 0;
    for my $field (qw(memfree buffers cached)) {
        $available = $available + $sys->{$field};
    }

    my @pairs = (
        "load=$sys->{load}",
        "memfree=$available",
        "memtotal=$sys->{memtotal}",
        "swapfree=$sys->{swapfree}",
        "swaptotal=$sys->{swaptotal}",
    );
    my $values = join(",", @pairs);
    &App::sub_exit($values) if $App::trace;
    return $values;
}

sub process_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $msg) = @_;
    my $verbose = $self->{verbose};
    $self->log({level=>3},"process_msg: [$msg]\n");
    my $return_value = $self->process_custom_msg($msg);
    if (!$return_value) {
        if ($msg =~ /^ASYNC-EVENT:([^:]+):([^:]+):([^:]+):(.*)$/) {
            my %event = (
                service_type => $1,
                name         => $2,
                method       => $3,
            );
            my $args = $4;
            $event{args} = $self->{rpc_serializer}->deserialize($args) if ($args ne "");
            my $event_token = $self->send_async_event({method => "process_async_event", args => [\%event],});
            $event{event_token} = $event_token;
            $return_value = "ASYNC-EVENT-TOKEN:$event_token\n";
        }
        elsif ($msg =~ /^CONTROLLER-UP:/) {
            my $controller_host = $self->{controller_host};
            my $controller_port = $self->{controller_port};
            my $node_host       = $self->{host};
            my $node_port       = $self->{port};
            $self->send_async_event({
                method => "send_async_message",
                args => [ $controller_host, $controller_port, "NODE-UP:$node_host:$node_port" ],
            });
            $return_value = "OK";
        }
        elsif ($msg =~ /^ABORT-ASYNC-EVENT:(.*)/) {
            my $event_token = $1;
            $self->abort_async_event($event_token);
            $return_value = "OK";



( run in 1.486 second using v1.01-cache-2.11-cpan-437f7b0c052 )