Data-HashMap-Shared
view release on metacpan or search on metacpan
shm_generic.h view on Meta::CPAN
return (len_field >> 24) & 0x7;
}
/* Reassemble an inline-packed string into a caller-supplied buffer.
 *
 * The first (up to) four bytes are taken from the in-memory representation
 * of `off`; any remaining bytes are taken from the low 24 bits of
 * `len_field`. The total length is whatever shm_inline_len(len_field)
 * reports. `buf` must have room for that many bytes; no terminator is
 * written.
 *
 * Returns `buf`. */
static inline const char *shm_inline_read(uint32_t off, uint32_t len_field,
                                          char *buf) {
    const uint32_t total = shm_inline_len(len_field);
    const uint32_t head = (total < 4) ? total : 4;

    /* Leading bytes live in the offset word. */
    memcpy(buf, &off, head);

    /* total > head only when the string is longer than four bytes. */
    if (total > head) {
        const uint32_t tail_word = len_field & 0x00FFFFFFU;
        memcpy(buf + head, &tail_word, total - head);
    }
    return buf;
}
/* Resolve a stored string to a (pointer, length) pair.
 *
 * Arena mode:  the length is unpacked from `len_field` and the returned
 *              pointer is `arena + off` (no copy).
 * Inline mode: the bytes are unpacked into `inline_buf`, which is returned.
 *
 * The length is always written to `*out_len`. */
static inline const char *shm_str_ptr(uint32_t off, uint32_t len_field,
                                      const char *arena, char *inline_buf,
                                      uint32_t *out_len) {
    if (!SHM_IS_INLINE(len_field)) {
        *out_len = SHM_UNPACK_LEN(len_field);
        return arena + off;
    }

    *out_len = shm_inline_len(len_field);
    return shm_inline_read(off, len_field, inline_buf);
}
/* ---- Shared memory header (256 bytes, 4 cache lines, in mmap) ---- */
/* Compile-time assertion usable at file or block scope; the caller supplies
 * the trailing semicolon.
 *
 * C11 and later: expands to _Static_assert, so `msg` appears in the
 * diagnostic.
 * Older dialects: expands to a typedef of a char array whose size is -1
 * when `cond` is false, which is a constraint violation and therefore still
 * stops the build (`msg` is unused in that case). Previously the macro
 * expanded to nothing before C11, silently disabling every layout check. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
#define SHM_STATIC_ASSERT(cond, msg) _Static_assert(cond, msg)
#else
#define SHM_STATIC_ASSERT_CAT2_(a, b) a##b
#define SHM_STATIC_ASSERT_CAT_(a, b) SHM_STATIC_ASSERT_CAT2_(a, b)
/* Prefer __COUNTER__ for a unique typedef name; fall back to the line number. */
#ifdef __COUNTER__
#define SHM_STATIC_ASSERT_ID_ __COUNTER__
#else
#define SHM_STATIC_ASSERT_ID_ __LINE__
#endif
#define SHM_STATIC_ASSERT(cond, msg) \
    typedef char SHM_STATIC_ASSERT_CAT_(shm_static_assert_failed_, \
                                        SHM_STATIC_ASSERT_ID_)[(cond) ? 1 : -1]
#endif
/* Header of the shared mapping.
 *
 * Fields are grouped into four 64-byte regions by access pattern (see the
 * per-region banners below). The number at the start of each field comment
 * is the field's byte offset within the struct. The explicitly sized fields
 * account for bytes 0-191; the size assertion after the struct then requires
 * arena_free[] to fill the remaining 64 bytes.
 *
 * This layout is shared between processes: do not reorder, resize or insert
 * fields without revisiting every offset comment and the size assertion. */
