Data-HyperLogLog-Shared
view release on metacpan or search on metacpan
* drain. */
typedef struct {
/* PID associated with this slot; presumably written when a process claims
 * the slot (the claim code is not visible here -- confirm). */
uint32_t pid; /* 0 = unclaimed */
/* Incremented by hll_rwlock_rdlock before it tries to take the lock and
 * decremented by hll_rwlock_rdunlock after the shared counter is released. */
uint32_t subcount; /* in-flight rdlock acquisitions for this process */
/* Adjusted together with hdr->rwlock_waiters by the park/unpark helpers. */
uint32_t waiters_parked; /* contribution to hdr->rwlock_waiters */
/* Adjusted together with hdr->rwlock_writers_waiting by the writer
 * park/unpark helpers. */
uint32_t writers_parked; /* contribution to hdr->rwlock_writers_waiting */
} HllReaderSlot;
/* Fixed 256-byte header (size enforced by the _Static_assert below). The
 * leading number in each field comment is that field's byte offset within
 * the header. */
struct HllHeader {
uint32_t magic, version; /* offsets 0, 4 */
uint32_t precision; /* offset 8: register-index bit count */
uint32_t m; /* offset 12: register count (= 1 << precision) */
uint32_t _pad0; /* offset 16 */
uint32_t _pad1; /* offset 20 */
uint64_t total_size; /* offset 24 */
uint64_t reader_slots_off; /* offset 32: presumably a byte offset to the reader-slot array -- confirm */
uint64_t regs_off; /* offset 40: presumably a byte offset to the register array -- confirm */
/* Lock word: 0 = free, 1..0x7FFFFFFF = number of active readers,
 * HLL_RWLOCK_WRITER_BIT | pid = write-locked by that pid. Also the futex
 * word the lock helpers wait on and wake. */
uint32_t rwlock; /* offset 48 */
/* Number of parked lock waiters; unlockers only issue FUTEX_WAKE when this
 * is non-zero. */
uint32_t rwlock_waiters; /* offset 52 */
/* Number of parked writers; hll_rwlock_rdlock will not take a free lock
 * while this is non-zero. */
uint32_t rwlock_writers_waiting; /* offset 56 */
uint32_t _pad2; /* offset 60 */
uint64_t stat_ops; /* offset 64 */
uint8_t _pad[184]; /* offsets 72..255: pads the struct to 256 bytes */
};
typedef struct HllHeader HllHeader;
_Static_assert(sizeof(HllHeader) == 256, "HllHeader must be 256 bytes");
/* ---- Process-local handle ---- */
/* Per-process view of one mapping: pointers into the mapping plus the
 * resources hll_destroy releases (fd, mapping, path string, the handle). */
typedef struct HllHandle {
HllHeader *hdr; /* header used by all lock and register helpers */
HllReaderSlot *reader_slots; /* HLL_READER_SLOTS entries */
void *base; /* mmap base; unmapped by hll_destroy when non-NULL */
size_t mmap_size; /* length passed to munmap() and msync() */
char *path; /* backing file path (strdup'd); freed by hll_destroy */
int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
uint32_t my_slot_idx; /* UINT32_MAX if all slots taken (no recovery for this handle) */
uint32_t cached_pid; /* getpid() cached at last slot claim; encoded into the lock word by hll_rwlock_wrlock */
uint32_t cached_fork_gen; /* hll_fork_gen value at last slot claim */
} HllHandle;
/* ================================================================
* Futex-based write-preferring read-write lock
* with reader-slot dead-process recovery
* ================================================================ */
/* Number of acquisition attempts the rdlock/wrlock loops make with only a
 * CPU pause between them before they park on the futex. */
#define HLL_RWLOCK_SPIN_LIMIT 32
/* Seconds component of hll_lock_timeout, the timeout passed to FUTEX_WAIT;
 * when a wait times out the lock helpers call hll_recover_after_timeout. */
