Data-Pool-Shared

 view release on metacpan or  search on metacpan

pool.h  view on Meta::CPAN

/* Variant identifiers.
 * NOTE(review): presumably the values stored in PoolHeader.variant_id;
 * the lower-numbered members of this list are outside this view — confirm. */
#define POOL_VAR_F64  2
#define POOL_VAR_I32  3
#define POOL_VAR_STR  4

/* Round x up to the next multiple of 8. The mask is a uint64_t, so the
 * result is computed in (at least) 64-bit unsigned arithmetic. x is
 * evaluated exactly once. */
#define POOL_ALIGN8(x) (((x) + 7) & ~(uint64_t)7)

/* ================================================================
 * Header (128 bytes = 2 cache lines)
 * ================================================================ */

/* Header at the start of the mapped region. It is split into two 64-byte
 * halves: the first holds the geometry written at creation time, the second
 * holds counters that are updated with atomic operations while the pool is in
 * use. The numbers in the trailing comments are byte offsets from the start
 * of the struct. */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;          /* 0 */
    uint32_t version;        /* 4 */
    uint32_t elem_size;      /* 8: bytes per slot (slot stride in the data area) */
    uint32_t variant_id;     /* 12 */
    uint64_t capacity;       /* 16: number of slots */
    uint64_t total_size;     /* 24: total mmap size */
    uint64_t data_off;       /* 32: offset to slot data */
    uint64_t bitmap_off;     /* 40: offset to allocation bitmap */
    uint64_t owners_off;     /* 48: offset to per-slot owner PIDs */
    uint8_t  _pad0[8];       /* 56-63 */

    /* ---- Cache line 1 (64-127): mutable state ---- */
    uint32_t used;           /* 64: allocated count (futex word) */
    uint32_t waiters;        /* 68: blocked on alloc; checked before issuing a futex wake */
    uint8_t  _pad1[8];       /* 72-79 */
    uint64_t stat_allocs;    /* 80 */
    uint64_t stat_frees;     /* 88: also bumped for each slot reclaimed by stale recovery */
    uint64_t stat_waits;     /* 96 */
    uint64_t stat_timeouts;  /* 104 */
    uint64_t stat_recoveries;/* 112: total slots reclaimed by stale recovery */
    uint8_t  _pad2[8];       /* 120-127 */
} PoolHeader;

/* On C11 and later, fail the build if padding or a field change moves the
 * header away from its fixed 128-byte size. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(PoolHeader) == 128, "PoolHeader must be 128 bytes");
#endif

/* ================================================================
 * Process-local handle
 * ================================================================ */

/* Per-process view of a pool: pointers into the mapping plus local
 * bookkeeping that is not shared with other processes. */
typedef struct {
    PoolHeader *hdr;          /* header of the pool */
    uint64_t   *bitmap;       /* allocation bitmap, one bit per slot, 64 slots per word */
    uint32_t   *owners;       /* per-slot owner PID; 0 means no recorded owner */
    uint8_t    *data;         /* start of slot storage; slot i is at data + i * elem_size */
    size_t      mmap_size;    /* NOTE(review): presumably the length passed to mmap/munmap — confirm */
    uint32_t    bitmap_words; /* number of 64-bit words scanned in bitmap */
    char       *path;         /* NOTE(review): presumably the backing file path — confirm ownership */
    int         notify_fd;    /* NOTE(review): use not visible in this excerpt */
    int         backing_fd;   /* NOTE(review): use not visible in this excerpt */
    uint32_t    scan_hint;    /* bitmap word index where the next allocation scan starts */
} PoolHandle;

/* ================================================================
 * Utility
 * ================================================================ */

/* Report whether `pid` should be treated as a live process.
 *
 * Returns 0 only when probing the PID with signal 0 fails with ESRCH
 * (no such process). Every other outcome returns 1, including pid == 0
 * and probe failures for other reasons such as EPERM. */
static inline int pool_pid_alive(uint32_t pid) {
    if (pid != 0) {
        /* Signal 0 performs the existence/permission check without
         * delivering anything. */
        int probe = kill((pid_t)pid, 0);
        if (probe == -1 && errno == ESRCH)
            return 0;
    }
    return 1;
}

/* Fill `*deadline` with the CLOCK_MONOTONIC time `timeout` seconds from now.
 *
 * `timeout` is split into whole seconds and a fractional part converted to
 * nanoseconds; the result is normalised so tv_nsec stays below one second. */
