Data-Queue-Shared

 view release on metacpan or  search on metacpan

queue.h  view on Meta::CPAN

 * Two variants:
 *   Int  — lock-free Vyukov bounded MPMC queue (int64 values)
 *   Str  — futex-mutex protected queue with circular arena (byte strings)
 *
 * Both use file-backed mmap(MAP_SHARED) for cross-process sharing,
 * futex for blocking wait, and PID-based stale lock recovery.
 */

#ifndef QUEUE_H
#define QUEUE_H

#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <limits.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>

/* ================================================================
 * Constants
 * ================================================================ */

#define QUEUE_MAGIC       0x51554531U  /* "QUE1" */
#define QUEUE_VERSION     1
#define QUEUE_MODE_INT    0
#define QUEUE_MODE_STR    1
#define QUEUE_MODE_INT32  2
#define QUEUE_MODE_INT16  3
#define QUEUE_ERR_BUFLEN  256
#define QUEUE_SPIN_LIMIT  32
#define QUEUE_LOCK_TIMEOUT_SEC 2

/* ================================================================
 * Header (256 bytes = 4 cache lines, lives at start of mmap)
 * ================================================================ */

/* Shared control block placed at offset 0 of the mapping.
 *
 * The fields are grouped into four 64-byte regions so that the immutable
 * configuration, the consumer-side state, the producer-side state and the
 * mutex/arena/statistics state each sit in their own cache line.  The
 * numbers in the trailing comments are byte offsets from the start of the
 * struct; explicit _padN arrays fill every region up to 64 bytes.
 *
 * Every process that maps the file reads and writes this struct directly,
 * so the layout must not change without changing QUEUE_VERSION. */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;          /* 0: QUEUE_MAGIC */
    uint32_t version;        /* 4: QUEUE_VERSION */
    uint32_t mode;           /* 8: one of the QUEUE_MODE_* constants */
    uint32_t capacity;       /* 12: max elements (power of 2) */
    uint64_t total_size;     /* 16: mmap size */
    uint64_t slots_off;      /* 24: offset to slot array */
    uint64_t arena_off;      /* 32: str mode: offset to arena; int: 0 */
    uint64_t arena_cap;      /* 40: str mode: arena byte capacity; int: 0 */
    uint8_t  _pad0[16];      /* 48-63 */

    /* ---- Cache line 1 (64-127): head / consumer hot ---- */
    uint64_t head;           /* 64: consumer position (slot index = head & cap_mask) */
    uint32_t pop_waiters;    /* 72: count of blocked consumers */
    uint32_t pop_futex;      /* 76: futex word for consumer wakeup; bumped before each FUTEX_WAKE */
    uint8_t  _pad1[48];      /* 80-127 */

    /* ---- Cache line 2 (128-191): tail / producer hot ---- */
    uint64_t tail;           /* 128: producer position (slot index = tail & cap_mask) */
    uint32_t push_waiters;   /* 136: count of blocked producers */
    uint32_t push_futex;     /* 140: futex word for producer wakeup; bumped before each FUTEX_WAKE */
    uint8_t  _pad2[48];      /* 144-191 */

    /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
    uint32_t mutex;          /* 192: futex-based mutex (0 or PID|0x80000000) */
    uint32_t mutex_waiters;  /* 196: count of lockers currently sleeping on `mutex` */
    uint32_t arena_wpos;     /* 200: str mode: next write position in arena */
    uint32_t arena_used;     /* 204: str mode: total arena bytes consumed */
    uint64_t stat_push_ok;   /* 208: successful pushes */
    uint64_t stat_pop_ok;    /* 216: successful pops */
    uint64_t stat_push_full; /* 224: pushes rejected because the queue or arena was full */
    uint64_t stat_pop_empty; /* 232: pops attempted on an empty queue */
    uint64_t stat_recoveries;/* 240: stale-mutex recoveries performed */
    uint8_t  _pad3[8];       /* 248-255 */
} QueueHeader;

/* Compile-time layout check; only available when building as C11 or later. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(QueueHeader) == 256, "QueueHeader must be 256 bytes");
#endif

/* ================================================================
 * Slot types
 * ================================================================ */

