Forks-Super

 view release on metacpan or  search on metacpan

lib/Forks/Super/Job.pm  view on Meta::CPAN

    my $id = shift;
    my @j = grep { defined($_->{name}) && $_->{name} eq $id } @ALL_JOBS;
    if (@j > 0) {
	return wantarray ? @j : $j[0];
    }
    return;
}

sub getOrMock {
    my $id = shift;
    return get($id) ||
	$Forks::Super::Job::Foreign::FOREIGN_JOBS{$id} ||
        Forks::Super::Job::Foreign->new($id);
}

{
    # bare bones partial emulation of Forks::Super::Job to represent
    # processes that were not initiated by Forks::Super. 
    # Used in Forks::Super::kill.

    # XXX - best practice to move this block to its own file
    package Forks::Super::Job::Foreign;          ## yes critic
    our %FOREIGN_JOBS = ();
    sub new {
	my ($class,$id) = @_;
	my $self = { pid => $id, real_pid => $id, state => 'FOREIGN' };
	$FOREIGN_JOBS{$id} = $self;
	return bless $self, $class;
    }
    sub is_deferred { return 0 }
    sub is_complete { return 0 }
    sub signal_pids { my $self = shift; return $self->{pid} }
}

# retrieve a job object for a pid or job name, if necessary
sub _resolve {
    if (!ref($_[0]) || !$_[0]->isa('Forks::Super::Job')) {
	my $job = get($_[0]);
	if (defined $job) {
	    return $_[0] = $job;
	}
	return $job;
    }
    return $_[0];
}

#
# count the number of active processes
#
sub count_active_processes {
    my $optional_pgid = shift;
    if (defined $optional_pgid) {
	return scalar grep {
	    $_->{state} eq 'ACTIVE'
		and $_->{pgid} == $optional_pgid } @ALL_JOBS;
    }
    return scalar grep { defined($_->{state})
			     && $_->{state} eq 'ACTIVE' } @ALL_JOBS;
}

sub count_alive_processes {
    my ($count_bg, $optional_pgid) = @_;
    my @alive = grep { $_->{state} eq 'ACTIVE' ||
			   $_->{state} eq 'COMPLETE' ||
			   $_->{state} eq 'DEFERRED' ||
			   $_->{state} eq 'LAUNCHING' || # rare
			   $_->{state} eq 'SUSPENDED' ||
			   $_->{state} eq 'SUSPENDED-DEFERRED' 
		   } @ALL_JOBS;
    if (!$count_bg) {
	@alive = grep { $_->{_is_bg} == 0 } @alive;
    }
    if (defined $optional_pgid) {
	@alive = grep { $_->{pgid} == $optional_pgid } @alive;
    }
    return scalar @alive;
}

sub count_queued_processes {
    my ($count_bg,$optional_pgid) = @_;
    my @deferred = grep { $_->{state} eq 'DEFERRED' ||
                          $_->{state} eq 'SUSPENDED-DEFERRED' } @ALL_JOBS;
    if (!$count_bg) {
        @deferred = grep { $_->{_is_bg} == 0 } @deferred;
    }
    if (defined $optional_pgid) {
        @deferred = grep { $_->{pgid} == $optional_pgid } @deferred;
    }
    return scalar @deferred;
}

#
# _reap should distinguish:
#
#    all alive jobs (ACTIVE+COMPLETE+SUSPENDED+DEFERRED+SUSPENDED-DEFERRED)
#    all active jobs (ACTIVE + COMPLETE + DEFERRED)
#    filtered alive jobs (by optional pgid)
#    filtered ACTIVE + COMPLETE + DEFERRED jobs
#
#    if  all_active==0  and  all_alive>0,  
#    then see Wait::WAIT_ACTION_ON_SUSPENDED_JOBS
#
sub count_processes {
    my ($count_bg, $optional_pgid) = @_;
    my @alive = grep { 
	$_->{state} ne 'REAPED' && 
	  $_->{state} ne 'NEW' &&
	  !$_->{daemon}
    } @ALL_JOBS;
    if (!$count_bg) {
	@alive = grep { $_->{_is_bg} == 0 } @alive;
    }
    my @active = grep { $_->{state} !~ /SUSPENDED/ } @alive;
    my @filtered_active = @active;
    my @filtered_alive = @alive;
    if (defined $optional_pgid) {
	@filtered_active = grep $_->{pgid} == $optional_pgid, @filtered_active;
	@filtered_alive = grep $_->{pgid} == $optional_pgid, @filtered_alive;
    }

    my @n = (scalar(@filtered_active), scalar(@alive), 
	     scalar(@active), scalar(@filtered_alive));

    if ($Forks::Super::Debug::DEBUG) {
	debug("count_processes(): @n");
        if ($n[0]) {
            debug('count_processes(): Filtered active: ',
                  $filtered_active[0]->toString());
        }
        if ($n[1]) {
            debug('count_processes(): Alive: ', $alive[0]->toShortString());
        }
        if ($n[2]) {
            debug("count_processes(): Active: @active");
        }
    }

    return @n;
}

sub count_active_processes_on_host {
    my $host = shift;
    my $n = scalar grep {
        $_->{remote} &&
            defined($_->{state}) &&
            $_->{state} eq 'ACTIVE' &&
            $_->{remote}{host} &&
            $_->{remote}{host} eq $host
    } @ALL_JOBS;
    return $n;
}

sub init_child {
    Forks::Super::Job::Ipc::init_child();
    return;
}

sub deinit_child {

    # global destruction does not always release any sync objects held
    # by the child. this is especially true on MSWin32.
    my $job = Forks::Super::Job->this;
    if ($job->{_sync}) {
      $job->{_sync}->releaseAll;
    }

    Forks::Super::Job::Ipc::deinit_child();

    return;
}

#
# get the current CPU load. May not be possible
# to do on all operating systems.
#
sub get_cpu_load {
    return Forks::Super::Job::OS::get_cpu_load();
}

sub dispose {
    my @jobs = @_;
    foreach my $job (@jobs) {
        next if $job->{disposed};

	my $pid = $job->{pid};
	my $real_pid = $job->{real_pid} || $pid;

	$job->close_fh('all');
	delete $Forks::Super::CHILD_STDIN{$pid};
	delete $Forks::Super::CHILD_STDIN{$real_pid};
	delete $Forks::Super::CHILD_STDOUT{$pid};



( run in 0.616 second using v1.01-cache-2.11-cpan-39bf76dae61 )