Data-HashMap-Shared

 view release on metacpan or search on metacpan

shm_generic.h  view on Meta::CPAN

    uint32_t   shard_count;  /* total shards (1 for single maps) */
    char      *copy_buf;
    uint32_t   copy_buf_size;
} ShmCursor;

/* Make sure *buf can hold at least `needed` bytes (a request for 0 bytes is
 * treated as a request for 1).  Capacity starts at 64 bytes when empty and
 * doubles until it fits; if doubling would wrap uint32_t the capacity is set
 * to exactly `needed` instead.
 * Returns 1 on success (with *buf and *cap updated) and 0 when realloc fails,
 * in which case *buf and *cap are left exactly as they were. */
static inline int shm_grow_buf(char **buf, uint32_t *cap, uint32_t needed) {
    uint32_t want = needed ? needed : 1;
    if (*cap >= want)
        return 1;

    uint32_t grown = (*cap != 0) ? *cap : 64;
    while (grown < want) {
        uint32_t doubled = grown * 2;
        if (doubled <= grown) {      /* 32-bit wrap: fall back to the exact size */
            grown = want;
            break;
        }
        grown = doubled;
    }

    char *fresh = (char *)realloc(*buf, grown);
    if (fresh == NULL)
        return 0;
    *buf = fresh;
    *cap = grown;
    return 1;
}

/* Grow the handle's scratch copy buffer (h->copy_buf, capacity in
 * h->copy_buf_size) to at least `needed` bytes via shm_grow_buf.
 * Returns 1 on success, 0 on OOM (buffer left unchanged). */
static inline int shm_ensure_copy_buf(ShmHandle *h, uint32_t needed) {
    char **bufp = &h->copy_buf;
    uint32_t *capp = &h->copy_buf_size;
    return shm_grow_buf(bufp, capp, needed);
}

/* Grow a cursor's private copy buffer (c->copy_buf, capacity in
 * c->copy_buf_size) to at least `needed` bytes via shm_grow_buf.
 * Returns 1 on success, 0 on OOM (buffer left unchanged). */
static inline int shm_cursor_ensure_copy_buf(ShmCursor *c, uint32_t needed) {
    char **bufp = &c->copy_buf;
    uint32_t *capp = &c->copy_buf_size;
    return shm_grow_buf(bufp, capp, needed);
}

/* ---- Hash functions (xxHash, XXH3) ---- */

/* Hash an integer key by feeding its in-memory bytes (host byte order) to
 * XXH3_64bits. */
static inline uint64_t shm_hash_int64(int64_t key) {
    const void *bytes = &key;
    return XXH3_64bits(bytes, sizeof key);
}

/* Hash `len` bytes starting at `data` with XXH3_64bits. */
static inline uint64_t shm_hash_string(const char *data, uint32_t len) {
    size_t nbytes = len;
    return XXH3_64bits(data, nbytes);
}

/* ---- Futex-based read-write lock ---- */

#define SHM_RWLOCK_SPIN_LIMIT 32  /* busy-wait attempts before parking on the futex */
#define SHM_LOCK_TIMEOUT_SEC  2   /* FUTEX_WAIT timeout for stale lock detection */

/* CPU relax hint for spin loops: PAUSE on x86, YIELD on AArch64, and a bare
 * compiler barrier on anything else.  Every variant clobbers "memory", so
 * the compiler must re-read shared state on the next loop iteration. */
static inline void shm_rwlock_spin_pause(void) {
#if defined(__i386__) || defined(__x86_64__)
    __asm__ __volatile__("pause" : : : "memory");
#elif defined(__aarch64__)
    __asm__ __volatile__("yield" : : : "memory");
#else
    __asm__ __volatile__("" : : : "memory");
#endif
}

/* rwlock word encoding:
 *   0                       lock free
 *   1 .. 0x7FFFFFFF         number of readers currently holding it
 *   WRITER_BIT | pid        write-held; owner PID in the low 31 bits,
 *                           recoverable with `word & SHM_RWLOCK_PID_MASK`. */
