Data-Queue-Shared

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

        $q->sync;                   # msync — flush to disk for crash durability
        $q->unlink;                 # remove backing file
        Class->unlink($path);       # class method form
        my $p = $q->path;           # backing file path
        my $s = $q->stats;          # diagnostic hashref

    Stats keys: "size", "capacity", "mmap_size", "push_ok", "pop_ok",
    "push_full", "pop_empty", "recoveries", "push_waiters", "pop_waiters".
    Str queues additionally include "arena_cap" and "arena_used". All
    counters are approximate under concurrent access (diagnostic only).
    "push_waiters"/"pop_waiters" show currently blocked producers/consumers.

  Event Loop Integration (eventfd)
        my $fd = $q->eventfd;           # create eventfd, returns fd number
        $q->eventfd_set($fd);           # use an existing fd (e.g. inherited via fork)
        my $fd = $q->fileno;            # current eventfd (-1 if none)
        $q->notify;                     # signal eventfd (call after push)
        $q->eventfd_consume;            # drain notification counter

    Notification is opt-in: "push" does not write to the eventfd
    automatically. Call "notify" explicitly after pushing. This gives full

Shared.xs  view on Meta::CPAN

        Newx(args, count, struct qsm_arg);
        SAVEFREEPV(args);
        for (uint32_t i = 0; i < count; i++) {
            SV *sv = ST(i + 1);
            args[i].str = SvPV(sv, args[i].len);
            args[i].utf8 = SvUTF8(sv) ? true : false;
        }
    }
    queue_mutex_lock(h->hdr);
    for (uint32_t i = 0; i < count; i++) {
        int r = queue_str_push_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
        if (r == -2) { queue_mutex_unlock(h->hdr); croak("Data::Queue::Shared::Str: string too long (max 2GB)"); }
        if (r != 1) break;
        pushed++;
    }
    queue_mutex_unlock(h->hdr);
    if (pushed) queue_wake_consumers_n(h->hdr, pushed);
    RETVAL = pushed;
  OUTPUT:
    RETVAL

Shared.xs  view on Meta::CPAN

    /* Cap count at capacity: the queue can't hold more than capacity items,
     * so a single pop_multi can't return more than that. This also prevents
     * size_t overflow in the items_buf allocation for adversarial inputs. */
    if (count > h->capacity) count = h->capacity;
    if (count > 0) {
        items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
        if (!items_buf) croak("Data::Queue::Shared::Str: out of memory");
    }
    queue_mutex_lock(h->hdr);
    for (UV i = 0; i < count; i++) {
        last_r = queue_str_pop_locked(h, &str, &len, &utf8);
        if (last_r <= 0) break;
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].utf8 = utf8;
        n++;
    }
    queue_mutex_unlock(h->hdr);

Shared.xs  view on Meta::CPAN

    uint32_t max_count;
  PPCODE:
    max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
    /* Hoist SV construction out of the mutex (see pop_multi). */
    struct drain_item { char *buf; uint32_t len; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    queue_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = queue_str_pop_locked(h, &str, &len, &utf8);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        char *c = (char *)malloc(len ? len : 1);
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->utf8 = utf8; it->next = NULL;
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }

lib/Data/Queue/Shared.pm  view on Meta::CPAN

    $q->sync;                   # msync — flush to disk for crash durability
    $q->unlink;                 # remove backing file
    Class->unlink($path);       # class method form
    my $p = $q->path;           # backing file path
    my $s = $q->stats;          # diagnostic hashref

Stats keys: C<size>, C<capacity>, C<mmap_size>, C<push_ok>, C<pop_ok>,
C<push_full>, C<pop_empty>, C<recoveries>, C<push_waiters>, C<pop_waiters>.
Str queues additionally include C<arena_cap> and C<arena_used>.
All counters are approximate under concurrent access (diagnostic only).
C<push_waiters>/C<pop_waiters> show currently blocked producers/consumers.

=head2 Event Loop Integration (eventfd)

    my $fd = $q->eventfd;           # create eventfd, returns fd number
    $q->eventfd_set($fd);           # use an existing fd (e.g. inherited via fork)
    my $fd = $q->fileno;            # current eventfd (-1 if none)
    $q->notify;                     # signal eventfd (call after push)
    $q->eventfd_consume;            # drain notification counter

Notification is B<opt-in>: C<push> does not write to the eventfd

