Forks-Super

 view release on metacpan or  search on metacpan

lib/Forks/Super/Wait.pm  view on Meta::CPAN

our $OVERLOAD_RETURN;
sub _reap_return {
    my ($job) = @_;
    if (!defined $OVERLOAD_RETURN) {
	$OVERLOAD_RETURN = $Forks::Super::Job::OVERLOAD_ENABLED;
    }

    my $pid = $job->{real_pid};
    $pid = $OVERLOAD_RETURN ? Forks::Super::Job::get($pid) : $pid;
    return $pid;
}

#
# The handle_CHLD() subroutine takes care of reaping
# processes from the operating system. This method's
# part of the relay is taking the reaped process
# and updating the job's state.
#
# Optionally takes a process group ID to reap processes
# from that specific group.
#
# return the process id of the job that was reaped, or
# -1 if no eligible jobs were reaped. In wantarray mode,
# return the number of eligible processes (state == ACTIVE
# or  state == COMPLETE  or  STATE == SUSPENDED) that were
# not reaped.
#
sub _reap {
    my ($reap_bg_ok, $optional_pgid) = @_; # to reap procs from specific group
    __run_productive_waitpid_code();
    Forks::Super::Sigchld::handle_bastards();

    my @j = @ALL_JOBS;
    if (defined $optional_pgid) {
	# same code for MSWin32, Unix
	@j = grep { $_->{pgid} == $optional_pgid } @ALL_JOBS;
    }

    # see if any jobs are complete (signaled the SIGCHLD handler)
    # but have not been reaped.
    my @waiting = grep { $_->{state} eq 'COMPLETE' } @j;
    if (!$reap_bg_ok) {
	@waiting = grep { $_->{_is_bg} == 0 } @waiting;
    }
    debug('_reap(): found ', scalar @waiting,
	  ' complete & unreaped processes') if $DEBUG;

    if (@waiting > 0) {
	@waiting = sort { $a->{end} <=> $b->{end} } @waiting;
	my $job = shift @waiting;
	my $real_pid = $job->{real_pid};
	my $pid = $job->{pid};

	if ($job->{debug}) {
	    debug("_reap: reaping $pid/$real_pid.");
	}
	if (not wantarray) {
	    return _reap_return($job);
	}

	my ($nactive1, $nalive, $nactive2, $nalive2)
	    = Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);
	debug("_reap:  $nalive remain.") if $DEBUG;
	$job->_mark_reaped;
	return (_reap_return($job), $nactive1, $nalive, $nactive2, $nalive2);
    }


    # the failure to reap active jobs may occur because the jobs are still
    # running, or it may occur because the relevant signals arrived at a
    # time when the signal handler was overwhelmed

    my ($nactive1, $nalive, $nactive2, $nalive2)
	= Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);

    my $val = $nalive2 ? _active_waitpid_result() : _reaped_waitpid_result();
    return $val if not wantarray;

    if ($DEBUG) {
	debug("_reap(): nothing to reap now. $nactive1 remain.");
    }
    return ($val, $nactive1, $nalive, $nactive2, $nalive2);
}


# wait on any process
sub _waitpid_any {
    my ($no_hang,$reap_bg_ok,$timeout) = @_;
    my $expire = Time::HiRes::time() + ($timeout || &FOREVER);
    my ($pid, $nactive2, $nalive, $nactive, $nalive2) = _reap($reap_bg_ok);
    if ($no_hang == 0) {
	while (!isValidPid($pid,1) && $nalive > 0) {
	    if (Time::HiRes::time() >= $expire) {
		# XXX - reset $? ?
		return TIMEOUT;
	    }
	    if ($nactive == 0) {

		if ($WAIT_ACTION_ON_SUSPENDED_JOBS eq 'fail') {
		    # XXX - reset $? ?
		    return ONLY_SUSPENDED_JOBS_LEFT;
		} elsif ($WAIT_ACTION_ON_SUSPENDED_JOBS eq 'resume') {
		    _activate_one_suspended_job($reap_bg_ok);
		}
	    }
	    __run_productive_waitpid_code();

	    # XXX - $DEFAULT_PAUSE here? 
	    # Pause time should not be greater than the timeout.
	    Forks::Super::Util::pause();
	    ($pid, $nactive2, $nalive, $nactive, $nalive2) = _reap($reap_bg_ok);
	}
    }
    if (defined $ALL_JOBS{$pid}) {
	my $job = Forks::Super::Job::get($ALL_JOBS{$pid});
	while (not defined $job->{status}) {
	    Forks::Super::Util::pause();
	}
	$? = $job->{status};
    }
    return __waitpid_result($pid);
}

sub __waitpid_result {
    my $pid = shift;
    if ($respect_SIGCHLD_ignore &&
	$Signals::XSIG{CHLD} &&
	ref($Signals::XSIG{CHLD}) eq 'ARRAY' &&
	'IGNORE' eq ($Signals::XSIG::XSIG{CHLD}[0] || '') &&
	defined $Forks::Super::SysInfo::IGNORE_WAITPID_RESULT) {


	$? = $Forks::Super::SysInfo::IGNORE_WAITPID_STATUS;
	$pid = $Forks::Super::SysInfo::IGNORE_WAITPID_RESULT;
    }
    return $pid;
}

sub _activate_one_suspended_job {
    my @suspended =
        grep { $_->{state} eq 'SUSPENDED' } @Forks::Super::ALL_JOBS;
    if (@suspended == 0) {
	@suspended = grep { 
	    $_->{state} =~ /SUSPENDED/
	} @Forks::Super::ALL_JOBS;
    }
    @suspended = sort { 
	$b->{queue_priority} <=> $a->{queue_priority} } @suspended;
    if (@suspended == 0) {
	warn 'Forks::Super::_activate_one_suspended_job(): ',
	" can't find an appropriate suspended job to resume\n";
	return;
    }

    my $j1 = $suspended[0];
    $j1->{queue_priority} -= 1E-4;
    $j1->resume;
    return;
}

sub _bogus_waitpid_result {
    if (defined $Forks::Super::SysInfo::BOGUS_WAITPID_STATUS) {
	$? = $Forks::Super::SysInfo::BOGUS_WAITPID_STATUS;
    }
    return $Forks::Super::SysInfo::BOGUS_WAITPID_RESULT;
}

sub _active_waitpid_result {
    if (defined $Forks::Super::SysInfo::ACTIVE_WAITPID_STATUS) {
	$? = $Forks::Super::SysInfo::ACTIVE_WAITPID_STATUS;
    }



( run in 1.399 second using v1.01-cache-2.11-cpan-39bf76dae61 )