/* Int slot: Vyukov MPMC sequence + value.
 * `sequence` is initialised to the slot's own index by
 * queue_init_new_header; peek treats the slot as readable when
 * sequence == position + 1. */
typedef struct {
    uint64_t sequence;  /* per-slot ticket used to hand the slot between producers and consumers */
    int64_t  value;     /* payload */
} QueueIntSlot;  /* 16 bytes */

/* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density).
 * The sequence is only 32 bits wide, so comparisons against the 64-bit
 * head/tail positions are done on the truncated position. */
typedef struct {
    uint32_t sequence;  /* per-slot ticket, initialised to the slot index */
    int32_t  value;     /* payload */
} QueueInt32Slot;  /* 8 bytes */

/* 16-bit value variant of the compact slot.  It keeps the 32-bit
 * sequence and pads the value out so the slot is still 8 bytes. */
typedef struct {
    uint32_t sequence;  /* per-slot ticket, initialised to the slot index */
    int16_t  value;     /* payload */
    int16_t  _pad;      /* explicit padding; keeps sizeof == 8 */
} QueueInt16Slot;  /* 8 bytes */

/* Str slot: arena pointer + length + skip (for FIFO arena free).
 * Slots are only read and written while the header mutex is held. */
typedef struct {
    uint32_t arena_off;    /* byte offset of the payload inside the arena */
    uint32_t packed_len;   /* bit 31 = UTF-8, bits 0-30 = byte length */
    uint32_t arena_skip;   /* bytes to release from arena on pop (includes wrap waste) */
    uint32_t prev_wpos;   /* arena_wpos before this push (for pop_back rollback) */
} QueueStrSlot;  /* 16 bytes */

/* Masks for QueueStrSlot.packed_len. */
#define QUEUE_STR_UTF8_FLAG  0x80000000U  /* set when the payload was pushed with utf8 == true */
#define QUEUE_STR_LEN_MASK   0x7FFFFFFFU  /* byte length; also the largest accepted string length */

/* ================================================================
 * Process-local handle
 * ================================================================ */

