uint64_t total_size; /* 16: mmap size */
uint8_t _pad0[40]; /* 24-63 */
/* ---- Cache line 1 (64-127): mutable state ---- */
/* Semaphore: value = current count, waiters = blocked acquirers */
/* Barrier: value = arrived count, waiters = blocked at barrier,
generation = increments each time barrier trips */
/* RWLock: value = rwlock word (0=free, N=N readers, 0x80000000|pid=writer),
waiters = blocked lockers */
/* Condvar: value = signal counter (futex word), waiters = blocked waiters,
mutex = associated mutex for predicate protection */
/* Once: value = state (0=INIT, 0x80000000|pid=RUNNING, 1=DONE),
waiters = blocked on completion */
uint32_t value; /* 64: primary state word (futex target) */
uint32_t waiters; /* 68: waiter count */
uint32_t generation; /* 72: barrier generation / condvar epoch */
uint32_t mutex; /* 76: condvar mutex (0 or PID|0x80000000) */
uint32_t mutex_waiters; /* 80: condvar mutex waiter count */
uint32_t stat_recoveries;/* 84 */
uint64_t stat_acquires; /* 88 */
uint64_t stat_releases; /* 96 */
uint64_t stat_waits; /* 104 */
uint64_t stat_timeouts; /* 112 */
uint32_t stat_signals; /* 120 */
uint32_t rwlock_writers_waiting; /* 124: RWLock write-preferring yield signal
(writers only, not readers) */
} SyncHeader;
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(SyncHeader) == 128, "SyncHeader must be 128 bytes");
#endif
/* ================================================================
* Process-local handle
* ================================================================ */
typedef struct {
SyncHeader *hdr;
size_t mmap_size;
char *path;
int notify_fd; /* eventfd, -1 if disabled */
int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
} SyncHandle;
/* ================================================================
* Utility
* ================================================================ */
static inline void sync_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
__asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
__asm__ volatile("yield" ::: "memory");
#else
__asm__ volatile("" ::: "memory");
#endif
}
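/* Best-effort liveness probe: kill(pid, 0) delivers no signal; only ESRCH
 * means the pid is gone (EPERM implies a live process we cannot signal).
 * pid 0 means "no recorded owner" and is conservatively treated as alive,
 * since kill(0, sig) would target the whole process group. */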
static inline int sync_pid_alive(uint32_t pid) {
if (pid == 0) return 1;
return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}
/* Convert timeout in seconds (double) to absolute deadline */
static inline void sync_make_deadline(double timeout, struct timespec *deadline) {
clock_gettime(CLOCK_MONOTONIC, deadline);
deadline->tv_sec += (time_t)timeout;
deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
if (deadline->tv_nsec >= 1000000000L) {
deadline->tv_sec++;
deadline->tv_nsec -= 1000000000L;
}
}
/* Compute remaining timespec from absolute deadline. Returns 0 if deadline passed. */
static inline int sync_remaining_time(const struct timespec *deadline,
struct timespec *remaining) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
remaining->tv_sec = deadline->tv_sec - now.tv_sec;
remaining->tv_nsec = deadline->tv_nsec - now.tv_nsec;
if (remaining->tv_nsec < 0) {
remaining->tv_sec--;
remaining->tv_nsec += 1000000000L;
}
return remaining->tv_sec >= 0;
}
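/* Illustrative sketch (not part of this file's API): the deadline/remaining
 * pair above is the idiom every timed wait below follows. The function name
 * and the flag parameter are hypothetical; a real waiter would pass
 * `remaining` to FUTEX_WAIT instead of spinning. */
static inline int sync_example_timed_poll(volatile int *flag, double timeout) {
    struct timespec deadline, remaining;
    sync_make_deadline(timeout, &deadline);
    do {
        if (*flag) return 1;        /* condition satisfied before the deadline */
        sync_spin_pause();          /* real code would FUTEX_WAIT for `remaining` */
    } while (sync_remaining_time(&deadline, &remaining));
    return 0;                       /* deadline expired */
}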
/* ================================================================
* Mutex helpers (for Condvar's internal mutex)
* ================================================================ */
#define SYNC_MUTEX_WRITER_BIT 0x80000000U
#define SYNC_MUTEX_PID_MASK 0x7FFFFFFFU
#define SYNC_MUTEX_VAL(pid) (SYNC_MUTEX_WRITER_BIT | ((uint32_t)(pid) & SYNC_MUTEX_PID_MASK))
static const struct timespec sync_lock_timeout = { SYNC_LOCK_TIMEOUT_SEC, 0 };
static inline void sync_recover_stale_mutex(SyncHeader *hdr, uint32_t observed) {
if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
return;
__atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
static inline void sync_mutex_lock(SyncHeader *hdr) {
uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
sync_spin_pause();
continue;
}
__atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
&sync_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (val >= SYNC_MUTEX_WRITER_BIT) {
uint32_t pid = val & SYNC_MUTEX_PID_MASK;
if (!sync_pid_alive(pid))
sync_recover_stale_mutex(hdr, val);
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
static inline void sync_mutex_unlock(SyncHeader *hdr) {
__atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
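/* Illustrative sketch: the condvar's internal mutex used as an ordinary
 * robust lock around a caller-defined predicate. The `predicate` pointer is
 * hypothetical and stands in for state living elsewhere in the mapping. */
static inline void sync_example_set_predicate(SyncHeader *hdr, uint32_t *predicate) {
    sync_mutex_lock(hdr);          /* spins, then futex-waits; recovers dead owners */
    *predicate = 1;                /* mutate shared state under the lock */
    sync_mutex_unlock(hdr);        /* releases and wakes one parked waiter, if any */
}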
/* ================================================================
* RWLock helpers (for SYNC_TYPE_RWLOCK)
*
* value == 0: unlocked
* value 1..0x7FFFFFFF: N active readers
* value 0x80000000 | pid: write-locked by pid
* ================================================================ */
#define SYNC_RWLOCK_WRITER_BIT 0x80000000U
#define SYNC_RWLOCK_PID_MASK 0x7FFFFFFFU
#define SYNC_RWLOCK_WR(pid) (SYNC_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SYNC_RWLOCK_PID_MASK))
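/* Illustrative decoding of the rwlock word, e.g. for diagnostics. Nothing in
 * the lock paths below needs this helper; it only restates the encoding. */
static inline uint32_t sync_example_rwlock_writer_pid(uint32_t word) {
    return (word >= SYNC_RWLOCK_WRITER_BIT) ? (word & SYNC_RWLOCK_PID_MASK) : 0;
}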
static inline int sync_rwlock_try_rdlock(SyncHeader *hdr);
static inline int sync_rwlock_try_wrlock(SyncHeader *hdr);
static inline void sync_recover_stale_rwlock(SyncHeader *hdr, uint32_t observed) {
if (!__atomic_compare_exchange_n(&hdr->value, &observed, 0,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
return;
__atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static inline void sync_rwlock_rdlock(SyncHeader *hdr) {
uint32_t *lock = &hdr->value;
uint32_t *w = &hdr->waiters;
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
for (int spin = 0; ; spin++) {
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Write-preferring: yield to parked writers when lock is free. */
if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
} else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
sync_spin_pause();
continue;
}
__atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR yielding to parked writers (cur==0) */
if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&sync_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
if (cur >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (val >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
if (!sync_pid_alive(pid))
sync_recover_stale_rwlock(hdr, val);
}
} else {
/* Yielding to writers timed out; optimistically drop one
 * writers_waiting to recover from a potentially-crashed
* parked writer. A live writer just re-increments. */
uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
while (wc > 0 && !__atomic_compare_exchange_n(
writers_waiting, &wc, wc - 1,
1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
/* Timed rdlock: returns 1 on success, 0 on timeout. timeout<0 = infinite.
* No try-lock fast-path: would bypass write-preference when cur==0 &&
* writers_waiting > 0. Main loop's first iteration handles the uncontended
* case at ~same cost. */
static inline int sync_rwlock_rdlock_timed(SyncHeader *hdr, double timeout) {
if (timeout == 0) {
return sync_rwlock_try_rdlock(hdr);
}
uint32_t *lock = &hdr->value;
uint32_t *w = &hdr->waiters;
struct timespec deadline, remaining;
int has_deadline = (timeout > 0);
if (has_deadline) sync_make_deadline(timeout, &deadline);
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
for (int spin = 0; ; spin++) {
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return 1;
} else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return 1;
}
if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
sync_spin_pause();
continue;
}
__atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
struct timespec *pts = NULL;
/* Cap wait at SYNC_LOCK_TIMEOUT_SEC so stale-holder recovery
* runs periodically even with a user-supplied deadline. */
if (has_deadline) {
if (!sync_remaining_time(&deadline, &remaining)) {
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
return 0;
}
if (remaining.tv_sec >= SYNC_LOCK_TIMEOUT_SEC)
pts = (struct timespec *)&sync_lock_timeout;
else
pts = &remaining;
} else {
pts = (struct timespec *)&sync_lock_timeout;
}
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
if (cur >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (val >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
if (!sync_pid_alive(pid))
sync_recover_stale_rwlock(hdr, val);
}
} else {
/* Yielding to writers timed out; drop one writers_waiting
* to recover from a potentially-crashed parked writer. */
uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
while (wc > 0 && !__atomic_compare_exchange_n(
writers_waiting, &wc, wc - 1,
1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
static inline int sync_rwlock_try_rdlock(SyncHeader *hdr) {
    for (;;) {
        uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
        if (cur >= SYNC_RWLOCK_WRITER_BIT) return 0;   /* write-locked */
        if (__atomic_compare_exchange_n(&hdr->value, &cur, cur + 1,
                                        1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return 1;
        /* weak CAS can fail spuriously or lose a race with another reader; retry */
    }
}
static inline void sync_rwlock_rdunlock(SyncHeader *hdr) {
    /* __atomic_sub_fetch returns the decremented value; 0 means last reader out */
    uint32_t newval = __atomic_sub_fetch(&hdr->value, 1, __ATOMIC_RELEASE);
    if (newval == 0 && __atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
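/* Illustrative sketch of a bounded read-side critical section; the 0.5-second
 * budget is an arbitrary example value. timeout < 0 blocks indefinitely and
 * timeout == 0 degrades to a try-lock (see sync_rwlock_rdlock_timed above). */
static inline int sync_example_read_section(SyncHeader *hdr) {
    if (!sync_rwlock_rdlock_timed(hdr, 0.5))
        return 0;                       /* timed out waiting for a read slot */
    /* ... read shared state ... */
    sync_rwlock_rdunlock(hdr);          /* last reader out wakes parked writers */
    return 1;
}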
static inline void sync_rwlock_wrlock(SyncHeader *hdr) {
uint32_t *lock = &hdr->value;
uint32_t *w = &hdr->waiters;
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(lock, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
sync_spin_pause();
continue;
}
__atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&sync_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (val >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
if (!sync_pid_alive(pid))
sync_recover_stale_rwlock(hdr, val);
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
/* Timed wrlock: returns 1 on success, 0 on timeout. timeout<0 = infinite. */
static inline int sync_rwlock_wrlock_timed(SyncHeader *hdr, double timeout) {
if (sync_rwlock_try_wrlock(hdr)) return 1;
if (timeout == 0) return 0;
uint32_t *lock = &hdr->value;
uint32_t *w = &hdr->waiters;
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
struct timespec deadline, remaining;
int has_deadline = (timeout > 0);
if (has_deadline) sync_make_deadline(timeout, &deadline);
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(lock, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return 1;
if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
sync_spin_pause();
continue;
}
__atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur != 0) {
struct timespec *pts = NULL;
/* Cap wait at SYNC_LOCK_TIMEOUT_SEC so stale-holder recovery
* runs periodically even with a user-supplied deadline. */
if (has_deadline) {
if (!sync_remaining_time(&deadline, &remaining)) {
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
return 0;
}
if (remaining.tv_sec >= SYNC_LOCK_TIMEOUT_SEC)
pts = (struct timespec *)&sync_lock_timeout;
else
pts = &remaining;
} else {
pts = (struct timespec *)&sync_lock_timeout;
}
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (val >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
if (!sync_pid_alive(pid))
sync_recover_stale_rwlock(hdr, val);
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
static inline int sync_rwlock_try_wrlock(SyncHeader *hdr) {
uint32_t expected = 0;
uint32_t mypid = SYNC_RWLOCK_WR((uint32_t)getpid());
return __atomic_compare_exchange_n(&hdr->value, &expected, mypid,
0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
}
static inline void sync_rwlock_wrunlock(SyncHeader *hdr) {
__atomic_store_n(&hdr->value, 0, __ATOMIC_RELEASE);
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* Downgrade: atomically convert wrlock to rdlock (writer -> 1 reader) */
static inline void sync_rwlock_downgrade(SyncHeader *hdr) {
__atomic_store_n(&hdr->value, 1, __ATOMIC_RELEASE);
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
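/* Illustrative sketch of the publish-then-downgrade pattern: write under the
 * exclusive lock, downgrade so other readers can proceed, and keep reading a
 * view that cannot be modified until this reader releases. */
static inline void sync_example_publish_then_read(SyncHeader *hdr) {
    sync_rwlock_wrlock(hdr);
    /* ... mutate shared state ... */
    sync_rwlock_downgrade(hdr);     /* writer becomes the single reader; readers wake */
    /* ... read the state just published ... */
    sync_rwlock_rdunlock(hdr);
}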
/* ================================================================
* Semaphore operations
*
* value = current count (0..param where param=max)
* CAS-based acquire/release, futex wait when 0
* ================================================================ */
static inline int sync_sem_try_acquire(SyncHandle *h) {
SyncHeader *hdr = h->hdr;
for (;;) {
uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
if (cur == 0) return 0;
if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
__atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
return 1;
}
}
}
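/* Illustrative sketch of non-blocking slot acquisition. The matching release
 * path described in the section comment above is not shown here; the function
 * name and the work placeholder are hypothetical. */
static inline int sync_example_use_slot(SyncHandle *h) {
    if (!sync_sem_try_acquire(h))
        return 0;                   /* count was 0: no slot free right now */
    /* ... use the guarded resource, then perform the semaphore release ... */
    return 1;
}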
static inline int sync_sem_try_acquire_n(SyncHandle *h, uint32_t n) {
if (n == 0) return 1;
SyncHeader *hdr = h->hdr;
for (;;) {
uint32_t cur = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
if (cur < n) return 0;
if (__atomic_compare_exchange_n(&hdr->value, &cur, cur - n,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
            __atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
            return 1;
        }
    }
}
/* ================================================================
 * Once operations
 *
* value states: 0=INIT, (SYNC_MUTEX_WRITER_BIT|pid)=RUNNING, 1=DONE
* ================================================================ */
#define SYNC_ONCE_INIT 0
#define SYNC_ONCE_DONE 1
/* RUNNING = SYNC_MUTEX_WRITER_BIT | pid */
static inline int sync_once_is_done(SyncHandle *h) {
return __atomic_load_n(&h->hdr->value, __ATOMIC_ACQUIRE) == SYNC_ONCE_DONE;
}
/* Try to become the initializer. Returns:
* 1 = you are the initializer, call once_done() when finished
* 0 = already done
* -1 = another process is initializing (wait with once_wait) */
static inline int sync_once_try(SyncHandle *h) {
SyncHeader *hdr = h->hdr;
uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
uint32_t expected = SYNC_ONCE_INIT;
if (__atomic_compare_exchange_n(&hdr->value, &expected, mypid,
0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
__atomic_add_fetch(&hdr->stat_acquires, 1, __ATOMIC_RELAXED);
return 1;
}
if (expected == SYNC_ONCE_DONE) return 0;
return -1;
}
/* Call/wait combo: try to become initializer, or wait for completion.
 * Returns 1 if the caller is the initializer; 0 if the work was already done,
 * the wait completed, or the timeout expired. */
static inline int sync_once_enter(SyncHandle *h, double timeout) {
SyncHeader *hdr = h->hdr;
/* Non-blocking probe: just try, don't wait */
int r = sync_once_try(h);
if (r == 1) return 1;
if (r == 0) return 0;
if (timeout == 0) return 0;
struct timespec deadline, remaining;
int has_deadline = (timeout > 0);
if (has_deadline) sync_make_deadline(timeout, &deadline);
__atomic_add_fetch(&hdr->stat_waits, 1, __ATOMIC_RELAXED);
for (;;) {
r = sync_once_try(h);
if (r == 1) return 1; /* caller is initializer */
if (r == 0) return 0; /* already done */
/* r == -1: someone else is running. Wait or detect stale. */
uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
if (val == SYNC_ONCE_DONE) return 0;
if (val == SYNC_ONCE_INIT) continue; /* race: was reset, retry */
/* Check stale initializer */
if (val >= SYNC_MUTEX_WRITER_BIT) {
uint32_t pid = val & SYNC_MUTEX_PID_MASK;
if (!sync_pid_alive(pid)) {
if (__atomic_compare_exchange_n(&hdr->value, &val, SYNC_ONCE_INIT,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
__atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
continue;
}
}
__atomic_add_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
/* Always cap at SYNC_LOCK_TIMEOUT_SEC so stale-initializer recovery
* runs periodically even when the caller specifies infinite timeout. */
struct timespec *pts = (struct timespec *)&sync_lock_timeout;
if (has_deadline) {
if (!sync_remaining_time(&deadline, &remaining)) {
__atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
return 0;
}
if (remaining.tv_sec < SYNC_LOCK_TIMEOUT_SEC)
pts = &remaining;
}
syscall(SYS_futex, &hdr->value, FUTEX_WAIT, val, pts, NULL, 0);
__atomic_sub_fetch(&hdr->waiters, 1, __ATOMIC_RELAXED);
}
}
static inline void sync_once_done(SyncHandle *h) {
SyncHeader *hdr = h->hdr;
__atomic_store_n(&hdr->value, SYNC_ONCE_DONE, __ATOMIC_RELEASE);
__atomic_add_fetch(&hdr->stat_releases, 1, __ATOMIC_RELAXED);
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static inline void sync_once_reset(SyncHandle *h) {
SyncHeader *hdr = h->hdr;
__atomic_store_n(&hdr->value, SYNC_ONCE_INIT, __ATOMIC_RELEASE);
if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
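/* Illustrative sketch of the canonical once-initialization pattern. With an
 * infinite timeout (-1), a return of 1 means this caller must run the
 * initializer and signal completion; 0 means some process already finished. */
static inline void sync_example_once_init(SyncHandle *h) {
    if (sync_once_enter(h, -1.0) == 1) {
        /* ... perform the one-time initialization ... */
        sync_once_done(h);          /* marks DONE and wakes every blocked waiter */
    }
    /* past this point the one-time work is guaranteed complete */
}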
/* ================================================================
* Create / Open / Close
* ================================================================ */
#define SYNC_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, SYNC_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
static SyncHandle *sync_create(const char *path, uint32_t type, uint32_t param,
uint32_t initial, char *errbuf) {
if (errbuf) errbuf[0] = '\0';
if (type > SYNC_TYPE_ONCE) { SYNC_ERR("unknown type %u", type); return NULL; }
if (type == SYNC_TYPE_SEMAPHORE && param == 0) { SYNC_ERR("semaphore max must be > 0"); return NULL; }
if (type == SYNC_TYPE_SEMAPHORE && initial > param) { SYNC_ERR("initial (%u) > max (%u)", initial, param); return NULL; }
if (type == SYNC_TYPE_BARRIER && param < 2) { SYNC_ERR("barrier count must be >= 2"); return NULL; }