typedef struct {
/* ---- Cache line 0 (0-63): immutable after create ---- */
uint32_t magic; /* 0 */
uint32_t version; /* 4 */
uint32_t variant_id; /* 8 */
uint32_t node_size; /* 12 */
uint32_t max_table_cap; /* 16 */
uint32_t table_cap; /* 20: the one exception in this line: changes on resize only */
uint32_t max_size; /* 24: LRU capacity, 0 = disabled */
uint32_t default_ttl; /* 28: TTL seconds, 0 = disabled */
uint64_t total_size; /* 32 */
uint64_t nodes_off; /* 40 */
uint64_t states_off; /* 48 */
uint64_t arena_off; /* 56 */
/* ---- Cache line 1 (64-127): seqlock + read-path data ---- */
uint32_t seq; /* 64: seqlock counter, odd = writer active */
uint32_t rwlock_writers_waiting; /* 68: count of writers in FUTEX_WAIT
(readers consult this to yield to waiting writers: write-preferring lock) */
uint64_t arena_cap; /* 72: immutable, read by seqlock string path */
uint64_t reader_slots_off;/* 80: offset of reader-PID slot table for dead-reader recovery */
uint8_t _reserved1[40]; /* 88-127: padding up to the end of this 64-byte region */
/* ---- Cache line 2 (128-191): rwlock + write-hot fields ---- */
uint32_t rwlock; /* 128: 0=unlocked, 1..0x7FFFFFFF=readers, 0x80000000|pid=writer */
uint32_t rwlock_waiters; /* 132: parked lockers; unlock paths issue FUTEX_WAKE only when this is > 0 */
uint32_t size; /* 136 */
uint32_t tombstones; /* 140 */
uint32_t lru_head; /* 144: MRU slot index */
uint32_t lru_tail; /* 148: LRU slot index */
uint32_t flush_cursor; /* 152: partial flush_expired scan cursor */
uint32_t table_gen; /* 156: incremented on every resize */
uint64_t arena_bump; /* 160 */
uint64_t stat_evictions; /* 168: cumulative LRU eviction count */
uint64_t stat_expired; /* 176: cumulative TTL expiration count */
uint32_t stat_recoveries; /* 184: cumulative stale lock recovery count */
uint32_t lru_skip; /* 188: promotion skip mask (power-of-2 minus 1, 0=strict LRU) */
/* ---- Cache line 3 (192-255): arena free lists ---- */
uint32_t arena_free[SHM_ARENA_NUM_CLASSES]; /* 192-255: one entry per arena size class */
} ShmHeader;
/* Pin the layout: any change in total size fails the build (where
 * SHM_STATIC_ASSERT is backed by a real compile-time assertion). */
SHM_STATIC_ASSERT(sizeof(ShmHeader) == 256, "ShmHeader must be exactly 256 bytes (4 cache lines)");
/* Per-process slot for dead-process recovery.
 *
 * Each shared rwlock counter (the main rwlock reader count, rwlock_waiters
 * and rwlock_writers_waiting) is mirrored here per process, so that a wrlock
 * timeout can attribute a dead process's contribution to those counters and
 * reverse it, instead of waiting for the slow per-op timeout drain.
 *
 * Four 32-bit fields, updated with atomic builtins by the lock helpers. */
