Data-SortedSet-Shared

 view release on metacpan or  search on metacpan

sortedset.h  view on Meta::CPAN

/* Compile-time layout guard: the build fails unless SsHeader is exactly
 * 256 bytes. */
_Static_assert(sizeof(SsHeader) == 256, "SsHeader must be 256 bytes");

/* ---- Process-local handle ---- */

/* Per-process view of one sorted set.  ss_destroy() closes notify_fd and
 * backing_fd when they are >= 0, unmaps hdr (mmap_size bytes), and frees
 * path and the handle itself. */
typedef struct SsHandle {
    SsHeader     *hdr;           /* mapping base; munmap'd with mmap_size on destroy */
    SsReaderSlot *reader_slots;  /* SS_READER_SLOTS entries */
    SsIdxSlot    *index;         /* member -> score */
    SsNode       *nodes;         /* B+tree node pool */
    size_t        mmap_size;     /* length passed to munmap()/msync() */
    char         *path;          /* backing file path (strdup'd) */
    int           notify_fd;     /* eventfd created on demand by ss_create_eventfd(); < 0 if none */
    int           backing_fd;    /* memfd fd to close on destroy, -1 otherwise */
    uint32_t      my_slot_idx;   /* UINT32_MAX if all slots taken (no recovery for this handle) */
    uint32_t      cached_pid;    /* getpid() cached at last slot claim */
    uint32_t      cached_fork_gen; /* ss_fork_gen value at last slot claim */
} SsHandle;

/* ================================================================
 * Helpers
 * ================================================================ */

/* Round v up to the nearest power of two.
 *
 * Returns 1 for v < 2, v itself when it is already a power of two, and 0
 * when no power of two >= v fits in 32 bits (v > 0x80000000).  The explicit
 * overflow guard matters: for such v, __builtin_clz(v - 1) is 0 and the
 * shift below would be by 32, which is undefined for a 32-bit operand. */
static inline uint32_t ss_next_pow2(uint32_t v) {
    if (v < 2) return 1;
    if (v > 0x80000000u) return 0;  /* would need 2^32: not representable */
    /* v - 1 >= 1 here, so __builtin_clz is well defined and the shift
     * count is in 1..31. */
    return 1u << (32 - __builtin_clz(v - 1));
}

/* Member hash: add the splitmix64 increment to the key, then run the
 * splitmix64 finalizer (good avalanche for int64 keys). */
static inline uint64_t ss_hash_member(int64_t m) {
    uint64_t z = (uint64_t)m;
    z += 0x9E3779B97F4A7C15ULL;
    z ^= z >> 30;
    z *= 0xBF58476D1CE4E5B9ULL;
    z ^= z >> 27;
    z *= 0x94D049BB133111EBULL;
    z ^= z >> 31;
    return z;
}

/* Three-way comparison of (score, member) keys: -1 / 0 / +1.
 * Scores decide first; members break ties.  A NaN score compares neither
 * less nor greater, so such a pair is ordered by member alone. */
static inline int ss_key_cmp(double sa, int64_t ma, double sb, int64_t mb) {
    if (sa > sb) return  1;
    if (sa < sb) return -1;
    return (ma > mb) - (ma < mb);
}

/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */

/* Number of failed acquisition attempts a locker spins through (with a CPU
 * pause hint each time) before it parks on the futex. */
#define SS_RWLOCK_SPIN_LIMIT 32
#define SS_LOCK_TIMEOUT_SEC  2  /* FUTEX_WAIT timeout for stale lock detection */

/* One spin-wait step: issue the architecture's spin hint where one is known
 * ("yield" on AArch64, "pause" on x86), otherwise an empty asm statement.
 * Every variant carries a "memory" clobber, so it also acts as a compiler
 * barrier. */
static inline void ss_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* Encoding of the rwlock word while write-locked: the top bit marks a writer
 * and the lower 31 bits carry the owner's PID.  SS_RWLOCK_WR(pid) builds that
 * word; masking a lock value with SS_RWLOCK_PID_MASK extracts the PID. */
