Data-HyperLogLog-Shared

 view release on metacpan or  search on metacpan

hll.h  view on Meta::CPAN

 * drain. */
typedef struct {
    uint32_t pid;            /* owning process id; 0 = slot is unclaimed */
    uint32_t subcount;       /* in-flight rdlock acquisitions for this process */
    uint32_t waiters_parked; /* this process's contribution to hdr->rwlock_waiters         */
    uint32_t writers_parked; /* this process's contribution to hdr->rwlock_writers_waiting */
} HllReaderSlot;

/* Header found at the very start of the mapped region.  The number that
 * opens each field comment is that field's byte offset; the whole struct
 * is padded to exactly 256 bytes (enforced by the assertion below). */
struct HllHeader {
    uint32_t magic;                   /* 0 */
    uint32_t version;                 /* 4 */
    uint32_t precision;               /* 8   register-index bit count */
    uint32_t m;                       /* 12  register count (= 1 << precision) */
    uint32_t _pad0;                   /* 16 */
    uint32_t _pad1;                   /* 20 */
    uint64_t total_size;              /* 24 */
    uint64_t reader_slots_off;        /* 32 */
    uint64_t regs_off;                /* 40 */
    uint32_t rwlock;                  /* 48  lock word (futex address) */
    uint32_t rwlock_waiters;          /* 52  parked readers + writers */
    uint32_t rwlock_writers_waiting;  /* 56  parked writers only */
    uint32_t _pad2;                   /* 60 */
    uint64_t stat_ops;                /* 64 */
    uint8_t  _pad[184];               /* 72..255  filler up to 256 bytes */
};
typedef struct HllHeader HllHeader;

/* The layout is shared between processes, so its size must never drift. */
_Static_assert(sizeof(HllHeader) == 256, "HllHeader must be 256 bytes");

/* ---- Process-local handle ---- */

/* Per-process view of one mapped table.  Nothing in this struct lives in
 * the shared region; it only holds pointers into the mapping plus local
 * bookkeeping. */
typedef struct HllHandle {
    HllHeader     *hdr;           /* header inside the mapping */
    HllReaderSlot *reader_slots;  /* HLL_READER_SLOTS entries */
    void          *base;          /* mmap base */
    size_t         mmap_size;     /* length passed to munmap()/msync() */
    char          *path;          /* backing file path (strdup'd) */
    int            backing_fd;    /* memfd or reopened-fd to close on destroy, -1 for file/anon */
    uint32_t       my_slot_idx;   /* UINT32_MAX if all slots taken (no recovery for this handle) */
    uint32_t       cached_pid;    /* getpid() cached at last slot claim */
    uint32_t       cached_fork_gen; /* hll_fork_gen value at last slot claim */
} HllHandle;

/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */

/* Busy-wait iterations a lock attempt makes before it parks on the futex. */
#define HLL_RWLOCK_SPIN_LIMIT 32
/* Seconds a parked waiter sleeps before it wakes up on its own and runs the
 * after-timeout recovery path. */
#define HLL_LOCK_TIMEOUT_SEC  2  /* FUTEX_WAIT timeout for stale lock detection */

/* One iteration of polite busy-waiting.  Emits the architecture's spin-wait
 * hint where one is known (x86 "pause", AArch64 "yield"); on every other
 * target it is only a compiler barrier.  The "memory" clobber is present in
 * all three variants. */
static inline void hll_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* Encoding of the rwlock word while it is write-locked: the top bit flags
 * "held by a writer" and the lower 31 bits carry that writer's PID. */
#define HLL_RWLOCK_WRITER_BIT 0x80000000U
#define HLL_RWLOCK_PID_MASK   0x7FFFFFFFU
/* Build the write-locked rwlock value for `pid` (PID truncated to 31 bits). */
#define HLL_RWLOCK_WR(pid)    (HLL_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & HLL_RWLOCK_PID_MASK))

/* Liveness probe for `pid` using kill(pid, 0).
 *
 * Returns 0 only when the probe fails with ESRCH, i.e. the process is
 * definitely gone.  Returns 1 in every other case: the probe succeeded, it
 * failed for another reason, or pid is 0 (no owner recorded).
 *
 * NOTE: this cannot detect PID reuse -- if a dead lock-holder's PID is
 * recycled to an unrelated live process before recovery runs, this reports
 * "alive" and that slot's orphaned contribution is not reclaimed until the
 * recycled process exits.  Robust detection would require a per-slot
 * process-start-time epoch (a header-layout/version change).
 * Documented under "Crash Safety" in the POD. */
