IPC-DirQueue
lib/IPC/DirQueue.pm
The optional parameter C<$pollinterval> indicates how frequently to wake
up and check for new jobs. It is specified in seconds, and floating-point
precision is supported. The default is C<1>.
Note that if C<$timeout> is not an exact multiple of C<$pollinterval>,
the smallest multiple of C<$pollinterval> greater than C<$timeout>
is used instead. Also note that C<$timeout> is truncated to an integer
number of seconds.
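The rounding rule described above can be sketched as a small calculation. This is only an illustration of the documented behaviour, not code from this module: C<effective_timeout> is a hypothetical helper name, and the real wait also depends on the one-second granularity of C<time>.

```perl
use strict;
use warnings;
use POSIX qw(ceil);

# Hypothetical helper (not part of IPC::DirQueue): estimate how long
# wait_for_queued_job() may block, following the documented rule.
sub effective_timeout {
  my ($timeout, $pollinterval) = @_;

  # a false or non-positive timeout means "no timeout" (wait indefinitely)
  return unless $timeout && $timeout > 0;

  $pollinterval ||= 1;              # documented default: 1 second
  my $whole = int($timeout);        # the timeout is used as an integer

  # round up to the next multiple of the poll interval
  return ceil($whole / $pollinterval) * $pollinterval;
}

print effective_timeout(5, 2), "\n";      # 5 is not a multiple of 2, so 6
print effective_timeout(4, 2), "\n";      # already a multiple, so 4
print effective_timeout(2.7, 0.5), "\n";  # truncated to 2 first, so 2
```

Choosing a C<$timeout> that is already a whole multiple of C<$pollinterval> avoids the rounding altogether.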
=cut
sub wait_for_queued_job {
  my ($self, $timeout, $pollintvl) = @_;

  my $finishtime;
  if ($timeout && $timeout > 0) {
    $finishtime = time + int ($timeout);
  }

  dbg "wait_for_queued_job starting";

  if ($pollintvl) {
    $pollintvl *= 1000000; # from secs to usecs
  } else {
    $pollintvl = 1000000; # default: 1 sec
  }

  my $pathqueuedir = $self->q_subdir('queue');
  $self->ensure_dir_exists ($pathqueuedir);

  # TODO: would be nice to use fam for this, where available. But
  # no biggie...
  while (1) {
    # check the stat time on the queue dir *before* we call pickup,
    # to avoid a race condition where a job is added while we're
    # checking in that function.
    my @stat = stat ($pathqueuedir);
    my $qdirlaststat = $stat[9];

    my $job = $self->pickup_queued_job();
    if ($job) { return $job; }

    # there's another semi-race condition here, brought about by a lack of
    # sub-second precision from stat(2). if the last enq occurred inside
    # *this* current 1-second window, then *another* one can happen inside this
    # second right afterwards, and we wouldn't notice.
    #
    # in other words (ASCII-art alert):
    #
    #   TIME | t                                    | t+1
    #   E    |        enq                      enq  |
    #   D    | stat        pickup_queued_job        |
    #
    # the enqueuer process E enqueues a job just after the stat, inside the
    # 1-second period "t". dequeuer process D dequeues it with
    # pickup_queued_job(). all is well. But then, E enqueues another job
    # inside the same 1-second period "t", and since the stat() has already
    # happened for "t", and since we've already picked up the job in "t", we
    # don't recheck; result is, we miss this enqueue event.
    #
    # Avoid this by checking in a busy-loop until time(2) says we're out of
    # that "danger zone" 1-second period. Any further enq's would then
    # cause stat(2) to report a different timestamp.
    while (time == $qdirlaststat) {
      Time::HiRes::usleep ($pollintvl);
      dbg "wait_for_queued_job: spinning until time != stat $qdirlaststat";
      my $job = $self->pickup_queued_job();
      if ($job) { return $job; }
    }

    # sleep until the directory's mtime changes from what it was when
    # we ran pickup_queued_job() last.
    dbg "wait_for_queued_job: sleeping on $pathqueuedir";
    while (1) {
      my $now = time;
      if ($finishtime && $now >= $finishtime) {
        dbg "wait_for_queued_job timed out";
        return undef; # out of time
      }

      Time::HiRes::usleep ($pollintvl);

      @stat = stat ($pathqueuedir);
      # dbg "wait_for_queued_job: stat $stat[9] $qdirlaststat $pathqueuedir";
      last if (defined $stat[9] &&
          ((defined $qdirlaststat && $stat[9] != $qdirlaststat)
            || !defined $qdirlaststat));
    }

    dbg "wait_for_queued_job: activity, calling pickup";
  }
}
###########################################################################
=item $dq->visit_all_jobs($visitor, $visitcontext);
Visit all the jobs in the queue, in a read-only mode. Used to list
the entire queue.

The callback function C<$visitor> will be called once for each job in
the queue, like so:

  &$visitor ($visitcontext, $job);

C<$visitcontext> is the value passed as the second argument to
C<visit_all_jobs>; it is handed to the callback unchanged.
C<$job> is a new, read-only instance of C<IPC::DirQueue::Job> representing
that job.

If a job is active (being processed), the C<$job> object also contains the
following additional data:

  'active_host': the hostname on which the job is active
  'active_pid': the process ID of the process which picked up the job
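
As a rough sketch of the callback convention described above, the following visitor tallies jobs into a context hashref. The names C<count_jobs_visitor> and C<@fake_jobs> are hypothetical, and the sketch assumes that C<active_host> and C<active_pid> can be read as hash keys on the C<$job> object. Plain hashrefs stand in for real jobs so the snippet runs without a queue directory.

```perl
use strict;
use warnings;

# Visitor callback: invoked as $visitor->($visitcontext, $job) per job.
# The context here is a hashref used to accumulate a summary.
sub count_jobs_visitor {
  my ($context, $job) = @_;
  $context->{total}++;

  # ASSUMPTION: the active_* data is exposed as hash keys on $job
  if (defined $job->{active_pid}) {
    $context->{active}++;
    push @{ $context->{workers} },
        "$job->{active_host}:$job->{active_pid}";
  }
}

# With a real queue, the call would look roughly like this (not run here):
#   my $dq = IPC::DirQueue->new({ dir => '/path/to/queue' });
#   $dq->visit_all_jobs(\&count_jobs_visitor, $context);

# Hypothetical stand-in jobs, used only to exercise the callback:
my $context = { total => 0, active => 0, workers => [] };
my @fake_jobs = (
  {},                                                          # queued, idle
  { active_host => 'host1.example.com', active_pid => 4242 },  # in progress
);
count_jobs_visitor($context, $_) for @fake_jobs;

print "$context->{total} jobs, $context->{active} active\n";
```

Passing a hashref as C<$visitcontext> keeps the callback free of global state, since everything it collects travels with the context.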
=cut
sub visit_all_jobs {
my ($self, $visitor, $visitcontext) = @_;