IPC-DirQueue

 view release on metacpan or  search on metacpan

lib/IPC/DirQueue.pm  view on Meta::CPAN

The optional parameter C<$pollinterval> indicates how frequently to wake
up and check for new jobs.  It is specified in seconds, and floating-point
precision is supported.  The default is C<1>.

Note that if C<$timeout> is not a round multiple of C<$pollinterval>,
the nearest round multiple of C<$pollinterval> greater than C<$timeout>
will be used instead.  Also note that C<$timeout> is used as an integer.

=cut

sub wait_for_queued_job {
  my ($self, $timeout, $pollintvl) = @_;

  my $finishtime;
  if ($timeout && $timeout > 0) {
    $finishtime = time + int ($timeout);
  }

  dbg "wait_for_queued_job starting";

  if ($pollintvl) {
    $pollintvl *= 1000000;  # from secs to usecs
  } else {
    $pollintvl = 1000000;   # default: 1 sec
  }

  my $pathqueuedir = $self->q_subdir('queue');
  $self->ensure_dir_exists ($pathqueuedir);

  # TODO: would be nice to use fam for this, where available.  But
  # no biggie...

  while (1) {
    # check the stat time on the queue dir *before* we call pickup,
    # to avoid a race condition where a job is added while we're
    # checking in that function.

    my @stat = stat ($pathqueuedir);
    my $qdirlaststat = $stat[9];

    my $job = $self->pickup_queued_job();
    if ($job) { return $job; }

    # there's another semi-race condition here, brought about by a lack of
    # sub-second precision from stat(2).  if the last enq occurred inside
    # *this* current 1-second window, then *another* one can happen inside this
    # second right afterwards, and we wouldn't notice.

    # in other words (ASCII-art alert):
    #    TIME   | t                                         | t+1
    #    E      |          enq                      enq     |
    #    D      |    stat       pickup_queued_job           |

    # the enqueuer process E enqueues a job just after the stat, inside the
    # 1-second period "t".  dequeuer process D dequeues it with
    # pickup_queued_job(). all is well.  But then, E enqueues another job
    # inside the same 1-second period "t", and since the stat() has already
    # happened for "t", and since we've already picked up the job in "t", we
    # don't recheck; result is, we miss this enqueue event.
    #
    # Avoid this by checking in a busy-loop until time(2) says we're out of
    # that "danger zone" 1-second period.  Any further enq's would then
    # cause stat(2) to report a different timestamp.

    while (time == $qdirlaststat) {
      Time::HiRes::usleep ($pollintvl);
      dbg "wait_for_queued_job: spinning until time != stat $qdirlaststat";
      my $job = $self->pickup_queued_job();
      if ($job) { return $job; }
    }

    # sleep until the directory's mtime changes from what it was when
    # we ran pickup_queued_job() last.

    dbg "wait_for_queued_job: sleeping on $pathqueuedir";
    while (1) {
      my $now = time;
      if ($finishtime && $now >= $finishtime) {
        dbg "wait_for_queued_job timed out";
        return undef;           # out of time
      }

      Time::HiRes::usleep ($pollintvl);

      @stat = stat ($pathqueuedir);
      # dbg "wait_for_queued_job: stat $stat[9] $qdirlaststat $pathqueuedir";
      last if (defined $stat[9] &&
            ((defined $qdirlaststat && $stat[9] != $qdirlaststat)
                    || !defined $qdirlaststat));
    }

    dbg "wait_for_queued_job: activity, calling pickup";
  }
}

###########################################################################

=item $dq->visit_all_jobs($visitor, $visitcontext);

Visit all the jobs in the queue, in a read-only mode.  Used to list
the entire queue.

The callback function C<$visitor> will be called for each job in
the queue, like so:

  &$visitor ($visitcontext, $job);

C<$visitcontext> is whatever you pass in that variable above.
C<$job> is a new, read-only instance of C<IPC::DirQueue::Job> representing
that job.

If a job is active (being processed), the C<$job> object also contains the
following additional data:

  'active_host': the hostname on which the job is active
  'active_pid': the process ID of the process which picked up the job

=cut

sub visit_all_jobs {
  my ($self, $visitor, $visitcontext) = @_;



( run in 0.705 second using v1.01-cache-2.11-cpan-39bf76dae61 )