Data-Queue-Shared

 view release on MetaCPAN or search on MetaCPAN

queue.h  view on Meta::CPAN

/* ================================================================
 * Process-local handle
 * ================================================================ */

/* Per-process view of one shared queue: pointers into the shared
 * mapping plus process-local scratch state. Fields referencing the
 * shared region (hdr/slots/arena) point into one mmap of mmap_size
 * bytes; copy_buf is private heap memory grown on demand. */
typedef struct {
    QueueHeader *hdr;        /* shared control header */
    void        *slots;      /* QueueIntSlot* or QueueStrSlot* */
    char        *arena;      /* NULL for int mode */
    size_t       mmap_size;  /* total size of the shared mapping */
    uint32_t     capacity;   /* slot count; presumably a power of two (see cap_mask) — confirm at create */
    uint32_t     cap_mask;   /* capacity - 1 */
    uint64_t     arena_cap;  /* bytes available in arena (str mode) */
    char        *copy_buf;   /* for str pop: buffer to copy string before unlock */
    uint32_t     copy_buf_cap; /* current size of copy_buf; grown by queue_ensure_copy_buf */
    char        *path;       /* backing path; ownership/lifetime managed by open/close */
    int          notify_fd;  /* eventfd for event-loop integration, -1 if disabled */
    int          backing_fd; /* memfd fd, -1 for file-backed/anonymous */
} QueueHandle;

/* ================================================================
 * Utility
 * ================================================================ */

/* Round v up to the next power of two, with a floor of 2.
 * Returns 0 when the result cannot be represented in 32 bits
 * (i.e. v > 2^31). */
static inline uint32_t queue_next_pow2(uint32_t v) {
    if (v <= 2) return 2;
    if (v > 0x80000000U) return 0;
    uint32_t p = 2;
    while (p < v)
        p <<= 1;   /* cannot overflow: v <= 2^31 here */
    return p;
}

/* One iteration of a busy-wait: emit the architecture's spin-loop hint
 * (x86 PAUSE / AArch64 YIELD) so the core backs off and the sibling
 * hyperthread gets resources. All variants include a "memory" clobber
 * acting as a compiler barrier so the spinning load is re-issued each
 * iteration instead of being hoisted out of the loop. */
static inline void queue_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    /* unknown arch: compiler barrier only */
    __asm__ volatile("" ::: "memory");
#endif
}

/* Ensure h->copy_buf can hold at least `needed` bytes, growing it
 * geometrically from a 64-byte floor. Returns 1 on success, 0 on
 * allocation failure (the existing buffer is left valid either way). */
static inline int queue_ensure_copy_buf(QueueHandle *h, uint32_t needed) {
    if (h->copy_buf_cap >= needed)
        return 1;
    uint32_t cap = (h->copy_buf_cap != 0) ? h->copy_buf_cap : 64;
    while (cap < needed) {
        uint32_t doubled = cap * 2;
        if (doubled <= cap) {      /* uint32 wraparound: clamp to exact need */
            cap = needed;
            break;
        }
        cap = doubled;
    }
    char *grown = realloc(h->copy_buf, cap);
    if (grown == NULL)
        return 0;                  /* old buffer still owned by h */
    h->copy_buf = grown;
    h->copy_buf_cap = cap;
    return 1;
}

/* ================================================================
 * Futex helpers
 * ================================================================ */

/* Mutex word encoding: the top bit marks "locked", the low 31 bits
 * carry the holder's PID so a crashed holder can be detected
 * (queue_pid_alive) and the lock recovered (queue_recover_stale_mutex). */
#define QUEUE_MUTEX_WRITER_BIT 0x80000000U
#define QUEUE_MUTEX_PID_MASK   0x7FFFFFFFU
/* Value stored in hdr->mutex while `pid` holds the lock. */
#define QUEUE_MUTEX_VAL(pid)   (QUEUE_MUTEX_WRITER_BIT | ((uint32_t)(pid) & QUEUE_MUTEX_PID_MASK))

/* Probe whether a process with this PID still exists, via kill(pid, 0).
 * pid 0 means "unknown holder" and is conservatively treated as alive
 * so we never recover a lock we cannot attribute. EPERM (and any error
 * other than ESRCH) also counts as alive: the process exists, we just
 * may not signal it. */
static inline int queue_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}

static const struct timespec queue_lock_timeout = { QUEUE_LOCK_TIMEOUT_SEC, 0 };

/* Clear a mutex word whose holder the caller has determined is dead.
 * The CAS from `observed` to 0 guarantees at most one recoverer wins,
 * and that we do nothing if the word changed in the meantime (e.g. the
 * holder unlocked, or another process already recovered it). On
 * success, count the recovery and wake one waiter so nobody sleeps a
 * full timeout on a lock that is now free. */
static inline void queue_recover_stale_mutex(QueueHeader *hdr, uint32_t observed) {
    if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return;  /* someone else changed the word first; nothing to do */
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Acquire the shared inter-process mutex.
 * Fast path: CAS-spin up to QUEUE_SPIN_LIMIT iterations.
 * Slow path: register in mutex_waiters, then FUTEX_WAIT on the mutex
 * word with a bounded timeout. On timeout, if the word still encodes a
 * holder PID (top bit set) and that process is dead, recover the stale
 * lock before retrying. The value stored on acquisition is
 * WRITER_BIT | our-pid, which is what makes crash recovery possible. */
static inline void queue_mutex_lock(QueueHeader *hdr) {
    uint32_t mypid = QUEUE_MUTEX_VAL((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        /* weak CAS: spurious failure is fine inside the retry loop */
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < QUEUE_SPIN_LIMIT, 1)) {
            queue_spin_pause();
            continue;
        }
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            /* FUTEX_WAIT sleeps only if the word still equals `cur`
             * (the kernel re-checks atomically), so a concurrent
             * unlock cannot be missed. */
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &queue_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= QUEUE_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & QUEUE_MUTEX_PID_MASK;
                    /* holder died while holding the lock: reclaim it */
                    if (!queue_pid_alive(pid))
                        queue_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        /* woken up, or observed the lock free: deregister and re-spin */
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

/* Release the mutex. The RELEASE store publishes all writes made under
 * the lock before any subsequent acquirer's ACQUIRE load. The waiter
 * count is only a hint: a racy read at worst issues a harmless extra
 * FUTEX_WAKE, or skips one for a waiter that has not yet incremented
 * the count — that waiter then observes mutex == 0 and retries. */
static inline void queue_mutex_unlock(QueueHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Wake one blocked consumer (called after a push). pop_futex acts as a
 * generation counter: incrementing it before FUTEX_WAKE ensures a
 * consumer whose FUTEX_WAIT races this call sees a changed value and
 * returns immediately instead of sleeping through the wake. */
static inline void queue_wake_consumers(QueueHeader *hdr) {
    if (__atomic_load_n(&hdr->pop_waiters, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->pop_futex, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->pop_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
    }
}

/* Wake one blocked producer (called after a pop). Mirrors
 * queue_wake_consumers: push_futex is a generation counter bumped
 * before the wake so a producer racing into FUTEX_WAIT cannot sleep
 * past this notification. */
static inline void queue_wake_producers(QueueHeader *hdr) {
    if (__atomic_load_n(&hdr->push_waiters, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->push_futex, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->push_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
    }
}

/* Fill *remaining with (deadline - now) on CLOCK_MONOTONIC.
 * Returns 1 while the deadline has not passed, 0 once it has
 * (in which case *remaining holds the normalized negative span). */
static inline int queue_remaining_time(const struct timespec *deadline,
                                        struct timespec *remaining) {
    struct timespec current;
    clock_gettime(CLOCK_MONOTONIC, &current);
    time_t sec = deadline->tv_sec - current.tv_sec;
    long nsec = deadline->tv_nsec - current.tv_nsec;
    if (nsec < 0) {               /* borrow one second */
        sec -= 1;
        nsec += 1000000000L;
    }
    remaining->tv_sec = sec;
    remaining->tv_nsec = nsec;    /* always in [0, 1e9) after the borrow */
    return sec >= 0;
}

/* Convert a relative timeout in (possibly fractional) seconds into an
 * absolute CLOCK_MONOTONIC deadline, with tv_nsec normalized into
 * [0, 1e9). The fractional part is below one second, so a single
 * carry after the addition is sufficient. */
static inline void queue_make_deadline(double timeout, struct timespec *deadline) {
    time_t whole = (time_t)timeout;
    long frac_ns = (long)((timeout - (double)whole) * 1e9);
    clock_gettime(CLOCK_MONOTONIC, deadline);
    deadline->tv_sec += whole;
    deadline->tv_nsec += frac_ns;
    if (deadline->tv_nsec >= 1000000000L) {   /* carry into seconds */
        deadline->tv_nsec -= 1000000000L;
        deadline->tv_sec += 1;
    }
}

/* ================================================================
 * Create / Open / Close



( run in 1.393 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )