Forks-Super

 view release on metacpan or  search on metacpan

lib/Forks/Super/Job.pm  view on Meta::CPAN

    my $optional_pgid = shift;
    if (defined $optional_pgid) {
	return scalar grep {
	    $_->{state} eq 'ACTIVE'
		and $_->{pgid} == $optional_pgid } @ALL_JOBS;
    }
    return scalar grep { defined($_->{state})
			     && $_->{state} eq 'ACTIVE' } @ALL_JOBS;
}

# Count the jobs in @ALL_JOBS whose state is one of ACTIVE, COMPLETE,
# DEFERRED, LAUNCHING, SUSPENDED or SUSPENDED-DEFERRED.
#
#   $count_bg      - when false, only jobs whose {_is_bg} field equals 0
#                    are counted (presumably "background" jobs are the
#                    ones excluded -- confirm against where _is_bg is set)
#   $optional_pgid - when defined, only jobs whose {pgid} is numerically
#                    equal to this value are counted
#
# Returns the number of matching jobs.
sub count_alive_processes {
    my ($count_bg, $optional_pgid) = @_;

    # states that qualify a job as "alive" for this count
    my %alive_state = map { $_ => 1 } (
        'ACTIVE',
        'COMPLETE',
        'DEFERRED',
        'LAUNCHING',            # rare
        'SUSPENDED',
        'SUSPENDED-DEFERRED',
    );

    # A job with no state set yet cannot match any of the states above.
    # Checking defined() first gives the same answer as comparing undef
    # with 'eq', but without a run of "uninitialized value" warnings.
    my @alive = grep {
        defined($_->{state}) && $alive_state{ $_->{state} }
    } @ALL_JOBS;

    if (!$count_bg) {
        @alive = grep { $_->{_is_bg} == 0 } @alive;
    }
    if (defined $optional_pgid) {
        @alive = grep { $_->{pgid} == $optional_pgid } @alive;
    }
    return scalar @alive;
}

# Count the jobs in @ALL_JOBS whose state is DEFERRED or
# SUSPENDED-DEFERRED.
#
#   $count_bg      - when false, only jobs whose {_is_bg} field equals 0
#                    are counted (presumably "background" jobs are the
#                    ones excluded -- confirm against where _is_bg is set)
#   $optional_pgid - when defined, only jobs whose {pgid} is numerically
#                    equal to this value are counted
#
# Returns the number of matching jobs.
sub count_queued_processes {
    my ($count_bg,$optional_pgid) = @_;

    # Guard against jobs that have no state yet: they can never be
    # deferred, and comparing undef with 'eq' only produces warnings.
    my @deferred = grep {
        defined($_->{state})
            && ($_->{state} eq 'DEFERRED'
                || $_->{state} eq 'SUSPENDED-DEFERRED')
    } @ALL_JOBS;

    if (!$count_bg) {
        @deferred = grep { $_->{_is_bg} == 0 } @deferred;
    }
    if (defined $optional_pgid) {
        @deferred = grep { $_->{pgid} == $optional_pgid } @deferred;
    }
    return scalar @deferred;
}

