Data-Sync-Shared

sync.h

/*
 * Cross-process synchronization primitives:
 *
 *   Semaphore — counting semaphore
 *   Barrier   — reusable N-party barrier with a generation counter
 *   RWLock    — write-preferring reader/writer lock
 *   Condvar   — condition variable with futex wait/signal/broadcast
 *   Once      — one-time initialization gate (like pthread_once)
 *
 * All use file-backed mmap(MAP_SHARED) for cross-process sharing,
 * futex for blocking wait, and PID-based stale lock recovery.
 */

#ifndef SYNC_H
#define SYNC_H

#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <limits.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>

/* ================================================================
 * Constants
 * ================================================================ */

#define SYNC_MAGIC        0x53594E31U  /* "SYN1" */
#define SYNC_VERSION      1

/* Primitive type IDs */
#define SYNC_TYPE_SEMAPHORE  0
#define SYNC_TYPE_BARRIER    1
#define SYNC_TYPE_RWLOCK     2
#define SYNC_TYPE_CONDVAR    3
#define SYNC_TYPE_ONCE       4

#define SYNC_ERR_BUFLEN       256
#define SYNC_SPIN_LIMIT       32  /* spins before parking on the futex */
#define SYNC_LOCK_TIMEOUT_SEC 2   /* bounded futex waits: wake periodically to probe for dead owners */

/* ================================================================
 * Header (128 bytes = 2 cache lines, lives at start of mmap)
 * ================================================================ */

typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;          /* 0 */
    uint32_t version;        /* 4 */
    uint32_t type;           /* 8: SYNC_TYPE_* */
    uint32_t param;          /* 12: type-specific (sem max, barrier count, etc.) */
    uint64_t total_size;     /* 16: mmap size */
    uint8_t  _pad0[40];      /* 24-63 */

    /* ---- Cache line 1 (64-127): mutable state ---- */

    /* Semaphore: value = current count, waiters = blocked acquirers */
    /* Barrier: value = arrived count, waiters = blocked at barrier,
                generation = increments each time barrier trips */
    /* RWLock: value = rwlock word (0=free, N=N readers, 0x80000000|pid=writer),
               waiters = blocked lockers */
    /* Condvar: value = signal counter (futex word), waiters = blocked waiters,
                mutex = associated mutex for predicate protection */
    /* Once: value = state (0=INIT, 1=RUNNING|pid, 2=DONE),
             waiters = blocked on completion */

    uint32_t value;          /* 64: primary state word (futex target) */
    uint32_t waiters;        /* 68: waiter count */
    uint32_t generation;     /* 72: barrier generation / condvar epoch */
    uint32_t mutex;          /* 76: condvar mutex (0 or PID|0x80000000) */
    uint32_t mutex_waiters;  /* 80: condvar mutex waiter count */
    uint32_t stat_recoveries;/* 84 */
    uint64_t stat_acquires;  /* 88 */
    uint64_t stat_releases;  /* 96 */
    uint64_t stat_waits;     /* 104 */
    uint64_t stat_timeouts;  /* 112 */
    uint32_t stat_signals;   /* 120 */
    uint32_t rwlock_writers_waiting; /* 124: RWLock write-preferring yield signal
                                             (writers only, not readers) */
} SyncHeader;

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(SyncHeader) == 128, "SyncHeader must be 128 bytes");
#endif
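
/* Linux exposes futex(2) only via raw syscall; the code below inlines
 * syscall(SYS_futex, ...) at each use.  For orientation, the two call
 * shapes used throughout, written out as illustrative helpers (a
 * sketch, not part of the original header).  Note that
 * FUTEX_PRIVATE_FLAG is deliberately absent: these words live in
 * MAP_SHARED memory and must be waitable across processes. */
static inline long sync_futex_wait(uint32_t *uaddr, uint32_t expected,
                                   const struct timespec *rel_timeout) {
    /* Sleeps only while *uaddr == expected; the timeout is relative,
     * NULL means wait indefinitely. */
    return syscall(SYS_futex, uaddr, FUTEX_WAIT, expected, rel_timeout, NULL, 0);
}

static inline long sync_futex_wake(uint32_t *uaddr, int nwaiters) {
    /* Wakes up to nwaiters waiters; INT_MAX broadcasts. */
    return syscall(SYS_futex, uaddr, FUTEX_WAKE, nwaiters, NULL, NULL, 0);
}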

/* ================================================================
 * Process-local handle
 * ================================================================ */

typedef struct {
    SyncHeader *hdr;
    size_t      mmap_size;
    char       *path;
    int         notify_fd;   /* eventfd, -1 if disabled */
    int         backing_fd;  /* memfd fd, -1 for file-backed/anonymous */
} SyncHandle;
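
/* Illustrative sketch (not part of the original header): binding a
 * handle to a file-backed region, as described in the top comment.
 * The function name, mode bits, and error handling are assumptions for
 * the example; the real create/open API is outside this excerpt. */
static inline int sync_handle_map_sketch(SyncHandle *h, const char *path,
                                         size_t size) {
    int fd = open(path, O_RDWR | O_CREAT, 0600);
    if (fd < 0) return -1;
    if (ftruncate(fd, (off_t)size) != 0) { close(fd); return -1; }
    void *mem = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    close(fd);                    /* the mapping keeps the file referenced */
    if (mem == MAP_FAILED) return -1;
    h->hdr        = (SyncHeader *)mem;
    h->mmap_size  = size;
    h->path       = strdup(path);
    h->notify_fd  = -1;           /* eventfd notification disabled */
    h->backing_fd = -1;           /* file-backed, per the struct comment */
    return 0;
}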

/* ================================================================
 * Utility
 * ================================================================ */

static inline void sync_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* Best-effort liveness probe via kill(pid, 0): only ESRCH means the
 * process is gone; EPERM still implies it exists.  pid 0 would address
 * the caller's whole process group, so it is treated as alive. */
static inline int sync_pid_alive(uint32_t pid) {
    if (pid == 0) return 1;
    return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}

/* Convert timeout in seconds (double) to absolute deadline */
static inline void sync_make_deadline(double timeout, struct timespec *deadline) {
    clock_gettime(CLOCK_MONOTONIC, deadline);
    deadline->tv_sec += (time_t)timeout;
    deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec++;
        deadline->tv_nsec -= 1000000000L;
    }
}

/* ================================================================
 * Mutex helpers (for Condvar's internal mutex)
 * ================================================================ */

#define SYNC_MUTEX_WRITER_BIT 0x80000000U
#define SYNC_MUTEX_PID_MASK   0x7FFFFFFFU
#define SYNC_MUTEX_VAL(pid)   (SYNC_MUTEX_WRITER_BIT | ((uint32_t)(pid) & SYNC_MUTEX_PID_MASK))

static const struct timespec sync_lock_timeout = { SYNC_LOCK_TIMEOUT_SEC, 0 };

/* Clear a mutex whose recorded owner has died.  CAS against the exact
 * value observed by the caller: if the dead owner's word has already
 * been replaced (legitimate release and re-acquire), the CAS fails and
 * nothing is stolen from a live owner. */
static inline void sync_recover_stale_mutex(SyncHeader *hdr, uint32_t observed) {
    if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return;
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

static inline void sync_mutex_lock(SyncHeader *hdr) {
    uint32_t mypid = SYNC_MUTEX_VAL((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
            sync_spin_pause();
            continue;
        }
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &sync_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= SYNC_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & SYNC_MUTEX_PID_MASK;
                    if (!sync_pid_alive(pid))
                        sync_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

static inline void sync_mutex_unlock(SyncHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
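
/* Usage sketch: the condvar wait path would bracket its predicate with
 * this mutex along the following lines.  Illustrative only: the actual
 * condvar functions are outside this excerpt, and `predicate` stands in
 * for caller state kept in the shared mapping. */
static inline void sync_condvar_wait_sketch(SyncHeader *hdr, int *predicate) {
    sync_mutex_lock(hdr);
    while (!*predicate) {
        /* Snapshot the signal counter while still holding the mutex so
         * a signal arriving between unlock and FUTEX_WAIT makes the
         * wait return immediately (EAGAIN) instead of being lost. */
        uint32_t seq = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
        sync_mutex_unlock(hdr);
        syscall(SYS_futex, &hdr->value, FUTEX_WAIT, seq, NULL, NULL, 0);
        sync_mutex_lock(hdr);
    }
    /* Predicate holds here, mutex held. */
    sync_mutex_unlock(hdr);
}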

/* ================================================================
 * RWLock helpers (for SYNC_TYPE_RWLOCK)
 *
 * value == 0:                  unlocked
 * value  1..0x7FFFFFFF:        N active readers
 * value  0x80000000 | pid:     write-locked by pid
 * ================================================================ */

#define SYNC_RWLOCK_WRITER_BIT 0x80000000U
#define SYNC_RWLOCK_PID_MASK   0x7FFFFFFFU
#define SYNC_RWLOCK_WR(pid)    (SYNC_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SYNC_RWLOCK_PID_MASK))
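
/* Illustrative decoder for the three states above (a sketch, not part
 * of the original header). */
static inline void sync_rwlock_decode(uint32_t v,
                                      uint32_t *readers, uint32_t *writer_pid) {
    if (v & SYNC_RWLOCK_WRITER_BIT) {
        *readers    = 0;
        *writer_pid = v & SYNC_RWLOCK_PID_MASK;  /* write-locked by pid */
    } else {
        *readers    = v;                         /* 0 means unlocked */
        *writer_pid = 0;
    }
}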

static inline int sync_rwlock_try_rdlock(SyncHeader *hdr);
static inline int sync_rwlock_try_wrlock(SyncHeader *hdr);

/* Same stale-owner recovery as the mutex variant, applied to the rwlock
 * word; wakes all waiters because readers and writers both park here. */
static inline void sync_recover_stale_rwlock(SyncHeader *hdr, uint32_t observed) {
    if (!__atomic_compare_exchange_n(&hdr->value, &observed, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return;
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

static inline void sync_rwlock_rdlock(SyncHeader *hdr) {
    uint32_t *lock = &hdr->value;
    uint32_t *w = &hdr->waiters;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: yield to parked writers when lock is free. */
        if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
            sync_spin_pause();
            continue;
        }
        __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR yielding to parked writers (cur==0) */
        if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &sync_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
                if (cur >= SYNC_RWLOCK_WRITER_BIT) {
                    uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                    if (val >= SYNC_RWLOCK_WRITER_BIT) {
                        uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
                        if (!sync_pid_alive(pid))
                            sync_recover_stale_rwlock(hdr, val);
                    }
                } else {
                    /* Yielding to writers timed out: optimistically drop
                     * one writers_waiting count to recover from a parked
                     * writer that may have crashed.  A live writer just
                     * re-increments it. */
                    uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
                    while (wc > 0 && !__atomic_compare_exchange_n(
                            writers_waiting, &wc, wc - 1,
                            1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

/* Timed rdlock: returns 1 on success, 0 on timeout; timeout < 0 waits
 * forever.  Deliberately no try-lock fast path: it would bypass
 * write-preference when cur == 0 && writers_waiting > 0, and the main
 * loop's first iteration handles the uncontended case at roughly the
 * same cost. */
static inline int sync_rwlock_rdlock_timed(SyncHeader *hdr, double timeout) {
    if (timeout == 0) {
        return sync_rwlock_try_rdlock(hdr);
    }

    uint32_t *lock = &hdr->value;
    uint32_t *w = &hdr->waiters;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) sync_make_deadline(timeout, &deadline);

    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur > 0 && cur < SYNC_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return 1;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return 1;
        }
        if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
            sync_spin_pause();
            continue;
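        }
        /* The excerpt is cut off above.  A sketch of how the slow path
         * plausibly continues, mirroring sync_rwlock_rdlock but capping
         * each FUTEX_WAIT at the time left until `deadline` (this block
         * is an illustration, not the original source): */
        __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
            struct timespec wait_tmo = sync_lock_timeout;
            if (has_deadline) {
                struct timespec now;
                clock_gettime(CLOCK_MONOTONIC, &now);
                remaining.tv_sec  = deadline.tv_sec  - now.tv_sec;
                remaining.tv_nsec = deadline.tv_nsec - now.tv_nsec;
                if (remaining.tv_nsec < 0) {
                    remaining.tv_sec--;
                    remaining.tv_nsec += 1000000000L;
                }
                if (remaining.tv_sec < 0) {      /* deadline already passed */
                    __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
                    __atomic_add_fetch(&hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
                    return 0;
                }
                /* Keep waits bounded by SYNC_LOCK_TIMEOUT_SEC so the
                 * stale-owner checks still run on long user timeouts. */
                if (remaining.tv_sec < wait_tmo.tv_sec ||
                    (remaining.tv_sec == wait_tmo.tv_sec &&
                     remaining.tv_nsec < wait_tmo.tv_nsec))
                    wait_tmo = remaining;
            }
            syscall(SYS_futex, lock, FUTEX_WAIT, cur, &wait_tmo, NULL, 0);
            /* On ETIMEDOUT, stale-writer recovery and the writers_waiting
             * backoff would proceed exactly as in sync_rwlock_rdlock. */
        }
        __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

/* Usage sketch (illustrative): bounded read-side acquisition.  The
 * release function is outside this excerpt; the name below is assumed.
 *
 *     if (sync_rwlock_rdlock_timed(hdr, 0.5)) {
 *         ... read shared state ...
 *         sync_rwlock_rdunlock(hdr);   // assumed name
 *     }
 */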


