Forks-Super

 view release on metacpan or  search on metacpan

lib/Forks/Super/Job.pm  view on Meta::CPAN

	    push @output, "$attr=[" . join(',', @{$job->{$attr}}) . ']';
	} else {
	    push @output, "$attr=" . $job->{$attr};
	}
    }
    return $job->{short_string} = '{' . join(';',@output) . '}';
}

sub _mark_complete {
    my ($job) = @_;

    # Stamp the completion time and move the job to COMPLETE before any
    # user callbacks run, so the callbacks already see the final state.
    @{$job}{qw(end state)} = (Time::HiRes::time(), 'COMPLETE');

    # Fire the completion-related callbacks in their fixed order.
    for my $hook (qw(share collect finish)) {
        $job->run_callback($hook);
    }
    return;
}

sub _mark_reaped {
    my ($job) = @_;

    # Move the job to REAPED, remember when, and notify 'reaped' callbacks.
    @{$job}{qw(state reaped)} = ('REAPED', Time::HiRes::time());
    $job->run_callback('reaped');

    # Publish the exit status through $?, as a real waitpid would.
    # Without a recorded status, a daemon job yields 0; a non-daemon
    # job yields whatever (false) value its 'daemon' attribute holds.
    if (defined $job->{status}) {
        $? = $job->{status};
    } else {
        $? = $job->{daemon} && 0;
    }

    if ($job->{debug}) {
        debug("Job $job->{pid} reaped");
    }
    return;
}

#
# determine whether a job is eligible to start
#
sub can_launch {
    no strict 'refs';

    # Decide whether $job may start now. A user-supplied 'can_launch'
    # attribute (a CODE ref, or the name of a sub) takes precedence over
    # the built-in _can_launch policy. queue_message is set *before* the
    # check so that it explains the refusal if the check returns false.
    #
    # Returns the true/false result of whichever check was applied, or 0
    # when 'can_launch' holds a reference that cannot be called.
    my $job = shift;
    $job->{last_check} = Time::HiRes::time();

    my $custom = $job->{can_launch};
    if (!defined $custom) {
        $job->{queue_message} = 'default can_launch function failed';
        return $job->_can_launch;
    }

    my $type = ref $custom;
    if ($type eq 'CODE' || $type eq '') {
        # A plain string is called as a symbolic sub name, which is why
        # strict refs are disabled above.
        $job->{queue_message} = 'user can_launch function failed';
        return $custom->($job);
    }

    # Any other reference type (ARRAY, HASH, ...) is not callable.
    # Previously the sub fell off the end here and implicitly returned
    # the value of the failed type test; refuse explicitly instead.
    $job->{queue_message} =
        "can_launch attribute is not callable ($type reference)";
    return 0;
}

sub _can_launch_delayed_start_check {
    my $job = shift;
    my $not_before = $job->{start_after};

    # No start_after constraint, or its time has already arrived.
    if (!defined($not_before) || Time::HiRes::time() >= $not_before) {
        return 1;
    }

    debug('_can_launch(): start delay requested. launch fail')
        if $job->{debug};

    # A delayed job should wait in the queue rather than block or fail
    # (delay + fail is pointless; delay + block amounts to sleep + fork),
    # so default the busy action to QUEUE unless the user chose one.
    $job->{_on_busy} = 'QUEUE' unless defined $job->{on_busy};

    $job->{queue_message} = "haven't yet reached job start_after time "
        . localtime($not_before);
    return 0;
}