#define HLL_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
/* Emit one spin-wait hint between lock acquisition attempts.
 * Every variant carries a "memory" clobber, so it also acts as a compiler
 * barrier regardless of architecture. */
static inline void hll_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    /* No hint instruction selected: compiler barrier only. */
    __asm__ volatile("" ::: "memory");
#endif
}
/* Encoding of the rwlock word while write-locked: the top bit marks the
 * lock as writer-held and the low 31 bits carry the holder's PID.
 * HLL_RWLOCK_WR(pid) builds that word from a PID. */
#define HLL_RWLOCK_WRITER_BIT (1U << 31)
#define HLL_RWLOCK_PID_MASK   (HLL_RWLOCK_WRITER_BIT - 1U)
#define HLL_RWLOCK_WR(pid)    (HLL_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & HLL_RWLOCK_PID_MASK))
/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
* lock-holder's PID is recycled to an unrelated live process before recovery
* runs, this reports "alive" and that slot's orphaned contribution is not
* reclaimed until the recycled process exits. Robust detection would require
* a per-slot process-start-time epoch (a header-layout/version change).
* Documented under "Crash Safety" in the POD. */
/* Probe `pid` with signal 0.
 * Returns 0 only when kill() fails with ESRCH; returns 1 for pid 0, for a
 * successful probe, and for any other kill() failure. */
