/* Data-Pool-Shared — lock-free shared-memory slot pool
 * (128-byte header + allocation bitmap + per-slot owner PIDs + slot data). */
/* Slot element types stored in PoolHeader.variant_id.  Numbering starts
 * at 2 — values 0/1 are presumably defined earlier in the full file;
 * TODO confirm against the complete source. */
#define POOL_VAR_F64 2 /* slots hold 64-bit floats */
#define POOL_VAR_I32 3 /* slots hold 32-bit integers */
#define POOL_VAR_STR 4 /* slots hold fixed-size strings */
/* Round x up to the next multiple of 8 (64-bit alignment). */
#define POOL_ALIGN8(x) (((x) + 7) & ~(uint64_t)7)
/* ================================================================
 * Header (128 bytes = 2 cache lines)
 *
 * Lives at offset 0 of the shared mapping.  Cache line 0 is written
 * once at creation and read-only afterwards; all cross-process
 * mutable state is confined to cache line 1 so concurrent counter
 * updates never invalidate the layout fields in other processes'
 * caches.  Field offsets are part of the shared-memory ABI — the
 * trailing comment on each field is its byte offset.
 * ================================================================ */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;          /* 0 */
    uint32_t version;        /* 4 */
    uint32_t elem_size;      /* 8: bytes per slot */
    uint32_t variant_id;     /* 12: one of POOL_VAR_* */
    uint64_t capacity;       /* 16: number of slots */
    uint64_t total_size;     /* 24: total mmap size */
    uint64_t data_off;       /* 32: offset to slot data */
    uint64_t bitmap_off;     /* 40: offset to allocation bitmap */
    uint64_t owners_off;     /* 48: offset to per-slot owner PIDs */
    uint8_t  _pad0[8];       /* 56-63 */
    /* ---- Cache line 1 (64-127): mutable state ---- */
    uint32_t used;           /* 64: allocated count (futex word) */
    uint32_t waiters;        /* 68: blocked on alloc */
    uint8_t  _pad1[8];       /* 72-79 */
    uint64_t stat_allocs;    /* 80 */
    uint64_t stat_frees;     /* 88 */
    uint64_t stat_waits;     /* 96 */
    uint64_t stat_timeouts;  /* 104 */
    uint64_t stat_recoveries;/* 112 */
    uint8_t  _pad2[8];       /* 120-127 */
} PoolHeader;
/* Layout is ABI: any field change must keep the struct exactly 128 bytes. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(PoolHeader) == 128, "PoolHeader must be 128 bytes");
#endif
/* ================================================================
 * Process-local handle
 *
 * One process's view of a mapped pool.  The pointers are local
 * addresses into the shared mapping; the remaining fields are
 * private bookkeeping and are never shared between processes.
 * ================================================================ */
typedef struct {
    PoolHeader *hdr;          /* header at offset 0 of the mapping */
    uint64_t   *bitmap;       /* allocation bitmap, 1 bit per slot (1 = in use) */
    uint32_t   *owners;       /* per-slot owner PID (0 = no recorded owner) */
    uint8_t    *data;         /* start of the slot payload region */
    size_t      mmap_size;    /* mapping length (for munmap) */
    uint32_t    bitmap_words; /* number of 64-bit words in bitmap */
    char       *path;         /* backing path — presumably heap-owned; TODO confirm where freed */
    int         notify_fd;    /* NOTE(review): purpose not visible in this chunk — confirm */
    int         backing_fd;   /* fd of the backing file/memfd */
    uint32_t    scan_hint;    /* bitmap word index where allocation scans start */
} PoolHandle;
/* ================================================================
 * Utility
 * ================================================================ */

/* Return non-zero if `pid` names a live process.  A pid of 0 (no
 * recorded owner) is treated as alive so such slots are never
 * reclaimed.  Uses kill(pid, 0): only ESRCH proves the process is
 * gone — EPERM or success both mean it still exists. */
static inline int pool_pid_alive(uint32_t pid) {
    if (pid != 0) {
        if (kill((pid_t)pid, 0) == -1 && errno == ESRCH)
            return 0; /* definitely dead */
    }
    return 1;
}
/* Compute an absolute CLOCK_MONOTONIC deadline `timeout` seconds from
 * now.  The fractional part of `timeout` is carried into tv_nsec and
 * the result is normalized so tv_nsec stays in [0, 1e9). */
static inline void pool_make_deadline(double timeout, struct timespec *deadline) {
    clock_gettime(CLOCK_MONOTONIC, deadline);
    time_t whole   = (time_t)timeout;
    long   frac_ns = (long)((timeout - (double)whole) * 1e9);
    deadline->tv_sec  += whole;
    deadline->tv_nsec += frac_ns;
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec  += 1;
        deadline->tv_nsec -= 1000000000L;
    }
}
/* Compute the time left until `deadline` (CLOCK_MONOTONIC) into
 * `remaining`.  Returns 1 while the deadline has not passed, 0 once
 * it has; on expiry `remaining` holds a negative (borrowed) value. */