sub _can_launch_dependency_check {
    my $job = shift;

    # Check this job's dependencies. Returns 1 when every job listed in
    # depend_on has completed and every job listed in depend_start has
    # started; otherwise sets queue_message and returns 0. Dependency ids
    # with no entry in %ALL_JOBS are warned about and ignored.
    my @dep_on = defined($job->{depend_on}) ? @{$job->{depend_on}} : ();
    my @dep_start = defined($job->{depend_start})
        ? @{$job->{depend_start}} : ();

    foreach my $dj (@dep_on) {
        my $j = $ALL_JOBS{$dj};
        if (not defined $j) {
            carp 'Forks::Super::Job: ',
                "dependency $dj for job $job->{pid} is invalid. Ignoring.\n";
            next;
        }
        if (!$j->is_complete) {
            # Debug output is governed by the *waiting* job's debug flag,
            # like every other launch check (this previously tested the
            # dependency's own flag, $j->{debug}).
            debug('_can_launch(): ',
                  "job waiting for job $j->{pid} to finish. launch fail.")
                if $job->{debug};
            $job->{queue_message} = "depends on job $j->{pid} to finish";
            return 0;
        }
    }

    foreach my $dj (@dep_start) {
        my $j = $ALL_JOBS{$dj};
        if (not defined $j) {
            # same "Forks::Super::Job: " prefix as the depend_on warning
            carp 'Forks::Super::Job: ',
                "start dependency $dj for job $job->{pid} is invalid. ",
                "Ignoring.\n";
            next;
        }
        if (!$j->is_started) {
            debug('_can_launch(): ',
                  "job waiting for job $j->{pid} to start. launch fail.")
                if $job->{debug};
            $job->{queue_message} = "depends on job $j->{pid} to start";
            return 0;
        }
    }
    return 1;
}

sub _can_launch_remote {
    my $job = shift;
    my $candidates = $job->{remote};
    return 1 unless defined $candidates;

    # Shuffle a copy of the remote specs (Fisher-Yates) so load is spread
    # across hosts instead of always trying them in the same order.
    my @specs = @$candidates;
    my $i = $#specs;
    while ($i >= 1) {
        my $k = int(rand($i + 1));
        @specs[$i, $k] = @specs[$k, $i];
        $i--;
    }

    # Take the first host (in shuffled order) with spare capacity; the
    # chosen spec replaces the list of candidates on the job.
    for my $candidate (@specs) {
        my $hostname = $candidate->{host} or next;
        next unless $job->_can_launch_remote_check_host($hostname);
        $job->{remote} = $candidate;
        return 1;
    }

    if ($job->{debug}) {
        my @names = map { $_->{host} } @specs;
        debug("_can_launch_remote: host(s) @names are too busy");
    }
    return 0;
}

sub _can_launch_remote_check_host {
    my ($job, $h) = @_;

    # Decide whether remote host $h has capacity for another job.
    # Returns 1 if the host has no process limit (max_proc < 1) or is
    # below its limit; returns 0 if it is at or over the limit.
    my $max_proc = $job->max_proc($h);
    if ($max_proc < 1) {
        debug("_can_launch_remote: no restriction on host $h")
            if $job->{debug};
        return 1;
    }

    my $num_active = count_active_processes_on_host($h);
    if ($num_active >= $max_proc) {
        debug('_can_launch_remote(): ',
              "host $h too busy. ($num_active >= $max_proc)") if $job->{debug};
        # Explicit false. This branch used to fall off the end of the sub
        # and return the value of the debug() statement above, which may
        # be true when debugging is on, so a busy host could be accepted.
        return 0;
    }

    debug('_can_launch_remote(): ',
          "host $h not busy. ($num_active < $max_proc) ",
          " launch ok") if $job->{debug};
    return 1;
}

sub max_proc {
    # Effective process limit. May be called as $job->max_proc($host),
    # max_proc($host) or max_proc().
    #   - a Forks::Super::Job with its own max_proc/max_fork uses that;
    #   - otherwise %Forks::Super::MAX_PROC{$host}, then its DEFAULT entry,
    #     then the global $Forks::Super::MAX_PROC.
    # A CODE value is invoked to obtain the limit. A false result is
    # returned as 0, which callers treat as "no limit".
    my $first = $_[0];
    my $has_own_limit = @_ > 0
        && ref($first) eq 'Forks::Super::Job'
        && (defined($first->{max_proc}) || defined($first->{max_fork}));

    my $limit;
    if ($has_own_limit) {
        $limit = $first->{max_proc} || $first->{max_fork};
    } else {
        my @args = @_;
        shift @args if ref($args[0]) && ref($args[0]) eq 'Forks::Super::Job';
        my $host = $args[0];

        $limit = $Forks::Super::MAX_PROC;
        if (defined($host)) {
            for my $key ($host, 'DEFAULT') {
                next unless defined($Forks::Super::MAX_PROC{$key});
                $limit = $Forks::Super::MAX_PROC{$key};
                last;
            }
        }
    }

    $limit = $limit->() if ref($limit) eq 'CODE';
    return $limit || 0;  # RT124316b
}