#define SS_RWLOCK_WRITER_BIT 0x80000000U
#define SS_RWLOCK_PID_MASK   0x7FFFFFFFU
#define SS_RWLOCK_WR(pid)    (SS_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SS_RWLOCK_PID_MASK))

/* Liveness probe via kill(pid, 0).
 *
 * Returns 0 only when the probe fails with ESRCH (no such process).  Every
 * other outcome -- success, or failure with a different errno -- counts as
 * alive, as does pid 0 (no owner recorded).
 *
 * NOTE: cannot detect PID reuse -- if a dead lock-holder's PID is recycled
 * to an unrelated live process before recovery runs, this reports "alive"
 * and that slot's orphaned contribution is not reclaimed until the recycled
 * process exits.  Robust detection would require a per-slot
 * process-start-time epoch (a header-layout/version change).
 * Documented under "Crash Safety" in the POD. */
static inline int ss_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;                   /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1;                   /* probe succeeded: process exists */
    return errno != ESRCH;          /* only ESRCH means definitely dead */
}

/* Force-recover a stale write lock left by a dead process.
 *
 * observed_rwlock is the lock word the caller saw.  We take the lock over by
 * CAS-ing that exact value to a write-lock word carrying OUR pid, then
 * release it and wake any waiters.  If the word changed in the meantime the
 * CAS fails and nothing is done.
 *
 * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
 * recovering process can detect and re-recover if we crash mid-recovery. */
static inline void ss_recover_stale_lock(SsHandle *h, uint32_t observed_rwlock) {
    SsHeader *hdr = h->hdr;
    const uint32_t self_word = SS_RWLOCK_WR((uint32_t)getpid());
    int took_over = __atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
                                                self_word, 0,
                                                __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    if (took_over) {
        /* We now hold the write lock as ourselves.  No additional shared
         * state needs repair here (this module has no seqlock); just
         * release the lock. */
        __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
        if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

/* Timeout handed to the FUTEX_WAIT calls in the lock paths; a wait that
 * expires with ETIMEDOUT leads the waiter into the stale-lock recovery
 * check. */
static const struct timespec ss_lock_timeout = { SS_LOCK_TIMEOUT_SEC, 0 };

/* Fork detection.  ss_fork_gen is a process-global generation number that
 * the pthread_atfork child hook advances, so every open handle can notice a
 * fork transition on its next lock call with a plain load-and-compare
 * instead of paying a getpid() syscall on the hot path. */
static uint32_t ss_fork_gen = 1;
static pthread_once_t ss_atfork_once = PTHREAD_ONCE_INIT;

/* atfork child hook: advance the generation in the new process. */
static void ss_on_fork_child(void) {
    (void)__atomic_fetch_add(&ss_fork_gen, 1, __ATOMIC_RELAXED);
}

/* Registers the child hook; intended to run once via ss_atfork_once.
 * The pthread_atfork() result is not checked. */
static void ss_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, ss_on_fork_child);
}

/* Ensure this process owns a reader slot.  Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's.  Hot-path is a single relaxed load + compare; only on a
 * fork-generation mismatch do we touch getpid() and scan slots. */
