Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

reqrep.h  view on Meta::CPAN

    size_t         mmap_size;
    uint32_t       req_cap;
    uint32_t       req_cap_mask;  /* req_cap - 1 */
    uint32_t       req_arena_cap;
    uint32_t       resp_slots;
    uint32_t       resp_data_max;
    uint32_t       resp_stride;
    char          *copy_buf;
    uint32_t       copy_buf_cap;
    char          *path;
    int            notify_fd;     /* request notification eventfd, -1 if unset */
    int            reply_fd;      /* reply notification eventfd, -1 if unset */
    int            backing_fd;    /* memfd fd, -1 for file-backed/anonymous */
} ReqRepHandle;

/* ================================================================
 * Utility
 * ================================================================ */

/* Round v up to the next power of two.
 *
 * Values 0 and 1 are bumped to the minimum of 2. Returns 0 when the result
 * would not fit in a uint32_t (v > 2^31). Powers of two map to themselves. */
static inline uint32_t reqrep_next_pow2(uint32_t v) {
    if (v <= 1)
        return 2;
    /* 2^31 is the largest power of two representable in 32 bits. */
    if (v > (UINT32_C(1) << 31))
        return 0;

    /* Smear the highest set bit of (v - 1) into every lower position, then
     * add one to carry into the next power of two. */
    uint32_t r = v - 1;
    for (unsigned shift = 1; shift < 32; shift <<= 1)
        r |= r >> shift;
    return r + 1;
}

/* Busy-wait relaxation hint for spin loops.
 *
 * x86/x86-64 emits PAUSE, AArch64 emits YIELD, and any other target gets an
 * empty asm statement. Every variant carries a "memory" clobber, so it also
 * acts as a compiler barrier: shared memory is re-read after the call. */
static inline void reqrep_spin_pause(void) {
#if defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" : : : "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" : : : "memory");
#else
    /* No CPU hint available: compiler barrier only. */
    __asm__ volatile("" : : : "memory");
#endif
}

/* Grow the handle's scratch buffer (h->copy_buf) to hold at least `needed`
 * bytes.
 *
 * Capacity doubles starting from the current capacity, or 64 when none has
 * been allocated. If doubling would wrap a uint32_t, the capacity is set to
 * exactly `needed`.
 *
 * Returns 1 when the buffer is large enough. Returns 0 if realloc fails; in
 * that case h->copy_buf and h->copy_buf_cap are left unchanged. */
static inline int reqrep_ensure_copy_buf(ReqRepHandle *h, uint32_t needed) {
    if (h->copy_buf_cap >= needed)
        return 1;

    uint32_t cap = (h->copy_buf_cap != 0) ? h->copy_buf_cap : 64;
    while (cap < needed) {
        uint32_t doubled = cap << 1;
        if (doubled <= cap) {          /* wrapped around 2^32 */
            cap = needed;
            break;
        }
        cap = doubled;
    }

    void *grown = realloc(h->copy_buf, cap);
    if (grown == NULL)
        return 0;
    h->copy_buf = grown;
    h->copy_buf_cap = cap;
    return 1;
}

/* Return a pointer to the header of response slot `idx`.
 *
 * Slots are laid out back-to-back in h->resp_area, h->resp_stride bytes
 * apart. The byte offset is computed in 64 bits so idx * resp_stride cannot
 * wrap. idx is not bounds-checked. */
static inline RespSlotHeader *reqrep_resp_slot(ReqRepHandle *h, uint32_t idx) {
    const uint64_t offset = (uint64_t)idx * h->resp_stride;
    return (RespSlotHeader *)(h->resp_area + offset);
}

/* ================================================================
 * Futex helpers
 * ================================================================ */

/* Lock-word encoding for the header mutex: the top bit marks the lock as
 * held; the low 31 bits carry the holder's pid (truncated to 31 bits). */
#define REQREP_MUTEX_WRITER_BIT 0x80000000U
#define REQREP_MUTEX_PID_MASK   0x7FFFFFFFU
#define REQREP_MUTEX_VAL(pid)   ((((uint32_t)(pid)) & REQREP_MUTEX_PID_MASK) | REQREP_MUTEX_WRITER_BIT)

/* Report whether process `pid` may still exist.
 *
 * pid 0 always reports alive, so it is never treated as a dead owner.
 * Otherwise signal 0 is sent, which only performs the existence/permission
 * check. The process counts as dead ONLY when kill() fails with ESRCH. Any
 * other outcome, including EPERM (exists but not signalable by us), counts
 * as alive. Returns 1 for alive, 0 for dead. */