typedef struct {
uint32_t pid; /* owning process id; 0 = unclaimed */
uint32_t subcount; /* in-flight rdlock acquisitions for this process */
uint32_t waiters_parked; /* contribution to hdr->rwlock_waiters */
uint32_t writers_parked; /* contribution to hdr->rwlock_writers_waiting */
} ShmReaderSlot;
/* ---- Process-local handle ---- */
typedef struct ShmHandle_s {
ShmHeader *hdr;
void *nodes;
uint8_t *states;
char *arena;
uint32_t *lru_prev; /* NULL if LRU disabled */
uint32_t *lru_next; /* NULL if LRU disabled */
uint8_t *lru_accessed; /* NULL if LRU disabled — clock second-chance bit */
uint32_t *expires_at; /* NULL if TTL disabled */
ShmReaderSlot *reader_slots; /* SHM_READER_SLOTS entries */
uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
uint32_t cached_pid; /* getpid() cached at last slot claim */
uint32_t cached_fork_gen; /* shm_fork_gen value at last slot claim — mismatch triggers reclaim */
size_t mmap_size;
uint32_t max_mask; /* max_table_cap - 1, for seqlock bounds clamping */
uint32_t iter_pos;
char *copy_buf;
uint32_t copy_buf_size;
uint32_t iterating; /* active iterator count (each + cursors) */
uint32_t iter_gen; /* table_gen snapshot for each() */
uint8_t iter_active; /* 1 = built-in each is in progress */
uint8_t deferred; /* shrink/compact deferred while iterating */
char *path; /* backing file path (strdup'd) */
int backing_fd; /* memfd fd to close on destroy, -1 otherwise */
/* Sharding: if shard_handles != NULL, this is a sharded map dispatcher */
struct ShmHandle_s **shard_handles; /* NULL for single map */
uint32_t num_shards;
shm_generic.h view on Meta::CPAN
ShmHandle *handle; /* for single maps, direct handle; for sharded, the dispatcher */
ShmHandle *current; /* current shard handle (== handle for single maps) */
uint32_t iter_pos;
uint32_t gen; /* table_gen snapshot — reset on mismatch */
uint32_t shard_idx; /* current shard index (0 for single maps) */
uint32_t shard_count; /* total shards (1 for single maps) */
char *copy_buf;
uint32_t copy_buf_size;
} ShmCursor;
/* Make sure `*buf` can hold at least `needed` bytes.
 *
 * buf    - in/out: heap buffer (may be NULL when *cap is 0); replaced via
 *          realloc when growth is required.
 * cap    - in/out: current capacity of *buf in bytes.
 * needed - requested size; 0 is treated as 1 so a buffer always exists
 *          after a successful call.
 *
 * Capacity starts at 64 bytes and doubles until it covers the request; if
 * doubling would overflow 32 bits the exact requested size is used instead.
 *
 * Returns 1 on success, 0 if realloc fails (in which case *buf and *cap are
 * left untouched). */
static inline int shm_grow_buf(char **buf, uint32_t *cap, uint32_t needed) {
    const uint32_t want = needed ? needed : 1;
    if (*cap >= want)
        return 1;

    uint32_t size = (*cap != 0) ? *cap : 64;
    while (size < want) {
        if (size > UINT32_MAX / 2) {
            /* Doubling would wrap around: settle for the exact request. */
            size = want;
            break;
        }
        size *= 2;
    }

    char *grown = (char *)realloc(*buf, size);
    if (grown == NULL)
        return 0;

    *cap = size;
    *buf = grown;
    return 1;
}
/* Grow the handle's private copy buffer to at least `needed` bytes.
 * Returns 1 on success, 0 on allocation failure (see shm_grow_buf). */
static inline int shm_ensure_copy_buf(ShmHandle *h, uint32_t needed) {
    char **storage = &h->copy_buf;
    uint32_t *capacity = &h->copy_buf_size;
    return shm_grow_buf(storage, capacity, needed);
}
/* Grow the cursor's private copy buffer to at least `needed` bytes.
 * Returns 1 on success, 0 on allocation failure (see shm_grow_buf). */
static inline int shm_cursor_ensure_copy_buf(ShmCursor *c, uint32_t needed) {
    char **storage = &c->copy_buf;
    uint32_t *capacity = &c->copy_buf_size;
    return shm_grow_buf(storage, capacity, needed);
}
/* ---- Hash functions (xxHash, XXH3) ---- */
/* Hash a 64-bit integer key by feeding its 8 in-memory bytes to XXH3. */
static inline uint64_t shm_hash_int64(int64_t key) {
    const void *bytes = &key;
    return XXH3_64bits(bytes, sizeof key);
}
/* Hash `len` bytes starting at `data` with XXH3. */
static inline uint64_t shm_hash_string(const char *data, uint32_t len) {
    const size_t nbytes = (size_t)len;
    return XXH3_64bits(data, nbytes);
}
/* ---- Futex-based read-write lock ---- */
/* Number of failed acquisition attempts a locker spins through (with a CPU
 * pause hint between attempts) before it parks itself and considers
 * sleeping in FUTEX_WAIT. */
