App-Context

 view release on metacpan or  search on metacpan

lib/App/Context/ClusterController.pm  view on Meta::CPAN


=cut

# Initialization step: checks that a port is configured and resets the
# controller's async-event bookkeeping.
#   Dies if $self->{port} is not set.
#   Sets $self->{num_async_events} and $self->{max_async_events} to 0, and
#   $self->{max_async_events_per_node} to the configured option (default 10).
#   $options is accepted but not used here.
sub _init2a {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;

    unless ($self->{port}) {
        die "Controller must have a port defined (\$context->{options}{port})";
    }

    # A false (missing, 0 or empty) setting falls back to 10 events per node.
    my $per_node_limit = $self->{options}{"app.context.max_async_events_per_node"};
    $per_node_limit = 10 if (!$per_node_limit);

    $self->{num_async_events}          = 0;
    $self->{max_async_events_per_node} = $per_node_limit;
    # Total capacity starts at 0 because there are no nodes up yet.
    $self->{max_async_events}          = 0;
    &App::sub_exit() if ($App::trace);
}

# Initialization step: starts the cluster nodes when requested.
#   Calls $self->startup_nodes($options) only if $options->{startup} is true.
sub _init2b {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    if ($options->{startup}) {
        $self->startup_nodes($options);
    }
    &App::sub_exit() if ($App::trace);
}

# Logs (at level 2) that the cluster controller is starting, including the
# host and port stored on $self.
sub dispatch_events_begin {
    my ($self) = @_;
    my $banner = "Starting Cluster Controller on $self->{host}:$self->{port}\n";
    $self->log({level=>2}, $banner);
}

# Logs (at level 2) that the cluster controller is stopping.
# No other shutdown work is done here yet.
sub dispatch_events_end {
    my ($self) = @_;
    my $banner = "Stopping Cluster Controller\n";
    $self->log({level=>2}, $banner);
}

# Sends an async event to the node named in $event->{destination}.
#
#   $event          - hashref; reads {destination}, {service_type}, {name},
#                     {method} and (optionally) {args}
#   $callback_event - stored alongside the event while it is running
#
# Behaviour by destination:
#   * undefined        - logs an error; the event is not sent
#   * "host:port"      - sends an "ASYNC-EVENT:..." message to that node
#   * anything else    - delegates to the parent class implementation
#
# For a "host:port" destination the node's response decides what happens:
#   * "ASYNC-EVENT-TOKEN:<token>" - the event is counted (globally and per
#     node) and recorded in $self->{running_async_event}{<token>}
#   * contains "ERROR"            - the node is marked down
#   * anything else (or nothing)  - a warning is logged, so that a dropped
#     event leaves a trace instead of vanishing silently
sub send_async_event_now {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $callback_event) = @_;

    my $destination = $event->{destination};
    if (! defined $destination) {
        $self->log("ERROR: send_async_event_now(): node not assigned\n");
    }
    elsif ($destination =~ /^([^:]+):([0-9]+)$/) {
        my ($node_host, $node_port) = ($1, $2);
        my $args = "";
        if ($event->{args}) {
            $args = $self->{rpc_serializer}->serialize($event->{args});
        }
        my $response = $self->send_message($node_host, $node_port, "ASYNC-EVENT:$event->{service_type}:$event->{name}:$event->{method}:$args", 1, undef, 1);
        # Normalise a missing response so the matches below never see undef.
        $response = "" if (! defined $response);
        if ($response =~ /^ASYNC-EVENT-TOKEN:(.+)/) {
            my $runtime_event_token = $1;
            $self->{num_async_events}++;
            # $destination is known to be a non-empty "host:port" here.
            $self->{node}{$destination}{num_async_events}++;
            $self->{running_async_event}{$runtime_event_token} = [ $event, $callback_event ];
        }
        elsif ($response =~ /ERROR/) {
            $self->set_node_down("$node_host:$node_port");
        }
        else {
            $self->log("WARNING: send_async_event_now(): unexpected response from $destination: [$response]\n");
        }
    }
    else {
        $self->SUPER::send_async_event_now($event, $callback_event);
    }
    &App::sub_exit() if ($App::trace);
}

