App-MultiModule

 view release on metacpan or  search on metacpan

lib/App/MultiModule.pm  view on Meta::CPAN

=item module

=item debug

=item oob

=back

=cut

# Construct the App::MultiModule object and register its recurring work.
#
# Only one instance may be created per process; a second successful call
# dies.  The "already instantiated" flag is recorded only once the arguments
# have been validated, so a call rejected for bad arguments does not block a
# later, corrected call.
#
# Named arguments:
#   state_dir        required; true, non-reference scalar passed on to
#                    App::MultiModule::API->new
#   module_prefixes  optional; one prefix string or an ARRAY ref of prefix
#                    strings, appended after 'App::MultiModule::Tasks::'
#   qname            queue name; stored as my_qname and used as the
#                    destination for config and control messages
#   module           stored as $self->{module}
#   debug            stored as $self->{debug}; defaults to 0
#   oob              stored as $self->{oob_opts}
#   pristine_opts    stored as-is
#   config_file      optional; checked once per second and re-read whenever
#                    its modification time changes
#
# Side effects: calls POE::Kernel->run(), registers several callbacks through
# $self->recur, and installs process-wide SIGTERM and SIGINT handlers.
#
# Returns the blessed object.  Dies on a repeated call or invalid arguments.
sub new {
    my $class = shift;
    my %args = @_;
    die 'App::MultiModule::new: it is only safe to instantiate this one time per process space'
        if $App::MultiModule::instantiated;
    die "App::MultiModule::new failed: required argument 'state_dir' must be a scalar"
        if  not $args{state_dir} or
            ref $args{state_dir};

    my @module_prefixes = ('App::MultiModule::Tasks::');
    if($args{module_prefixes}) {
        my $extra_prefixes = $args{module_prefixes};
        if(not ref $extra_prefixes) {
            #a single prefix passed as a plain string
            push @module_prefixes, $extra_prefixes;
        } elsif(ref $extra_prefixes eq 'ARRAY') {
            push @module_prefixes, @$extra_prefixes;
        } else {
            die "App::MultiModule::new failed: passed argument module_prefixes must either be a scalar or ARRAY ref";
        }
    }
    #arguments are acceptable; from here on the process owns its one instance
    $App::MultiModule::instantiated = 1;

    my $debug = $args{debug};
    $debug = 0 unless defined $debug;
    my $self = {
        module_prefixes => \@module_prefixes,
        api => App::MultiModule::API->new(state_dir => $args{state_dir}),
        my_qname => $args{qname},
        module => $args{module},
        tasks => {},
        message_counts => {},
        debug => $debug,
        oob_opts => $args{oob},
        hold_events_for => {}, #when we issue a 'shutdown' event in POE,
            #it may or may not stop the next, scheduled event to fire.
            #it's important for some of the task migration 'stuff' that
            #save_task_state() not be called in the per-task state save recur
            #after we want to deallocate.
            #When we deallocate an internal task, we force a state save, but
            #with a special flag, no_save_pid, to cause the written state
            #file to not have a PID.  This is important so _manage_tasks()
            #in the MultiModule task will not think the task is running.
        pristine_opts => $args{pristine_opts},
        task_name => 'main',
    };
    $self->{config_file} = $args{config_file} if $args{config_file};
    bless ($self, $class);
    POE::Kernel->run(); #silence warning about run not being called

    if($args{config_file}) {
        my $config_file = $args{config_file};
        $self->recur(repeat_interval => 1, work => sub {
            eval {
                die "App::MultiModule::new failed: optional passed argument config_file($config_file) must either be a scalar and exist and be readable"
                    if ref $config_file or not -r $config_file;
                #element 9 of stat() is the modification time
                my $mtime = (stat($config_file))[9];
                die "could not stat $config_file: $!\n" unless defined $mtime;
                $self->{last_config_stat} = 0
                    unless defined $self->{last_config_stat};
                #unchanged since the last read: leave the eval, nothing to do
                return if $mtime == $self->{last_config_stat};
                #recorded before parsing, so a broken file is reported once
                #and not re-read until it is modified again
                $self->{last_config_stat} = $mtime;
                $self->log("reading config from $config_file");
                #'do FILE' searches @INC for relative paths that do not start
                #with ./ or ../; use an absolute path so the file that passed
                #the -r test above is the one that gets evaluated
                require File::Spec;
                my $config_path = File::Spec->rel2abs($config_file);
                local $SIG{ALRM} = sub { die "timed out\n"; };
                alarm 2;
                my $conf = do $config_path;
                if(not $conf) {
                    #compile/runtime errors land in $@, read errors in $!
                    my $reason = $@              ? $@
                               : !defined($conf) ? "$!"
                               :                   'file did not return a true value';
                    die "failed to deserialize $config_file: $reason";
                }
                die "failed to deserialize $config_file: config must evaluate to a HASH reference\n"
                    unless ref $conf eq 'HASH';
                #handle config 'either way'
                if(not $conf->{'.multimodule'}) {
                    $conf = {
                        '.multimodule' => {
                            config => $conf
                        }
                    };
                }
                IPC::Transit::local_queue(qname => $args{qname});
                IPC::Transit::send(qname => $args{qname}, message => $conf);
            };
            my $err = $@;
            alarm 0;
            if($err) {
                $self->error("failed to read config file $config_file: $err");
            }
        });
    }

    $self->{all_modules_info} = $self->get_multimodules_info();

    #once a minute, start the message and emit counters over
    $self->recur(repeat_interval => 60, work => sub {
        $self->{message_counts} = {};
        $App::MultiModule::Task::emit_counts = {};
    });
    #every ten seconds, refresh the cached module information
    $self->recur(repeat_interval => 10, work => sub {
        #disabled debugging aid, kept for reference:
        #if($args{module} and $args{module} eq 'main') {
        #    $self->{my_counter} = 0 unless $self->{my_counter};
        #    $self->{my_counter}++;
        #    open my $fh, '>>', '/tmp/my_logf';
        #    print $fh $args{module} . ':' . $self->{my_counter}, "\n";
        #    close $fh;
        #    exit if $self->{my_counter} > 60;
        #}
        $self->{all_modules_info} = $self->get_multimodules_info();
    });
    #every second, poll the queues
    $self->recur(repeat_interval => 1, work => sub {
        $self->_receive_messages;
    });
    $SIG{TERM} = sub {
        print STDERR "caught SIGTERM. starting orderly exit\n";
        $self->log('caught term');
        _cleanly_exit($self);
    };
    #SIGINT does not exit directly: it queues a cleanly_exit control message
    #(with exit_externals set) on our own queue
    $SIG{INT} = sub {
        print STDERR "caught SIGINT. starting orderly exit\n";
        $self->log('caught int');
        IPC::Transit::send(qname => $args{qname}, message => {
            '.multimodule' => {
                control => [
                    {   type => 'cleanly_exit',
                        exit_externals => 1,
                    }
                ],
            }
        });
        #_cleanly_exit($self, exit_external => 1);
    };
    $App::MultiModule::Task::emit_counts = {};
    return $self;
}

