Data-Stack-Shared
view release on metacpan or search on metacpan
if (fstat(fd, &st) < 0) { STK_ERR("fstat: %s", strerror(errno)); return NULL; }
if ((uint64_t)st.st_size < sizeof(StkHeader)) { STK_ERR("too small"); return NULL; }
size_t ms = (size_t)st.st_size;
void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED) { STK_ERR("mmap: %s", strerror(errno)); return NULL; }
if (!stk_validate_header((StkHeader *)base, (uint64_t)st.st_size, variant_id)) {
STK_ERR("invalid stack"); munmap(base, ms); return NULL;
}
int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
if (myfd < 0) { STK_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
return stk_setup(base, ms, NULL, myfd);
}
/* Tear down a handle and everything it owns.
 *
 * Closes the notification descriptor and the backing descriptor (each only
 * if it holds a non-negative fd), unmaps the shared header region when one
 * is attached, then frees the stored path and the handle itself.
 * A NULL handle is accepted and ignored.
 */
static void stk_destroy(StkHandle *h) {
    if (h == NULL) {
        return;
    }

    /* Descriptors: optional eventfd first, then the backing file. */
    if (0 <= h->notify_fd) {
        close(h->notify_fd);
    }
    if (0 <= h->backing_fd) {
        close(h->backing_fd);
    }

    /* Drop the shared mapping, if any. */
    if (h->hdr != NULL) {
        munmap(h->hdr, h->mmap_size);
    }

    /* Heap-owned pieces last; free(NULL) is harmless for path. */
    free(h->path);
    free(h);
}
/* Reset the stack to empty in a single step.
 *
 * NOT concurrency-safe — use drain() for concurrent scenarios. The slot
 * control words are wiped with a plain memset, which does not coordinate
 * with any push or pop that is in progress at the same time.
 *
 * Because the whole stack is freed at once, every waiter on both wake
 * sequences is woken (FUTEX_WAKE with INT_MAX), not a bounded number.
 */
