Data-HashMap-Shared

 view release on metacpan or  search on metacpan

shm_generic.h  view on Meta::CPAN

    return (len_field >> 24) & 0x7;
}

/* Unpack an inline-stored string into `buf` and return `buf`.
 *
 * The string length comes from shm_inline_len(len_field).  The first (up to)
 * four bytes are copied from the in-memory representation of `off`; any
 * further bytes are copied from the in-memory representation of the low
 * 24 bits of `len_field`.  No NUL terminator is written.
 *
 * `buf` must have room for the full inline length (presumably at most 7
 * bytes, given the 3-bit length field — confirm against shm_inline_len). */
static inline const char *shm_inline_read(uint32_t off, uint32_t len_field,
                                           char *buf) {
    const uint32_t total = shm_inline_len(len_field);
    if (total <= 4) {
        /* Whole string lives in the offset word. */
        memcpy(buf, &off, total);
        return buf;
    }
    /* Bytes 0-3 come from `off`, the remainder from the payload bits of
     * `len_field` (flag/length bits in the top byte are masked away). */
    const uint32_t tail = len_field & 0x00FFFFFFU;
    memcpy(buf, &off, 4);
    memcpy(buf + 4, &tail, total - 4);
    return buf;
}

/* Resolve a stored (off, len_field) string reference to a pointer + length.
 *
 *   - Inline strings are unpacked into `inline_buf`, which is what gets
 *     returned; the length is shm_inline_len(len_field).
 *   - Arena strings are returned in place as `arena + off` (no copy); the
 *     length is SHM_UNPACK_LEN(len_field).
 *
 * `*out_len` is always written before the function returns. */
static inline const char *shm_str_ptr(uint32_t off, uint32_t len_field,
                                       const char *arena, char *inline_buf,
                                       uint32_t *out_len) {
    const char *str;
    if (SHM_IS_INLINE(len_field)) {
        *out_len = shm_inline_len(len_field);
        str = shm_inline_read(off, len_field, inline_buf);
    } else {
        *out_len = SHM_UNPACK_LEN(len_field);
        str = arena + off;
    }
    return str;
}

/* ---- Shared memory header (256 bytes, 4 cache lines, in mmap) ---- */

/* Compile-time assertion usable at file scope: SHM_STATIC_ASSERT(cond, "msg");
 *
 *   - C11 and later:   native _Static_assert (msg shown on failure).
 *   - C++11 and later: native static_assert.
 *   - Anything older:  declares an extern char array whose size is -1 when
 *     `cond` is false, which is a hard compile error.  The array is never
 *     defined or referenced, and repeating an identical extern declaration
 *     is valid, so the macro may be used any number of times.  `msg` is
 *     ignored in this fallback.
 *
 * In every branch the expansion is a declaration without the trailing
 * semicolon, so call sites keep writing `SHM_STATIC_ASSERT(...);`. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
#define SHM_STATIC_ASSERT(cond, msg) _Static_assert(cond, msg)
#elif defined(__cplusplus) && __cplusplus >= 201103L
#define SHM_STATIC_ASSERT(cond, msg) static_assert(cond, msg)
#else
#define SHM_STATIC_ASSERT(cond, msg) \
    extern char shm_static_assert_failed_[(cond) ? 1 : -1]
#endif

/* Header at the start of the shared mapping.  The layout is fixed: fields
 * are grouped by access pattern into four 64-byte cache lines and the byte
 * offset of each field is given in its trailing comment.  The total size is
 * pinned to 256 bytes by the static assertion below the struct, so adding,
 * removing, or reordering fields changes the shared layout. */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create
     *      (except table_cap, which changes on resize) ---- */
    uint32_t magic;           /* 0 */
    uint32_t version;         /* 4 */
    uint32_t variant_id;      /* 8 */
    uint32_t node_size;       /* 12 */
    uint32_t max_table_cap;   /* 16 */
    uint32_t table_cap;       /* 20: changes on resize only */
    uint32_t max_size;        /* 24: LRU capacity, 0 = disabled */
    uint32_t default_ttl;     /* 28: TTL seconds, 0 = disabled */
    uint64_t total_size;      /* 32 */
    uint64_t nodes_off;       /* 40 */
    uint64_t states_off;      /* 48 */
    uint64_t arena_off;       /* 56 */

    /* ---- Cache line 1 (64-127): seqlock + read-path data ---- */
    uint32_t seq;             /* 64: seqlock counter, odd = writer active */
    uint32_t rwlock_writers_waiting; /* 68: count of writers in FUTEX_WAIT
                                        (reader write-preferring yield signal) */
    uint64_t arena_cap;       /* 72: immutable, read by seqlock string path */
    uint64_t reader_slots_off;/* 80: offset of reader-PID slot table for dead-reader recovery */
    uint8_t  _reserved1[40];  /* 88-127: padding that fills out the cache line */

    /* ---- Cache line 2 (128-191): rwlock + write-hot fields ---- */
    uint32_t rwlock;          /* 128: 0=unlocked, 1..0x7FFFFFFF=readers, 0x80000000|pid=writer */
    uint32_t rwlock_waiters;  /* 132: checked before issuing FUTEX_WAKE on rwlock */
    uint32_t size;            /* 136 */
    uint32_t tombstones;      /* 140 */
    uint32_t lru_head;        /* 144: MRU slot index */
    uint32_t lru_tail;        /* 148: LRU slot index */
    uint32_t flush_cursor;    /* 152: partial flush_expired scan cursor */
    uint32_t table_gen;       /* 156: incremented on every resize */
    uint64_t arena_bump;      /* 160 */
    uint64_t stat_evictions;  /* 168: cumulative LRU eviction count */
    uint64_t stat_expired;    /* 176: cumulative TTL expiration count */
    uint32_t stat_recoveries; /* 184: cumulative stale lock recovery count */
    uint32_t lru_skip;        /* 188: promotion skip mask (power-of-2 minus 1, 0=strict LRU) */

    /* ---- Cache line 3 (192-255): arena free lists ---- */
    uint32_t arena_free[SHM_ARENA_NUM_CLASSES]; /* 192-255 */
} ShmHeader;

