Data-Queue-Shared
view release on metacpan or search on metacpan
* Two variants:
* Int — lock-free Vyukov bounded MPMC queue (int64 values)
* Str — futex-mutex protected queue with circular arena (byte strings)
*
* Both use file-backed mmap(MAP_SHARED) for cross-process sharing,
* futex for blocking wait, and PID-based stale lock recovery.
*/
#ifndef QUEUE_H
#define QUEUE_H
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <limits.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>
/* ================================================================
* Constants
* ================================================================ */
#define QUEUE_MAGIC 0x51554531U /* "QUE1" */
#define QUEUE_VERSION 1
#define QUEUE_MODE_INT 0
#define QUEUE_MODE_STR 1
#define QUEUE_MODE_INT32 2
#define QUEUE_MODE_INT16 3
#define QUEUE_ERR_BUFLEN 256
#define QUEUE_SPIN_LIMIT 32
#define QUEUE_LOCK_TIMEOUT_SEC 2
/* ================================================================
* Header (256 bytes = 4 cache lines, lives at start of mmap)
* ================================================================ */
/* Control block placed at the very start of the shared mapping
 * (queue_init_new_header() casts the mapping base to this type).
 * Fields are grouped so that consumer-side state, producer-side state and
 * the mutex/arena/statistics each occupy their own 64-byte region; the
 * leading number in each field comment is the field's byte offset. */
typedef struct {
/* ---- Cache line 0 (0-63): immutable after create ---- */
uint32_t magic; /* 0: QUEUE_MAGIC */
uint32_t version; /* 4: QUEUE_VERSION */
uint32_t mode; /* 8: one of QUEUE_MODE_INT, QUEUE_MODE_STR, QUEUE_MODE_INT32, QUEUE_MODE_INT16 */
uint32_t capacity; /* 12: max elements (power of 2) */
uint64_t total_size; /* 16: mmap size */
uint64_t slots_off; /* 24: offset to slot array */
uint64_t arena_off; /* 32: str mode: offset to arena; int: 0 */
uint64_t arena_cap; /* 40: str mode: arena byte capacity; int: 0 */
uint8_t _pad0[16]; /* 48-63 */
/* ---- Cache line 1 (64-127): head / consumer hot ---- */
uint64_t head; /* 64: consumer position */
uint32_t pop_waiters; /* 72: count of blocked consumers */
uint32_t pop_futex; /* 76: futex word for consumer wakeup (incremented before each FUTEX_WAKE) */
uint8_t _pad1[48]; /* 80-127 */
/* ---- Cache line 2 (128-191): tail / producer hot ---- */
uint64_t tail; /* 128: producer position */
uint32_t push_waiters; /* 136: count of blocked producers */
uint32_t push_futex; /* 140: futex word for producer wakeup (incremented before each FUTEX_WAKE) */
uint8_t _pad2[48]; /* 144-191 */
/* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
uint32_t mutex; /* 192: futex-based mutex (0 = unlocked, else owner PID | 0x80000000) */
uint32_t mutex_waiters; /* 196: count of lockers currently in the futex-wait path */
uint32_t arena_wpos; /* 200: str mode: next write position in arena */
uint32_t arena_used; /* 204: str mode: total arena bytes consumed */
uint64_t stat_push_ok; /* 208: successful pushes */
uint64_t stat_pop_ok; /* 216: successful pops */
uint64_t stat_push_full; /* 224: pushes rejected because the queue or arena was full */
uint64_t stat_pop_empty; /* 232: pops attempted on an empty queue */
uint64_t stat_recoveries;/* 240: stale-mutex recoveries performed */
uint8_t _pad3[8]; /* 248-255 */
} QueueHeader;
/* Compile-time layout check; only available from C11 onwards, so older
 * language modes silently skip it. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(QueueHeader) == 256, "QueueHeader must be 256 bytes");
#endif
/* ================================================================
* Slot types
* ================================================================ */
/* Int slot: Vyukov MPMC sequence + value.
 * `sequence` is seeded with the slot's own index when the queue is created;
 * the peek path treats a slot as readable when its sequence equals the
 * consumer position + 1. */
typedef struct {
uint64_t sequence;
int64_t value;
} QueueIntSlot; /* 16 bytes */
/* Compact int slots: 32-bit sequence + value = 8 bytes (2x cache density).
 * This variant carries a 32-bit payload; the sequence is seeded with the
 * slot index at creation, exactly like the 64-bit slot. */
typedef struct {
uint32_t sequence;
int32_t value;
} QueueInt32Slot; /* 8 bytes */
/* Compact int slot with a 16-bit payload. `_pad` keeps the struct at
 * 8 bytes so it has the same footprint as QueueInt32Slot. */
typedef struct {
uint32_t sequence;
int16_t value;
int16_t _pad;
} QueueInt16Slot; /* 8 bytes */
/* Str slot: arena pointer + length + skip (for FIFO arena free).
 * One slot describes one queued byte string whose payload lives in the
 * arena rather than in the slot itself. */
typedef struct {
uint32_t arena_off; /* byte offset of the payload inside the arena */
uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
uint32_t arena_skip; /* bytes to release from arena on pop (includes wrap waste) */
uint32_t prev_wpos; /* arena_wpos before this push (for pop_back rollback) */
} QueueStrSlot; /* 16 bytes */
/* Masks for QueueStrSlot.packed_len: the top bit carries the UTF-8 flag,
 * the low 31 bits carry the byte length (so the longest string is
 * QUEUE_STR_LEN_MASK bytes). */
#define QUEUE_STR_UTF8_FLAG 0x80000000U
#define QUEUE_STR_LEN_MASK 0x7FFFFFFFU
/* ================================================================
* Process-local handle
* ================================================================ */
typedef struct {
QueueHeader *hdr;
void *slots; /* QueueIntSlot* or QueueStrSlot* */
/* Encoding of the QueueHeader.mutex word: 0 means unlocked; a locked word
 * has the top bit set and the owner's PID in the low 31 bits, which lets a
 * waiter identify (and probe) the process that holds the lock. */
#define QUEUE_MUTEX_WRITER_BIT 0x80000000U
#define QUEUE_MUTEX_PID_MASK 0x7FFFFFFFU
/* Build the locked mutex word for `pid` (PID truncated to 31 bits). */
#define QUEUE_MUTEX_VAL(pid) (QUEUE_MUTEX_WRITER_BIT | ((uint32_t)(pid) & QUEUE_MUTEX_PID_MASK))
/* Probe whether process `pid` still exists by sending it signal 0.
 * Returns 0 only when kill() reports ESRCH (no such process); every other
 * outcome -- success, or a failure such as EPERM -- counts as alive.
 * A pid of 0 is never probed and is reported as alive. */
static inline int queue_pid_alive(uint32_t pid) {
    if (pid == 0) {
        return 1;
    }
    if (kill((pid_t)pid, 0) != -1) {
        return 1;
    }
    return errno != ESRCH;
}
/* Relative timeout handed to FUTEX_WAIT by queue_mutex_lock(); when it
 * expires the waiter checks whether the lock owner's PID is still alive. */
static const struct timespec queue_lock_timeout = { QUEUE_LOCK_TIMEOUT_SEC, 0 };
/* Forcibly release a mutex whose recorded owner is gone.
 *
 * hdr      - shared queue header
 * observed - the locked mutex word the caller saw (dead owner's value)
 *
 * The release is a compare-and-swap from `observed` to 0, so if the word
 * has changed in the meantime (someone else already recovered or re-took
 * the lock) nothing happens. On success the recovery counter is bumped and
 * one sleeping locker, if any, is woken. */
static inline void queue_recover_stale_mutex(QueueHeader *hdr, uint32_t observed) {
    uint32_t expected = observed;
    int reclaimed = __atomic_compare_exchange_n(&hdr->mutex, &expected, 0,
                                                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
    if (reclaimed) {
        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
        if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) != 0) {
            syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
        }
    }
}
/* Acquire the header mutex, blocking until it is obtained.
 *
 * The lock word is 0 when free and QUEUE_MUTEX_VAL(pid) when held. The
 * function first retries a compare-and-swap up to QUEUE_SPIN_LIMIT times,
 * then registers itself in mutex_waiters and sleeps in FUTEX_WAIT with the
 * queue_lock_timeout timeout. If that sleep times out, the PID stored in
 * the lock word is probed and, when the owner no longer exists, the lock is
 * released on its behalf so the next attempt can succeed. */