#define SHM_RWLOCK_SPIN_LIMIT 32
/* Seconds a parked locker sleeps in FUTEX_WAIT before the wait times out;
 * a timeout is what triggers the stale-lock recovery path. */
#define SHM_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
/* CPU hint issued between spin attempts on the rwlock.
 *
 * Emits the architecture's spin-wait instruction where one is selected
 * below ("yield" on AArch64, "pause" on x86); elsewhere an empty asm
 * statement. Every variant carries a "memory" clobber, so the compiler
 * cannot cache memory values across the call. */
static inline void shm_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
/* Encoding of a write-locked rwlock word: the top bit marks "writer holds
 * the lock" and the lower 31 bits carry the writer's PID.
 *   SHM_RWLOCK_WRITER_BIT - the writer flag (bit 31).
 *   SHM_RWLOCK_PID_MASK   - mask that recovers the PID from a locked word.
 *   SHM_RWLOCK_WR(pid)    - builds the lock word for writer `pid`; the PID
 *                           is truncated to 31 bits. */
#define SHM_RWLOCK_WRITER_BIT 0x80000000U
#define SHM_RWLOCK_PID_MASK 0x7FFFFFFFU
#define SHM_RWLOCK_WR(pid) (SHM_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SHM_RWLOCK_PID_MASK))
/* Check whether a PID is alive.
 *
 * Returns 1 if the process is alive or its state cannot be determined, and
 * 0 only if it is definitely dead (kill(pid, 0) failed with ESRCH). A pid
 * of 0 means "no owner recorded" and is reported as alive.
 *
 * NOTE: liveness via kill(pid, 0) cannot detect PID reuse — if a dead
 * lock-holder's PID is recycled to an unrelated live process before recovery
 * runs, this reports "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits. Robust detection would require
 * a per-slot process-start-time epoch (a header-layout/SHM_VERSION change).
 * Documented under "Crash Safety" in the POD. */
static inline int shm_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1; /* no owner recorded, assume alive */

    int rc = kill((pid_t)pid, 0);
    if (rc != -1)
        return 1; /* signal 0 could be delivered: process exists */

    /* Any failure other than "no such process" (e.g. EPERM) still means a
     * process with that PID exists. */
    return errno != ESRCH;
}
/* Forward declaration — defined later in the LRU helpers section. */
static void shm_lru_rebuild_if_corrupt(ShmHandle *h);
/* Force-recover a stale write lock left by a dead process.
 *
 * h               - process-local handle; h->hdr is the shared header that
 *                   holds the lock word.
 * observed_rwlock - the lock word the caller saw. Recovery proceeds only if
 *                   the word still has exactly this value; presumably the
 *                   caller has already decided that its owner is dead
 *                   (that check is not visible in this function).
 *
 * The lock is taken over by CAS-ing it to OUR pid, held while the seqlock
 * and LRU list are repaired, then released. Using our pid (not a bare
 * WRITER_BIT sentinel) means a subsequent recovering process can detect and
 * re-recover if we crash mid-recovery. */
