Data-Stack-Shared

 view release on metacpan or search on metacpan

stack.h  view on Meta::CPAN

    if (fstat(fd, &st) < 0) { STK_ERR("fstat: %s", strerror(errno)); return NULL; }
    if ((uint64_t)st.st_size < sizeof(StkHeader)) { STK_ERR("too small"); return NULL; }
    size_t ms = (size_t)st.st_size;
    void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) { STK_ERR("mmap: %s", strerror(errno)); return NULL; }
    if (!stk_validate_header((StkHeader *)base, (uint64_t)st.st_size, variant_id)) {
        STK_ERR("invalid stack"); munmap(base, ms); return NULL;
    }
    int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
    if (myfd < 0) { STK_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
    return stk_setup(base, ms, NULL, myfd);
}

/* Tear down a handle and everything it owns: the notification eventfd (if
 * one was created), the backing file descriptor (if any), the shared
 * mapping, the stored path string, and finally the handle allocation.
 * Passing NULL is a no-op. */
static void stk_destroy(StkHandle *h) {
    if (h == NULL)
        return;

    /* A negative descriptor means "not open". Close the notification fd
     * first, then the backing fd. */
    const int owned_fds[] = { h->notify_fd, h->backing_fd };
    for (size_t k = 0; k < sizeof owned_fds / sizeof owned_fds[0]; k++) {
        if (owned_fds[k] >= 0)
            close(owned_fds[k]);
    }

    if (h->hdr != NULL)
        munmap(h->hdr, h->mmap_size);

    free(h->path);   /* free(NULL) is a no-op, so no guard is needed */
    free(h);
}

/* NOT concurrency-safe — use drain() for concurrent scenarios.
 *
 * Reset the stack to empty. This publishes top = 0 and then zeroes every
 * slot control word, which resets both slot state and generation to 0.
 * Finally it wakes every blocked pusher and popper. The plain memset races
 * with any push/pop running concurrently in another thread or process. */
static void stk_clear(StkHandle *h) {
    StkHeader *hdr = h->hdr;

    __atomic_store_n(&hdr->top, 0, __ATOMIC_RELEASE);

    size_t ctl_bytes = sizeof(uint64_t) * (size_t)hdr->capacity;
    memset(h->ctl, 0, ctl_bytes);

    /* The whole stack was freed in one step, so wake all sleepers (INT_MAX)
     * on each side. The waiter counts let the syscall be skipped when no
     * one is registered as waiting. */
    if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE, INT_MAX,
                NULL, NULL, 0);
    }
    if (__atomic_load_n(&hdr->waiters_pop, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->pop_wake_seq, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->pop_wake_seq, FUTEX_WAKE, INT_MAX,
                NULL, NULL, 0);
    }
}

/* Concurrency-safe drain: atomically swap top to 0, then release each
 * drained slot through the state machine. Values are discarded, not copied
 * out.
 *
 * Returns t, the value of top that was swapped out. That is the number of
 * slot indices removed from the stack. It also counts slots reclaimed by the
 * timeout path below, whose contents (if any) were dropped. It is therefore
 * an upper bound on the number of FILLED values actually consumed.
 *
 * Crash-recovery: a pusher that won the top CAS but died (SIGKILL/segfault)
 * between stk_slot_claim_write and stk_slot_publish leaves the slot stuck
 * in WRITING. Plain stk_slot_claim_read would spin forever. We bound the
 * wait at ~2s per slot. On timeout we CAS the slot from whatever non-FILLED
 * state was last observed (EMPTY, WRITING or READING) to EMPTY with gen
 * bumped, so the slot is reclaimed. The deadline is armed lazily (at the
 * first 64-spin checkpoint) and separately for each slot. The worst-case
 * total wait therefore grows linearly with the number of stuck slots.
 *
 * Limitation: slot ctl encodes only (gen << 2) | state — no PID — so we
 * cannot distinguish a crashed pusher from a merely slow one. A live pusher
 * stalled > 2s would be falsely reclaimed. Its subsequent publish is a CAS
 * (see stk_slot_publish), so it observes the gen bump and silently no-ops
 * rather than resurrecting a phantom FILLED slot. The pusher's value is
 * dropped — equivalent to a crashed pusher. In practice the gap between
 * claim_write and publish is sub-microsecond memcpy time, so the false-
 * positive threshold is many orders of magnitude away from normal latency.
 * NOTE(review): the same timeout also reclaims a slot observed as EMPTY or
 * READING. Whether a late claim_write/release on such a slot is likewise
 * harmless depends on helpers not visible here — confirm. */