static inline int reqrep_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}

/* Relative timeout passed to FUTEX_WAIT while blocking on the header mutex;
 * after it expires the waiter checks whether the holder is still alive. */
static const struct timespec reqrep_lock_timeout = {
    .tv_sec  = REQREP_LOCK_TIMEOUT_SEC,
    .tv_nsec = 0,
};

/* Forcibly release a header mutex whose holder was judged dead.
 *
 * `observed` is the lock-word value the caller saw. The lock is cleared only
 * if it still holds exactly that value (strong CAS), so a lock word that
 * changed in the meantime is left alone. On success, stat_recoveries is
 * incremented and, if any waiters are registered, one is woken. */
static inline void reqrep_recover_stale_mutex(ReqRepHeader *hdr, uint32_t observed) {
    int cleared = __atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
                                              0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
    if (!cleared)
        return;

    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);

    int has_waiters = __atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0;
    if (has_waiters)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Acquire the process-shared header mutex (hdr->mutex).
 *
 * Lock word: 0 when free, otherwise REQREP_MUTEX_VAL(holder pid), i.e. the
 * writer bit plus the holder's pid in the low 31 bits.
 *
 * Strategy:
 *   1. CAS 0 -> own value. On failure, spin with reqrep_spin_pause() for up
 *      to REQREP_SPIN_LIMIT attempts.
 *   2. Register in mutex_waiters and re-read the lock word. If the lock is
 *      still held, FUTEX_WAIT on it with the reqrep_lock_timeout timeout.
 *   3. On ETIMEDOUT, check the pid recorded in the lock word. If that
 *      process no longer exists, clear the lock via
 *      reqrep_recover_stale_mutex(). Either way the spin budget is reset and
 *      acquisition is retried.
 *
 * Memory ordering: the waiter does "mutex_waiters++ ; load mutex" and the
 * unlocker does "store mutex = 0 ; load mutex_waiters". That is a
 * store->load (Dekker) pattern. With RELAXED/RELEASE orderings, both sides
 * may read the other's stale value: the unlocker then skips FUTEX_WAKE and
 * the waiter sleeps until the lock timeout expires. All four accesses are
 * SEQ_CST, which guarantees at least one side observes the other. */
static inline void reqrep_mutex_lock(ReqRepHeader *hdr) {
    uint32_t mypid = REQREP_MUTEX_VAL((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < REQREP_SPIN_LIMIT, 1)) {
            reqrep_spin_pause();
            continue;
        }
        /* Register as a sleeper BEFORE re-checking the lock word. SEQ_CST
         * pairs with the SEQ_CST store/load in reqrep_mutex_unlock below. */
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_SEQ_CST);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_SEQ_CST);
        if (cur != 0) {
            /* FUTEX_WAIT only sleeps if hdr->mutex still equals cur, so an
             * unlock between the load above and this call makes it return
             * EAGAIN instead of sleeping. */
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &reqrep_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                /* Timed out: the holder may have died while holding it. */
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= REQREP_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & REQREP_MUTEX_PID_MASK;
                    if (!reqrep_pid_alive(pid))
                        reqrep_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        /* Woken, EAGAIN (lock word changed), EINTR, or lock already free:
         * deregister and go back to the CAS/spin phase. */
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

/* Release the header mutex and, if any process is registered in
 * mutex_waiters, wake one of them. The caller must be the current holder;
 * this is not checked. Both accesses are SEQ_CST; see the memory-ordering
 * note on reqrep_mutex_lock above. */
static inline void reqrep_mutex_unlock(ReqRepHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_SEQ_CST);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_SEQ_CST) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Wake one process blocked on recv_futex, but only when recv_waiters is
 * positive; otherwise no syscall is made. The futex word is incremented
 * (RELEASE) before FUTEX_WAKE, presumably so that a waiter about to
 * FUTEX_WAIT on the old value fails the kernel's value check instead of
 * sleeping. */
static inline void reqrep_wake_consumers(ReqRepHeader *hdr) {
    if (__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED) <= 0)
        return;
    __atomic_add_fetch(&hdr->recv_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Wake one process blocked on send_futex, but only when send_waiters is
 * positive; otherwise no syscall is made. The futex word is incremented
 * (RELEASE) before FUTEX_WAKE, presumably so that a waiter about to
 * FUTEX_WAIT on the old value fails the kernel's value check instead of
 * sleeping. */
static inline void reqrep_wake_producers(ReqRepHeader *hdr) {
    if (__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED) <= 0)
        return;
    __atomic_add_fetch(&hdr->send_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->send_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Wake one process blocked on slot_futex, but only when slot_waiters is
 * positive; otherwise no syscall is made. The futex word is incremented
 * (RELEASE) before FUTEX_WAKE, presumably so that a waiter about to
 * FUTEX_WAIT on the old value fails the kernel's value check instead of
 * sleeping. */
static inline void reqrep_wake_slot_waiters(ReqRepHeader *hdr) {
    if (__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED) <= 0)
        return;
    __atomic_add_fetch(&hdr->slot_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->slot_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* Store in *remaining the time left until `deadline` (a CLOCK_MONOTONIC
 * instant), borrowing one second when the nanosecond difference is negative.
 *
 * Returns nonzero while the remaining tv_sec is >= 0, which includes an
 * exactly-zero remainder. Returns 0 once the deadline has passed.
 * `remaining` may alias `deadline`. */
static inline int reqrep_remaining_time(const struct timespec *deadline,
                                         struct timespec *remaining) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);

    time_t secs  = deadline->tv_sec - now.tv_sec;
    long   nsecs = deadline->tv_nsec - now.tv_nsec;
    if (nsecs < 0) {
        nsecs += 1000000000L;
        secs  -= 1;
    }

    remaining->tv_sec  = secs;
    remaining->tv_nsec = nsecs;
    return secs >= 0;
}

/* Set *deadline to the current CLOCK_MONOTONIC time plus `timeout` seconds.
 *
 * `timeout` may be fractional. The result is always a normalized timespec
 * (0 <= tv_nsec < 1e9):
 *   - timeout <= 0 or NaN: deadline = now (already due). Previously a
 *     negative fractional timeout produced a negative, un-normalized tv_nsec.
 *   - very large timeouts are capped at 2^30 - 1 seconds (~34 years). This
 *     keeps the double -> time_t conversion defined and tv_sec from
 *     overflowing even with a 32-bit time_t.
 * For timeouts in (0, cap], the result is the same as before. */
static inline void reqrep_make_deadline(double timeout, struct timespec *deadline) {
    const double max_timeout = 1073741823.0; /* 2^30 - 1 seconds */

    clock_gettime(CLOCK_MONOTONIC, deadline);
    if (!(timeout > 0.0))          /* negative, zero, or NaN */
        return;
    if (timeout > max_timeout)
        timeout = max_timeout;

    /* timeout > 0, so truncation toward zero leaves a fraction in [0, 1)
     * and frac_ns in [0, 1e9). One carry is therefore enough to normalize. */
    time_t whole   = (time_t)timeout;
    long   frac_ns = (long)((timeout - (double)whole) * 1e9);

    deadline->tv_sec  += whole;
    deadline->tv_nsec += frac_ns;
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec++;
        deadline->tv_nsec -= 1000000000L;
    }
}

/* ================================================================
 * Response slot operations
 * ================================================================ */

/* Claim a response slot for the calling process.
 *
 * Returns the slot index (>= 0) on success, or -1 if no slot could be
 * claimed.
 *
 * Pass 1 visits each of the h->resp_slots slots once, starting at the shared
 * resp_hint and wrapping modulo n, and CASes RESP_FREE -> RESP_ACQUIRED. On
 * success it records getpid() as owner_pid, bumps the slot's generation
 * counter (RELEASE), and stores the following index as the new resp_hint.
 *
 * Pass 2 runs only if pass 1 claimed nothing. It looks for ACQUIRED or READY
 * slots whose owner_pid names a process that reqrep_pid_alive() reports as
 * gone, frees them, and then tries to claim the freed slot as in pass 1.
 *
 * With resp_slots == 0 neither loop body executes, so the `% n` expressions
 * are never evaluated with n == 0 and -1 is returned. */
static int32_t reqrep_slot_acquire(ReqRepHandle *h) {
    uint32_t n = h->resp_slots;
    uint32_t hint = __atomic_load_n(&h->hdr->resp_hint, __ATOMIC_RELAXED);
    uint32_t mypid = (uint32_t)getpid();

    /* Pass 1: claim the first FREE slot at or after the hint. */
    for (uint32_t i = 0; i < n; i++) {
        uint32_t idx = (hint + i) % n;
        RespSlotHeader *slot = reqrep_resp_slot(h, idx);
        uint32_t expected = RESP_FREE;
        if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
            __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
            __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
            __atomic_store_n(&h->hdr->resp_hint, (idx + 1) % n, __ATOMIC_RELAXED);
            return (int32_t)idx;
        }
    }

    /* Recover stale slots from dead processes (both ACQUIRED and READY).
     *
     * ABA-safe pattern (mirrors Pool's recover_stale): CAS owner_pid dead→0
     * FIRST to claim exclusive recovery rights. Without this, two recoverers
     * (or a recoverer racing a free+fresh-acquire cycle) can both see
     * state==ACQUIRED and the same dead PID, both race their state CAS, and
     * the loser silently clobbers a live owner — the constant RESP_ACQUIRED
     * value is identical across acquisitions, so the state CAS alone cannot
     * detect the recycle.
     *
     * Use ACQUIRE on owner_pid so we synchronize with the writer's RELEASE
     * generation bump (which orders the owner_pid store before any external
     * observer of the new generation). */
    for (uint32_t i = 0; i < n; i++) {
        RespSlotHeader *slot = reqrep_resp_slot(h, i);
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state != RESP_ACQUIRED && state != RESP_READY) continue;
        uint32_t pid = __atomic_load_n(&slot->owner_pid, __ATOMIC_ACQUIRE);
        /* owner_pid 0 = no recorded owner (or mid-release/recovery): skip. */
        if (!pid || reqrep_pid_alive(pid)) continue;

        /* Claim exclusive recovery rights by CASing the dead owner to 0.
         * Loser of this race exits — winner of state CAS would have already
         * progressed; we can't safely race the state transition. */
        uint32_t expected_pid = pid;
        if (!__atomic_compare_exchange_n(&slot->owner_pid, &expected_pid, 0,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
            continue;

        /* We own the recovery now. Drive state to FREE; retry on transient
         * ACQUIRED→READY (reply arrived after death) until state is FREE.
         * If clear() or another path beat us, state==FREE already.
         * State can only be 0/1/2; CAS-on-fail sets cur_state to current.
         *
         * NOTE(review): each retry CASes from whatever non-FREE value was
         * last observed. Suppose another path (e.g. clear(), not visible
         * here) returns the slot to FREE and a fresh acquirer then moves it
         * to ACQUIRED before this CAS runs. The CAS would then succeed and
         * free that live acquirer's slot. Confirm clear() cannot run
         * concurrently with recovery, or guard this with the generation
         * counter. */
        uint32_t cur_state = state;
        while (cur_state != RESP_FREE) {
            if (__atomic_compare_exchange_n(&slot->state, &cur_state, RESP_FREE,
                    0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
                break;
        }
        /* Counted even if another path had already freed the slot. */
        __atomic_add_fetch(&h->hdr->stat_recoveries, 1, __ATOMIC_RELAXED);

        /* Now claim FREE→ACQUIRED for ourselves. */
        uint32_t expected = RESP_FREE;
        if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
            __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
            __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
            return (int32_t)i;
        }
        /* Someone else grabbed the FREE slot; wake any waiters on slot avail. */
        reqrep_wake_slot_waiters(h->hdr);
    }

    /* No free slot and nothing recoverable. */
    return -1;
}

/* Release a response slot from a known prior state we owned (ACQUIRED for
 * the post-try_send-failure path, READY for the try_get success path).
 *
 * Two-step claim to survive clear()/recovery races:
 *   1. CAS owner_pid mypid -> 0 to claim release rights. If this fails,
 *      clear() already reset us (or a fresh acquirer set their own pid):
 *      either way, the slot is no longer ours and the release is a no-op.
 *   2. Single-shot CAS state from_state -> FREE. If state is unexpected
 *      (already FREE from clear, or ACQUIRED again from a fresh acquirer
 *      after clear), CAS-on-fail is a no-op — fresh acquirer's claim is
 *      preserved.
 *
 * Without step 1, a blind state CAS from RESP_ACQUIRED -> FREE would
 * silently clobber a freshly-re-acquired slot (ABA on the state value). */
static inline void reqrep_slot_release_from(ReqRepHandle *h, uint32_t idx,
                                             uint32_t from_state) {
    RespSlotHeader *slot = reqrep_resp_slot(h, idx);
    uint32_t mypid = (uint32_t)getpid();
    uint32_t expected_pid = mypid;
    if (!__atomic_compare_exchange_n(&slot->owner_pid, &expected_pid, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return;
    uint32_t expected_state = from_state;
    (void)__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,



( run in 1.347 second using v1.01-cache-2.11-cpan-df04353d9ac )