Data-Buffer-Shared

 view release on metacpan or  search on metacpan

buf_generic.h  view on Meta::CPAN

/* Control header of a shared buffer.  The first 64 bytes hold geometry
 * fields; the second 64 bytes hold the seqlock counter, the rwlock word
 * and the waiter/recovery counters that the lock helpers update with
 * atomic operations.  The number in each member comment is its byte
 * offset inside the struct. */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;           /* 0 */
    uint32_t version;         /* 4 */
    uint32_t variant_id;      /* 8 */
    uint32_t elem_size;       /* 12 */
    uint64_t capacity;        /* 16: number of elements */
    uint64_t total_size;      /* 24: total mmap size */
    uint64_t data_off;        /* 32: offset to data array */
    uint64_t reader_slots_off;/* 40: offset to BufReaderSlot[BUF_READER_SLOTS] */
    uint8_t  _reserved0[16];  /* 48-63 */

    /* ---- Cache line 1 (64-127): seqlock + rwlock + mutable state ---- */
    uint32_t seq;             /* 64: seqlock counter, odd = writer active */
    uint32_t rwlock;          /* 68: 0=unlocked, readers=1..0x7FFFFFFF, writer=0x80000000|pid */
    uint32_t rwlock_waiters;  /* 72: wake-target counter (readers+writers) */
    uint32_t stat_recoveries; /* 76: incremented each time a recovery path repairs something */
    uint32_t rwlock_writers_waiting; /* 80: reader yield signal (writers only) */
    uint32_t _pad2;           /* 84 */
    uint64_t _reserved1[5];   /* 88-127 */
} BufHeader;

/* Layout guard: a field change that alters the total size fails the build. */
BUF_STATIC_ASSERT(sizeof(BufHeader) == 128, "BufHeader must be exactly 128 bytes (2 cache lines)");

/* ---- Process-local handle ---- */

/* Process-local handle for one buffer.  Holds pointers to the header,
 * the element array and the reader-slot table, plus bookkeeping that
 * the lock helpers use to find this process's reader slot and to detect
 * a fork (cached_fork_gen is compared against the global fork
 * generation on each slot claim). */
typedef struct {
    BufHeader *hdr;
    void      *data;         /* pointer to element array in mmap */
    BufReaderSlot *reader_slots; /* in mmap, BUF_READER_SLOTS entries; NULL disables slot tracking */
    size_t     mmap_size;
    char      *path;         /* backing file path (strdup'd, NULL for anon) */
    int        fd;           /* kept open for memfd, -1 otherwise */
    int        efd;          /* eventfd for notifications, -1 if none */
    uint32_t   my_slot_idx;  /* UINT32_MAX = unclaimed; per-process slot index */
    uint32_t   cached_pid;   /* getpid() at claim time */
    uint32_t   cached_fork_gen; /* fork-generation at claim time */
    uint8_t    wr_locked;    /* process-local: 1 if lock_wr is held */
    uint8_t    efd_owned;    /* 1 if we created the eventfd (close on destroy) */
} BufHandle;

/* ---- Futex-based read-write lock ---- */

/* Number of failed lock attempts (each followed by buf_spin_pause) that
 * the rdlock/wrlock loops tolerate before parking on the futex. */
#define BUF_RWLOCK_SPIN_LIMIT 32
/* Seconds; becomes tv_sec of buf_lock_timeout, the FUTEX_WAIT timeout
 * after which a parked locker runs dead-process recovery. */
#define BUF_LOCK_TIMEOUT_SEC  2

/* Busy-wait hint used between lock attempts.  Emits the architecture's
 * spin-loop instruction where one is known ("yield" on AArch64, "pause"
 * on x86) and nothing otherwise; every variant carries a "memory"
 * clobber, so it also acts as a compiler-level memory barrier. */
static inline void buf_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* rwlock word encoding: the top bit marks "held by a writer" and the low
 * 31 bits carry that writer's pid.  Values 1..0x7FFFFFFF without the top
 * bit are a reader count; 0 means unlocked. */
#define BUF_RWLOCK_WRITER_BIT 0x80000000U
#define BUF_RWLOCK_PID_MASK   0x7FFFFFFFU
/* Build the lock word a writer with the given pid stores while holding the lock. */
#define BUF_RWLOCK_WR(pid)    (BUF_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & BUF_RWLOCK_PID_MASK))

