IPC-DirQueue
view release on metacpan or search on metacpan
lib/IPC/DirQueue.pm view on Meta::CPAN
###########################################################################
=item $job = $dq->pickup_queued_job( [ path => $path ] );
Pick up the next job in the queue, so that it can be processed.
If no job is available for processing, either because the queue is
empty or because other worker processes are already working on
every queued job, C<undef> is returned; otherwise, a new instance of
C<IPC::DirQueue::Job> is returned.
Note that the job is marked as I<active> until C<$job-E<gt>finish()>
is called.
If the (optional) parameter C<path> is used, its value indicates the path of
the desired job's data file. This makes it possible to cancel
not-yet-active items from anywhere in the queue, or to pick up jobs out of
sequence. The path must exactly match the value of the I<pathqueue> member of
the C<IPC::DirQueue::Job> object passed to the C<visit_all_jobs()> callback.
=cut
sub pickup_queued_job {
my ($self, %args) = @_;
my $pathqueuedir = $self->q_subdir('queue');
my $pathactivedir = $self->q_subdir('active');
$self->ensure_dir_exists ($pathactivedir);
my $iter = $self->queue_iter_start($pathqueuedir);
while (1) {
my $nextfile = $self->queue_iter_next($iter);
if (!defined $nextfile) {
# no more files in the queue, return empty
last;
}
my $nextfilebase = $self->queue_dir_fanout_path_strip($nextfile);
next if ($nextfilebase !~ /^\d/);
my $pathactive = $pathactivedir.SLASH.$nextfilebase;
my $pathqueue = $pathqueuedir.SLASH.$nextfile;
next if (exists($args{path}) && ($pathqueue ne $args{path}));
my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,
$atime,$mtime,$ctime,$blksize,$blocks) = lstat($pathactive);
if (defined $mtime) {
# *DO* call time() here. In extremely large dirs, it may take
# several seconds to traverse the entire listing from start
# to finish!
if (time() - $mtime < $self->{active_file_lifetime}) {
# active lockfile; it's being worked on. skip this file
next;
}
if ($self->worker_still_working($pathactive)) {
# worker is still alive, although not updating the lock
dbg ("worker still working, skip: $pathactive");
next;
}
# now, we want to try to avoid 2 or 3 dequeuers removing
# the lockfile simultaneously, as that could cause this race:
#
# dqproc1: [checks file] [unlinks] [starts work]
# dqproc2: [checks file] [unlinks]
#
# ie. the second process unlinks the first process' brand-new
# lockfile!
#
# to avoid this, use a random "fudge" on the timeout, so
# that dqproc2 will wait for possibly much longer than
# dqproc1 before it decides to unlink it.
#
# this isn't perfect. TODO: is there a "rename this fd" syscall
# accessible from perl?
my $fudge = get_random_int() % 256;
if (time() - $mtime < $self->{active_file_lifetime}+$fudge) {
# within the fudge zone. don't unlink it in this process.
dbg ("within active fudge zone, skip: $pathactive");
next;
}
# else, we can kill the stale lockfile
unlink $pathactive or warn "IPC::DirQueue: unlink failed: $pathactive";
warn "IPC::DirQueue: killed stale lockfile: $pathactive";
}
# ok, we're free to get cracking on this file.
my $pathtmp = $self->q_subdir('tmp');
$self->ensure_dir_exists ($pathtmp);
# use the name of the queue file itself, plus a tmp prefix, plus active
my $pathtmpactive = $pathtmp.SLASH.
$nextfilebase.".".$self->new_lock_filename().".active";
dbg ("creating tmp active $pathtmpactive");
if (!sysopen (LOCK, $pathtmpactive, O_WRONLY|O_CREAT|O_EXCL,
$self->{queue_file_mode}))
{
if ($!{EEXIST}) {
# contention; skip this file
dbg ("IPC::DirQueue: $pathtmpactive already created, skipping: $!");
}
else {
# could be serious; disk space, permissions etc.
warn "IPC::DirQueue: cannot open $pathtmpactive for write: $!";
}
next;
}
print LOCK $self->gethostname(), "\n", $$, "\n";
close LOCK;
if (!-f $pathqueue) {
# queue file already gone; another worker got it before we did.
# catch this case before we create a lockfile.
( run in 1.035 second using v1.01-cache-2.11-cpan-39bf76dae61 )