Forks-Super
view release on metacpan or search on metacpan
lib/Forks/Super/Job.pm view on Meta::CPAN
my $id = shift;
my @j = grep { defined($_->{name}) && $_->{name} eq $id } @ALL_JOBS;
if (@j > 0) {
return wantarray ? @j : $j[0];
}
return;
}
sub getOrMock {
my $id = shift;
return get($id) ||
$Forks::Super::Job::Foreign::FOREIGN_JOBS{$id} ||
Forks::Super::Job::Foreign->new($id);
}
{
# bare bones partial emulation of Forks::Super::Job to represent
# processes that were not initiated by Forks::Super.
# Used in Forks::Super::kill.
# XXX - best practice to move this block to its own file
package Forks::Super::Job::Foreign; ## yes critic
our %FOREIGN_JOBS = ();
sub new {
my ($class,$id) = @_;
my $self = { pid => $id, real_pid => $id, state => 'FOREIGN' };
$FOREIGN_JOBS{$id} = $self;
return bless $self, $class;
}
sub is_deferred { return 0 }
sub is_complete { return 0 }
sub signal_pids { my $self = shift; return $self->{pid} }
}
# retrieve a job object for a pid or job name, if necessary
sub _resolve {
if (!ref($_[0]) || !$_[0]->isa('Forks::Super::Job')) {
my $job = get($_[0]);
if (defined $job) {
return $_[0] = $job;
}
return $job;
}
return $_[0];
}
#
# count the number of active processes
#
sub count_active_processes {
my $optional_pgid = shift;
if (defined $optional_pgid) {
return scalar grep {
$_->{state} eq 'ACTIVE'
and $_->{pgid} == $optional_pgid } @ALL_JOBS;
}
return scalar grep { defined($_->{state})
&& $_->{state} eq 'ACTIVE' } @ALL_JOBS;
}
sub count_alive_processes {
my ($count_bg, $optional_pgid) = @_;
my @alive = grep { $_->{state} eq 'ACTIVE' ||
$_->{state} eq 'COMPLETE' ||
$_->{state} eq 'DEFERRED' ||
$_->{state} eq 'LAUNCHING' || # rare
$_->{state} eq 'SUSPENDED' ||
$_->{state} eq 'SUSPENDED-DEFERRED'
} @ALL_JOBS;
if (!$count_bg) {
@alive = grep { $_->{_is_bg} == 0 } @alive;
}
if (defined $optional_pgid) {
@alive = grep { $_->{pgid} == $optional_pgid } @alive;
}
return scalar @alive;
}
sub count_queued_processes {
my ($count_bg,$optional_pgid) = @_;
my @deferred = grep { $_->{state} eq 'DEFERRED' ||
$_->{state} eq 'SUSPENDED-DEFERRED' } @ALL_JOBS;
if (!$count_bg) {
@deferred = grep { $_->{_is_bg} == 0 } @deferred;
}
if (defined $optional_pgid) {
@deferred = grep { $_->{pgid} == $optional_pgid } @deferred;
}
return scalar @deferred;
}
#
# _reap should distinguish:
#
# all alive jobs (ACTIVE+COMPLETE+SUSPENDED+DEFERRED+SUSPENDED-DEFERRED)
# all active jobs (ACTIVE + COMPLETE + DEFERRED)
# filtered alive jobs (by optional pgid)
# filtered ACTIVE + COMPLETE + DEFERRED jobs
#
# if all_active==0 and all_alive>0,
# then see Wait::WAIT_ACTION_ON_SUSPENDED_JOBS
#
sub count_processes {
my ($count_bg, $optional_pgid) = @_;
my @alive = grep {
$_->{state} ne 'REAPED' &&
$_->{state} ne 'NEW' &&
!$_->{daemon}
} @ALL_JOBS;
if (!$count_bg) {
@alive = grep { $_->{_is_bg} == 0 } @alive;
}
my @active = grep { $_->{state} !~ /SUSPENDED/ } @alive;
my @filtered_active = @active;
my @filtered_alive = @alive;
if (defined $optional_pgid) {
@filtered_active = grep $_->{pgid} == $optional_pgid, @filtered_active;
@filtered_alive = grep $_->{pgid} == $optional_pgid, @filtered_alive;
}
my @n = (scalar(@filtered_active), scalar(@alive),
scalar(@active), scalar(@filtered_alive));
if ($Forks::Super::Debug::DEBUG) {
debug("count_processes(): @n");
if ($n[0]) {
debug('count_processes(): Filtered active: ',
$filtered_active[0]->toString());
}
if ($n[1]) {
debug('count_processes(): Alive: ', $alive[0]->toShortString());
}
if ($n[2]) {
debug("count_processes(): Active: @active");
}
}
return @n;
}
sub count_active_processes_on_host {
my $host = shift;
my $n = scalar grep {
$_->{remote} &&
defined($_->{state}) &&
$_->{state} eq 'ACTIVE' &&
$_->{remote}{host} &&
$_->{remote}{host} eq $host
} @ALL_JOBS;
return $n;
}
sub init_child {
Forks::Super::Job::Ipc::init_child();
return;
}
sub deinit_child {
# global destruction does not always release any sync objects held
# by the child. this is especially true on MSWin32.
my $job = Forks::Super::Job->this;
if ($job->{_sync}) {
$job->{_sync}->releaseAll;
}
Forks::Super::Job::Ipc::deinit_child();
return;
}
#
# get the current CPU load. May not be possible
# to do on all operating systems.
#
sub get_cpu_load {
return Forks::Super::Job::OS::get_cpu_load();
}
sub dispose {
my @jobs = @_;
foreach my $job (@jobs) {
next if $job->{disposed};
my $pid = $job->{pid};
my $real_pid = $job->{real_pid} || $pid;
$job->close_fh('all');
delete $Forks::Super::CHILD_STDIN{$pid};
delete $Forks::Super::CHILD_STDIN{$real_pid};
delete $Forks::Super::CHILD_STDOUT{$pid};
( run in 0.616 second using v1.01-cache-2.11-cpan-39bf76dae61 )