/* Guard the layout documented above (only effective where
 * SHM_STATIC_ASSERT expands to a real compile-time check). */
SHM_STATIC_ASSERT(sizeof(ShmHeader) == 256, "ShmHeader must be exactly 256 bytes (4 cache lines)");

/* Per-process slot for dead-process recovery.  Each shared rwlock counter
 * (the main rwlock-reader count, rwlock_waiters, rwlock_writers_waiting)
 * is mirrored here so a wrlock timeout can attribute and reverse a dead
 * process's contribution instead of waiting for the slow per-op timeout
 * drain.
 *
 * The mirrors are maintained by the lock helpers: subcount is raised in
 * rdlock and dropped in rdunlock; waiters_parked / writers_parked are
 * raised and dropped by the park/unpark helpers around FUTEX_WAIT. */
typedef struct {
    uint32_t pid;            /* 0 = unclaimed */
    uint32_t subcount;       /* in-flight rdlock acquisitions for this process */
    uint32_t waiters_parked; /* contribution to hdr->rwlock_waiters         */
    uint32_t writers_parked; /* contribution to hdr->rwlock_writers_waiting */
} ShmReaderSlot;

/* ---- Process-local handle ---- */

typedef struct ShmHandle_s {
    ShmHeader *hdr;
    void      *nodes;
    uint8_t   *states;
    char      *arena;
    uint32_t  *lru_prev;    /* NULL if LRU disabled */
    uint32_t  *lru_next;    /* NULL if LRU disabled */
    uint8_t   *lru_accessed; /* NULL if LRU disabled — clock second-chance bit */
    uint32_t  *expires_at;  /* NULL if TTL disabled */
    ShmReaderSlot *reader_slots; /* SHM_READER_SLOTS entries */
    uint32_t   my_slot_idx;  /* UINT32_MAX if all slots taken (no recovery for this handle) */
    uint32_t   cached_pid;   /* getpid() cached at last slot claim */
    uint32_t   cached_fork_gen; /* shm_fork_gen value at last slot claim — mismatch triggers reclaim */
    size_t     mmap_size;
    uint32_t   max_mask;    /* max_table_cap - 1, for seqlock bounds clamping */
    uint32_t   iter_pos;
    char      *copy_buf;
    uint32_t   copy_buf_size;
    uint32_t   iterating;   /* active iterator count (each + cursors) */
    uint32_t   iter_gen;    /* table_gen snapshot for each() */
    uint8_t    iter_active; /* 1 = built-in each is in progress */
    uint8_t    deferred;    /* shrink/compact deferred while iterating */
    char      *path;        /* backing file path (strdup'd) */
    int        backing_fd;  /* memfd fd to close on destroy, -1 otherwise */
    /* Sharding: if shard_handles != NULL, this is a sharded map dispatcher */
    struct ShmHandle_s **shard_handles; /* NULL for single map */
    uint32_t   num_shards;

shm_generic.h  view on Meta::CPAN

    ShmHandle *handle;       /* for single maps, direct handle; for sharded, the dispatcher */
    ShmHandle *current;      /* current shard handle (== handle for single maps) */
    uint32_t   iter_pos;
    uint32_t   gen;          /* table_gen snapshot — reset on mismatch */
    uint32_t   shard_idx;    /* current shard index (0 for single maps) */
    uint32_t   shard_count;  /* total shards (1 for single maps) */
    char      *copy_buf;
    uint32_t   copy_buf_size;
} ShmCursor;

