Data-Queue-Shared

 view release on metacpan or  search on metacpan

lib/Data/Queue/Shared.pm  view on Meta::CPAN

    my $ok  = $q->push($value);             # non-blocking, false if full
    my $val = $q->pop;                       # non-blocking, undef if empty
    my $ok  = $q->push_wait($value);         # blocking, infinite wait
    my $ok  = $q->push_wait($value, $secs);  # blocking with timeout
    my $val = $q->pop_wait;                  # blocking, infinite wait
    my $val = $q->pop_wait($secs);           # blocking with timeout
    my $val = $q->peek;                      # read front without consuming

C<peek> returns the front element without removing it (C<undef> if empty).
For Int, this is a best-effort snapshot (racy in concurrent MPMC).
For Str, this is exact (mutex-protected).

=head3 Deque operations (Str only)

    my $ok  = $q->push_front($value);           # non-blocking push to front
    my $ok  = $q->push_front_wait($value);       # blocking push to front
    my $ok  = $q->push_front_wait($val, $secs);  # with timeout
    my $val = $q->pop_back;                 # non-blocking pop from back
    my $val = $q->pop_back_wait;            # blocking pop from back
    my $val = $q->pop_back_wait($timeout);  # with timeout

C<push_front> inserts at the head — useful for requeueing failed jobs.
C<pop_back> removes from the tail — useful for work-stealing or undo.
Not available for Int (Vyukov algorithm is strictly FIFO).

=head3 Batch operations

    my $n  = $q->push_multi(@values);          # non-blocking, returns pushed count
    my @v  = $q->pop_multi($count);            # non-blocking, pop up to $count
    my $n  = $q->push_wait_multi($timeout, @values);  # blocking batch push
    my @v  = $q->pop_wait_multi($n, $timeout); # block for >=1, grab up to $n
    my @v  = $q->drain;                        # pop all elements
    my @v  = $q->drain($max);                  # pop up to $max elements

C<pop_wait_multi> blocks until at least one element is available (or timeout),
then grabs up to C<$n> elements non-blocking. Returns empty list on timeout.

C<push_wait_multi> pushes all values, blocking if the queue is full.
C<$timeout> is seconds (C<-1> = infinite, C<0> = try once).

=head3 Status

    my $n   = $q->size;         # approximate under concurrent access
    my $cap = $q->capacity;     # max elements
    my $ok  = $q->is_empty;
    my $ok  = $q->is_full;

=head3 Management

    $q->clear;                  # remove all elements
    $q->sync;                   # msync — flush to disk for crash durability
    $q->unlink;                 # remove backing file
    Class->unlink($path);       # class method form
    my $p = $q->path;           # backing file path
    my $s = $q->stats;          # diagnostic hashref

Stats keys: C<size>, C<capacity>, C<mmap_size>, C<push_ok>, C<pop_ok>,
C<push_full>, C<pop_empty>, C<recoveries>, C<push_waiters>, C<pop_waiters>.
Str queues additionally include C<arena_cap> and C<arena_used>.
All counters are approximate under concurrent access (diagnostic only).
C<push_waiters>/C<pop_waiters> show currently blocked producers/consumers.

=head2 Event Loop Integration (eventfd)

    my $fd = $q->eventfd;           # create eventfd, returns fd number
    $q->eventfd_set($fd);           # use an existing fd (e.g. inherited via fork)
    my $fd = $q->fileno;            # current eventfd (-1 if none)
    $q->notify;                     # signal eventfd (call after push)
    $q->eventfd_consume;            # drain notification counter

Notification is B<opt-in>: C<push> does not write to the eventfd
automatically. Call C<notify> explicitly after pushing. This gives full
control over batching (push N items, notify once) and avoids any overhead
when eventfd is not used.

    use EV;
    my $q = Data::Queue::Shared::Str->new($path, 1024);
    my $fd = $q->eventfd;
    my $w = EV::io $fd, EV::READ, sub {
        $q->eventfd_consume;
        while (defined(my $item = $q->pop)) {
            process($item);
        }
    };
    # Producer side:
    $q->push($item);
    $q->notify;   # wake the EV watcher
    EV::run;

For cross-process notification, create the eventfd B<before> C<fork()>.
Child processes inherit the fd and should call C<eventfd_set($fd)> on
their queue handle. Writes from any process sharing the fd will wake
all event-loop watchers.

=head2 Crash Safety

If a process dies while holding the Str queue mutex, other processes
detect the stale lock within 2 seconds via PID tracking and automatically
recover. The Int queue is lock-free and requires no crash recovery for
normal push/pop operations.

=head2 Keyword API

When L<XS::Parse::Keyword> is installed at build time, keyword forms
are available that bypass method dispatch:

    use Data::Queue::Shared::Int;    # activates q_int_* keywords

    q_int_push $q, $value;
    my $val = q_int_pop $q;
    my $val = q_int_peek $q;
    my $n   = q_int_size $q;

Replace C<int> with C<int32>, C<int16>, or C<str> for other variants.
Keywords are lexically scoped and require C<use> (not C<require>).

=head1 BENCHMARKS

Throughput versus other Perl queue/IPC modules, 200K items,
single process and cross-process, Linux x86_64.
Run C<perl -Mblib bench/vs.pl 200000> to reproduce.



( run in 0.492 second using v1.01-cache-2.11-cpan-e1769b4cff6 )