Data-Queue-Shared
API
Core operations
my $ok = $q->push($value); # non-blocking, false if full
my $val = $q->pop; # non-blocking, undef if empty
my $ok = $q->push_wait($value); # blocking, infinite wait
my $ok = $q->push_wait($value, $secs); # blocking with timeout
my $val = $q->pop_wait; # blocking, infinite wait
my $val = $q->pop_wait($secs); # blocking with timeout
my $val = $q->peek; # read front without consuming
"peek" returns the front element without removing it ("undef" if empty).
For Int, this is a best-effort snapshot (racy in concurrent MPMC). For
Str, this is exact (mutex-protected).
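As a minimal sketch of how the non-blocking and blocking calls above combine, the two helpers below try a fast "push" first and fall back to "push_wait" with a timeout, then pop everything currently available. The helper names "enqueue_with_deadline" and "pop_available" are hypothetical and not part of the module; they only rely on the "push", "push_wait" and "pop" methods listed above.

```perl
use strict;
use warnings;

# Hypothetical helper: try a non-blocking push first; if the queue is
# full, block for at most $secs. Returns 1 if enqueued, 0 otherwise.
sub enqueue_with_deadline {
    my ($q, $value, $secs) = @_;
    return 1 if $q->push($value);
    return $q->push_wait($value, $secs) ? 1 : 0;
}

# Hypothetical helper: pop everything that is available right now.
# defined() is required because valid items such as 0 or "" are false.
sub pop_available {
    my ($q) = @_;
    my @out;
    while (defined(my $v = $q->pop)) {
        push @out, $v;
    }
    return @out;
}
```

Testing the result of "pop" with defined() rather than plain truthiness matters most for Int queues, where 0 is a legitimate value.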
Deque operations (Str only)
my $ok = $q->push_front($value); # non-blocking push to front
my $ok = $q->push_front_wait($value); # blocking push to front
my $ok = $q->push_front_wait($val, $secs); # with timeout
my $val = $q->pop_back; # non-blocking pop from back
my $val = $q->pop_back_wait; # blocking pop from back
my $val = $q->pop_back_wait($timeout); # with timeout
"push_front" inserts at the head, which is useful for requeueing failed
jobs. "pop_back" removes from the tail, which is useful for work-stealing
or undo. Neither is available for Int (the Vyukov algorithm is strictly
FIFO).
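The requeueing use case mentioned above could look roughly like the following sketch. "process_one" and its return labels are hypothetical; the code only assumes a Str queue handle that provides "pop" and "push_front" as documented.

```perl
use strict;
use warnings;

# Hypothetical helper: pop one job, run $handler on it, and put the job
# back at the head of the queue if the handler dies.
# Returns 'empty', 'done', 'requeued', or 'lost'.
sub process_one {
    my ($q, $handler) = @_;
    my $job = $q->pop;
    return 'empty' unless defined $job;
    return 'done' if eval { $handler->($job); 1 };
    # push_front is non-blocking, so it can fail if the queue filled up
    # in the meantime; the caller decides what to do with a lost job.
    return $q->push_front($job) ? 'requeued' : 'lost';
}
```

Because the failed job goes back to the head, it is the next item a consumer sees, which preserves ordering relative to jobs that were queued behind it.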
Batch operations
my $n = $q->push_multi(@values); # non-blocking, returns pushed count
my @v = $q->pop_multi($count); # non-blocking, pop up to $count
my $n = $q->push_wait_multi($timeout, @values); # blocking batch push
my @v = $q->pop_wait_multi($n, $timeout); # block for >=1, grab up to $n
my @v = $q->drain; # pop all elements
my @v = $q->drain($max); # pop up to $max elements
"pop_wait_multi" blocks until at least one element is available (or
timeout), then grabs up to $n elements non-blocking. Returns empty list
on timeout.
"push_wait_multi" pushes all values, blocking if the queue is full.
$timeout is seconds (-1 = infinite, 0 = try once).
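A consumer loop built on "pop_wait_multi" might be sketched as below. The helper name "consume_batches" is hypothetical, and the sketch assumes a finite timeout so that the empty list returned on timeout ends the loop.

```perl
use strict;
use warnings;

# Hypothetical helper: hand batches of up to $batch_size items to
# $handler until pop_wait_multi times out. Returns the item count.
sub consume_batches {
    my ($q, $batch_size, $timeout, $handler) = @_;
    my $total = 0;
    # A list assignment in boolean context is false when the list is
    # empty, which is what pop_wait_multi returns on timeout.
    while (my @batch = $q->pop_wait_multi($batch_size, $timeout)) {
        $handler->(\@batch);
        $total += @batch;
    }
    return $total;
}
```

Batches may be smaller than $batch_size, since the call returns as soon as at least one element is available.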
Status
my $n = $q->size; # approximate under concurrent access
my $cap = $q->capacity; # max elements
my $ok = $q->is_empty;
my $ok = $q->is_full;
Management
$q->clear; # remove all elements
$q->sync; # msync: flush to disk for crash durability
$q->unlink; # remove backing file
Class->unlink($path); # class method form
my $p = $q->path; # backing file path
my $s = $q->stats; # diagnostic hashref
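The hashref returned by "stats" can be turned into a couple of derived figures, as in this sketch. The helper name "queue_health" is hypothetical, and the interpretation of the counters is an assumption based on the key names documented in this section: "push_ok" is taken to count successful pushes and "push_full" to count pushes rejected because the queue was full.

```perl
use strict;
use warnings;

# Hypothetical helper: derive a fill ratio and a push rejection ratio
# from a stats hashref. Counters are approximate under concurrent
# access, so treat the results as diagnostics only.
sub queue_health {
    my ($s) = @_;
    my $attempts = $s->{push_ok} + $s->{push_full};
    return {
        fill_ratio  => $s->{capacity} ? $s->{size} / $s->{capacity} : 0,
        push_reject => $attempts ? $s->{push_full} / $attempts : 0,
    };
}

# Usage, given a queue handle $q:
#   my $h = queue_health($q->stats);
#   warn "queue nearly full\n" if $h->{fill_ratio} > 0.9;
```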
Stats keys: "size", "capacity", "mmap_size", "push_ok", "pop_ok",
"push_full", "pop_empty", "recoveries", "push_waiters", "pop_waiters".
Str queues additionally include "arena_cap" and "arena_used". All
counters are approximate under concurrent access (diagnostic only).
"push_waiters"/"pop_waiters" show currently blocked producers/consumers.
Event Loop Integration (eventfd)
my $fd = $q->eventfd; # create eventfd, returns fd number
$q->eventfd_set($fd); # use an existing fd (e.g. inherited via fork)
my $fd = $q->fileno; # current eventfd (-1 if none)
$q->notify; # signal eventfd (call after push)
$q->eventfd_consume; # drain notification counter
Notification is opt-in: "push" does not write to the eventfd
automatically. Call "notify" explicitly after pushing. This gives full
control over batching (push N items, notify once) and avoids any
overhead when eventfd is not used.
use EV;
my $q = Data::Queue::Shared::Str->new($path, 1024);
my $fd = $q->eventfd;
my $w = EV::io $fd, EV::READ, sub {
    $q->eventfd_consume;
    while (defined(my $item = $q->pop)) {
        process($item);
    }
};
# Producer side:
$q->push($item);
$q->notify; # wake the EV watcher
EV::run;
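Because notification is opt-in, a producer can push several items and signal the eventfd once. The sketch below shows that pattern with a hypothetical helper, "push_batch_and_notify", which uses only "push_multi" and "notify" as documented in this section.

```perl
use strict;
use warnings;

# Hypothetical helper: push a batch without blocking, then signal the
# eventfd once if at least one item was accepted.
# Returns the number of items actually pushed.
sub push_batch_and_notify {
    my ($q, @items) = @_;
    my $n = $q->push_multi(@items);
    $q->notify if $n > 0;
    return $n;
}
```

The return value may be smaller than the number of items passed in, so a caller that must not drop work should retry the remainder, for example with "push_wait_multi".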
For cross-process notification, create the eventfd before fork(). Child
processes inherit the fd and should call eventfd_set($fd) on their queue
handle. Writes from any process sharing the fd will wake all event-loop
watchers.
Crash Safety
If a process dies while holding the Str queue mutex, other processes
detect the stale lock within 2 seconds via PID tracking and
automatically recover. The Int queue is lock-free and requires no crash
recovery for normal push/pop operations.
Keyword API
When XS::Parse::Keyword is installed at build time, keyword forms are
available that bypass method dispatch:
use Data::Queue::Shared::Int; # activates q_int_* keywords
q_int_push $q, $value;
my $val = q_int_pop $q;
my $val = q_int_peek $q;
my $n = q_int_size $q;
Replace "int" with "int32", "int16", or "str" for other variants.
Keywords are lexically scoped and require "use" (not "require").
BENCHMARKS
Throughput versus other Perl queue/IPC modules, 200K items, single
process and cross-process, Linux x86_64. Run "perl -Mblib bench/vs.pl
200000" to reproduce.
SINGLE-PROCESS INTEGER PUSH+POP (interleaved)
                            Rate
Data::Queue::Shared::Int  5.0M/s