/* Liveness probe for a pid recorded in shared memory.
 *
 * Returns 0 only when kill(pid, 0) fails with ESRCH (no such process).
 * Every other outcome -- success, or failure with a different errno --
 * is reported as alive (1), so recovery never fires on an inconclusive
 * probe.  pid 0 is treated as alive without probing. */
static inline int buf_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) != -1)
        return 1;
    return errno != ESRCH;
}

/* ---- Per-process slot lifecycle (dead-reader recovery) ----
 * Each process claims one BufReaderSlot lazily on first lock op so that
 * its contribution to the shared rwlock counter can be reclaimed by other
 * processes if it dies (SIGKILL'd worker no longer pins the counter). */
/* Fork generation: incremented in the child by buf_on_fork_child().
 * buf_claim_reader_slot() compares it with the handle's cached value and
 * drops the inherited slot index when they differ. */
static uint32_t buf_fork_gen = 0;
/* Guards the one-time pthread_atfork() registration done by buf_atfork_init(). */
static pthread_once_t buf_atfork_once = PTHREAD_ONCE_INIT;
/* Child-side atfork handler: advance the fork generation so that handles
 * inherited from the parent re-claim their own reader slot on next use. */
static void buf_on_fork_child(void) {
    (void)__atomic_fetch_add(&buf_fork_gen, 1, __ATOMIC_RELAXED);
}
/* pthread_once() initializer: install buf_on_fork_child as the child-side
 * fork handler.  No prepare or parent handlers are registered, and the
 * pthread_atfork() result is not inspected. */
static void buf_atfork_init(void) {
    void (*const child_hook)(void) = buf_on_fork_child;
    pthread_atfork(NULL, NULL, child_hook);
}

/* Lazily bind this process to one entry of the reader-slot table.
 *
 * No-op when the handle has no slot table or already owns a slot in the
 * current fork generation.  Otherwise probes the table linearly, starting
 * at (pid % BUF_READER_SLOTS), and claims the first entry whose pid field
 * can be CAS'd from 0 to our pid; the claimed entry's counters are reset
 * to 0.  If every entry is taken the handle stays unclaimed
 * (my_slot_idx == UINT32_MAX) and callers simply skip per-slot tracking. */