static inline void shm_recover_stale_lock(ShmHandle *h, uint32_t observed_rwlock) {
ShmHeader *hdr = h->hdr;
/* Despite the name, this is the full lock word (writer bit | our pid). */
uint32_t mypid = SHM_RWLOCK_WR((uint32_t)getpid());
/* Strong CAS: if the word changed since the caller looked, someone else
 * already released or recovered the lock, so there is nothing to do. */
if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
/* We now hold the write lock as mypid. Repair shared state — the
 * seqlock counter (if the dead writer left it odd) and the LRU doubly-
 * linked list (if the dead writer left it one-way-broken) — while no
 * other process can mutate them. */
uint32_t seq = __atomic_load_n(&hdr->seq, __ATOMIC_RELAXED);
/* Odd means "writer active"; bump it to the next even value. */
if (seq & 1)
__atomic_store_n(&hdr->seq, seq + 1, __ATOMIC_RELEASE);
shm_lru_rebuild_if_corrupt(h);
__atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
/* Release the lock, then wake every parked waiter (if any) so they retry. */
__atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* Timeout passed to FUTEX_WAIT by the lock helpers: SHM_LOCK_TIMEOUT_SEC
 * seconds, zero nanoseconds. */
static const struct timespec shm_lock_timeout = { SHM_LOCK_TIMEOUT_SEC, 0 };
/* Process-global fork-generation counter.
 *
 * The pthread_atfork child handler below bumps it, so every open handle can
 * notice a fork transition on its next lock call by comparing against the
 * generation it cached, without paying a getpid() syscall on the hot path. */
static uint32_t shm_fork_gen = 1;

/* Once-control for shm_atfork_init (presumably driven through pthread_once
 * by code outside this section). */
static pthread_once_t shm_atfork_once = PTHREAD_ONCE_INIT;

/* Child-side fork handler: advance the fork generation by one. */
static void shm_on_fork_child(void) {
    (void)__atomic_fetch_add(&shm_fork_gen, 1, __ATOMIC_RELAXED);
}

/* Register the child-side fork handler; no prepare or parent handlers are
 * installed. The registration result is not checked. */
static void shm_atfork_init(void) {
    void (*const child_hook)(void) = shm_on_fork_child;
    (void)pthread_atfork(NULL, NULL, child_hook);
}
/* Ensure this process owns a reader slot. Called from the lock helpers so
* that fork()'d children pick up their own slot lazily instead of sharing
shm_generic.h view on Meta::CPAN
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Undo a reader park: drop the shared waiter count first, then this
 * process's mirrored count in its reader slot (when it owns one). */
static inline void shm_unpark_reader(ShmHandle *h) {
    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);

    const uint32_t slot = h->my_slot_idx;
    if (slot == UINT32_MAX)
        return; /* no slot claimed: nothing is mirrored */

    uint32_t *mirrored = &h->reader_slots[slot].waiters_parked;
    (void)__atomic_fetch_sub(mirrored, 1, __ATOMIC_RELAXED);
}
/* Announce that a writer is about to wait on the lock.
 *
 * The per-process mirror counters (when this handle owns a slot) are raised
 * before the shared header counters. A writer counts both as a generic
 * waiter and as a waiting writer. */