# Act on a message addressed to MultiModule itself.
#
# Arguments:
#   $message  HASH ref; only its '.multimodule' entry is examined
#
# Two optional keys of the '.multimodule' entry are handled:
#   config   HASH ref of task name => configuration.  Each configuration is
#            saved through the API object, recorded in all_modules_info, and
#            get_task() is then called for that task.
#   control  ARRAY ref of HASH refs.  An entry whose type is 'cleanly_exit'
#            is passed, flattened, to _cleanly_exit().
#
# Malformed pieces (wrong reference type, entries without a type) are skipped
# rather than dereferenced, because the callers invoke this from their
# receive loops without an eval.
sub _control {
    my ($self, $message) = @_;
    my $control = $message->{'.multimodule'};
    return unless ref $control eq 'HASH';

    my $config_for = $control->{config};
    if($config_for) {
        if(ref $config_for eq 'HASH') {
            foreach my $task_name (keys %$config_for) {
                my $config = $config_for->{$task_name};
                $self->{api}->save_task_config($task_name, $config);
                $self->{all_modules_info}->{$task_name}->{config} = $config;
                #the returned task object is not needed here
                my $got_task = eval {
                    my $task = $self->get_task($task_name);
                    1;
                };
                if(not $got_task) {
                    $self->debug("_control: failed to get_task($task_name): $@\n") if $self->{debug} > 1;
                }
            }
        } else {
            $self->debug('_control: passed config structure must be HASH reference') if $self->{debug} > 1;
        }
    }

    my $commands = $control->{control};
    if($commands) {
        if(ref $commands ne 'ARRAY') {
            $self->debug('_control: passed control structure must be ARRAY reference') if $self->{debug} > 1;
            return;
        }
        foreach my $command (@$commands) {
            next unless ref $command eq 'HASH' and defined $command->{type};
            if($command->{type} eq 'cleanly_exit') {
                $self->debug('control cleanly exit') if $self->{debug} > 1;
                $self->_cleanly_exit(%$command);
            }
        }
    }
    return;
}