static inline int hll_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;               /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1;               /* probe delivered: the process exists */
    return errno != ESRCH;      /* only "no such process" counts as dead */
}

/* Take over and release a write lock whose owner has died.
 *
 * h               - handle whose header holds the lock word
 * observed_rwlock - the stale lock value the caller saw; the takeover only
 *                   happens if the word still holds exactly this value
 *
 * The lock is first swapped to a write-locked value carrying OUR pid rather
 * than a bare WRITER_BIT sentinel, so that if we crash mid-recovery another
 * process can detect and re-recover it.  If the swap loses a race the
 * function does nothing. */
static inline void hll_recover_stale_lock(HllHandle *h, uint32_t observed_rwlock) {
    HllHeader *hdr = h->hdr;
    uint32_t *lock_word = &hdr->rwlock;
    const uint32_t takeover = HLL_RWLOCK_WR((uint32_t)getpid());

    /* Strong CAS (weak = 0): a failure means someone else changed the word. */
    if (__atomic_compare_exchange_n(lock_word, &observed_rwlock, takeover,
            0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
        /* We now hold the write lock.  No additional shared state needs
         * repair here (this module has no seqlock); just release the lock. */
        __atomic_store_n(lock_word, 0, __ATOMIC_RELEASE);
        /* Wake everyone parked on the word, but skip the syscall if nobody is. */
        if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) != 0)
            syscall(SYS_futex, lock_word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

/* Relative timeout handed to every FUTEX_WAIT: HLL_LOCK_TIMEOUT_SEC whole
 * seconds, no nanosecond part. */
static const struct timespec hll_lock_timeout = {
    .tv_sec  = HLL_LOCK_TIMEOUT_SEC,
    .tv_nsec = 0,
};

/* Fork detection state, private to each process.
 *
 * hll_fork_gen starts at 1 and is advanced by the pthread_atfork child
 * handler, so a handle that cached an older value can notice on its next
 * lock call that it now runs in a forked child -- without paying for a
 * getpid() syscall on the hot path.  hll_atfork_once guards one-time
 * registration of that handler. */
static uint32_t hll_fork_gen = 1;
static pthread_once_t hll_atfork_once = PTHREAD_ONCE_INIT;

/* atfork child handler: move to the next fork generation. */
static void hll_on_fork_child(void) {
    (void)__atomic_fetch_add(&hll_fork_gen, 1, __ATOMIC_RELAXED);
}

/* pthread_once initializer: install the child-side handler only; no
 * prepare or parent handlers are registered. */
static void hll_atfork_init(void) {
    pthread_atfork(/* prepare */ NULL, /* parent */ NULL, /* child */ hll_on_fork_child);
}

/* Ensure this process owns a reader slot.  Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's.  Hot-path is a single relaxed load + compare; only on a
 * fork-generation mismatch do we touch getpid() and scan slots. */
static inline void hll_claim_reader_slot(HllHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&hll_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path -- register the atfork hook once per process, then claim. */
    pthread_once(&hll_atfork_once, hll_atfork_init);
    /* Re-read after pthread_once: hll_on_fork_child may have bumped it. */
    cur_gen = __atomic_load_n(&hll_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();

hll.h  view on Meta::CPAN

    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Bookkeeping for a reader that has stopped waiting on the futex: drop one
 * from the global waiter count, then from this process's per-slot mirror.
 * A handle without a slot (my_slot_idx == UINT32_MAX) only touches the
 * global count. */
static inline void hll_unpark_reader(HllHandle *h) {
    __atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);

    const uint32_t idx = h->my_slot_idx;
    if (idx == UINT32_MAX)
        return;
    __atomic_fetch_sub(&h->reader_slots[idx].waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Bookkeeping for a writer that is about to wait on the futex.  A parked
 * writer counts both as a waiter and as a waiting writer.  The per-slot
 * mirrors (when this handle owns a slot) are raised before the global
 * counters in the header. */
static inline void hll_park_writer(HllHandle *h) {
    const uint32_t idx = h->my_slot_idx;
    if (idx != UINT32_MAX) {
        HllReaderSlot *mine = &h->reader_slots[idx];
        __atomic_fetch_add(&mine->waiters_parked, 1, __ATOMIC_RELAXED);
        __atomic_fetch_add(&mine->writers_parked, 1, __ATOMIC_RELAXED);
    }

    HllHeader *hdr = h->hdr;
    __atomic_fetch_add(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_fetch_add(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Bookkeeping for a writer that has stopped waiting on the futex: the exact
 * reverse of hll_park_writer.  The global counters in the header are
 * lowered first, then the per-slot mirrors (when this handle owns a slot). */
static inline void hll_unpark_writer(HllHandle *h) {
    HllHeader *hdr = h->hdr;
    __atomic_fetch_sub(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_fetch_sub(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);

    const uint32_t idx = h->my_slot_idx;
    if (idx == UINT32_MAX)
        return;

    HllReaderSlot *mine = &h->reader_slots[idx];
    __atomic_fetch_sub(&mine->waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_fetch_sub(&mine->writers_parked, 1, __ATOMIC_RELAXED);
}

/* Acquire the lock in shared (read) mode.  Does not return until it holds
 * one reader unit of hdr->rwlock.
 *
 * rwlock word as interpreted here: 0 = free, 1 .. WRITER_BIT-1 = number of
 * active readers, >= HLL_RWLOCK_WRITER_BIT = write-locked.
 *
 * Strategy: spin up to HLL_RWLOCK_SPIN_LIMIT times, then register as a
 * parked waiter and sleep on the futex with a timeout.  A timed-out sleep
 * runs hll_recover_after_timeout() before the next round of spinning. */
static inline void hll_rwlock_rdlock(HllHandle *h) {
    /* Make sure this process has its own reader slot (matters after fork). */
    hll_claim_reader_slot(h);
    HllHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter.  This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS.  Without this, a reader
     * killed between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
     * our eventual rdunlock dec. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < HLL_RWLOCK_WRITER_BIT) {
            /* Join the existing readers.  Weak CAS: a spurious failure just
             * costs another trip around the loop. */
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            /* Lock is free and no writer is queued: become the first reader. */
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        /* Not acquired this round: keep spinning until the budget is used up. */
        if (__builtin_expect(spin < HLL_RWLOCK_SPIN_LIMIT, 1)) {
            hll_rwlock_spin_pause();
            continue;
        }
        /* Spin budget exhausted: register as a waiter so unlockers know a
         * FUTEX_WAKE is needed, then re-read the word before sleeping. */
        hll_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        /* NOTE(review): the cur == 0 case sleeps without re-checking
         * writers_waiting.  If the word became free with no writer queued,
         * nothing visible in this function guarantees a wake before the
         * timeout -- confirm the unlock paths cover this. */
        if (cur >= HLL_RWLOCK_WRITER_BIT || cur == 0) {
            /* FUTEX_WAIT only blocks if the word still equals cur. */
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &hll_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Nobody woke us within the timeout: drop our waiter
                 * registration first, then run the recovery path. */
                hll_unpark_reader(h);
                hll_recover_after_timeout(h);
                /* `continue` still runs the loop's spin++, so the next
                 * round starts counting from 1. */
                spin = 0;
                continue;
            }
        }
        /* Woken, interrupted, or the word changed before we slept: retry. */
        hll_unpark_reader(h);
        spin = 0;
    }
}

/* Release one shared (read) hold on the lock.
 *
 * Ordering matters: the shared rwlock counter is released BEFORE our slot's
 * subcount is dropped, so that "a live PID with subcount > 0" stays a
 * reliable in-flight indicator for the writer-side recovery scan.  Doing it
 * the other way round would open a window where we still own a unit of
 * rwlock while our subcount is already 0, letting recovery force-reset
 * rwlock underneath us.
 *
 * If this was the last reader and somebody is parked, all waiters are woken. */
static inline void hll_rwlock_rdunlock(HllHandle *h) {
    HllHeader *hdr = h->hdr;

    const uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);

    const uint32_t idx = h->my_slot_idx;
    if (idx != UINT32_MAX)
        __atomic_sub_fetch(&h->reader_slots[idx].subcount, 1, __ATOMIC_RELAXED);

    if (remaining != 0)
        return;                 /* other readers still hold the lock */
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;                 /* nobody parked: skip the syscall */
    syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

static inline void hll_rwlock_wrlock(HllHandle *h) {
    hll_claim_reader_slot(h);  /* refresh cached_pid across fork */
    HllHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = HLL_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < HLL_RWLOCK_SPIN_LIMIT, 1)) {
            hll_rwlock_spin_pause();
            continue;
        }
        hll_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &hll_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                hll_unpark_writer(h);
                hll_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        hll_unpark_writer(h);
        spin = 0;
    }

hll.h  view on Meta::CPAN

            flock(fd, LOCK_UN); close(fd);
            return hll_setup(base, map_size, path, -1);
        }
    }
    hll_init_header(base, precision, m, total);
    if (fd >= 0) { flock(fd, LOCK_UN); close(fd); }
    return hll_setup(base, map_size, path, -1);
}

/* Create a brand-new table backed by an anonymous memfd.
 *
 * name      - label for the memfd; NULL selects "hll"
 * precision - register-index bit count, checked by hll_validate_create_args
 * errbuf    - receives a message on failure (written through HLL_ERR)
 *
 * Returns the result of hll_setup() on success, NULL on any failure.  On
 * failure the memfd is closed here; on success the descriptor is handed to
 * hll_setup().  Sealing against shrink/grow is best effort: the fcntl
 * result is deliberately ignored. */
static HllHandle *hll_create_memfd(const char *name, uint32_t precision, char *errbuf) {
    uint32_t m;
    if (!hll_validate_create_args(precision, &m, errbuf))
        return NULL;

    const uint64_t total = hll_total_size(m);
    const char *label = name ? name : "hll";

    int fd = memfd_create(label, MFD_CLOEXEC | MFD_ALLOW_SEALING);
    if (fd < 0) {
        HLL_ERR("memfd_create: %s", strerror(errno));
        return NULL;
    }

    /* Size the file before mapping it. */
    if (ftruncate(fd, (off_t)total) < 0) {
        HLL_ERR("ftruncate: %s", strerror(errno));
        goto fail_close;
    }
    (void)fcntl(fd, F_ADD_SEALS, F_SEAL_SHRINK | F_SEAL_GROW);

    void *base = mmap(NULL, (size_t)total, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        HLL_ERR("mmap: %s", strerror(errno));
        goto fail_close;
    }

    hll_init_header(base, precision, m, total);
    return hll_setup(base, (size_t)total, NULL, fd);

fail_close:
    /* The error text was captured above, before close() could touch errno. */
    close(fd);
    return NULL;
}

/* Attach to an existing table through an already-open descriptor.
 *
 * fd     - descriptor of the backing object; it is not closed here, the
 *          handle gets its own close-on-exec duplicate
 * errbuf - cleared on entry when non-NULL; receives a message on failure
 *
 * The whole object (its fstat size) is mapped shared read/write and its
 * header is validated against that size.  Returns the result of
 * hll_setup() on success, NULL on failure with the mapping released. */
static HllHandle *hll_open_fd(int fd, char *errbuf) {
    if (errbuf)
        errbuf[0] = '\0';

    struct stat st;
    if (fstat(fd, &st) < 0) {
        HLL_ERR("fstat: %s", strerror(errno));
        return NULL;
    }

    const uint64_t file_size = (uint64_t)st.st_size;
    if (file_size < sizeof(HllHeader)) {
        HLL_ERR("too small");
        return NULL;
    }

    const size_t map_len = (size_t)st.st_size;
    void *base = mmap(NULL, map_len, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        HLL_ERR("mmap: %s", strerror(errno));
        return NULL;
    }

    if (!hll_validate_header((HllHeader *)base, file_size)) {
        HLL_ERR("invalid HyperLogLog table");
        goto fail_unmap;
    }

    /* Private duplicate so the handle's lifetime is independent of `fd`. */
    int own_fd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
    if (own_fd < 0) {
        HLL_ERR("fcntl: %s", strerror(errno));
        goto fail_unmap;
    }

    return hll_setup(base, map_len, NULL, own_fd);

fail_unmap:
    munmap(base, map_len);
    return NULL;
}

/* Tear down a handle: close its private descriptor (if it has one), unmap
 * the region, and free the path string and the handle itself.  Passing
 * NULL is a no-op.  The shared contents are not modified here. */
static void hll_destroy(HllHandle *h) {
    if (h == NULL)
        return;

    if (h->backing_fd >= 0)
        close(h->backing_fd);
    if (h->base != NULL)
        munmap(h->base, h->mmap_size);

    free(h->path);              /* free(NULL) is fine when no path was stored */
    free(h);
}

/* Synchronously flush the whole mapping with msync(MS_SYNC).
 * Returns msync()'s result, or 0 without doing anything when the handle is
 * NULL or has no mapping. */
static inline int hll_msync(HllHandle *h) {
    if (h == NULL || h->base == NULL)
        return 0;
    return msync(h->base, h->mmap_size, MS_SYNC);
}

/* ================================================================
 * HyperLogLog operations (callers hold the lock)
 * ================================================================ */

/* add one item; returns 1 if a register increased, else 0 */
/* Fold one item into the sketch; the caller must already hold the lock.
 *
 * The item is hashed with XXH3_64bits.  The top `precision` bits of the
 * hash select a register; the remaining bits determine the rank, i.e. the
 * 1-based position of their first set bit.
 *
 * Returns 1 if the selected register was raised, 0 if it was already at
 * least as large. */
static int hll_add_locked(HllHandle *h, const void *item, size_t len) {
    const uint64_t hash = XXH3_64bits(item, len);
    const uint32_t prec = h->hdr->precision;

    /* top prec bits = register index */
    const uint32_t bucket = (uint32_t)(hash >> (64 - prec));

    /* Shift the index bits out and plant a guard bit just below the payload
     * so the word is never zero and the clz result is bounded. */
    const uint64_t payload = (hash << prec) | (1ULL << (prec - 1));
    const uint8_t  rank    = (uint8_t)(__builtin_clzll(payload) + 1);

    uint8_t *reg = &hll_regs(h)[bucket];
    if (*reg >= rank)
        return 0;
    *reg = rank;
    return 1;
}

/* estimate; returns a double */
/* Cardinality estimate over all m registers; the caller holds the lock.
 *
 * Computes the raw estimate alpha * m^2 / sum(2^-reg[j]).  When that raw
 * value is at most 2.5 * m and at least one register is still zero, the
 * linear-counting estimate m * ln(m / zero_registers) is returned instead. */
static double hll_count_locked(HllHandle *h) {
    const uint32_t m = h->hdr->m;
    const uint8_t *regs = hll_regs(h);

    double   inv_sum = 0.0;     /* running sum of 2^-register */
    uint32_t zeros   = 0;       /* registers never touched */
    for (uint32_t j = 0; j < m; j++) {
        const uint8_t r = regs[j];
        inv_sum += ldexp(1.0, -(int)r);
        if (r == 0)
            zeros++;
    }

    /* Bias constant: fixed values for the three smallest sizes, closed
     * form for everything else. */
    double alpha;
    switch (m) {
    case 16: alpha = 0.673; break;
    case 32: alpha = 0.697; break;
    case 64: alpha = 0.709; break;
    default: alpha = 0.7213 / (1.0 + 1.079 / (double)m); break;
    }

    const double raw = alpha * (double)m * (double)m / inv_sum;
    if (zeros > 0 && raw <= 2.5 * (double)m)
        return (double)m * log((double)m / (double)zeros);  /* linear counting (small range) */
    return raw;
}

/* merge src registers into dst (caller guarantees equal m); register-wise max */
/* Merge a foreign register array into dst by taking the register-wise
 * maximum.  src_regs must hold at least dst's m registers -- the caller
 * guarantees both sketches have equal m.  Registers are only written when
 * the source value is strictly larger. */
static void hll_merge_regs(HllHandle *dst, const uint8_t *src_regs) {
    uint8_t *out = hll_regs(dst);
    const uint32_t count = dst->hdr->m;

    for (uint32_t i = 0; i < count; i++) {
        const uint8_t incoming = src_regs[i];
        if (incoming > out[i])
            out[i] = incoming;
    }
}

/* reset all registers to 0 (caller holds the write lock) */
/* Zero every register, returning the sketch to its empty state.
 * The caller must hold the write lock. */
static inline void hll_clear_locked(HllHandle *h) {
    uint8_t *regs = hll_regs(h);
    const size_t nbytes = (size_t)h->hdr->m;    /* one byte per register */
    memset(regs, 0, nbytes);
}

#endif /* HLL_H */



( run in 0.437 second using v1.01-cache-2.11-cpan-bbe5e583499 )