/* Ensure the heap buffer `*buf` (capacity `*cap`) can hold `needed` bytes.
 *
 * A request of 0 is treated as 1.  If the buffer is already large enough
 * nothing happens.  Otherwise the capacity is doubled (starting from 64
 * when empty) until it fits and the buffer is realloc'd; should doubling
 * overflow uint32_t, exactly `needed` bytes are requested instead.
 *
 * Returns 1 on success, 0 on allocation failure.  On failure `*buf` and
 * `*cap` are left untouched, so the old buffer stays valid. */
static inline int shm_grow_buf(char **buf, uint32_t *cap, uint32_t needed) {
    const uint32_t want = needed ? needed : 1;
    if (*cap >= want)
        return 1;

    uint32_t new_cap = (*cap != 0) ? *cap : 64;
    while (new_cap < want) {
        if (new_cap > UINT32_MAX / 2) {
            /* Doubling would wrap around: fall back to the exact size. */
            new_cap = want;
            break;
        }
        new_cap *= 2;
    }

    char *grown = (char *)realloc(*buf, new_cap);
    if (grown == NULL)
        return 0;

    *buf = grown;
    *cap = new_cap;
    return 1;
}

/* Make sure the handle's copy buffer (h->copy_buf / h->copy_buf_size) can
 * hold `needed` bytes.  Returns shm_grow_buf()'s result: 1 on success,
 * 0 on OOM. */
static inline int shm_ensure_copy_buf(ShmHandle *h, uint32_t needed) {
    char **bufp = &h->copy_buf;
    uint32_t *capp = &h->copy_buf_size;
    return shm_grow_buf(bufp, capp, needed);
}

/* Make sure the cursor's copy buffer (c->copy_buf / c->copy_buf_size) can
 * hold `needed` bytes.  Returns shm_grow_buf()'s result: 1 on success,
 * 0 on OOM. */
static inline int shm_cursor_ensure_copy_buf(ShmCursor *c, uint32_t needed) {
    char **bufp = &c->copy_buf;
    uint32_t *capp = &c->copy_buf_size;
    return shm_grow_buf(bufp, capp, needed);
}

/* ---- Hash functions (xxHash, XXH3) ---- */

/* Hash a 64-bit integer key by feeding its in-memory bytes to XXH3_64bits. */
static inline uint64_t shm_hash_int64(int64_t key) {
    const void *key_bytes = &key;
    return XXH3_64bits(key_bytes, sizeof key);
}

/* Hash `len` bytes starting at `data` with XXH3_64bits.  The bytes are
 * taken as-is; no NUL terminator is required or consulted. */
static inline uint64_t shm_hash_string(const char *data, uint32_t len) {
    const size_t nbytes = (size_t)len;
    return XXH3_64bits(data, nbytes);
}

/* ---- Futex-based read-write lock ---- */

/* Number of failed acquisition attempts a lock loop spins through (calling
 * shm_rwlock_spin_pause() between attempts) before it parks in FUTEX_WAIT. */
#define SHM_RWLOCK_SPIN_LIMIT 32
#define SHM_LOCK_TIMEOUT_SEC  2  /* FUTEX_WAIT timeout for stale lock detection */

/* One iteration of busy-wait back-off for the lock spin loops.
 * Issues the architecture's spin hint where one is known ("yield" on
 * AArch64, "pause" on x86); everywhere else it is an empty asm statement.
 * All variants carry a "memory" clobber, i.e. they also act as a
 * compiler-level memory barrier. */
static inline void shm_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* Layout of the rwlock word while write-locked: the top bit is set and the
 * lower 31 bits carry the writer's PID.
 *   SHM_RWLOCK_WRITER_BIT - flag marking "held by a writer"
 *   SHM_RWLOCK_PID_MASK   - mask that extracts the writer PID from the word
 *   SHM_RWLOCK_WR(pid)    - builds the write-locked word for `pid`
 *                           (the PID is truncated to 31 bits) */