static inline void queue_mutex_lock(QueueHeader *hdr) {
uint32_t mypid = QUEUE_MUTEX_VAL((uint32_t)getpid());
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
/* Weak CAS (4th argument = 1): it may fail spuriously, which is fine
 * because the enclosing loop simply tries again. */
if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
/* Spin phase: stay in user space for the first QUEUE_SPIN_LIMIT misses. */
if (__builtin_expect(spin < QUEUE_SPIN_LIMIT, 1)) {
queue_spin_pause();
continue;
}
/* Sleep phase: announce ourselves so the unlocker issues a FUTEX_WAKE. */
__atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (cur != 0) {
/* Sleeps only while the word still equals `cur`; otherwise the
 * kernel returns immediately and we fall through to retry. */
long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
&queue_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
/* Timed out: if the word is still in the locked encoding, check
 * whether its owner has died and recover the lock if so. */
uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (val >= QUEUE_MUTEX_WRITER_BIT) {
uint32_t pid = val & QUEUE_MUTEX_PID_MASK;
if (!queue_pid_alive(pid))
queue_recover_stale_mutex(hdr, val);
}
spin = 0;
continue;
}
}
/* Woken, interrupted, or the lock was already free: deregister and
 * restart the spin phase. */
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
/* Release the header mutex: store 0 into the lock word with release
 * ordering, then wake one sleeping locker if any are registered in
 * mutex_waiters. */
static inline void queue_mutex_unlock(QueueHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);

    uint32_t sleepers = __atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED);
    if (sleepers == 0) {
        return;
    }
    syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* Wake at most `n` consumers sleeping on pop_futex (used after a batch
 * push). A woken consumer takes at most one item, so a publisher that
 * committed several items has to wake that many sleepers rather than one,
 * otherwise consumers could stay asleep while items are available.
 *
 * Nothing is done when n is 0 or when no consumer is registered in
 * pop_waiters. Otherwise pop_futex is incremented first, so that a consumer
 * that sampled the old value does not go to sleep, and then FUTEX_WAKE is
 * issued with the count clamped to INT_MAX. */
static inline void queue_wake_consumers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0 || __atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) == 0) {
        return;
    }
    int wake_count = (n > (uint32_t)INT_MAX) ? INT_MAX : (int)n;
    __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, wake_count, NULL, NULL, 0);
}
/* Single-item convenience wrapper: after one push, wake at most one
 * blocked consumer. */
static inline void queue_wake_consumers(QueueHeader *hdr) {
    const uint32_t items_published = 1;
    queue_wake_consumers_n(hdr, items_published);
}
/* Wake at most `n` producers sleeping on push_futex (used after a batch
 * pop, which frees several slots at once and can therefore unblock several
 * producers).
 *
 * Nothing is done when n is 0 or when no producer is registered in
 * push_waiters. Otherwise push_futex is incremented before FUTEX_WAKE is
 * issued, with the wake count clamped to INT_MAX. */
static inline void queue_wake_producers_n(QueueHeader *hdr, uint32_t n) {
    if (n == 0 || __atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) == 0) {
        return;
    }
    int wake_count = (n > (uint32_t)INT_MAX) ? INT_MAX : (int)n;
    __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, wake_count, NULL, NULL, 0);
}
/* Single-item convenience wrapper: after one pop, wake at most one
 * blocked producer. */
static inline void queue_wake_producers(QueueHeader *hdr) {
    const uint32_t slots_freed = 1;
    queue_wake_producers_n(hdr, slots_freed);
}
/* Compute how much time is left until an absolute CLOCK_MONOTONIC deadline.
 *
 * deadline  - absolute deadline on CLOCK_MONOTONIC
 * remaining - receives (deadline - now), with tv_nsec normalised by a
 *             single borrow from tv_sec
 *
 * Returns 1 while the deadline has not passed (remaining->tv_sec >= 0),
 * 0 once it has. */