sub _max_proc { # used in test suite but not in this distro itself
    my ($job) = @_;

    # Side effect preserved: a false max_proc is overwritten with
    # max_fork (which may itself be undef).
    $job->{max_proc} = $job->{max_fork} unless $job->{max_proc};

    my $own = $job->{max_proc};
    return defined $own ? $own : max_proc();
}

sub _max_load { # used in test suite but not in this distro itself
    my ($job) = @_;

    # A per-job max_load overrides the global $Forks::Super::MAX_LOAD.
    my $own = $job->{max_load};
    return $own if defined $own;
    return $Forks::Super::MAX_LOAD;
}

#
# default function for determining whether the system
# is too busy to create a new child process or not
#
sub _can_launch {
    no warnings qw(once);

    # Default launch policy. Returns 1 if the job may start now, 0 if not.
    # When a local process or CPU-load limit blocks the launch,
    # queue_message records the reason.
    my $job = shift;

    if ($job->{force}) {
        debug('_can_launch(): force attr set. launch ok') if $job->{debug};
        return 1;
    }

    # Scheduling preconditions: start time reached, dependencies met.
    for my $check (qw(_can_launch_delayed_start_check
                      _can_launch_dependency_check)) {
        return 0 unless $job->$check;
    }

    # Remote jobs are limited per host instead of locally.
    if ($job->{remote}) {
        return 0 unless $job->_can_launch_remote;
        debug('_can_launch_remote(): system not busy. launch ok.')
            if $job->{debug};
        return 1;
    }

    my $proc_limit = $job->max_proc();
    my $load_limit = $job->{max_load};
    $load_limit = $Forks::Super::MAX_LOAD unless defined $load_limit;

    # A limit <= 0 means "unrestricted".
    if ($proc_limit > 0) {
        my $active = count_active_processes();
        if ($active >= $proc_limit) {
            my $why = "active jobs $active exceeds limit $proc_limit";
            debug('_can_launch(): ', "$why. ", 'launch fail.')
                if $job->{debug};
            $job->{queue_message} = $why;
            return 0;
        }
    }

    if ($load_limit > 0) {
        my $cpu = get_cpu_load();
        if ($cpu > $load_limit) {
            my $why = "cpu load $cpu exceeds limit $load_limit";
            debug('_can_launch(): ', "$why. launch fail.")
                if $job->{debug};
            $job->{queue_message} = $why;
            return 0;
        }
    }

    debug('_can_launch(): system not busy. launch ok.') if $job->{debug};
    return 1;
}

# Perl system fork() call. Encapsulated here so it can be overridden 
# and mocked for testing. See t/17-retries.t
sub _CORE_fork {
    # Thin wrapper around the builtin: the child's pid in the parent,
    # 0 in the child, undef if the fork failed.
    my $child_pid = CORE::fork();
    return $child_pid;
}