static inline uint32_t stk_drain(StkHandle *h) {
    StkHeader *hdr = h->hdr;
    /* Claim every index [0, t) in one step. Concurrent pushers now start
     * again from index 0. */
    uint32_t t = __atomic_exchange_n(&hdr->top, 0, __ATOMIC_ACQ_REL);
    if (t == 0) return 0;
    /* Wall-clock deadline for the per-slot wait. We hot-spin first, then
     * fall back to short sleeps to avoid burning a core for 2s on a stuck
     * slot. The deadline is checked periodically (every 64 iterations) to
     * keep the steady-state cost ~zero. */
    for (uint32_t i = 0; i < t; i++) {
        struct timespec dl;     /* per-slot deadline, set on first checkpoint */
        int dl_set = 0;
        uint32_t spins = 0;
        for (;;) {
            uint64_t c = __atomic_load_n(&h->ctl[i], __ATOMIC_ACQUIRE);
            uint32_t st = STK_SLOT_STATE(c);
            if (st == STK_SLOT_FILLED) {
                /* Take ownership (FILLED -> READING, same gen), then release
                 * it straight back without reading the value. On a lost
                 * race, re-observe. */
                uint64_t nc = (STK_SLOT_GEN(c) << 2) | STK_SLOT_READING;
                if (__atomic_compare_exchange_n(&h->ctl[i], &c, nc,
                        0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
                    stk_slot_release(&h->ctl[i], STK_SLOT_GEN(c));
                    break;
                }
                continue;
            }
            stk_spin_pause();
            if ((++spins & 0x3F) == 0) {
                if (!dl_set) { stk_make_deadline(2.0, &dl); dl_set = 1; }
                struct timespec rem;
                if (!stk_remaining(&dl, &rem)) {
                    /* Treat as abandoned (crashed writer/reader): force the
                     * slot back to EMPTY with gen bumped. If CAS succeeds we
                     * skipped the slot; if it fails, the writer just published
                     * (or another recoverer fixed it) — loop and re-observe so
                     * a FILLED value is not leaked. */
                    uint64_t nc = ((STK_SLOT_GEN(c) + 1) << 2) | STK_SLOT_EMPTY;
                    if (__atomic_compare_exchange_n(&h->ctl[i], &c, nc,
                            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
                        break;
                    continue;
                }
                /* Short sleep to keep CPU usage low during the long wait. */
                struct timespec ts = { 0, 100000L }; /* 100us */
                nanosleep(&ts, NULL);
            }
        }
    }
    /* drain freed `t` slots at once — wake up to that many. Pop waiters are
     * not woken here: top was swapped to 0 above, so there is nothing for
     * them to take. */
    if (__atomic_load_n(&hdr->waiters_push, __ATOMIC_RELAXED) > 0) {
        __atomic_add_fetch(&hdr->push_wake_seq, 1, __ATOMIC_RELEASE);
        syscall(SYS_futex, &hdr->push_wake_seq, FUTEX_WAKE,
                t < INT_MAX ? (int)t : INT_MAX, NULL, NULL, 0);
    }
    return t;
}

/* eventfd */
/* Lazily attach a notification eventfd to this handle. The descriptor is
 * non-blocking and close-on-exec. If one is already attached it is returned
 * unchanged. Otherwise a new one is created and stored in h->notify_fd.
 * Returns the descriptor, or -1 if eventfd() fails. In that case the handle
 * is left untouched and errno is as set by eventfd(). */
static int stk_create_eventfd(StkHandle *h) {
    int fd = h->notify_fd;
    if (fd < 0) {
        fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
        if (fd >= 0)
            h->notify_fd = fd;
    }
    return fd;
}
static int stk_notify(StkHandle *h) {
    if (h->notify_fd < 0) return 0;
    uint64_t v = 1;
    return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);



( run in 2.115 seconds using v1.01-cache-2.11-cpan-71847e10f99 )