#
# _reap should distinguish:
#
#    all alive jobs (ACTIVE+COMPLETE+SUSPENDED+DEFERRED+SUSPENDED-DEFERRED)
#    all active jobs (ACTIVE + COMPLETE + DEFERRED)
#    filtered alive jobs (by optional pgid)
#    filtered ACTIVE + COMPLETE + DEFERRED jobs
#
#    if  all_active==0  and  all_alive>0,  
#    then see Wait::WAIT_ACTION_ON_SUSPENDED_JOBS
#
# Tally the jobs in @ALL_JOBS four different ways in a single call.
#
#   $count_bg      - when false, jobs whose {_is_bg} field is not equal
#                    to 0 are left out of every tally
#   $optional_pgid - when defined, the two "filtered" tallies are limited
#                    to jobs whose {pgid} numerically equals this value
#
# A job is a candidate ("alive") if its state is neither REAPED nor NEW
# and its {daemon} field is false.  A candidate is "active" if its state
# does not contain the text SUSPENDED.
#
# Returns a four element list, in this order:
#   (filtered active, all alive, all active, filtered alive)
sub count_processes {
    my ($count_bg, $optional_pgid) = @_;

    # collect the candidate jobs in one pass
    my @alive;
    for my $job (@ALL_JOBS) {
        next if $job->{state} eq 'REAPED';
        next if $job->{state} eq 'NEW';
        next if $job->{daemon};
        next if !$count_bg && !($job->{_is_bg} == 0);
        push @alive, $job;
    }

    # active jobs are the candidates that are not in a suspended state
    my @active = grep { !($_->{state} =~ /SUSPENDED/) } @alive;

    # without a pgid the filtered lists are plain copies
    my (@filtered_active, @filtered_alive);
    if (defined $optional_pgid) {
        @filtered_active = grep { $_->{pgid} == $optional_pgid } @active;
        @filtered_alive  = grep { $_->{pgid} == $optional_pgid } @alive;
    } else {
        @filtered_active = @active;
        @filtered_alive  = @alive;
    }

    my @n = map { scalar @{$_} }
        (\@filtered_active, \@alive, \@active, \@filtered_alive);

    if ($Forks::Super::Debug::DEBUG) {
        debug("count_processes(): @n");
        debug('count_processes(): Filtered active: ',
              $filtered_active[0]->toString())
            if $n[0];
        debug('count_processes(): Alive: ', $alive[0]->toShortString())
            if $n[1];
        debug("count_processes(): Active: @active")
            if $n[2];
    }

    return @n;
}

sub count_active_processes_on_host {

lib/Forks/Super/Job/Ipc.pm  view on Meta::CPAN

	warn "Forks::Super::END_cleanup: rmdir $_IPC_DIR failed. $!\n";

	opendir(my $_Z, $_IPC_DIR);
	my @g = grep { !/^\.nfs/ } readdir($_Z);
	closedir $_Z;
    }
    return;
}

# if we have created temporary files for IPC, clean them up.
# clean them up even if the children are still alive -- these files
# are exclusively for IPC, and IPC isn't needed after the parent
# process is done.
sub END_cleanup {

    if ($$ != ($Forks::Super::MAIN_PID || $MAIN_PID)) {
	return;
    }

    return if _END_foreground_cleanup();

lib/Forks/Super/Job/Ipc.pm  view on Meta::CPAN

		# in the parent, we can tell the difference between this input

		# stream being empty because the child process is finished
		# (see _check_if_job_is_complete_and_close_io() call, above),
		# and the stream being empty because the child isn't producing
		# enough output to keep it full.

		# We can and do distinguish between these two cases by
		# returning <undef> when the child is finished and will
		# not produce any more input, and  ""  (empty string) when
		# the child is still alive and it could potentially
		# produce more input.

		return '';
	    } else {
		# in the child, we don't make this distinction.
		return;
	    }
	}
    }
    return;

lib/Forks/Super/Sync/IPCSemaphore.pm  view on Meta::CPAN

	} elsif ($!{EINVAL}) {  # semaphore was removed

	    carp "sync::_wait_on: \$!=Invalid resource ... return 2";
	    return 2;
	}

	if ($expire > 0 && Time::HiRes::time() >= $expire) {
	    return 0;
	}

	# sem value not zero. Is the partner process still alive?
	if (!CORE::kill(0, $partner)) {
	    carp "sync::_wait_on thinks that $partner is gone ...return 3";
	    $self->{skip_wait_on} = 1;
	    delete $self->{sems};
	    return $Forks::Super::Sync::SYNC_PARTNER_GONE;
	}
	Time::HiRes::sleep( $NOWAIT_YIELD_DURATION );
	my $z5 = waitpid -1, &WNOHANG;
    }
    return; # unreachable

lib/Forks/Super/Sync/Win32.pm  view on Meta::CPAN