static inline void ss_claim_reader_slot(SsHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&ss_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path -- register the atfork hook once per process, then claim. */
    pthread_once(&ss_atfork_once, ss_atfork_init);
    /* Re-read after pthread_once: ss_on_fork_child may have bumped it. */
    cur_gen = __atomic_load_n(&ss_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();

sortedset.h  view on Meta::CPAN

    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Undo a reader park: drop the header-wide waiter count, then this handle's
 * per-slot waiters_parked count when it owns a slot. */
static inline void ss_unpark_reader(SsHandle *h) {
    __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return;                     /* no slot: nothing per-process to undo */
    SsReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
    __atomic_sub_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Announce that a writer is about to sleep on the lock.  When this handle
 * owns a slot, its per-slot waiters_parked and writers_parked counts are
 * raised first; then the header-wide rwlock_waiters and
 * rwlock_writers_waiting counts. */
static inline void ss_park_writer(SsHandle *h) {
    const uint32_t slot_idx = h->my_slot_idx;
    if (slot_idx != UINT32_MAX) {
        SsReaderSlot *slot = &h->reader_slots[slot_idx];
        __atomic_add_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(&slot->writers_parked, 1, __ATOMIC_RELAXED);
    }
    SsHeader *hdr = h->hdr;
    __atomic_add_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Undo ss_park_writer(): drop the header-wide rwlock_waiters and
 * rwlock_writers_waiting counts, then this handle's per-slot
 * waiters_parked and writers_parked counts when it owns a slot. */
static inline void ss_unpark_writer(SsHandle *h) {
    SsHeader *hdr = h->hdr;
    __atomic_sub_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);

    const uint32_t slot_idx = h->my_slot_idx;
    if (slot_idx == UINT32_MAX)
        return;                     /* no slot: nothing per-process to undo */
    SsReaderSlot *slot = &h->reader_slots[slot_idx];
    __atomic_sub_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&slot->writers_parked, 1, __ATOMIC_RELAXED);
}

/* Acquire the lock in shared (read) mode; returns only once it is held.
 *
 * Lock word as used here: 0 = free, 1..(SS_RWLOCK_WRITER_BIT-1) = number of
 * readers, >= SS_RWLOCK_WRITER_BIT = write-locked.  The caller spins up to
 * SS_RWLOCK_SPIN_LIMIT failed attempts, then parks and sleeps in FUTEX_WAIT
 * with ss_lock_timeout; a timed-out wait runs ss_recover_after_timeout()
 * before retrying. */
static inline void ss_rwlock_rdlock(SsHandle *h) {
    ss_claim_reader_slot(h);
    SsHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter.  This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS.  Without this, a reader
     * killed between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
     * our eventual rdunlock dec. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < SS_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        /* Attempt failed (or we are yielding): spin a bounded number of
         * times before falling through to the futex sleep. */
        if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
            ss_rwlock_spin_pause();
            continue;
        }
        ss_park_reader(h);
        /* Re-read after parking; FUTEX_WAIT only sleeps if the word still
         * equals the value we pass. */
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= SS_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &ss_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Unpark first so our own waiter counts are not outstanding
                 * while the recovery check runs. */
                ss_unpark_reader(h);
                ss_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        ss_unpark_reader(h);
        spin = 0;
    }
}

/* Release a shared (read) hold on the lock.  The last reader out wakes all
 * futex waiters if any are registered in rwlock_waiters. */
static inline void ss_rwlock_rdunlock(SsHandle *h) {
    SsHeader *hdr = h->hdr;
    /* Release the shared counter BEFORE dropping our subcount so that
     * "any live PID with subcount > 0" is a reliable in-flight indicator
     * for the writer-side recovery scan.  Inverting these would create a
     * window where we still own a unit of rwlock but our slot subcount is
     * 0, letting recovery force-reset rwlock underneath us. */
    uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
    if (h->my_slot_idx != UINT32_MAX) {
        SsReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
        __atomic_sub_fetch(&slot->subcount, 1, __ATOMIC_RELAXED);
    }
    if (remaining != 0)
        return;                     /* other readers still hold the lock */
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

static inline void ss_rwlock_wrlock(SsHandle *h) {
    ss_claim_reader_slot(h);  /* refresh cached_pid across fork */
    SsHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = SS_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
            ss_rwlock_spin_pause();
            continue;
        }
        ss_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &ss_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                ss_unpark_writer(h);
                ss_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        ss_unpark_writer(h);
        spin = 0;
    }

sortedset.h  view on Meta::CPAN

    ss_init_header(base, max_entries, index_slots, node_capacity, total);
    return ss_setup(base, (size_t)total, NULL, fd);
}