static inline int pool_remaining_time(const struct timespec *deadline,
                                      struct timespec *remaining) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    time_t sec  = deadline->tv_sec  - now.tv_sec;
    long   nsec = deadline->tv_nsec - now.tv_nsec;
    if (nsec < 0) {          /* borrow one second */
        sec  -= 1;
        nsec += 1000000000L;
    }
    remaining->tv_sec  = sec;
    remaining->tv_nsec = nsec;
    return sec >= 0;
}
/* ================================================================
 * Slot access
 * ================================================================ */

/* Address of slot `slot`'s payload: data region + slot * elem_size.
 * No bounds check — callers pass slots obtained from the allocator. */
static inline uint8_t *pool_slot_ptr(PoolHandle *h, uint64_t slot) {
    uint64_t offset = slot * (uint64_t)h->hdr->elem_size;
    return &h->data[offset];
}
/* Non-zero if `slot`'s bitmap bit is set (slot currently allocated).
 * Relaxed load: this is an instantaneous snapshot, racing frees or
 * allocations may change it immediately afterwards. */
static inline int pool_is_allocated(PoolHandle *h, uint64_t slot) {
    uint64_t mask = (uint64_t)1 << (slot & 63);
    uint64_t word = __atomic_load_n(&h->bitmap[slot >> 6], __ATOMIC_RELAXED);
    return (word & mask) ? 1 : 0;
}
/* ================================================================
 * Allocation (lock-free bitmap scan + CAS)
 * ================================================================ */

/* Claim one free slot; return its index, or -1 if the pool is full.
 *
 * BUG FIX: the previous body scanned for a clear bit but never tried
 * to claim it and never refreshed `word`, so it spun forever whenever
 * a free slot existed, and it ended with `return freed;` on an
 * undefined variable.  Reconstructed the claim logic to mirror the
 * free path visible in pool_recover_stale.
 *
 * Scans bitmap words starting at scan_hint (round-robin), takes the
 * lowest clear bit of each word and tries to set it with a CAS.  On
 * CAS failure __atomic_compare_exchange_n stores the current word
 * back into `word`, so the inner loop retries the same word with
 * fresh state.  On success the caller's PID is recorded as the slot
 * owner (consumed by pool_recover_stale) and the used/alloc counters
 * are bumped.
 */
static inline int64_t pool_try_alloc(PoolHandle *h) {
    uint32_t nwords = h->bitmap_words;
    uint64_t cap = h->hdr->capacity;
    uint32_t start = h->scan_hint;
    uint32_t mypid = (uint32_t)getpid();
    for (uint32_t i = 0; i < nwords; i++) {
        uint32_t widx = (start + i) % nwords;
        uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
        while (word != ~(uint64_t)0) {
            int bit = __builtin_ctzll(~word);
            uint64_t slot = (uint64_t)widx * 64 + bit;
            /* Trailing bits of the last word lie past capacity. */
            if (slot >= cap) break;
            uint64_t mask = (uint64_t)1 << bit;
            if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, word | mask,
                                            0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
                /* Publish ownership before anyone can observe the slot. */
                __atomic_store_n(&h->owners[slot], mypid, __ATOMIC_RELEASE);
                __atomic_add_fetch(&h->hdr->used, 1, __ATOMIC_RELEASE);
                __atomic_add_fetch(&h->hdr->stat_allocs, 1, __ATOMIC_RELAXED);
                h->scan_hint = widx; /* start here next time */
                return (int64_t)slot;
            }
            /* CAS failed: `word` was refreshed with the live value — retry. */
        }
    }
    return -1; /* pool exhausted */
}
/* ================================================================
 * Batch alloc — all-or-nothing, shared deadline
 * ================================================================ */

/* Allocate `count` slots into out[0..count-1].  Returns 1 on success,
 * 0 on failure; on failure every slot acquired so far is released, so
 * the caller never holds a partial batch.
 *
 * timeout semantics (mirrors pool_alloc):
 *   0   — non-blocking: fail as soon as pool_try_alloc comes up empty
 *   > 0 — a single deadline shared by the whole batch; each blocking
 *         pool_alloc call receives only the time still remaining
 *   < 0 — wait indefinitely for each slot
 */