static inline void shm_park_writer(ShmHandle *h) {
    const uint32_t slot = h->my_slot_idx;
    if (slot != UINT32_MAX) {
        uint32_t *slot_waiters = &h->reader_slots[slot].waiters_parked;
        uint32_t *slot_writers = &h->reader_slots[slot].writers_parked;
        (void)__atomic_fetch_add(slot_waiters, 1, __ATOMIC_RELAXED);
        (void)__atomic_fetch_add(slot_writers, 1, __ATOMIC_RELAXED);
    }

    (void)__atomic_fetch_add(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Undo a writer park: the shared header counters are lowered first, then
 * the per-process mirror counters (when this handle owns a slot). */
static inline void shm_unpark_writer(ShmHandle *h) {
    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);

    const uint32_t slot = h->my_slot_idx;
    if (slot == UINT32_MAX)
        return; /* no slot claimed: nothing is mirrored */

    uint32_t *slot_waiters = &h->reader_slots[slot].waiters_parked;
    uint32_t *slot_writers = &h->reader_slots[slot].writers_parked;
    (void)__atomic_fetch_sub(slot_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(slot_writers, 1, __ATOMIC_RELAXED);
}
/* Acquire the shared (read) side of the futex-based rwlock.
 *
 * Blocks until the reader count in hdr->rwlock has been incremented on
 * behalf of the caller. The lock is write-preferring: a reader will not
 * take a *free* lock while writers are waiting, but may join readers that
 * already hold it.
 *
 * Strategy: spin up to SHM_RWLOCK_SPIN_LIMIT attempts, then park and, if
 * there is something to wait for, sleep in FUTEX_WAIT with a timeout. A
 * timeout hands control to shm_recover_after_timeout() before retrying. */
static inline void shm_rwlock_rdlock(ShmHandle *h) {
    shm_claim_reader_slot(h);
    ShmHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;

    /* Claim subcount BEFORE bumping the shared rwlock counter. This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS. Without this, a reader
     * caught between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath it, causing a UINT32_MAX wrap on
     * the eventual rdunlock decrement. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);

    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);

        /* Write-preferring: when the lock is free (cur == 0) and writers
         * are waiting, yield to let a writer acquire. When readers are
         * already active (cur >= 1), new readers may join freely. */
        if (cur > 0 && cur < SHM_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }

        if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
            shm_rwlock_spin_pause();
            continue;
        }

        shm_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);

        /* Sleep only when there is something to wait for: the lock is
         * write-locked, or it is free but we are yielding to waiting
         * writers. A free lock with no waiting writer must NOT be slept
         * on: the release that freed it may have happened before we
         * registered as a waiter (so no FUTEX_WAKE was sent), and sleeping
         * would stall this reader on an available lock until the timeout
         * expires. In that case fall through and retry immediately. */
        if (cur >= SHM_RWLOCK_WRITER_BIT ||
            (cur == 0 && __atomic_load_n(writers_waiting, __ATOMIC_RELAXED) != 0)) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &shm_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Nobody woke us within the timeout: the holder (or a
                 * parked writer) may be dead. Attempt recovery, retry. */
                shm_unpark_reader(h);
                shm_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }

        shm_unpark_reader(h);
        spin = 0;
    }
}
/* Release one shared (read) hold on the rwlock.
 *
 * Ordering matters: the shared reader count is decremented BEFORE this
 * process's slot subcount, so that "a live PID with subcount > 0" remains a
 * reliable in-flight indicator for the writer-side recovery scan. Doing it
 * the other way round would open a window in which we still own a unit of
 * rwlock while our subcount reads 0, letting recovery force-reset rwlock
 * underneath us.
 *
 * If this was the last reader and anyone is parked, all waiters are woken. */
static inline void shm_rwlock_rdunlock(ShmHandle *h) {
    ShmHeader *hdr = h->hdr;

    /* Value of the lock word AFTER our decrement. */
    uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);

    if (h->my_slot_idx != UINT32_MAX) {
        uint32_t *inflight = &h->reader_slots[h->my_slot_idx].subcount;
        __atomic_sub_fetch(inflight, 1, __ATOMIC_RELAXED);
    }

    if (remaining != 0)
        return; /* other readers still hold the lock */
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return; /* nobody is parked: skip the syscall */

    syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static inline void shm_rwlock_wrlock(ShmHandle *h) {
shm_claim_reader_slot(h); /* refresh cached_pid across fork */
ShmHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
/* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
* any crash window between acquiring the lock and storing the owner. */
uint32_t mypid = SHM_RWLOCK_WR(h->cached_pid);
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(lock, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
shm_rwlock_spin_pause();
continue;
}
shm_park_writer(h);
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&shm_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
shm_unpark_writer(h);
shm_recover_after_timeout(h);
spin = 0;
continue;
}
}
shm_unpark_writer(h);
spin = 0;
}
( run in 0.535 second using v1.01-cache-2.11-cpan-13bb782fe5a )