Data-Queue-Shared

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN


  API
   Core operations
        my $ok  = $q->push($value);             # non-blocking, false if full
        my $val = $q->pop;                       # non-blocking, undef if empty
        my $ok  = $q->push_wait($value);         # blocking, infinite wait
        my $ok  = $q->push_wait($value, $secs);  # blocking with timeout
        my $val = $q->pop_wait;                  # blocking, infinite wait
        my $val = $q->pop_wait($secs);           # blocking with timeout
        my $val = $q->peek;                      # read front without consuming

    "peek" returns the front element without removing it ("undef" if empty).
    For Int, this is a best-effort snapshot (racy in concurrent MPMC). For
    Str, this is exact (mutex-protected).

   Deque operations (Str only)
        my $ok  = $q->push_front($value);           # non-blocking push to front
        my $ok  = $q->push_front_wait($value);       # blocking push to front
        my $ok  = $q->push_front_wait($val, $secs);  # with timeout
        my $val = $q->pop_back;                 # non-blocking pop from back
        my $val = $q->pop_back_wait;            # blocking pop from back
        my $val = $q->pop_back_wait($timeout);  # with timeout

    "push_front" inserts at the head — useful for requeueing failed jobs.
    "pop_back" removes from the tail — useful for work-stealing or undo. Not
    available for Int (Vyukov algorithm is strictly FIFO).

   Batch operations
        my $n  = $q->push_multi(@values);          # non-blocking, returns pushed count
        my @v  = $q->pop_multi($count);            # non-blocking, pop up to $count
        my $n  = $q->push_wait_multi($timeout, @values);  # blocking batch push
        my @v  = $q->pop_wait_multi($n, $timeout); # block for >=1, grab up to $n
        my @v  = $q->drain;                        # pop all elements
        my @v  = $q->drain($max);                  # pop up to $max elements

    "pop_wait_multi" blocks until at least one element is available (or
    timeout), then grabs up to $n elements non-blocking. Returns empty list
    on timeout.

    "push_wait_multi" pushes all values, blocking if the queue is full.
    $timeout is seconds (-1 = infinite, 0 = try once).

   Status
        my $n   = $q->size;         # approximate under concurrent access
        my $cap = $q->capacity;     # max elements
        my $ok  = $q->is_empty;
        my $ok  = $q->is_full;

   Management
        $q->clear;                  # remove all elements
        $q->sync;                   # msync — flush to disk for crash durability
        $q->unlink;                 # remove backing file
        Class->unlink($path);       # class method form
        my $p = $q->path;           # backing file path
        my $s = $q->stats;          # diagnostic hashref

    Stats keys: "size", "capacity", "mmap_size", "push_ok", "pop_ok",
    "push_full", "pop_empty", "recoveries", "push_waiters", "pop_waiters".
    Str queues additionally include "arena_cap" and "arena_used". All
    counters are approximate under concurrent access (diagnostic only).
    "push_waiters"/"pop_waiters" show currently blocked producers/consumers.

  Event Loop Integration (eventfd)
        my $fd = $q->eventfd;           # create eventfd, returns fd number
        $q->eventfd_set($fd);           # use an existing fd (e.g. inherited via fork)
        my $fd = $q->fileno;            # current eventfd (-1 if none)
        $q->notify;                     # signal eventfd (call after push)
        $q->eventfd_consume;            # drain notification counter

    Notification is opt-in: "push" does not write to the eventfd
    automatically. Call "notify" explicitly after pushing. This gives full
    control over batching (push N items, notify once) and avoids any
    overhead when eventfd is not used.

        use EV;
        my $q = Data::Queue::Shared::Str->new($path, 1024);
        my $fd = $q->eventfd;
        my $w = EV::io $fd, EV::READ, sub {
            $q->eventfd_consume;
            while (defined(my $item = $q->pop)) {
                process($item);
            }
        };
        # Producer side:
        $q->push($item);
        $q->notify;   # wake the EV watcher
        EV::run;

    For cross-process notification, create the eventfd before fork(). Child
    processes inherit the fd and should call eventfd_set($fd) on their queue
    handle. Writes from any process sharing the fd will wake all event-loop
    watchers.

  Crash Safety
    If a process dies while holding the Str queue mutex, other processes
    detect the stale lock within 2 seconds via PID tracking and
    automatically recover. The Int queue is lock-free and requires no crash
    recovery for normal push/pop operations.

  Keyword API
    When XS::Parse::Keyword is installed at build time, keyword forms are
    available that bypass method dispatch:

        use Data::Queue::Shared::Int;    # activates q_int_* keywords

        q_int_push $q, $value;
        my $val = q_int_pop $q;
        my $val = q_int_peek $q;
        my $n   = q_int_size $q;

    Replace "int" with "int32", "int16", or "str" for other variants.
    Keywords are lexically scoped and require "use" (not "require").

BENCHMARKS
    Throughput versus other Perl queue/IPC modules, 200K items, single
    process and cross-process, Linux x86_64. Run "perl -Mblib bench/vs.pl
    200000" to reproduce.

        SINGLE-PROCESS INTEGER PUSH+POP (interleaved)
                                   Rate
        Data::Queue::Shared::Int  5.0M/s



( run in 0.693 second using v1.01-cache-2.11-cpan-e1769b4cff6 )