App-Context

 view release on metacpan or  search on metacpan

lib/App/Context/POE/ClusterController.pm  view on Meta::CPAN


    push(@{$self->{poe_states}},
        "poe_receive_node_status",
        "poe_run_event");
    push(@{$self->{poe_ikc_published_states}},
        "poe_receive_node_status");

    $self->_init_poe($options);

    &App::sub_exit() if ($App::trace);
}

sub _init2b {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;
    $self->startup_nodes($options) if ($options->{startup});
    &App::sub_exit() if ($App::trace);
}

sub dispatch_events_begin {
    my ($self) = @_;
    $self->log({level=>3},"Starting Cluster Controller on $self->{host}:$self->{port}\n") if $self->{options}{poe_trace};
}

sub dispatch_events_end {
    my ($self) = @_;
    $self->log({level=>3},"Stopping Cluster Controller\n") if $self->{options}{poe_trace};
    # nothing special yet
}

sub send_async_event_now {
    &App::sub_entry if ($App::trace);
    my ($self, $event, $callback_event) = @_;

    my $destination = $event->{destination};
    if (! defined $destination) {
        $self->log({level=>2},"ERROR $event->{name}.$event->{method} : destination not assigned\n");
    }
    elsif ($event->{destination} eq "in_process") {
        my $event_token = $self->send_async_event_in_process($event, $callback_event);
    }
    elsif ($destination =~ /^([^:]+):([0-9]+)$/) {
        my $controller = "$self->{host}:$self->{port}";
        my $node_host = $1;
        my $node_port = $2;
        my $args = $event->{args};

        my $remote_server_name = "poe_${node_host}_${node_port}";
        my $remote_session_alias = $self->{poe_session_name};  # remote is same as local
        my $remote_session_state = "poe_enqueue_async_event";
        my $local_callback_state = "poe_enqueue_async_event_finished";

        $self->{num_async_events}++;
        $self->{node}{$destination}{num_async_events}++;

        my $kernel = $self->{poe_kernel};
        $kernel->post("IKC", "call", "poe://$remote_server_name/$remote_session_alias/$remote_session_state",
            [ $controller, $event, $callback_event ], "poe:$local_callback_state" );
    }
    else {
        $self->SUPER::send_async_event_now($event, $callback_event);
    }
    &App::sub_exit() if ($App::trace);
}

sub ikc_register {
    &App::sub_entry if ($App::trace);
    my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
    $self->log({level=>3},"ikc_register: ($session_name)\n") if $self->{options}{poe_ikc_debug};
    if ($session_name =~ /^poe_([^_]+)_(\d+)$/) {
        my $node = "$1:$2";
        $self->set_node_up($node);
    }
    my ($retval);
    &App::sub_exit($retval) if ($App::trace);
    return($retval);
}

sub ikc_unregister {
    &App::sub_entry if ($App::trace);
    my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
    $self->log({level=>3},"ikc_unregister: ($session_name)\n") if $self->{options}{poe_ikc_debug};
    if ($session_name =~ /^poe_([^_]+)_(\d+)$/) {
        my $node = "$1:$2";
        $self->set_node_down($node);
    }
    &App::sub_exit() if ($App::trace);
}

sub ikc_shutdown {
    &App::sub_entry if ($App::trace);
    my ($self, $kernel, $arg0, $arg1, $arg2, $arg3) = @_[OBJECT, KERNEL, ARG0, ARG1, ARG2, ARG3];
    $self->log({level=>3},"ikc_shutdown: args=($arg0, $arg1, $arg2, $arg3)\n") if $self->{options}{poe_ikc_debug};
    &App::sub_exit() if ($App::trace);
    return;
}