static inline int queue_remaining_time(const struct timespec *deadline,
                                       struct timespec *remaining) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);

    time_t secs_left = deadline->tv_sec - now.tv_sec;
    long nsecs_left = deadline->tv_nsec - now.tv_nsec;
    if (nsecs_left < 0) {
        /* borrow one second to bring the nanosecond part back into range */
        secs_left -= 1;
        nsecs_left += 1000000000L;
    }

    remaining->tv_sec = secs_left;
    remaining->tv_nsec = nsecs_left;
    return secs_left >= 0 ? 1 : 0;
}
/* Convert a relative timeout in seconds (double) to an absolute deadline
 * on CLOCK_MONOTONIC.
 *
 * timeout  - seconds from now; fractional seconds are honoured
 * deadline - receives the absolute deadline, with tv_nsec always in
 *            [0, 1e9)
 *
 * The timeout is clamped before it is converted to integers: converting a
 * double that does not fit in time_t (a huge value, infinity, NaN) is
 * undefined behaviour and in practice yields a garbage deadline. Values
 * that are not greater than zero (including NaN) produce a deadline of
 * "now"; values above the cap are treated as the cap, which is far longer
 * than any realistic wait. */
static inline void queue_make_deadline(double timeout, struct timespec *deadline) {
    /* ~31.7 years: small enough that now + cap still fits a 32-bit time_t
     * for any plausible uptime, large enough to mean "practically forever". */
    const double max_timeout = 1e9;

    if (!(timeout > 0.0)) {
        timeout = 0.0;          /* also catches NaN, which fails every comparison */
    } else if (timeout > max_timeout) {
        timeout = max_timeout;
    }

    clock_gettime(CLOCK_MONOTONIC, deadline);

    time_t whole_sec = (time_t)timeout;
    long frac_nsec = (long)((timeout - (double)whole_sec) * 1e9);

    deadline->tv_sec += whole_sec;
    deadline->tv_nsec += frac_nsec;
    /* Both addends are below one second, so a single carry is sufficient. */
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec++;
        deadline->tv_nsec -= 1000000000L;
    }
}
/* ================================================================
* Create / Open / Close
* ================================================================ */
/* Format an error message into the caller's `errbuf`, if it is non-NULL.
 * The macro expects a variable named `errbuf` to be in scope at the point
 * of use and writes at most QUEUE_ERR_BUFLEN bytes into it. It relies on
 * the GNU `##__VA_ARGS__` form so it can be invoked with a format string
 * only.
 * NOTE(review): snprintf needs <stdio.h>, which is not among the includes
 * visible at the top of this header -- confirm it is pulled in by whatever
 * includes this file. */
#define QUEUE_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, QUEUE_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
/* Initialise a freshly created queue region.
 *
 * base       - start of the mapping; the QueueHeader is written here
 * cap        - number of slots
 * arena_cap  - arena capacity in bytes (stored as given)
 * slots_off  - byte offset of the slot array from `base`
 * arena_off  - byte offset of the arena from `base` (stored as given)
 * mode       - one of the QUEUE_MODE_* values
 * total_size - total size of the mapping (stored as given)
 *
 * The header is zeroed and its descriptive fields are filled in. For the
 * three integer modes every slot's sequence number is set to its own index;
 * other modes leave the slot array untouched. A sequentially consistent
 * fence is issued before returning. */
static inline void queue_init_new_header(void *base, uint32_t cap, uint64_t arena_cap,
                                         uint32_t slots_off, uint32_t arena_off,
                                         uint32_t mode, uint64_t total_size) {
    QueueHeader *hdr = (QueueHeader *)base;
    char *slot_base = (char *)base + slots_off;

    memset(hdr, 0, sizeof(QueueHeader));
    hdr->magic      = QUEUE_MAGIC;
    hdr->version    = QUEUE_VERSION;
    hdr->mode       = mode;
    hdr->capacity   = cap;
    hdr->total_size = total_size;
    hdr->slots_off  = slots_off;
    hdr->arena_off  = arena_off;
    hdr->arena_cap  = arena_cap;

/* Seed slot i with sequence number i for the given slot struct type. */
#define QUEUE_SEED_SEQ(SLOT_T) do { \
        SLOT_T *arr_ = (SLOT_T *)slot_base; \
        for (uint32_t k_ = 0; k_ < cap; k_++) arr_[k_].sequence = k_; \
    } while (0)

    switch (mode) {
    case QUEUE_MODE_INT:
        QUEUE_SEED_SEQ(QueueIntSlot);
        break;
    case QUEUE_MODE_INT32:
        QUEUE_SEED_SEQ(QueueInt32Slot);
        break;
    case QUEUE_MODE_INT16:
        QUEUE_SEED_SEQ(QueueInt16Slot);
        break;
    default:
        /* no sequence numbers to seed for the remaining modes */
        break;
    }
