Parallel-TaskExecutor

 view release on metacpan or  search on metacpan

lib/Parallel/TaskExecutor.pm  view on Meta::CPAN

}

=pod

=head2 run()

  my $task = $executor->run($sub, %options);

Fork a new child process and use it to execute the given I<$sub>. The execution
can be tracked using the returned I<$task> object of type
L<Parallel::TaskExecutor::Task>.

If there are already B<max_parallel_tasks> tasks running, then the call will
block until the count of running tasks goes below that limit.

The possible options are the following:

=over 4

=item *

B<SIG> (hash-reference): if provided, this specifies a set of signal
handlers to be set in the child process. These signal handler are installed
before the provided I<$sub> is called and before the call to run() returns.

=item *

B<wait>: if set to a true value, the call to run will wait for the task
to be complete before returning (this means that C<$task->done()> will always be
true when you get the task).

=item *

B<catch_error>: by default, a failure of a child task will abort the parent
process. If this option is set to true, the failure will be reported by the task
instead.

=item *

B<scalar>: when set to true, the I<$sub> is called in scalar context. Otherwise
it is called in list context.

=item *

B<forced>: if set to true, the task will be run immediately, even if this means
exceeding the value for the B<max_parallel_tasks> passed to the constructor.
Note however that the task will still increase by one the number of running
tasks tracked by the executor (unless B<untracked> is also set to true).

=item *

B<untracked>: if set to true, the task will not increase the number of running
task counted by the executor. However, the call to run() might still be blocked
if the number of outstanding tasks exceeds B<max_parallel_tasks> (unless
B<forced> is set to true too).

=back

=cut

Readonly::Scalar my $busy_loop_wait_time_us => 1000;

sub run {
  my ($this, $sub, %options) = @_;
  %options = (%{$this->{options}}, %options);
  # TODO: add an option to always call _remove_done_tasks here, to cleanup.
  while (!$options{forced} && $this->{current_tasks} >= $this->{max_parallel_tasks}) {
    $this->_remove_done_tasks();
    usleep($busy_loop_wait_time_us);
  }
  return $this->_fork_and_run($sub, %options);
}

=pod

=head2 run_now()

  my $data = $executor->run_now($sub, %options);

Runs the given I<$sub> in a forked process and waits for its result. This never
blocks (the I<$sub> is run even if the executor max parallelism is already
reached) and this does not increase the counted parallelism of the executor
either (in effect the B<untracked>, B<forced>, and B<wait> options are set to
true).

In addition, the B<scalar> option is set to true if this method is called in
scalar context, unless that option was explicitly passed to the run_now() call.

=cut

sub run_now {
  my ($this, $sub, %options) = @_;
  $options{scalar} = 1 unless exists $options{scalar} || wantarray;
  my $task = $this->_fork_and_run($sub, %options, untracked => 1, wait => 1);
  $task->wait();
  return $task->data();
}

=pod

=head2 wait()

  $executor->wait();

Waits for all the outstanding tasks to terminate. This waits for all the tasks
independently of whether their L<Parallel::TaskExecutor::Task> object is still
live.

=cut

sub wait {  ## no critic (ProhibitBuiltinHomonyms)
  my ($this) = @_;
  my $nb_children = $this->{current_tasks};
  return unless $nb_children;
  debug("Waiting for ${nb_children} running tasks...");
  while (my (undef, $c) = each %{$this->{zombies}}) {
    # $c is never weak here and wait() will also not delete from this hash
    # itself
    $c->wait();
    delete $this->{zombies}{$c};  # $c is both the key and the value.
  }
  while (my (undef, $c) = each %{$this->{tasks}}) {
    # $c can be a weak reference, but it should never be undef because the task
    # will remove itself from this hash when it’s done (and the reference is
    # unweakened when it’s the last reference to the task).
    # $c->wait() will delete this entry from the hash, but this is legal when
    # looping with each.
    $c->wait();
  }



( run in 1.802 second using v1.01-cache-2.11-cpan-d8267643d1d )