/* Open an existing sorted set through an already-open descriptor.
 *
 * Maps the whole file (size taken from fstat) shared read/write, validates
 * the header, duplicates fd with close-on-exec set, and passes the mapping
 * and the duplicate to ss_setup().  The caller's fd is never closed here.
 *
 * On failure returns NULL after reporting through SS_ERR (presumably into
 * errbuf -- confirm against the macro); a mapping made so far is unmapped.
 * errbuf, when non-NULL, is reset to the empty string on entry. */
static SsHandle *ss_open_fd(int fd, char *errbuf) {
    if (errbuf) errbuf[0] = '\0';

    struct stat st;
    if (fstat(fd, &st) < 0) {
        SS_ERR("fstat: %s", strerror(errno));
        return NULL;
    }

    const uint64_t file_size = (uint64_t)st.st_size;
    if (file_size < sizeof(SsHeader)) {
        SS_ERR("too small");
        return NULL;
    }

    size_t map_len = (size_t)st.st_size;
    void *base = mmap(NULL, map_len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        SS_ERR("mmap: %s", strerror(errno));
        return NULL;
    }

    if (!ss_validate_header((SsHeader *)base, file_size)) {
        SS_ERR("invalid sorted-set");
        munmap(base, map_len);
        return NULL;
    }

    /* The handle keeps its own descriptor, independent of the caller's. */
    int dup_fd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
    if (dup_fd < 0) {
        SS_ERR("fcntl: %s", strerror(errno));  /* report before munmap can touch errno */
        munmap(base, map_len);
        return NULL;
    }

    return ss_setup(base, map_len, NULL, dup_fd);
}

/* Tear down a handle: close the eventfd and the backing descriptor when
 * present (>= 0), unmap the region, then free the path string and the
 * handle.  A NULL handle is ignored. */
static void ss_destroy(SsHandle *h) {
    if (h == NULL)
        return;

    if (h->notify_fd >= 0)
        close(h->notify_fd);
    if (h->backing_fd >= 0)
        close(h->backing_fd);
    if (h->hdr != NULL)
        munmap(h->hdr, h->mmap_size);

    free(h->path);
    free(h);
}

/* Synchronously flush the whole mapping with msync(MS_SYNC).  Returns the
 * msync() result, or 0 without doing anything when h or its mapping is
 * NULL. */
static inline int ss_msync(SsHandle *h) {
    if (h != NULL && h->hdr != NULL)
        return msync(h->hdr, h->mmap_size, MS_SYNC);
    return 0;
}

/* Return this handle's notification eventfd, creating it (non-blocking,
 * close-on-exec, initial count 0) on first use.  Returns -1 if eventfd()
 * fails; the handle is left without an eventfd in that case. */
static int ss_create_eventfd(SsHandle *h) {
    if (h->notify_fd < 0) {
        int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
        if (efd < 0)
            return -1;
        h->notify_fd = efd;
    }
    return h->notify_fd;
}

/* Add 1 to the handle's eventfd counter.  Returns 1 when the full 8-byte
 * write succeeded, 0 when it did not or when the handle has no eventfd. */
static int ss_notify(SsHandle *h) {
    if (h->notify_fd < 0)
        return 0;
    const uint64_t one = 1;
    ssize_t written = write(h->notify_fd, &one, sizeof one);
    return written == (ssize_t)sizeof one;
}

/* Read the handle's eventfd once and return the 8-byte value read.
 * Returns -1 when the handle has no eventfd or the read does not deliver
 * exactly 8 bytes. */
static int64_t ss_eventfd_consume(SsHandle *h) {
    if (h->notify_fd < 0)
        return -1;
    uint64_t counter = 0;
    ssize_t got = read(h->notify_fd, &counter, sizeof counter);
    if (got != (ssize_t)sizeof counter)
        return -1;
    return (int64_t)counter;
}

