Data-CountMinSketch-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

    SV *item
    UV n
  PREINIT:
    EXTRACT(self);
    STRLEN len;
    const char *s;
    UV total;
  CODE:
    s = SvPVbyte(item, len);               /* may croak (wide char) -- BEFORE the lock */
    cms_rwlock_wrlock(h);
    cms_add_locked(h, s, len, (uint64_t)n);
    total = (UV)h->hdr->total;
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    cms_rwlock_wrunlock(h);
    RETVAL = total;
  OUTPUT:
    RETVAL

UV
add_many(self, items)
    SV *self

Shared.xs  view on Meta::CPAN

        const char **ps = NULL; STRLEN *ls = NULL;
        if (cnt) {                                       /* resolve all bytes BEFORE locking */
            Newx(ps, cnt, const char *); SAVEFREEPV(ps); /* freed on return OR unwind */
            Newx(ls, cnt, STRLEN);       SAVEFREEPV(ls);
            for (i = 0; i < cnt; i++) {                  /* a croak here holds NO lock; SAVEFREEPV cleans up */
                SV **el = av_fetch(av, (SSize_t)i, 0);
                if (el && *el) ps[i] = SvPVbyte(*el, ls[i]);
                else { ps[i] = ""; ls[i] = 0; }
            }
        }
        cms_rwlock_wrlock(h);                            /* locked region: NO croak-capable calls */
        for (i = 0; i < cnt; i++) cms_add_locked(h, ps[i], ls[i], 1);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);  /* a call always counts, even an empty batch */
        cms_rwlock_wrunlock(h);
        RETVAL = (UV)cnt;                                /* every element is added (CMS add never fails) */
    }
  OUTPUT:
    RETVAL

UV
estimate(self, item)
    SV *self
    SV *item
  PREINIT:
    EXTRACT(self);
    STRLEN len;
    const char *s;
    UV est;
  CODE:
    s = SvPVbyte(item, len);               /* may croak (wide char) -- BEFORE the lock */
    cms_rwlock_rdlock(h);
    est = (UV)cms_estimate_locked(h, s, len);
    cms_rwlock_rdunlock(h);
    RETVAL = est;
  OUTPUT:
    RETVAL

void
merge(self, other)
    SV *self
    SV *other
  PREINIT:

Shared.xs  view on Meta::CPAN

    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    cms_rwlock_wrunlock(h);

void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    cms_rwlock_wrlock(h);
    cms_clear_locked(h);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    cms_rwlock_wrunlock(h);

UV
total(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    UV t;
  CODE:

cms.h  view on Meta::CPAN

static inline void cms_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
#define CMS_RWLOCK_WRITER_BIT 0x80000000U
#define CMS_RWLOCK_PID_MASK   0x7FFFFFFFU
#define CMS_RWLOCK_WR(pid)    (CMS_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & CMS_RWLOCK_PID_MASK))

/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
 * lock-holder's PID is recycled to an unrelated live process before recovery
 * runs, this reports "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits. Robust detection would require
 * a per-slot process-start-time epoch (a header-layout/version change).

cms.h  view on Meta::CPAN

            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < CMS_RWLOCK_SPIN_LIMIT, 1)) {
            cms_rwlock_spin_pause();
            continue;
        }
        cms_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= CMS_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &cms_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                cms_unpark_reader(h);
                cms_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }

cms.h  view on Meta::CPAN

 * ================================================================ */

static inline void cms_indices(const void *item, size_t len,
                               uint64_t *h1, uint64_t *h2) {
    XXH128_hash_t hh = XXH3_128bits(item, len);
    *h1 = hh.low64;
    *h2 = hh.high64 | 1ULL;   /* force odd so the d row columns spread over the pow2 matrix */
}

/* add n to each of the d cells; bump total by n (caller holds the write lock) */
static void cms_add_locked(CmsHandle *h, const void *item, size_t len, uint64_t n) {
    uint64_t h1, h2;
    cms_indices(item, len, &h1, &h2);
    uint64_t w = h->hdr->w;
    uint64_t mask = h->hdr->mask;
    uint32_t d = h->hdr->d;
    uint64_t *counters = cms_counters(h);
    for (uint32_t r = 0; r < d; r++) {
        uint64_t c = (h1 + (uint64_t)r * h2) & mask;
        counters[(uint64_t)r * w + c] += n;
    }
    h->hdr->total += n;
}

/* return the minimum of the d cells -- the Count-Min estimate, which never
 * underestimates the true count (caller holds a lock) */
static uint64_t cms_estimate_locked(CmsHandle *h, const void *item, size_t len) {
    uint64_t h1, h2;
    cms_indices(item, len, &h1, &h2);
    uint64_t w = h->hdr->w;
    uint64_t mask = h->hdr->mask;
    uint32_t d = h->hdr->d;
    uint64_t *counters = cms_counters(h);
    uint64_t m = UINT64_MAX;
    for (uint32_t r = 0; r < d; r++) {
        uint64_t c = (h1 + (uint64_t)r * h2) & mask;
        uint64_t v = counters[(uint64_t)r * w + c];

cms.h  view on Meta::CPAN

/* merge src cells into dst (caller guarantees equal w and d); cellwise add,
 * saturating at UINT64_MAX on overflow (caller holds dst's write lock) */
static void cms_merge_counters(uint64_t *dst, const uint64_t *src, uint64_t cells) {
    for (uint64_t i = 0; i < cells; i++) {
        if (dst[i] > UINT64_MAX - src[i]) dst[i] = UINT64_MAX;
        else dst[i] += src[i];
    }
}

/* reset all counters to 0 and total to 0 (caller holds the write lock) */
static inline void cms_clear_locked(CmsHandle *h) {
    uint64_t cells = (uint64_t)h->hdr->d * h->hdr->w;
    memset(cms_counters(h), 0, (size_t)(cells * sizeof(uint64_t)));
    h->hdr->total = 0;
}

#endif /* CMS_H */



( run in 1.355 second using v1.01-cache-2.11-cpan-bbe5e583499 )