Directory-Queue

lib/Directory/Queue.pm

L<Directory::Queue::Null> is special: it is a kind of black hole with the same
API as the other directory queues.

=head1 LOCKING

Adding an element is not a problem because the add() method is atomic.
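The add() implementations are not part of this excerpt. A common way to make an add atomic, and one consistent with the ".tmp" suffix that Directory::Queue::Simple documents for temporary elements, is to write the data under a temporary name and then rename() it into place. The following is only a sketch under that assumption; add_atomically() is a hypothetical helper, not the module's code.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: write $data to "<name>.tmp" first, then rename() it
# to its final name. Within one filesystem rename() is atomic, so readers
# never see a partially written element.
sub add_atomically {
    my ($dir, $name, $data) = @_;
    my $tmp = "$dir/$name.tmp";
    my $dst = "$dir/$name";
    open(my $fh, ">", $tmp) or die("cannot open($tmp): $!\n");
    binmode($fh);
    print {$fh} $data or die("cannot write($tmp): $!\n");
    close($fh) or die("cannot close($tmp): $!\n");
    rename($tmp, $dst) or die("cannot rename($tmp, $dst): $!\n");
    return $name;
}

my $dir = tempdir(CLEANUP => 1);
add_atomically($dir, "elt1", "hello");
```

Because the final name appears in a single rename() step, a reader scanning the directory sees either no element or a complete one, provided the temporary file lives on the same filesystem.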

In order to support multiple reader processes interacting with the same queue,
advisory locking is used. Processes should first lock an element before
working with it. In fact, the get() and remove() methods report a fatal error
if they are called on unlocked elements.

If the process that created the lock dies without unlocking the element, we
end up with a stale lock. The purge() method can be used to remove these
stale locks.

An element can basically be in only one of two states: locked or unlocked.

A newly created element is unlocked, as a writer usually does not need to do
anything more with it.

Iterators return all the elements, regardless of their state.

There is no method to get an element state as this information is usually
useless since it may change at any time. Instead, programs should directly try
to lock elements to make sure they are indeed locked.
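Since iterators return every element and there is no way to query a state, a reader simply tries to lock each element and skips the ones it cannot lock. The sketch below illustrates that pattern in plain Perl, assuming (as Directory::Queue::Normal does) that a lock is a sub-directory named locked created with mkdir(); try_lock() and process_queue() are hypothetical names, and a real consumer would go on to remove() or unlock() each element it handled.

```perl
use strict;
use warnings;
use Errno qw(EEXIST ENOENT);
use File::Temp qw(tempdir);

# Hypothetical helper: mkdir() is atomic, so at most one process can create
# the "locked" sub-directory of a given element.
sub try_lock {
    my ($elt_dir) = @_;
    return 1 if mkdir("$elt_dir/locked");
    return 0 if $! == EEXIST or $! == ENOENT;   # locked or removed by others
    die("cannot mkdir($elt_dir/locked): $!\n");
}

# Visit every element, whatever its state, and only handle those we locked.
sub process_queue {
    my ($queue_dir, $handler) = @_;
    opendir(my $dh, $queue_dir) or die("cannot opendir($queue_dir): $!\n");
    my @elements = sort { $a cmp $b } grep { !/^\./ } readdir($dh);
    closedir($dh);
    my $done = 0;
    foreach my $elt (@elements) {
        next unless try_lock("$queue_dir/$elt");   # someone else has it
        $handler->($elt);
        $done++;
    }
    return $done;
}

my $queue = tempdir(CLEANUP => 1);
mkdir("$queue/$_") or die("cannot mkdir($queue/$_): $!\n") for qw(a b c);
mkdir("$queue/b/locked") or die("cannot mkdir: $!\n");   # held by another reader
my @seen;
my $count = process_queue($queue, sub { push(@seen, $_[0]) });
print("processed @seen\n");   # prints "processed a c"
```

Note that the loop never asks whether an element is locked; the lock attempt itself is the test, which avoids the race between checking and locking.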

=head1 CONSTRUCTOR

The new() method of this module can be used to create a Directory::Queue
object that will later be used to interact with the queue. It can have a
C<type> attribute specifying the queue type to use. If not specified, the type
defaults to C<Simple>.

However, this method is only a wrapper around the constructor of the underlying
module implementing the functionality. So:

lib/Directory/Queue/Normal.pm

#
# constants
#

# name of the directory holding temporary elements
use constant TEMPORARY_DIRECTORY => "temporary";

# name of the directory holding obsolete elements
use constant OBSOLETE_DIRECTORY => "obsolete";

# name of the directory indicating a locked element
use constant LOCKED_DIRECTORY => "locked";

#
# global variables
#

our(
    $_FileRegexp,     # regexp matching a file in an element directory
    %_Byte2Esc,       # byte to escape map
    %_Esc2Byte,       # escape to byte map
);

lib/Directory/Queue/Normal.pm

    # count sub-directories
    foreach my $name (@list) {
        $subdirs = _subdirs($self, $self->{path}."/".$name);
        $count += $subdirs if $subdirs;
    }
    # that's all
    return($count);
}

#
# check if an element is locked:
#  - this is best effort only as it may change while we test (only locking is atomic)
#  - if given a time, only return true on locks older than this time (needed by purge)
#

sub _is_locked ($$;$) {
    my($self, $name, $time) = @_;
    my($path, @stat);

    $path = $self->{path}."/".$name;
    return(0) unless -d $path."/".LOCKED_DIRECTORY;
    return(1) unless defined($time);
    @stat = lstat($path);
    unless (@stat) {
        dief("cannot lstat(%s): %s", $path, $!) unless $! == ENOENT;
        # RACE: this path does not exist (anymore)
        return(0);
    }
    return($stat[ST_MTIME] < $time);
}
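The time check above can be illustrated on its own: a lock is considered stale when the element directory's modification time, which the successful mkdir() of the locked sub-directory updates, is older than a cutoff. This is a simplified sketch; is_stale_lock() is hypothetical and, unlike _is_locked(), it treats every lstat() failure as "not stale" instead of dying on errors other than ENOENT.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: an element holds a stale lock if it has a "locked"
# sub-directory AND its directory mtime is older than $cutoff.
sub is_stale_lock {
    my ($elt_dir, $cutoff) = @_;
    return 0 unless -d "$elt_dir/locked";
    my @st = lstat($elt_dir);
    return 0 unless @st;                 # element vanished meanwhile
    return $st[9] < $cutoff ? 1 : 0;     # field 9 of lstat() is the mtime
}

my $dir = tempdir(CLEANUP => 1);
my $elt = "$dir/elt";
mkdir($elt) or die("cannot mkdir($elt): $!\n");
mkdir("$elt/locked") or die("cannot mkdir($elt/locked): $!\n");
my $old = time() - 3600;                  # pretend the lock is one hour old
utime($old, $old, $elt) or die("cannot utime($elt): $!\n");
my $stale = is_stale_lock($elt, time() - 600);   # cutoff for a 600s maxlock
```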

#
# lock an element:
#  - return true on success
#  - return false in case the element could not be locked (in permissive mode)
#
# note:
#  - locking can fail:
#     - if the element has been locked by somebody else (EEXIST)
#     - if the element has been removed by somebody else (ENOENT)
#  - if the optional second argument is true, it is not an error if
#    the element cannot be locked (= permissive mode), this is the default
#    as one usually cannot be sure that nobody else will try to lock it
#  - the directory's mtime will change automatically (after a successful mkdir()),
#    this will later be used to detect stale locks
#

sub lock : method { ## no critic 'ProhibitBuiltinHomonyms'
    my($self, $element, $permissive) = @_;
    my($path, $oldumask, $success);

    _check_element($element);

lib/Directory/Queue/Normal.pm

    $path = $self->{path}."/".$element."/".LOCKED_DIRECTORY;
    if (defined($self->{umask})) {
        $oldumask = umask($self->{umask});
        $success = mkdir($path);
        umask($oldumask);
    } else {
        $success = mkdir($path);
    }
    unless ($success) {
        if ($permissive) {
            # RACE: the locked directory already exists
            return(0) if $! == EEXIST;
            # RACE: the element directory does not exist anymore
            return(0) if $! == ENOENT;
        }
        # otherwise this is unexpected...
        dief("cannot mkdir(%s): %s", $path, $!);
    }
    $path = $self->{path}."/".$element;
    unless (lstat($path)) {
        if ($permissive and $! == ENOENT) {
            # RACE: the element directory does not exist anymore
            # (this can happen if another process locked & removed the element
            #  while our mkdir() was in progress... yes, this can happen!)
            return(0);
        }
        # otherwise this is unexpected...
        dief("cannot lstat(%s): %s", $path, $!);
    }
    # so far so good
    return(1);
}
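The permissive/strict distinction of lock() can be reproduced in a few lines of core Perl. This sketch assumes the same on-disk convention (a locked sub-directory inside the element directory); lock_element() is a hypothetical name and the umask handling of the original is left out.

```perl
use strict;
use warnings;
use Errno qw(EEXIST ENOENT);
use File::Temp qw(tempdir);

# Hypothetical mkdir()-based lock mirroring lock() above: in permissive mode
# (the default) losing a race returns 0; otherwise any failure is fatal.
sub lock_element {
    my ($elt_dir, $permissive) = @_;
    $permissive = 1 unless defined($permissive);
    unless (mkdir("$elt_dir/locked")) {
        # EEXIST: already locked; ENOENT: the element has been removed
        return 0 if $permissive and ($! == EEXIST or $! == ENOENT);
        die("cannot mkdir($elt_dir/locked): $!\n");
    }
    # the element may have been moved away while mkdir() was running
    unless (lstat($elt_dir)) {
        return 0 if $permissive and $! == ENOENT;
        die("cannot lstat($elt_dir): $!\n");
    }
    return 1;
}

my $dir = tempdir(CLEANUP => 1);
mkdir("$dir/elt") or die("cannot mkdir($dir/elt): $!\n");
my $first  = lock_element("$dir/elt");       # we get the lock
my $second = lock_element("$dir/elt");       # already locked
my $gone   = lock_element("$dir/missing");   # no such element
my $strict = eval { lock_element("$dir/elt", 0); 1 } ? "no error" : "died";
```

Defaulting to permissive mode matches the reasoning in the comment above: a caller usually cannot know whether another process is competing for the same element.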

#
# unlock an element:
#  - return true on success
#  - return false in case the element could not be unlocked (in permissive mode)
#
# note:
#  - unlocking can fail:
#     - if the element has been unlocked by somebody else (ENOENT)
#     - if the element has been removed by somebody else (ENOENT)
#  - if the optional second argument is true, it is not an error if
#    the element cannot be unlocked (= permissive mode), this is _not_ the default
#    as unlock() should normally be called by whoever locked the element
#

sub unlock : method {
    my($self, $element, $permissive) = @_;
    my($path);

    _check_element($element);
    $path = $self->{path}."/".$element."/".LOCKED_DIRECTORY;
    unless (rmdir($path)) {
        if ($permissive) {

lib/Directory/Queue/Normal.pm

# touch an element to indicate that it is still being used
#

sub touch : method {
    my($self, $element) = @_;

    _touch($self->{"path"}."/".$element);
}
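The _touch() helper is not shown in this excerpt. Assuming it simply sets the element's access and modification times to the current time, its effect can be sketched with Perl's utime(), where passing undef for both times means "now"; touch_path() is a hypothetical name.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical stand-in for _touch(): set atime and mtime to "now".
# With undef for both times, Perl's utime() uses the current time.
sub touch_path {
    my ($path) = @_;
    utime(undef, undef, $path) or die("cannot utime($path): $!\n");
}

my $dir = tempdir(CLEANUP => 1);
my $old = time() - 3600;
utime($old, $old, $dir) or die("cannot utime($dir): $!\n");   # an "old" lock
touch_path($dir);
my $mtime = (stat($dir))[9];   # now close to time()
```

A long-running consumer would call touch() periodically so that purge() with a given maxlock does not mistake its lock for a stale one.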

#
# remove a locked element from the queue
#

sub remove : method {
    my($self, $element) = @_;
    my($temp, $path);

    _check_element($element);
    dief("cannot remove %s: not locked", $element)
        unless _is_locked($self, $element);
    # move the element out of its intermediate directory
    $path = $self->{path}."/".$element;
    while (1) {
        $temp = $self->{path}
           ."/".OBSOLETE_DIRECTORY
           ."/"._name($self->{rndhex});
        rename($path, $temp) and last;
        dief("cannot rename(%s, %s): %s", $path, $temp, $!)
            unless $! == ENOTEMPTY or $! == EEXIST;
        # RACE: the target directory was already present...

lib/Directory/Queue/Normal.pm

    foreach my $name (_special_getdir($temp, "strict")) {
        next if $name eq LOCKED_DIRECTORY;
        if ($name =~ /^($_FileRegexp)$/o) {
            $path = $temp."/".$1; # untaint
        } else {
            dief("unexpected file in %s: %s", $temp, $name);
        }
        unlink($path) and next;
        dief("cannot unlink(%s): %s", $path, $!);
    }
    # remove the locked directory
    $path = $temp."/".LOCKED_DIRECTORY;
    while (1) {
        rmdir($path) or dief("cannot rmdir(%s): %s", $path, $!);
        rmdir($temp) and return;
        dief("cannot rmdir(%s): %s", $temp, $!)
            unless $! == ENOTEMPTY or $! == EEXIST;
        # RACE: this can happen if another process managed to lock this element
        # while it was being removed (see the comment in the lock() method)
        # so we try to remove the lock again and again...
    }
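remove() works in two phases: the element directory is first renamed into the obsolete directory, which takes it out of the queue atomically, and only then are its contents deleted. The following simplified sketch shows the idea; remove_element() and the 8-hex-digit random name are assumptions standing in for the module's _name() helper, and it omits the retry loop that the original needs when another process re-locks the element during deletion.

```perl
use strict;
use warnings;
use Errno qw(EEXIST ENOTEMPTY);
use File::Temp qw(tempdir);

sub remove_element {
    my ($queue, $elt) = @_;
    my $src = "$queue/$elt";
    my $dst;
    # phase 1: atomically move the element out of the queue, retrying with a
    # new random name if the target already exists and is not empty
    while (1) {
        $dst = sprintf("%s/obsolete/%08x", $queue, int(rand(2**32)));
        last if rename($src, $dst);
        die("cannot rename($src, $dst): $!\n")
            unless $! == ENOTEMPTY or $! == EEXIST;
    }
    # phase 2: delete the data files and the locked/ sub-directory,
    # then the (now empty) element directory itself
    opendir(my $dh, $dst) or die("cannot opendir($dst): $!\n");
    my @names = grep { $_ ne "." and $_ ne ".." } readdir($dh);
    closedir($dh);
    foreach my $name (@names) {
        my $path = "$dst/$name";
        if (-d $path) {
            rmdir($path) or die("cannot rmdir($path): $!\n");
        } else {
            unlink($path) or die("cannot unlink($path): $!\n");
        }
    }
    rmdir($dst) or die("cannot rmdir($dst): $!\n");
}

my $queue = tempdir(CLEANUP => 1);
mkdir("$queue/$_") or die("cannot mkdir($queue/$_): $!\n")
    for ("obsolete", "elt", "elt/locked");
open(my $fh, ">", "$queue/elt/body") or die("cannot open: $!\n");
print {$fh} "payload";
close($fh) or die("cannot close: $!\n");
remove_element($queue, "elt");
```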

lib/Directory/Queue/Normal.pm

    eval {
        local $SIG{__WARN__} = sub { die($_[0]) };
        $string = decode("UTF-8", $data, FB_CROAK);
    };
    return(\$string) unless $@;
    $@ =~ s/\s+at\s.+?\sline\s+\d+\.?$//;
    dief("cannot UTF-8 decode %s: %s", $path, $@);
}
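The decoding helper above relies on Encode's FB_CROAK check so that malformed UTF-8 is reported instead of being silently replaced. A standalone sketch of the same technique follows; decode_utf8_strict() is a hypothetical name, and the message clean-up uses a regular expression similar to the original one.

```perl
use strict;
use warnings;
use Encode qw(decode FB_CROAK);

sub decode_utf8_strict {
    my ($bytes) = @_;   # private copy: decode() with a CHECK flag may modify it
    my $string = eval { decode("UTF-8", $bytes, FB_CROAK) };
    return $string if defined($string);
    # strip the " at FILE line N." suffix from the Encode error message
    (my $error = $@) =~ s/\s+at\s.+?\sline\s+\d+\.?\s*$//;
    die("cannot UTF-8 decode: $error\n");
}

my $text = decode_utf8_strict("caf\xC3\xA9");   # four characters
```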

#
# get the data from a locked element
#

sub get : method {
    my($self, $element) = @_;
    my(%data, $path, $ref);

    dief("unknown schema") unless $self->{type};
    _check_element($element);
    dief("cannot get %s: not locked", $element)
        unless _is_locked($self, $element);
    foreach my $name (keys(%{ $self->{type} })) {
        $path = "$self->{path}/$element/$name";
        unless (lstat($path)) {
            dief("cannot lstat(%s): %s", $path, $!) unless $! == ENOENT;
            if ($self->{mandatory}{$name}) {
                dief("missing data file: %s", $path);
            } else {
                next;
            }
        }

lib/Directory/Queue/Normal.pm

        dief("cannot unlink(%s): %s", $path, $!) unless $! == ENOENT;
    }
    _special_rmdir($dir."/".LOCKED_DIRECTORY);
    _special_rmdir($dir);
}

#
# purge the queue:
#  - delete unused intermediate directories
#  - delete too old temporary directories
#  - unlock too old locked directories
#
# note: this uses first()/next() to iterate so this will reset the cursor
#

sub purge : method {
    my($self, %option) = @_;
    my(@list, $path, $subdirs, $oldtime, $locked);

    # check options
    $option{maxtemp} = $self->{maxtemp} unless defined($option{maxtemp});
    $option{maxlock} = $self->{maxlock} unless defined($option{maxlock});
    foreach my $name (keys(%option)) {
        dief("unexpected option: %s", $name)
            unless $name =~ /^(maxtemp|maxlock)$/;
        dief("invalid %s: %s", $name, $option{$name})
            unless $option{$name} =~ /^\d+$/;
    }

lib/Directory/Queue/Normal.pm

    # remove the volatile directories which are too old
    if ($option{maxtemp}) {
        $oldtime = time() - $option{maxtemp};
        foreach my $name (_volatile($self)) {
            $path = $self->{path}."/".$name;
            next unless _older($path, $oldtime);
            warnf("removing too old volatile element: %s", $name);
            _destroy_dir($path);
        }
    }
    # iterate to find abandoned locked entries
    if ($option{maxlock}) {
        $oldtime = time() - $option{maxlock};
        $locked = $self->first();
        while ($locked) {
            next unless _is_locked($self, $locked, $oldtime);
            warnf("removing too old locked element: %s", $locked);
            $self->unlock($locked, 1);
        } continue {
            $locked = $self->next();
        }
    }
}

1;

__END__
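The option checking at the start of purge() merges the object's defaults with the caller's options and then validates them. A standalone version is sketched here; check_purge_options() is hypothetical and not part of the module, and the 300/600 defaults come from the maxtemp/maxlock documentation.

```perl
use strict;
use warnings;

# Hypothetical helper: fill in defaults, then reject unknown option names
# and values that are not non-negative integers (0 disables that step).
sub check_purge_options {
    my ($defaults, %option) = @_;
    foreach my $name (qw(maxtemp maxlock)) {
        $option{$name} = $defaults->{$name} unless defined($option{$name});
    }
    foreach my $name (sort(keys(%option))) {
        die("unexpected option: $name\n")
            unless $name =~ /^(maxtemp|maxlock)$/;
        die("invalid $name: $option{$name}\n")
            unless $option{$name} =~ /^\d+$/;
    }
    return \%option;
}

my $opt = check_purge_options({ maxtemp => 300, maxlock => 600 }, maxlock => 5);
```

With these options, purge() would treat any lock older than time() - 5 as stale, as the test file does with purge(maxlock => 5).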

=head1 NAME

lib/Directory/Queue/Normal.pm

the umask to use when creating files and directories (default: use the running
process' umask)

=item maxelts

the maximum number of elements that an intermediate directory can hold
(default: 16,000)

=item maxlock

default maximum time for a locked element (in seconds, default 600)
as used by the purge() method

=item maxtemp

default maximum time for a temporary element (in seconds, default 300)
as used by the purge() method

=item nlink

flag indicating that the "nlink optimization" (faster but only working on some

lib/Directory/Queue/Normal.pm

=item add(DATA)

add the given data (a hash or hash reference) to the queue and return the
corresponding element name; the schema must be known and the data must conform
to it

=item lock(ELEMENT[, PERMISSIVE])

attempt to lock the given element and return true on success; if the
PERMISSIVE option is true (which is the default), it is not a fatal error if
the element cannot be locked and false is returned

=item unlock(ELEMENT[, PERMISSIVE])

attempt to unlock the given element and return true on success; if the
PERMISSIVE option is true (which is I<not> the default), it is not a fatal
error if the element cannot be unlocked and false is returned

=item touch(ELEMENT)

update the access and modification times on the element's directory to
indicate that it is still being used; this is useful for elements that are
locked for long periods of time (see the purge() method)

=item remove(ELEMENT)

remove the given element (which must be locked) from the queue

=item get(ELEMENT)

get the data from the given element (which must be locked) and return
essentially the same hash that was given to add() (in list context, the hash is
returned directly while in scalar context, the hash reference is returned
instead); the schema must be known and the data must conform to it

=item purge([OPTIONS])

purge the queue by removing unused intermediate directories, removing too old
temporary elements and unlocking too old locked elements (aka stale locks);
note: this can take a long time on queues with many elements; OPTIONS can be:

=over

=item maxtemp

maximum time for a temporary element (in seconds);
if set to 0, temporary elements will not be removed

=item maxlock

maximum time for a locked element (in seconds);
if set to 0, locked elements will not be unlocked

=back

=back
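get() returns a hash in list context and a hash reference in scalar context. In Perl this is usually done with wantarray(); the sketch below assumes that and uses a hypothetical get_fields() with fixed data in place of reading the element's files.

```perl
use strict;
use warnings;

# Hypothetical stand-in for get(): the same data, returned according to
# the calling context.
sub get_fields {
    my %data = (string => "foo", optional => "bar");
    return wantarray() ? %data : \%data;
}

my %hash = get_fields();    # list context: the hash itself
my $ref  = get_fields();    # scalar context: a hash reference
print(scalar(keys(%hash)), " ", ref($ref), "\n");   # prints "2 HASH"
```

This is also why the test file expects four list elements from get() on an element with two fields: in list context the hash is flattened into key/value pairs.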

=head1 DIRECTORY STRUCTURE

All the directories holding the elements and all the files holding the data
pieces are located under the queue toplevel directory. This directory can
contain:

lib/Directory/Queue/Normal.pm


represents the microsecond part of the time since the Epoch

=item I<R>

is a random hexadecimal digit used to reduce name collisions

=back

Finally, inside an element directory, the different pieces of data are stored
in different files, named according to the schema. A locked element also
contains a directory named C<locked>.

=head1 SEE ALSO

L<Directory::Queue>.

=head1 AUTHOR

Lionel Cons L<http://cern.ch/lionel.cons>

Copyright (C) CERN 2010-2022

lib/Directory/Queue/Simple.pm


our(@ISA) = qw(Directory::Queue);

#
# constants
#

# suffix indicating a temporary element
use constant TEMPORARY_SUFFIX => ".tmp";

# suffix indicating a locked element
use constant LOCKED_SUFFIX => ".lck";

#
# object constructor
#

sub new : method {
    my($class, %option) = @_;
    my($self);

lib/Directory/Queue/Simple.pm

sub add_path : method {
    my($self, $path) = @_;
    my($dir);

    $dir = _add_dir($self);
    _special_mkdir($self->{path}."/".$dir, $self->{umask});
    return(_add_path($self, $path, $dir));
}
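add_path() is documented as moving the file into the queue, which is why the file must live on the same filesystem: rename() cannot cross filesystems and fails with EXDEV. The sketch below shows only that move step; move_into_queue() is hypothetical, and the module's _add_path() may do more (for example, choosing the element name).

```perl
use strict;
use warnings;
use Errno qw(EXDEV);
use File::Temp qw(tempdir);

# Hypothetical helper: move (not copy) a file into the queue directory.
sub move_into_queue {
    my ($path, $queue_dir, $name) = @_;
    my $dst = "$queue_dir/$name";
    return $dst if rename($path, $dst);
    die("cannot move $path: not on the same filesystem as $queue_dir\n")
        if $! == EXDEV;
    die("cannot rename($path, $dst): $!\n");
}

my $base = tempdir(CLEANUP => 1);
mkdir("$base/queue") or die("cannot mkdir($base/queue): $!\n");
open(my $fh, ">", "$base/upload") or die("cannot open($base/upload): $!\n");
print {$fh} "data";
close($fh) or die("cannot close: $!\n");
my $dst = move_into_queue("$base/upload", "$base/queue", "elt");
```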

#
# get a locked element
#

sub get : method {
    my($self, $name) = @_;

    return(file_read($self->{path}."/".$name . LOCKED_SUFFIX));
}

sub get_ref : method {
    my($self, $name) = @_;

lib/Directory/Queue/Simple.pm


sub get_path : method {
    my($self, $name) = @_;

    return($self->{path}."/".$name . LOCKED_SUFFIX);
}

#
# lock an element:
#  - return true on success
#  - return false in case the element could not be locked (in permissive mode)
#

sub lock : method {  ## no critic 'ProhibitBuiltinHomonyms'
    my($self, $name, $permissive) = @_;
    my($path, $lock, $time, $ignored);

    $permissive = 1 unless defined($permissive);
    $path = $self->{path}."/".$name;
    $lock = $path . LOCKED_SUFFIX;
    unless (link($path, $lock)) {
        return(0) if $permissive and ($! == EEXIST or $! == ENOENT);
        dief("cannot link(%s, %s): %s", $path, $lock, $!);
    }
    # we also touch the element to indicate the lock time
    $time = time();
    unless (utime($time, $time, $path)) {
        if ($permissive and $! == ENOENT) {
            # RACE: the element file does not exist anymore
            # (this can happen if another process locked & removed the element
            #  while our link() was in progress... yes, this can happen!
            #  we do our best and ignore what unlink() returns)
            $ignored = unlink($lock);
            return(0);
        }
        # otherwise this is unexpected...
        dief("cannot utime(%d, %d, %s): %s", $time, $time, $path, $!);
    }
    # so far so good
    return(1);
}
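The Simple lock() above uses a hard link as the lock: link() fails with EEXIST when the .lck name already exists, so only one process wins, and the element's times are then refreshed for purge(). This is a trimmed sketch of that technique; link_lock() is a hypothetical name.

```perl
use strict;
use warnings;
use Errno qw(EEXIST ENOENT);
use File::Temp qw(tempdir);

# Hypothetical hard-link lock: "<element>.lck" is a second name for the
# element file, created atomically by link().
sub link_lock {
    my ($path, $permissive) = @_;
    $permissive = 1 unless defined($permissive);
    my $lock = "$path.lck";
    unless (link($path, $lock)) {
        return 0 if $permissive and ($! == EEXIST or $! == ENOENT);
        die("cannot link($path, $lock): $!\n");
    }
    # record the lock time in the element's mtime (used by purge)
    my $now = time();
    unless (utime($now, $now, $path)) {
        if ($permissive and $! == ENOENT) {
            unlink($lock);   # element vanished meanwhile: drop our link
            return 0;
        }
        die("cannot utime($path): $!\n");
    }
    return 1;
}

my $dir = tempdir(CLEANUP => 1);
my $elt = "$dir/elt";
open(my $fh, ">", $elt) or die("cannot open($elt): $!\n");
close($fh) or die("cannot close($elt): $!\n");
my $first  = link_lock($elt);       # creates elt.lck
my $second = link_lock($elt);       # elt.lck already exists
my $nlink  = (stat($elt))[3];       # the element and its lock link
```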

#
# unlock an element:
#  - return true on success
#  - return false in case the element could not be unlocked (in permissive mode)
#

sub unlock : method {
    my($self, $name, $permissive) = @_;
    my($path, $lock);

    $permissive = 0 unless defined($permissive);
    $path = $self->{path}."/".$name;
    $lock = $path . LOCKED_SUFFIX;
    return(1) if unlink($lock);

lib/Directory/Queue/Simple.pm

# touch an element to indicate that it is still being used
#

sub touch : method {
    my($self, $element) = @_;

    _touch($self->{"path"}."/".$element);
}

#
# remove a locked element from the queue
#

sub remove : method {
    my($self, $name) = @_;
    my($path, $lock);

    $path = $self->{path}."/".$name;
    $lock = $path . LOCKED_SUFFIX;
    unlink($path) or dief("cannot unlink(%s): %s", $path, $!);
    unlink($lock) or dief("cannot unlink(%s): %s", $lock, $!);
}

#
# return the number of elements in the queue, locked or not (but not temporary)
#

sub count : method {
    my($self) = @_;
    my($count, @list);

    $count = 0;
    # get the list of directories
    foreach my $name (_special_getdir($self->{path}, "strict")) {
        push(@list, $1) if $name =~ /^($_DirectoryRegexp)$/o; # untaint

lib/Directory/Queue/Simple.pm

        dief("unexpected option: %s", $name)
            unless $name =~ /^(maxtemp|maxlock)$/;
        dief("invalid %s: %s", $name, $option{$name})
            unless $option{$name} =~ /^\d+$/;
    }
    # get the list of intermediate directories
    @list = ();
    foreach my $name (_special_getdir($self->{path}, "strict")) {
        push(@list, $1) if $name =~ /^($_DirectoryRegexp)$/o; # untaint
    }
    # remove the old temporary or locked elements
    $oldtemp = $oldlock = 0;
    $oldtemp = time() - $option{maxtemp} if $option{maxtemp};
    $oldlock = time() - $option{maxlock} if $option{maxlock};
    if ($oldtemp or $oldlock) {
        foreach my $name (@list) {
            _purge_dir($self->{path}."/".$name, $oldtemp, $oldlock);
        }
    }
    # try to purge all but the last intermediate directory
    if (@list > 1) {

lib/Directory/Queue/Simple.pm

the "random" hexadecimal digit to use in element names (aka I<R>) as a number
between 0 and 15 (default: randomly generated)

=item umask

the umask to use when creating files and directories (default: use the running
process' umask)

=item maxlock

default maximum time for a locked element (in seconds, default 600)
as used by the purge() method

=item maxtemp

default maximum time for a temporary element (in seconds, default 300)
as used by the purge() method

=item granularity

the time granularity for intermediate directories, see L</DIRECTORY STRUCTURE>

lib/Directory/Queue/Simple.pm

=item add_path(PATH)

add the given file (identified by its path) to the queue and return the
corresponding element name; the file must be on the same filesystem and will
be moved to the queue

=item lock(ELEMENT[, PERMISSIVE])

attempt to lock the given element and return true on success; if the
PERMISSIVE option is true (which is the default), it is not a fatal error if
the element cannot be locked and false is returned

=item unlock(ELEMENT[, PERMISSIVE])

attempt to unlock the given element and return true on success; if the
PERMISSIVE option is true (which is I<not> the default), it is not a fatal
error if the element cannot be unlocked and false is returned

=item touch(ELEMENT)

update the access and modification times on the element's file to indicate
that it is still being used; this is useful for elements that are locked for
long periods of time (see the purge() method)

=item remove(ELEMENT)

remove the given element (which must be locked) from the queue

=item get(ELEMENT)

get the data from the given element (which must be locked) and return a binary
string

=item get_ref(ELEMENT)

get the data from the given element (which must be locked) and return a
reference to a binary string; this can avoid string copies with large strings

=item get_path(ELEMENT)

get the file path of the given element (which must be locked); this file can
be read but not removed (you must use the remove() method for this)

=item purge([OPTIONS])

purge the queue by removing unused intermediate directories, removing too old
temporary elements and unlocking too old locked elements (aka stale locks);
note: this can take a long time on queues with many elements; OPTIONS can be:

=over

=item maxtemp

maximum time for a temporary element (in seconds);
if set to 0, temporary elements will not be removed

=item maxlock

maximum time for a locked element (in seconds);
if set to 0, locked elements will not be unlocked

=back

=back
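get_ref() is described as avoiding string copies for large elements. A sketch of the idea, assuming the element file is simply slurped into one scalar and a reference to it is returned; read_file_ref() is hypothetical and not the module's file_read() helper.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: read a whole file as bytes and return a reference,
# so the caller does not receive another copy of a large string.
sub read_file_ref {
    my ($path) = @_;
    open(my $fh, "<:raw", $path) or die("cannot open($path): $!\n");
    my $data = do { local $/; <$fh> };
    close($fh);
    $data = "" unless defined($data);   # empty file
    return \$data;
}

my $dir = tempdir(CLEANUP => 1);
open(my $out, ">:raw", "$dir/elt") or die("cannot open: $!\n");
print {$out} "x" x 1_000_000;
close($out) or die("cannot close: $!\n");
my $ref = read_file_ref("$dir/elt");
printf("%d bytes\n", length($$ref));   # prints "1000000 bytes"
```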

=head1 DIRECTORY STRUCTURE

The toplevel directory contains intermediate directories that contain the
stored elements, each of them in a file.

lib/Directory/Queue/Simple.pm

represents the microsecond part of the time since the Epoch

=item I<R>

is a random hexadecimal digit used to reduce name collisions

=back

A temporary element (being added to the queue) will have a C<.tmp> suffix.

A locked element will have a hard link with the same name and the C<.lck>
suffix.
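Given these suffixes, the state of each element in an intermediate directory can be inferred from the file names alone. The sketch below assumes exactly the two documented suffixes; classify() is hypothetical and elt1, elt2 and elt3 are placeholders (real element names are built as described above).

```perl
use strict;
use warnings;

# Hypothetical helper: map each element name to "temporary", "locked" or
# "unlocked" based on the ".tmp" and ".lck" suffixes.
sub classify {
    my (@names) = @_;
    my (%state, %locked);
    foreach my $name (@names) {
        if ($name =~ /^(.+)\.tmp$/) {
            $state{$1} = "temporary";
        } elsif ($name =~ /^(.+)\.lck$/) {
            $locked{$1} = 1;
        } else {
            $state{$name} = "unlocked";
        }
    }
    # a lock link only counts if its element file is still there
    foreach my $name (keys(%locked)) {
        $state{$name} = "locked"
            if defined($state{$name}) and $state{$name} eq "unlocked";
    }
    return \%state;
}

my $state = classify(qw(elt1 elt1.lck elt2 elt3.tmp));
```

Such a snapshot is only informative: as the LOCKING section explains, states may change at any time, so a program must still try to lock an element before using it.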

=head1 SEE ALSO

L<Directory::Queue>.

=head1 AUTHOR

Lionel Cons L<http://cern.ch/lionel.cons>

t/1normal.t

is("00000000/@list", $elt, "one element");
test_field("string", "ISO-8859-1", STR_ISO8859);
is($dq->count(), 1, "count 1");

$elt = $dq->add(string => STR_UNICODE);
test_field("string", "Unicode", STR_UNICODE);
is($dq->count(), 2, "count 2");

$elt = $dq->first();
ok($elt, "first");
ok(!$dq->_is_locked($elt), "lock testing 1");
ok($dq->lock($elt), "lock");
ok( $dq->_is_locked($elt), "lock testing 2");
ok($dq->unlock($elt), "unlock");
ok(!$dq->_is_locked($elt), "lock testing 3");

$elt = $dq->next();
ok($elt, "next");
ok($dq->lock($elt), "lock");
eval { $dq->remove($elt) };
is($@, "", "remove 1");
is($dq->count(), 1, "count 1");

$elt = $dq->first();
ok($elt, "first");
eval { $dq->remove($elt) };
like($@, qr/not locked/, "remove 2");
ok($dq->lock($elt), "lock");
eval { $dq->remove($elt) };
is($@, "", "remove 3");
is($dq->count(), 0, "count 0");

$dq = Directory::Queue::Normal->new(path => $tmpdir, schema => { binary => "binary" });
$elt = $dq->add(binary => STR_ISO8859);
test_field("binary", "ISO-8859-1", STR_ISO8859);

$tmp = "foobar";

t/1normal.t

$tmp = $dq->path() . "/" . $elt;
utime($time, $time, $tmp) or die("cannot utime($time, $time, $tmp): $!\n");
$elt = $dq->next();
$dq->lock($elt);
$tmp = $dq->path() . "/" . $elt;
utime($time, $time, $tmp) or die("cannot utime($time, $time, $tmp): $!\n");
$elt = $dq->first();
$dq->touch($elt);
$tmp = 0;
{
    local $SIG{__WARN__} = sub { $tmp++ if $_[0] =~ /removing too old locked/ };
    $dq->purge(maxlock => 5);
}
is($tmp, 1, "purge 1");
$elt = $dq->first();
$elt = $dq->next();
ok($dq->lock($elt), "purge 2");
is($dq->count(), 3, "purge 3");

$dq = Directory::Queue::Normal->new(path => $tmpdir, schema => { string => "binary", optional => "string?" });
$tmp = "add by hash";
ok($dq->add(string => $tmp), "$tmp 1");
ok($dq->add(string => $tmp, optional => "yes"), "$tmp 2");
$tmp = "add by hash ref";
ok($dq->add({string => $tmp}), "$tmp 1");
ok($dq->add({string => $tmp, optional => "yes"}), "$tmp 2");

$elt = $dq->add(string => "foo", optional => "bar");
eval { @list = $dq->get($elt) };
like($@, qr/not locked/, "get");
ok($dq->lock($elt), "lock");
eval { @list = $dq->get($elt) };
is($@, "", "get by hash 1");
is(scalar(@list), 4, "get by hash 2");
eval { $tmp = $dq->get($elt) };
is($@, "", "get by hash ref 1");
is(ref($tmp), "HASH", "get by hash ref 2");

$dq = Directory::Queue::Normal->new(path => $tmpdir);
$tmp = 0;


