Data-Heap-Shared
view release on metacpan or search on metacpan
/*
* heap.h -- Shared-memory binary min-heap (priority queue) for Linux
*
* Mutex-protected push/pop with sift-up/sift-down.
* Futex blocking when empty (pop_wait).
* Elements are (int64_t priority, int64_t value) pairs.
* Lowest priority pops first (min-heap).
*/
#ifndef HEAP_H
#define HEAP_H
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
#include <limits.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>
#define HEAP_MAGIC 0x48455031U /* "HEP1" */
#define HEAP_VERSION 1
#define HEAP_ERR_BUFLEN 256
#define HEAP_SPIN_LIMIT 32
#define HEAP_MUTEX_BIT 0x80000000U
#define HEAP_MUTEX_PID 0x7FFFFFFFU
/* Indices into the heap data are uint32_t (sift_up/down). Cap capacity
* to UINT32_MAX so size++ in heap_push cannot wrap and the (uint32_t)
* cast against hdr->capacity in the capacity check cannot truncate. */
#define HEAP_MAX_CAPACITY ((uint64_t)UINT32_MAX)
/* One heap element: a (priority, value) pair of two int64_t fields.
 * Per the file header, the entry with the lowest priority pops first. */
typedef struct {
int64_t priority; /* ordering key; smaller values pop earlier (min-heap) */
int64_t value; /* payload carried alongside the priority */
} HeapEntry;
/* ================================================================
* Header (128 bytes)
* ================================================================ */
/* Control block for the heap. The field order and widths below add up to
 * exactly 128 bytes (checked by the static assertion that follows), so do
 * not reorder or resize fields without revisiting that assertion.
 * The numbers in the trailing comments are byte offsets from the start of
 * the struct. */
typedef struct {
uint32_t magic; /* 0: signature; presumably HEAP_MAGIC -- confirm */
uint32_t version; /* 4: format revision; presumably HEAP_VERSION -- confirm */
uint64_t capacity; /* 8: maximum number of entries (see HEAP_MAX_CAPACITY) */
uint64_t total_size; /* 16: presumably total byte size of the mapping -- confirm */
uint64_t data_off; /* 24: presumably byte offset of the HeapEntry array -- confirm */
uint8_t _pad0[32]; /* 32-63: padding so the fields below start at offset 64 */
uint32_t size; /* 64: current element count (futex word for pop) */
uint32_t mutex; /* 68: 0=free, HEAP_MUTEX_BIT|pid=locked */
uint32_t mutex_waiters; /* 72: incremented by heap_mutex_lock before it sleeps on the mutex futex */
uint32_t waiters_pop; /* 76: presumably the number of blocked pop_wait callers -- confirm */
uint64_t stat_pushes; /* 80: statistics counter; presumably successful pushes -- confirm */
uint64_t stat_pops; /* 88: statistics counter; presumably successful pops -- confirm */
uint64_t stat_waits; /* 96: statistics counter; presumably blocking waits -- confirm */
uint64_t stat_timeouts; /* 104: statistics counter; presumably timed-out waits -- confirm */
uint64_t stat_recoveries; /* 112: incremented when heap_mutex_lock clears a lock held by a dead PID */
uint8_t _pad1[8]; /* 120-127 */
} HeapHeader;
/* Compile-time layout check; only available from C11 on, hence the guard. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(HeapHeader) == 128, "HeapHeader must be 128 bytes");
#endif
/* Per-process handle bundling the pointers and descriptors needed to work
 * with one heap. NOTE(review): the code that fills in these fields is not
 * in this part of the file, so the per-field notes below are assumptions
 * to be confirmed against the create/open/destroy routines. */
typedef struct {
HeapHeader *hdr; /* presumably the start of the mapped region -- confirm */
HeapEntry *data; /* presumably the entry array located via hdr->data_off -- confirm */
size_t mmap_size; /* presumably the length to pass to munmap -- confirm */
char *path; /* presumably the backing file path (ownership unknown) -- confirm */
int notify_fd; /* presumably an eventfd used for notifications -- confirm */
int backing_fd; /* presumably the descriptor of the backing file/memfd -- confirm */
} HeapHandle;
/* ================================================================
* Mutex (PID-based, stale-recoverable)
* ================================================================ */
/* Timeout (2 s, 0 ns) handed to FUTEX_WAIT in heap_mutex_lock. When a wait
 * expires with ETIMEDOUT, the waiter checks whether the PID recorded in the
 * mutex word is still alive so that a lock left by a dead owner can be
 * cleared. */
static const struct timespec heap_lock_timeout = { 2, 0 };
/*
 * Report whether a process with the given PID appears to exist.
 *
 * Sends the null signal (signal number 0) with kill(), which performs the
 * existence/permission checks without delivering anything.
 *
 * pid: process ID to probe. 0 is never probed and is reported as alive.
 *
 * Returns 0 only when kill() fails with ESRCH; every other outcome --
 * success or any other error -- returns 1.
 */
static inline int heap_pid_alive(uint32_t pid) {
    /* Never pass 0 to kill(); report it as alive instead of probing. */
    if (pid == 0)
        return 1;

    /* The probe went through: the process exists. */
    if (kill((pid_t)pid, 0) != -1)
        return 1;

    /* The probe failed: only ESRCH counts as "not alive". */
    return errno != ESRCH;
}
static inline void heap_mutex_lock(HeapHeader *hdr) {
uint32_t mypid = HEAP_MUTEX_BIT | ((uint32_t)getpid() & HEAP_MUTEX_PID);
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
if (spin < HEAP_SPIN_LIMIT) {
#if defined(__x86_64__) || defined(__i386__)
__asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
__asm__ volatile("yield" ::: "memory");
#endif
continue;
}
__atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
&heap_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT && cur >= HEAP_MUTEX_BIT) {
uint32_t pid = cur & HEAP_MUTEX_PID;
if (!heap_pid_alive(pid)) {
if (__atomic_compare_exchange_n(&hdr->mutex, &cur, 0,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
__atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
( run in 0.758 second using v1.01-cache-2.11-cpan-df04353d9ac )