/* ================================================================
 * Sorted set: node pool, member index, B+tree (callers hold the lock)
 * ================================================================ */

/* Reset to the empty set (caller holds the write lock).
 *
 * Clears the tree bookkeeping in the header, rebuilds the node free list as
 * a single chain 0 -> 1 -> ... -> node_capacity-1 (linked through each
 * node's `parent` field and terminated by SS_NONE), and zeroes every member
 * index slot.
 *
 * With node_capacity == 0 the free list is empty, so its head is SS_NONE
 * rather than 0: a head of 0 would make ss_node_alloc() hand out a node
 * that does not exist. */
static inline void ss_clear_locked(SsHandle *h) {
    SsHeader *hdr = h->hdr;
    const uint32_t cap = hdr->node_capacity;

    hdr->count     = 0;
    hdr->root      = SS_NONE;
    hdr->height    = 0;
    hdr->leftmost  = SS_NONE;
    hdr->rightmost = SS_NONE;

    for (uint32_t i = 0; i < cap; i++)
        h->nodes[i].parent = (i + 1 < cap) ? (i + 1) : SS_NONE;
    hdr->node_free_head = (cap > 0) ? 0 : SS_NONE;

    memset(h->index, 0, (size_t)hdr->index_slots * sizeof(SsIdxSlot));
}

/* ---- node pool ---- */
/* Pop the head of the node free list (linked through `parent`).  Returns the
 * node index, or SS_NONE when the list is empty. */
static inline uint32_t ss_node_alloc(SsHandle *h) {
    SsHeader *hdr = h->hdr;
    uint32_t head = hdr->node_free_head;
    if (head != SS_NONE)
        hdr->node_free_head = h->nodes[head].parent;
    return head;
}
/* Push node idx onto the front of the free list, reusing its `parent` field
 * as the next-free link. */
static inline void ss_node_free(SsHandle *h, uint32_t idx) {
    SsHeader *hdr = h->hdr;
    h->nodes[idx].parent = hdr->node_free_head;
    hdr->node_free_head  = idx;
}

/* ---- member -> score index (open addressing, linear probe) ---- */
/* Locate `member` in the index.  Probing starts at hash & (index_slots - 1)
 * and advances one slot at a time (wrapping) past occupied slots.
 * Returns the slot holding `member` if present, otherwise the first empty
 * slot reached; *found (optional, may be NULL) is set to 1 or 0 to say
 * which.  The probe ends only at a match or an empty slot.
 * NOTE(review): the mask arithmetic assumes index_slots is a power of two --
 * confirm against header initialisation. */
static inline uint32_t ss_idx_find(SsHandle *h, int64_t member, int *found) {
    const uint32_t mask = h->hdr->index_slots - 1;
    uint32_t slot = (uint32_t)(ss_hash_member(member) & mask);
    int hit = 0;
    for (; h->index[slot].state; slot = (slot + 1) & mask) {
        if (h->index[slot].member == member) {
            hit = 1;
            break;
        }
    }
    if (found)
        *found = hit;
    return slot;
}
/* Look up `member`'s score.  Returns 1 and stores it in *score when the
 * member is indexed; returns 0 and leaves *score untouched otherwise. */
static inline int ss_idx_get(SsHandle *h, int64_t member, double *score) {
    int hit;
    uint32_t slot = ss_idx_find(h, member, &hit);
    if (!hit)
        return 0;
    *score = h->index[slot].score;
    return 1;
}
/* Insert or overwrite `member`'s index entry: writes member, score and
 * state = 1 into the slot ss_idx_find() selects (the existing entry, or the
 * first empty slot on its probe path). */
static inline void ss_idx_set(SsHandle *h, int64_t member, double score) {
    SsIdxSlot *slot = &h->index[ss_idx_find(h, member, NULL)];
    slot->member = member;
    slot->score  = score;
    slot->state  = 1;
}