static void stk_clear(StkHandle *h) {
    /* Publish top = 0, then zero one 64-bit control word per slot. */
    __atomic_store_n(&h->hdr->top, 0, __ATOMIC_RELEASE);
    size_t ctl_bytes = (size_t)h->hdr->capacity * sizeof(uint64_t);
    memset(h->ctl, 0, ctl_bytes);

    /* Anyone counted in waiters_push sleeps on push_wake_seq: bump the
     * sequence so their futex compare fails, then wake them all. */
    if (0 < __atomic_load_n(&h->hdr->waiters_push, __ATOMIC_RELAXED)) {
        __atomic_add_fetch(&h->hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &h->hdr->push_wake_seq, FUTEX_WAKE, INT_MAX,
                NULL, NULL, 0);
    }

    /* Same treatment for waiters counted in waiters_pop. */
    if (0 < __atomic_load_n(&h->hdr->waiters_pop, __ATOMIC_RELAXED)) {
        __atomic_add_fetch(&h->hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &h->hdr->pop_wake_seq, FUTEX_WAKE, INT_MAX,
                NULL, NULL, 0);
    }
}
/* Concurrency-safe drain: atomically swap top to 0, then release each
* drained slot [0, t) through the slot state machine. Slot contents are
* never read here; the stored values are discarded.
*
* Returns t, the value of top captured by the exchange (0 if the stack was
* already empty). This count includes any slots that were force-reclaimed
* on timeout (see below), whose values were dropped rather than drained.
*
* Crash-recovery: a pusher that won the top CAS but died (SIGKILL/segfault)
* between stk_slot_claim_write and stk_slot_publish leaves the slot stuck
* in WRITING. Plain stk_slot_claim_read would spin forever. We bound the
* wait at ~2s per slot; on timeout we CAS the slot from whatever non-FILLED
* state was last observed (WRITING, READING or EMPTY) to EMPTY with gen
* bumped, so the slot is reclaimed.
*
* Because the deadline is per slot, a drain that meets several stuck slots
* can block for roughly 2s per stuck slot.
*
* Limitation: slot ctl encodes only (gen << 2) | state — no PID — so we
* cannot distinguish a crashed pusher from a merely slow one. A live pusher
* stalled > 2s would be falsely reclaimed; its subsequent publish is a CAS
* (see stk_slot_publish) so it observes the gen bump and silently no-ops
* rather than resurrecting a phantom FILLED slot. The pusher's value is
* dropped — equivalent to a crashed pusher. In practice the gap between
* claim_write and publish is sub-microsecond memcpy time, so the false-
* positive threshold is many orders of magnitude away from normal latency.
*
* Only push waiters are woken afterwards: drain frees slots but never makes
* new items available. */
static inline uint32_t stk_drain(StkHandle *h) {
StkHeader *hdr = h->hdr;
/* Claim the whole current stack in one atomic step; concurrent pushers
* start again from slot 0 after this. */
uint32_t t = __atomic_exchange_n(&hdr->top, 0, __ATOMIC_ACQ_REL);
if (t == 0) return 0;
/* Per-slot wait strategy. Hot-spin, and on every 64th non-FILLED
* observation check the deadline and otherwise sleep 100us, to avoid
* burning a core for 2s on a stuck slot. The 2s deadline is armed lazily
* at the first such check, so it runs from the 64th spin rather than
* from the start of the wait. A slot that is already FILLED costs no
* clock calls. */
for (uint32_t i = 0; i < t; i++) {
struct timespec dl;     /* valid only once dl_set != 0 */
int dl_set = 0;
uint32_t spins = 0;
for (;;) {
uint64_t c = __atomic_load_n(&h->ctl[i], __ATOMIC_ACQUIRE);
uint32_t st = STK_SLOT_STATE(c);
if (st == STK_SLOT_FILLED) {
/* FILLED -> READING at the same gen, then hand the slot back
* through stk_slot_release. On success c still holds the
* pre-CAS word, so STK_SLOT_GEN(c) is the gen we claimed. */
uint64_t nc = (STK_SLOT_GEN(c) << 2) | STK_SLOT_READING;
if (__atomic_compare_exchange_n(&h->ctl[i], &c, nc,
0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
stk_slot_release(&h->ctl[i], STK_SLOT_GEN(c));
break;
}
/* Lost the race for this word; reload and re-examine. */
continue;
}
stk_spin_pause();
if ((++spins & 0x3F) == 0) {
if (!dl_set) { stk_make_deadline(2.0, &dl); dl_set = 1; }
struct timespec rem;
if (!stk_remaining(&dl, &rem)) {
/* Treat as abandoned (crashed writer/reader): force the
* slot back to EMPTY with gen bumped. If CAS succeeds we
* skipped the slot; if it fails, the writer just published
* (or another recoverer fixed it) — loop and re-observe so
* a FILLED value is not leaked. */
uint64_t nc = ((STK_SLOT_GEN(c) + 1) << 2) | STK_SLOT_EMPTY;
if (__atomic_compare_exchange_n(&h->ctl[i], &c, nc,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
break;
continue;
}
/* Short sleep to keep CPU usage low during the long wait. */
struct timespec ts = { 0, 100000L }; /* 100us */
nanosleep(&ts, NULL);
}
}
}
/* drain freed `t` slots at once — wake up to that many pushers (the
* count is clamped to INT_MAX for the futex call). */
if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
__atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE,
t < INT_MAX ? (int)t : INT_MAX, NULL, NULL, 0);
}
return t;
}
/* eventfd */
/* Lazily attach a notification eventfd to the handle.
 *
 * If the handle already has a descriptor (notify_fd >= 0) it is returned
 * unchanged. Otherwise a new non-blocking, close-on-exec eventfd with an
 * initial counter of 0 is created, stored in h->notify_fd and returned.
 * Returns -1 if eventfd() fails; h->notify_fd is left untouched then.
 */
static int stk_create_eventfd(StkHandle *h) {
    if (h->notify_fd < 0) {
        int fresh = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        if (fresh < 0) {
            return -1;
        }
        h->notify_fd = fresh;
    }
    return h->notify_fd;
}
static int stk_notify(StkHandle *h) {
if (h->notify_fd < 0) return 0;
uint64_t v = 1;
return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
( run in 2.115 seconds using v1.01-cache-2.11-cpan-71847e10f99 )