#undef QUEUE_SEED_SEQ

    __atomic_thread_fence(__ATOMIC_SEQ_CST);
}
static QueueHandle *queue_create(const char *path, uint32_t capacity,
* ordering closes. */ \
__atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
uint32_t fseq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE); \
if (queue_##PFX##_try_pop(h, value)) { \
__atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
return 1; \
} \
struct timespec *pts = NULL; \
if (has_deadline) { \
if (!queue_remaining_time(&deadline, &remaining)) { \
__atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
return 0; \
} \
pts = &remaining; \
} \
long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, \
fseq, pts, NULL, 0); \
__atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE); \
if (queue_##PFX##_try_pop(h, value)) return 1; \
if (rc == -1 && errno == ETIMEDOUT) return 0; \
} \
} \
\
static inline int queue_##PFX##_peek(QueueHandle *h, VTYPE *value) { \
SLOT *slots = (SLOT *)h->slots; \
uint64_t pos = __atomic_load_n(&h->hdr->head, __ATOMIC_ACQUIRE); \
SLOT *slot = &slots[pos & h->cap_mask]; \
STYPE seq = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE); \
if ((DTYPE)seq - (DTYPE)(STYPE)(pos + 1) == 0) { \
*value = slot->value; \
return 1; \
} \
return 0; \
} \
\
static inline uint64_t queue_##PFX##_size(QueueHandle *h) { \
uint64_t tail = __atomic_load_n(&h->hdr->tail, __ATOMIC_RELAXED); \
uint64_t head = __atomic_load_n(&h->hdr->head, __ATOMIC_RELAXED); \
return tail - head; \
} \
\
static void queue_##PFX##_clear(QueueHandle *h) { \
VTYPE tmp; \
while (queue_##PFX##_try_pop(h, &tmp)) {} \
}
/* Each DEFINE_INT_QUEUE(...) line stamps out one family of
 * queue_<prefix>_* functions (for example queue_int_peek, queue_int32_size,
 * queue_int16_clear).
 * NOTE(review): the macro's parameter list is not visible in this part of
 * the file; judging by its body the arguments are (name prefix, slot struct,
 * value type, sequence type, signed type used for sequence differences) --
 * confirm against the macro definition. */
/* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
DEFINE_INT_QUEUE(int, QueueIntSlot, int64_t, uint64_t, int64_t)
/* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
DEFINE_INT_QUEUE(int32, QueueInt32Slot, int32_t, uint32_t, int32_t)
/* Instantiate for Int16 (32-bit seq + 16-bit value = 8 bytes/slot) */
DEFINE_INT_QUEUE(int16, QueueInt16Slot, int16_t, uint32_t, int32_t)
/* ================================================================
* Str queue: mutex-protected with circular arena
* ================================================================ */
/* Push one item while the mutex is already held.
 *
 * h    - queue handle
 * str  - bytes to store; may be NULL only when len is 0
 * len  - number of bytes (at most QUEUE_STR_LEN_MASK)
 * utf8 - stored in bit 31 of the slot's packed length
 *
 * Returns 1 = stored, 0 = full (no free slot or not enough arena space),
 * -2 = string too long (exceeds the length mask or the whole arena).
 *
 * Arena space is handed out in 8-byte multiples, with a minimum of 8 so an
 * empty string still occupies a distinct region. If the allocation does not
 * fit between the write position and the end of the arena, it is placed at
 * offset 0 and the unused tail bytes are charged to this item so that they
 * are released together with it on pop. */