/* ---- B+tree ---- */
/* Number of entries under a node: `num` for a leaf, the sum of the
 * per-child counts for an internal node. */
static inline uint32_t ss_node_total(const SsNode *nd) {
    if (!nd->is_leaf) {
        uint32_t sum = 0;
        for (int k = 0; k < nd->num; k++)
            sum += nd->counts[k];
        return sum;
    }
    return nd->num;
}

/* child index in an internal node whose subtree would hold (score,member) */
static inline int ss_child_index(const SsNode *nd, double score, int64_t member) {
    int lo = 0, hi = nd->num - 1;

sortedset.h  view on Meta::CPAN

            if (cn->is_leaf) {
                cn->scores[cn->num] = rn->scores[0]; cn->members[cn->num] = rn->members[0]; cn->num++;
                for (int i = 0; i+1 < rn->num; i++) { rn->scores[i] = rn->scores[i+1]; rn->members[i] = rn->members[i+1]; }
                rn->num--;
                p->scores[c] = rn->scores[0]; p->members[c] = rn->members[0];
                p->counts[c]++; p->counts[c+1]--;
            } else {
                uint32_t moved = rn->counts[0];
                cn->scores[cn->num-1] = p->scores[c]; cn->members[cn->num-1] = p->members[c];
                cn->children[cn->num] = rn->children[0]; cn->counts[cn->num] = moved;
                h->nodes[cn->children[cn->num]].parent = cidx;
                cn->num++;
                p->scores[c] = rn->scores[0]; p->members[c] = rn->members[0];
                for (int i = 0; i+1 < rn->num; i++) { rn->children[i] = rn->children[i+1]; rn->counts[i] = rn->counts[i+1]; }
                for (int i = 0; i+1 < rn->num-1; i++) { rn->scores[i] = rn->scores[i+1]; rn->members[i] = rn->members[i+1]; }
                rn->num--;
                p->counts[c] += moved; p->counts[c+1] -= moved;
            }
            return;
        }
    }
    ss_merge(h, pidx, (c > 0) ? c - 1 : c);           /* merge with a sibling */
}

/* delete (score,member) from subtree nidx (known present); decrement counts;
   return 1 if nidx now underflows (num < SS_MIN).
   The key MUST be present: in a leaf that does not contain it the scan ends
   at pos == num and the final num-- silently drops the last entry instead. */
static int ss_delete_rec(SsHandle *h, uint32_t nidx, double score, int64_t member) {
    SsNode *nd = &h->nodes[nidx];
    if (nd->is_leaf) {
        /* linear scan for the exact key, then close the gap */
        int pos = 0;
        while (pos < nd->num && ss_key_cmp(nd->scores[pos], nd->members[pos], score, member) != 0) pos++;
        for (int i = pos; i+1 < nd->num; i++) { nd->scores[i] = nd->scores[i+1]; nd->members[i] = nd->members[i+1]; }
        nd->num--;
        return nd->num < SS_MIN;
    }
    /* internal node: descend into the child covering the key */
    int c = ss_child_index(nd, score, member);
    int under = ss_delete_rec(h, nd->children[c], score, member);
    /* one entry left that subtree; account for it before any rebalancing */
    nd->counts[c]--;
    if (under) ss_fix_underflow(h, nidx, c);
    /* report this node's own occupancy to the caller after the fix-up */
    return nd->num < SS_MIN;
}

/* Remove (score, member) -- which must be present -- from the tree and
 * decrement the entry count.  Afterwards the root is tidied up: an empty
 * leaf root is freed and the tree becomes empty; an internal root left with
 * a single child is freed and that child becomes the new root. */