# Shut this process down in an orderly way and then exit.
#
# Invoked both as a method ($self->_cleanly_exit(%opts)) and as a plain
# function with the object as first argument (the SIGTERM handler installed
# in new() does the latter).
#
# Option read from the trailing key/value list:
#   exit_externals  when true, each task that is not loaded in this process,
#                   is reported as running, and is not reported as running
#                   under this process's PID is sent signal 15 and then,
#                   after a 2 second sleep, signal 9.
#
# Does not return to the caller: the last statement is exit.
sub _cleanly_exit {
    my $self = shift;
    my %args = @_;
    $self->debug('beginning cleanly_exit');
    #how to exit cleanly:
    #call save_task_state on all internal stateful tasks
    #if exit_externals is set:
    ##send TERM to all external tasks if exit_externals is set
    ##wait a few seconds
    ##send KILL to all external tasks and all of their children and children

    #snapshot of every task name known at this point
    my @all_tasks;
    foreach my $task_name (keys %{$self->{all_modules_info}}) {
        push @all_tasks, $task_name;
    }
    #first: 'flush' all of the internal queues
    #five passes are made over the task list; presumably so that messages
    #produced while handling earlier ones are also delivered -- TODO confirm
    for(1..5) { #lolwut
    foreach my $task_name (@all_tasks) {
        #only tasks loaded in this process can be handed messages
        next unless $self->{tasks}->{$task_name};
        IPC::Transit::local_queue(qname => $task_name);
        my $stats = IPC::Transit::stat(
            qname => $task_name,
            override_local => _receive_mode_translate('local'));
        next unless $stats->{qnum}; #nothing to receive
        #NOTE(review): unlike the receive calls in _receive_messages, no
        #nonblock => 1 is passed here -- confirm this cannot block once the
        #queue is empty
        while(  my $message = IPC::Transit::receive(
                    qname => $task_name,
                    override_local => _receive_mode_translate('local'))) {
            eval {
                $self->{tasks}->{$task_name}->message(
                    $message,
                    root_object => $self
                );
            };
            if($@) {
                $self->error("_cleanly_exit($task_name) threw: $@");
            }
        }
    }
    }
    #second: save state and send signals, as appropriate
    foreach my $task_name (@all_tasks) {
        #NOTE(review): $@ is never inspected after this eval, so any failure
        #while saving state or signalling a task is silently discarded
        eval {
            my $task_info = $self->{all_modules_info}->{$task_name};
            my $task_is_stateful = $task_info->{is_stateful};
            #NOTE(review): $task_config and $task_state are assigned but not
            #used in the rest of this block
            my $task_config = $task_info->{config} || {};
            my $task_state = $self->{api}->get_task_state($task_name);
            my $task_status = $self->{api}->get_task_status($task_name);
            my $is_loaded = $self->{tasks}->{$task_name};
            my $is_running = 0;
            if(     $task_status and
                    $task_status->{is_running}) {
                $is_running = $task_status->{is_running};
            }
            my $is_my_pid = 0;
            if(     $task_status and
                    $task_status->{is_my_pid}) {
                $is_my_pid = $task_status->{is_my_pid};
            }
            #first case: internal, stateful task
            if(     $is_loaded and
                    $task_is_stateful) {
                $self->{api}->save_task_state($task_name, $self->{tasks}->{$task_name}->{'state'});
                #work on a deep copy so the live status structure is not
                #modified when is_internal is added
                my $status = Storable::dclone($self->{tasks}->{$task_name}->{'status'});
                $status->{is_internal} = 1;
                $self->{api}->save_task_status($task_name, $status);
            }

            #second case: external task
            if(     not $is_loaded and
                    $is_running and
                    not $is_my_pid and
                    $args{exit_externals}) {
                #signal 15 first, then signal 9 after a fixed 2 second wait
                my $sig = $self->{api}->send_signal($task_name, 15);
                sleep 2;
                #NOTE(review): the log text says 'exit_internals' although the
                #option governing this branch is exit_externals
                $self->log("cleanly_exit: exit_internals: sending signal 9 to $task_name");
                $sig = $self->{api}->send_signal($task_name, 9) || 'undef';
            }
        };
    }
    $self->log('exit');
    exit;
lib/App/MultiModule.pm  view on Meta::CPAN


# Poll every queue this process is responsible for.  new() schedules this to
# run once per second.
#
# Steps, in order:
#   1. Drain the management queue ($self->{my_qname}) and pass each message
#      to _control(): first the local queue, then, in the 'main' process
#      only, the non-local queue as well.
#   2. Read the local queue of every module listed in all_modules_info.
#   3. Read non-local queues: a non-main process reads only the queue named
#      after its own module; the main process reads the queue of every
#      module whose config is not flagged is_external.
#
# Module names are taken as a keys() snapshot rather than walked with
# each(): delivering a message runs task code, and anything in there that
# calls keys() on all_modules_info or adds an entry to it would reset or
# invalidate a live each() iterator.
sub _receive_messages {
    my $self = shift;

    #'module' is optional in new(); treat a missing value as "not main"
    my $is_main = (defined $self->{module} and $self->{module} eq 'main');

    {   #handle messages directed at MultiModule proper
        #first, we do local queue reads for the management queue
        IPC::Transit::local_queue(qname => $self->{my_qname});
        while(  my $message = IPC::Transit::receive(
                    qname => $self->{my_qname},
                    nonblock => 1,
                )
        ) {
            $self->_control($message);
        }
        #only the parent MultiModule process reads non-local for itself
        if($is_main) {
            while(  my $message = IPC::Transit::receive(
                        qname => $self->{my_qname},
                        nonblock => 1,
                        override_local => 1,
                    )
            ) {
                $self->_control($message);
            }
        }
    }

    #we always do local queue reads for all possible local queues
    foreach my $module_name (keys %{$self->{all_modules_info}}) {
        $self->_receive_messages_from($module_name, 'local');
    }

    if(not $is_main) {
        $self->_receive_messages_from($self->{module}, 'non-local');
        return;
    }

    #main process: non-local queue reads for every task that is not external
    foreach my $module_name (keys %{$self->{all_modules_info}}) {
        my $module_info = $self->{all_modules_info}->{$module_name};
        if(     $module_info and
                $module_info->{config} and
                $module_info->{config}->{is_external}) {
            #external; do not receive
            next;
        }
        $self->_receive_messages_from($module_name, 'non-local');
    }
    return;
}

# Map a receive-mode name onto the value passed as override_local to the
# IPC::Transit calls in this file: 'local' gives 0, 'non-local' gives 1.
# Any other mode name is fatal.
sub _receive_mode_translate {
    my ($mode) = @_;
    my %override_local_for = (
        'local'     => 0,
        'non-local' => 1,
    );
    die "unknown mode: $mode\n"
        unless exists $override_local_for{$mode};
    return $override_local_for{$mode};
}

sub _receive_messages_from {
    my $self = shift;
    my $qname = shift; my $receive_mode = shift;
    my %args = @_;
    IPC::Transit::local_queue(qname => $qname);
    my $stats = IPC::Transit::stat(
        qname => $qname,
        override_local => _receive_mode_translate($receive_mode));
    return unless $stats->{qnum}; #nothing to receive
    #at this point, there are one or more messages for us to receive
    #we can only deliver messages to tasks that are loaded AND configured

    if(     $self->{tasks}->{$qname} and
            $self->{tasks}->{$qname}->{config_is_set}) {
        while(  my $message = IPC::Transit::receive(
                    qname => $qname,
                    nonblock => 1,
                    override_local => _receive_mode_translate($receive_mode),
                )
        ) {
            #handle dynamic state transforms
            if(     $message->{'.multimodule'} and
                    $message->{'.multimodule'}->{transform}) {
                $self->debug("_receive_messages_from($qname, _receive_mode_translate($receive_mode): in transform")
                    if $self->{debug} > 1;
                eval {
                    mtransform( $self->{tasks}->{$qname}->{'state'},
                                $message->{'.multimodule'}->{transform}
                    );
                };
                $self->error("_receive_messages_from: transform failed: $@")
                    if $@;
                $self->debug('post-transform state',
                        'state' => $self->{tasks}->{$qname}->{'state'})
                    if $self->{debug} > 5;

                return;
            }
            #actually deliver the message
            eval {
                $self->{message_counts}->{$qname} = 0 unless
                    $self->{message_counts}->{$qname};
                $self->{message_counts}->{$qname}++;
                $self->{tasks}->{$qname}->message($message, root_object => $self);
            };
            if($@) {
                my $err = $@;
                $self->error("_receive_messages_from: handle_message failed: $@");
                $self->bucket({
                    task_name => $qname,
                    check_type => 'admin',
                    cutoff_age => 300,
                    min_points => 1,
                    min_bucket_span => 0.01,
                    bucket_name => "$qname:local.admin.task_message_failure",
                    bucket_metric => 'local.admin.task_message_failure',
                    bucket_type => 'sum',
                    value => 1,
                });
            }
        }
    } elsif(    $self->{tasks}->{$qname} and
                not $self->{tasks}->{$qname}->{config_is_set}) {
        #in this case, the task is loaded but not configured
        #we just wait for the configure to happen
        $self->debug("_receive_messages_from($qname): config_is_set is false")



( run in 1.812 second using v1.01-cache-2.11-cpan-39bf76dae61 )