static inline int queue_str_push_locked(QueueHandle *h, const char *str,
                                        uint32_t len, bool utf8) {
    QueueHeader *hdr = h->hdr;
    if (len > QUEUE_STR_LEN_MASK) return -2;
    if (hdr->tail - hdr->head >= h->capacity) {
        __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
        return 0;
    }
    uint32_t alloc = (len + 7) & ~7u;
    if (alloc == 0) alloc = 8;
    if (alloc > h->arena_cap) return -2;
    uint32_t saved_wpos = hdr->arena_wpos;
    uint32_t pos = saved_wpos;
    uint64_t skip = alloc;
    if ((uint64_t)pos + alloc > h->arena_cap) {
        /* Does not fit before the end: wrap to 0 and account for the gap. */
        skip += h->arena_cap - pos;
        pos = 0;
    }
    if ((uint64_t)hdr->arena_used + skip > h->arena_cap) {
        if (hdr->tail == hdr->head) {
            /* Queue is empty, so nothing in the arena is live: start over. */
            hdr->arena_wpos = 0;
            hdr->arena_used = 0;
            saved_wpos = 0;
            pos = 0;
            skip = alloc;
        } else {
            __atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }
    /* memcpy requires valid pointers even for a zero-length copy, so skip
     * it for empty strings (matching the len > 0 guard on the pop side). */
    if (len > 0)
        memcpy(h->arena + pos, str, len);
    uint32_t idx = (uint32_t)(hdr->tail & h->cap_mask);
    QueueStrSlot *slot = &((QueueStrSlot *)h->slots)[idx];
    slot->arena_off = pos;
    slot->packed_len = len | (utf8 ? QUEUE_STR_UTF8_FLAG : 0);
    slot->arena_skip = (uint32_t)skip;
    slot->prev_wpos = saved_wpos;
    hdr->arena_wpos = pos + alloc;
    hdr->arena_used += (uint32_t)skip;
    hdr->tail++;
    __atomic_add_fetch(&hdr->stat_push_ok, 1, __ATOMIC_RELAXED);
    return 1;
}
/* Non-blocking push: take the mutex, attempt the push, release the mutex,
 * and wake one blocked consumer if an item was stored.
 * Returns whatever queue_str_push_locked() returned
 * (1 = stored, 0 = full, -2 = too long). */
static inline int queue_str_try_push(QueueHandle *h, const char *str,
                                     uint32_t len, bool utf8) {
    QueueHeader *hdr = h->hdr;

    queue_mutex_lock(hdr);
    int status = queue_str_push_locked(h, str, len, utf8);
    queue_mutex_unlock(hdr);

    if (status != 1) {
        return status;
    }
    queue_wake_consumers(hdr);
    return 1;
}
/* Pop the front item while the mutex is held.
 *
 * out_str  - receives a pointer to the handle's copy buffer holding the
 *            bytes followed by a terminating NUL
 * out_len  - receives the byte length (without the NUL)
 * out_utf8 - receives the UTF-8 flag; it is written before the copy buffer
 *            is sized, so it is set even when -1 is returned
 *
 * Returns 1 = ok, 0 = empty, -1 = the copy buffer could not be sized
 * (OOM according to the original contract). On success the item's arena
 * bytes are released and, once the arena is completely free, the write
 * position is rewound to 0. */
static inline int queue_str_pop_locked(QueueHandle *h, const char **out_str,
                                       uint32_t *out_len, bool *out_utf8) {
    QueueHeader *hdr = h->hdr;

    if (hdr->head == hdr->tail) {
        __atomic_add_fetch(&hdr->stat_pop_empty, 1, __ATOMIC_RELAXED);
        return 0;
    }

    QueueStrSlot *ring = (QueueStrSlot *)h->slots;
    QueueStrSlot *front = &ring[(uint32_t)(hdr->head & h->cap_mask)];
    uint32_t packed = front->packed_len;
    uint32_t nbytes = packed & QUEUE_STR_LEN_MASK;

    *out_utf8 = (packed & QUEUE_STR_UTF8_FLAG) ? true : false;

    if (!queue_ensure_copy_buf(h, nbytes + 1)) {
        return -1;
    }
    if (nbytes != 0) {
        memcpy(h->copy_buf, h->arena + front->arena_off, nbytes);
    }
    h->copy_buf[nbytes] = '\0';
    *out_str = h->copy_buf;
    *out_len = nbytes;

    hdr->arena_used -= front->arena_skip;
    if (hdr->arena_used == 0) {
        hdr->arena_wpos = 0;
    }
    hdr->head++;
    __atomic_add_fetch(&hdr->stat_pop_ok, 1, __ATOMIC_RELAXED);
    return 1;
}
/* Non-blocking pop: take the mutex, attempt the pop, release the mutex,
 * and wake one blocked producer if an item was removed.
 * Returns whatever queue_str_pop_locked() returned
 * (1 = ok, 0 = empty, -1 = copy buffer failure). */
static inline int queue_str_try_pop(QueueHandle *h, const char **out_str,
                                    uint32_t *out_len, bool *out_utf8) {
    QueueHeader *hdr = h->hdr;

    queue_mutex_lock(hdr);
    int status = queue_str_pop_locked(h, out_str, out_len, out_utf8);
    queue_mutex_unlock(hdr);

    if (status != 1) {
        return status;
    }
    queue_wake_producers(hdr);
    return 1;
}
/* Push with optional blocking.
 *
 * timeout == 0 : behave like queue_str_try_push()
 * timeout  > 0 : wait at most that many seconds for space
 * timeout  < 0 : wait without a deadline
 *
 * Returns 1 = stored, 0 = still full when the wait ended, -2 = too long. */
static int queue_str_push_wait(QueueHandle *h, const char *str,
                               uint32_t len, bool utf8, double timeout) {
    int r = queue_str_try_push(h, str, len, utf8);
    if (r != 0) return r; /* 1 = success, -2 = too long */
    if (timeout == 0) return 0;

    QueueHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) queue_make_deadline(timeout, &deadline);

    for (;;) {
        /* Announce BEFORE sampling seq and re-trying; otherwise a popper
         * that drains between try_push and waiters++ reads waiters==0,
         * skips the push_futex bump, and our FUTEX_WAIT then sleeps on
         * an unchanged seq with no future waker. */
        __atomic_add_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
        uint32_t seq = __atomic_load_n(&hdr->push_futex, __ATOMIC_ACQUIRE);
        r = queue_str_try_push(h, str, len, utf8);
        if (r != 0) {
            __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
            return r;
        }

        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!queue_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }

        long rc = syscall(SYS_futex, &hdr->push_futex, FUTEX_WAIT, seq, pts, NULL, 0);
        /* Capture the outcome immediately: the retry below locks the mutex,
         * which issues its own futex syscalls and may overwrite errno (for
         * example with ETIMEDOUT from a contended lock), so testing errno
         * afterwards could report a timeout that never happened. */
        int wait_timed_out = (rc == -1 && errno == ETIMEDOUT);
        __atomic_sub_fetch(&hdr->push_waiters, 1, __ATOMIC_RELEASE);

        r = queue_str_try_push(h, str, len, utf8);
        if (r != 0) return r;
        if (wait_timed_out) return 0;
    }
}
/* Pop with optional blocking.
 *
 * timeout == 0 : behave like queue_str_try_pop()
 * timeout  > 0 : wait at most that many seconds for an item
 * timeout  < 0 : wait without a deadline
 *
 * Returns 1 = item returned through the out parameters, 0 = still empty
 * when the wait ended, -1 = copy buffer failure. */
