Data-ReqRep-Shared
view release on metacpan or search on metacpan
}
/* ================================================================
* Futex helpers
* ================================================================ */
#define REQREP_MUTEX_WRITER_BIT 0x80000000U
#define REQREP_MUTEX_PID_MASK 0x7FFFFFFFU
#define REQREP_MUTEX_VAL(pid) (REQREP_MUTEX_WRITER_BIT | ((uint32_t)(pid) & REQREP_MUTEX_PID_MASK))
static inline int reqrep_pid_alive(uint32_t pid) {
if (pid == 0) return 1;
return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}
static const struct timespec reqrep_lock_timeout = { REQREP_LOCK_TIMEOUT_SEC, 0 };
static inline void reqrep_recover_stale_mutex(ReqRepHeader *hdr, uint32_t observed) {
if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
return;
__atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
&reqrep_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (val >= REQREP_MUTEX_WRITER_BIT) {
uint32_t pid = val & REQREP_MUTEX_PID_MASK;
if (!reqrep_pid_alive(pid))
reqrep_recover_stale_mutex(hdr, val);
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
return (int32_t)idx;
}
}
/* Recover stale slots from dead processes (both ACQUIRED and READY) */
for (uint32_t i = 0; i < n; i++) {
RespSlotHeader *slot = reqrep_resp_slot(h, i);
uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
if (state == RESP_ACQUIRED || state == RESP_READY) {
uint32_t pid = __atomic_load_n(&slot->owner_pid, __ATOMIC_RELAXED);
if (pid && !reqrep_pid_alive(pid)) {
if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
__atomic_add_fetch(&h->hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
uint32_t expected = RESP_FREE;
if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
__atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
__atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
return (int32_t)i;
}
( run in 1.573 second using v1.01-cache-2.11-cpan-39bf76dae61 )