static void ss_tree_del(SsHandle *h, double score, int64_t member) {
    SsHeader *hdr = h->hdr;
    ss_delete_rec(h, hdr->root, score, member);

    const uint32_t root_idx = hdr->root;
    const SsNode *root = &h->nodes[root_idx];
    if (!root->is_leaf) {
        if (root->num == 1) {
            /* shrink the tree by one level */
            uint32_t only_child = root->children[0];
            ss_node_free(h, root_idx);
            hdr->root = only_child;
            h->nodes[only_child].parent = SS_NONE;
            hdr->height--;
        }
    } else if (root->num == 0) {
        /* last entry gone: back to the empty tree */
        ss_node_free(h, root_idx);
        hdr->root      = SS_NONE;
        hdr->leftmost  = SS_NONE;
        hdr->rightmost = SS_NONE;
        hdr->height    = 0;
    }
    hdr->count--;
}

/* Add `member` with `score`.
 * Returns 1 when the member is new, 0 when it already existed (its entry is
 * re-keyed only if the score compares different), -1 when the member is new
 * but the set already holds max_entries. */
static int ss_add_locked(SsHandle *h, int64_t member, double score) {
    double prev;
    if (!ss_idx_get(h, member, &prev)) {
        if (h->hdr->count >= h->hdr->max_entries)
            return -1;
        ss_tree_add(h, score, member);
        ss_idx_set(h, member, score);
        return 1;
    }
    if (prev != score) {
        /* re-key: drop the old (score, member) entry, insert the new one */
        ss_tree_del(h, prev, member);
        ss_tree_add(h, score, member);
        ss_idx_set(h, member, score);
    }
    return 0;
}

/* Remove `member` from both the tree and the index.
 * Returns 1 if it was present and removed, 0 if it was absent. */
static int ss_remove_locked(SsHandle *h, int64_t member) {
    double prev;
    if (ss_idx_get(h, member, &prev)) {
        ss_tree_del(h, prev, member);
        ss_idx_del(h, member);
        return 1;
    }
    return 0;
}

/* Increment `member`'s score by `delta`, creating the member with score
 * `delta` if it is absent.
 * Returns 1 (created), 0 (updated), -1 (absent and the set is full; *out is
 * not written), -2 (the resulting score is NaN; *out holds the NaN and the
 * set is unchanged).  On 1 and 0, *out is the new score. */
static int ss_incr_locked(SsHandle *h, int64_t member, double delta, double *out) {
    double prev;
    if (!ss_idx_get(h, member, &prev)) {
        if (h->hdr->count >= h->hdr->max_entries)
            return -1;
        *out = delta;
        if (delta != delta)         /* NaN never equals itself */
            return -2;
        ss_tree_add(h, delta, member);
        ss_idx_set(h, member, delta);
        return 1;
    }

    double next = prev + delta;
    *out = next;
    if (next != next)               /* NaN never equals itself */
        return -2;
    if (next != prev) {
        ss_tree_del(h, prev, member);
        ss_tree_add(h, next, member);
        ss_idx_set(h, member, next);
    }
    return 0;
}

/* Remove and return the smallest entry (max == 0) or the largest
 * (max != 0).  Returns 0 when the set is empty; otherwise 1 with the member
 * in *m and its score in *s.  The entry is read from the first position of
 * the leftmost leaf or the last position of the rightmost leaf. */
static int ss_pop_locked(SsHandle *h, int max, int64_t *m, double *s) {
    SsHeader *hdr = h->hdr;
    if (hdr->root == SS_NONE)
        return 0;

    uint32_t leaf_idx = max ? hdr->rightmost : hdr->leftmost;
    const SsNode *leaf = &h->nodes[leaf_idx];
    int pos = max ? leaf->num - 1 : 0;

    /* copy out before deleting: the delete shifts/frees leaf contents */
    *m = leaf->members[pos];
    *s = leaf->scores[pos];
    ss_tree_del(h, *s, *m);
    ss_idx_del(h, *m);
    return 1;
}