# $runtime_event_tokens take the following forms:
#    $runtime_event_token = $pid; -- App::Context::Server::send_async_event_now() and ::finish_pid()
#    $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token on the node
#
# Aborts a running async event identified by $runtime_event_token.
#   * a positive all-digit token is treated as a pid and sent signal 9
#   * a "$host-$port-$serial" token causes an "ABORT-ASYNC-EVENT:<token>"
#     message to be sent to that node
#   * anything else is logged as un-abortable
# $event and $callback_event are accepted but not used here.
sub _abort_running_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $runtime_event_token, $event, $callback_event) = @_;
    # Require a positive pid: kill() with pid 0 would signal every process
    # in the controller's own process group.
    if ($runtime_event_token =~ /^[0-9]+$/ && $runtime_event_token > 0) {
        kill(9, $runtime_event_token);
    }
    else {
        # Parse from the right first so that hosts containing hyphens
        # ("web-01-8080-5") yield host "web-01" and port "8080".
        my ($node_host, $node_port) = ($runtime_event_token =~ /^(.+)-([0-9]+)-[^-]+$/);
        if (! defined $node_host) {
            # Fall back to the historical left-anchored parse for tokens
            # whose trailing part is not a single hyphen-free segment.
            ($node_host, $node_port) = ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/);
        }
        if (defined $node_host) {
            $self->send_async_message($node_host, $node_port, "ABORT-ASYNC-EVENT:$runtime_event_token");
        }
        else {
            $self->log("Unable to abort running async event [$runtime_event_token] (controller)\n");
        }
    }
    &App::sub_exit() if ($App::trace);
}

# Chooses a destination for $event, if the cluster has spare capacity.
#   Returns undef when $self->{num_async_events} has reached
#   $self->{max_async_events}. Otherwise returns whatever the chosen
#   strategy returns: the main service's assign_event_destination() when it
#   provides one, else assign_event_destination_by_round_robin().
sub assign_event_destination {
    &App::sub_entry if ($App::trace);
    my ($self, $event) = @_;
    my $assigned;
    my $has_capacity = ($self->{num_async_events} < $self->{max_async_events});
    if ($has_capacity) {
        # SPA 2006-07-01: I just commented this out. I shouldn't need it.
        # $event->{destination} = $self->{host};
        my $delegate = $self->{main_service};
        $assigned = ($delegate && $delegate->can("assign_event_destination"))
            ? $delegate->assign_event_destination($event, $self->{nodes}, $self->{node})
            : $self->assign_event_destination_by_round_robin($event);
    }
    &App::sub_exit($assigned) if ($App::trace);
    return($assigned);
}

sub assign_event_destination_by_round_robin {
    &App::sub_entry if ($App::trace);
    my ($self, $event) = @_;
    
    my $assigned = undef;
    my $nodes = $self->{nodes};
    if ($#$nodes > -1) {
        my $node_idx = $self->{node}{ALL}{last_node_idx};
        $node_idx = (defined $node_idx) ? $node_idx + 1 : 0;
        $node_idx = 0 if ($node_idx > $#$nodes);
        $event->{destination} = $nodes->[$node_idx];
        $self->{node}{ALL}{last_node_idx} = $node_idx;
        $assigned = 1;
    }

    &App::sub_exit($assigned) if ($App::trace);
    return($assigned);

lib/App/Context/ClusterController.pm  view on Meta::CPAN

            else {
                $self->log("WARNING: Unexpected Async Event Results: [$msg]\n");
            }
            $return_value = "OK";
        }
        else {
            $self->log("ERROR: unknown [$msg]\n");
            $return_value = "unknown [$msg]";
        }
    }
    &App::sub_exit($return_value) if ($App::trace);
    return($return_value);
}