our @ISA = qw(Forks::Super::Sync);
our $VERSION = '0.97';
our $NOWAIT_YIELD_DURATION = 250;
my @RELEASE_ON_EXIT = ();

# Something we have to watch out for is a process dying without
# releasing the resources that it possessed. We have three
# defences against this issue below.
#
# 1. call CORE::kill 0, ... to see if other proc is still alive
# 2. check $!{EINVAL} (Win) and $!{ESRCH} (Cyg) to see if wait call failed
# 3. release resources in a DESTROY block (and  remove  func, though that
#    probably doesn't help that much)

$Forks::Super::Config::CONFIG{'Win32::Semaphore'} = 1;

my $sem_id = 0;

sub new {
    my ($pkg, $count, @initial) = @_;

lib/Forks/Super/Sync/Win32Mutex.pm  view on Meta::CPAN

use warnings;

our @ISA = qw(Forks::Super::Sync);
our $VERSION = '0.97';
our $NOWAIT_YIELD_DURATION = 50; # milliseconds

# Something we have to watch out for is a process dying without
# releasing the resources that it possessed. We have three
# defences against this issue below.
#
# 1. call CORE::kill 0, ... to see if other proc is still alive
# 2. check $! to see if/how the  Win32::Mutex::wait  call failed
# 3. release resources in a DESTROY block (and  remove  func, though that
#    probably doesn't help that much)

sub new {
    my ($pkg, $count, @initial) = @_;
    my $self = bless {}, $pkg;
    $self->{count} = $count;
    $self->{initial} = [ @initial ];

lib/Forks/Super/Wait.pm  view on Meta::CPAN

	my $real_pid = $job->{real_pid};
	my $pid = $job->{pid};

	if ($job->{debug}) {
	    debug("_reap: reaping $pid/$real_pid.");
	}
	if (not wantarray) {
	    return _reap_return($job);
	}

	my ($nactive1, $nalive, $nactive2, $nalive2)
	    = Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);
	debug("_reap:  $nalive remain.") if $DEBUG;
	$job->_mark_reaped;
	return (_reap_return($job), $nactive1, $nalive, $nactive2, $nalive2);
    }


    # the failure to reap active jobs may occur because the jobs are still
    # running, or it may occur because the relevant signals arrived at a
    # time when the signal handler was overwhelmed

    my ($nactive1, $nalive, $nactive2, $nalive2)
	= Forks::Super::Job::count_processes($reap_bg_ok, $optional_pgid);

    my $val = $nalive2 ? _active_waitpid_result() : _reaped_waitpid_result();
    return $val if not wantarray;

    if ($DEBUG) {
	debug("_reap(): nothing to reap now. $nactive1 remain.");
    }
    return ($val, $nactive1, $nalive, $nactive2, $nalive2);
}