typedef struct {
    QueueHeader *hdr;
    void        *slots;      /* QueueIntSlot* or QueueStrSlot* */

queue.h  view on Meta::CPAN


/* Encoding of QueueHeader.mutex: 0 means unlocked; a locked word has
 * bit 31 set and the owner's PID in the low 31 bits, so a waiter can
 * recover the owner PID and test whether that process still exists. */
#define QUEUE_MUTEX_WRITER_BIT 0x80000000U
#define QUEUE_MUTEX_PID_MASK   0x7FFFFFFFU
#define QUEUE_MUTEX_VAL(pid)   (QUEUE_MUTEX_WRITER_BIT | ((uint32_t)(pid) & QUEUE_MUTEX_PID_MASK))

/* Report whether process `pid` still exists.
 *
 * Returns 0 only when a signal-0 probe fails with ESRCH (no such
 * process).  PID 0 and every other outcome of the probe (success, or a
 * failure such as EPERM) are reported as alive (1). */
static inline int queue_pid_alive(uint32_t pid) {
    if (pid == 0) {
        return 1;
    }
    if (kill((pid_t)pid, 0) != -1) {
        return 1;
    }
    /* The probe failed: only "no such process" counts as dead. */
    return errno != ESRCH;
}

/* Relative timeout handed to FUTEX_WAIT by queue_mutex_lock; when it
 * expires the waiter checks whether the lock owner is still alive. */
static const struct timespec queue_lock_timeout = { QUEUE_LOCK_TIMEOUT_SEC, 0 };

/* Force-release the mutex on behalf of an owner that is presumed dead.
 *
 * `observed` is the mutex word the caller read and judged stale.  The
 * lock is cleared only if the word still holds exactly that value, so a
 * concurrent unlock/relock or a competing recovery makes this a no-op.
 * On success the recovery counter is bumped and one sleeping locker is
 * woken (if any are registered).  The caller does not own the mutex
 * afterwards; it must go through the normal acquire path. */
static inline void queue_recover_stale_mutex(QueueHeader *hdr, uint32_t observed) {
    /* Strong CAS (weak == 0): a failure really means the word changed. */
    if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return;
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Acquire the header mutex, blocking until it is obtained.
 *
 * The lock word is moved from 0 to QUEUE_MUTEX_VAL(getpid()).  The
 * function first retries the CAS up to QUEUE_SPIN_LIMIT times with
 * queue_spin_pause() between attempts, then registers itself in
 * mutex_waiters and sleeps on the lock word with FUTEX_WAIT, bounded by
 * queue_lock_timeout.  If that sleep times out and the PID recorded in
 * the lock word no longer exists, the lock is force-released via
 * queue_recover_stale_mutex and acquisition is retried. */
static inline void queue_mutex_lock(QueueHeader *hdr) {
    uint32_t mypid = QUEUE_MUTEX_VAL((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        /* Weak CAS (weak == 1): spurious failures are fine, we loop anyway. */
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < QUEUE_SPIN_LIMIT, 1)) {
            queue_spin_pause();
            continue;
        }
        /* Spin budget exhausted: announce ourselves so the unlocker issues
         * a FUTEX_WAKE, then sleep on the value we just observed. */
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            /* FUTEX_WAIT returns immediately if the word no longer equals cur. */
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &queue_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                /* val >= WRITER_BIT <=> bit 31 set <=> the lock is held. */
                if (val >= QUEUE_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & QUEUE_MUTEX_PID_MASK;
                    if (!queue_pid_alive(pid))
                        queue_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        /* Woken, interrupted, or the lock was already free: deregister and
         * start over with a fresh spin budget. */
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

/* Release the header mutex.
 *
 * The lock word is cleared with release ordering so that writes made
 * inside the critical section are published before the lock appears
 * free.  A FUTEX_WAKE for one sleeper is issued only when at least one
 * locker has registered in mutex_waiters, which keeps the uncontended
 * path free of syscalls. */
static inline void queue_mutex_unlock(QueueHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Wake at most `n` consumers sleeping on pop_futex (used after a batch
 * push).  A woken consumer takes at most one item, so a publisher that
 * committed several items has to wake that many consumers; waking just
 * one would leave others asleep while items are still available.
 *
 * Nothing happens when n is 0 or when no consumer is registered in
 * pop_waiters.  Otherwise pop_futex is incremented first, so a consumer
 * that sampled the old value and is about to sleep will not block, and
 * then FUTEX_WAKE is issued with n clamped to INT_MAX. */
static inline void queue_wake_consumers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0)
        return;
    if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) == 0)
        return;

    int wake_count = (n > (uint32_t)INT_MAX) ? INT_MAX : (int)n;
    __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, wake_count, NULL, NULL, 0);
}

/* Wake at most one blocked consumer (called after a single successful
 * push).  Does nothing when no consumer is registered as waiting. */
static inline void queue_wake_consumers(QueueHeader *hdr) {
    queue_wake_consumers_n(hdr, 1);
}

/* Wake at most `n` producers sleeping on push_futex (used after a batch
 * pop freed several slots).
 *
 * Nothing happens when n is 0 or when no producer is registered in
 * push_waiters.  Otherwise push_futex is incremented first, so a
 * producer that sampled the old value and is about to sleep will not
 * block, and then FUTEX_WAKE is issued with n clamped to INT_MAX. */
static inline void queue_wake_producers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0)
        return;
    if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) == 0)
        return;

    int wake_count = (n > (uint32_t)INT_MAX) ? INT_MAX : (int)n;
    __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, wake_count, NULL, NULL, 0);
}

/* Wake at most one blocked producer (called after a single successful
 * pop).  Does nothing when no producer is registered as waiting. */
static inline void queue_wake_producers(QueueHeader *hdr) {
    queue_wake_producers_n(hdr, 1);
}

/* Compute how much time is left until an absolute CLOCK_MONOTONIC
 * deadline.
 *
 * deadline  - absolute deadline (as produced by queue_make_deadline)
 * remaining - receives deadline minus now, with tv_nsec normalised into
 *             [0, 1e9); tv_sec is negative once the deadline has passed
 *
 * Returns 1 while the deadline has not passed, 0 once it has. */
