Data-SortedSet-Shared
view release on metacpan or search on metacpan
sortedset.h view on Meta::CPAN
_Static_assert(sizeof(SsHeader) == 256, "SsHeader must be 256 bytes");
/* ---- Process-local handle ---- */
typedef struct SsHandle {
SsHeader *hdr;
SsReaderSlot *reader_slots; /* SS_READER_SLOTS entries */
SsIdxSlot *index; /* member -> score */
SsNode *nodes; /* B+tree node pool */
size_t mmap_size;
char *path; /* backing file path (strdup'd) */
int notify_fd;
int backing_fd; /* memfd fd to close on destroy, -1 otherwise */
uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
uint32_t cached_pid; /* getpid() cached at last slot claim */
uint32_t cached_fork_gen; /* ss_fork_gen value at last slot claim */
} SsHandle;
/* ================================================================
* Helpers
* ================================================================ */
static inline uint32_t ss_next_pow2(uint32_t v) {
if (v < 2) return 1;
return 1u << (32 - __builtin_clz(v - 1));
}
/* member hash: splitmix64 finalizer (good avalanche for int64 keys) */
static inline uint64_t ss_hash_member(int64_t m) {
uint64_t x = (uint64_t)m + 0x9E3779B97F4A7C15ULL;
x = (x ^ (x >> 30)) * 0xBF58476D1CE4E5B9ULL;
x = (x ^ (x >> 27)) * 0x94D049BB133111EBULL;
return x ^ (x >> 31);
}
/* total order on (score, member): -1 / 0 / +1 */
static inline int ss_key_cmp(double sa, int64_t ma, double sb, int64_t mb) {
if (sa < sb) return -1;
if (sa > sb) return 1;
return (ma < mb) ? -1 : (ma > mb) ? 1 : 0;
}
/* ================================================================
* Futex-based write-preferring read-write lock
* with reader-slot dead-process recovery
* ================================================================ */
#define SS_RWLOCK_SPIN_LIMIT 32
#define SS_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
static inline void ss_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
__asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
__asm__ volatile("yield" ::: "memory");
#else
__asm__ volatile("" ::: "memory");
#endif
}
/* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
#define SS_RWLOCK_WRITER_BIT 0x80000000U
#define SS_RWLOCK_PID_MASK 0x7FFFFFFFU
#define SS_RWLOCK_WR(pid) (SS_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SS_RWLOCK_PID_MASK))
/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
* lock-holder's PID is recycled to an unrelated live process before recovery
* runs, this reports "alive" and that slot's orphaned contribution is not
* reclaimed until the recycled process exits. Robust detection would require
* a per-slot process-start-time epoch (a header-layout/version change).
* Documented under "Crash Safety" in the POD. */
static inline int ss_pid_alive(uint32_t pid) {
if (pid == 0) return 1; /* no owner recorded, assume alive */
return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}
/* Force-recover a stale write lock left by a dead process.
* CAS to OUR pid to hold the lock while fixing shared state, then release.
* Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
* recovering process can detect and re-recover if we crash mid-recovery. */
static inline void ss_recover_stale_lock(SsHandle *h, uint32_t observed_rwlock) {
SsHeader *hdr = h->hdr;
uint32_t mypid = SS_RWLOCK_WR((uint32_t)getpid());
if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
/* We now hold the write lock as mypid. No additional shared state needs
* repair here (this module has no seqlock); just release the lock. */
__atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static const struct timespec ss_lock_timeout = { SS_LOCK_TIMEOUT_SEC, 0 };
/* Process-global fork-generation counter. Incremented in the pthread_atfork
* child callback so every open handle detects a fork transition on the next
* lock call without paying a getpid() syscall on the hot path. */
static uint32_t ss_fork_gen = 1;
static pthread_once_t ss_atfork_once = PTHREAD_ONCE_INIT;
static void ss_on_fork_child(void) {
__atomic_add_fetch(&ss_fork_gen, 1, __ATOMIC_RELAXED);
}
static void ss_atfork_init(void) {
pthread_atfork(NULL, NULL, ss_on_fork_child);
}
/* Ensure this process owns a reader slot. Called from the lock helpers so
* that fork()'d children pick up their own slot lazily instead of sharing
* the parent's. Hot-path is a single relaxed load + compare; only on a
* fork-generation mismatch do we touch getpid() and scan slots. */
static inline void ss_claim_reader_slot(SsHandle *h) {
uint32_t cur_gen = __atomic_load_n(&ss_fork_gen, __ATOMIC_RELAXED);
if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
return;
/* Cold path -- register the atfork hook once per process, then claim. */
pthread_once(&ss_atfork_once, ss_atfork_init);
/* Re-read after pthread_once: ss_on_fork_child may have bumped it. */
cur_gen = __atomic_load_n(&ss_fork_gen, __ATOMIC_RELAXED);
uint32_t now_pid = (uint32_t)getpid();
sortedset.h view on Meta::CPAN
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
static inline void ss_unpark_reader(SsHandle *h) {
__atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
if (h->my_slot_idx != UINT32_MAX)
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
}
static inline void ss_park_writer(SsHandle *h) {
if (h->my_slot_idx != UINT32_MAX) {
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
}
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
static inline void ss_unpark_writer(SsHandle *h) {
__atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&h->hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
if (h->my_slot_idx != UINT32_MAX) {
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].writers_parked, 1, __ATOMIC_RELAXED);
}
}
static inline void ss_rwlock_rdlock(SsHandle *h) {
ss_claim_reader_slot(h);
SsHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
/* Claim subcount BEFORE bumping the shared rwlock counter. This way
* a concurrent writer-side recovery scan that sees our PID alive with
* subcount > 0 will (correctly) defer force-reset, even while we are
* still spinning trying to win the rwlock CAS. Without this, a reader
* killed between rwlock CAS-success and subcount++ would let recovery
* force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
* our eventual rdunlock dec. */
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
for (int spin = 0; ; spin++) {
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Write-preferring: when lock is free (cur==0) and writers are
* waiting, yield to let the writer acquire. When readers are
* already active (cur>=1), new readers may join freely. */
if (cur > 0 && cur < SS_RWLOCK_WRITER_BIT) {
if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
} else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
ss_rwlock_spin_pause();
continue;
}
ss_park_reader(h);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR when yielding to waiting writers */
if (cur >= SS_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&ss_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
ss_unpark_reader(h);
ss_recover_after_timeout(h);
spin = 0;
continue;
}
}
ss_unpark_reader(h);
spin = 0;
}
}
static inline void ss_rwlock_rdunlock(SsHandle *h) {
SsHeader *hdr = h->hdr;
/* Release the shared counter BEFORE dropping our subcount so that
* "any live PID with subcount > 0" is a reliable in-flight indicator
* for the writer-side recovery scan. Inverting these would create a
* window where we still own a unit of rwlock but our slot subcount is
* 0, letting recovery force-reset rwlock underneath us. */
uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
if (h->my_slot_idx != UINT32_MAX)
__atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static inline void ss_rwlock_wrlock(SsHandle *h) {
ss_claim_reader_slot(h); /* refresh cached_pid across fork */
SsHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
/* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
* any crash window between acquiring the lock and storing the owner. */
uint32_t mypid = SS_RWLOCK_WR(h->cached_pid);
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(lock, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
ss_rwlock_spin_pause();
continue;
}
ss_park_writer(h);
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&ss_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
ss_unpark_writer(h);
ss_recover_after_timeout(h);
spin = 0;
continue;
}
}
ss_unpark_writer(h);
spin = 0;
}
sortedset.h view on Meta::CPAN
ss_init_header(base, max_entries, index_slots, node_capacity, total);
return ss_setup(base, (size_t)total, NULL, fd);
}
static SsHandle *ss_open_fd(int fd, char *errbuf) {
if (errbuf) errbuf[0] = '\0';
struct stat st;
if (fstat(fd, &st) < 0) { SS_ERR("fstat: %s", strerror(errno)); return NULL; }
if ((uint64_t)st.st_size < sizeof(SsHeader)) { SS_ERR("too small"); return NULL; }
size_t ms = (size_t)st.st_size;
void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
if (base == MAP_FAILED) { SS_ERR("mmap: %s", strerror(errno)); return NULL; }
if (!ss_validate_header((SsHeader *)base, (uint64_t)st.st_size)) {
SS_ERR("invalid sorted-set"); munmap(base, ms); return NULL;
}
int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
if (myfd < 0) { SS_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }
return ss_setup(base, ms, NULL, myfd);
}
static void ss_destroy(SsHandle *h) {
if (!h) return;
if (h->notify_fd >= 0) close(h->notify_fd);
if (h->backing_fd >= 0) close(h->backing_fd);
if (h->hdr) munmap(h->hdr, h->mmap_size);
free(h->path);
free(h);
}
static inline int ss_msync(SsHandle *h) {
if (!h || !h->hdr) return 0;
return msync(h->hdr, h->mmap_size, MS_SYNC);
}
static int ss_create_eventfd(SsHandle *h) {
if (h->notify_fd >= 0) return h->notify_fd;
int efd = eventfd(0, EFD_NONBLOCK|EFD_CLOEXEC);
if (efd < 0) return -1;
h->notify_fd = efd;
return efd;
}
static int ss_notify(SsHandle *h) {
if (h->notify_fd < 0) return 0;
uint64_t v = 1;
return write(h->notify_fd, &v, sizeof(v)) == sizeof(v);
}
static int64_t ss_eventfd_consume(SsHandle *h) {
if (h->notify_fd < 0) return -1;
uint64_t v = 0;
if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
return (int64_t)v;
}
/* ================================================================
* Sorted set: node pool, member index, B+tree (callers hold the lock)
* ================================================================ */
/* reset to the empty set (caller holds the write lock) */
static inline void ss_clear_locked(SsHandle *h) {
SsHeader *hdr = h->hdr;
hdr->count = 0;
hdr->root = SS_NONE;
hdr->height = 0;
hdr->leftmost = SS_NONE;
hdr->rightmost = SS_NONE;
for (uint32_t i = 0; i < hdr->node_capacity; i++)
h->nodes[i].parent = (i + 1 < hdr->node_capacity) ? (i + 1) : SS_NONE;
hdr->node_free_head = 0;
memset(h->index, 0, (size_t)hdr->index_slots * sizeof(SsIdxSlot));
}
/* ---- node pool ---- */
static inline uint32_t ss_node_alloc(SsHandle *h) {
uint32_t idx = h->hdr->node_free_head;
if (idx == SS_NONE) return SS_NONE;
h->hdr->node_free_head = h->nodes[idx].parent;
return idx;
}
static inline void ss_node_free(SsHandle *h, uint32_t idx) {
h->nodes[idx].parent = h->hdr->node_free_head;
h->hdr->node_free_head = idx;
}
/* ---- member -> score index (open addressing, linear probe) ---- */
/* returns the slot of `member` (if present) or the first empty slot for it;
*found (optional, may be NULL) says which */
static inline uint32_t ss_idx_find(SsHandle *h, int64_t member, int *found) {
uint32_t mask = h->hdr->index_slots - 1;
uint32_t i = (uint32_t)(ss_hash_member(member) & mask);
while (h->index[i].state) {
if (h->index[i].member == member) { if (found) *found = 1; return i; }
i = (i + 1) & mask;
}
if (found) *found = 0;
return i;
}
static inline int ss_idx_get(SsHandle *h, int64_t member, double *score) {
int f; uint32_t i = ss_idx_find(h, member, &f);
if (f) { *score = h->index[i].score; return 1; }
return 0;
}
static inline void ss_idx_set(SsHandle *h, int64_t member, double score) {
uint32_t i = ss_idx_find(h, member, NULL);
h->index[i].member = member;
h->index[i].score = score;
h->index[i].state = 1;
}
/* ---- B+tree ---- */
static inline uint32_t ss_node_total(const SsNode *nd) {
if (nd->is_leaf) return nd->num;
uint32_t t = 0;
for (int i = 0; i < nd->num; i++) t += nd->counts[i];
return t;
}
/* child index in an internal node whose subtree would hold (score,member) */
static inline int ss_child_index(const SsNode *nd, double score, int64_t member) {
int lo = 0, hi = nd->num - 1;
sortedset.h view on Meta::CPAN
if (cn->is_leaf) {
cn->scores[cn->num] = rn->scores[0]; cn->members[cn->num] = rn->members[0]; cn->num++;
for (int i = 0; i+1 < rn->num; i++) { rn->scores[i] = rn->scores[i+1]; rn->members[i] = rn->members[i+1]; }
rn->num--;
p->scores[c] = rn->scores[0]; p->members[c] = rn->members[0];
p->counts[c]++; p->counts[c+1]--;
} else {
uint32_t moved = rn->counts[0];
cn->scores[cn->num-1] = p->scores[c]; cn->members[cn->num-1] = p->members[c];
cn->children[cn->num] = rn->children[0]; cn->counts[cn->num] = moved;
h->nodes[cn->children[cn->num]].parent = cidx;
cn->num++;
p->scores[c] = rn->scores[0]; p->members[c] = rn->members[0];
for (int i = 0; i+1 < rn->num; i++) { rn->children[i] = rn->children[i+1]; rn->counts[i] = rn->counts[i+1]; }
for (int i = 0; i+1 < rn->num-1; i++) { rn->scores[i] = rn->scores[i+1]; rn->members[i] = rn->members[i+1]; }
rn->num--;
p->counts[c] += moved; p->counts[c+1] -= moved;
}
return;
}
}
ss_merge(h, pidx, (c > 0) ? c - 1 : c); /* merge with a sibling */
}
/* delete (score,member) from subtree nidx (known present); decrement counts;
return 1 if nidx now underflows (num < SS_MIN) */
static int ss_delete_rec(SsHandle *h, uint32_t nidx, double score, int64_t member) {
SsNode *nd = &h->nodes[nidx];
if (nd->is_leaf) {
int pos = 0;
while (pos < nd->num && ss_key_cmp(nd->scores[pos], nd->members[pos], score, member) != 0) pos++;
for (int i = pos; i+1 < nd->num; i++) { nd->scores[i] = nd->scores[i+1]; nd->members[i] = nd->members[i+1]; }
nd->num--;
return nd->num < SS_MIN;
}
int c = ss_child_index(nd, score, member);
int under = ss_delete_rec(h, nd->children[c], score, member);
nd->counts[c]--;
if (under) ss_fix_underflow(h, nidx, c);
return nd->num < SS_MIN;
}
static void ss_tree_del(SsHandle *h, double score, int64_t member) {
SsHeader *hdr = h->hdr;
ss_delete_rec(h, hdr->root, score, member);
SsNode *root = &h->nodes[hdr->root];
if (root->is_leaf) {
if (root->num == 0) {
ss_node_free(h, hdr->root);
hdr->root = SS_NONE; hdr->leftmost = SS_NONE; hdr->rightmost = SS_NONE; hdr->height = 0;
}
} else if (root->num == 1) {
uint32_t child = root->children[0];
ss_node_free(h, hdr->root);
hdr->root = child; h->nodes[child].parent = SS_NONE; hdr->height--;
}
hdr->count--;
}
/* add: 1 (new), 0 (existing -- score updated if changed), -1 (full) */
static int ss_add_locked(SsHandle *h, int64_t member, double score) {
double old;
if (ss_idx_get(h, member, &old)) {
if (old != score) { ss_tree_del(h, old, member); ss_tree_add(h, score, member); ss_idx_set(h, member, score); }
return 0;
}
if (h->hdr->count >= h->hdr->max_entries) return -1;
ss_tree_add(h, score, member);
ss_idx_set(h, member, score);
return 1;
}
/* remove: 1 if removed, 0 if absent */
static int ss_remove_locked(SsHandle *h, int64_t member) {
double old;
if (!ss_idx_get(h, member, &old)) return 0;
ss_tree_del(h, old, member);
ss_idx_del(h, member);
return 1;
}
/* incr by delta. *out = new score. returns 1 (created), 0 (updated), -1 (full),
-2 (result is NaN) */
static int ss_incr_locked(SsHandle *h, int64_t member, double delta, double *out) {
double old;
if (ss_idx_get(h, member, &old)) {
double ns = old + delta; *out = ns;
if (ns != ns) return -2;
if (ns != old) { ss_tree_del(h, old, member); ss_tree_add(h, ns, member); ss_idx_set(h, member, ns); }
return 0;
}
if (h->hdr->count >= h->hdr->max_entries) return -1;
*out = delta;
if (delta != delta) return -2;
ss_tree_add(h, delta, member); ss_idx_set(h, member, delta);
return 1;
}
/* pop the min (max=0) or max (max=1): 0 if empty, else 1 with *m,*s */
static int ss_pop_locked(SsHandle *h, int max, int64_t *m, double *s) {
if (h->hdr->root == SS_NONE) return 0;
SsNode *nd = &h->nodes[max ? h->hdr->rightmost : h->hdr->leftmost];
int pos = max ? nd->num - 1 : 0;
*m = nd->members[pos]; *s = nd->scores[pos];
ss_tree_del(h, *s, *m);
ss_idx_del(h, *m);
return 1;
}
/* ---- structural validator (debug / tests) ---- */
static long ss_check_rec(SsHandle *h, uint32_t nidx, int depth, int is_root,
double *ps, int64_t *pm, int *hp, int *leaf_depth) {
SsNode *nd = &h->nodes[nidx];
if (nd->num < 1 || nd->num > SS_ORDER) return -1;
if (!is_root && nd->num < SS_MIN) return -1;
if (nd->is_leaf) {
if (*leaf_depth < 0) *leaf_depth = depth;
else if (*leaf_depth != depth) return -1;
for (int i = 0; i < nd->num; i++) {
if (*hp && ss_key_cmp(*ps, *pm, nd->scores[i], nd->members[i]) >= 0) return -1;
*ps = nd->scores[i]; *pm = nd->members[i]; *hp = 1;
}
return nd->num;
}
long total = 0;
for (int i = 0; i < nd->num; i++) {
long c = ss_check_rec(h, nd->children[i], depth + 1, 0, ps, pm, hp, leaf_depth);
if (c < 0 || (uint32_t)c != nd->counts[i]) return -1;
total += c;
}
return total;
}
static int ss_validate_tree(SsHandle *h) {
SsHeader *hdr = h->hdr;
if (hdr->root == SS_NONE) return hdr->count == 0 && hdr->leftmost == SS_NONE;
double ps = 0; int64_t pm = 0; int hp = 0, ld = -1;
long total = ss_check_rec(h, hdr->root, 0, 1, &ps, &pm, &hp, &ld);
if (total < 0 || (uint32_t)total != hdr->count) return 0;
/* leaf links == in-order, and reach rightmost */
uint32_t leaf = hdr->leftmost; double ls = 0; int64_t lm = 0; int lh = 0; uint32_t seen = 0;
while (leaf != SS_NONE) {
SsNode *nd = &h->nodes[leaf];
if (!nd->is_leaf) return 0;
for (int i = 0; i < nd->num; i++) {
if (lh && ss_key_cmp(ls, lm, nd->scores[i], nd->members[i]) >= 0) return 0;
ls = nd->scores[i]; lm = nd->members[i]; lh = 1; seen++;
}
if (nd->next == SS_NONE && leaf != hdr->rightmost) return 0;
leaf = nd->next;
}
if (seen != hdr->count) return 0;
/* index population == count */
uint32_t icount = 0;
for (uint32_t i = 0; i < hdr->index_slots; i++) if (h->index[i].state) icount++;
return icount == hdr->count;
}
/* ---- order-statistics queries (read paths; caller holds the read lock) ---- */
/* number of entries strictly less than (score, member) -- the rank of the key */
( run in 0.354 second using v1.01-cache-2.11-cpan-bbe5e583499 )