App-MultiModule
view release on metacpan or search on metacpan
lib/App/MultiModule.pm view on Meta::CPAN
=item module
=item debug
=item oob
=back
=cut
# Construct the (single) App::MultiModule object for this process.
#
# Named arguments:
#   state_dir       (required) non-reference scalar; handed to
#                   App::MultiModule::API->new.
#   module_prefixes (optional) a single prefix string or an ARRAY ref of
#                   prefix strings; appended after the built-in
#                   'App::MultiModule::Tasks::' prefix.
#   debug           (optional) debug level; defaults to 0.
#   qname           (optional) stored as my_qname; config and control
#                   messages produced here are sent to this queue.
#   module          (optional) stored as-is; compared against 'main'
#                   elsewhere in this file.
#   oob             (optional) stored as oob_opts.
#   pristine_opts   (optional) stored as-is.
#   config_file     (optional) path that is polled once per
#                   repeat_interval => 1 and re-sent to qname whenever its
#                   modification time changes.
#
# Dies if called a second time in the same process, if state_dir is missing
# or a reference, or if module_prefixes is a reference other than ARRAY.
#
# Side effects: sets $App::MultiModule::instantiated, installs TERM and INT
# signal handlers, resets $App::MultiModule::Task::emit_counts and registers
# several recurring work callbacks through $self->recur.
sub new {
    my $class = shift;
    my %args = @_;
    die 'App::MultiModule::new: it is only safe to instantiate this one time per process space'
        if $App::MultiModule::instantiated;
    $App::MultiModule::instantiated = 1;
    die "App::MultiModule::new failed: required argument 'state_dir' must be a scalar"
        if not $args{state_dir} or
            ref $args{state_dir};

    my @module_prefixes = ('App::MultiModule::Tasks::');
    if($args{module_prefixes}) {
        if(not ref $args{module_prefixes}) {
            #a single prefix passed as a plain scalar, as the error
            #message below has always promised
            push @module_prefixes, $args{module_prefixes};
        } elsif(ref $args{module_prefixes} eq 'ARRAY') {
            push @module_prefixes, @{$args{module_prefixes}};
        } else {
            die "App::MultiModule::new failed: passed argument module_prefixes must either be a scalar or ARRAY ref";
        }
    }

    my $debug = $args{debug};
    $debug = 0 unless defined $debug;

    my $self = {
        module_prefixes => \@module_prefixes,
        api => App::MultiModule::API->new(state_dir => $args{state_dir}),
        my_qname => $args{qname},
        module => $args{module},
        tasks => {},
        message_counts => {},
        debug => $debug,
        oob_opts => $args{oob},
        hold_events_for => {},
            #Original author's note: when a 'shutdown' event is issued in
            #POE, it may or may not stop the next scheduled event from
            #firing.  It is important for some of the task migration work
            #that save_task_state() not be called by the per-task state
            #save recur after we want to deallocate.
            #When an internal task is deallocated, a state save is forced
            #with a special flag, no_save_pid, so that the written state
            #file has no PID.  This matters so that _manage_tasks() in the
            #MultiModule task will not think the task is running.
        pristine_opts => $args{pristine_opts},
        task_name => 'main',
    };
    $self->{config_file} = $args{config_file} if $args{config_file};
    bless ($self, $class);

    POE::Kernel->run(); #silence warning about run not being called

    if($args{config_file}) {
        #Poll the config file; whenever its mtime changes, evaluate it and
        #send the result to our own queue as a '.multimodule' config message.
        $self->recur(repeat_interval => 1, work => sub {
            eval {
                die "App::MultiModule::new failed: optional passed argument config_file($args{config_file}) must either be a scalar and exist and be readable"
                    if ref $args{config_file} or not -r $args{config_file};
                #element 9 of stat() is the last-modification time
                my $mtime = (stat($args{config_file}))[9];
                $self->{last_config_stat} = 0
                    unless defined $self->{last_config_stat};
                #"all good\n" is a sentinel used to leave the eval early
                #when the file is unchanged; it is filtered out below
                die "all good\n" if $mtime == $self->{last_config_stat};
                $self->{last_config_stat} = $mtime;
                $self->log("reading config from $args{config_file}");
                #bound the time spent evaluating and sending the config
                local $SIG{ALRM} = sub { die "timed out\n"; };
                alarm 2;
                my $conf = do $args{config_file} or die "failed to deserialize $args{config_file}: $@";
                #handle config 'either way': a file that does not carry a
                #'.multimodule' envelope is treated as the bare config
                if(not $conf->{'.multimodule'}) {
                    $conf = {
                        '.multimodule' => {
                            config => $conf
                        }
                    };
                }
                IPC::Transit::local_queue(qname => $args{qname});
                IPC::Transit::send(qname => $args{qname}, message => $conf);
            };
            alarm 0;
            if($@ and $@ ne "all good\n") {
                $self->error("failed to read config file $args{config_file}: $@");
            }
        });
    }

    $self->{all_modules_info} = $self->get_multimodules_info();

    #reset the per-queue and per-task counters once per repeat_interval => 60
    $self->recur(repeat_interval => 60, work => sub {
        $self->{message_counts} = {};
        $App::MultiModule::Task::emit_counts = {};
    });

    #refresh the module information once per repeat_interval => 10
    $self->recur(repeat_interval => 10, work => sub {
        #Disabled debugging aid, kept for reference:
        #if($args{module} and $args{module} eq 'main') {
        #    $self->{my_counter} = 0 unless $self->{my_counter};
        #    $self->{my_counter}++;
        #    open my $fh, '>>', '/tmp/my_logf';
        #    print $fh $args{module} . ':' . $self->{my_counter}, "\n";
        #    close $fh;
        #    exit if $self->{my_counter} > 60;
        #}
        $self->{all_modules_info} = $self->get_multimodules_info();
    });

    #poll the message queues once per repeat_interval => 1
    $self->recur(repeat_interval => 1, work => sub {
        $self->_receive_messages;
    });

    #TERM exits directly through _cleanly_exit
    $SIG{TERM} = sub {
        print STDERR "caught SIGTERM. starting orderly exit\n";
        $self->log('caught term');
        _cleanly_exit($self);
    };
    #INT does not exit directly: it queues a 'cleanly_exit' control message
    #(with exit_externals set) to our own queue
    $SIG{INT} = sub {
        print STDERR "caught SIGINT. starting orderly exit\n";
        $self->log('caught int');
        IPC::Transit::send(qname => $args{qname}, message => {
            '.multimodule' => {
                control => [
                    {   type => 'cleanly_exit',
                        exit_externals => 1,
                    }
                ],
            }
        });
        #_cleanly_exit($self, exit_external => 1);
    };

    $App::MultiModule::Task::emit_counts = {};
    return $self;
}
# Act on a management message addressed to MultiModule itself.
#
# $message is expected to carry its payload under the '.multimodule' key:
#   config  => { task_name => config, ... }
#       Each config is saved through the API object, recorded in
#       all_modules_info, and get_task() is called for that task.  A failure
#       in get_task() is only reported at debug level > 1.
#   control => [ { type => ..., ... }, ... ]
#       The only type handled here is 'cleanly_exit', which calls
#       _cleanly_exit() with the directive's key/value pairs as arguments.
#
# A 'control' value that is not an ARRAY reference, or a directive that is
# not a HASH reference with a defined 'type', is reported through error()
# and skipped rather than being dereferenced.
sub _control {
    my $self = shift;
    my $message = shift;
    my $control = $message->{'.multimodule'};

    if($control->{config}) {
        foreach my $task_name (keys %{$control->{config}}) {
            my $config = $control->{config}->{$task_name};
            $self->{api}->save_task_config($task_name, $config);
            $self->{all_modules_info}->{$task_name}->{config} = $config;
            eval {
                #the returned task is not used here; only whether the call
                #succeeded matters
                my $task = $self->get_task($task_name);
            };
            if($@) {
                $self->debug("_control: failed to get_task($task_name): $@\n") if $self->{debug} > 1;
            }
        }
    }

    if($control->{control}) {
        if(ref $control->{control} ne 'ARRAY') {
            $self->error('_control: passed control structure must be ARRAY reference');
            return;
        }
        foreach my $directive (@{$control->{control}}) {
            if(ref $directive ne 'HASH' or not defined $directive->{type}) {
                $self->error('_control: each control entry must be a HASH reference with a type');
                next;
            }
            if($directive->{type} eq 'cleanly_exit') {
                $self->debug('control cleanly exit') if $self->{debug} > 1;
                $self->_cleanly_exit(%$directive);
            }
        }
    }
    return;
}
sub _cleanly_exit {
my $self = shift;
my %args = @_;
$self->debug('beginning cleanly_exit');
#how to exit cleanly:
#call save_task_state on all internal stateful tasks
#if exit_externals is set:
##send TERM to all external tasks if exit_externals is set
##wait a few seconds
##send KILL to all external tasks and all of their children and children
my @all_tasks;
foreach my $task_name (keys %{$self->{all_modules_info}}) {
push @all_tasks, $task_name;
}
#first: 'flush' all of the internal queues
for(1..5) { #lolwut
foreach my $task_name (@all_tasks) {
next unless $self->{tasks}->{$task_name};
IPC::Transit::local_queue(qname => $task_name);
my $stats = IPC::Transit::stat(
qname => $task_name,
override_local => _receive_mode_translate('local'));
next unless $stats->{qnum}; #nothing to receive
while( my $message = IPC::Transit::receive(
qname => $task_name,
override_local => _receive_mode_translate('local'))) {
eval {
$self->{tasks}->{$task_name}->message(
$message,
root_object => $self
);
};
if($@) {
$self->error("_cleanly_exit($task_name) threw: $@");
}
}
}
}
#second: save state and send signals, as appropriate
foreach my $task_name (@all_tasks) {
eval {
my $task_info = $self->{all_modules_info}->{$task_name};
my $task_is_stateful = $task_info->{is_stateful};
my $task_config = $task_info->{config} || {};
my $task_state = $self->{api}->get_task_state($task_name);
my $task_status = $self->{api}->get_task_status($task_name);
my $is_loaded = $self->{tasks}->{$task_name};
my $is_running = 0;
if( $task_status and
$task_status->{is_running}) {
$is_running = $task_status->{is_running};
}
my $is_my_pid = 0;
if( $task_status and
$task_status->{is_my_pid}) {
$is_my_pid = $task_status->{is_my_pid};
}
#first case: internal, stateful task
if( $is_loaded and
$task_is_stateful) {
$self->{api}->save_task_state($task_name, $self->{tasks}->{$task_name}->{'state'});
my $status = Storable::dclone($self->{tasks}->{$task_name}->{'status'});
$status->{is_internal} = 1;
$self->{api}->save_task_status($task_name, $status);
}
#second case: external task
if( not $is_loaded and
$is_running and
not $is_my_pid and
$args{exit_externals}) {
my $sig = $self->{api}->send_signal($task_name, 15);
sleep 2;
$self->log("cleanly_exit: exit_internals: sending signal 9 to $task_name");
$sig = $self->{api}->send_signal($task_name, 9) || 'undef';
}
};
}
$self->log('exit');
exit;
lib/App/MultiModule.pm view on Meta::CPAN
# One polling pass over the message queues.
#
# 1. Drain the management queue (my_qname) and hand every message to
#    _control(): always from the local queue, and additionally with
#    override_local => 1 when this process's module is 'main'.
# 2. Do a 'local' receive for every module named in all_modules_info.
# 3. Do 'non-local' receives: a non-main process only for its own module;
#    the main process for every module whose config does not set
#    is_external.
sub _receive_messages {
    my $self = shift;

    {   #handle messages directed at MultiModule proper
        #first, we do local queue reads for the management queue
        IPC::Transit::local_queue(qname => $self->{my_qname});
        while( my $message = IPC::Transit::receive(
                qname => $self->{my_qname},
                nonblock => 1,
            )
        ) {
            $self->_control($message);
        }
        #only the parent MultiModule process reads non-local for itself
        if($self->{module} eq 'main') {
            while( my $message = IPC::Transit::receive(
                    qname => $self->{my_qname},
                    nonblock => 1,
                    override_local => 1,
                )
            ) {
                $self->_control($message);
            }
        }
    }

    #we always do local queue reads for all possible local queues
    foreach my $module_name (keys %{$self->{all_modules_info}}) {
        $self->_receive_messages_from($module_name, 'local');
    }

    if($self->{module} ne 'main') {
        $self->_receive_messages_from($self->{module}, 'non-local');
    } else { #main process
        #non-local queue reads for every task that is not external.
        #Iterate over a key list rather than each(): the hash has a single
        #shared iterator, and any keys()/values() call on it made while a
        #message is being delivered would reset an each() loop mid-scan.
        my $modules_info = $self->{all_modules_info};
        foreach my $module_name (keys %$modules_info) {
            my $module_info = $modules_info->{$module_name};
            if( $module_info and
                $module_info->{config} and
                $module_info->{config}->{is_external}) {
                #external; do not receive
                next;
            }
            $self->_receive_messages_from($module_name, 'non-local');
        }
    }
    return;
}
# Map a receive-mode name to the value passed as override_local:
# 'local' => 0, 'non-local' => 1.  Any other mode is fatal.
sub _receive_mode_translate {
    my ($mode) = @_;
    my %override_local_for = (
        'local'     => 0,
        'non-local' => 1,
    );
    die "unknown mode: $mode\n"
        unless exists $override_local_for{$mode};
    return $override_local_for{$mode};
}
sub _receive_messages_from {
my $self = shift;
my $qname = shift; my $receive_mode = shift;
my %args = @_;
IPC::Transit::local_queue(qname => $qname);
my $stats = IPC::Transit::stat(
qname => $qname,
override_local => _receive_mode_translate($receive_mode));
return unless $stats->{qnum}; #nothing to receive
#at this point, there are one or more messages for us to receive
#we can only deliver messages to tasks that are loaded AND configured
if( $self->{tasks}->{$qname} and
$self->{tasks}->{$qname}->{config_is_set}) {
while( my $message = IPC::Transit::receive(
qname => $qname,
nonblock => 1,
override_local => _receive_mode_translate($receive_mode),
)
) {
#handle dynamic state transforms
if( $message->{'.multimodule'} and
$message->{'.multimodule'}->{transform}) {
$self->debug("_receive_messages_from($qname, _receive_mode_translate($receive_mode): in transform")
if $self->{debug} > 1;
eval {
mtransform( $self->{tasks}->{$qname}->{'state'},
$message->{'.multimodule'}->{transform}
);
};
$self->error("_receive_messages_from: transform failed: $@")
if $@;
$self->debug('post-transform state',
'state' => $self->{tasks}->{$qname}->{'state'})
if $self->{debug} > 5;
return;
}
#actually deliver the message
eval {
$self->{message_counts}->{$qname} = 0 unless
$self->{message_counts}->{$qname};
$self->{message_counts}->{$qname}++;
$self->{tasks}->{$qname}->message($message, root_object => $self);
};
if($@) {
my $err = $@;
$self->error("_receive_messages_from: handle_message failed: $@");
$self->bucket({
task_name => $qname,
check_type => 'admin',
cutoff_age => 300,
min_points => 1,
min_bucket_span => 0.01,
bucket_name => "$qname:local.admin.task_message_failure",
bucket_metric => 'local.admin.task_message_failure',
bucket_type => 'sum',
value => 1,
});
}
}
} elsif( $self->{tasks}->{$qname} and
not $self->{tasks}->{$qname}->{config_is_set}) {
#in this case, the task is loaded but not configured
#we just wait for the configure to happen
$self->debug("_receive_messages_from($qname): config_is_set is false")
( run in 1.812 second using v1.01-cache-2.11-cpan-39bf76dae61 )