#define SHM_RWLOCK_WRITER_BIT 0x80000000U
#define SHM_RWLOCK_PID_MASK   (SHM_RWLOCK_WRITER_BIT - 1U)
#define SHM_RWLOCK_WR(pid)    (SHM_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SHM_RWLOCK_PID_MASK))

/* Liveness probe for a recorded lock/slot owner.
 *
 * Returns 0 only when the PID is definitely gone (kill(pid, 0) fails with
 * ESRCH).  Returns 1 otherwise: when no owner is recorded (pid == 0), when
 * the process exists, or when the probe is inconclusive (e.g. EPERM means
 * the process exists but belongs to someone else).
 *
 * Limitation: PID reuse is invisible here.  If a dead lock-holder's PID is
 * recycled to an unrelated live process before recovery runs, it reads as
 * "alive" and that slot's orphaned contribution stays unreclaimed until the
 * recycled process exits.  Fixing that needs a per-slot process-start-time
 * epoch (a header-layout/SHM_VERSION change).  Documented under
 * "Crash Safety" in the POD. */
static inline int shm_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}

/* Forward declaration — defined later in the LRU helpers section. */
static void shm_lru_rebuild_if_corrupt(ShmHandle *h);

/* Force-recover a write lock whose owner the caller has found to be dead.
 *
 * `observed_rwlock` is the exact lock word the caller saw (writer bit plus
 * the dead owner's PID).  We CAS it to OUR writer encoding, not a bare
 * WRITER_BIT sentinel, so that if we crash mid-recovery the next recoverer
 * sees a PID it can test and can recover again.  Losing the CAS means the
 * word changed (someone else recovered or the lock moved on), and we do
 * nothing.
 *
 * While holding the lock we:
 *   - bump an odd seqlock counter back to even (a writer died mid-write);
 *   - let shm_lru_rebuild_if_corrupt repair a half-linked LRU list;
 *   - count the event in stat_recoveries.
 * Then we release the lock and wake any parked waiters. */
static inline void shm_recover_stale_lock(ShmHandle *h, uint32_t observed_rwlock) {
    ShmHeader *hdr = h->hdr;
    uint32_t self_word = SHM_RWLOCK_WR((uint32_t)getpid());
    int won = __atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
            self_word, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    if (won) {
        uint32_t seq = __atomic_load_n(&hdr->seq, __ATOMIC_RELAXED);
        if ((seq & 1) != 0)            /* dead writer left the seqlock odd */
            __atomic_store_n(&hdr->seq, seq + 1, __ATOMIC_RELEASE);
        shm_lru_rebuild_if_corrupt(h);
        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);

        /* Hand the lock back and wake anyone parked on the word. */
        __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
        if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

/* Relative timeout handed to every FUTEX_WAIT in the rwlock slow paths;
 * expiry sends the waiter into dead-owner recovery. */
static const struct timespec shm_lock_timeout = { .tv_sec = SHM_LOCK_TIMEOUT_SEC, .tv_nsec = 0 };

/* Process-global fork generation.  The pthread_atfork child handler bumps
 * it, so each open handle can detect "we are in a new child" on its next
 * lock call by comparing against a cached copy (one relaxed load) instead
 * of calling getpid() on the hot path. */
static uint32_t shm_fork_gen = 1;
static pthread_once_t shm_atfork_once = PTHREAD_ONCE_INIT;

/* atfork child handler: runs in the child right after fork(). */
static void shm_on_fork_child(void) {
    (void)__atomic_fetch_add(&shm_fork_gen, 1, __ATOMIC_RELAXED);
}

/* One-time registration (driven through pthread_once) of the child
 * handler; no prepare or parent handlers are needed. */
static void shm_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, shm_on_fork_child);
}

/* Ensure this process owns a reader slot.  Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's.  Hot-path is a single relaxed load + compare; only on a
 * fork-generation mismatch do we touch getpid() and scan slots. */
