Forks-Super
view release on metacpan or search on metacpan
lib/Forks/Super/Job.pm view on Meta::CPAN
push @output, "$attr=[" . join(',', @{$job->{$attr}}) . ']';
} else {
push @output, "$attr=" . $job->{$attr};
}
}
return $job->{short_string} = '{' . join(';',@output) . '}';
}
# Record that the job's process has finished: stamp the end time, move
# the job to the COMPLETE state, then fire the 'share', 'collect' and
# 'finish' callbacks, in that order. Returns nothing.
sub _mark_complete {
    my ($job) = @_;
    $job->{end}   = Time::HiRes::time();
    $job->{state} = 'COMPLETE';
    for my $callback_name (qw(share collect finish)) {
        $job->run_callback($callback_name);
    }
    return;
}
# Record that the job has been reaped: move it to the REAPED state, stamp
# the reap time, fire the 'reaped' callback, and publish the job's exit
# status through $?. When no status was recorded, $? receives
# `$job->{daemon} && 0` (0 for a daemon job, otherwise the daemon flag's
# own false value).
sub _mark_reaped {
    my ($job) = @_;
    @{$job}{qw(state reaped)} = ('REAPED', Time::HiRes::time());
    $job->run_callback('reaped');
    my $exit_status = defined $job->{status}
        ? $job->{status}
        : ($job->{daemon} && 0);
    $? = $exit_status;
    if ($job->{debug}) {
        debug("Job $job->{pid} reaped");
    }
    return;
}
#
# Determine whether a job is eligible to start.
#
# If the job carries a user-supplied can_launch hook (a CODE reference or
# the name of a subroutine), that hook decides; otherwise the default
# _can_launch check is used. queue_message is set before the check runs,
# so it describes the failure if the check returns false.
# Returns the check's result (true if the job may launch now). A
# can_launch value of any other reference type cannot be called; the job
# is refused with an explanatory queue_message and 0 is returned.
#
sub can_launch {
    no strict 'refs';    # a plain string hook is called by symbolic name
    my $job = shift;
    $job->{last_check} = Time::HiRes::time();
    my $hook = $job->{can_launch};
    if (!defined $hook) {
        $job->{queue_message} = 'default can_launch function failed';
        return $job->_can_launch;
    }
    my $hook_type = ref $hook;
    if ($hook_type eq 'CODE' || $hook_type eq '') {
        $job->{queue_message} = 'user can_launch function failed';
        return $hook->($job);
    }
    # Refuse explicitly instead of falling off the end of the sub, which
    # would return an unspecified value and leave no explanation.
    $job->{queue_message} =
        "can_launch value is an unsupported $hook_type reference";
    return 0;
}
# Check the job's start_after time. Returns 1 when no start time was
# requested or the time has been reached. Otherwise the job is held back:
# its busy action defaults to QUEUE (unless the caller chose an on_busy
# behavior), a queue_message naming the start time is recorded, and 0 is
# returned.
sub _can_launch_delayed_start_check {
    my $job = shift;
    my $start_after = $job->{start_after};
    return 1 unless defined $start_after;
    return 1 if Time::HiRes::time() >= $start_after;

    debug('_can_launch(): start delay requested. launch fail')
        if $job->{debug};
    # A delayed job should wait in the queue: delay + fail is pointless,
    # and delay + block amounts to sleep + fork.
    $job->{_on_busy} = 'QUEUE' unless defined $job->{on_busy};
    $job->{queue_message} = "haven't yet reached job start_after time "
        . localtime($start_after);
    return 0;
}
# Check the job's dependencies. Every job listed in depend_on must be
# complete, and every job listed in depend_start must have started.
# Ids not found in %ALL_JOBS are reported with carp and ignored.
# Returns 1 when all dependencies are satisfied; otherwise records the
# blocking job in $job->{queue_message} and returns 0.
sub _can_launch_dependency_check {
    my $job = shift;
    my @dep_on    = defined($job->{depend_on})    ? @{$job->{depend_on}}    : ();
    my @dep_start = defined($job->{depend_start}) ? @{$job->{depend_start}} : ();
    foreach my $dj (@dep_on) {
        my $j = $ALL_JOBS{$dj};
        if (not defined $j) {
            carp 'Forks::Super::Job: ',
                "dependency $dj for job $job->{pid} is invalid. Ignoring.\n";
            next;
        }
        if (!$j->is_complete) {
            # Gate logging on the debug flag of the job being launched,
            # as every other _can_launch check does.
            debug('_can_launch(): ',
                  "job waiting for job $j->{pid} to finish. launch fail.")
                if $job->{debug};
            $job->{queue_message} = "depends on job $j->{pid} to finish";
            return 0;
        }
    }
    foreach my $dj (@dep_start) {
        my $j = $ALL_JOBS{$dj};
        if (not defined $j) {
            carp 'Forks::Super::Job: ',
                "start dependency $dj for job $job->{pid} is invalid. ",
                "Ignoring.\n";
            next;
        }
        if (!$j->is_started) {
            debug('_can_launch(): ',
                  "job waiting for job $j->{pid} to start. launch fail.")
                if $job->{debug};
            $job->{queue_message} = "depends on job $j->{pid} to start";
            return 0;
        }
    }
    return 1;
}
# Pick a remote host for the job. Before launch, $job->{remote} holds an
# array ref of candidate specs (hashes with a 'host' key). They are tried
# in random order (Fisher-Yates shuffle), and the first host that passes
# _can_launch_remote_check_host replaces the list as $job->{remote}.
# Returns 1 when a host was chosen or the job is not remote, and 0 when
# every candidate host is too busy.
sub _can_launch_remote {
    my $job = shift;
    my $candidates = $job->{remote};
    return 1 unless defined $candidates;

    my @order = @{$candidates};
    my $i = $#order;
    while ($i >= 1) {
        my $pick = int(rand($i + 1));
        @order[$i, $pick] = @order[$pick, $i];
        $i--;
    }

    for my $spec (grep { $_->{host} } @order) {
        next unless $job->_can_launch_remote_check_host($spec->{host});
        $job->{remote} = $spec;
        return 1;
    }

    if ($job->{debug}) {
        my @hosts = map { $_->{host} } @order;
        debug("_can_launch_remote: host(s) @hosts are too busy");
    }
    return 0;
}
# Decide whether host $h can take another job. A max_proc limit below 1
# (see max_proc) means no restriction. Otherwise the host is usable only
# while its active process count (count_active_processes_on_host) is
# below the limit. Returns 1 if the job may launch on $h, 0 otherwise.
sub _can_launch_remote_check_host {
    my ($job, $h) = @_;
    my $max_proc = $job->max_proc($h);
    if ($max_proc < 1) {
        debug("_can_launch_remote: no restriction on host $h")
            if $job->{debug};
        return 1;
    }
    my $num_active = count_active_processes_on_host($h);
    if ($num_active >= $max_proc) {
        debug('_can_launch_remote(): ',
              "host $h too busy. ($num_active >= $max_proc)") if $job->{debug};
        # Return an explicit false value. Falling off the end here would
        # return whatever the debug statement above evaluated to, which
        # may be true when debugging is enabled.
        return 0;
    }
    debug('_can_launch_remote(): ',
          "host $h not busy. ($num_active < $max_proc) ",
          " launch ok") if $job->{debug};
    return 1;
}
# Return the process limit that applies to a job or host.
#
# When the first argument is a Forks::Super::Job with its own max_proc
# (or max_fork) setting, that setting is used. Otherwise the global
# $Forks::Super::MAX_PROC applies. If a host name is passed (either as
# the only argument or after a job object), it is overridden by
# $Forks::Super::MAX_PROC{$host}, or failing that by
# $Forks::Super::MAX_PROC{DEFAULT}. A CODE reference limit is called to
# obtain the value. A false result is returned as 0 (RT124316b).
sub max_proc {
    my ($first, $second) = @_;
    my $limit;
    my $job_has_limit = ref($first) eq 'Forks::Super::Job'
        && (defined($first->{max_proc}) || defined($first->{max_fork}));
    if ($job_has_limit) {
        $limit = $first->{max_proc} || $first->{max_fork};
    } else {
        my $host = (ref($first) && ref($first) eq 'Forks::Super::Job')
            ? $second : $first;
        my $per_host = \%Forks::Super::MAX_PROC;
        $limit = !defined($host)                ? $Forks::Super::MAX_PROC
               : defined($per_host->{$host})    ? $per_host->{$host}
               : defined($per_host->{DEFAULT})  ? $per_host->{DEFAULT}
               :                                  $Forks::Super::MAX_PROC;
    }
    $limit = $limit->() if ref($limit) eq 'CODE';
    return $limit || 0;    # RT124316b
}
# Used in the test suite but not in this distribution itself.
# Returns the job's own process limit, first copying max_fork into
# max_proc when max_proc is false. Falls back to the global max_proc()
# when the job still has no defined limit.
sub _max_proc {
    my $job = shift;
    $job->{max_proc} = $job->{max_fork} unless $job->{max_proc};
    return $job->{max_proc} if defined $job->{max_proc};
    return max_proc();
}
# Used in the test suite but not in this distribution itself.
# Returns the job's max_load if defined, else $Forks::Super::MAX_LOAD.
sub _max_load {
    my ($job) = @_;
    my $limit = $job->{max_load};
    $limit = $Forks::Super::MAX_LOAD unless defined $limit;
    return $limit;
}
#
# Default test for whether the system is too busy to start this job.
# Checks, in order: the force flag, the start_after delay, job
# dependencies, remote host availability (remote jobs only), the
# max_proc limit on active processes, and the max_load CPU limit.
# On a busy/limit failure, queue_message describes the reason.
# Returns 1 if the job may launch now, 0 otherwise.
#
sub _can_launch {
    no warnings qw(once);
    my $job = shift;

    if ($job->{force}) {
        debug('_can_launch(): force attr set. launch ok')
            if $job->{debug};
        return 1;
    }
    $job->_can_launch_delayed_start_check() or return 0;
    $job->_can_launch_dependency_check()    or return 0;

    if ($job->{remote}) {
        return 0 unless $job->_can_launch_remote();
        debug('_can_launch_remote(): system not busy. launch ok.')
            if $job->{debug};
        return 1;
    }

    my $proc_limit = $job->max_proc();
    my $load_limit = defined($job->{max_load})
        ? $job->{max_load}
        : $Forks::Super::MAX_LOAD;

    if ($proc_limit > 0) {
        my $active = count_active_processes();
        if ($active >= $proc_limit) {
            debug('_can_launch(): ',
                  "active jobs $active exceeds limit $proc_limit. ",
                  'launch fail.') if $job->{debug};
            $job->{queue_message} =
                "active jobs $active exceeds limit $proc_limit";
            return 0;
        }
    }

    if ($load_limit > 0) {
        my $load = get_cpu_load();
        if ($load > $load_limit) {
            debug('_can_launch(): ',
                  "cpu load $load exceeds limit $load_limit. launch fail.")
                if $job->{debug};
            $job->{queue_message} =
                "cpu load $load exceeds limit $load_limit";
            return 0;
        }
    }

    debug('_can_launch(): system not busy. launch ok.')
        if $job->{debug};
    return 1;
}
# Thin wrapper around Perl's fork(), kept separate so tests can override
# or mock it (see t/17-retries.t). Returns whatever CORE::fork returns:
# the child pid in the parent, 0 in the child, undef on failure.
sub _CORE_fork {
    my $pid = CORE::fork();
    return $pid;
}
#
# make a system fork call and configure the job object
# in the parent and the child processes
#
sub launch {
my $job = shift;
if ($job->is_started) {
Carp::confess 'Forks::Super::Job::launch() ',
"called on a job in state $job->{state}!\n";
}
if ($$ != $Forks::Super::MAIN_PID && $Forks::Super::CHILD_FORK_OK > 0) {
$Forks::Super::MAIN_PID = $$;
$Forks::Super::CHILD_FORK_OK--;
}
if ($$ != $Forks::Super::MAIN_PID && $Forks::Super::CHILD_FORK_OK <= 0) {
return _launch_from_child($job);
}
$job->_preconfig_fh;
$job->_preconfig2;
$job->{cwd} = &Cwd::getcwd;
$_->() for @PREFORK;
if ($job->{emulate} ||
($Forks::Super::EMULATION_MODE && !defined $job->{emulate})) {
debug("emulating child process in main process") if $job->{debug};
return $job->_emulate;
}
my $pid = _robust_fork($job);
if (!defined $pid) {
debug('launch(): CORE::fork() returned undefined!')
if $job->{debug};
return;
}
if ($job->{_sync}) {
$job->{_sync}->releaseAfterFork($pid || $$);
}
if (Forks::Super::Util::isValidPid($pid)) {
$_->() for @POSTFORK_PARENT;
# parent
_postlaunch_parent1($pid, $job);
return $job->_postlaunch_parent2;
lib/Forks/Super/Job.pm view on Meta::CPAN
local $! = 0;
my $kill_result = 0;
if (&IS_CYGWIN) {
require Forks::Super::Job::OS::Cygwin;
for my $pid ($j->signal_pids) {
$kill_result += !!Forks::Super::Job::OS::Cygwin::resume($pid);
}
} else {
$kill_result = Forks::Super::kill('CONT', $j);
}
if ($kill_result > 0) {
$j->{state} = 'ACTIVE';
return 1;
}
carp "'CONT' signal not received by job ", $j->toString(), "\n";
return;
}
# Resume a suspended daemon job. On Cygwin, each of the job's signal pids
# is resumed through Forks::Super::Job::OS::Cygwin::resume, and the
# successes are counted. Elsewhere, Forks::Super::kill('CONT', $j) is
# used. When the result is true, the job goes back to the DAEMON state.
# Returns the success count (Cygwin) or the kill result.
sub _resume_daemon {
    my $j = shift;
    my $resumed;
    if (!&IS_CYGWIN) {
        $resumed = Forks::Super::kill('CONT', $j);
    } else {
        require Forks::Super::Job::OS::Cygwin;
        $resumed = 0;
        for my $pid ($j->signal_pids) {
            $resumed += !!Forks::Super::Job::OS::Cygwin::resume($pid);
        }
    }
    $j->{state} = 'DAEMON' if $resumed;
    return $resumed;
}
# Forcibly stop a job. $j may be a Forks::Super::Job object or an
# identifier that is looked up with Forks::Super::Job::get (presumably a
# pid or job name; TODO confirm against get()). On Cygwin, each signal
# pid is terminated through Forks::Super::Job::OS::Cygwin; elsewhere,
# SIGKILL is sent with Forks::Super::kill. If no job can be found, a
# warning is issued and nothing is signalled. Returns nothing.
sub terminate {
    my $j = shift;
    if (!ref($j) || !$j->isa('Forks::Super::Job')) {
        my $id = $j;
        $j = Forks::Super::Job::get($id);
        if (!defined $j) {
            # Don't call methods on undef or send a signal to "no job".
            carp 'Forks::Super::Job::terminate: no job found for ',
                (defined $id ? $id : 'undef'), "\n";
            return;
        }
    }
    if (&IS_CYGWIN) {
        require Forks::Super::Job::OS::Cygwin;
        for my $pid ($j->signal_pids) {
            Forks::Super::Job::OS::Cygwin::terminate($pid);
        }
    } else {
        Forks::Super::kill('KILL', $j);
    }
    return;
}
#
# Finish initializing a Forks::Super::Job object, mainly by deriving
# fields from the options it was created with. Each step is a
# _preconfig_* method, run in a fixed order (_preconfig_busy_action must
# precede _preconfig_dependencies, which may override _on_busy). These
# are followed by the callback and OS-specific preconfiguration.
# Returns nothing.
#
sub _preconfig {
    my $job = shift;
    my @steps = qw(
        _preconfig_style
        _preconfig_dir
        _preconfig_busy_action
        _preconfig_start_time
        _preconfig_dependencies
        _preconfig_share
        _preconfig_remote
    );
    foreach my $step (@steps) {
        $job->$step();
    }
    Forks::Super::Job::Callback::_preconfig_callbacks($job);
    Forks::Super::Job::OS::_preconfig_os($job);
    return;
}
# some final initialization just before launch
sub _preconfig2 {
my $job = shift;
if (!defined $job->{debug}) {
$job->{debug} = $Forks::Super::Debug::DEBUG;
}
if ($job->{daemon}) {
# we avoid pipes on Windows because they have tiny capacity
# and prone to deadlock. Those concerns are not so big for
# communicating the daemon pid ... do we want to
# try using pipes for this on Windows?
if (!&IS_WIN32) {
my ($p1,$p2);
pipe $p1, $p2;
$p2->autoflush(1);
$job->{daemon_ipc_pipe} = [ $p1, $p2 ];
if ($job->{debug}) {
debug('Job will use pipe to get daemon pid');
}
} else {
$job->{daemon_ipc} =
Forks::Super::Job::Ipc::_choose_fh_filename(
'.daemon', purpose => 'daemon ipc');
if ($job->{debug}) {
debug('Job will use ', $job->{daemon_ipc},
' to get daemon pid.');
}
}
}
if ($job->{style} eq 'cmd'
|| (&IS_WIN32 && $job->{style} eq 'exec')
|| (&IS_WIN32 && ($job->{timeout} || $job->{expiration}))) {
if ($Forks::Super::SIGNAL_IPC_FILE
# we avoid pipes on Windows because they have tiny capacity
# and prone to deadlock, but those are not large concerns
# for a low volume, single purpose communication channel
# like communicating the signal pid
# || &IS_WIN32
|| $job->{daemon}) {
$job->{signal_ipc} =
Forks::Super::Job::Ipc::_choose_fh_filename(
'.signal', purpose => 'signal ipc', job => $job);
if ($job->{debug}) {
debug('Job will use ', $job->{signal_ipc},
' to get signal pid.');
}
} else {
lib/Forks/Super/Job.pm view on Meta::CPAN
$job->{cmd} = [ $job->{cmd} ];
} else {
$job->{_indirect} = 1;
}
$job->{style} = 'cmd';
} elsif (defined $job->{exec}) {
if (ref $job->{exec} ne 'ARRAY') {
$job->{exec} = [ $job->{exec} ];
} else {
$job->{_indirect} = 1;
}
$job->{style} = 'exec';
} elsif (defined $job->{sub}) {
$job->{style} = 'sub';
$job->{sub} = qualify_sub_name $job->{sub};
if (defined $job->{args}) {
if (ref $job->{args} ne 'ARRAY') {
$job->{args} = [ $job->{args} ];
}
} else {
$job->{args} = [];
}
} else {
$job->{style} = 'natural';
}
return;
}
# Placeholder for a future 'run' job style; it is not called from the
# code visible here. For now it only ensures $job->{run} is an array
# reference (a missing value becomes [undef]).
#
# Open design question: how to use or emulate the rich functionality of
# IPC::Run? Inputs would be a "harness specification". Build a harness,
# call $harness->start on launch, and call $harness->finish when the job
# is reaped. IPC::Run harnesses may also be reused.
sub _preconfig_style_run {    ### for future use
    my ($job) = @_;
    my $run = $job->{run};
    $job->{run} = [ $run ] unless ref($run) eq 'ARRAY';
    return;
}
# Normalize the job's working directory. 'chdir' is accepted as an alias
# for 'dir' (a true 'dir' value takes precedence), and the result is made
# absolute with Forks::Super::Util::abs_path. Returns nothing.
sub _preconfig_dir {
    my ($job) = @_;
    $job->{dir} ||= $job->{chdir} if defined $job->{chdir};
    return unless defined $job->{dir};
    $job->{dir} = Forks::Super::Util::abs_path($job->{dir});
    return;
}
# Decide what to do if the job cannot launch right away. The job's own
# on_busy option wins; otherwise $Forks::Super::ON_BUSY is used, then
# 'block'. The choice is upper-cased into $job->{_on_busy}
# (_preconfig_dependencies may later force it to QUEUE). Also fills in a
# default queue_priority from Forks::Super::Deferred when none was given.
# Returns nothing.
sub _preconfig_busy_action {
    my $job = shift;
    my $action = $job->{on_busy};
    if (!defined $action) {
        no warnings 'once';
        $action = $Forks::Super::ON_BUSY || 'block';
    }
    $job->{_on_busy} = uc $action;

    $job->{queue_priority} = Forks::Super::Deferred::get_default_priority()
        unless defined $job->{queue_priority};
    return;
}
# Combine the delay and start_after options into one absolute start time.
# 'delay' is resolved relative to now, and 'start_after' as given (both
# through Forks::Super::Job::Timeout::_time_from_natural_language). The
# later of the two is stored in $job->{start_after`}, and 'delay' is
# removed. If neither yields a nonzero time, the job is left unchanged.
# Returns nothing.
sub _preconfig_start_time {
    my $job = shift;
    my $begin = 0;
    if (defined $job->{delay}) {
        my $now = Time::HiRes::time();
        $begin = $now + Forks::Super::Job::Timeout::_time_from_natural_language(
            $job->{delay}, 1);
    }
    if (defined $job->{start_after}) {
        my $absolute =
            Forks::Super::Job::Timeout::_time_from_natural_language(
                $job->{start_after}, 0);
        $begin = $absolute if $absolute > $begin;
    }
    return unless $begin;

    $job->{start_after} = $begin;
    delete $job->{delay};
    debug('_can_launch(): start delay until '
          . localtime($job->{start_after}) . ' requested.')
        if $job->{debug};
    return;
}
# Normalize the depend_on and depend_start options (in that order): each
# is wrapped in an array ref if needed and passed through _resolve_names
# (which, per the original comment, expands job names to pids). Either
# dependency switches the busy action to QUEUE unless the caller gave an
# explicit on_busy. Returns nothing.
sub _preconfig_dependencies {
    my $job = shift;
    for my $key (qw(depend_on depend_start)) {
        next unless defined $job->{$key};
        $job->{$key} = [ $job->{$key} ] if ref($job->{$key}) ne 'ARRAY';
        $job->{$key} = _resolve_names($job, $job->{$key});
        $job->{_on_busy} = 'QUEUE' unless $job->{on_busy};
    }
    return;
}
sub _preconfig_remote {
my $job = shift;
return if !defined $job->{remote};
my $remote = $job->{remote};
if (!$job->{cmd}) {
if ($job->{exec}) {
$job->{cmd} = $job->{exec};
} else {
carp "fork: remote => ... specified without cmd => ... ! ",
"remote spec will be ignored";
$job->{remote_disabled} = delete $job->{remote};
return;
}
}
if ('ARRAY' ne ref $job->{remote}) {
# prior to launch, ensure that $job->{remote} is an array reference
# of allowable remote specs.
#
# after launch, $job->{remote} will point to the actual remote spec
# that was used to launch the job (see _can_launch_remote)
$job->{remote} = [ $job->{remote} ];
}
$job->{remote} = [
map {
if (!ref($_)) {
{ host => $_ };
} elsif (!defined($_->{host})) {
()
} elsif (defined($_->{proto}) && $_->{proto} ne 'ssh') {
carp "Forks::Super: Only 'ssh' protocol supported in remote";
()
} elsif (ref($_->{host}) eq 'ARRAY') {
# not documented, but remote => { host=>[host1,host2],user=>... }
# is supported
my $spec = $_;
map {
my $spec2 = { %$spec };
$spec2->{host} = $_;
$spec2;
} @{$_->{host}};
} else {
$_
}
} @{$job->{remote}}
];
if (@{$job->{remote}} == 0) {
$job->{__error} = "No valid remote specs in remote param $remote";
return;
}
foreach my $spec (@{$job->{remote}}) {
my $host = $spec->{host};
my ($up,$hp) = split /(?<!\\)@/, $host, 2;
if ($up && $hp) {
my ($u,$p) = split /(?<!\\):/, $up, 2;
if ($u && $p) {
lib/Forks/Super/Job.pm view on Meta::CPAN
For jobs that have started in a child process and are,
to the knowledge of the parent process, still running.
=item C<COMPLETE>
For jobs that have completed and caused the parent process to
receive a C<SIGCHLD> signal, but have not been reaped.
The difference between a C<COMPLETE> job and a C<REAPED> job
is whether the job's process identifier has been returned in
a call to C<Forks::Super::wait> or C<Forks::Super::waitpid>
(or implicitly returned in a call to C<Forks::Super::waitall>).
When the process gets reaped, the global variable C<$?>
(see L<perlvar/"$CHILD_ERROR">) will contain the exit status
of the process, until the next time a process is reaped.
=item C<REAPED>
For jobs that have been reaped by a call to C<Forks::Super::wait>,
C<Forks::Super::waitpid>, or C<Forks::Super::waitall>.
=item C<SUSPENDED>
The job has started but it has been suspended (with a C<SIGSTOP>
or other appropriate mechanism for your operating system) and
is not currently running. A suspended job will not consume CPU
resources but may tie up memory resources.
=item C<SUSPENDED-DEFERRED>
Job is in the job queue and has not started yet, and also
the job has been suspended. A job in the C<SUSPENDED-DEFERRED>
state can only move out of this state to the C<SUSPENDED> state
(with a C<SIGCONT> or a L<"resume"|resume> call).
=back
=item status
The exit status of a job. See L<perlvar/"$CHILD_ERROR">.
Will be undefined until the job is complete.
=item style
One of the strings C<natural>, C<cmd>, C<exec>, or C<sub>, indicating
whether the initial C<fork> call returned from the child process or whether
the child process was going to run an external command (C<cmd> or C<exec>)
or invoke a Perl subroutine and then exit.
=item cmd
The shell command to run that was supplied in the C<fork> call.
=item sub
=item args
The name of or reference to CODE to run and the subroutine
arguments that were supplied in the C<fork> call.
=item _on_busy
The behavior of this job in the event that the system was
too "busy" to enable the job to launch. Will have one of
the string values C<BLOCK>, C<FAIL>, or C<QUEUE> (the C<on_busy>
setting is upper-cased when the job is configured).
=item queue_priority
If this job was deferred, the relative priority of this
job.
=item can_launch
By default undefined, but could be a CODE reference
supplied in the C<fork()> call. If defined, it is the
code that runs when a job is ready to start to determine
whether the system is too busy or not.
=item depend_on
If defined, contains a list of process IDs and job IDs that
must B<complete> before this job will be allowed to start.
=item depend_start
If defined, contains a list of process IDs and job IDs that
must B<start> before this job will be allowed to start.
=item start_after
Indicates the earliest time (since the epoch) at
which this job may start.
=item expiration
Indicates the latest time that this job may be allowed to
run. Jobs that run past their expiration parameter will
be killed.
=item os_priority
Value supplied to the C<fork> call about desired
operating system priority for the job.
=item cpu_affinity
Value supplied to the C<fork> call about desired
CPUs for this process to prefer.
=item child_stdin
=item child_stdout
=item child_stderr
If the job has been configured for interprocess communication,
these attributes correspond to the handles for passing
standard input to the child process, and reading standard
output and standard error from the child process, respectively.
Note that the standard read/write operations on these filehandles
can also be accomplished through the C<write_stdin>, C<read_stdout>,
and C<read_stderr> methods of this class. Since these methods
can adjust their behavior based on the type of IPC channel
(file, socket, or pipe) or other idiosyncrasies of your operating
system (#@$%^&*! Windows), B<using these methods is preferred
to using the filehandles directly>.
The package level variables
C<< L<$Forks::Super::CHILD_STDIN{$job}, $Forks::Super::CHILD_STDOUT{$job},
$Forks::Super::CHILD_STDERR{$job}|Forks::Super/"%25CHILD_STDxxx"> >>
are equivalent to these instance variables.
=back
=cut
( run in 0.699 second using v1.01-cache-2.11-cpan-39bf76dae61 )