static inline int queue_remaining_time(const struct timespec *deadline,
                                        struct timespec *remaining) {
    struct timespec current;
    clock_gettime(CLOCK_MONOTONIC, &current);

    time_t secs_left = deadline->tv_sec - current.tv_sec;
    long nsecs_left = deadline->tv_nsec - current.tv_nsec;

    /* Borrow one second when the nanosecond part went negative. */
    if (nsecs_left < 0) {
        secs_left -= 1;
        nsecs_left += 1000000000L;
    }

    remaining->tv_sec = secs_left;
    remaining->tv_nsec = nsecs_left;
    return secs_left >= 0;
}

/* Convert a relative timeout in seconds into an absolute
 * CLOCK_MONOTONIC deadline.
 *
 * timeout  - seconds from now; fractional values are honoured down to
 *            nanoseconds
 * deadline - receives now + timeout, with tv_nsec in [0, 1e9)
 *
 * A timeout that is zero, negative or NaN yields a deadline equal to the
 * current time (already due).  A timeout too large to be represented
 * saturates just below the largest time_t value instead of overflowing:
 * converting an out-of-range double to time_t is undefined behaviour and
 * in practice used to produce a deadline in the past, turning "wait
 * practically forever" into "time out immediately". */
static inline void queue_make_deadline(double timeout, struct timespec *deadline) {
    clock_gettime(CLOCK_MONOTONIC, deadline);

    /* Written as a negated comparison so that NaN is rejected as well. */
    if (!(timeout > 0.0))
        return;

    /* Largest value of the (signed) time_t, computed without relying on
     * a TIME_MAX macro, which the C library does not provide. */
    const time_t t_max =
        (time_t)((UINT64_C(1) << (sizeof(time_t) * CHAR_BIT - 1)) - 1);
    /* Seconds that can still be added; one second is held back for the
     * nanosecond carry below. */
    const time_t room = t_max - deadline->tv_sec - 1;
    if (timeout >= (double)room) {
        deadline->tv_sec += room;
        return;
    }

    const time_t whole_secs = (time_t)timeout;
    const long frac_nsecs = (long)((timeout - (double)whole_secs) * 1e9);

    deadline->tv_sec += whole_secs;
    deadline->tv_nsec += frac_nsecs;
    /* Both nanosecond terms are below 1e9, so one carry is sufficient. */
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec++;
        deadline->tv_nsec -= 1000000000L;
    }
}

/* ================================================================
 * Create / Open / Close
 * ================================================================ */

/* Format an error message into the caller's error buffer.
 * Expects a variable named `errbuf` in the enclosing scope; when it is
 * NULL the message is dropped.  Output is limited to QUEUE_ERR_BUFLEN
 * bytes, so errbuf must be at least that large.  `##__VA_ARGS__` is the
 * GNU extension that lets the macro be used with a format string only.
 * NOTE(review): snprintf needs <stdio.h>, which is not among the
 * includes visible at the top of this header - confirm it is provided
 * by whatever includes this file. */
#define QUEUE_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, QUEUE_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)

/* Initialise a freshly created mapping.
 *
 * base       - start of the mapping; the QueueHeader lives at offset 0
 * cap        - number of slots
 * arena_cap  - arena size in bytes (stored as-is in the header)
 * slots_off  - byte offset of the slot array from base
 * arena_off  - byte offset of the arena from base (stored as-is)
 * mode       - one of the QUEUE_MODE_* constants
 * total_size - size of the whole mapping in bytes
 *
 * The header is zeroed and then filled in.  For the three integer modes
 * every slot's sequence number is set to its own index; string mode
 * leaves the slot array untouched.  A full memory fence is issued at the
 * end, after all of the initialising stores. */