queue.h  view on Meta::CPAN

    uint32_t mode;           /* 8: QUEUE_MODE_INT or QUEUE_MODE_STR */
    uint32_t capacity;       /* 12: max elements (power of 2) */
    uint64_t total_size;     /* 16: mmap size */
    uint64_t slots_off;      /* 24: offset to slot array */
    uint64_t arena_off;      /* 32: str mode: offset to arena; int: 0 */
    uint64_t arena_cap;      /* 40: str mode: arena byte capacity; int: 0 */
    uint8_t  _pad0[16];      /* 48-63 */

    /* ---- Cache line 1 (64-127): head / consumer hot ---- */
    uint64_t head;           /* 64: consumer position */
    uint32_t pop_waiters;    /* 72: count of blocked consumers */
    uint32_t pop_futex;      /* 76: futex word for consumer wakeup */
    uint8_t  _pad1[48];      /* 80-127 */

    /* ---- Cache line 2 (128-191): tail / producer hot ---- */
    uint64_t tail;           /* 128: producer position */
    uint32_t push_waiters;   /* 136: count of blocked producers */
    uint32_t push_futex;     /* 140: futex word for producer wakeup */
    uint8_t  _pad2[48];      /* 144-191 */

    /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
    uint32_t mutex;          /* 192: futex-based mutex (0 or PID|0x80000000) */
    uint32_t mutex_waiters;  /* 196 */
    uint32_t arena_wpos;     /* 200: str mode: next write position in arena */
    uint32_t arena_used;     /* 204: str mode: total arena bytes consumed */
    uint64_t stat_push_ok;   /* 208 */
    uint64_t stat_pop_ok;    /* 216 */

queue.h  view on Meta::CPAN

        spin = 0;
    }
}

static inline void queue_mutex_unlock(QueueHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Wake up to `n` blocked consumers (after batch push). Each woken
 * consumer pops at most one item, so batch publishers must wake `n`
 * (not 1) to drain a multi-item commit without leaving consumers
 * sleeping on still-available items. */
static inline void queue_wake_consumers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0) return;
    if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE,
                n > (uint32_t)INT_MAX ? INT_MAX : (int)n, NULL, NULL, 0);
    }
}

/* Wake blocked consumers (after single push) */
static inline void queue_wake_consumers(QueueHeader *hdr) {
    queue_wake_consumers_n(hdr, 1);
}

/* Wake up to `n` blocked producers (after batch pop). See
 * queue_wake_consumers_n for the batching rationale. */
static inline void queue_wake_producers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0) return;
    if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE,
                n > (uint32_t)INT_MAX ? INT_MAX : (int)n, NULL, NULL, 0);
    }
}

/* Wake blocked producers (after single pop) */
static inline void queue_wake_producers(QueueHeader *hdr) {
    queue_wake_producers_n(hdr, 1);
}

/* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
static inline int queue_remaining_time(const struct timespec *deadline,
                                        struct timespec *remaining) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    remaining->tv_sec = deadline->tv_sec - now.tv_sec;

queue.h  view on Meta::CPAN

DEFINE_INT_QUEUE(int32, QueueInt32Slot, int32_t, uint32_t, int32_t)

/* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
DEFINE_INT_QUEUE(int16, QueueInt16Slot, int16_t, uint32_t, int32_t)

/* ================================================================
 * Str queue: mutex-protected with circular arena
 * ================================================================ */

/* Push one item while mutex is already held. Returns 1=ok, 0=full, -2=too long. */
static inline int queue_str_push_locked(QueueHandle *h, const char *str,
                                         uint32_t len, bool utf8) {
    QueueHeader *hdr = h->hdr;

    if (len > QUEUE_STR_LEN_MASK) return -2;

    if (hdr->tail - hdr->head >= h->capacity) {
        __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
        return 0;
    }

queue.h  view on Meta::CPAN

    hdr->arena_wpos = pos + alloc;
    hdr->arena_used += (uint32_t)skip;
    hdr->tail++;
    __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
    return 1;
}

static inline int queue_str_try_push(QueueHandle *h, const char *str,
                                      uint32_t len, bool utf8) {
    queue_mutex_lock(h->hdr);
    int r = queue_str_push_locked(h, str, len, utf8);
    queue_mutex_unlock(h->hdr);
    if (r == 1) queue_wake_consumers(h->hdr);
    return r;
}

