Data-Heap-Shared

 view release on metacpan or  search on metacpan

heap.h  view on Meta::CPAN

#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>

#define HEAP_MAGIC       0x48455031U  /* "HEP1" */
#define HEAP_VERSION     1
#define HEAP_ERR_BUFLEN  256
#define HEAP_SPIN_LIMIT  32

#define HEAP_MUTEX_BIT   0x80000000U
#define HEAP_MUTEX_PID   0x7FFFFFFFU

typedef struct {
    int64_t priority;
    int64_t value;
} HeapEntry;

/* ================================================================
 * Header (128 bytes)
 * ================================================================ */

typedef struct {
    uint32_t magic;
    uint32_t version;
    uint64_t capacity;
    uint64_t total_size;
    uint64_t data_off;
    uint8_t  _pad0[32];

    uint32_t size;             /* 64: current element count (futex word for pop) */
    uint32_t mutex;            /* 68: 0=free, HEAP_MUTEX_BIT|pid=locked */
    uint32_t mutex_waiters;    /* 72 */
    uint32_t waiters_pop;      /* 76 */
    uint64_t stat_pushes;      /* 80 */
    uint64_t stat_pops;        /* 88 */
    uint64_t stat_waits;       /* 96 */
    uint64_t stat_timeouts;    /* 104 */
    uint64_t stat_recoveries;  /* 112 */
    uint8_t  _pad1[8];         /* 120-127 */
} HeapHeader;

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(HeapHeader) == 128, "HeapHeader must be 128 bytes");
#endif

typedef struct {
    HeapHeader *hdr;
    HeapEntry  *data;
    size_t      mmap_size;
    char       *path;
    int         notify_fd;
    int         backing_fd;
} HeapHandle;

/* ================================================================
 * Mutex (PID-based, stale-recoverable)
 * ================================================================ */

static const struct timespec heap_lock_timeout = { 2, 0 };

static inline int heap_pid_alive(uint32_t pid) {
    if (pid == 0) return 1;
    return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}

static inline void heap_mutex_lock(HeapHeader *hdr) {
    uint32_t mypid = HEAP_MUTEX_BIT | ((uint32_t)getpid() & HEAP_MUTEX_PID);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (spin < HEAP_SPIN_LIMIT) {
#if defined(__x86_64__) || defined(__i386__)
            __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
            __asm__ volatile("yield" ::: "memory");
#endif
            continue;
        }
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &heap_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT && cur >= HEAP_MUTEX_BIT) {
                uint32_t pid = cur & HEAP_MUTEX_PID;
                if (!heap_pid_alive(pid)) {
                    if (__atomic_compare_exchange_n(&hdr->mutex, &cur, 0,
                            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
                        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
                        /* Wake one waiter so recovery latency is not bounded by the 2s timeout. */
                        if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
                            syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
                    }
                }
            }
        }
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

static inline void heap_mutex_unlock(HeapHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* ================================================================
 * Heap operations (must hold mutex)
 * ================================================================ */

static inline void heap_swap(HeapEntry *a, HeapEntry *b) {
    HeapEntry t = *a; *a = *b; *b = t;
}

static inline void heap_sift_up(HeapEntry *data, uint32_t idx) {
    while (idx > 0) {
        uint32_t parent = (idx - 1) / 2;
        if (data[parent].priority <= data[idx].priority) break;
        heap_swap(&data[parent], &data[idx]);
        idx = parent;
    }
}

static inline void heap_sift_down(HeapEntry *data, uint32_t size, uint32_t idx) {
    while (1) {
        uint32_t smallest = idx;
        uint32_t left = 2 * idx + 1;
        uint32_t right = 2 * idx + 2;
        if (left < size && data[left].priority < data[smallest].priority)
            smallest = left;
        if (right < size && data[right].priority < data[smallest].priority)
            smallest = right;
        if (smallest == idx) break;
        heap_swap(&data[idx], &data[smallest]);
        idx = smallest;
    }
}

/* ================================================================
 * Public API
 * ================================================================ */

static inline void heap_make_deadline(double t, struct timespec *dl) {
    clock_gettime(CLOCK_MONOTONIC, dl);
    dl->tv_sec += (time_t)t;



( run in 2.469 seconds using v1.01-cache-2.11-cpan-5837b0d9d2c )