static inline void pool_make_deadline(double timeout, struct timespec *deadline) {
    const long nsec_per_sec = 1000000000L;
    const time_t whole_sec = (time_t)timeout;
    const long frac_nsec = (long)((timeout - (double)whole_sec) * 1e9);

    clock_gettime(CLOCK_MONOTONIC, deadline);
    deadline->tv_sec += whole_sec;
    deadline->tv_nsec += frac_nsec;

    /* Both nanosecond addends are below one second, so one carry suffices. */
    if (deadline->tv_nsec >= nsec_per_sec) {
        deadline->tv_nsec -= nsec_per_sec;
        deadline->tv_sec += 1;
    }
}

/* Compute the time left until `deadline` on CLOCK_MONOTONIC.
 *
 * Stores (deadline - now) in `*remaining`, with tv_nsec normalised to be
 * non-negative. Returns 1 while the deadline has not passed (remaining
 * tv_sec >= 0) and 0 once it has. */
static inline int pool_remaining_time(const struct timespec *deadline,
                                       struct timespec *remaining) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);

    time_t sec_left = deadline->tv_sec - now.tv_sec;
    long nsec_left = deadline->tv_nsec - now.tv_nsec;

    /* Borrow one second when the nanosecond difference went negative. */
    if (nsec_left < 0) {
        nsec_left += 1000000000L;
        sec_left -= 1;
    }

    remaining->tv_sec = sec_left;
    remaining->tv_nsec = nsec_left;
    return sec_left >= 0;
}

/* ================================================================
 * Slot access
 * ================================================================ */

/* Return the address of slot `slot` inside the data area.
 * No range check is performed on `slot`. */
static inline uint8_t *pool_slot_ptr(PoolHandle *h, uint64_t slot) {
    const uint64_t byte_off = slot * h->hdr->elem_size;
    return &h->data[byte_off];
}

/* Return 1 if the bitmap bit for `slot` is set, 0 otherwise.
 * The bitmap word is read with a relaxed atomic load; `slot` is not
 * range-checked. */
static inline int pool_is_allocated(PoolHandle *h, uint64_t slot) {
    const uint32_t word_idx = (uint32_t)(slot >> 6);      /* 64 slots per word */
    const uint64_t bit_mask = (uint64_t)1 << (slot & 63);
    const uint64_t bits = __atomic_load_n(&h->bitmap[word_idx], __ATOMIC_RELAXED);
    return (bits & bit_mask) != 0;
}

/* ================================================================
 * Allocation (lock-free bitmap scan + CAS)
 * ================================================================ */

static inline int64_t pool_try_alloc(PoolHandle *h) {
    uint32_t nwords = h->bitmap_words;
    uint64_t cap = h->hdr->capacity;
    uint32_t start = h->scan_hint;
    uint32_t mypid = (uint32_t)getpid();

    for (uint32_t i = 0; i < nwords; i++) {
        uint32_t widx = (start + i) % nwords;
        uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);

        while (word != ~(uint64_t)0) {
            int bit = __builtin_ctzll(~word);
            uint64_t slot = (uint64_t)widx * 64 + bit;
            if (slot >= cap) break;

pool.h  view on Meta::CPAN

        }
    }
    return freed;
}

/* ================================================================
 * Batch alloc — all-or-nothing, shared deadline
 * ================================================================ */

/* Allocate `count` slots as a unit, writing their indices to out[0..count-1].
 *
 * timeout == 0 : each slot is taken with pool_try_alloc (no waiting).
 * timeout  > 0 : one deadline is computed up front and every pool_alloc
 *                call receives only the time still left on it.
 * otherwise    : `timeout` is forwarded unchanged to pool_alloc.
 *
 * Returns 1 when all `count` slots were obtained (trivially for count == 0).
 * On any failure the slots obtained so far are handed back with pool_free_n
 * and 0 is returned. stat_timeouts is incremented here when the shared
 * deadline is found to have passed before an allocation attempt. */
static inline int pool_alloc_n(PoolHandle *h, uint64_t *out, uint32_t count,
                                double timeout) {
    if (count == 0)
        return 1;

    const int nonblocking = (timeout == 0);
    const int bounded = (timeout > 0);
    struct timespec deadline;
    if (bounded)
        pool_make_deadline(timeout, &deadline);

    uint32_t got = 0;
    while (got < count) {
        int64_t slot;

        if (nonblocking) {
            slot = pool_try_alloc(h);
        } else {
            double wait = timeout;
            if (bounded) {
                struct timespec left;
                if (!pool_remaining_time(&deadline, &left)) {
                    __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
                    goto rollback;
                }
                wait = (double)left.tv_sec + (double)left.tv_nsec / 1e9;
            }
            slot = pool_alloc(h, wait);
        }

        if (slot < 0)
            goto rollback;
        out[got++] = (uint64_t)slot;
    }
    return 1;

rollback:
    /* All-or-nothing: release whatever was acquired before the failure. */
    if (got > 0)
        pool_free_n(h, out, got);
    return 0;
}

