view release on metacpan or search on metacpan
Lose support of JSON as a serialization protocol for 'bg_eval'
RT#78285 - support $ON_TOO_MANY_OPEN_FILEHANDLES module variable
More compensation for Cygwin's buggy flock
New Forks::Super::Job::count_queued_processes function
0.73 2014-06-18
support corrective measures for too many open file handles
0.72 2013-12-09
Support fork \&code and fork \@cmd syntax.
Use 'queue' as default on_busy behavior for jobs with dependencies
0.71 2013-10-10
test fixes that should help single CPU systems pass timing tests
0.70 2013-09-18
Experimental emulation of process groups in MSWin32 in
waitpid/kill functions.
Improved procedures to terminate a process tree in MSWin32.
0.69 2013-08-19
README
README.windows
spike-pma.pl
system-limits.PL
SysInfo.pm.PL
t/00-use.t
t/01a-import.t
t/01b-import.t
t/01c-import.t
t/02a-tie-enum.t
t/02b-tie-on_busy.t
t/03a-queue-ties.t
t/03b-queue-func.t
t/04-ipc-dir.t
t/05-util.t
t/06-job.t
t/07-sync.t
t/09-show-config.t
t/10-to-natural.t
t/11-to-command.t
t/12-to-sub.t
t/44a-pipes.t
t/44b-pipes.t
t/44c-pipes.t
t/44d-pipes.t
t/44f-pipes.t
t/44g-pipes.t
t/44h-pipes.t
t/44j-pipes.t
t/44l-pipes.t
t/45-no-filehandles.t
t/46a-busy.t
t/46b-userbusy.t
t/46c-busy.t
t/46d-userbusy.t
t/47-share.t
t/48a-suspend.t
t/48b-suspend-callback.t
t/49a-remote.t
t/49b-remote-timeout.t
t/49c-remote-iohandles.t
t/49d-remote-lazy.t
t/49e-remote-fails.t
t/49g-remote.t
t/50-queue.t
can specify a deadline (in relative or absolute
time) for a background process to complete,
with the background process being killed if it does
not complete by its deadline.
* throttling
Limiting the number of simultaneous processes, or
block new processes from starting when the system's
CPU load is too high. Callers can install their own
functions to determine when the system is too busy
to launch another job. More advanced features allow
you to suspend and resume background tasks according
to your own criteria.
* dependencies
Jobs can be specified to wait until other jobs
have started and/or completed before they can begin.
* deferred jobs
The Sys::CpuAffinity module is for manipulating process
CPU affinities. The Forks::Super module can make use of
this module to control the CPU affinities of background
processes. This is a released module that can also be
retrieved and installed from CPAN.
The Sys::CpuLoadX module is for determining the current
CPU load of your system. The Forks::Super module can make
use of this module to decide whether the system is too
busy to launch additional background tasks. As of Forks::Super
version 0.30, this is an unreleased module and it is only
available bundled with Forks::Super.
Installation of these additional modules is optional. If the
modules are not available, then certain features of the
Forks::Super module may not work.
WINDOWS
___ with queueing to perform other tasks
___ example: web crawler
_x_ example: multi-threaded du
_x_ timeout long running jobs
_x_ manipulate CPU affinities
___ dependencies
___ interactive client/server example of IPC
___ run a server using Forks::Super
_X_ see t/forked_harness.pl
_X_ load management
_X_ block while system is busy
_X_ suspend/resume
_X_ suspend/resume callback
___ bg_eval, bg_qx examples
_X_ factorial.pl for bg_eval
___ can_launch examples
___ how to: use sleep with Forks::Super
___ how to: use alarm with Forks::Super
___ changing IPC_DIR
___ tuning Forks::Super for fast jobs, slow jobs,
memory intensive jobs, cpu intensive jobs,
sub/natural-style)
___ encryption and other layers on IO channels
___ parent_dump enhancements:
_X_ get and display stack trace of natural/sub-style children
___ measure input and output for IPCxxxHandle classes
___ CPAN testers find lots of timing errors in openbsd. Is pause(n)
on openbsd prone to returning significantly more or less than n
seconds later? Would a busy wait just for openbsd make things
better or worse?
_o_ async method like threads, forks, Coro?
bg_eval is like the async method
___ $job->{child_stdin}, {child_stdout}, {child_xxx} should be
restructured as $job->{child_fh}{stdin}, {stdout}, etc.
i.e., a dedicated member that holds a collection of iohandles
___ what happens if you mix Forks::Super and forks?
bundle/Sys-CpuLoadX/bundle.pl view on Meta::CPAN
}
my $version = MM->parse_version('lib/Sys/CpuLoadX.pm');
my $TargetModulePitch = qq[
The Sys::CpuLoadX module provides a (cross-fingers) portable way
to access your system's current CPU load. The Forks::Super module
can use this information to decide whether your system is too
busy to launch more background processes. Without Sys::CpuLoadX,
Forks::Super will not make use of CPU load information.
Installation of this module is entirely optional. The Module::Build
module is required to install this module. The installation of
Forks::Super will proceed even if the installation of Sys::CpuLoadX
is unsuccessful.
];
my $TargetModulePrompt
= "Do you want to attempt to install Sys::CpuLoadX v$version?";
examples/Benchmark.pm view on Meta::CPAN
} else {
# Wait for the user timer to tick. This makes the error range more like
# -0.01, +0. If we don't wait, then it's more like -0.01, +0.01. This
# may not seem important, but it significantly reduces the chances of
# getting a too low initial $n in the initial, 'find the minimum' loop
# in &countit. This, in turn, can reduce the number of calls to
# &runloop a lot, and thus reduce additive errors.
#
# Note that its possible for the act of reading the system clock to
# burn lots of system CPU while we burn very little user clock in the
# busy loop, which can cause the loop to run for a very long wall time.
# So gradually ramp up the duration of the loop. See RT #122003
#
my $tbase = Benchmark->new(0)->[1];
my $limit = 1;
while ( ( $t0 = Benchmark->new(0) )->[1] == $tbase ) {
for (my $i=0; $i < $limit; $i++) { my $x = $i / 1.5 } # burn user CPU
$limit *= 1.1;
}
$subref->($n);
$t1 = Benchmark->new($n);
examples/forked_harness.pl view on Meta::CPAN
# has a dark background?
my %colors = (ITERATION => 'bold white',
GOOD_STATUS => 'bold green',
BAD_STATUS => 'bold red',
'STDERR' => 'yellow bold',
DEBUG => 'cyan bold',
NORMAL => '');
if ($debug) {
color_print('DEBUG', "MAX_PROC is $Forks::Super::MAX_PROC, ",
"on busy is $Forks::Super::ON_BUSY\n");
}
#####################################################3
#
# determine the set of test scripts to run
#
my $glob_required = 0;
if (@ARGV == 0) {
# read ${TEST_FILES} from %ENV
lib/Forks/Super.pm view on Meta::CPAN
# OK for child process to call Forks::Super::fork()? Could be a bad idea
$CHILD_FORK_OK = 0;
# Disable cleanup of IPC files? Sometimes helpful for debugging.
$DONT_CLEANUP = $ENV{FORKS_DONT_CLEANUP} || 0;
# choose of $Forks::Super::Util::DEFAULT_PAUSE is a tradeoff between
# accuracy/responsiveness and performance.
#
# Low values will make pause/waitpid calls very busy, consuming cpu cycles
#
# High values increase the average delay between the time one job
# finishes and the next waiting job starts.
$Forks::Super::Util::DEFAULT_PAUSE = 0.10; # seconds
$Forks::Super::Util::DEFAULT_PAUSE_IO = 0.05;
*handle_CHLD = *Forks::Super::Sigchld::handle_CHLD;
Forks::Super::Util::set_productive_pause_code {
lib/Forks/Super.pm view on Meta::CPAN
return undef;
}
if ($job->{debug}) {
debug('fork(): ', $job->toString(), ' initialized.');
}
# handle_CHLD(-1); # <-- benefits MSWin32? or causes memory wrap panic?
while (!$job->can_launch) {
if ($job->{debug}) {
debug("fork(): job can not launch. Behavior=$job->{_on_busy}");
}
if ($job->{_on_busy} eq 'FAIL') {
$job->run_callback('fail');
$job->{end} = Time::HiRes::time();
$job->{status} = -1;
$job->_mark_reaped;
# -1: failure: system is too busy to create a new job
# in $Forks::Super::ON_BUSY documentation
# XXX - document this better?
return -1;
} elsif ($job->{_on_busy} eq 'QUEUE') {
$job->run_callback('queue');
$job->queue_job;
if ($Forks::Super::Job::OVERLOAD_ENABLED) {
return $job;
} else {
return $job->{pid};
}
} else { # BLOCK
pause();
}
lib/Forks/Super.pm view on Meta::CPAN
# and %hash has values from 2nd job
# ---------- manage jobs and system resources ---------------
# --- run 100 tasks but fork blocks while there are already 5 active jobs
$Forks::Super::MAX_PROC = 5;
$Forks::Super::ON_BUSY = 'block';
for ($i=0; $i<100; $i++) {
$pid = fork { cmd => $task[$i] };
}
# --- jobs fail (without blocking) if the system is too busy
$Forks::Super::MAX_LOAD = 2.0;
$Forks::Super::ON_BUSY = 'fail';
$pid = fork { cmd => $task };
if ($pid > 0) { print "'$task' is running\n" }
elsif ($pid < 0) { print "current CPU load > 2.0: didn't start '$task'\n" }
# $Forks::Super::MAX_PROC setting can be overridden.
# Start job immediately if < 3 jobs running
$pid = fork { sub => 'MyModule::MyMethod', args => [ @b ], max_proc => 3 };
# --- try to fork no matter how busy the system is
$pid = fork { sub => \&MyMethod, force => 1 }
# when system is busy, queue jobs. When system becomes less busy,
# some jobs on the queue will start.
# if job is queued, return value from fork() is a very negative number
$Forks::Super::ON_BUSY = 'queue';
$pid = fork { cmd => $command };
$pid = fork { cmd => $useless_command, queue_priority => -5 };
$pid = fork { cmd => $important_command, queue_priority => 5 };
$pid = fork { cmd => $future_job, delay => 20 }; # queue job for at least 20s
# --- assign descriptive names to tasks
$pid1 = fork { cmd => $command, name => "my task" };
lib/Forks/Super.pm view on Meta::CPAN
=item C<< fork { max_proc => $max_simultaneous_jobs } >>
=item C<< fork { max_proc => \&subroutine } >>
Specifies the maximum number of background processes that should run
simultaneously. If a C<fork> call is attempted while there are already
the maximum number of child processes running, then the C<fork()>
call will either block (until some child processes complete),
fail (return a negative value without spawning the child process),
or queue the job (returning a very negative value called a job ID),
according to the specified "on_busy" behavior (see L<"on_busy">, below).
See the L<"Deferred processes"> section for information about
how queued jobs are handled.
On any individual C<fork> call, the maximum number of processes may be
overridden by also specifying C<max_proc> or L<"force"> options.
$Forks::Super::MAX_PROC = 8;
# launch 2nd job only when system is very not busy
# always launch 3rd job no matter how busy we are
$pid1 = fork { sub => 'method1' };
$pid2 = fork { sub => 'method2', max_proc => 1 };
$pid3 = fork { sub => 'method3', force => 1 };
Setting C<max_proc> parameter
to zero or a negative number will disable the check for too many
simultaneous processes. Also see the L<"force"> option, below.
C<max_fork> is a synonym for C<max_proc>.
lib/Forks/Super.pm view on Meta::CPAN
L<Sys::CpuLoadX|Sys::CpuLoadX> module to use this feature.
The C<Sys::CpuLoadX> module is only available bundled with
C<Forks::Super> and otherwise cannot be downloaded from CPAN.
Also see L<$Forks::Super::MAX_LOAD in MODULE VARIABLES|"MAX_LOAD">,
which will specifies the maximum CPU load for launching a job when
the C<max_load> parameter is not provided to C<fork>.
=back
=head3 on_busy
=over 4
=item C<< fork { on_busy => "block" | "fail" | "queue" } >>
Dictates the behavior of C<fork> in the event that the module is not allowed
to launch the specified job for whatever reason. If you are using
C<Forks::Super> to throttle (see
L<max_proc, $Forks::Super::MAX_PROC|"max_proc">)
or impose dependencies on (see L<depend_start|"depend_start">,
L<depend_on|"depend_on">) background processes, then failure to launch a job
should be expected.
=over 4
lib/Forks/Super.pm view on Meta::CPAN
If the module cannot create a new child process for the specified job,
the job will be deferred, and an attempt will be made to launch the
job at a later time. See L<"Deferred processes">
below. The return
value will be a very negative number (job ID).
=back
Note that jobs that use any of the L<"delay">, L<"start_after">, L<"depend_on">,
or L<"depend_start"> options ignore this setting and always put the job
on the deferred job queue (unless a different C<on_busy> attribute is
explicitly provided).
Also see L<$Forks::Super::ON_BUSY in MODULE VARIABLES|"ON_BUSY">,
which specifies the busy behavior when an C<on_busy> parameter
is not supplied to the C<fork> call.
=back
=head3 force
=over 4
=item C<< fork { force => $bool } >>
lib/Forks/Super.pm view on Meta::CPAN
# Job 3 will not start until BOTH job 1 and job 2 are done
$job1 = fork { name => "Sally", ... };
$job2 = fork { name => "Sally", ... };
$job3 = fork { depend_on => "Sally", ... };
# all of these jobs have the same name and depend on ALL previous jobs
$job4 = fork {name=>"Ralph", depend_start=>"Ralph", ...}; # no dependencies
$job5 = fork {name=>"Ralph", depend_start=>"Ralph", ...}; # depends on Job 4
$job6 = fork {name=>"Ralph", depend_start=>"Ralph", ...}; # depends on 4 and 5
The default "on_busy" behavior for jobs with dependencies is to go on to
the job queue, ignoring the value of L<$Forks::Super::ON_BUSY/"ON_BUSY">
(but not ignoring the L<< C<on_busy>|"on_busy" >> attribute passed to the
job, if any).
=back
=head3 can_launch
=over 4
=item C<< fork { can_launch => \&methodName } >>
lib/Forks/Super.pm view on Meta::CPAN
of the job object:
# Running on a BSD system with the uptime(1) call.
# Want to block jobs when the current CPU load
# (1 minute) is greater than 4 and respect all other criteria:
fork { cmd => $my_command,
can_launch => sub {
$job = shift; # a Forks::Super::Job object
return 0 if !$job->_can_launch; # default
$cpu_load = (split /\s+/,`uptime`)[-3]; # get 1 minute avg CPU load
return 0 if $cpu_load > 4.0; # system too busy. let's wait
return 1;
} }
=back
=head3 callback
=over 4
=item C<< fork { callback => $subroutineName } >>
lib/Forks/Super.pm view on Meta::CPAN
or they may be set explicitly at run-time:
$Forks::Super::ON_BUSY = 'queue';
$Forks::Super::IPC_DIR = "/home/joe/temp-ipc-files";
Many module variables govern global settings that affect all C<fork> calls.
But many can be overridden by a parameter setting in any
specific C<fork> call.
$Forks::Super::ON_BUSY = 'queue';
$j1 = fork { sub => ... }; # put on queue if busy
$j2 = fork { sub => ..., on_busy = 'block' }; # block if busy
Module variables that may be of interest include:
=head3 MAX_PROC
=over 4
=item C<< $Forks::Super::MAX_PROC = int >>
The maximum number of simultaneous background processes that can
be spawned by C<Forks::Super>. If a C<fork> call is attempted while
there are already at least this many active background processes,
the behavior of the C<fork> call will be determined by the
value in L<$Forks::Super::ON_BUSY|/"ON_BUSY"> or by the
L<"on_busy"> option passed to the C<fork> call.
The L<"force"> option passed to a C<fork> call overrides this
setting. The value might also
not be respected if the user supplies a code reference in the
L<"can_launch"> option and the user-supplied code does not test
whether there are already too many active proceeses.
Since v0.77, the package variable C<$Forks::Super::MAX_PROC> or the
C<max_proc> parameter to C<fork> may be assigned a code reference.
When the module needs to know the maximum
lib/Forks/Super.pm view on Meta::CPAN
=back
=head3 ON_BUSY
=over 4
=item C<$Forks::Super::ON_BUSY = 'block' | 'fail' | 'queue'>
Determines behavior of a C<fork> call when the system is too
busy to create another background process.
If this value is set to C<block>,
then C<fork> will wait until the system is no
longer too busy and then launch the background process.
The return value will be a normal process ID value (assuming
there was no system error in creating a new process).
If the value is set to C<fail>, the C<fork> call will return
immediately without launching the background process. The return
value will be C<-1>. A C<Forks::Super::Job> object will not be
created.
If the value is set to C<queue>, then the C<fork> call
will create a "deferred" job that will be queued and run at
a later time. Also see the L<"queue_priority"> option to C<fork>
to set the urgency level of a job in case it is deferred.
The return value will be a large and negative
job ID.
This value will be ignored in favor of an L<"on_busy"> option
supplied to the C<fork> call.
=back
=head3 CHILD_FORK_OK
=over 4
=item C<$Forks::Super::CHILD_FORK_OK = -1 | 0 | +1>
lib/Forks/Super/Debug.pm view on Meta::CPAN
open my $TTY, '>>', &IS_WIN32 ? 'CON' : '/dev/tty';
print $TTY scalar localtime(time), "\n";
print $TTY "Full Forks::Super v$Forks::Super::VERSION ",
"job dump process $$\n";
# if $MAX_PROC is a coderef, this output will be like "CODE(0x0123ABCD)"
print $TTY "Default maximum background procs: $Forks::Super::MAX_PROC\n";
print $TTY "Default maximum CPU load: $Forks::Super::MAX_LOAD\n";
print $TTY "Child fork ok: ",
"$Forks::Super::CHILD_FORK_OK\n";
print $TTY "Default busy system busy behavior: $Forks::Super::ON_BUSY\n";
if (defined($Forks::Super::IPC_DIR) && $Forks::Super::IPC_DIR ne '') {
print $TTY "Default IPC directory: $Forks::Super::IPC_DIR\n";
}
print $TTY "\n";
# parent process
print $TTY "PARENT PROCESS\n--------------\n";
print $TTY &Carp::longmess, "\n\n";
lib/Forks/Super/Job.pm view on Meta::CPAN
sub _can_launch_delayed_start_check {
my $job = shift;
return 1 if !defined($job->{start_after}) ||
Time::HiRes::time() >= $job->{start_after};
if ($job->{debug}) {
debug('_can_launch(): start delay requested. launch fail');
}
# delay option should normally be associated with queue on busy behavior.
# any reason not to make this the default ?
# delay + fail is pretty dumb
# delay + block is like sleep + fork
if (! defined $job->{on_busy}) {
$job->{_on_busy} = 'QUEUE';
}
$job->{queue_message} = "haven't yet reached job start_after time "
. localtime($job->{start_after});
return 0;
}
sub _can_launch_dependency_check {
my $job = shift;
my @dep_on = defined($job->{depend_on}) ? @{$job->{depend_on}} : ();
my @dep_start = defined($job->{depend_start})
lib/Forks/Super/Job.pm view on Meta::CPAN
foreach my $spec (@specs) {
my $host = $spec->{host};
next unless $host;
if ($job->_can_launch_remote_check_host($host)) {
$job->{remote} = $spec;
return 1;
}
}
if ($job->{debug}) {
my @hosts = map { $_->{host} } @specs;
debug("_can_launch_remote: host(s) @hosts are too busy");
}
return 0;
}
sub _can_launch_remote_check_host {
my ($job, $h) = @_;
my $max_proc = $job->max_proc($h);
if ($max_proc < 1) {
debug("_can_launch_remote: no restriction on host $h")
if $job->{debug};
return 1;
}
my $num_active = count_active_processes_on_host($h);
if ($num_active >= $max_proc) {
debug('_can_launch_remote(): ',
"host $h too busy. ($num_active >= $max_proc)") if $job->{debug};
} else {
debug('_can_launch_remote(): ',
"host $h not busy. ($num_active < $max_proc) ",
" launch ok") if $job->{debug};
return 1;
}
}
sub max_proc {
my $val;
if (@_ > 0 && ref($_[0]) eq 'Forks::Super::Job'
&& (defined($_[0]->{max_proc}) || defined($_[0]->{max_fork}))) {
$val = $_[0]->{max_proc} || $_[0]->{max_fork};
lib/Forks/Super/Job.pm view on Meta::CPAN
return defined($j->{max_proc}) ? $j->{max_proc} : max_proc();
}
sub _max_load { # used in test suite but not in this distro itself
my $j = shift;
return defined($j->{max_load}) ? $j->{max_load} : $Forks::Super::MAX_LOAD;
}
#
# default function for determining whether the system
# is too busy to create a new child process or not
#
sub _can_launch {
no warnings qw(once);
my $job = shift;
if ($job->{force}) {
debug('_can_launch(): force attr set. launch ok')
if $job->{debug};
return 1;
}
return 0 if not $job->_can_launch_delayed_start_check;
return 0 if not $job->_can_launch_dependency_check;
if ($job->{remote}) {
if ($job->_can_launch_remote) {
debug('_can_launch_remote(): system not busy. launch ok.')
if $job->{debug};
return 1;
} else {
return 0;
}
}
my $max_proc = $job->max_proc();
my $max_load = defined($job->{max_load})
? $job->{max_load} : $Forks::Super::MAX_LOAD;
lib/Forks/Super/Job.pm view on Meta::CPAN
my $load = get_cpu_load();
if ($load > $max_load) {
debug('_can_launch(): ',
"cpu load $load exceeds limit $max_load. launch fail.")
if $job->{debug};
$job->{queue_message} =
"cpu load $load exceeds limit $max_load";
return 0;
}
}
debug('_can_launch(): system not busy. launch ok.')
if $job->{debug};
return 1;
}
# Perl system fork() call. Encapsulated here so it can be overridden
# and mocked for testing. See t/17-retries.t
sub _CORE_fork { return CORE::fork }
#
# make a system fork call and configure the job object
lib/Forks/Super/Job.pm view on Meta::CPAN
}
#
# do further initialization of a Forks::Super::Job object,
# mainly setting derived fields
#
sub _preconfig {
my $job = shift;
$job->_preconfig_style;
$job->_preconfig_dir;
$job->_preconfig_busy_action;
$job->_preconfig_start_time;
$job->_preconfig_dependencies;
$job->_preconfig_share;
$job->_preconfig_remote;
Forks::Super::Job::Callback::_preconfig_callbacks($job);
Forks::Super::Job::OS::_preconfig_os($job);
return;
}
# some final initialization just before launch
lib/Forks/Super/Job.pm view on Meta::CPAN
my $job = shift;
if (defined $job->{chdir}) {
$job->{dir} ||= $job->{chdir};
}
if (defined $job->{dir}) {
$job->{dir} = Forks::Super::Util::abs_path($job->{dir});
}
return;
}
sub _preconfig_busy_action {
my $job = shift;
######################
# what will we do if the job cannot launch?
#
if (defined $job->{on_busy}) {
$job->{_on_busy} = $job->{on_busy};
} else {
no warnings 'once';
$job->{_on_busy} = $Forks::Super::ON_BUSY || 'block';
# may be overridden to 'queue' if depend_on or
# depend_start is set. See _preconfig_dependencies
}
$job->{_on_busy} = uc $job->{_on_busy};
########################
# make a queue priority available if needed
#
if (not defined $job->{queue_priority}) {
$job->{queue_priority} = Forks::Super::Deferred::get_default_priority();
}
return;
}
lib/Forks/Super/Job.pm view on Meta::CPAN
##########################
# assert dependencies are expressed as array refs
# expand job names to pids
#
if (defined $job->{depend_on}) {
if (ref $job->{depend_on} ne 'ARRAY') {
$job->{depend_on} = [ $job->{depend_on} ];
}
$job->{depend_on} = _resolve_names($job, $job->{depend_on});
$job->{_on_busy} = 'QUEUE' unless $job->{on_busy};
}
if (defined $job->{depend_start}) {
if (ref $job->{depend_start} ne 'ARRAY') {
$job->{depend_start} = [ $job->{depend_start} ];
}
$job->{depend_start} = _resolve_names($job, $job->{depend_start});
$job->{_on_busy} = 'QUEUE' unless $job->{on_busy};
}
return;
}
sub _preconfig_remote {
my $job = shift;
return if !defined $job->{remote};
my $remote = $job->{remote};
if (!$job->{cmd}) {
if ($job->{exec}) {
lib/Forks/Super/Job.pm view on Meta::CPAN
The shell command to run that was supplied in the C<fork> call.
=item sub
=item args
The name of or reference to CODE to run and the subroutine
arguments that were supplied in the C<fork> call.
=item _on_busy
The behavior of this job in the event that the system was
too "busy" to enable the job to launch. Will have one of
the string values C<block>, C<fail>, or C<queue>.
=item queue_priority
If this job was deferred, the relative priority of this
job.
=item can_launch
By default undefined, but could be a CODE reference
supplied in the C<fork()> call. If defined, it is the
code that runs when a job is ready to start to determine
whether the system is too busy or not.
=item depend_on
If defined, contains a list of process IDs and job IDs that
must B<complete> before this job will be allowed to start.
=item depend_start
If defined, contains a list of process IDs and job IDs that
must B<start> before this job will be allowed to start.
lib/Forks/Super/Sync/Semaphlock.pm view on Meta::CPAN
sub acquire {
my ($self, $n, $timeout) = @_;
return if $n<0 || $n>=$self->{count};
my $file = $self->{files}[$n];
if (defined $self->{acquired}[$n]) {
return -1;
}
my $fh;
# on Cygwin, using fcntl to emulate flock, this open can
# (intermittently) fail with $! := "Device or resource busy"
for my $try (1..5) {
last if open $fh, '>>', $file;
if ($try == 5) {
carp "failed to acquire file resource $file after 5 tries: $!";
return;
}
Time::HiRes::sleep(0.25 * $try);
}
if (defined $timeout) {
t/46a-busy.t view on Meta::CPAN
use Forks::Super ':test';
use Test::More tests => 13;
use strict;
use warnings;
#
# test that jobs don't launch when the system is
# "too busy" (which so far means that there are
# already too many active subprocesses). Jobs that
# are too busy to start can either block or fail.
#
#######################################################
sub sleepy { return sleep 3 }
my $sleepy = \&sleepy;
$Forks::Super::MAX_PROC = 3;
$Forks::Super::ON_BUSY = "block";
t/46a-busy.t view on Meta::CPAN
ok(!isValidPid($pid8), 'third job fails');
ok(isValidPid($pid9), 'fourth job ok with force => 1');
waitall;
#######################################################
$Forks::Super::MAX_PROC = 3;
$Forks::Super::ON_BUSY = "fail";
my $pid = fork { sub =>
sub { # a subroutine that will make the processor busy for a while
my $z=0;
my $timeout = time + ($^O eq 'MSWin32' ? 15 : 45);
while (time < $timeout) {
$z += rand()-rand()
}
} };
$Forks::Super::MAX_LOAD = 0.001;
sleep 1;
SKIP: {
t/46b-userbusy.t view on Meta::CPAN
use Forks::Super ':test';
use Test::More tests => 9;
use strict;
use warnings;
#
# user can supply their own subroutine to decide
# whether the system is too busy to fork a new
# process.
#
sub do_launch { 1; }
sub dont_launch { 0; }
my $launch_after_nap = sub { sleep 10; return 1 };
my $sleepy = sub { return sleep 30 };
sub dont_launch_external {
# block jobs that invoke external commands
t/46c-busy.t view on Meta::CPAN
use Forks::Super ':test';
use Test::More tests => 13;
use strict;
use warnings;
#
# test that jobs don't launch when the system is
# "too busy" (which so far means that there are
# already too many active subprocesses). Jobs that
# are too busy to start can either block or fail.
#
#######################################################
my $maxJobs = 3;
sub sleepy { return sleep 3 }
my $sleepy = \&sleepy;
$Forks::Super::MAX_PROC = sub { return $maxJobs; };
$Forks::Super::ON_BUSY = "block";
t/46c-busy.t view on Meta::CPAN
ok(!isValidPid($pid8), 'third job fails');
ok(isValidPid($pid9), 'fourth job ok with force => 1');
waitall;
#######################################################
$Forks::Super::MAX_PROC = sub { $maxJobs };
$Forks::Super::ON_BUSY = "fail";
my $pid = fork { sub =>
sub { # a subroutine that will make the processor busy for a while
my $z=0;
my $timeout = time + ($^O eq 'MSWin32' ? 15 : 45);
while (time < $timeout) {
$z += rand()-rand()
}
} };
$Forks::Super::MAX_LOAD = 0.001;
sleep 1;
SKIP: {
t/46d-userbusy.t view on Meta::CPAN
use Forks::Super ':test';
use Test::More tests => 9;
use strict;
use warnings;
#
# user can supply their own subroutine to decide
# whether the system is too busy to fork a new
# process.
#
sub do_launch { 1; }
sub dont_launch { 0; }
my $launch_after_nap = sub { sleep 10; return 1 };
my $sleepy = sub { return sleep 30 };
sub dont_launch_external {
# block jobs that invoke external commands
t/50-queue.t view on Meta::CPAN
use Forks::Super ':test';
use Test::More tests => 15;
use strict;
use warnings;
#
# if configured the right way, jobs should go to a
# job queue for deferred launch when the system is
# too busy.
#
$Forks::Super::MAX_PROC = 2;
$Forks::Super::ON_BUSY = "queue";
ok(@Forks::Super::Deferred::QUEUE == 0, "$$\\initial queue is empty");
my $pid1 = fork sub { sleep 5 };
my $pid2 = fork sub { sleep 5 };
ok(isValidPid($pid1) && isValidPid($pid2), "two successful fork calls");
my $pid3 = fork sub { sleep 5 };
t/51b-delayed.t view on Meta::CPAN
# test that "delay" and "start_after" options are
# respected by the fork() call. Delayed jobs should
# go directly to the job queue.
#
$Forks::Super::ON_BUSY = "block";
my $now = Time::HiRes::time();
my $t = Time::HiRes::time();
my $p1 = fork { sub => sub { sleep 3 } , delay => 5, on_busy => 'block' };
$t = Time::HiRes::time() - $t;
okl($t >= 4, "delayed job blocked took ${t}s expected >=5s");
ok(isValidPid($p1), "delayed job blocked and ran");
my $j1 = Forks::Super::Job::get($p1);
ok($j1->{state} eq "ACTIVE", "state of delayed job is ACTIVE");
my $future = Time::HiRes::time() + 10;
$t = Time::HiRes::time();
my $p2 = fork { sub => sub { sleep 3 } , start_after => $future,
on_busy => 'block' };
$t = Time::HiRes::time() - $t;
okl($t >= 4, "start_after job blocked took ${t}s expected ~10s");
ok(isValidPid($p2), "start_after job blocked and ran");
my $j2 = Forks::Super::Job::get($p2);
ok($j2->{state} eq "ACTIVE", "job ACTIVE after delay");
waitall;
ok($j1->{start} >= $now + 5, "job start was delayed");
ok($j2->{start} >= $future, "job start was delayed");
t/52b-dependencies.t view on Meta::CPAN
# wait for all of the jobs in its "depend_on"
# list to complete before starting.
#
$Forks::Super::MAX_PROC = 20;
my $pid1 = fork { sub => sub { sleep 5 } };
ok(isValidPid($pid1), "job 1 started");
my $j1 = Forks::Super::Job::get($pid1);
my $t = Time::HiRes::time();
my $pid2 = fork {sub => sub {sleep 5}, depend_on => $pid1, on_busy => 'block'};
my $j2 = Forks::Super::Job::get($pid2);
ok($j1->{state} eq "COMPLETE", "job 1 complete when job 2 starts");
my $pid3 = fork { sub => sub { } };
my $j3 = Forks::Super::Job::get($pid3);
$t = Time::HiRes::time() - $t;
okl($t >= 3.85, "job 2 took ${t}s to start expected >5s"); ### 3 ###
ok($j2->{state} eq "ACTIVE", "job 2 still running");
waitall;
ok($j1->{end} <= $j2->{start} + $TOL,
t/61-callbacks.t view on Meta::CPAN
$j = Forks::Super::Job::get($pid);
for (my $i = 0; $i < 7 && $j->{state} eq 'ACTIVE'; $i++) {
Forks::Super::pause(1);
}
ok($w == 9, "finish callback invoked"); ### 10 ###
waitpid $pid,0;
$w = 26;
my $pid1 = fork { sub => sub { sleep 4 }, name => 'foo' };
my $pid2 = fork { sub => sub { sleep 4 }, depend_on => 'foo' ,
on_busy => "queue",
callback => { queue => sub { $w = 27 },
start => sub { $w = 28 },
finish => sub { $w = 29 } } };
ok($w == 27, "queue callback runs") or diag "\$w=$w";
wait;
ok($w == 28, "start callback runs") or diag "\$w=$w";
wait;
ok($w == 29, "finish callback runs") or diag "\$w=$w";
$w = 33;
$pid1 = fork { sub => sub { sleep 2 }, name => 'quux' };
$pid2 = fork { sub => sub { sleep 2 }, depend_on => 'quux',
on_busy => "fail",
callback => { queue => sub { $w = 37 },
fail => sub { $w = 38 },
start => sub { $w = 39 },
finish => sub { $w = 40 },
bogus => sub { $w =41 } } };
ok($w == 38, "fail callback runs");
# $Forks::Super::Util::DEFAULT_PAUSE =
waitall;
ok($w == 38, "no other callbacks after fail");
t/62a-bg_eval.tt view on Meta::CPAN
ok(@$x == 3, "listref bg_eval overwrite ok");
waitall;
### test variery of %options ###
$x = 20;
my $w = 14;
$t0 = Time::HiRes::time();
$x = bg_eval {
sleep 5; return 19
} { name => 'bg_eval_job', delay => 3, on_busy => "queue",
callback => { queue => sub { $w++ }, start => sub { $w+=2 },
finish => sub { $w+=5 } },
untaint => $untaint
};
$t = Time::HiRes::time();
my $j = Forks::Super::Job::get('bg_eval_job');
ok($j eq $Forks::Super::LAST_JOB, "\$Forks::Super::LAST_JOB updated");
ok($j->{state} eq "DEFERRED", "bg_eval with delay");
ok($w == 14 + 1, "bg_eval job queue callback");
Forks::Super::pause(4);
t/62b-bg_eval_tie.tt view on Meta::CPAN
ok(@$x == 3, "listref bg_eval overwrite ok");
waitall;
### test variery of %options ###
$x = 20;
my $w = 14;
$t0 = Time::HiRes::time();
tie $x, &BG_EVAL, sub {
sleep 5; return 19
},{ name => 'bg_eval_job', delay => 3, on_busy => "queue",
callback => { queue => sub { $w++ }, start => sub { $w+=2 },
finish => sub { $w+=5 } },
untaint => $untaint
};
$t = Time::HiRes::time();
my $j = Forks::Super::Job::get('bg_eval_job');
ok($j eq $Forks::Super::LAST_JOB, "\$Forks::Super::LAST_JOB updated");
ok($j->{state} eq "DEFERRED", "bg_eval with delay");
ok($w == 14 + 1, "bg_eval job queue callback");
Forks::Super::pause(4);
t/66b-daemon.t view on Meta::CPAN
daemon => 0,
name => 'daemon1 monitor';
sleep 1;
my $d2 = fork {
daemon => 1,
name => 'daemon2',
depend_on => 'daemon1 monitor',
sub => sub { sleep 1 },
on_busy => 'queue',
debug => 0,
};
ok($d2->{state} eq 'DEFERRED', '2nd daemon is deferred'); ### 7 ###
Forks::Super::Util::pause(6);
ok($d1 && $d2, "daemon procs launched") or diag("d1=$d1, d2=$d2");
ok($d1->{start} > $t + 2, "daemon1 launch was delayed");
ok($d2->{start} >= ($n1->{end} || 0), ### 10 ###
"daemon2 launch waited for daemon1")
t/71-config.t view on Meta::CPAN
print "MAX_PROC:\$Forks::Super::MAX_PROC\\n";
print "MAX_LOAD:\$Forks::Super::MAX_LOAD\\n";
print "ON_BUSY:\$Forks::Super::ON_BUSY\\n";
];
close T1;
open CFG, '>', "t/out/71a.$$.cfg";
print CFG q[# -- test config
max.proc=17.3
MAX_LOAD=qwerty
on_busy=bogus
];
close CFG;
my @j = qx($^X -Iblib/lib t/out/71a.$$.pl);
ok($j[0] =~ /17.3/, "respects config file directive");
ok($j[1] =~ /qwerty/, "respects improper config file directive");
ok($j[2] =~ /block|queue|fail/ && $j[2] !~ /bogus/,
"handles improper config file directive");
unlink "t/out/71a.$$.pl";
t/external-command.pl view on Meta::CPAN
# $^X t/external-command.pl -o=t/out/test -e=Hello, -e=Whirled -p -x=0
#
# This script is used in tests:
# t/11-to-command.t
# t/13-to-exec.t
# t/25-open.t
# t/26-waitpid-MSWin32-pgrp.t
# t/40h-timeout.t
# t/42[abcef]-filehandle.t
# t/43e-sockethandles.t
# t/46[bd]-userbusy.t
# t/49*.tt
# t/56a-dir.t
# t/60-os.t
# t/63a-bg_qx.t
# t/63b-bg_qx_tie.t
# t/63c-bg_qx_list.t
# t/67[ab]-emulate.t
#
use strict;
t/forked_harness.pl view on Meta::CPAN
# has a dark background?
my %colors = (ITERATION => 'bold white',
GOOD_STATUS => 'bold green',
BAD_STATUS => 'bold red',
'STDERR' => 'yellow bold',
DEBUG => 'cyan bold',
NORMAL => '');
if ($debug) {
color_print('DEBUG', "MAX_PROC is $Forks::Super::MAX_PROC, ",
"on busy is $Forks::Super::ON_BUSY\n");
}
#####################################################3
#
# determine the set of test scripts to run
#
my $glob_required = 0;
if (@ARGV == 0) {
# read ${TEST_FILES} from %ENV