Forks-Super
view release on metacpan or search on metacpan
lib/Forks/Super/Wait.pm view on Meta::CPAN
our $OVERLOAD_RETURN;

# Produce the value that a reap operation hands back for $job.
#
# $job is a hash-based job record; only its {real_pid} entry is read.
# When $OVERLOAD_RETURN is true, the result of looking up that pid with
# Forks::Super::Job::get() is returned; otherwise the plain pid is.
sub _reap_return {
    my ($job) = @_;

    # Latch the setting from Forks::Super::Job the first time it is
    # needed (and retry on later calls while it is still undefined).
    unless (defined $OVERLOAD_RETURN) {
        $OVERLOAD_RETURN = $Forks::Super::Job::OVERLOAD_ENABLED;
    }

    my $real_pid = $job->{real_pid};
    return $real_pid if !$OVERLOAD_RETURN;

    # Assign to a scalar first so the lookup always runs in scalar
    # context, even when this sub is called inside a list.
    my $overloaded = Forks::Super::Job::get($real_pid);
    return $overloaded;
}
#
# The handle_CHLD() subroutine takes care of reaping
# processes from the operating system. This method's
# part of the relay is taking the reaped process
# and updating the job's state.
#
# Optionally takes a process group ID to reap processes
# from that specific group.
#
# Returns the process id of the job that was reaped, or
# -1 if no eligible jobs were reaped. In list context
# (wantarray), that value is followed by counts of the
# eligible processes (state == ACTIVE, COMPLETE, or
# SUSPENDED) that were not reaped.
#
sub _reap {
    # Arguments:
    #   $reap_bg_ok    - when false, jobs whose {_is_bg} is non-zero are
    #                    not eligible to be reaped
    #   $optional_pgid - when defined, only jobs whose {pgid} equals this
    #                    value are considered
    #
    # Scalar context: returns _reap_return() of the reaped job, or the
    # active/reaped "nothing to reap" result.
    # List context: returns that value followed by the four counts from
    # Forks::Super::Job::count_processes().
    my ($reap_bg_ok, $optional_pgid) = @_; # to reap procs from specific group
    __run_productive_waitpid_code();
    Forks::Super::Sigchld::handle_bastards();

    my @j = @ALL_JOBS;
    if (defined $optional_pgid) {
        # same code for MSWin32, Unix
        @j = grep { $_->{pgid} == $optional_pgid } @ALL_JOBS;
    }

    # see if any jobs are complete (signaled the SIGCHLD handler)
    # but have not been reaped.
    my @waiting = grep { $_->{state} eq 'COMPLETE' } @j;
    if (!$reap_bg_ok) {
        @waiting = grep { $_->{_is_bg} == 0 } @waiting;
    }
    debug('_reap(): found ', scalar @waiting,
          ' complete & unreaped processes') if $DEBUG;

    if (@waiting > 0) {
        # reap the job with the earliest {end} time first
        @waiting = sort { $a->{end} <=> $b->{end} } @waiting;
        my $job = shift @waiting;
        my $real_pid = $job->{real_pid};
        my $pid = $job->{pid};

        if ($job->{debug}) {
            debug("_reap: reaping $pid/$real_pid.");
        }
        if (not wantarray) {
            # The job must be marked as reaped in scalar context as well.
            # Nothing else in this sub changes the job, so without this
            # the same COMPLETE job would be selected again on the next
            # call.
            $job->_mark_reaped;
            return _reap_return($job);
        }

        # The counts are taken before the job is marked as reaped.
        my ($nactive1, $nalive, $nactive2, $nalive2)
            = Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);
        debug("_reap: $nalive remain.") if $DEBUG;
        $job->_mark_reaped;
        return (_reap_return($job), $nactive1, $nalive, $nactive2, $nalive2);
    }

    # the failure to reap active jobs may occur because the jobs are still
    # running, or it may occur because the relevant signals arrived at a
    # time when the signal handler was overwhelmed
    my ($nactive1, $nalive, $nactive2, $nalive2)
        = Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);

    # Distinguish "jobs remain but none is ready" from "nothing left".
    my $val = $nalive2 ? _active_waitpid_result() : _reaped_waitpid_result();
    return $val if not wantarray;

    if ($DEBUG) {
        debug("_reap(): nothing to reap now. $nactive1 remain.");
    }
    return ($val, $nactive1, $nalive, $nactive2, $nalive2);
}
# wait on any process
sub _waitpid_any {
    # Arguments:
    #   $no_hang    - when 0, poll until a job is reaped, no live jobs
    #                 remain, or the timeout expires; otherwise try once
    #   $reap_bg_ok - passed through to _reap()
    #   $timeout    - seconds to wait; a false value means FOREVER
    my ($no_hang, $reap_bg_ok, $timeout) = @_;

    # Absolute time at which a blocking wait gives up.
    my $deadline = Time::HiRes::time() + ($timeout || &FOREVER);

    # Of the values _reap() returns in list context, only the pid, the
    # second count and the third count are consulted here.
    my ($pid, undef, $nalive, $nactive) = _reap($reap_bg_ok);

    my $blocking = ($no_hang == 0);
    while ($blocking && !isValidPid($pid,1) && $nalive > 0) {
        if (Time::HiRes::time() >= $deadline) {
            # XXX - reset $? ?
            return TIMEOUT;
        }

        # Nothing is active: what is left must be handled according to
        # $WAIT_ACTION_ON_SUSPENDED_JOBS.
        if ($nactive == 0) {
            my $action = $WAIT_ACTION_ON_SUSPENDED_JOBS;
            if ($action eq 'fail') {
                # XXX - reset $? ?
                return ONLY_SUSPENDED_JOBS_LEFT;
            }
            _activate_one_suspended_job($reap_bg_ok) if $action eq 'resume';
        }

        __run_productive_waitpid_code();

        # XXX - $DEFAULT_PAUSE here?
        # Pause time should not be greater than the timeout.
        Forks::Super::Util::pause();
        ($pid, undef, $nalive, $nactive) = _reap($reap_bg_ok);
    }

    # For a known job, wait until its status is available and publish
    # it through $?.
    my $job_id = $ALL_JOBS{$pid};
    if (defined $job_id) {
        my $job = Forks::Super::Job::get($job_id);
        Forks::Super::Util::pause() until defined $job->{status};
        $? = $job->{status};
    }
    return __waitpid_result($pid);
}
# Final filter applied to the pid about to be returned from a wait call.
#
# Returns $pid unchanged unless $respect_SIGCHLD_ignore is set, the first
# CHLD handler registered in %Signals::XSIG::XSIG is 'IGNORE', and
# $Forks::Super::SysInfo::IGNORE_WAITPID_RESULT is defined. In that case
# $? is set to IGNORE_WAITPID_STATUS and IGNORE_WAITPID_RESULT is
# returned instead.
sub __waitpid_result {
    my $pid = shift;

    # All handler checks must look at the same hash, %Signals::XSIG::XSIG.
    # Testing $Signals::XSIG{CHLD} would consult an unrelated hash (%XSIG
    # in package Signals), so the guard could never succeed.
    # NOTE(review): assumes %Signals::XSIG::XSIG is the handler table
    # provided by the Signals::XSIG module -- confirm.
    if ($respect_SIGCHLD_ignore &&
        $Signals::XSIG::XSIG{CHLD} &&
        ref($Signals::XSIG::XSIG{CHLD}) eq 'ARRAY' &&
        'IGNORE' eq ($Signals::XSIG::XSIG{CHLD}[0] || '') &&
        defined $Forks::Super::SysInfo::IGNORE_WAITPID_RESULT) {
        $? = $Forks::Super::SysInfo::IGNORE_WAITPID_STATUS;
        $pid = $Forks::Super::SysInfo::IGNORE_WAITPID_RESULT;
    }
    return $pid;
}
# Resume one suspended job from @Forks::Super::ALL_JOBS.
#
# Jobs whose state is exactly 'SUSPENDED' are preferred; if there are
# none, any job whose state contains 'SUSPENDED' is a candidate. The
# candidate with the highest {queue_priority} is resumed, after its
# priority is lowered slightly. Warns and does nothing when there is no
# candidate. Arguments are ignored; always returns nothing.
sub _activate_one_suspended_job {
    my @candidates =
        grep { $_->{state} eq 'SUSPENDED' } @Forks::Super::ALL_JOBS;
    if (!@candidates) {
        @candidates =
            grep { $_->{state} =~ /SUSPENDED/ } @Forks::Super::ALL_JOBS;
    }

    if (!@candidates) {
        warn 'Forks::Super::_activate_one_suspended_job(): ',
            " can't find an appropriate suspended job to resume\n";
        return;
    }

    # Highest priority first; keep only the front of the sorted list.
    my ($chosen) =
        sort { $b->{queue_priority} <=> $a->{queue_priority} } @candidates;

    # Nudge the priority down before resuming the job.
    $chosen->{queue_priority} -= 1E-4;
    $chosen->resume;
    return;
}
# Return $Forks::Super::SysInfo::BOGUS_WAITPID_RESULT, first copying
# $Forks::Super::SysInfo::BOGUS_WAITPID_STATUS into $? when that status
# is defined. $? is left alone otherwise.
sub _bogus_waitpid_result {
    my $status = $Forks::Super::SysInfo::BOGUS_WAITPID_STATUS;
    $? = $status if defined $status;
    return $Forks::Super::SysInfo::BOGUS_WAITPID_RESULT;
}
sub _active_waitpid_result {
if (defined $Forks::Super::SysInfo::ACTIVE_WAITPID_STATUS) {
$? = $Forks::Super::SysInfo::ACTIVE_WAITPID_STATUS;
}
( run in 1.399 second using v1.01-cache-2.11-cpan-39bf76dae61 )