/* ================================================================
 * Stale recovery — CAS owner to narrow race window
 * ================================================================ */

/* Reclaim slots whose recorded owner process no longer exists.
 *
 * Walks every slot below capacity. For each slot that is marked allocated
 * and has a non-zero owner PID that pool_pid_alive() reports as gone, it
 * first claims the slot by CAS-ing the owner to 0 and then clears the
 * bitmap bit, decrements `used`, bumps `stat_frees`, and wakes one futex
 * waiter if any are registered.
 *
 * Returns the number of slots whose bitmap bit this call cleared; the same
 * amount is added to `stat_recoveries`.
 *
 * NOTE(review): a slot left with owner == 0 and its bit still set (e.g. if
 * the recovering process dies between the owner CAS and the bitmap CAS) is
 * skipped by the `owner == 0` test on later passes — confirm whether that
 * window matters to callers. */
static inline uint32_t pool_recover_stale(PoolHandle *h) {
    uint32_t recovered = 0;
    uint64_t cap = h->hdr->capacity;

    for (uint64_t slot = 0; slot < cap; slot++) {
        if (!pool_is_allocated(h, slot)) continue;
        uint32_t owner = __atomic_load_n(&h->owners[slot], __ATOMIC_ACQUIRE);
        /* owner 0 = no PID recorded; nothing to test liveness against */
        if (owner == 0 || pool_pid_alive(owner)) continue;

        /* CAS owner from dead PID to 0 — if it fails, slot was
         * re-allocated or already recovered by another process */
        if (!__atomic_compare_exchange_n(&h->owners[slot], &owner, 0,
                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
            continue;

        /* We now own the right to free this slot's bitmap bit */
        uint32_t widx = (uint32_t)(slot / 64);
        int bit = (int)(slot % 64);
        uint64_t mask = (uint64_t)1 << bit;

        /* Retry loop: other bits in the same word may change concurrently,
         * and the weak CAS may also fail spuriously. */
        for (;;) {
            uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
            /* Bit already clear: do not count it or touch `used`. */
            if (!(word & mask)) break;
            uint64_t new_word = word & ~mask;
            if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
                    1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
                __atomic_sub_fetch(&h->hdr->used, 1, __ATOMIC_RELEASE);
                __atomic_add_fetch(&h->hdr->stat_frees, 1, __ATOMIC_RELAXED);
                /* Skip the syscall when nobody is registered as waiting. */
                if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
                    syscall(SYS_futex, &h->hdr->used, FUTEX_WAKE, 1, NULL, NULL, 0);
                recovered++;
                break;
            }
        }
    }

    if (recovered > 0)
        __atomic_add_fetch(&h->hdr->stat_recoveries, recovered, __ATOMIC_RELAXED);

    return recovered;
}

/* ================================================================
 * Layout calculation
 * ================================================================ */

/* Compute the byte offsets of the regions that follow the header.
 *
 * Regions are laid out back to back in this order:
 *   header | bitmap (one uint64_t per 64 slots) | owners (4 bytes per slot,
 *   padded to a multiple of 8) | slot data (capacity * elem_size bytes)
 *
 * Outputs: *bitmap_off, *owners_off, *data_off are the region start offsets
 * and *total_size is the offset one past the end of the slot data.
 * No overflow checking is performed on the size arithmetic. */
static inline void pool_calc_layout(uint64_t capacity, uint32_t elem_size,
                                     uint64_t *bitmap_off, uint64_t *owners_off,
                                     uint64_t *data_off, uint64_t *total_size) {
    const uint64_t bitmap_bytes = ((capacity + 63) / 64) * 8;
    const uint64_t owners_bytes = POOL_ALIGN8(capacity * 4);
    const uint64_t data_bytes   = (uint64_t)capacity * elem_size;

    /* Running cursor: each region begins where the previous one ended. */
    uint64_t cursor = sizeof(PoolHeader);
    *bitmap_off = cursor;

    cursor += bitmap_bytes;
    *owners_off = cursor;

    cursor += owners_bytes;
    *data_off = cursor;

    cursor += data_bytes;
    *total_size = cursor;
}

/* ================================================================
 * Header initialization (shared by pool_create and pool_create_memfd)
 * ================================================================ */

static inline void pool_init_header(void *base, uint64_t total,
                                     uint32_t elem_size, uint32_t variant_id,
                                     uint64_t capacity, uint64_t bm_off,
                                     uint64_t own_off, uint64_t dat_off) {



( run in 0.771 second using v1.01-cache-2.11-cpan-39bf76dae61 )