#define SHM_RWLOCK_WRITER_BIT 0x80000000U
#define SHM_RWLOCK_PID_MASK   0x7FFFFFFFU
#define SHM_RWLOCK_WR(pid)    (SHM_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SHM_RWLOCK_PID_MASK))

/* Probe whether process `pid` still exists, using kill(pid, 0).
 *
 * Returns 0 only when the process is definitely gone (kill fails with
 * ESRCH).  Returns 1 in every other case: the probe succeeded, it failed
 * for a different reason (e.g. no permission to signal the process), or
 * `pid` is 0, meaning no owner was recorded.
 *
 * Limitation: PID reuse cannot be detected.  If a dead lock-holder's PID
 * has been recycled to an unrelated live process before recovery runs,
 * this reports "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits.  Robust detection would
 * require a per-slot process-start-time epoch (a header-layout /
 * SHM_VERSION change).  Documented under "Crash Safety" in the POD. */
static inline int shm_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1; /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1; /* probe succeeded: process exists */
    /* Probe failed: only ESRCH proves the process is gone. */
    return errno != ESRCH;
}

/* Forward declaration — defined later in the LRU helpers section.
 * Declared here because shm_recover_stale_lock() calls it while holding
 * the recovered write lock. */
static void shm_lru_rebuild_if_corrupt(ShmHandle *h);

/* Force-recover a stale write lock left by a dead process.
 * CAS to OUR pid to hold the lock while fixing seqlock, then release.
 * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
 * recovering process can detect and re-recover if we crash mid-recovery.
 *
 * h               - handle whose header holds the lock to recover.
 * observed_rwlock - the rwlock value the caller saw and judged stale.  The
 *                   takeover only happens if the lock word still holds
 *                   exactly this value; otherwise the function returns
 *                   without touching anything. */