/* Pop one item while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
static inline int queue_str_pop_locked(QueueHandle *h, const char **out_str,
                                        uint32_t *out_len, bool *out_utf8) {
    QueueHeader *hdr = h->hdr;

    if (hdr->tail == hdr->head) {
        __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
        return 0;
    }

    uint32_t idx = (uint32_t)(hdr->head & h->cap_mask);
    QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];

queue.h  view on Meta::CPAN

        hdr->arena_wpos = 0;

    hdr->head++;
    __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
    return 1;
}

static inline int queue_str_try_pop(QueueHandle *h, const char **out_str,
                                     uint32_t *out_len, bool *out_utf8) {
    queue_mutex_lock(h->hdr);
    int r = queue_str_pop_locked(h, out_str, out_len, out_utf8);
    queue_mutex_unlock(h->hdr);
    if (r == 1) queue_wake_producers(h->hdr);
    return r;
}

static int queue_str_push_wait(QueueHandle *h, const char *str,
                                uint32_t len, bool utf8, double timeout) {
    int r = queue_str_try_push(h, str, len, utf8);
    if (r != 0) return r;  /* 1 = success, -2 = too long */
    if (timeout == 0) return 0;

queue.h  view on Meta::CPAN

}

static void queue_str_clear(QueueHandle *h) {
    QueueHeader *hdr = h->hdr;
    queue_mutex_lock(hdr);
    hdr->head = 0;
    hdr->tail = 0;
    hdr->arena_wpos = 0;
    hdr->arena_used = 0;
    queue_mutex_unlock(hdr);
    /* clear is a bulk transition — wake every blocked producer and
     * consumer so they re-evaluate state, not just one of each. */
    if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
    if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

t/03-multiprocess.t  view on Meta::CPAN


    if ($pid == 0) {
        my $cq = Data::Queue::Shared::Str->new($path5, 4, 4096);
        select(undef, undef, undef, 0.1);
        $cq->clear;
        POSIX::_exit(0);
    }

    my $t0 = time;
    ok $q->push_wait("after_clear", 30), 'push_wait succeeded after clear()';
    cmp_ok time - $t0, '<', 20, 'push_wait unblocked (not full-timeout hang)';
    waitpid($pid, 0);

    $q->unlink;
};

# Str: clear() wakes blocked pop_wait consumers
my $path6 = tmpnam() . '.shm';
END { unlink $path6 if $$ == $main_pid && $path6 && -f $path6 }

subtest 'str clear wakes pop_wait' => sub {
    my $q = Data::Queue::Shared::Str->new($path6, 4, 4096);

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        my $cq = Data::Queue::Shared::Str->new($path6, 4, 4096);
        select(undef, undef, undef, 0.1);
        $cq->clear;  # should wake the blocked consumer
        select(undef, undef, undef, 0.1);
        $cq->push("after_clear");  # then push something
        POSIX::_exit(0);
    }

    # Parent blocks in pop_wait — clear wakes it, then it re-blocks,
    # then the push wakes it again with actual data
    my $t0 = time;
    my $val = $q->pop_wait(30);
    is $val, "after_clear", 'got value pushed after clear';

xt/eintr.t  view on Meta::CPAN

if ($pid == 0) {
    # Producer: wait a bit, push
    select undef, undef, undef, 0.3;
    $q->push(42);
    _exit(0);
}

# Install a SIGUSR1 handler that fires during the pop_wait
$SIG{USR1} = sub { diag "SIGUSR1 received" };

# Schedule a SIGUSR1 to ourself while blocked
my $signaller_pid = fork // die;
if ($signaller_pid == 0) {
    select undef, undef, undef, 0.1;
    kill USR1 => getppid();
    _exit(0);
}

my $t0 = time;
my $v = $q->pop_wait(2.0);
my $dt = time - $t0;

xt/eintr_sigalrm.t  view on Meta::CPAN

use strict;
use warnings;
use Test::More;
use POSIX ();
use Time::HiRes qw(time);

# SIGALRM during futex-based pop_wait: the syscall may return EINTR; the
# wrapper must either retry transparently or surface a clean error, and
# must not leave waiters deadlocked.

use Data::Queue::Shared::Int;

my $q = Data::Queue::Shared::Int->new_memfd("eintr", 4);

# Arrange SIGALRM to fire ~100ms into a 500ms wait.
my $got_alrm = 0;
local $SIG{ALRM} = sub { $got_alrm++ };

POSIX::setitimer(POSIX::ITIMER_REAL(), 0.1, 0) if POSIX->can('setitimer');



( run in 1.230 second using v1.01-cache-2.11-cpan-e1769b4cff6 )