# Hook for customized message processing; can be overridden.
# This default implementation ignores $msg and returns an empty string.
sub process_custom_msg {
    &App::sub_entry if ($App::trace);
    my ($self, $msg) = @_;
    my $reply = "";
    &App::sub_exit($reply) if ($App::trace);
    return $reply;
}

# Returns a text report of the controller's state: a header line with
# host/port, process counts and async-event counts, the current timestamp,
# a blank line, and then the output of $self->_state().
sub state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
    my $header = "Cluster Controller: $self->{host}:$self->{port}  procs[$self->{num_procs}/$self->{max_procs}:max]  async_events[$self->{num_async_events}/$self->{max_async_events}:max/$self->{max_async_events_per_node}:per]\n[$datetime]\n";
    my $detail = $self->_state();
    my $state  = $header . "\n" . $detail;

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

# Builds the node section of the state report and appends the parent
# class's _state() output.
#   First line: the list of nodes in $self->{nodes} and the index stored in
#   $self->{node}{ALL}{last_node_idx}.
#   Then one line per entry in $self->{node} (except "ALL"): up/down flag,
#   async event count and limit, load, memory and swap usage, and the
#   entry's datetime value.
# Fields that an entry does not have yet are shown as 0 (numbers) or blank
# (datetime, last dispatched index) instead of being formatted as undef.
sub _state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $state = "";

    my (@nodes);
    @nodes = @{$self->{nodes}} if ($self->{nodes});
    my $last_node_idx = $self->{node}{ALL}{last_node_idx};
    $last_node_idx = "" if (! defined $last_node_idx);
    $state .= "Nodes: up [@nodes] last dispatched [$last_node_idx]\n";
    foreach my $node (sort keys %{$self->{node}}) {
        next if ($node eq "ALL");   # "ALL" holds cluster-wide data, not a node
        my $info      = $self->{node}{$node};
        my $memtotal  = $info->{memtotal}  || 0;
        my $memfree   = $info->{memfree}   || 0;
        my $swaptotal = $info->{swaptotal} || 0;
        my $swapfree  = $info->{swapfree}  || 0;
        my $datetime  = defined $info->{datetime} ? $info->{datetime} : "";
        $state .= sprintf("   %-16s %4s : %3d/%3d max : [Load:%4.1f][Mem:%5.1f%%/%7d][Swap:%5.1f%%/%7d] : [%19s]\n", $node,
            $info->{up} ? "UP" : "down",
            $info->{num_async_events} || 0,
            $info->{max_async_events} || 0,
            $info->{load} || 0,
            # Percent used; guarded so a missing/zero total never divides by zero.
            $memtotal ? 100*($memtotal - $memfree)/$memtotal : 0,
            $memtotal,
            $swaptotal ? 100*($swaptotal - $swapfree)/$swaptotal : 0,
            $swaptotal,
            $datetime);
    }

    $state .= $self->SUPER::_state();

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

# Marks $node as down.
#   Resets the running async events whose token prefix is $node with its
#   first ":" replaced by "-", clears the node's {up} flag, and rebuilds
#   the node list via set_nodes().
sub set_node_down {
    &App::sub_entry if ($App::trace);
    my ($self, $node) = @_;
    (my $token_prefix = $node) =~ s/:/-/;
    $self->reset_running_async_events($token_prefix);
    $self->{node}{$node}{up} = 0;
    $self->set_nodes();
    &App::sub_exit() if ($App::trace);
}

# Records that a node is up.
#   $node is either "host:port" or "host:port:key=value,key=value,...".
#   In the second form each key=value pair is stored on the node's entry in
#   $self->{node}. The entry's {datetime} is always refreshed.
#   Returns "ok" if the node was already flagged up; otherwise flags it up,
#   rebuilds the node list via set_nodes() and returns "new".
sub set_node_up {
    &App::sub_entry if ($App::trace);
    my ($self, $node) = @_;

    my ($address, $values) = ($node =~ /^([^:]+:\d+):(.*)/);
    if (defined $address) {
        $node = $address;
        if ($values) {
            foreach my $value (split(/,/, $values)) {
                my ($attr, $setting) = ($value =~ /^([^=]+)=(.*)/);
                $self->{node}{$node}{$attr} = $setting if (defined $attr);
            }
        }
    }

    $self->{node}{$node}{datetime} = time2str("%Y-%m-%d %H:%M:%S", time());

    my $retval = "ok";
    if (!$self->{node}{$node}{up}) {
        $self->{node}{$node}{up} = 1;
        $self->set_nodes();
        $retval = "new";
    }
    &App::sub_exit($retval) if ($App::trace);
    return($retval);
}