static int queue_str_pop_wait(QueueHandle *h, const char **out_str,
                              uint32_t *out_len, bool *out_utf8, double timeout) {
    int r = queue_str_try_pop(h, out_str, out_len, out_utf8);
    if (r != 0) return r;
    if (timeout == 0) return 0;

    QueueHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) queue_make_deadline(timeout, &deadline);

    for (;;) {
        /* Announce BEFORE sampling seq and re-trying; otherwise a pusher
         * that publishes between try_pop and waiters++ reads waiters==0,
         * skips the pop_futex bump, and our FUTEX_WAIT then sleeps on an
         * unchanged seq with no future waker. */
        __atomic_add_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
        uint32_t seq = __atomic_load_n(&hdr->pop_futex, __ATOMIC_ACQUIRE);
        r = queue_str_try_pop(h, out_str, out_len, out_utf8);
        if (r != 0) {
            __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
            return r;
        }

        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!queue_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }

        long rc = syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAIT, seq, pts, NULL, 0);
        /* Capture the outcome immediately: the retry below locks the mutex,
         * which issues its own futex syscalls and may overwrite errno (for
         * example with ETIMEDOUT from a contended lock), so testing errno
         * afterwards could report a timeout that never happened. */
        int wait_timed_out = (rc == -1 && errno == ETIMEDOUT);
        __atomic_sub_fetch(&hdr->pop_waiters, 1, __ATOMIC_RELEASE);

        r = queue_str_try_pop(h, out_str, out_len, out_utf8);
        if (r != 0) return r;
        if (wait_timed_out) return 0;
    }
}
/* Number of items currently queued, computed as tail - head from two
 * relaxed loads taken without the mutex, so the result is a snapshot that
 * may already be out of date. The subtraction is unsigned and therefore
 * wraps; the original note says this keeps the result correct when
 * push_front has moved head past tail. */
