Data-Sync-Shared
view release on metacpan or search on metacpan
/* Emit a CPU spin-wait hint for use inside busy-wait loops.
 *
 * Every variant carries a "memory" clobber, so the statement also acts as
 * a compiler-level barrier: values cached in registers are not assumed to
 * survive across the call. On architectures without a dedicated hint
 * instruction only that compiler barrier is emitted. */
static inline void sync_spin_pause(void) {
#if defined(__aarch64__)
    /* AArch64 spin-loop hint. */
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    /* x86 / x86-64 spin-loop hint. */
    __asm__ volatile("pause" ::: "memory");
#else
    /* No known hint instruction: compiler barrier only. */
    __asm__ volatile("" ::: "memory");
#endif
}
/* Report whether the process identified by `pid` should be treated as alive.
 *
 * Returns 1 when pid is 0 (never probed), when the signal-0 probe succeeds,
 * or when the probe fails with any errno other than ESRCH (for example
 * EPERM). Returns 0 only when kill() fails with ESRCH. */
static inline int sync_pid_alive(uint32_t pid) {
    if (pid == 0) {
        return 1;
    }

    /* Signal 0 performs the existence/permission check without delivering
     * a signal. */
    if (kill((pid_t)pid, 0) != -1) {
        return 1;
    }

    /* The probe failed: only ESRCH counts as "dead". */
    return errno != ESRCH;
}
/* Convert timeout in seconds (double) to absolute deadline */
static inline void sync_make_deadline(double timeout, struct timespec *deadline) {
clock_gettime(CLOCK_MONOTONIC, deadline);
deadline->tv_sec += (time_t)timeout;
deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
if (deadline->tv_nsec >= 1000000000L) {
__atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
&sync_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (val >= SYNC_MUTEX_WRITER_BIT) {
uint32_t pid = val & SYNC_MUTEX_PID_MASK;
if (!sync_pid_alive(pid))
sync_recover_stale_mutex(hdr, val);
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
int any_recovery = 0;
/* Pass 1: scan; classify; immediate-wipe dead slots with sc==0 (no
* rwlock contribution to lose). Defer wiping dead-with-sc>0 slots
* until force-reset can fire — otherwise we'd lose the only record
* of the orphan rwlock contribution while a live reader is present. */
for (uint32_t i = 0; i < SYNC_READER_SLOTS; i++) {
uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
if (pid == 0) continue;
uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
if (sync_pid_alive(pid)) {
if (sc > 0) any_live_reader = 1;
continue;
}
if (sc > 0) { found_dead_reader = 1; continue; }
if (sync_drain_dead_slot(h, i, pid)) any_recovery = 1;
}
/* Pass 2: only if force-reset will fire. Issue the rwlock CAS first
* to keep the race window with new readers narrow, then wipe the
* deferred dead slots. */
if (__atomic_compare_exchange_n(&hdr->value, &cur, 0,
0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
any_recovery = 1;
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
}
for (uint32_t i = 0; i < SYNC_READER_SLOTS; i++) {
uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
if (pid == 0) continue;
if (sync_pid_alive(pid)) continue;
if (sync_drain_dead_slot(h, i, pid)) any_recovery = 1;
}
}
if (any_recovery)
__atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
}
/* Park/unpark helpers — keep global hdr->waiters/rwlock_writers_waiting
* and per-slot mirror counters in sync so recovery can drain them. */
static inline void sync_park_reader(SyncHandle *h) {
* value here (rather than trusting a stale snapshot from the futex
* caller) so that (a) a writer that died after our futex_wait started
* is detected on the same timeout, and (b) phantom waiter/writers_waiting
* contributions left by a dead parked writer are drained even when the
* lock word itself is now 0. */
static inline void sync_recover_after_timeout(SyncHandle *h) {
    SyncHeader *header = h->hdr;

    /* Take a fresh snapshot of the lock word instead of reusing any value
     * the caller may have observed earlier. */
    uint32_t word = __atomic_load_n(&header->value, __ATOMIC_RELAXED);

    /* Lock word below SYNC_RWLOCK_WRITER_BIT: run the dead-reader
     * recovery pass and stop. */
    if (word < SYNC_RWLOCK_WRITER_BIT) {
        sync_recover_dead_readers(h);
        return;
    }

    /* Otherwise extract the pid recorded in the lock word and recover the
     * lock only if that process is no longer alive. */
    uint32_t owner = word & SYNC_RWLOCK_PID_MASK;
    if (!sync_pid_alive(owner)) {
        sync_recover_stale_rwlock(header, word);
    }
}
static inline void sync_rwlock_rdlock(SyncHandle *h) {
SyncHeader *hdr = h->hdr;
sync_claim_reader_slot(h);
uint32_t *lock = &hdr->value;
if (r == 0) return 0; /* already done */
/* r == -1: someone else is running. Wait or detect stale. */
uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
if (val == SYNC_ONCE_DONE) return 0;
if (val == SYNC_ONCE_INIT) continue; /* race: was reset, retry */
/* Check stale initializer */
if (val >= SYNC_MUTEX_WRITER_BIT) {
uint32_t pid = val & SYNC_MUTEX_PID_MASK;
if (!sync_pid_alive(pid)) {
if (__atomic_compare_exchange_n(&hdr->value, &val, SYNC_ONCE_INIT,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
__atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
continue;
}
}
( run in 3.370 seconds using v1.01-cache-2.11-cpan-df04353d9ac )