App-MultiModule
view release on metacpan or search on metacpan
lib/App/MultiModule.pm view on Meta::CPAN
task_name => 'main',
};
$self->{config_file} = $args{config_file} if $args{config_file};
bless ($self, $class);
POE::Kernel->run(); #silence warning about run not being called
if($args{config_file}) {
$self->recur(repeat_interval => 1, work => sub {
eval {
die "App::MultiModule::new failed: optional passed argument config_file($args{config_file}) must either be a scalar and exist and be readable"
if ref $args{config_file} or not -r $args{config_file};
my $ctime = (stat($args{config_file}))[9];
$self->{last_config_stat} = 0
unless defined $self->{last_config_stat};
die "all good\n" if $ctime == $self->{last_config_stat};
$self->{last_config_stat} = $ctime;
$self->log("reading config from $args{config_file}");
local $SIG{ALRM} = sub { die "timed out\n"; };
alarm 2;
my $conf = do $args{config_file} or die "failed to deserialize $args{config_file}: $@";
#handle config 'either way'
if(not $conf->{'.multimodule'}) {
lib/App/MultiModule.pm view on Meta::CPAN
my @all_tasks;
foreach my $task_name (keys %{$self->{all_modules_info}}) {
push @all_tasks, $task_name;
}
#first: 'flush' all of the internal queues
for(1..5) { #lolwut
foreach my $task_name (@all_tasks) {
next unless $self->{tasks}->{$task_name};
IPC::Transit::local_queue(qname => $task_name);
my $stats = IPC::Transit::stat(
qname => $task_name,
override_local => _receive_mode_translate('local'));
next unless $stats->{qnum}; #nothing to receive
while( my $message = IPC::Transit::receive(
qname => $task_name,
override_local => _receive_mode_translate('local'))) {
eval {
$self->{tasks}->{$task_name}->message(
$message,
root_object => $self
lib/App/MultiModule.pm view on Meta::CPAN
return 0 if $mode eq 'local';
return 1 if $mode eq 'non-local';
die "unknown mode: $mode\n";
}
sub _receive_messages_from {
my $self = shift;
my $qname = shift; my $receive_mode = shift;
my %args = @_;
IPC::Transit::local_queue(qname => $qname);
my $stats = IPC::Transit::stat(
qname => $qname,
override_local => _receive_mode_translate($receive_mode));
return unless $stats->{qnum}; #nothing to receive
#at this point, there are one or more messages for us to receive
#we can only deliver messages to tasks that are loaded AND configured
if( $self->{tasks}->{$qname} and
$self->{tasks}->{$qname}->{config_is_set}) {
while( my $message = IPC::Transit::receive(
qname => $qname,
t/failsafe-mem.t view on Meta::CPAN
$OtherExternalModule->send();
#failsafed external module is no longer functioning
ok not $OtherExternalModule->receive(type => 'external', no_fail_exceptions => 1);
ok IPC::Transit::receive(qname => 'OtherExternalModule');
#unfailsafe OtherExternalModule
ok $api->unfailsafe_task('OtherExternalModule');
#sometimes the most recent message gets lost during the failsafe
{ my $stat = IPC::Transit::stat(qname => 'OtherExternalModule', override_local => 1);
print STDERR Data::Dumper::Dumper $stat;
system 'qtrans | grep -v " 0 "';
shift @{$OtherExternalModule->{sent_messages}};
if($stat and $stat->{qnum} == 0) {
shift @{$OtherExternalModule->{sent_messages}};
}
}
$OtherExternalModule->send();
ok $OtherExternalModule->receive(type => 'external');
( run in 0.972 second using v1.01-cache-2.11-cpan-49f99fa48dc )