/* ---- structural validator (debug / tests) ---- */
/* Recursive structural check of the subtree rooted at nidx.
 *
 * Verifies node occupancy (1..SS_ORDER entries, and at least SS_MIN unless
 * is_root), that every leaf sits at the same depth (*leaf_depth, -1 until
 * the first leaf is seen), that keys are strictly increasing across the
 * whole in-order walk (*ps / *pm hold the previous key, *hp says whether
 * there is one), and that each child's entry count matches the parent's
 * counts[] value.
 *
 * Returns the number of entries in the subtree, or -1 on any violation. */
static long ss_check_rec(SsHandle *h, uint32_t nidx, int depth, int is_root,
                         double *ps, int64_t *pm, int *hp, int *leaf_depth) {
    const SsNode *node = &h->nodes[nidx];

    if (node->num < 1 || node->num > SS_ORDER) return -1;
    if (!is_root && node->num < SS_MIN) return -1;

    if (!node->is_leaf) {
        long sum = 0;
        for (int k = 0; k < node->num; k++) {
            long sub = ss_check_rec(h, node->children[k], depth + 1, 0,
                                    ps, pm, hp, leaf_depth);
            if (sub < 0 || (uint32_t)sub != node->counts[k]) return -1;
            sum += sub;
        }
        return sum;
    }

    /* leaf: uniform depth, then strictly ascending keys */
    if (*leaf_depth < 0) *leaf_depth = depth;
    else if (*leaf_depth != depth) return -1;

    for (int k = 0; k < node->num; k++) {
        if (*hp && ss_key_cmp(*ps, *pm, node->scores[k], node->members[k]) >= 0)
            return -1;
        *ps = node->scores[k];
        *pm = node->members[k];
        *hp = 1;
    }
    return node->num;
}
/* Whole-structure validator.  Returns 1 when every check passes, 0 otherwise.
 * Checks, in order: (1) the recursive tree invariants via ss_check_rec() and
 * that the tree's entry total equals hdr->count; (2) that the leaf chain
 * from hdr->leftmost consists of leaves, is strictly ascending, ends at
 * hdr->rightmost and visits hdr->count entries; (3) that the number of
 * occupied index slots equals hdr->count. */
static int ss_validate_tree(SsHandle *h) {
    SsHeader *hdr = h->hdr;
    /* Empty tree: only count and leftmost are inspected.
     * NOTE(review): rightmost, height and the index population are not
     * checked on this path -- confirm whether that is intentional. */
    if (hdr->root == SS_NONE) return hdr->count == 0 && hdr->leftmost == SS_NONE;
    double ps = 0; int64_t pm = 0; int hp = 0, ld = -1;
    long total = ss_check_rec(h, hdr->root, 0, 1, &ps, &pm, &hp, &ld);
    if (total < 0 || (uint32_t)total != hdr->count) return 0;
    /* leaf links == in-order, and reach rightmost */
    uint32_t leaf = hdr->leftmost; double ls = 0; int64_t lm = 0; int lh = 0; uint32_t seen = 0;
    while (leaf != SS_NONE) {
        SsNode *nd = &h->nodes[leaf];
        if (!nd->is_leaf) return 0;
        for (int i = 0; i < nd->num; i++) {
            /* keys must be strictly increasing along the chain */
            if (lh && ss_key_cmp(ls, lm, nd->scores[i], nd->members[i]) >= 0) return 0;
            ls = nd->scores[i]; lm = nd->members[i]; lh = 1; seen++;
        }
        /* the leaf with no successor must be the recorded rightmost leaf */
        if (nd->next == SS_NONE && leaf != hdr->rightmost) return 0;
        leaf = nd->next;
    }
    if (seen != hdr->count) return 0;
    /* index population == count */
    uint32_t icount = 0;
    for (uint32_t i = 0; i < hdr->index_slots; i++) if (h->index[i].state) icount++;
    return icount == hdr->count;
}

/* ---- order-statistics queries (read paths; caller holds the read lock) ---- */

/* number of entries strictly less than (score, member) -- the rank of the key */



( run in 0.354 second using v1.01-cache-2.11-cpan-bbe5e583499 )