/* Data-HashMap-Shared: shm_generic.h */
    uint32_t shard_count;   /* total shards (1 for single maps) */
    char *copy_buf;
    uint32_t copy_buf_size;
} ShmCursor;
/* Grow a copy buffer to hold `needed` bytes; returns 1 on success, 0 on OOM */
static inline int shm_grow_buf(char **buf, uint32_t *cap, uint32_t needed) {
    if (needed == 0) needed = 1;
    if (needed <= *cap) return 1;
    uint32_t ns = *cap ? *cap : 64;
    while (ns < needed) {
        uint32_t next = ns * 2;
        if (next <= ns) { ns = needed; break; } /* overflow guard */
        ns = next;
    }
    char *nb = (char *)realloc(*buf, ns);
    if (!nb) return 0;
    *buf = nb;
    *cap = ns;
    return 1;
}

static inline int shm_ensure_copy_buf(ShmHandle *h, uint32_t needed) {
    return shm_grow_buf(&h->copy_buf, &h->copy_buf_size, needed);
}

static inline int shm_cursor_ensure_copy_buf(ShmCursor *c, uint32_t needed) {
    return shm_grow_buf(&c->copy_buf, &c->copy_buf_size, needed);
}
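/* Illustrative usage sketch (not part of the original source): capacity
 * doubles from a 64-byte floor until it covers the request, so asking for
 * 200 bytes yields a 256-byte buffer (64 -> 128 -> 256). */
static inline int shm_demo_grow_buf(void) {
    char *buf = NULL;
    uint32_t cap = 0;
    if (!shm_grow_buf(&buf, &cap, 200))
        return 0;           /* OOM: buf and cap are left unchanged */
    /* cap == 256 here; buf can hold any value up to 256 bytes */
    free(buf);
    return 1;
}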
/* ---- Hash functions (xxHash, XXH3) ---- */
static inline uint64_t shm_hash_int64(int64_t key) {
    return XXH3_64bits(&key, sizeof(key));
}

static inline uint64_t shm_hash_string(const char *data, uint32_t len) {
    return XXH3_64bits(data, (size_t)len);
}
/* ---- Futex-based read-write lock ---- */
#define SHM_RWLOCK_SPIN_LIMIT 32
#define SHM_LOCK_TIMEOUT_SEC  2  /* FUTEX_WAIT timeout for stale lock detection */

static inline void shm_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
/* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
#define SHM_RWLOCK_WRITER_BIT 0x80000000U
#define SHM_RWLOCK_PID_MASK   0x7FFFFFFFU
#define SHM_RWLOCK_WR(pid)    (SHM_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SHM_RWLOCK_PID_MASK))

/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
static inline int shm_pid_alive(uint32_t pid) {
    if (pid == 0) return 1; /* no owner recorded, assume alive */
    return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}
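/* The "unknown" case covers e.g. EPERM: kill(pid, 0) fails but the target
 * exists (owned by another user), so only ESRCH counts as definitely dead. */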
/* Force-recover a stale write lock left by a dead process.
 * CAS to OUR pid to hold the lock while fixing the seqlock, then release.
 * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
 * recovering process can detect and re-recover if we crash mid-recovery. */
static inline void shm_recover_stale_lock(ShmHeader *hdr, uint32_t observed_rwlock) {
    uint32_t mypid = SHM_RWLOCK_WR((uint32_t)getpid());
    if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
                                     mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
        return;
    /* Fix the seqlock while the lock is still held by us */
    uint32_t seq = __atomic_load_n(&hdr->seq, __ATOMIC_RELAXED);
    if (seq & 1)
        __atomic_store_n(&hdr->seq, seq + 1, __ATOMIC_RELEASE);
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    /* Now release the lock */
    __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
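/* Worked scenario (follows from the CAS above): writer A dies holding the
 * lock, leaving rwlock == WRITER_BIT | pid_A. A timed-out waiter B sees
 * pid_A is dead and CASes the word to WRITER_BIT | pid_B. If B also dies
 * before releasing, a later process C finds pid_B in the lock word,
 * detects it dead, and recovers again; the chain never wedges on an
 * anonymous sentinel. */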
static const struct timespec shm_lock_timeout = { SHM_LOCK_TIMEOUT_SEC, 0 };

static inline void shm_rwlock_rdlock(ShmHeader *hdr) {
    uint32_t *lock = &hdr->rwlock;
    uint32_t *waiters = &hdr->rwlock_waiters;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < SHM_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
            shm_rwlock_spin_pause();
            continue;
        }
        __atomic_add_fetch(waiters, 1, __ATOMIC_RELAXED);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= SHM_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &shm_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
                if (cur >= SHM_RWLOCK_WRITER_BIT) {
                    uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                    if (val >= SHM_RWLOCK_WRITER_BIT) {
                        uint32_t pid = val & SHM_RWLOCK_PID_MASK;
                        if (!shm_pid_alive(pid))
                            shm_recover_stale_lock(hdr, val);
                    }
                } else {
                    /* Yielding to a writer timed out: drop one writers_waiting
                     * to recover from a potentially-crashed parked writer. */
                    uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
                    while (wc > 0 && !__atomic_compare_exchange_n(
                               writers_waiting, &wc, wc - 1,
                               1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}
static inline void shm_rwlock_rdunlock(ShmHeader *hdr) {
    /* __atomic_sub_fetch returns the NEW value: 0 means we were the last reader */
    uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
    if (remaining == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
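/* Illustrative usage sketch (not part of the original source): a read-side
 * critical section pairing rdlock/rdunlock; `arena_bump` is loaded here
 * purely as an example of shared state. */
static inline uint64_t shm_demo_locked_read(ShmHeader *hdr) {
    shm_rwlock_rdlock(hdr);
    uint64_t bump = hdr->arena_bump;
    shm_rwlock_rdunlock(hdr);
    return bump;
}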
static inline void shm_rwlock_wrlock(ShmHeader *hdr) {
    uint32_t *lock = &hdr->rwlock;
    uint32_t *waiters = &hdr->rwlock_waiters;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = SHM_RWLOCK_WR((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                                        1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < SHM_RWLOCK_SPIN_LIMIT, 1)) {
            shm_rwlock_spin_pause();
            continue;
        }
        __atomic_add_fetch(waiters, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &shm_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
                __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                if (val >= SHM_RWLOCK_WRITER_BIT) {
                    uint32_t pid = val & SHM_RWLOCK_PID_MASK;
                    if (!shm_pid_alive(pid))
                        shm_recover_stale_lock(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
        __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}
static inline void shm_rwlock_wrunlock(ShmHeader *hdr) {
    __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* ---- Seqlock (lock-free readers) ---- */
static inline uint32_t shm_seqlock_read_begin(ShmHeader *hdr) {
    int spin = 0;
    for (;;) {
        uint32_t s = __atomic_load_n(&hdr->seq, __ATOMIC_ACQUIRE);
        if (__builtin_expect((s & 1) == 0, 1)) return s;
        if (__builtin_expect(spin < 100000, 1)) {
            shm_rwlock_spin_pause();
            spin++;
            continue;
        }
        /* Prolonged odd seq: check for a dead writer */
        uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
        if (val >= SHM_RWLOCK_WRITER_BIT) {
            uint32_t pid = val & SHM_RWLOCK_PID_MASK;
            if (!shm_pid_alive(pid)) {
                shm_recover_stale_lock(hdr, val);
                spin = 0;
                continue;
            }
        }
        /* Writer is alive, yield the CPU */
        struct timespec ts = {0, 1000000}; /* 1ms */
        nanosleep(&ts, NULL);
        spin = 0;
    }
}
static inline int shm_seqlock_read_retry(uint32_t *seq, uint32_t start) {
    __atomic_thread_fence(__ATOMIC_ACQUIRE); /* order data loads before the retry check */
    return __atomic_load_n(seq, __ATOMIC_RELAXED) != start;
}
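/* Illustrative sketch (not part of the original source): the optimistic
 * read loop these helpers support; `stat_recoveries` stands in for
 * whatever field a real caller would snapshot. */
static inline uint32_t shm_demo_seq_read(ShmHeader *hdr) {
    uint32_t snap, start;
    do {
        start = shm_seqlock_read_begin(hdr);
        snap = (uint32_t)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED);
    } while (shm_seqlock_read_retry(&hdr->seq, start));
    return snap;
}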
static inline void shm_seqlock_write_begin(uint32_t *seq) {
    __atomic_add_fetch(seq, 1, __ATOMIC_RELAXED); /* seq becomes odd */
    /* Store-store barrier: the odd seq must be visible before any data
     * writes; a release on the increment alone does not order the stores
     * that FOLLOW it, so a release fence is needed here. */
    __atomic_thread_fence(__ATOMIC_RELEASE);
}

static inline void shm_seqlock_write_end(uint32_t *seq) {
    __atomic_add_fetch(seq, 1, __ATOMIC_RELEASE); /* seq becomes even */
}
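/* Illustrative sketch (not part of the original source): the matching
 * write-side protocol. It preserves the invariant shm_recover_stale_lock
 * relies on: the seq is only ever odd while the write lock is held. */
static inline void shm_demo_seq_write(ShmHeader *hdr) {
    shm_rwlock_wrlock(hdr);
    shm_seqlock_write_begin(&hdr->seq);
    /* ... mutate shared data here ... */
    shm_seqlock_write_end(&hdr->seq);
    shm_rwlock_wrunlock(hdr);
}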
/* ---- Arena allocator ---- */
static inline uint32_t shm_next_pow2(uint32_t v);

static inline uint32_t shm_arena_round_up(uint32_t len) {
    if (len < SHM_ARENA_MIN_ALLOC) return SHM_ARENA_MIN_ALLOC;
    return shm_next_pow2(len);
}

static inline int shm_arena_class_index(uint32_t alloc_size) {
    if (alloc_size <= SHM_ARENA_MIN_ALLOC) return 0;
    if (alloc_size > (SHM_ARENA_MIN_ALLOC << (SHM_ARENA_NUM_CLASSES - 1))) return -1;
    return 32 - __builtin_clz(alloc_size - 1) - 4; /* log2(alloc_size) - 4 */
}
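/* Worked example (derived from the helpers above, assuming
 * SHM_ARENA_MIN_ALLOC is 16, which the "- 4" in the formula implies):
 * len = 100 rounds up to asize = 128, and the class index is
 * 32 - __builtin_clz(127) - 4 = 32 - 25 - 4 = 3, i.e. classes cover
 * 16, 32, 64, 128, ... bytes, one per power of two. */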
static inline uint32_t shm_arena_alloc(ShmHeader *hdr, char *arena, uint32_t len) {
    uint32_t asize = shm_arena_round_up(len);
    int cls = shm_arena_class_index(asize);
    /* Pop from the per-class free list if a recycled block is available */
    if (cls >= 0 && hdr->arena_free[cls] != 0) {
        uint32_t head = hdr->arena_free[cls];
        uint32_t next;
        memcpy(&next, arena + head, sizeof(uint32_t));
        hdr->arena_free[cls] = next;
        return head;
    }
    /* Otherwise bump-allocate; offset 0 is reserved as the failure value */
    uint64_t off = hdr->arena_bump;
    if (off + asize > hdr->arena_cap || off + asize > (uint64_t)UINT32_MAX)
        return 0;
    hdr->arena_bump = off + asize;
    return (uint32_t)off;
}
static inline void shm_arena_free_block(ShmHeader *hdr, char *arena,
                                        uint32_t off, uint32_t len) {
    uint32_t asize = shm_arena_round_up(len);
    int cls = shm_arena_class_index(asize);
    if (cls < 0 || off == 0) return;
    /* Push onto the per-class free list: store the old head in the block
     * itself, then point the head at this block (mirrors the pop above). */
    uint32_t old_head = hdr->arena_free[cls];
    memcpy(arena + off, &old_head, sizeof(uint32_t));
    hdr->arena_free[cls] = off;
}
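/* Illustrative sketch (not part of the original source): freed blocks are
 * recycled by exact size class, so a free followed by a same-class alloc
 * returns the same offset. */
static inline void shm_demo_arena_reuse(ShmHeader *hdr, char *arena) {
    uint32_t a = shm_arena_alloc(hdr, arena, 100); /* 128-byte class */
    if (a == 0) return;                            /* arena full */
    shm_arena_free_block(hdr, arena, a, 100);      /* pushed onto class 3 list */
    uint32_t b = shm_arena_alloc(hdr, arena, 120); /* same class: b == a */
    (void)b;
}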