#
# make a system fork call and configure the job object
# in the parent and the child processes
#
sub launch {
    my $job = shift;
    if ($job->is_started) {
	Carp::confess 'Forks::Super::Job::launch() ',
	    "called on a job in state $job->{state}!\n";
    }

    if ($$ != $Forks::Super::MAIN_PID && $Forks::Super::CHILD_FORK_OK > 0) {
	$Forks::Super::MAIN_PID = $$;
	$Forks::Super::CHILD_FORK_OK--;
    }

    if ($$ != $Forks::Super::MAIN_PID && $Forks::Super::CHILD_FORK_OK <= 0) {
	return _launch_from_child($job);
    }
    $job->_preconfig_fh;
    $job->_preconfig2;
    $job->{cwd} = &Cwd::getcwd;


    $_->() for @PREFORK;

    if ($job->{emulate} ||
        ($Forks::Super::EMULATION_MODE && !defined $job->{emulate})) {
        debug("emulating child process in main process") if $job->{debug};
        return $job->_emulate;
    }

    my $pid = _robust_fork($job);
    if (!defined $pid) {
	debug('launch(): CORE::fork() returned undefined!')
	    if $job->{debug};
	return;
    }

    if ($job->{_sync}) {
	$job->{_sync}->releaseAfterFork($pid || $$);
    }


    if (Forks::Super::Util::isValidPid($pid)) {

        $_->() for @POSTFORK_PARENT;

	# parent
	_postlaunch_parent1($pid, $job);
        return $job->_postlaunch_parent2;

lib/Forks/Super/Job.pm  view on Meta::CPAN

    local $! = 0;
    my $kill_result = 0;
    if (&IS_CYGWIN) {
	require Forks::Super::Job::OS::Cygwin;
	for my $pid ($j->signal_pids) {
	    $kill_result += !!Forks::Super::Job::OS::Cygwin::resume($pid);
	}
    } else {
	$kill_result = Forks::Super::kill('CONT', $j);
    }
    if ($kill_result > 0) {
	$j->{state} = 'ACTIVE';
	return 1;
    }
    carp "'CONT' signal not received by job ", $j->toString(), "\n";
    return;
}

sub _resume_daemon {
    my $j = shift;

    # Send SIGCONT (or the Cygwin equivalent) to a suspended daemon job.
    # Returns the signalling result; if it is true, the job goes back
    # to the DAEMON state.
    my $kill_result;
    if (!&IS_CYGWIN) {
        $kill_result = Forks::Super::kill('CONT', $j);
    } else {
        require Forks::Super::Job::OS::Cygwin;
        $kill_result = 0;
        for my $target ($j->signal_pids) {
            # count each pid that was actually resumed
            $kill_result += !!Forks::Super::Job::OS::Cygwin::resume($target);
        }
    }

    $j->{state} = 'DAEMON' if $kill_result;
    return $kill_result;
}

sub terminate {
    my $j = shift;

    # Forcibly stop a job: SIGKILL via Forks::Super::kill, or on Cygwin
    # the Cygwin terminate helper for each of the job's signal pids.
    # $j may be a Forks::Super::Job or anything Forks::Super::Job::get()
    # can resolve (presumably a pid or job id -- see get()).
    # Returns nothing.
    require Scalar::Util;
    if (!Scalar::Util::blessed($j) || !$j->isa('Forks::Super::Job')) {
        # Check blessed() first: calling ->isa on an unblessed reference
        # (e.g. a plain hash ref) dies instead of resolving the argument.
        $j = Forks::Super::Job::get($j);
    }

    # Nothing resolvable to terminate; don't signal an undefined job.
    return if !defined $j;

    if (&IS_CYGWIN) {
        require Forks::Super::Job::OS::Cygwin;
        for my $pid ($j->signal_pids) {
            Forks::Super::Job::OS::Cygwin::terminate($pid);
        }
    } else {
        Forks::Super::kill('KILL', $j);
    }
    return;
}

#
# do further initialization of a Forks::Super::Job object,
# mainly setting derived fields
#
sub _preconfig {
    my $job = shift;

    # Derive/normalize job attributes from the fork() options, one step
    # at a time. Order matters: e.g. _preconfig_dependencies may override
    # the _on_busy value chosen by _preconfig_busy_action.
    my @steps = qw(
        _preconfig_style
        _preconfig_dir
        _preconfig_busy_action
        _preconfig_start_time
        _preconfig_dependencies
        _preconfig_share
        _preconfig_remote
    );
    for my $step (@steps) {
        $job->$step;
    }

    # Steps implemented in helper packages.
    Forks::Super::Job::Callback::_preconfig_callbacks($job);
    Forks::Super::Job::OS::_preconfig_os($job);
    return;
}

# some final initialization just before launch
sub _preconfig2 {
    my $job = shift;
    if (!defined $job->{debug}) {
	$job->{debug} = $Forks::Super::Debug::DEBUG;
    }
    if ($job->{daemon}) {
        # we avoid pipes on Windows because they have tiny capacity
        # and prone to deadlock. Those concerns are not so big for
        # communicating the daemon pid ... do we want to
        # try using pipes for this on Windows?
        if (!&IS_WIN32) {
            my ($p1,$p2);
            pipe $p1, $p2;
            $p2->autoflush(1);
            $job->{daemon_ipc_pipe} = [ $p1, $p2 ];
            if ($job->{debug}) {
                debug('Job will use pipe to get daemon pid');
            }
        } else {
            $job->{daemon_ipc} =
                Forks::Super::Job::Ipc::_choose_fh_filename(
                    '.daemon', purpose => 'daemon ipc');
            if ($job->{debug}) {
                debug('Job will use ', $job->{daemon_ipc},
                      ' to get daemon pid.');
            }
        }
    }
    if ($job->{style} eq 'cmd'
		|| (&IS_WIN32 && $job->{style} eq 'exec')
		|| (&IS_WIN32 && ($job->{timeout} || $job->{expiration}))) {


        if ($Forks::Super::SIGNAL_IPC_FILE 

            # we avoid pipes on Windows because they have tiny capacity
            # and prone to deadlock, but those are not large concerns
            # for a low volume, single purpose communication channel
            # like communicating the signal pid
#           || &IS_WIN32
            
            || $job->{daemon}) {
            $job->{signal_ipc} =
                Forks::Super::Job::Ipc::_choose_fh_filename(
                    '.signal', purpose => 'signal ipc', job => $job);
            if ($job->{debug}) {
                debug('Job will use ', $job->{signal_ipc},
                      ' to get signal pid.');
            }
        } else {

lib/Forks/Super/Job.pm  view on Meta::CPAN

	    $job->{cmd} = [ $job->{cmd} ];
	} else {
            $job->{_indirect} = 1;
        }
	$job->{style} = 'cmd';
    } elsif (defined $job->{exec}) {
	if (ref $job->{exec} ne 'ARRAY') {
	    $job->{exec} = [ $job->{exec} ];
	} else {
            $job->{_indirect} = 1;
        }
	$job->{style} = 'exec';
    } elsif (defined $job->{sub}) {
	$job->{style} = 'sub';
	$job->{sub} = qualify_sub_name $job->{sub};
	if (defined $job->{args}) {
	    if (ref $job->{args} ne 'ARRAY') {
		$job->{args} = [ $job->{args} ];
	    }
	} else {
	    $job->{args} = [];
	}
    } else {
	$job->{style} = 'natural';
    }
    return;
}

sub _preconfig_style_run {    ### for future use
    my $job = shift;

    # Normalize the 'run' attribute so it is always an array ref.
    my $run = $job->{run};
    $job->{run} = [ $run ] unless ref($run) eq 'ARRAY';
    return;

    # Design notes for a future IPC::Run-style feature:
    #
    # How will we use or emulate the rich functionality of IPC::Run?
    #   - inputs are a "harness specification"
    #   - build a harness
    #   - on "launch", call $harness->start
    #   - when the job is reaped, call $harness->finish
    #
    # One feature of IPC::Run harnesses is that they may be reused!
}

sub _preconfig_dir {
    my $job = shift;

    # 'chdir' is an alias for 'dir'. A true 'dir' value wins; otherwise
    # a defined 'chdir' fills it in.
    if (defined $job->{chdir} && !$job->{dir}) {
        $job->{dir} = $job->{chdir};
    }

    # Store the working directory as an absolute path.
    my $dir = $job->{dir};
    $job->{dir} = Forks::Super::Util::abs_path($dir) if defined $dir;
    return;
}

sub _preconfig_busy_action {
    my $job = shift;

    # What to do if the job cannot launch right away: an explicit
    # on_busy wins, then the global $Forks::Super::ON_BUSY, then 'block'.
    # The choice is stored upper-cased in _on_busy. It may later be
    # overridden to 'QUEUE' when depend_on / depend_start is set
    # (see _preconfig_dependencies).
    my $action;
    {
        no warnings 'once';
        $action = defined $job->{on_busy}
            ? $job->{on_busy}
            : ($Forks::Super::ON_BUSY || 'block');
    }
    $job->{_on_busy} = uc $action;

    # Make sure a queue priority is available in case the job is deferred.
    $job->{queue_priority} = Forks::Super::Deferred::get_default_priority()
        unless defined $job->{queue_priority};
    return;
}

sub _preconfig_start_time {
    my $job = shift;

    # Combine the relative 'delay' and the absolute 'start_after' options
    # into one epoch time stored in start_after, taking the later of the two.
    my $when = defined $job->{delay}
        ? Time::HiRes::time()
          + Forks::Super::Job::Timeout::_time_from_natural_language(
                $job->{delay}, 1)
        : 0;

    if (defined $job->{start_after}) {
        my $absolute =
            Forks::Super::Job::Timeout::_time_from_natural_language(
                $job->{start_after}, 0);
        $when = $absolute if $absolute > $when;
    }

    # No future start time requested.
    return if !$when;

    $job->{start_after} = $when;
    delete $job->{delay};
    if ($job->{debug}) {
        debug('_can_launch(): start delay until '
              . localtime($job->{start_after}) . ' requested.');
    }
    return;
}

sub _preconfig_dependencies {
    my $job = shift;

    # Normalize depend_on / depend_start to array refs and resolve job
    # names through _resolve_names. Having dependencies switches the
    # busy action to QUEUE unless on_busy was set explicitly.
    for my $key (qw(depend_on depend_start)) {
        next unless defined $job->{$key};
        $job->{$key} = [ $job->{$key} ] if ref($job->{$key}) ne 'ARRAY';
        $job->{$key} = _resolve_names($job, $job->{$key});
        $job->{_on_busy} = 'QUEUE' unless $job->{on_busy};
    }
    return;
}

sub _preconfig_remote {
    my $job = shift;
    return if !defined $job->{remote};
    my $remote = $job->{remote};
    if (!$job->{cmd}) {
        if ($job->{exec}) {
            $job->{cmd} = $job->{exec};
        } else {
            carp "fork: remote => ... specified without  cmd => ... ! ",
            "remote spec will be ignored";
            $job->{remote_disabled} = delete $job->{remote};
            return;
        }
    }

    if ('ARRAY' ne ref $job->{remote}) {
        # prior to launch, ensure that $job->{remote} is an array reference
        # of allowable remote specs.
        #
        # after launch, $job->{remote} will point to the actual remote spec
        # that was used to launch the job (see _can_launch_remote)
        $job->{remote} = [ $job->{remote} ];
    }
    $job->{remote} = [
        map {
            if (!ref($_)) {
                { host => $_ };
            } elsif (!defined($_->{host})) {
                ()
            } elsif (defined($_->{proto}) && $_->{proto} ne 'ssh') {
                carp "Forks::Super: Only 'ssh' protocol supported in remote";
                ()
            } elsif (ref($_->{host}) eq 'ARRAY') {
                # not documented, but  remote => { host=>[host1,host2],user=>... }
                # is supported
                my $spec = $_;
                map {
                    my $spec2 = { %$spec };
                    $spec2->{host} = $_;
                    $spec2;
                } @{$_->{host}};
            } else {
                $_
            }
        } @{$job->{remote}}
        ];
    if (@{$job->{remote}} == 0) {
        $job->{__error} = "No valid remote specs in remote param $remote";
        return;
    }
    foreach my $spec (@{$job->{remote}}) {
        my $host = $spec->{host};
        my ($up,$hp) = split /(?<!\\)@/, $host, 2;
        if ($up && $hp) {
            my ($u,$p) = split /(?<!\\):/, $up, 2;
            if ($u && $p) {

lib/Forks/Super/Job.pm  view on Meta::CPAN

For jobs that have started in a child process and are,
to the knowledge of the parent process, still running.

=item C<COMPLETE>

For jobs that have completed and caused the parent process to
receive a C<SIGCHLD> signal, but have not been reaped.

The difference between a C<COMPLETE> job and a C<REAPED> job
is whether the job's process identifier has been returned in
a call to C<Forks::Super::wait> or C<Forks::Super::waitpid>
(or implicitly returned in a call to C<Forks::Super::waitall>).
When the process gets reaped, the global variable C<$?>
(see L<perlvar/"$CHILD_ERROR">) will contain the exit status
of the process, until the next time a process is reaped.

=item C<REAPED>

For jobs that have been reaped by a call to C<Forks::Super::wait>,
C<Forks::Super::waitpid>, or C<Forks::Super::waitall>.

=item C<SUSPENDED>

The job has started but it has been suspended (with a C<SIGSTOP>
or other appropriate mechanism for your operating system) and
is not currently running. A suspended job will not consume CPU
resources but may tie up memory resources.

=item C<SUSPENDED-DEFERRED>

Job is in the job queue and has not started yet, and also
the job has been suspended. A job in the C<SUSPENDED-DEFERRED>
state can only move out of this state to the C<SUSPENDED> state
(with a C<SIGCONT> or a L<"resume"|resume> call).

=back

=item status

The exit status of a job. See L<CHILD_ERROR|perlvar/"CHILD_ERROR"> in
C<perlvar>. Will be undefined until the job is complete.

=item style

One of the strings C<natural>, C<cmd>, C<exec>, or C<sub>, indicating
whether the initial C<fork> call returned from the child process, or whether
the child process was going to run a shell command (C<cmd>), run a command
via C<exec>, or invoke a Perl subroutine and then exit (C<sub>).

=item cmd

The shell command to run that was supplied in the C<fork> call.

=item sub

=item args

The name of or reference to CODE to run and the subroutine
arguments that were supplied in the C<fork> call.

=item _on_busy

The behavior of this job in the event that the system was
too "busy" to enable the job to launch. Will have one of
the upper-case string values C<BLOCK>, C<FAIL>, or C<QUEUE>.

=item queue_priority

If this job was deferred, the relative priority of this
job.

=item can_launch

By default undefined, but could be a CODE reference or the name
of a subroutine supplied in the C<fork()> call. If defined, it is the
code that runs when a job is ready to start to determine
whether the system is too busy or not.

=item depend_on

If defined, contains a list of process IDs and job IDs that
must B<complete> before this job will be allowed to start.

=item depend_start

If defined, contains a list of process IDs and job IDs that
must B<start> before this job will be allowed to start.

=item start_after

Indicates the earliest time (since the epoch) at
which this job may start.

=item expiration

Indicates the latest time that this job may be allowed to
run. Jobs that run past their expiration parameter will
be killed.

=item os_priority

Value supplied to the C<fork> call about desired
operating system priority for the job.

=item cpu_affinity

Value supplied to the C<fork> call about desired
CPUs for this process to prefer.

=item child_stdin

=item child_stdout

=item child_stderr

If the job has been configured for interprocess communication,
these attributes correspond to the handles for passing
standard input to the child process, and reading standard 
output and standard error from the child process, respectively.

Note that the standard read/write operations on these filehandles
can also be accomplished through the C<write_stdin>, C<read_stdout>,
and C<read_stderr> methods of this class. Since these methods
can adjust their behavior based on the type of IPC channel
(file, socket, or pipe) or other idiosyncrasies of your operating
system (#@$%^&*! Windows), B<using these methods is preferred
to using the filehandles directly>.

The package level variables 
C<< L<$Forks::Super::CHILD_STDIN{$job}, $Forks::Super::CHILD_STDOUT{$job},
$Forks::Super::CHILD_STDERR{$job}|Forks::Super/"%25CHILD_STDxxx"> >>
are equivalent to these instance variables.

=back

=cut



( run in 0.699 second using v1.01-cache-2.11-cpan-39bf76dae61 )