static inline void queue_init_new_header(void *base, uint32_t cap, uint64_t arena_cap,
                                          uint32_t slots_off, uint32_t arena_off,
                                          uint32_t mode, uint64_t total_size) {
    QueueHeader *hdr = (QueueHeader *)base;
    char *slot_base = (char *)base + slots_off;

    memset(hdr, 0, sizeof(QueueHeader));
    hdr->magic      = QUEUE_MAGIC;
    hdr->version    = QUEUE_VERSION;
    hdr->mode       = mode;
    hdr->capacity   = cap;
    hdr->total_size = total_size;
    hdr->slots_off  = slots_off;
    hdr->arena_off  = arena_off;
    hdr->arena_cap  = arena_cap;

    switch (mode) {
    case QUEUE_MODE_INT: {
        QueueIntSlot *slots = (QueueIntSlot *)slot_base;
        for (uint32_t i = 0; i < cap; i++)
            slots[i].sequence = i;
        break;
    }
    case QUEUE_MODE_INT32: {
        QueueInt32Slot *slots = (QueueInt32Slot *)slot_base;
        for (uint32_t i = 0; i < cap; i++)
            slots[i].sequence = i;
        break;
    }
    case QUEUE_MODE_INT16: {
        QueueInt16Slot *slots = (QueueInt16Slot *)slot_base;
        for (uint32_t i = 0; i < cap; i++)
            slots[i].sequence = i;
        break;
    }
    default:
        /* Other modes (string) do not use per-slot sequence numbers. */
        break;
    }

    __atomic_thread_fence(__ATOMIC_SEQ_CST);
}

static QueueHandle *queue_create(const char *path, uint32_t capacity,

queue.h  view on Meta::CPAN

         * ordering closes. */                                             \
        __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);           \
        uint32_t fseq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);  \
        if (queue_##PFX##_try_pop(h, value)) {                                \
            __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);       \
            return 1;                                                         \
        }                                                                     \
        struct timespec *pts = NULL;                                          \
        if (has_deadline) {                                                    \
            if (!queue_remaining_time(&deadline, &remaining)) {               \
                __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);  \
                return 0;                                                     \
            }                                                                 \
            pts = &remaining;                                                 \
        }                                                                     \
        long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT,            \
                          fseq, pts, NULL, 0);                                \
        __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);           \
        if (queue_##PFX##_try_pop(h, value)) return 1;                        \
        if (rc == -1 && errno == ETIMEDOUT) return 0;                         \
    }                                                                         \
}                                                                             \
                                                                               \
static inline int queue_##PFX##_peek(QueueHandle *h, VTYPE *value) {          \
    SLOT *slots = (SLOT *)h->slots;                                           \
    uint64_t pos = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE);          \
    SLOT *slot = &slots[pos & h->cap_mask];                                   \
    STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);           \
    if ((DTYPE)seq - (DTYPE)(STYPE)(pos + 1) == 0) {                          \
        *value = slot->value;                                                 \
        return 1;                                                             \
    }                                                                         \
    return 0;                                                                 \
}                                                                             \
                                                                               \
static inline uint64_t queue_##PFX##_size(QueueHandle *h) {                   \
    uint64_t tail = __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED);         \
    uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_RELAXED);         \
    return tail - head;                                                       \
}                                                                             \
                                                                               \
static void queue_##PFX##_clear(QueueHandle *h) {                             \
    VTYPE tmp;                                                                \
    while (queue_##PFX##_try_pop(h, &tmp)) {}                                 \
}

/* Each instantiation below generates a family of functions named
 * queue_<prefix>_* (peek, size, clear, ... as defined by the macro).
 * NOTE(review): the macro's parameter list is not visible here; from its
 * body the arguments appear to be (prefix, slot type, value type,
 * unsigned sequence type, signed type used for sequence differences) -
 * confirm against the macro definition. */

/* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
DEFINE_INT_QUEUE(int, QueueIntSlot, int64_t, uint64_t, int64_t)

/* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
DEFINE_INT_QUEUE(int32, QueueInt32Slot, int32_t, uint32_t, int32_t)

/* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
DEFINE_INT_QUEUE(int16, QueueInt16Slot, int16_t, uint32_t, int32_t)

/* ================================================================
 * Str queue: mutex-protected with circular arena
 * ================================================================ */

/* Push one item while the mutex is already held by the caller.
 *
 * h    - queue handle
 * str  - bytes to store; may be NULL only when len is 0
 * len  - number of bytes
 * utf8 - recorded alongside the length and handed back on pop
 *
 * Returns 1 on success, 0 when the queue has no free slot or the arena
 * has no room (stat_push_full is bumped), and -2 when the string can
 * never fit (len exceeds QUEUE_STR_LEN_MASK or the rounded-up size
 * exceeds the arena capacity).
 *
 * Arena space is handed out in 8-byte units, with a minimum of 8 bytes
 * even for an empty string.  If the allocation would run past the end of
 * the arena it is placed at offset 0 instead, and the unused tail is
 * charged to this slot through arena_skip so that popping the slot
 * releases it again. */