static inline void buf_claim_reader_slot(BufHandle *h) {
    if (!h->reader_slots)
        return;

    pthread_once(&buf_atfork_once, buf_atfork_init);

    /* A generation mismatch means this handle was inherited across a
     * fork: the recorded slot belongs to the parent, so forget it. */
    const uint32_t generation = __atomic_load_n(&buf_fork_gen, __ATOMIC_RELAXED);
    if (generation != h->cached_fork_gen) {
        h->cached_fork_gen = generation;
        h->my_slot_idx = UINT32_MAX;
    }
    if (h->my_slot_idx != UINT32_MAX)
        return;

    const uint32_t self = (uint32_t)getpid();
    h->cached_pid = self;

    uint32_t probe = self % BUF_READER_SLOTS;
    for (uint32_t left = BUF_READER_SLOTS; left > 0;
         left--, probe = (probe + 1) % BUF_READER_SLOTS) {
        uint32_t vacant = 0;
        if (!__atomic_compare_exchange_n(&h->reader_slots[probe].pid,
                &vacant, self, 0,
                __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            continue;
        /* We own the entry now: clear whatever the previous owner left. */
        __atomic_store_n(&h->reader_slots[probe].subcount, 0, __ATOMIC_RELAXED);
        __atomic_store_n(&h->reader_slots[probe].waiters_parked, 0, __ATOMIC_RELAXED);
        __atomic_store_n(&h->reader_slots[probe].writers_parked, 0, __ATOMIC_RELAXED);
        h->my_slot_idx = probe;
        return;
    }
    /* Slot table full — silently skip tracking; recovery falls back to
     * the slow per-op timeout drain. */
}

/* Saturating atomic subtract: *p becomes max(*p - sub, 0) in one atomic
 * step, so the counter can never wrap below zero.  A `sub` of 0 leaves
 * the counter untouched.  Uses relaxed ordering throughout. */
static inline void buf_atomic_sub_cap(uint32_t *p, uint32_t sub) {
    if (sub == 0)
        return;

    uint32_t seen = __atomic_load_n(p, __ATOMIC_RELAXED);
    uint32_t next;
    do {
        /* On CAS failure `seen` is refreshed with the current value, so
         * the target is recomputed from it on the next iteration. */
        next = (seen <= sub) ? 0 : seen - sub;
    } while (!__atomic_compare_exchange_n(p, &seen, next,
                 1, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
}

/* Release slot `i`, believed to belong to dead process `pid`, and give
 * back the parked-waiter counts it still contributes to the header.
 *
 * Ownership is taken by CAS-ing the slot's pid from `pid` to 0; if that
 * fails (another process got there first, or the slot changed hands)
 * nothing is touched and 0 is returned.  On success the slot's
 * waiters_parked / writers_parked values are subtracted (saturating)
 * from rwlock_waiters / rwlock_writers_waiting.  Returns 1 when at least
 * one of the two was non-zero, 0 otherwise.  The CAS is ACQ_REL so the
 * dead process's relaxed stores to the mirror fields are visible here. */
static inline int buf_drain_dead_slot(BufHandle *h, uint32_t i, uint32_t pid) {
    uint32_t owner = pid;
    const int won = __atomic_compare_exchange_n(&h->reader_slots[i].pid, &owner, 0,
                        0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);
    if (!won)
        return 0;

    const uint32_t parked_all =
        __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
    const uint32_t parked_writers =
        __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);

    if (parked_all != 0)
        buf_atomic_sub_cap(&h->hdr->rwlock_waiters, parked_all);
    if (parked_writers != 0)
        buf_atomic_sub_cap(&h->hdr->rwlock_writers_waiting, parked_writers);

    /* Don't zero slot fields — buf_claim_reader_slot zeros them on the
     * next claim; zeroing here can race a new claimant's increments. */
    return (parked_all != 0) || (parked_writers != 0);
}

/* Reclaim what dead processes left behind in the reader-slot table.
 *
 * Does nothing when the handle has no slot table.  Otherwise:
 *  - dead slots with subcount == 0 are released immediately and their
 *    parked-waiter counts drained (buf_drain_dead_slot);
 *  - if at least one dead slot has subcount > 0 and no live process has
 *    subcount > 0, a reader-count lock word is CAS'd to 0, waiters are
 *    woken, and the remaining dead slots are released.
 * stat_recoveries is incremented once if any of these steps changed
 * something. */
static inline void buf_recover_dead_readers(BufHandle *h) {
    if (!h->reader_slots) return;
    BufHeader *hdr = h->hdr;
    int any_live_reader = 0;    /* some live pid has subcount > 0 */
    int found_dead_reader = 0;  /* some dead pid has subcount > 0 */
    int any_recovery = 0;       /* something was actually repaired */

    /* Pass 1: scan; classify; immediate-wipe dead slots with sc==0 (no
     * rwlock contribution to lose). Defer wiping dead-with-sc>0 slots
     * until force-reset can fire — otherwise we'd lose the only record
     * of the orphan rwlock contribution while a live reader is present. */
    for (uint32_t i = 0; i < BUF_READER_SLOTS; i++) {
        uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
        if (pid == 0) continue; /* unclaimed slot */
        uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
        if (buf_pid_alive(pid)) {
            if (sc > 0) any_live_reader = 1;
            continue;
        }
        if (sc > 0) { found_dead_reader = 1; continue; }
        if (buf_drain_dead_slot(h, i, pid)) any_recovery = 1;
    }

    /* Pass 2: only if force-reset will fire.  Issue the rwlock CAS first
     * to keep the race window with new readers narrow, then wipe the
     * deferred dead slots. */
    if (found_dead_reader && !any_live_reader) {
        uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
        /* Only a pure reader count is reset; 0 and writer-held words are left alone. */
        if (cur > 0 && cur < BUF_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
                    0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
                any_recovery = 1;
                if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
                    syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
            }
        }
        /* Release every slot whose owner is dead, including the ones
         * deferred in pass 1. */
        for (uint32_t i = 0; i < BUF_READER_SLOTS; i++) {
            uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
            if (pid == 0) continue;
            if (buf_pid_alive(pid)) continue;
            if (buf_drain_dead_slot(h, i, pid)) any_recovery = 1;
        }
    }
    if (any_recovery)
        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
}

/* Park/unpark helpers — keep global rwlock_waiters/writers_waiting and
 * per-slot mirror counters in sync so recovery can drain them. */
/* Register a reader that is about to sleep on the futex: +1 on the
 * header's rwlock_waiters and, when this process owns a reader slot,
 * +1 on that slot's waiters_parked mirror. */
static inline void buf_park_reader(BufHandle *h) {
    const uint32_t slot = h->my_slot_idx;

    (void)__atomic_fetch_add(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (slot == UINT32_MAX)
        return;
    (void)__atomic_fetch_add(&h->reader_slots[slot].waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Undo buf_park_reader(): -1 on the header's rwlock_waiters and, when
 * this process owns a reader slot, -1 on that slot's waiters_parked. */
static inline void buf_unpark_reader(BufHandle *h) {
    const uint32_t slot = h->my_slot_idx;

    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (slot == UINT32_MAX)
        return;
    (void)__atomic_fetch_sub(&h->reader_slots[slot].waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Register a writer that is about to sleep on the futex.  A parked
 * writer counts both as a generic waiter (rwlock_waiters) and as a
 * waiting writer (rwlock_writers_waiting); when this process owns a
 * reader slot, both increments are mirrored into the slot. */
static inline void buf_park_writer(BufHandle *h) {
    const uint32_t slot = h->my_slot_idx;

    (void)__atomic_fetch_add(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
    if (slot == UINT32_MAX)
        return;
    (void)__atomic_fetch_add(&h->reader_slots[slot].waiters_parked, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&h->reader_slots[slot].writers_parked, 1, __ATOMIC_RELAXED);
}
/* Undo buf_park_writer(): decrement rwlock_waiters and
 * rwlock_writers_waiting in the header, and the two mirror counters in
 * this process's reader slot when it owns one. */
static inline void buf_unpark_writer(BufHandle *h) {
    const uint32_t slot = h->my_slot_idx;

    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
    if (slot == UINT32_MAX)
        return;
    (void)__atomic_fetch_sub(&h->reader_slots[slot].waiters_parked, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&h->reader_slots[slot].writers_parked, 1, __ATOMIC_RELAXED);
}

/* Break a write lock whose recorded owner is gone.
 *
 * `observed_rwlock` is the lock word the caller saw.  The lock is first
 * taken over by CAS-ing that exact value to our own writer word; if the
 * word has changed in the meantime the function returns without doing
 * anything.  While holding the lock it makes an odd seqlock counter even
 * again, counts the recovery, releases the lock and wakes all futex
 * waiters if any are registered. */
static inline void buf_recover_stale_lock(BufHeader *hdr, uint32_t observed_rwlock) {
    const uint32_t takeover = BUF_RWLOCK_WR((uint32_t)getpid());
    uint32_t witnessed = observed_rwlock;

    if (__atomic_compare_exchange_n(&hdr->rwlock, &witnessed, takeover,
            0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
        /* An odd counter means the previous owner stopped mid-write;
         * bump it to even so seqlock readers stop spinning. */
        const uint32_t counter = __atomic_load_n(&hdr->seq, __ATOMIC_ACQUIRE);
        if ((counter & 1) != 0)
            __atomic_store_n(&hdr->seq, counter + 1, __ATOMIC_RELEASE);

        (void)__atomic_fetch_add(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
        __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);

        if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) != 0)
            syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

/* Timeout passed to FUTEX_WAIT by the rdlock/wrlock slow paths; when it
 * expires the waiter runs buf_recover_after_timeout() and retries. */
static const struct timespec buf_lock_timeout = { BUF_LOCK_TIMEOUT_SEC, 0 };

/* Recovery dispatcher: if a writer is dead, force-reset the lock word;
 * otherwise scan reader slots for dead readers and drain their stuck
 * contributions to the rwlock and waiter counters.  Reload the lock
 * value here (rather than trusting a stale snapshot from the futex
 * caller) so that (a) a writer that died after our futex_wait started
 * is detected on the same timeout, and (b) phantom waiter/writers_waiting
 * contributions left by a dead parked writer are drained even when the
 * lock word itself is now 0. */
/* Called by a locker whose futex wait timed out.  Re-reads the lock word
 * and picks the matching recovery:
 *  - word is a reader count or 0: scan the reader slots for dead
 *    processes (buf_recover_dead_readers);
 *  - word is writer-held and that pid no longer exists: break the lock
 *    (buf_recover_stale_lock);
 *  - word is writer-held by a live pid: nothing to do. */
static inline void buf_recover_after_timeout(BufHandle *h) {
    BufHeader *hdr = h->hdr;
    const uint32_t word = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);

    if (word < BUF_RWLOCK_WRITER_BIT) {
        buf_recover_dead_readers(h);
        return;
    }
    if (!buf_pid_alive(word & BUF_RWLOCK_PID_MASK))
        buf_recover_stale_lock(hdr, word);
}

/* Acquire the lock in shared (reader) mode; returns only once it is held.
 *
 * Fast path: join an existing reader count unconditionally, or take an
 * idle lock (word == 0) provided no writer is registered as waiting.
 * After BUF_RWLOCK_SPIN_LIMIT failed attempts the caller registers as a
 * parked waiter and sleeps on the futex; a timed-out sleep triggers
 * buf_recover_after_timeout() before retrying.
 *
 * The slot's subcount stays incremented on return; buf_rwlock_rdunlock()
 * decrements it again. */
static inline void buf_rwlock_rdlock(BufHandle *h) {
    BufHeader *hdr = h->hdr;
    buf_claim_reader_slot(h);
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Bump per-process subcount BEFORE attempting the rwlock CAS so a
     * concurrent recovery scan sees us as a live in-flight reader. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur > 0 && cur < BUF_RWLOCK_WRITER_BIT) {
            /* Readers already hold the lock: add ourselves to the count. */
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            /* Idle lock and no writer waiting: become the first reader. */
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < BUF_RWLOCK_SPIN_LIMIT, 1)) {
            buf_spin_pause();
            continue;
        }
        /* Slow path: register as a waiter first so an unlocker that
         * checks rwlock_waiters issues a wake, then re-read the word. */
        buf_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep only when the word is writer-held or 0; with a reader
         * count present we can join right away on the next iteration. */
        if (cur >= BUF_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &buf_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Unregister before recovery, then start a fresh spin round. */
                buf_unpark_reader(h);
                buf_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        buf_unpark_reader(h);
        spin = 0;
    }
}

/* Release a shared (reader) hold on the lock.
 *
 * Order matters: the lock word is decremented first and the slot's
 * subcount second, so a concurrent recovery scan that still sees
 * subcount > 0 for our live pid treats us as an in-flight reader and
 * does not force-reset the lock.  If our decrement brought the lock word
 * to 0 and waiters are registered, all futex waiters are woken. */
static inline void buf_rwlock_rdunlock(BufHandle *h) {
    uint32_t *lock_word = &h->hdr->rwlock;
    /* __atomic_sub_fetch yields the value AFTER the decrement. */
    const uint32_t remaining = __atomic_sub_fetch(lock_word, 1, __ATOMIC_RELEASE);

    const uint32_t slot = h->my_slot_idx;
    if (slot != UINT32_MAX)
        (void)__atomic_fetch_sub(&h->reader_slots[slot].subcount, 1, __ATOMIC_RELAXED);

    if (remaining != 0)
        return;
    if (__atomic_load_n(&h->hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;
    syscall(SYS_futex, lock_word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/* Acquire the lock in exclusive (writer) mode; returns only once it is held.
 *
 * The lock is taken by CAS-ing the word from 0 to BUF_RWLOCK_WR(pid), so
 * the holder's pid is recorded in the word.  After BUF_RWLOCK_SPIN_LIMIT
 * failed attempts the caller registers as a parked writer and sleeps on
 * the futex; a timed-out sleep triggers buf_recover_after_timeout()
 * before retrying. */
static inline void buf_rwlock_wrlock(BufHandle *h) {
    BufHeader *hdr = h->hdr;
    buf_claim_reader_slot(h);
    uint32_t *lock = &hdr->rwlock;
    uint32_t mypid = BUF_RWLOCK_WR((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < BUF_RWLOCK_SPIN_LIMIT, 1)) {
            buf_spin_pause();
            continue;
        }
        /* Slow path: register as waiter + waiting writer before the
         * re-read, so unlockers wake us and buf_rwlock_rdlock() stops
         * taking an idle lock ahead of us. */
        buf_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep only while the lock is held; if it became free, retry at once. */
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &buf_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Unregister before recovery, then start a fresh spin round. */
                buf_unpark_writer(h);
                buf_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        buf_unpark_writer(h);
        spin = 0;
    }
}

/* Release the exclusive (writer) lock: store 0 into the lock word with
 * release ordering, then wake every futex waiter if any are registered
 * in rwlock_waiters. */
static inline void buf_rwlock_wrunlock(BufHandle *h) {
    uint32_t *lock_word = &h->hdr->rwlock;

    __atomic_store_n(lock_word, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&h->hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;
    syscall(SYS_futex, lock_word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/* ---- Seqlock ---- */

/* Start a seqlock read section: wait until the counter is even (no
 * writer mid-update) and return that value for a later
 * buf_seqlock_read_retry() check.
 *
 * While the counter is odd the caller spins up to 100000 times.  After
 * that it inspects the rwlock word: if a writer holds it and that pid no
 * longer exists, the stale lock is broken (which also makes the counter
 * even); otherwise the caller sleeps for 1 ms.  Either way spinning then
 * starts over. */
static inline uint32_t buf_seqlock_read_begin(BufHeader *hdr) {
    int spins = 0;
    for (;;) {
        const uint32_t counter = __atomic_load_n(&hdr->seq, __ATOMIC_ACQUIRE);
        if (__builtin_expect((counter & 1) == 0, 1))
            return counter;

        if (__builtin_expect(spins < 100000, 1)) {
            buf_spin_pause();
            spins++;
            continue;
        }

        /* Spin budget exhausted; every path below restarts the budget. */
        spins = 0;

        const uint32_t word = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
        if (word >= BUF_RWLOCK_WRITER_BIT &&
            !buf_pid_alive(word & BUF_RWLOCK_PID_MASK)) {
            buf_recover_stale_lock(hdr, word);
            continue;
        }

        struct timespec nap = {0, 1000000};
        nanosleep(&nap, NULL);
    }
}

/* Finish a seqlock read section.
 *
 * seq   - address of the seqlock counter.
 * start - value returned by buf_seqlock_read_begin() for this section.
 *
 * Returns non-zero when the counter changed since `start`, i.e. a writer
 * ran during the section and the data just read must be discarded and
 * re-read; returns 0 when the snapshot is consistent. */
static inline int buf_seqlock_read_retry(uint32_t *seq, uint32_t start) {
    /* An acquire LOAD only keeps later accesses from moving above it; it
     * does not stop the reader's earlier data loads from being satisfied
     * after the counter re-read on weakly-ordered CPUs.  The acquire
     * fence orders those data loads before the re-read, so a torn
     * snapshot can never be validated against a counter value that was
     * sampled too early. */
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
    return __atomic_load_n(seq, __ATOMIC_ACQUIRE) != start;
}

/* Open a seqlock write section: bump the counter (even -> odd) so
 * readers know an update is in progress.
 *
 * seq - address of the seqlock counter.  The increment itself does not
 *       exclude other writers, so the caller presumably holds the write
 *       lock (not visible from here). */
static inline void buf_seqlock_write_begin(uint32_t *seq) {
    __atomic_add_fetch(seq, 1, __ATOMIC_RELEASE);
    /* Release ordering on the increment only constrains accesses BEFORE
     * it.  Without this fence the data stores that follow could become
     * visible before the odd counter, letting a reader see new data under
     * an unchanged even counter.  The release fence orders the increment
     * ahead of every later store. */
    __atomic_thread_fence(__ATOMIC_RELEASE);
}

/* Close a seqlock write section: bump the counter once more (odd ->
 * even).  Release ordering publishes the writer's preceding data stores
 * before the new counter value becomes visible. */
static inline void buf_seqlock_write_end(uint32_t *seq) {
    (void)__atomic_fetch_add(seq, 1, __ATOMIC_RELEASE);
}

/* ---- mmap create/open ---- */

static BufHandle *buf_create_map(const char *path, uint64_t capacity,
                                  uint32_t elem_size, uint32_t variant_id,
                                  char *errbuf) {
    errbuf[0] = '\0';
    int created = 0;
    int fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0666);
    if (fd >= 0) {
        created = 1;
    } else if (errno == EEXIST) {
        fd = open(path, O_RDWR);
    }
    if (fd < 0) {
        snprintf(errbuf, BUF_ERR_BUFLEN, "open(%s): %s", path, strerror(errno));
        return NULL;
    }

    /* Lock file for init race prevention */
    if (flock(fd, LOCK_EX) < 0) {
        snprintf(errbuf, BUF_ERR_BUFLEN, "flock(%s): %s", path, strerror(errno));
        close(fd);
        return NULL;
    }

    uint64_t reader_slots_off = sizeof(BufHeader); /* 128 */
    uint64_t reader_slots_size = (uint64_t)BUF_READER_SLOTS * sizeof(BufReaderSlot);
    uint64_t data_off = reader_slots_off + reader_slots_size; /* cache-aligned */
    if (elem_size > 0 && capacity > (UINT64_MAX - data_off) / elem_size) {
        snprintf(errbuf, BUF_ERR_BUFLEN, "buffer size overflow");
        flock(fd, LOCK_UN);
        close(fd);
        return NULL;
    }
    uint64_t total_size = data_off + capacity * elem_size;

    struct stat st;



( run in 1.135 second using v1.01-cache-2.11-cpan-2398b32b56e )