static inline int hll_pid_alive(uint32_t pid) {
    /* No owner recorded: treat as alive so callers never reclaim it. */
    if (pid == 0)
        return 1;
    /* The probe succeeded, so the PID currently exists. */
    if (kill((pid_t)pid, 0) != -1)
        return 1;
    /* The probe failed: only ESRCH counts as "definitely dead". */
    return errno != ESRCH;
}
/* Force-recover a stale write lock left by a dead process.
* CAS to OUR pid to hold the lock while fixing shared state, then release.
* Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
* recovering process can detect and re-recover if we crash mid-recovery. */
static inline void hll_recover_stale_lock(HllHandle *h, uint32_t observed_rwlock) {
    HllHeader *hdr = h->hdr;
    uint32_t *lock_word = &hdr->rwlock;
    const uint32_t takeover = HLL_RWLOCK_WR((uint32_t)getpid());

    /* Strong CAS: succeed only if the word still holds the stale value the
     * caller observed; otherwise someone else changed it and we do nothing. */
    int took_over = __atomic_compare_exchange_n(lock_word, &observed_rwlock,
                                                takeover, 0,
                                                __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    if (took_over) {
        /* The lock is now held under our pid. Nothing else needs repair, so
         * release it straight away and wake anyone parked on the word. */
        __atomic_store_n(lock_word, 0, __ATOMIC_RELEASE);
        if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) != 0)
            syscall(SYS_futex, lock_word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}
/* Timeout passed to every FUTEX_WAIT issued by the lock helpers. */
static const struct timespec hll_lock_timeout = {
    .tv_sec  = HLL_LOCK_TIMEOUT_SEC,
    .tv_nsec = 0,
};
/* Process-global fork-generation counter. Incremented in the pthread_atfork
* child callback so every open handle detects a fork transition on the next
* lock call without paying a getpid() syscall on the hot path. */
static uint32_t hll_fork_gen = 1;
static pthread_once_t hll_atfork_once = PTHREAD_ONCE_INIT;

/* Child-side atfork hook: advance the generation by one so handles whose
 * cached_fork_gen no longer matches take the cold path on their next claim. */
static void hll_on_fork_child(void) {
    (void)__atomic_fetch_add(&hll_fork_gen, 1, __ATOMIC_RELAXED);
}

/* pthread_once initializer: install the child hook only (no prepare or
 * parent handlers). */
static void hll_atfork_init(void) {
    void (*const child_hook)(void) = hll_on_fork_child;
    pthread_atfork(NULL, NULL, child_hook);
}
/* Ensure this process owns a reader slot. Called from the lock helpers so
* that fork()'d children pick up their own slot lazily instead of sharing
* the parent's. Hot-path is a single relaxed load + compare; only on a
* fork-generation mismatch do we touch getpid() and scan slots. */
static inline void hll_claim_reader_slot(HllHandle *h) {
/* Fast path: the process-wide fork generation still matches the value
 * cached in the handle and a slot index is recorded, so nothing to do. */
uint32_t cur_gen = __atomic_load_n(&hll_fork_gen, __ATOMIC_RELAXED);
if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
return;
/* Cold path -- register the atfork hook once per process, then claim. */
pthread_once(&hll_atfork_once, hll_atfork_init);
/* Re-read after pthread_once: hll_on_fork_child may have bumped it. */
cur_gen = __atomic_load_n(&hll_fork_gen, __ATOMIC_RELAXED);
uint32_t now_pid = (uint32_t)getpid();
/* NOTE(review): this listing appears to be missing lines at this point.
 * cur_gen and now_pid are never used below, no slot scan or update of
 * cached_pid / cached_fork_gen is visible, and the statements that follow
 * only bump the waiter counters (the same pattern hll_park_writer uses for
 * waiters). hll_park_reader() is called by hll_rwlock_rdlock but has no
 * visible definition, so the tail below may belong to it. Confirm against
 * the complete source before relying on this body. */
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Withdraw one waiter registration: decrement the global waiter count
 * first, then this handle's slot contribution when it owns a slot. */
static inline void hll_unpark_reader(HllHandle *h) {
    HllHeader *hdr = h->hdr;

    (void)__atomic_fetch_sub(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return; /* no slot, so no per-slot contribution to undo */

    HllReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
    (void)__atomic_fetch_sub(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Register the caller as a parked writer. The per-slot contributions are
 * recorded before the matching global counters are raised. */
static inline void hll_park_writer(HllHandle *h) {
    HllHeader *hdr = h->hdr;

    if (h->my_slot_idx != UINT32_MAX) {
        HllReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
        (void)__atomic_fetch_add(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
        (void)__atomic_fetch_add(&slot->writers_parked, 1, __ATOMIC_RELAXED);
    }

    /* A writer counts both as a generic waiter and as a waiting writer. */
    (void)__atomic_fetch_add(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Undo hll_park_writer: lower the global counters first, then the per-slot
 * contributions when this handle owns a slot. */
static inline void hll_unpark_writer(HllHandle *h) {
    HllHeader *hdr = h->hdr;

    (void)__atomic_fetch_sub(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);

    if (h->my_slot_idx == UINT32_MAX)
        return; /* no slot, so no per-slot contribution to undo */

    HllReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
    (void)__atomic_fetch_sub(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&slot->writers_parked, 1, __ATOMIC_RELAXED);
}
/* Acquire the lock in shared (read) mode. Returns only after this caller has
 * incremented the reader count in hdr->rwlock; there is no failure return.
 * Spins up to HLL_RWLOCK_SPIN_LIMIT attempts, then parks on the futex with a
 * timeout and runs timeout recovery before retrying. */
static inline void hll_rwlock_rdlock(HllHandle *h) {
hll_claim_reader_slot(h);
HllHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
/* Claim subcount BEFORE bumping the shared rwlock counter. This way
 * a concurrent writer-side recovery scan that sees our PID alive with
 * subcount > 0 will (correctly) defer force-reset, even while we are
 * still spinning trying to win the rwlock CAS. Without this, a reader
 * killed between rwlock CAS-success and subcount++ would let recovery
 * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
 * our eventual rdunlock dec. */
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
for (int spin = 0; ; spin++) {
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Write-preferring: when lock is free (cur==0) and writers are
 * waiting, yield to let the writer acquire. When readers are
 * already active (cur>=1), new readers may join freely. */
/* Both CAS calls below are the weak form (4th argument 1), so they may fail
 * spuriously; a failure just falls through to the spin/park logic. */
if (cur > 0 && cur < HLL_RWLOCK_WRITER_BIT) {
if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
} else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
/* Busy-wait phase: retry with only a CPU pause until the spin budget is
 * used up. */
if (__builtin_expect(spin < HLL_RWLOCK_SPIN_LIMIT, 1)) {
hll_rwlock_spin_pause();
continue;
}
/* Slow path: register as a waiter, then re-read the word so the value
 * handed to FUTEX_WAIT is current.
 * NOTE(review): hll_park_reader is not defined in the visible part of
 * this file; presumably the counterpart of hll_unpark_reader -- confirm. */
hll_park_reader(h);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR when yielding to waiting writers */
if (cur >= HLL_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&hll_lock_timeout, NULL, 0);
/* Timed out without a wake: drop the waiter registration, attempt
 * recovery, and start over with a fresh spin budget.
 * NOTE(review): hll_recover_after_timeout is defined outside the
 * visible part of this file. */
if (rc == -1 && errno == ETIMEDOUT) {
hll_unpark_reader(h);
hll_recover_after_timeout(h);
spin = 0;
continue;
}
}
/* Woken, wait refused, or readers were active: unregister and retry. */
hll_unpark_reader(h);
spin = 0;
}
}
/* Release one shared (read) hold on the lock and wake parked waiters when
 * this was the last hold and somebody is registered as waiting. */
static inline void hll_rwlock_rdunlock(HllHandle *h) {
    HllHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;

    /* Order matters: give back our unit of the shared counter first and only
     * then drop the slot subcount. While we still own a unit of rwlock our
     * subcount stays > 0, so the writer-side recovery scan keeps treating us
     * as in-flight and will not force-reset rwlock underneath us. */
    const uint32_t remaining = __atomic_sub_fetch(lock, 1, __ATOMIC_RELEASE);
    if (h->my_slot_idx != UINT32_MAX) {
        HllReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
        (void)__atomic_fetch_sub(&slot->subcount, 1, __ATOMIC_RELAXED);
    }

    /* Other holders remain: they are responsible for the wake-up. */
    if (remaining != 0)
        return;
    /* Nobody is parked: skip the syscall. */
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;
    syscall(SYS_futex, lock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* Acquire the lock in exclusive (write) mode by swapping the lock word from
 * 0 to HLL_RWLOCK_WR(cached pid). Spins up to HLL_RWLOCK_SPIN_LIMIT attempts,
 * then parks on the futex with a timeout and runs timeout recovery before
 * retrying. */
static inline void hll_rwlock_wrlock(HllHandle *h) {
hll_claim_reader_slot(h); /* refresh cached_pid across fork */
HllHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
/* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
 * any crash window between acquiring the lock and storing the owner. */
uint32_t mypid = HLL_RWLOCK_WR(h->cached_pid);
for (int spin = 0; ; spin++) {
/* Only a completely free word (no readers, no writer) can be taken. The
 * CAS is the weak form, so it may fail spuriously and simply retry. */
uint32_t expected = 0;
if (__atomic_compare_exchange_n(lock, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
/* Busy-wait phase: retry with only a CPU pause until the spin budget is
 * used up. */
if (__builtin_expect(spin < HLL_RWLOCK_SPIN_LIMIT, 1)) {
hll_rwlock_spin_pause();
continue;
}
/* Slow path: register as a waiting writer (readers then stop taking a
 * free lock), and re-read the word so FUTEX_WAIT gets a current value. */
hll_park_writer(h);
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&hll_lock_timeout, NULL, 0);
/* Timed out without a wake: unregister, attempt recovery, and start
 * over with a fresh spin budget.
 * NOTE(review): hll_recover_after_timeout is defined outside the
 * visible part of this file. */
if (rc == -1 && errno == ETIMEDOUT) {
hll_unpark_writer(h);
hll_recover_after_timeout(h);
spin = 0;
continue;
}
}
/* Woken, wait refused, or the word was already 0: unregister and retry. */
hll_unpark_writer(h);
spin = 0;
}
/* NOTE(review): the end of this function is not visible in this listing;
 * the statements that follow appear to belong to a different function. */
flock(fd, LOCK_UN); close(fd);
return hll_setup(base, map_size, path, -1);
}
}
hll_init_header(base, precision, m, total);
if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
return hll_setup(base, map_size, path, -1);
}
/* Create a new sketch backed by an anonymous memfd.
 *   name      - label for the memfd; "hll" is used when NULL
 *   precision - checked by hll_validate_create_args, which also yields m
 *   errbuf    - error buffer handed to the validator and used by HLL_ERR
 * Returns the result of hll_setup() on success (the handle is given the
 * memfd as its backing fd), or NULL after reporting an error. */
static HllHandle *hll_create_memfd(const char *name, uint32_t precision, char *errbuf) {
    uint32_t m;
    if (!hll_validate_create_args(precision, &m, errbuf))
        return NULL;

    const uint64_t total = hll_total_size(m);
    const char *label = name ? name : "hll";
    void *base;

    int fd = memfd_create(label, MFD_CLOEXEC | MFD_ALLOW_SEALING);
    if (fd < 0) {
        HLL_ERR("memfd_create: %s", strerror(errno));
        return NULL;
    }

    if (ftruncate(fd, (off_t)total) < 0) {
        HLL_ERR("ftruncate: %s", strerror(errno));
        goto fail;
    }

    /* Best effort: pin the size so the object cannot shrink or grow; a
     * failure to apply the seals is deliberately ignored. */
    (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);

    base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        HLL_ERR("mmap: %s", strerror(errno));
        goto fail;
    }

    hll_init_header(base, precision, m, total);
    return hll_setup(base, (size_t)total, NULL, fd);

fail:
    /* The error was already reported, before close() could disturb errno. */
    close(fd);
    return NULL;
}
/* Attach to an existing sketch through an already-open file descriptor.
 *   fd     - descriptor of the backing object; it is neither closed nor
 *            retained, the handle gets its own F_DUPFD_CLOEXEC duplicate
 *   errbuf - optional error buffer; cleared on entry when non-NULL
 * Returns the result of hll_setup() on success, or NULL after reporting an
 * error. On every failure path taken after mmap() the mapping is released. */
static HllHandle *hll_open_fd(int fd, char *errbuf) {
    if (errbuf) errbuf[0] = '\0';

    struct stat st;
    if (fstat(fd, &st) < 0) { HLL_ERR("fstat: %s", strerror(errno)); return NULL; }
    if ((uint64_t)st.st_size < sizeof(HllHeader)) { HLL_ERR("too small"); return NULL; }
    /* st_size is narrowed to size_t for mmap() while the header is validated
     * against the full 64-bit size. Where size_t is narrower than off_t, a
     * larger object would otherwise be mapped short of the size the header
     * was validated against, so refuse it up front. */
    if ((uint64_t)st.st_size > (uint64_t)SIZE_MAX) { HLL_ERR("too large"); return NULL; }

    size_t ms = (size_t)st.st_size;
    void *base = mmap(NULL, ms, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) { HLL_ERR("mmap: %s", strerror(errno)); return NULL; }

    if (!hll_validate_header((HllHeader *)base, (uint64_t)st.st_size)) {
        HLL_ERR("invalid HyperLogLog table"); munmap(base, ms); return NULL;
    }

    /* Duplicate the descriptor so the handle's lifetime is independent of
     * the caller's fd. */
    int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
    if (myfd < 0) { HLL_ERR("fcntl: %s", strerror(errno)); munmap(base, ms); return NULL; }

    return hll_setup(base, ms, NULL, myfd);
}
/* Release everything the handle owns: the backing fd (when one is held),
 * the mapping, the path string, and the handle itself. NULL is a no-op. */
static void hll_destroy(HllHandle *h) {
    if (h == NULL)
        return;

    if (h->backing_fd >= 0) {
        close(h->backing_fd);
    }
    if (h->base != NULL) {
        munmap(h->base, h->mmap_size);
    }
    free(h->path);
    free(h);
}
/* Synchronously flush the whole mapping with msync(MS_SYNC).
 * Returns msync()'s result, or 0 when there is no handle or no mapping. */
static inline int hll_msync(HllHandle *h) {
    int rc = 0;

    if (h != NULL && h->base != NULL)
        rc = msync(h->base, h->mmap_size, MS_SYNC);
    return rc;
}
/* ================================================================
* HyperLogLog operations (callers hold the lock)
* ================================================================ */
/* add one item; returns 1 if a register increased, else 0 */
/* Hash `item` (len bytes) and fold it into the sketch.
 * The top `precision` bits of the 64-bit hash select the register; the rank
 * is 1 + the number of leading zeros of the remaining bits.
 * Returns 1 when the selected register was raised, 0 otherwise. */
static int hll_add_locked(HllHandle *h, const void *item, size_t len) {
    const uint64_t hash = XXH3_64bits(item, len);
    const uint32_t bits = h->hdr->precision;

    const uint32_t reg = (uint32_t)(hash >> (64 - bits));
    /* The guard bit sits just below the shifted-up remainder, so the value
     * given to clz is never zero. */
    const uint64_t guard = 1ULL << (bits - 1);
    const uint64_t tail = (hash << bits) | guard;
    const uint8_t rank = (uint8_t)(__builtin_clzll(tail) + 1);

    uint8_t *slot = &hll_regs(h)[reg];
    if (*slot >= rank)
        return 0;
    *slot = rank;
    return 1;
}
/* estimate; returns a double */
/* Compute the cardinality estimate from the current registers.
 * Uses the raw HyperLogLog estimate alpha * m^2 / sum(2^-reg) and switches
 * to linear counting when that estimate is at most 2.5 * m and at least one
 * register is still zero. */
static double hll_count_locked(HllHandle *h) {
    const uint32_t m = h->hdr->m;
    uint8_t *regs = hll_regs(h);

    double harmonic = 0.0;      /* running sum of 2^-register */
    uint32_t zero_regs = 0;     /* registers never touched */
    for (uint32_t j = 0; j != m; ++j) {
        harmonic += ldexp(1.0, -(int)regs[j]);
        if (regs[j] == 0)
            zero_regs++;
    }

    /* Bias-correction constant: tabulated for the three smallest sizes
     * handled here, closed form otherwise. */
    double alpha;
    switch (m) {
    case 16: alpha = 0.673; break;
    case 32: alpha = 0.697; break;
    case 64: alpha = 0.709; break;
    default: alpha = 0.7213 / (1.0 + 1.079 / (double)m); break;
    }

    double estimate = alpha * (double)m * (double)m / harmonic;
    if (zero_regs > 0 && estimate <= 2.5 * (double)m) {
        /* linear counting (small range) */
        estimate = (double)m * log((double)m / (double)zero_regs);
    }
    return estimate;
}
/* merge src registers into dst (caller guarantees equal m); register-wise max */
/* Fold `src_regs` into dst's registers by taking the larger value in each
 * position. Reads dst->hdr->m entries from src_regs, so the caller must
 * supply at least that many. */
static void hll_merge_regs(HllHandle *dst, const uint8_t *src_regs) {
    const uint32_t count = dst->hdr->m;
    uint8_t *target = hll_regs(dst);

    for (uint32_t i = 0; i != count; ++i) {
        const uint8_t incoming = src_regs[i];
        if (incoming > target[i]) {
            target[i] = incoming;
        }
    }
}
/* reset all registers to 0 (caller holds the write lock) */
/* Zero all m one-byte registers. */
static inline void hll_clear_locked(HllHandle *h) {
    uint8_t *regs = hll_regs(h);
    const size_t nbytes = (size_t)h->hdr->m;

    memset(regs, 0, nbytes);
}
#endif /* HLL_H */
( run in 0.437 second using v1.01-cache-2.11-cpan-bbe5e583499 )