static inline int queue_str_push_locked(QueueHandle *h, const char *str,
                                         uint32_t len, bool utf8) {
    QueueHeader *hdr = h->hdr;

    if (len > QUEUE_STR_LEN_MASK) return -2;

    if (hdr->tail - hdr->head >= h->capacity) {
        __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
        return 0;
    }

    uint32_t alloc = (len + 7) & ~7u;
    if (alloc == 0) alloc = 8;
    if (alloc > h->arena_cap) return -2;
    uint32_t saved_wpos = hdr->arena_wpos;
    uint32_t pos = saved_wpos;
    uint64_t skip = alloc;

    /* Not enough contiguous room before the end: wrap to the start and
     * account for the wasted tail. */
    if ((uint64_t)pos + alloc > h->arena_cap) {
        skip += h->arena_cap - pos;
        pos = 0;
    }

    if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
        if (hdr->tail == hdr->head) {
            /* Queue is empty, so nothing in the arena is live: restart
             * from offset 0 rather than reporting "full". */
            hdr->arena_wpos = 0;
            hdr->arena_used = 0;
            saved_wpos = 0;
            pos = 0;
            skip = alloc;
        } else {
            __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }

    /* Guarded like the copies in pop/peek: passing a null pointer to
     * memcpy is undefined even for a zero length, and an empty string
     * may legitimately arrive with str == NULL. */
    if (len > 0)
        memcpy(h->arena + pos, str, len);

    uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
    QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
    slot->arena_off = pos;
    slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
    slot->arena_skip = (uint32_t)skip;
    slot->prev_wpos = saved_wpos;

    hdr->arena_wpos = pos + alloc;
    hdr->arena_used += (uint32_t)skip;
    hdr->tail++;
    __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
    return 1;
}

/* Non-blocking push: take the mutex, attempt the push, release the
 * mutex, and wake one waiting consumer if an item was stored.
 * Returns whatever queue_str_push_locked returned (1 ok, 0 full,
 * -2 too long). */
static inline int queue_str_try_push(QueueHandle *h, const char *str,
                                      uint32_t len, bool utf8) {
    QueueHeader *hdr = h->hdr;

    queue_mutex_lock(hdr);
    int rc = queue_str_push_locked(h, str, len, utf8);
    queue_mutex_unlock(hdr);

    if (rc != 1)
        return rc;

    /* The wake happens after the unlock so the consumer can take the lock. */
    queue_wake_consumers(hdr);
    return 1;
}

/* Pop the front item while the mutex is held by the caller.
 *
 * On success the payload is copied into the handle's private copy
 * buffer and NUL-terminated; *out_str points into that buffer, so it is
 * only valid until the buffer is reused.  *out_len receives the byte
 * length (without the terminator) and *out_utf8 the flag stored at push
 * time.
 *
 * Returns 1 on success, 0 when the queue is empty (stat_pop_empty is
 * bumped), and -1 when the copy buffer could not be grown; in the -1
 * case the item stays queued, but *out_utf8 has already been written. */
static inline int queue_str_pop_locked(QueueHandle *h, const char **out_str,
                                        uint32_t *out_len, bool *out_utf8) {
    QueueHeader *hdr = h->hdr;

    if (hdr->tail == hdr->head) {
        __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
        return 0;
    }

    QueueStrSlot *front = (QueueStrSlot *)h->slots + (uint32_t)(hdr->head & h->cap_mask);
    uint32_t nbytes = front->packed_len & QUEUE_STR_LEN_MASK;

    *out_utf8 = (front->packed_len & QUEUE_STR_UTF8_FLAG) != 0;

    /* One extra byte for the terminating NUL. */
    if (!queue_ensure_copy_buf(h, nbytes + 1)) {
        return -1;
    }
    if (nbytes > 0) {
        memcpy(h->copy_buf, h->arena + front->arena_off, nbytes);
    }
    h->copy_buf[nbytes] = '\0';
    *out_str = h->copy_buf;
    *out_len = nbytes;

    /* Release this item's arena share (payload plus any wrap waste).
     * Once the arena is completely free, restart writing at offset 0. */
    hdr->arena_used -= front->arena_skip;
    if (hdr->arena_used == 0) {
        hdr->arena_wpos = 0;
    }

    hdr->head++;
    __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
    return 1;
}