# Rebuilds the list of nodes that are up and the total event capacity.
#   $self->{nodes} becomes the sorted names of all entries in $self->{node}
#   whose {up} flag is true; $self->{max_async_events} becomes the per-node
#   limit times that count. If the main service provides capacity_change(),
#   it is called with the new capacity, the node list and the node table.
sub set_nodes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my @nodes = grep { $self->{node}{$_}{up} } sort keys %{$self->{node}};
    $self->{nodes} = \@nodes;
    $self->{max_async_events} = $self->{max_async_events_per_node} * scalar(@nodes);
    my $listener = $self->{main_service};
    if ($listener && $listener->can("capacity_change")) {
        $listener->capacity_change($self->{max_async_events}, \@nodes, $self->{node});
    }
    &App::sub_exit() if ($App::trace);
}

# Shuts the controller down: first the nodes (shutdown_nodes), then the
# node file is written (write_node_file), and finally the parent class's
# shutdown() runs.
sub shutdown {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    $self->shutdown_nodes();
    $self->write_node_file();
    $self->SUPER::shutdown();
    &App::sub_exit() if ($App::trace);
}

# Sends a "QUIT" message to every node listed in $self->{nodes}.
sub shutdown_nodes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    # Everything after the node name is the same for each node.
    my @quit_args = (undef, "QUIT", 0, undef, 1);
    for my $address (@{$self->{nodes}}) {
        $self->send_message($address, @quit_args);
    }
    &App::sub_exit() if ($App::trace);
}

# Contacts every known node and launches the ones that do not answer.
#   $options->{startup} is either "1" (load the node list through
#   read_node_file()) or a comma-separated list of node names, each of which
#   gets a fresh, empty entry in $self->{node}.
#   Every node in $self->{node} is sent "CONTROLLER-UP:". When the reply
#   contains "ERROR:" and the node name has the form host:port, the start
#   command (option "app.context.node_start_cmd", with {host} and {port}
#   substituted) is logged and run in the background through the shell.
sub startup_nodes {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;

    my $startup = $options->{startup};
    if ($startup eq "1") {
        $self->read_node_file();
    }
    else {
        $self->{node}{$_} = {} for (split(/,/,$startup));
    }

    my $cmd_fmt = $self->{options}{"app.context.node_start_cmd"} || "ssh -f {host} mvnode --port={port}";
    foreach my $node (keys %{$self->{node}}) {
        my $msg = $self->send_message($node, undef, "CONTROLLER-UP:", 0, undef, 1);
        next if ($msg !~ /ERROR:/);             # node answered; nothing to start

        my ($host, $port) = ($node =~ /^([^:]+):([0-9]+)$/);
        next if (! defined $host);              # not a host:port name

        (my $cmd = $cmd_fmt) =~ s/{host}/$host/g;
        $cmd =~ s/{port}/$port/g;
        $self->log("Starting Node [$node]: [$cmd]\n");
        system("$cmd < /dev/null &");
    }
    &App::sub_exit() if ($App::trace);
}

sub write_node_file {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $prefix = $self->{options}{prefix};
    my $node_file = "$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";
    if (open(FILE, "> $node_file")) {
        foreach my $node (@{$self->{nodes}}) {
            print App::Context::ClusterController::FILE "$node\n";
        }
        close(App::Context::ClusterController::FILE);
    }
    else {
        $self->log("WARNING: Can't write node file [$node_file]: $!\n");
    }



( run in 1.702 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )