Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

reqrep.h  view on Meta::CPAN

}

/* ================================================================
 * Futex helpers
 * ================================================================ */

/* Layout of the 32-bit lock word: the top bit is the writer flag and the
 * remaining 31 bits hold a pid. */
#define REQREP_MUTEX_WRITER_BIT (1U << 31)
/* Every bit below the writer flag, i.e. the pid field. */
#define REQREP_MUTEX_PID_MASK   (REQREP_MUTEX_WRITER_BIT - 1U)
/* Build a lock word from `pid`: the pid is truncated to 31 bits and the
 * writer flag is set. */
#define REQREP_MUTEX_VAL(pid)   (((uint32_t)(pid) & REQREP_MUTEX_PID_MASK) | REQREP_MUTEX_WRITER_BIT)

/* Liveness probe for a process id.
 *
 * Returns 1 ("alive") for pid 0 without probing. For any other value the
 * pid is probed with kill(pid, 0), which performs the existence check
 * without delivering a signal. The result is 0 ("dead") only when that
 * call fails with ESRCH; success or any other failure (e.g. EPERM) is
 * reported as alive.
 *
 * May modify errno. */
static inline int reqrep_pid_alive(uint32_t pid) {
    if (pid != 0) {
        int rc = kill((pid_t)pid, 0);
        if (rc == -1 && errno == ESRCH)
            return 0;
    }
    return 1;
}

/* Timeout of REQREP_LOCK_TIMEOUT_SEC whole seconds (no nanosecond part),
 * kept as a shared constant so it can be passed by address wherever a
 * struct timespec timeout is expected. */
static const struct timespec reqrep_lock_timeout = {
    .tv_sec  = REQREP_LOCK_TIMEOUT_SEC,
    .tv_nsec = 0,
};

static inline void reqrep_recover_stale_mutex(ReqRepHeader *hdr, uint32_t observed) {
    if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return;

reqrep.h  view on Meta::CPAN

        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &reqrep_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= REQREP_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & REQREP_MUTEX_PID_MASK;
                    if (!reqrep_pid_alive(pid))
                        reqrep_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

reqrep.h  view on Meta::CPAN

     * detect the recycle.
     *
     * Use ACQUIRE on owner_pid so we synchronize with the writer's RELEASE
     * generation bump (which orders the owner_pid store before any external
     * observer of the new generation). */
    for (uint32_t i = 0; i < n; i++) {
        RespSlotHeader *slot = reqrep_resp_slot(h, i);
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state != RESP_ACQUIRED && state != RESP_READY) continue;
        uint32_t pid = __atomic_load_n(&slot->owner_pid, __ATOMIC_ACQUIRE);
        if (!pid || reqrep_pid_alive(pid)) continue;

        /* Claim exclusive recovery rights by CASing the dead owner to 0.
         * Loser of this race exits — winner of state CAS would have already
         * progressed; we can't safely race the state transition. */
        uint32_t expected_pid = pid;
        if (!__atomic_compare_exchange_n(&slot->owner_pid, &expected_pid, 0,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
            continue;

        /* We own the recovery now. Drive state to FREE; retry on transient



( run in 0.601 second using v1.01-cache-2.11-cpan-df04353d9ac )