/* Non-blocking pop: take the mutex, attempt the pop, release the mutex,
 * and wake one waiting producer if an item was removed.
 * Returns whatever queue_str_pop_locked returned (1 ok, 0 empty,
 * -1 out of memory). */
static inline int queue_str_try_pop(QueueHandle *h, const char **out_str,
                                     uint32_t *out_len, bool *out_utf8) {
    QueueHeader *hdr = h->hdr;

    queue_mutex_lock(hdr);
    int rc = queue_str_pop_locked(h, out_str, out_len, out_utf8);
    queue_mutex_unlock(hdr);

    if (rc != 1)
        return rc;

    /* A slot and some arena space were freed: let one producer retry. */
    queue_wake_producers(hdr);
    return 1;
}

/* Push with optional blocking.
 *
 * timeout == 0 : single non-blocking attempt
 * timeout  > 0 : wait up to that many seconds for room
 * otherwise    : wait without a time limit
 *
 * Returns 1 on success, 0 when the queue was still full at the
 * deadline (or on the single attempt), and -2 when the string can never
 * fit. */
static int queue_str_push_wait(QueueHandle *h, const char *str,
                                uint32_t len, bool utf8, double timeout) {
    int r = queue_str_try_push(h, str, len, utf8);
    if (r != 0) return r;  /* 1 = success, -2 = too long */
    if (timeout == 0) return 0;

    QueueHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) queue_make_deadline(timeout, &deadline);

    for (;;) {
        /* Announce BEFORE sampling seq and re-trying; otherwise a popper
         * that drains between try_push and waiters++ reads waiters==0,
         * skips the push_futex bump, and our FUTEX_WAIT then sleeps on
         * an unchanged seq with no future waker. */
        __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
        uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
        r = queue_str_try_push(h, str, len, utf8);
        if (r != 0) {
            __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
            return r;
        }

        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!queue_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }
        long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
        /* Record the outcome right away.  The retry below performs futex
         * syscalls of its own (mutex lock/unlock, consumer wake) that
         * can overwrite errno; a leftover ETIMEDOUT from the mutex wait
         * would otherwise be mistaken for our own timeout and could end
         * even an unlimited wait early. */
        int timed_out = (rc == -1 && errno == ETIMEDOUT);
        __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);

        /* One last attempt even after a timeout: room may have appeared
         * just as the wait expired. */
        r = queue_str_try_push(h, str, len, utf8);
        if (r != 0) return r;
        if (timed_out) return 0;
    }
}

/* Pop with optional blocking.
 *
 * timeout == 0 : single non-blocking attempt
 * timeout  > 0 : wait up to that many seconds for an item
 * otherwise    : wait without a time limit
 *
 * Returns 1 on success (outputs filled as for queue_str_pop_locked),
 * 0 when the queue was still empty at the deadline (or on the single
 * attempt), and -1 when the copy buffer could not be allocated. */
static int queue_str_pop_wait(QueueHandle *h, const char **out_str,
                               uint32_t *out_len, bool *out_utf8, double timeout) {
    int r = queue_str_try_pop(h, out_str, out_len, out_utf8);
    if (r != 0) return r;  /* 1 = success, -1 = out of memory */
    if (timeout == 0) return 0;

    QueueHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) queue_make_deadline(timeout, &deadline);

    for (;;) {
        /* Announce BEFORE sampling seq and re-trying; otherwise a pusher
         * that publishes between try_pop and waiters++ reads waiters==0,
         * skips the pop_futex bump, and our FUTEX_WAIT then sleeps on an
         * unchanged seq with no future waker. */
        __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
        uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
        r = queue_str_try_pop(h, out_str, out_len, out_utf8);
        if (r != 0) {
            __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
            return r;
        }

        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!queue_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }
        long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
        /* Record the outcome right away.  The retry below performs futex
         * syscalls of its own (mutex lock/unlock, producer wake) that
         * can overwrite errno; a leftover ETIMEDOUT from the mutex wait
         * would otherwise be mistaken for our own timeout and could end
         * even an unlimited wait early. */
        int timed_out = (rc == -1 && errno == ETIMEDOUT);
        __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);

        /* One last attempt even after a timeout: an item may have been
         * pushed just as the wait expired. */
        r = queue_str_try_pop(h, out_str, out_len, out_utf8);
        if (r != 0) return r;
        if (timed_out) return 0;
    }
}

/* Number of items currently queued.
 * The two counters are read without taking the mutex (relaxed atomic
 * loads, tail first), so the result is a snapshot that may already be
 * out of date when the caller looks at it. */
static inline uint64_t queue_str_size(QueueHandle *h) {
    const uint64_t produced = __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED);
    const uint64_t consumed = __atomic_load_n(&h->hdr->head, __ATOMIC_RELAXED);

    /* Modular subtraction - intended to stay correct when push_front
     * leaves head numerically above tail. */
    return produced - consumed;
}

/* Drop every queued string and reset the arena.
 *
 * Under the mutex, head, tail and both arena counters are set back to
 * zero.  Afterwards every blocked producer and every blocked consumer is
 * woken (not just one of each), because a clear changes the state for
 * all of them at once and each must re-evaluate it. */
static void queue_str_clear(QueueHandle *h) {
    QueueHeader *hdr = h->hdr;

    queue_mutex_lock(hdr);
    hdr->head = 0;
    hdr->tail = 0;
    hdr->arena_wpos = 0;
    hdr->arena_used = 0;
    queue_mutex_unlock(hdr);

    /* A wake count of INT_MAX means "everybody".  Each helper is a no-op
     * when nobody is registered as waiting on that side. */
    queue_wake_producers_n(hdr, (uint32_t)INT_MAX);
    queue_wake_consumers_n(hdr, (uint32_t)INT_MAX);
}

/* Peek: copy the front element without consuming it.
 *
 * The mutex is held for the duration, so the result is exact.  On
 * success the payload is copied into the handle's private copy buffer
 * and NUL-terminated; *out_str points into that buffer, *out_len is the
 * byte length and *out_utf8 the flag stored at push time.
 *
 * Returns 1 on success, 0 when the queue is empty, and -1 when the copy
 * buffer could not be grown (*out_utf8 has already been written in that
 * case). */
static inline int queue_str_peek(QueueHandle *h, const char **out_str,
                                  uint32_t *out_len, bool *out_utf8) {
    QueueHeader *hdr = h->hdr;
    int result = 0;

    queue_mutex_lock(hdr);
    if (hdr->tail != hdr->head) {
        QueueStrSlot *front = (QueueStrSlot *)h->slots + (uint32_t)(hdr->head & h->cap_mask);
        uint32_t nbytes = front->packed_len & QUEUE_STR_LEN_MASK;

        *out_utf8 = (front->packed_len & QUEUE_STR_UTF8_FLAG) != 0;

        /* One extra byte for the terminating NUL. */
        if (!queue_ensure_copy_buf(h, nbytes + 1)) {
            result = -1;
        } else {
            if (nbytes > 0) {
                memcpy(h->copy_buf, h->arena + front->arena_off, nbytes);
            }
            h->copy_buf[nbytes] = '\0';
            *out_str = h->copy_buf;
            *out_len = nbytes;
            result = 1;
        }
    }
    queue_mutex_unlock(hdr);

    return result;
}

/* Push to front of queue (requeue). Str only — Int is strictly FIFO. */
static inline int queue_str_push_front(QueueHandle *h, const char *str,
                                        uint32_t len, bool utf8) {
    QueueHeader *hdr = h->hdr;
    queue_mutex_lock(hdr);

    if (len > QUEUE_STR_LEN_MASK) {
        queue_mutex_unlock(hdr);
        return -2;
    }

    uint64_t size = hdr->tail - hdr->head;
    if (size >= h->capacity) {
        __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
        queue_mutex_unlock(hdr);
        return 0;
    }

    uint32_t alloc = (len + 7) & ~7u;
    if (alloc == 0) alloc = 8;
    if (alloc > h->arena_cap) {
        queue_mutex_unlock(hdr);
        return -2;



( run in 1.741 second using v1.01-cache-2.11-cpan-e1769b4cff6 )