# wait on any process
sub _waitpid_any {
    my ($no_hang,$reap_bg_ok,$timeout) = @_;
    my $expire = Time::HiRes::time() + ($timeout || &FOREVER);
    my ($pid, $nactive2, $nalive, $nactive, $nalive2) = _reap($reap_bg_ok);
    if ($no_hang == 0) {
	while (!isValidPid($pid,1) && $nalive > 0) {
	    if (Time::HiRes::time() >= $expire) {
		# XXX - reset $? ?
		return TIMEOUT;
	    }
	    if ($nactive == 0) {

		if ($WAIT_ACTION_ON_SUSPENDED_JOBS eq 'fail') {
		    # XXX - reset $? ?
		    return ONLY_SUSPENDED_JOBS_LEFT;
		} elsif ($WAIT_ACTION_ON_SUSPENDED_JOBS eq 'resume') {
		    _activate_one_suspended_job($reap_bg_ok);
		}
	    }
	    __run_productive_waitpid_code();

	    # XXX - $DEFAULT_PAUSE here? 
	    # Pause time should not be greater than the timeout.
	    Forks::Super::Util::pause();
	    ($pid, $nactive2, $nalive, $nactive, $nalive2) = _reap($reap_bg_ok);
	}
    }
    if (defined $ALL_JOBS{$pid}) {
	my $job = Forks::Super::Job::get($ALL_JOBS{$pid});
	while (not defined $job->{status}) {
	    Forks::Super::Util::pause();
	}
	$? = $job->{status};
    }
    return __waitpid_result($pid);

t/65e-daemon.t  view on Meta::CPAN

my $pid = fork {
     env => { LOG_FILE => $output, VALUE => 30 },
     name => 'daemon4',
     daemon => 1,
     cmd => [ $^X, "$CWD/t/external-daemon.pl" ],
     timeout => 4,
};
ok(isValidPid($pid), "fork to cmd with daemon & timeout");
sleep 2;
my $k = Forks::Super::kill 'ZERO', $pid;
ok($k, "($k) daemon proc is alive");
sleep 6;
$k = Forks::Super::kill 'ZERO', $pid;
okl(!$k, "($k) daemon proc timed out in <= 8s") or do {
    if (!$Forks::Super::SysInfo::PREFER_ALTERNATE_ALARM) {
        diag qq[You may have better luck setting\n],
                qq[\$PREFER_ALTERNATE_ALARM to a true value\n],
                qq[in lib/Forks/Super/SysInfo.pm\n];
    }
    if ($k) {
        for (1..5) {

t/66a-daemon.t  view on Meta::CPAN

#    in the child:
#        launch a daemon with F::S::fork
#           the daemon should live for a while and produce some output
#        print out the child pid and the daemon pid
#        exit
#    wait for the child
#    verify that the child is gone
#    verify that the daemon exists
#    verify that only some of the daemon output exists
#    wait a little more
#    verify more daemon output exists, which proves that the daemon is alive
#    kill the daemon

my $child_output = "$CWD/t/out/daemon-parent.$$.out";
my $daemon_output = "$CWD/t/out/daemon.$$.out";

unlink $child_output, $daemon_output;

my $child_pid = CORE::fork();
if ($child_pid == 0) {

t/66a-daemon.t  view on Meta::CPAN

okl($t < 3.0, "child process finished quickly ${t}s, expected fast");  ### 1 ###
ok($wait == $child_pid, "CORE::wait captured child process");

sleep 3;
my $s1 = -s $daemon_output;

ok(-f $daemon_output, "daemon process created output file  size=$s1");
sleep 5;

my $s2 = -s $daemon_output;
ok($s1 < $s2, "daemon process still alive after child is gone $s1 => $s2");

open my $dh, '<', $daemon_output;
my $dpid = 0 + <$dh>;
my $dppid = <$dh>;
chomp($dppid);
close $dh;

ok(is_init_process($dppid),
   "daemon process $dpid does not have a parent")
    or diag("daemon process parent was '$dppid', expected '1'");

t/66c-daemon.t  view on Meta::CPAN


ok($job, "$$\\daemon launched");
ok($job->write_stdin("4\n"), "write to daemon stdin ok");
ok($job->write_stdin("5\n"), "write to daemon stdin ok");
my $x1 = $job->read_stdout(block => 1);
my $x2 = $job->read_stderr();
ok($x1 == 16, "read from daemon stdout ok");
ok($x2 == 4, "read from daemon stderr ok") or diag $x2;
$x1 = $job->read_stdout(block => 1);
ok($x1 == 25, "2nd read from daemon stdout ok");
ok(Forks::Super::kill('ZERO', $job->signal_pids), "daemon is alive");
$job->write_stdin("__EOF__\n");
$job->close_fh('stdin');

$x2 = $job->read_stderr();
ok($x2 == 5, "read from daemon stderr after stdout closed ok") or diag $x2,$job->read_stderr;
sleep 4;
ok(!Forks::Super::kill('ZERO', $job->{real_pid}), "daemon is not alive")
    or diag('could not signal ',$job->{real_pid});



( run in 1.833 second using v1.01-cache-2.11-cpan-39bf76dae61 )