static inline void shm_recover_stale_lock(ShmHandle *h, uint32_t observed_rwlock) {
    ShmHeader *hdr = h->hdr;
    uint32_t mypid = SHM_RWLOCK_WR((uint32_t)getpid());
    /* Strong CAS (weak = 0): failure means the lock word really changed
     * since the caller observed it, so there is nothing for us to recover. */
    if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
            mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
        return;
    /* We now hold the write lock as mypid.  Repair shared state — the
     * seqlock counter (if dead writer left it odd) and the LRU doubly-
     * linked list (if dead writer left it one-way-broken) — while no
     * other process can mutate them. */
    uint32_t seq = __atomic_load_n(&hdr->seq, __ATOMIC_RELAXED);
    /* Odd seq = "writer active"; bump it to the next even value. */
    if (seq & 1)
        __atomic_store_n(&hdr->seq, seq + 1, __ATOMIC_RELEASE);
    shm_lru_rebuild_if_corrupt(h);
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    /* Release the lock */
    __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
    /* Wake every waiter parked on the lock word, but skip the syscall
     * when nobody is registered as waiting. */
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/* Timeout handed to FUTEX_WAIT by the lock loops: SHM_LOCK_TIMEOUT_SEC
 * seconds, 0 nanoseconds.  When a wait expires (ETIMEDOUT) the lock loops
 * run shm_recover_after_timeout() before retrying. */
static const struct timespec shm_lock_timeout = { SHM_LOCK_TIMEOUT_SEC, 0 };

/* Fork detection without a getpid() syscall on the hot path.
 *
 * shm_fork_gen is a process-global generation number.  The child-side
 * pthread_atfork handler bumps it, so in a freshly forked child the value
 * differs from whatever generation a handle recorded earlier, and every
 * open handle can notice the fork on its next lock call.
 *
 * shm_atfork_once is the once-control intended to guard the one-time
 * registration performed by shm_atfork_init(). */
static uint32_t shm_fork_gen = 1;
static pthread_once_t shm_atfork_once = PTHREAD_ONCE_INIT;

/* pthread_atfork child handler: advance the fork generation. */
static void shm_on_fork_child(void) {
    (void)__atomic_add_fetch(&shm_fork_gen, 1, __ATOMIC_RELAXED);
}

/* Register the child-side fork handler; no prepare or parent handlers
 * are installed. */
static void shm_atfork_init(void) {
    pthread_atfork(/* prepare */ NULL, /* parent */ NULL,
                   /* child */ shm_on_fork_child);
}

/* Ensure this process owns a reader slot.  Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing

shm_generic.h  view on Meta::CPAN

    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Undo a reader's waiter registration: drop the shared rwlock_waiters
 * count first, then this process's mirrored waiters_parked count.
 * Handles without a reader slot (my_slot_idx == UINT32_MAX) only touch
 * the shared counter. */
static inline void shm_unpark_reader(ShmHandle *h) {
    __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return;
    ShmReaderSlot *mine = &h->reader_slots[h->my_slot_idx];
    __atomic_sub_fetch(&mine->waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Register a writer that is about to block: bump this process's mirrored
 * waiters_parked and writers_parked counts first (when it owns a reader
 * slot), then the shared rwlock_waiters and rwlock_writers_waiting
 * counters. */
static inline void shm_park_writer(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;
    if (h->my_slot_idx != UINT32_MAX) {
        ShmReaderSlot *mine = &h->reader_slots[h->my_slot_idx];
        __atomic_add_fetch(&mine->waiters_parked, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(&mine->writers_parked, 1, __ATOMIC_RELAXED);
    }
    __atomic_add_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Undo shm_park_writer(): drop the shared rwlock_waiters and
 * rwlock_writers_waiting counters first, then this process's mirrored
 * waiters_parked and writers_parked counts (when it owns a reader slot). */
static inline void shm_unpark_writer(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;
    __atomic_sub_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return;
    ShmReaderSlot *mine = &h->reader_slots[h->my_slot_idx];
    __atomic_sub_fetch(&mine->waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&mine->writers_parked, 1, __ATOMIC_RELAXED);
}

/* Acquire the rwlock in shared (reader) mode.  Does not return until the
 * lock is held.
 *
 * Strategy: try to CAS the reader count up; after SHM_RWLOCK_SPIN_LIMIT
 * failed attempts, register as a waiter and sleep in FUTEX_WAIT with
 * shm_lock_timeout.  A timed-out wait runs shm_recover_after_timeout()
 * and then starts over with a fresh spin budget. */
static inline void shm_rwlock_rdlock(ShmHandle *h) {
    shm_claim_reader_slot(h);
    ShmHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter.  This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS.  Without this, a reader
     * killed between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
     * our eventual rdunlock dec. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        /* Both CASes below are weak (4th argument 1): a spurious failure
         * simply falls through to another loop iteration. */
        if (cur > 0 && cur < SHM_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        /* Still within the spin budget: pause briefly and retry. */
        if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
            shm_rwlock_spin_pause();
            continue;
        }
        /* Spin budget exhausted: register as a waiter, then re-read the
         * lock word so FUTEX_WAIT is armed with a current value. */
        shm_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= SHM_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &shm_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Nobody woke us within the timeout: deregister, run the
                 * timeout recovery path, then start over. */
                shm_unpark_reader(h);
                shm_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Woken, the lock word changed before we could sleep, or the lock
         * turned out to be reader-held: deregister and retry with a fresh
         * spin budget. */
        shm_unpark_reader(h);
        spin = 0;
    }
}

/* Release one shared (reader) hold on the rwlock.
 *
 * Ordering matters: the shared reader count is dropped BEFORE this
 * process's slot subcount, so that "a live PID with subcount > 0" remains
 * a reliable in-flight indicator for the writer-side recovery scan.  Doing
 * it the other way round would open a window in which we still own a unit
 * of rwlock while our subcount is already 0, letting recovery force-reset
 * rwlock underneath us.
 *
 * If this was the last reader (the count reached 0) and waiters are
 * registered, everyone parked on the lock word is woken. */
static inline void shm_rwlock_rdunlock(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;
    /* __atomic_sub_fetch yields the value AFTER the decrement. */
    const uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
    if (h->my_slot_idx != UINT32_MAX) {
        ShmReaderSlot *mine = &h->reader_slots[h->my_slot_idx];
        __atomic_sub_fetch(&mine->subcount, 1, __ATOMIC_RELAXED);
    }
    if (remaining != 0)
        return; /* other readers still hold the lock */
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

static inline void shm_rwlock_wrlock(ShmHandle *h) {
    shm_claim_reader_slot(h);  /* refresh cached_pid across fork */
    ShmHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = SHM_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
            shm_rwlock_spin_pause();
            continue;
        }
        shm_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &shm_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                shm_unpark_writer(h);
                shm_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        shm_unpark_writer(h);
        spin = 0;
    }



( run in 0.535 second using v1.01-cache-2.11-cpan-13bb782fe5a )