Data-HyperLogLog-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

add(self, item)
    SV *self
    SV *item
  PREINIT:
    EXTRACT(self);
    STRLEN n;
    const char *s;
  CODE:
    s = SvPVbyte(item, n);
    hll_rwlock_wrlock(h);
    RETVAL = hll_add_locked(h, s, n);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    hll_rwlock_wrunlock(h);
  OUTPUT:
    RETVAL

UV
add_many(self, items)
    SV *self
    SV *items
  PREINIT:

Shared.xs  view on Meta::CPAN

        const char **ps = NULL; STRLEN *ls = NULL;
        if (cnt) {                                       /* resolve all bytes BEFORE locking */
            Newx(ps, cnt, const char *); SAVEFREEPV(ps); /* freed on return OR unwind */
            Newx(ls, cnt, STRLEN);       SAVEFREEPV(ls);
            for (i = 0; i < cnt; i++) {                  /* a croak here holds NO lock; SAVEFREEPV cleans up */
                SV **el = av_fetch(av, (SSize_t)i, 0);
                if (el && *el) ps[i] = SvPVbyte(*el, ls[i]);
                else { ps[i] = ""; ls[i] = 0; }
            }
        }
        hll_rwlock_wrlock(h);                            /* locked region: NO croak-capable calls */
        for (i = 0; i < cnt; i++) added += (UV)hll_add_locked(h, ps[i], ls[i]);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);  /* a call always counts, even an empty batch */
        hll_rwlock_wrunlock(h);
    }
    RETVAL = added;
  OUTPUT:
    RETVAL

UV
count(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    double E;
  CODE:
    /* Return the current cardinality estimate as an unsigned integer.
       `h` is not declared in this XSUB; presumably EXTRACT(self) provides it
       (macro defined outside this excerpt -- confirm there).
       Only the estimation runs under the read lock; the rounding below is
       done after the lock has been released. */
    hll_rwlock_rdlock(h);
    E = hll_count_locked(h);
    hll_rwlock_rdunlock(h);
    /* Adding 0.5 before the truncating cast rounds a non-negative estimate
       to the nearest integer (halves round up). */
    RETVAL = (UV)(E + 0.5);
  OUTPUT:
    RETVAL

void
merge(self, other)
    SV *self
    SV *other
  PREINIT:

Shared.xs  view on Meta::CPAN

    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    hll_rwlock_wrunlock(h);

void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Reset the sketch: hll_clear_locked() runs under the write lock, and the
       call is counted in the header's stat_ops counter before the lock is
       released. `h` is presumably provided by EXTRACT(self) (macro defined
       outside this excerpt -- confirm there). */
    hll_rwlock_wrlock(h);
    hll_clear_locked(h);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    hll_rwlock_wrunlock(h);

UV
precision(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    RETVAL = h->hdr->precision;

Shared.xs  view on Meta::CPAN

  PREINIT:
    EXTRACT(self);
  CODE:
    {
        double   E;
        uint64_t ops;
        uint32_t precision, m;
        /* Snapshot under the lock; do all (croak-capable) Perl allocation after
           releasing it -- so an OOM in newHV/newSVuv can never strand the lock. */
        hll_rwlock_rdlock(h);
        E         = hll_count_locked(h);
        ops       = h->hdr->stat_ops;
        precision = h->hdr->precision;
        m         = h->hdr->m;
        hll_rwlock_rdunlock(h);

        HV *hv = newHV();
        hv_stores(hv, "precision",  newSVuv(precision));
        hv_stores(hv, "registers",  newSVuv(m));
        hv_stores(hv, "count",      newSVuv((UV)(E + 0.5)));
        hv_stores(hv, "ops",        newSVuv(ops));

hll.h  view on Meta::CPAN

/* One iteration's worth of "be polite while spinning": emits the CPU's
 * spin-wait hint where one is known ("yield" on AArch64, "pause" on x86),
 * and nothing otherwise. Every variant carries a "memory" clobber, so the
 * statement also acts as a compiler-level barrier. */
static inline void hll_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    /* No hint instruction selected for this target: barrier only. */
    __asm__ volatile("" ::: "memory");
#endif
}

/* Write-locked encoding of the rwlock word: the top bit flags "write-locked"
 * and the lower 31 bits carry the writer's PID, so the holder can be
 * extracted from the lock value (lower 31 bits when write-locked). */
#define HLL_RWLOCK_WRITER_BIT 0x80000000U  /* bit 31: set in a write-locked value */
#define HLL_RWLOCK_PID_MASK   0x7FFFFFFFU  /* bits 0..30: the writer's PID */
/* Lock-word value for a write lock held by `pid` (PID truncated to 31 bits). */
#define HLL_RWLOCK_WR(pid)    (HLL_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & HLL_RWLOCK_PID_MASK))

/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
 * lock-holder's PID is recycled to an unrelated live process before recovery
 * runs, this reports "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits. Robust detection would require
 * a per-slot process-start-time epoch (a header-layout/version change).

hll.h  view on Meta::CPAN

            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < HLL_RWLOCK_SPIN_LIMIT, 1)) {
            hll_rwlock_spin_pause();
            continue;
        }
        hll_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= HLL_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &hll_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                hll_unpark_reader(h);
                hll_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }

hll.h  view on Meta::CPAN

/* Synchronously flush the handle's mapping (h->base, h->mmap_size bytes)
 * using msync(MS_SYNC).
 * Returns 0 without doing anything when h is NULL or has no base address;
 * otherwise returns whatever msync() returns. */
static inline int hll_msync(HllHandle *h) {
    if (h && h->base) {
        return msync(h->base, h->mmap_size, MS_SYNC);
    }
    return 0;
}

/* ================================================================
 * HyperLogLog operations (callers hold the lock)
 * ================================================================ */

/* Add one item (len bytes at item) to the sketch.
 *
 * The item is hashed with XXH3_64bits. The top `precision` bits of the hash
 * pick the register; the rank is the number of leading zeros in the remaining
 * bits plus one. The register keeps the largest rank seen.
 *
 * Returns 1 if the register was raised, 0 if it already held a rank at least
 * as large.
 *
 * NOTE(review): the shifts assume 1 <= precision <= 63; presumably this is
 * enforced where the header is created -- confirm. */
static int hll_add_locked(HllHandle *h, const void *item, size_t len) {
    uint64_t hash   = XXH3_64bits(item, len);
    uint32_t prec   = h->hdr->precision;
    uint32_t bucket = (uint32_t)(hash >> (64 - prec));   /* top prec bits = register index */
    /* Shift the index bits out and plant a guard bit just below the payload,
       so the value is never zero and the leading-zero count is bounded. */
    uint64_t tail   = (hash << prec) | (1ULL << (prec - 1));
    uint8_t  rank   = (uint8_t)(__builtin_clzll(tail) + 1);
    uint8_t *slot   = hll_regs(h) + bucket;

    if (*slot >= rank)
        return 0;
    *slot = rank;
    return 1;
}

/* estimate; returns a double */
static double hll_count_locked(HllHandle *h) {
    uint32_t m = h->hdr->m;
    uint8_t *regs = hll_regs(h);
    double sum = 0.0;
    uint32_t V = 0;
    for (uint32_t j = 0; j < m; j++) {
        sum += ldexp(1.0, -(int)regs[j]);
        V += (regs[j] == 0);
    }
    double alpha;
    if      (m == 16) alpha = 0.673;

hll.h  view on Meta::CPAN


/* Fold src_regs into dst register by register, keeping the larger value in
 * each position. Reads dst->hdr->m entries from src_regs, so the caller must
 * guarantee the source has the same register count m as dst. */
static void hll_merge_regs(HllHandle *dst, const uint8_t *src_regs) {
    const uint32_t count = dst->hdr->m;
    uint8_t *out = hll_regs(dst);
    uint32_t i;

    for (i = 0; i != count; i++) {
        const uint8_t incoming = src_regs[i];
        if (out[i] < incoming) {
            out[i] = incoming;
        }
    }
}

/* Zero every register of the sketch: h->hdr->m bytes starting at
 * hll_regs(h). The caller must hold the write lock. */
static inline void hll_clear_locked(HllHandle *h) {
    uint8_t *regs = hll_regs(h);
    size_t nbytes = (size_t)h->hdr->m;

    memset(regs, 0, nbytes);
}

#endif /* HLL_H */



( run in 0.736 second using v1.01-cache-2.11-cpan-bbe5e583499 )