static inline int pool_alloc_n(PoolHandle *h, uint64_t *out, uint32_t count,
                               double timeout) {
    if (count == 0)
        return 1;

    struct timespec deadline, left;
    const int bounded = (timeout > 0);
    if (bounded)
        pool_make_deadline(timeout, &deadline);

    uint32_t got = 0;
    while (got < count) {
        int64_t slot;
        if (timeout == 0) {
            slot = pool_try_alloc(h);
        } else {
            double budget = timeout; /* negative = wait forever */
            if (bounded) {
                if (!pool_remaining_time(&deadline, &left)) {
                    __atomic_add_fetch(&h->hdr->stat_timeouts, 1, __ATOMIC_RELAXED);
                    goto rollback;
                }
                budget = (double)left.tv_sec + (double)left.tv_nsec / 1e9;
            }
            slot = pool_alloc(h, budget);
        }
        if (slot < 0)
            goto rollback;
        out[got++] = (uint64_t)slot;
    }
    return 1;

rollback:
    if (got > 0)
        pool_free_n(h, out, got);
    return 0;
}
/* ================================================================
 * Stale recovery — CAS owner to narrow race window
 *
 * Reclaims slots whose recorded owner PID no longer names a live
 * process.  The owner field is CAS'd from the dead PID to 0 first,
 * so among concurrent recoverers exactly one wins the right to clear
 * the slot's bitmap bit.  Returns the number of slots reclaimed.
 * ================================================================ */
static inline uint32_t pool_recover_stale(PoolHandle *h) {
    uint32_t recovered = 0;
    uint64_t cap = h->hdr->capacity;
    for (uint64_t slot = 0; slot < cap; slot++) {
        if (!pool_is_allocated(h, slot)) continue;
        uint32_t owner = __atomic_load_n(&h->owners[slot], __ATOMIC_ACQUIRE);
        /* owner == 0: no recorded owner (or already recovered) — skip. */
        if (owner == 0 || pool_pid_alive(owner)) continue;
        /* CAS owner from dead PID to 0 — if it fails, slot was
         * re-allocated or already recovered by another process */
        if (!__atomic_compare_exchange_n(&h->owners[slot], &owner, 0,
                                         0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
            continue;
        /* We now own the right to free this slot's bitmap bit */
        uint32_t widx = (uint32_t)(slot / 64);
        int bit = (int)(slot % 64);
        uint64_t mask = (uint64_t)1 << bit;
        for (;;) {
            uint64_t word = __atomic_load_n(&h->bitmap[widx], __ATOMIC_RELAXED);
            if (!(word & mask)) break; /* bit already cleared elsewhere */
            uint64_t new_word = word & ~mask;
            if (__atomic_compare_exchange_n(&h->bitmap[widx], &word, new_word,
                                            1, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
                __atomic_sub_fetch(&h->hdr->used, 1, __ATOMIC_RELEASE);
                __atomic_add_fetch(&h->hdr->stat_frees, 1, __ATOMIC_RELAXED);
                /* Wake one waiter blocked on the `used` futex word. */
                if (__atomic_load_n(&h->hdr->waiters, __ATOMIC_RELAXED) > 0)
                    syscall(SYS_futex, &h->hdr->used, FUTEX_WAKE, 1, NULL, NULL, 0);
                recovered++;
                break;
            }
        }
    }
    if (recovered > 0)
        __atomic_add_fetch(&h->hdr->stat_recoveries, recovered, __ATOMIC_RELAXED);
    return recovered;
}
/* ================================================================
 * Layout calculation
 * ================================================================ */

/* Compute the region offsets and total mapping size for a pool of
 * `capacity` slots of `elem_size` bytes.  Regions sit back-to-back
 * after the 128-byte header: bitmap (one bit per slot, rounded up to
 * whole 64-bit words), owner PIDs (4 bytes per slot, padded to 8),
 * then the slot payload area.  All outputs are byte offsets from the
 * start of the mapping. */
static inline void pool_calc_layout(uint64_t capacity, uint32_t elem_size,
                                    uint64_t *bitmap_off, uint64_t *owners_off,
                                    uint64_t *data_off, uint64_t *total_size) {
    uint64_t bitmap_bytes = ((capacity + 63) / 64) * 8;
    uint64_t owner_bytes  = POOL_ALIGN8(capacity * 4);
    uint64_t data_bytes   = (uint64_t)capacity * elem_size;

    *bitmap_off = sizeof(PoolHeader);
    *owners_off = *bitmap_off + bitmap_bytes;
    *data_off   = *owners_off + owner_bytes;
    *total_size = *data_off + data_bytes;
}
/* ================================================================
* Header initialization (shared by pool_create and pool_create_memfd)
* ================================================================ */
static inline void pool_init_header(void *base, uint64_t total,
uint32_t elem_size, uint32_t variant_id,
uint64_t capacity, uint64_t bm_off,
uint64_t own_off, uint64_t dat_off) {
/* NOTE: chunk truncated here — the body of pool_init_header is not
 * included in this excerpt (web-page timing footer removed). */