static inline void shm_claim_reader_slot(ShmHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&shm_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path — register the atfork hook once per process, then claim. */
    pthread_once(&shm_atfork_once, shm_atfork_init);
    /* Re-read after pthread_once: shm_on_fork_child may have bumped it. */
    cur_gen = __atomic_load_n(&shm_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();
    h->cached_pid = now_pid;
    h->cached_fork_gen = cur_gen;

shm_generic.h  view on Meta::CPAN

}

/* Release reader slot `i` whose owner `pid` is dead, and give its parked-
 * waiter contributions back to the global counters.
 *
 * The slot is released by CASing its pid from `pid` to 0.  Returns 1 only
 * when we won that CAS AND the slot had a non-zero waiters_parked or
 * writers_parked count to subtract.  Returns 0 when another recoverer (or a
 * new claimant) got to the slot first, or when there was nothing to drain.
 *
 * subcount/waiters_parked/writers_parked are deliberately NOT zeroed.  A
 * new process may claim the slot right after our CAS and start filling
 * those fields, and a late store from us would clobber its state.
 * shm_claim_reader_slot zeros all three on every claim, so the stale values
 * we leave behind are harmless. */
static inline int shm_drain_dead_slot(ShmHandle *h, uint32_t i, uint32_t pid) {
    ShmHeader *hdr = h->hdr;
    uint32_t expected = pid;
    int drained = 0;
    /* ACQ_REL on success: RELEASE publishes pid=0 to other observers, and
     * ACQUIRE synchronises with the dead process's earlier writes to
     * waiters_parked/writers_parked.  The counters are loaded after the CAS
     * so that, on weakly-ordered CPUs (aarch64), they sit inside that
     * acquire window. */
    if (__atomic_compare_exchange_n(&h->reader_slots[i].pid, &expected, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        uint32_t parked_waiters = __atomic_load_n(&h->reader_slots[i].waiters_parked, __ATOMIC_RELAXED);
        uint32_t parked_writers = __atomic_load_n(&h->reader_slots[i].writers_parked, __ATOMIC_RELAXED);
        if (parked_waiters != 0) {
            shm_atomic_sub_cap(&hdr->rwlock_waiters, parked_waiters);
            drained = 1;
        }
        if (parked_writers != 0) {
            shm_atomic_sub_cap(&hdr->rwlock_writers_waiting, parked_writers);
            drained = 1;
        }
    }
    return drained;
}

/* Scan reader slots for dead-process recovery.
 *
 * For each dead PID with non-zero contributions to the shared rwlock,
 * rwlock_waiters, or rwlock_writers_waiting counters, drain its share back
 * out.  That way live processes don't have to wait for the slow per-op
 * timeout decrement to drain it for them.
 *
 * For the main rwlock counter we use the "no live reader holds -> force-
 * reset to 0" trick (precise), because per-process attribution of the
 * subcount is racy across the inc-counter-then-inc-subcount window.
 *
 * A dead slot that still shows subcount > 0 is the only record that an
 * orphaned reader unit may be sitting in the rwlock word.  Such slots are
 * wiped only once that unit is known to be gone:
 *   - after a successful force-reset, or
 *   - when the word carries no reader units at all (free, or writer-held;
 *     a writer can only take the lock from 0).
 * If the force-reset CAS loses to a newly arriving reader, the dead slots
 * are kept, so a later recovery cycle can retry the reset once that reader
 * leaves. */
static inline void shm_recover_dead_readers(ShmHandle *h) {
    if (!h->reader_slots) return;
    ShmHeader *hdr = h->hdr;
    int any_live_reader = 0;
    int found_dead_reader = 0;
    int any_recovery = 0;

    /* Pass 1: classify slots.
     * - Dead pid, sc == 0: no rwlock contribution to lose.  Wipe it right
     *   away to free the slot for future claimants and drain any orphan
     *   parked-waiter counters.
     * - Dead pid, sc > 0: leave it intact in this pass.  If force-reset
     *   cannot fire (a live reader is concurrently present), wiping the
     *   slot would lose the only record of its orphan rwlock contribution
     *   and strand writers permanently once the live reader releases. */
    for (uint32_t i = 0; i < SHM_READER_SLOTS; i++) {
        uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
        if (pid == 0) continue;
        uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
        if (shm_pid_alive(pid)) {
            if (sc > 0) any_live_reader = 1;
            continue;
        }
        if (sc > 0) { found_dead_reader = 1; continue; }
        if (shm_drain_dead_slot(h, i, pid)) any_recovery = 1;
    }

    /* Pass 2: runs only when force-reset can fire.  Issue the force-reset
     * CAS FIRST, while the window since pass 1's scan is still narrow.  A
     * new reader that started rdlock between pass 1's scan and the CAS
     * will either:
     *   (a) have already CAS'd rwlock from cur to cur+1.  Our CAS then
     *       fails (cur mismatched), recovery yields WITHOUT wiping the
     *       deferred dead slots, and a future cycle retries; or
     *   (b) still be in the subcount-bump phase.  Our CAS sees the stale
     *       cur and resets to 0; the new reader's later CAS rwlock(0 -> 1)
     *       succeeds cleanly.
     * Only after the CAS resolves do we wipe the deferred dead slots,
     * keeping that work outside the race-sensitive window. */
    if (found_dead_reader && !any_live_reader) {
        uint32_t cur = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
        /* cur == 0 or writer-held: no reader unit is left in the word, so
         * the dead slots' subcounts are stale and safe to wipe. */
        int orphans_cleared = 1;
        if (cur > 0 && cur < SHM_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
                    0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
                any_recovery = 1;
                if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
                    syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
            } else {
                /* Case (a): the word moved under us.  The orphan units may
                 * still be there, mixed in with the newcomer's.  Keep the
                 * dead slots: wiping them would erase the only evidence,
                 * and no later scan could ever force-reset the orphans. */
                orphans_cleared = 0;
            }
        }
        if (orphans_cleared) {
            for (uint32_t i = 0; i < SHM_READER_SLOTS; i++) {
                uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
                if (pid == 0 || shm_pid_alive(pid)) continue;
                if (shm_drain_dead_slot(h, i, pid)) any_recovery = 1;
            }
        }
    }
    if (any_recovery)
        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
}

/* Recovery entry point after a FUTEX_WAIT timeout in rdlock or wrlock.
 *
 * - Lock word free or reader-held: let shm_recover_dead_readers drain dead
 *   readers' shares of the rwlock/waiter counters.
 * - Writer-held and the owner PID (low 31 bits) is dead: force-recover it
 *   via shm_recover_stale_lock.  That also fixes an odd seqlock and a
 *   half-linked LRU list under the recovered write lock.
 * - Writer-held by a live owner: nothing to do. */
static inline void shm_recover_after_timeout(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;
    uint32_t word = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
    if (word < SHM_RWLOCK_WRITER_BIT) {
        shm_recover_dead_readers(h);
        return;
    }
    if (!shm_pid_alive(word & SHM_RWLOCK_PID_MASK))
        shm_recover_stale_lock(h, word);
}

/* Park/unpark helpers.  Each one adjusts the global waiter counters
 * together with this process's mirrored per-slot counters, so a
 * timeout-triggered recovery scan can attribute a dead PID's share and
 * reverse it.  Park bumps the slot counter before the global one; unpark
 * drops the global one before the slot.  Handles without a reader slot
 * (my_slot_idx == UINT32_MAX) touch only the global counters. */
static inline void shm_park_reader(ShmHandle *h) {
    int have_slot = (h->my_slot_idx != UINT32_MAX);
    if (have_slot)
        (void)__atomic_fetch_add(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Undo shm_park_reader: global waiter count first, then this slot's mirror. */
static inline void shm_unpark_reader(ShmHandle *h) {
    int have_slot = (h->my_slot_idx != UINT32_MAX);
    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (have_slot)
        (void)__atomic_fetch_sub(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Register a parked writer: per-slot waiters_parked and writers_parked
 * first, then the global rwlock_waiters and rwlock_writers_waiting. */
static inline void shm_park_writer(ShmHandle *h) {
    int have_slot = (h->my_slot_idx != UINT32_MAX);
    if (have_slot) {
        (void)__atomic_fetch_add(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
        (void)__atomic_fetch_add(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
    }
    (void)__atomic_fetch_add(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Undo shm_park_writer: global counters first, then this slot's mirrors. */
static inline void shm_unpark_writer(ShmHandle *h) {
    int have_slot = (h->my_slot_idx != UINT32_MAX);
    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
    if (have_slot) {
        (void)__atomic_fetch_sub(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
        (void)__atomic_fetch_sub(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
    }
}

/* Acquire the shared (read) side of the process-shared rwlock.  Never fails.
 *
 * Spins for up to SHM_RWLOCK_SPIN_LIMIT attempts.  Then it registers as a
 * parked waiter and sleeps on the lock word with FUTEX_WAIT (timeout
 * SHM_LOCK_TIMEOUT_SEC).  A timeout runs shm_recover_after_timeout before
 * retrying.
 *
 * Writer preference: a free word (0) is only taken when no writer is parked
 * (rwlock_writers_waiting == 0), but a reader-held word can always be joined.
 *
 * Sleeping on a free word is only useful while a writer is still waiting to
 * take it; that writer's wrunlock then wakes us, since we are counted in
 * rwlock_waiters.  If the writer has already come and gone, no FUTEX_WAKE
 * will follow, and FUTEX_WAIT on 0 would stall for the whole timeout.  So
 * writers_waiting is re-checked right before committing to sleep on 0. */
static inline void shm_rwlock_rdlock(ShmHandle *h) {
    shm_claim_reader_slot(h);
    ShmHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter.  A
     * concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will then (correctly) defer force-reset, even while we
     * are still spinning to win the rwlock CAS.  Otherwise a reader killed
     * between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, and our eventual rdunlock
     * decrement would wrap to UINT32_MAX. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when the lock is free (cur==0) and writers are
         * waiting, yield so the writer can acquire.  When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < SHM_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
            shm_rwlock_spin_pause();
            continue;
        }
        shm_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked, or when the lock is free but we are
         * still yielding to a waiting writer.  A reader-held word, or a
         * free word with no writer left waiting, means we should simply
         * retry the CAS.  FUTEX_WAIT returns at once if *lock != cur. */
        if (cur >= SHM_RWLOCK_WRITER_BIT ||
            (cur == 0 && __atomic_load_n(writers_waiting, __ATOMIC_RELAXED))) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &shm_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Owner may be dead: deregister, run recovery, retry. */
                shm_unpark_reader(h);
                shm_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Woken, spurious return, EAGAIN/EINTR, or no sleep: retry. */
        shm_unpark_reader(h);
        spin = 0;
    }
}

/* Release one shared (read) hold on the rwlock.
 *
 * The shared counter is decremented BEFORE our slot subcount.  That keeps
 * "some live PID has subcount > 0" a reliable in-flight signal for the
 * recovery scan.  In the opposite order, there would be a window where we
 * still own a unit of rwlock while our subcount is already 0, and recovery
 * could force-reset rwlock underneath us.
 *
 * When the last reader leaves (counter reaches 0) and anyone is parked,
 * wake every waiter on the word. */
static inline void shm_rwlock_rdunlock(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;
    uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    if (remaining != 0)
        return;
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/* Acquire the exclusive (write) side of the process-shared rwlock.  Never
 * fails.
 *
 * The lock is taken by CASing the word from 0 to SHM_RWLOCK_WR(our pid).
 * After SHM_RWLOCK_SPIN_LIMIT failed attempts the writer parks:
 * shm_park_writer raises rwlock_writers_waiting, which makes new readers
 * yield on a free word (see shm_rwlock_rdlock).  It then sleeps with
 * FUTEX_WAIT (timeout SHM_LOCK_TIMEOUT_SEC).  A timeout runs
 * shm_recover_after_timeout before retrying. */
static inline void shm_rwlock_wrlock(ShmHandle *h) {
    shm_claim_reader_slot(h);  /* refresh cached_pid across fork */
    ShmHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = SHM_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;   /* only a completely free word can be taken */
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
            shm_rwlock_spin_pause();
            continue;
        }
        shm_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep only while the word is held (readers or a writer).  The
         * kernel returns at once if *lock no longer equals cur. */
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &shm_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Holder may be dead: deregister, run recovery, retry. */
                shm_unpark_writer(h);
                shm_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Woken, spurious return, or lock seen free: deregister and retry. */
        shm_unpark_writer(h);
        spin = 0;
    }
}

/* Release the exclusive (write) hold: publish a free word with RELEASE
 * ordering, then wake every parked waiter if any are registered. */
static inline void shm_rwlock_wrunlock(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;
    uint32_t *word = &hdr->rwlock;
    __atomic_store_n(word, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0) {
        syscall(SYS_futex, word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

/* ---- Seqlock (lock-free readers) ---- */

/* Start a lock-free seqlock read: wait for an even sequence value and
 * return it; the caller later validates with shm_seqlock_read_retry.
 *
 * While seq is odd (a write is in progress), spin with a CPU pause for up
 * to 100000 iterations.  Then look at the lock word:
 *   - write-held by a dead PID: force-recover via shm_recover_stale_lock
 *     (which also evens out seq) and start over;
 *   - otherwise: sleep 1 ms and resume spinning. */
static inline uint32_t shm_seqlock_read_begin(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;
    int spins = 0;
    for (;;) {
        uint32_t s = __atomic_load_n(&hdr->seq, __ATOMIC_ACQUIRE);
        if (__builtin_expect((s & 1) == 0, 1))
            return s;
        if (__builtin_expect(spins < 100000, 1)) {
            shm_rwlock_spin_pause();
            spins++;
            continue;
        }
        spins = 0;

        /* Prolonged odd seq — is the writer dead? */
        uint32_t word = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
        if (word >= SHM_RWLOCK_WRITER_BIT &&
            !shm_pid_alive(word & SHM_RWLOCK_PID_MASK)) {
            shm_recover_stale_lock(h, word);
            continue;
        }

        /* Writer alive: give up the CPU for a millisecond. */
        struct timespec nap = { .tv_sec = 0, .tv_nsec = 1000000 };
        nanosleep(&nap, NULL);
    }
}

/* Finish a seqlock read: returns non-zero (1) if *seq moved away from
 * `start`, meaning the data just read may be torn and must be re-read.
 * The acquire fence keeps the caller's data loads from drifting past the
 * re-read of the counter. */
static inline int shm_seqlock_read_retry(uint32_t *seq, uint32_t start) {
    __atomic_thread_fence(__ATOMIC_ACQUIRE);
    uint32_t now = __atomic_load_n(seq, __ATOMIC_RELAXED);
    return (now == start) ? 0 : 1;
}

/* Open a seqlock write section: make the sequence counter odd so
 * concurrent readers will retry.
 *
 * The counter bump must become visible BEFORE any data store that follows
 * it.  A RELEASE read-modify-write only orders *earlier* accesses before
 * itself; it does not stop later stores from being observed first.  On a
 * weakly-ordered CPU (e.g. aarch64), a reader could then see new data
 * together with the old even seq and accept a torn read.  The release
 * fence after the increment provides the store-store ordering (the same
 * shape as Linux's write_seqcount_begin: increment, then smp_wmb). */
static inline void shm_seqlock_write_begin(uint32_t *seq) {
    __atomic_add_fetch(seq, 1, __ATOMIC_RELEASE);  /* seq becomes odd */
    __atomic_thread_fence(__ATOMIC_RELEASE);       /* odd seq before data stores */
}

/* Close a seqlock write section.  The RELEASE increment publishes every
 * data store made since write_begin before seq turns even again. */
static inline void shm_seqlock_write_end(uint32_t *seq) {
    (void)__atomic_fetch_add(seq, 1, __ATOMIC_RELEASE);  /* seq becomes even */
}

/* ---- Arena allocator ---- */

static inline uint32_t shm_next_pow2(uint32_t v);

/* Number of bytes actually reserved for a `len`-byte arena request: at
 * least SHM_ARENA_MIN_ALLOC, otherwise `len` rounded by shm_next_pow2
 * (defined elsewhere in this file). */
static inline uint32_t shm_arena_round_up(uint32_t len) {
    return (len < SHM_ARENA_MIN_ALLOC) ? SHM_ARENA_MIN_ALLOC : shm_next_pow2(len);
}

/* Map a rounded allocation size to its free-list class.  Class 0 holds
 * blocks of SHM_ARENA_MIN_ALLOC bytes, and each following class doubles the
 * size, up to class SHM_ARENA_NUM_CLASSES - 1.  Returns -1 for sizes above
 * the largest class; shm_arena_alloc then serves them from the bump region
 * only, and shm_arena_free_block never recycles them.
 *
 * For sizes above the minimum, the class is
 *   ceil(log2(alloc_size)) - log2(SHM_ARENA_MIN_ALLOC).
 * log2 of the minimum is derived from the constant rather than hard-coded
 * as 4 (which silently assumed a 16-byte minimum), so the mapping stays
 * right if SHM_ARENA_MIN_ALLOC changes.  It must stay a power of two. */
static inline int shm_arena_class_index(uint32_t alloc_size) {
    if (alloc_size <= SHM_ARENA_MIN_ALLOC) return 0;
    if (alloc_size > (SHM_ARENA_MIN_ALLOC << (SHM_ARENA_NUM_CLASSES - 1))) return -1;
    return (32 - __builtin_clz(alloc_size - 1))
         - __builtin_ctz((unsigned)SHM_ARENA_MIN_ALLOC);
}

/* Allocate at least `len` bytes from the shared arena.  Returns the block's
 * byte offset from `arena`, or 0 on failure.
 *
 * The request is rounded by shm_arena_round_up.  If the rounded size maps
 * to a free-list class and that list is non-empty, its head is popped (each
 * free block stores the next link in its first 4 bytes).  Otherwise the
 * block is carved from the bump pointer; this fails when it would pass
 * arena_cap or the 32-bit offset range.
 *
 * NOTE(review): header fields are updated with plain loads/stores, so this
 * presumably runs under the map's write lock — confirm at call sites. */
static inline uint32_t shm_arena_alloc(ShmHeader *hdr, char *arena, uint32_t len) {
    uint32_t asize = shm_arena_round_up(len);
    /* Rounding must never shrink the request.  For len above 2^31 the next
     * power of two does not fit in 32 bits.  If shm_next_pow2 wraps (e.g.
     * to 0), asize would fall into class 0 or bump-allocate a zero-byte
     * slice, and the caller would write past its block.  Report "no space"
     * instead. */
    if (asize < len)
        return 0;
    int cls = shm_arena_class_index(asize);

    if (cls >= 0) {
        uint32_t head = hdr->arena_free[cls];
        if (head != 0) {
            uint32_t next;
            memcpy(&next, arena + head, sizeof next);   /* free-list link */
            hdr->arena_free[cls] = next;
            return head;
        }
    }

    uint64_t off = hdr->arena_bump;
    if (off + asize > hdr->arena_cap || off + asize > (uint64_t)UINT32_MAX)
        return 0;
    hdr->arena_bump = off + asize;
    return (uint32_t)off;
}

static inline void shm_arena_free_block(ShmHeader *hdr, char *arena,
                                         uint32_t off, uint32_t len) {
    uint32_t asize = shm_arena_round_up(len);
    int cls = shm_arena_class_index(asize);
    if (cls < 0 || off == 0) return;

    uint32_t old_head = hdr->arena_free[cls];



( run in 1.767 second using v1.01-cache-2.11-cpan-2398b32b56e )