static inline uint64_t queue_str_size(QueueHandle *h) {
    QueueHeader *hdr = h->hdr;
    uint64_t produced = __atomic_load_n(&hdr->tail, __ATOMIC_RELAXED);
    uint64_t consumed = __atomic_load_n(&hdr->head, __ATOMIC_RELAXED);
    uint64_t count = produced - consumed;
    return count;
}
/* Discard every queued string: under the mutex, reset head, tail and the
 * arena bookkeeping to zero. The statistics counters are left untouched.
 *
 * Clearing is a bulk transition, so afterwards every blocked producer and
 * every blocked consumer is woken (not just one of each) to let them
 * re-evaluate the queue state. Passing UINT32_MAX to the wake helpers makes
 * them issue FUTEX_WAKE with a count of INT_MAX. */
static void queue_str_clear(QueueHandle *h) {
    QueueHeader *hdr = h->hdr;

    queue_mutex_lock(hdr);
    hdr->arena_used = 0;
    hdr->arena_wpos = 0;
    hdr->tail = 0;
    hdr->head = 0;
    queue_mutex_unlock(hdr);

    queue_wake_producers_n(hdr, UINT32_MAX);
    queue_wake_consumers_n(hdr, UINT32_MAX);
}
/* Peek: copy out the front element without consuming it. The whole
 * operation runs under the mutex, so the result is exact at the time of
 * the call.
 *
 * out_str  - receives a pointer to the handle's copy buffer (NUL-terminated)
 * out_len  - receives the byte length (without the NUL)
 * out_utf8 - receives the UTF-8 flag; written before the copy buffer is
 *            sized, so it is set even when -1 is returned
 *
 * Returns 1 = ok, 0 = empty, -1 = the copy buffer could not be sized. */
static inline int queue_str_peek(QueueHandle *h, const char **out_str,
                                 uint32_t *out_len, bool *out_utf8) {
    QueueHeader *hdr = h->hdr;
    int status = 0;

    queue_mutex_lock(hdr);
    if (hdr->head != hdr->tail) {
        QueueStrSlot *ring = (QueueStrSlot *)h->slots;
        QueueStrSlot *front = &ring[(uint32_t)(hdr->head & h->cap_mask)];
        uint32_t packed = front->packed_len;
        uint32_t nbytes = packed & QUEUE_STR_LEN_MASK;

        *out_utf8 = (packed & QUEUE_STR_UTF8_FLAG) ? true : false;

        if (queue_ensure_copy_buf(h, nbytes + 1)) {
            if (nbytes != 0) {
                memcpy(h->copy_buf, h->arena + front->arena_off, nbytes);
            }
            h->copy_buf[nbytes] = '\0';
            *out_str = h->copy_buf;
            *out_len = nbytes;
            status = 1;
        } else {
            status = -1;
        }
    }
    queue_mutex_unlock(hdr);

    return status;
}
/* Push to front of queue (requeue). Str only — Int is strictly FIFO. */
static inline int queue_str_push_front(QueueHandle *h, const char *str,
uint32_t len, bool utf8) {
QueueHeader *hdr = h->hdr;
queue_mutex_lock(hdr);
if (len > QUEUE_STR_LEN_MASK) {
queue_mutex_unlock(hdr);
return -2;
}
uint64_t size = hdr->tail - hdr->head;
if (size >= h->capacity) {
__atomic_add_fetch(&hdr->stat_push_full, 1, __ATOMIC_RELAXED);
queue_mutex_unlock(hdr);
return 0;
}
uint32_t alloc = (len + 7) & ~7u;
if (alloc == 0) alloc = 8;
if (alloc > h->arena_cap) {
queue_mutex_unlock(hdr);
return -2;
( run in 1.741 second using v1.01-cache-2.11-cpan-e1769b4cff6 )