Data-Queue-Shared
view release on metacpan or search on metacpan
lib/Data/Queue/Shared.pm view on Meta::CPAN
my $ok = $q->push($value); # non-blocking, false if full
my $val = $q->pop; # non-blocking, undef if empty
my $ok = $q->push_wait($value); # blocking, infinite wait
my $ok = $q->push_wait($value, $secs); # blocking with timeout
my $val = $q->pop_wait; # blocking, infinite wait
my $val = $q->pop_wait($secs); # blocking with timeout
my $val = $q->peek; # read front without consuming
C<peek> returns the front element without removing it (C<undef> if empty).
For Int, this is a best-effort snapshot (racy in concurrent MPMC).
For Str, this is exact (mutex-protected).
=head3 Deque operations (Str only)
my $ok = $q->push_front($value); # non-blocking push to front
my $ok = $q->push_front_wait($value); # blocking push to front
my $ok = $q->push_front_wait($val, $secs); # with timeout
my $val = $q->pop_back; # non-blocking pop from back
my $val = $q->pop_back_wait; # blocking pop from back
my $val = $q->pop_back_wait($timeout); # with timeout
C<push_front> inserts at the head â useful for requeueing failed jobs.
C<pop_back> removes from the tail â useful for work-stealing or undo.
Not available for Int (Vyukov algorithm is strictly FIFO).
=head3 Batch operations
my $n = $q->push_multi(@values); # non-blocking, returns pushed count
my @v = $q->pop_multi($count); # non-blocking, pop up to $count
my $n = $q->push_wait_multi($timeout, @values); # blocking batch push
my @v = $q->pop_wait_multi($n, $timeout); # block for >=1, grab up to $n
my @v = $q->drain; # pop all elements
my @v = $q->drain($max); # pop up to $max elements
C<pop_wait_multi> blocks until at least one element is available (or timeout),
then grabs up to C<$n> elements non-blocking. Returns empty list on timeout.
C<push_wait_multi> pushes all values, blocking if the queue is full.
C<$timeout> is seconds (C<-1> = infinite, C<0> = try once).
=head3 Status
my $n = $q->size; # approximate under concurrent access
my $cap = $q->capacity; # max elements
my $ok = $q->is_empty;
my $ok = $q->is_full;
=head3 Management
$q->clear; # remove all elements
$q->sync; # msync â flush to disk for crash durability
$q->unlink; # remove backing file
Class->unlink($path); # class method form
my $p = $q->path; # backing file path
my $s = $q->stats; # diagnostic hashref
Stats keys: C<size>, C<capacity>, C<mmap_size>, C<push_ok>, C<pop_ok>,
C<push_full>, C<pop_empty>, C<recoveries>, C<push_waiters>, C<pop_waiters>.
Str queues additionally include C<arena_cap> and C<arena_used>.
All counters are approximate under concurrent access (diagnostic only).
C<push_waiters>/C<pop_waiters> show currently blocked producers/consumers.
=head2 Event Loop Integration (eventfd)
my $fd = $q->eventfd; # create eventfd, returns fd number
$q->eventfd_set($fd); # use an existing fd (e.g. inherited via fork)
my $fd = $q->fileno; # current eventfd (-1 if none)
$q->notify; # signal eventfd (call after push)
$q->eventfd_consume; # drain notification counter
Notification is B<opt-in>: C<push> does not write to the eventfd
automatically. Call C<notify> explicitly after pushing. This gives full
control over batching (push N items, notify once) and avoids any overhead
when eventfd is not used.
use EV;
my $q = Data::Queue::Shared::Str->new($path, 1024);
my $fd = $q->eventfd;
my $w = EV::io $fd, EV::READ, sub {
$q->eventfd_consume;
while (defined(my $item = $q->pop)) {
process($item);
}
};
# Producer side:
$q->push($item);
$q->notify; # wake the EV watcher
EV::run;
For cross-process notification, create the eventfd B<before> C<fork()>.
Child processes inherit the fd and should call C<eventfd_set($fd)> on
their queue handle. Writes from any process sharing the fd will wake
all event-loop watchers.
=head2 Crash Safety
If a process dies while holding the Str queue mutex, other processes
detect the stale lock within 2 seconds via PID tracking and automatically
recover. The Int queue is lock-free and requires no crash recovery for
normal push/pop operations.
=head2 Keyword API
When L<XS::Parse::Keyword> is installed at build time, keyword forms
are available that bypass method dispatch:
use Data::Queue::Shared::Int; # activates q_int_* keywords
q_int_push $q, $value;
my $val = q_int_pop $q;
my $val = q_int_peek $q;
my $n = q_int_size $q;
Replace C<int> with C<int32>, C<int16>, or C<str> for other variants.
Keywords are lexically scoped and require C<use> (not C<require>).
=head1 BENCHMARKS
Throughput versus other Perl queue/IPC modules, 200K items,
single process and cross-process, Linux x86_64.
Run C<perl -Mblib bench/vs.pl 200000> to reproduce.
( run in 0.987 second using v1.01-cache-2.11-cpan-e1769b4cff6 )