# $runtime_event_tokens take the following forms:
#    $runtime_event_token = $pid; -- App::Context::Server::send_async_event_now() and ::finish_pid()
#    $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token on the node
sub _abort_running_async_event {
    &App::sub_entry if ($App::trace);
    my ($self, $runtime_event_token, $event, $callback_event) = @_;
    if ($runtime_event_token && $event && $callback_event) {
        if ($runtime_event_token =~ /^[0-9]+$/) {
            kill(9, $runtime_event_token);
        }
        elsif ($runtime_event_token =~ /^([^-]+)-([0-9]+)-/) {
            my $node_host = $1;
            my $node_port = $2;

            my $remote_server_name = "poe_${node_host}_${node_port}";
            my $remote_session_alias = $self->{poe_session_name};  # remote is same as local
            my $remote_session_state = "poe_cancel_async_event";

            my $kernel = $self->{poe_kernel};
            $kernel->post("IKC", "post", "poe://$remote_server_name/$remote_session_alias/$remote_session_state",
                [ $runtime_event_token ]);
        }
        else {
            $self->log({level=>2},"ERROR $event->{name}.$event->{method} : unparseable runtime event token [$runtime_event_token]\n");

lib/App/Context/POE/ClusterController.pm  view on Meta::CPAN

    &App::sub_entry if ($App::trace);
    my ( $self, $kernel, $heap, $event ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];

    my ($event_str);
    my $args = $event->{args} || [];
    my $args_str = join(",", @$args);
    if ($event->{name}) {
        my $service_type = $event->{service_type} || "SessionObject";
        $event_str = "$service_type($event->{name}).$event->{method}";
    }
    else {
        $event_str = "$event->{method}";
    }
    $self->profile_start("poe_run_event: $event_str") if $self->{poe_profile};
    $self->log({level=>3},"poe_run_event: BEGIN $event_str\n") if $self->{poe_trace};
    $self->send_event($event);
    $self->log({level=>3},"poe_run_event: END   $event_str\n") if $self->{poe_trace};
    $self->profile_stop("poe_run_event: $event_str") if $self->{poe_profile};
    &App::sub_exit() if ($App::trace);
}

sub state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $datetime = time2str("%Y-%m-%d %H:%M:%S", time());
    my $state = "Cluster Controller: $self->{host}:$self->{port}  procs[$self->{num_procs}/$self->{max_procs}:max]  async_events[$self->{num_async_events}/$self->{max_async_events}:max/$self->{max_async_events_per_node}:per]\n[$datetime]\n";
    $state .= "\n";
    $state .= $self->_state();

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

sub _state {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;

    my $state = "";

    my (@nodes);
    @nodes = @{$self->{nodes}} if ($self->{nodes});
    $state .= "Nodes: up [@nodes] last dispatched [$self->{node}{ALL}{last_node_idx}]\n";
    my ($memfree, $memtotal, $swapfree, $swaptotal);
    foreach my $node (sort keys %{$self->{node}}) {
        next if ($node eq "ALL");
        $state .= sprintf("   %-16s %4s : %3d/%3d max : [Load:%4.1f][System Load:%4.1f][Mem:%5.1f%%/%7d][Swap:%5.1f%%/%7d] : [Up:%19s][Last:%19s]\n", $node,
            $self->{node}{$node}{up} ? "UP" : "down",
            $self->{node}{$node}{num_async_events} || 0,
            $self->{node}{$node}{max_async_events} || 0,
            $self->{node}{$node}{load} || 0,
            $self->{node}{$node}{system_load} || 0,
            $self->{node}{$node}{memtotal} ? 100*($self->{node}{$node}{memtotal} - $self->{node}{$node}{memfree})/$self->{node}{$node}{memtotal} : 0,
            $self->{node}{$node}{memtotal} || 0,
            $self->{node}{$node}{swaptotal} ? 100*($self->{node}{$node}{swaptotal} - $self->{node}{$node}{swapfree})/$self->{node}{$node}{swaptotal} : 0,
            $self->{node}{$node}{swaptotal} || 0,
            $self->{node}{$node}{up_datetime},
            $self->{node}{$node}{datetime});
    }

    $state .= $self->SUPER::_state();

    &App::sub_exit($state) if ($App::trace);
    return($state);
}

sub set_node_up {
    &App::sub_entry if ($App::trace);
    my ($self, $node, $sys_info) = @_;
    my ($retval);
    if (!$self->{node}{$node}{up}) {
        $self->{node}{$node}{up_datetime} = time2str("%Y-%m-%d %H:%M:%S", time());
        if ($self->{node}{$node}{up}) {
            $retval = "ok";
        }
        else {
            $self->{node}{$node}{up} = 1;
            $self->set_nodes();
            $retval = "new";
        }
    }
    if ($sys_info) {
        $self->{node}{$node}{datetime} = time2str("%Y-%m-%d %H:%M:%S", time());
        foreach my $sys_var (keys %$sys_info) {
            $self->{node}{$node}{$sys_var} = $sys_info->{$sys_var};
        }
    }
    &App::sub_exit($retval) if ($App::trace);
    return($retval);
}

sub set_node_down {
    &App::sub_entry if ($App::trace);
    my ($self, $node) = @_;
    my $runtime_event_token_prefix = $node;
    $runtime_event_token_prefix =~ s/:/-/;
    $self->reset_running_async_events($runtime_event_token_prefix);
    $self->{node}{$node}{up} = 0;
    $self->set_nodes();
    &App::sub_exit() if ($App::trace);
}

sub set_nodes {
    &App::sub_entry if ($App::trace);
    my ($self) = @_;
    my (@nodes);
    foreach my $node (sort keys %{$self->{node}}) {
        if ($self->{node}{$node}{up}) {
            push(@nodes, $node);
        }
    }
    $self->{nodes} = \@nodes;
    $self->{max_async_events} = $self->{max_async_events_per_node} * ($#nodes + 1);
    my $main_service = $self->{main_service};
    if ($main_service && $main_service->can("capacity_change")) {
        $main_service->capacity_change($self->{max_async_events}, \@nodes, $self->{node});
    }
    &App::sub_exit() if ($App::trace);
}

sub shutdown {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    $self->shutdown_nodes();
    $self->write_node_file();
    $self->SUPER::shutdown();
    &App::sub_exit() if ($App::trace);
}

sub shutdown_nodes {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    foreach my $node (@{$self->{nodes}}) {
        if ($node =~ /^([^:]+):([0-9]+)$/) {
            my $remote_server_name = "poe_${1}_${2}";
            my $remote_session_alias = $self->{poe_session_name};  # remote is same as local
            my $remote_session_state = "poe_shutdown_node";
            my $kernel = $self->{poe_kernel};
            $kernel->post("IKC", "post", "poe://$remote_server_name/$remote_session_alias/$remote_session_state");
        }
        else {
            $self->log({level=>2},"ERROR unparseable node [$node]\n");
        }
    }
    &App::sub_exit() if ($App::trace);
}

sub startup_nodes {
    &App::sub_entry if ($App::trace);
    my ($self, $options) = @_;

    my $startup = $options->{startup};

    my ($node, $msg, $host, $port, $cmd);
    if ($startup eq "1") {
        $self->read_node_file();
    }
    else {
        foreach $node (split(/,/,$startup)) {
            $self->{node}{$node} = {};
        }
    }

    my $cmd_fmt = $self->{options}{"app.context.node_start_cmd"} || "ssh -f {host} mvnode --port={port}";
    foreach $node (keys %{$self->{node}}) {
        if ($node =~ /^([^:]+):([0-9]+)$/) {
            $host = $1;
            $port = $2;
            $cmd = $cmd_fmt;
            $cmd =~ s/{host}/$host/g;
            $cmd =~ s/{port}/$port/g;
            $self->log({level=>3},"Starting Node [$node]: [$cmd]\n") if $self->{options}{poe_trace};
            system("$cmd < /dev/null &");
        }
        else {
            $self->log({level=>2},"ERROR unparseable node [$node]\n");
        }
    }
    &App::sub_exit() if ($App::trace);
}

sub write_node_file {
    &App::sub_entry if ($App::trace);
    my $self = shift;
    my $prefix = $self->{options}{prefix};
    my $node_file = "$prefix/log/$self->{options}{app}-$self->{host}:$self->{port}.nodes";



( run in 0.903 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )