Data-CountMinSketch-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

    char errbuf[CMS_ERR_BUFLEN];
  CODE:
    const char *p = SvOK(path) ? SvPV_nolen(path) : NULL;
    CmsHandle *h = cms_create(p, epsilon, delta, errbuf);   /* validates epsilon/delta into errbuf */
    if (!h) croak("Data::CountMinSketch::Shared->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_memfd(class, name = &PL_sv_undef, epsilon = 0.001, delta = 0.001)
    const char *class
    SV *name
    double epsilon
    double delta
  PREINIT:
    char errbuf[CMS_ERR_BUFLEN];
    const char *label = NULL;
    CmsHandle *h;
  CODE:
    /* Constructor built on cms_create_memfd().
     *
     *   name    - optional label; when it is undef a NULL pointer is handed
     *             to cms_create_memfd() so that it can pick its own default.
     *   epsilon - forwarded unchanged (defaults to 0.001).
     *   delta   - forwarded unchanged (defaults to 0.001).
     *
     * cms_create_memfd() is expected to validate epsilon/delta and to
     * describe any failure in errbuf; a NULL handle makes us croak with
     * that text.  On success MAKE_OBJ wraps the handle for `class`. */
    if (SvOK(name))
        label = SvPV_nolen(name);

    h = cms_create_memfd(label, epsilon, delta, errbuf);
    if (h == NULL)
        croak("Data::CountMinSketch::Shared->new_memfd: %s", errbuf);

    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_from_fd(class, fd)
    const char *class
    int fd
  PREINIT:
    char errbuf[CMS_ERR_BUFLEN];
    CmsHandle *h;
  CODE:
    /* Constructor that opens a sketch from the file descriptor `fd` via
     * cms_open_fd().  A NULL handle is reported by croaking with whatever
     * message cms_open_fd() left in errbuf; otherwise MAKE_OBJ wraps the
     * handle for `class`. */
    h = cms_open_fd(fd, errbuf);
    if (h == NULL)
        croak("Data::CountMinSketch::Shared->new_from_fd: %s", errbuf);

    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor.  Does nothing unless `self` is a blessed reference that
     * derives from Data::CountMinSketch::Shared and still carries a
     * non-zero handle pointer. */
    if (sv_isobject(self) && sv_derived_from(self, "Data::CountMinSketch::Shared")) {
        SV *inner = SvRV(self);
        CmsHandle *h = INT2PTR(CmsHandle*, SvIV(inner));

        if (h != NULL) {
            /* Zero the stored pointer BEFORE releasing the handle: later
             * method calls then hit EXTRACT's use-after-destroy croak, and
             * a second DESTROY finds 0 and becomes a no-op. */
            sv_setiv(inner, 0);
            cms_destroy(h);
        }
    }

UV
add(self, item, n = 1)
    SV *self
    SV *item
    UV n
  PREINIT:
    EXTRACT(self);
    const char *bytes;
    STRLEN nbytes;
  CODE:
    /* Record `n` occurrences of `item` (default 1) and return the sketch's
     * running total as read immediately after the update.
     *
     * The key bytes are fetched first: SvPVbyte() can croak on wide
     * characters, and that must happen while no lock is held. */
    bytes = SvPVbyte(item, nbytes);

    cms_rwlock_wrlock(h);
    cms_add_locked(h, bytes, nbytes, (uint64_t)n);
    RETVAL = (UV)h->hdr->total;            /* read while the write lock is still held */
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    cms_rwlock_wrunlock(h);
  OUTPUT:
    RETVAL

UV
add_many(self, items)
    SV *self
    SV *items
  PREINIT:
    EXTRACT(self);
    AV *av;
    IV  top;
  CODE:
    /* Add every element of the array referenced by `items` with a count of
     * 1 each, and return the number of elements processed.
     *
     * Croaks if `items` is not an array reference.  A slot for which
     * av_fetch() yields nothing is treated as the empty string.
     *
     * Two phases: (1) resolve every element to a byte pointer + length
     * while no lock is held, because SvPVbyte() may croak; (2) take the
     * write lock and feed the collected pointers to cms_add_locked().
     * `h` is not declared here -- presumably it comes from EXTRACT(self). */
    if (!SvROK(items) || SvTYPE(SvRV(items)) != SVt_PVAV)
        croak("Data::CountMinSketch::Shared->add_many: expected an array reference");
    av = (AV *)SvRV(items);
    top = av_len(av);                     /* last index, -1 if empty */
    {
        /* cnt = element count; ps/ls = parallel arrays of pointers and lengths */
        STRLEN cnt = (top >= 0) ? (STRLEN)(top + 1) : 0, i;
        const char **ps = NULL; STRLEN *ls = NULL;
        if (cnt) {                                       /* resolve all bytes BEFORE locking */
            Newx(ps, cnt, const char *); SAVEFREEPV(ps); /* freed on return OR unwind */
            Newx(ls, cnt, STRLEN);       SAVEFREEPV(ls);
            for (i = 0; i < cnt; i++) {                  /* a croak here holds NO lock; SAVEFREEPV cleans up */
                SV **el = av_fetch(av, (SSize_t)i, 0);
                if (el && *el) ps[i] = SvPVbyte(*el, ls[i]);
                else { ps[i] = ""; ls[i] = 0; }          /* missing slot -> empty key */
            }
        }
        cms_rwlock_wrlock(h);                            /* locked region: NO croak-capable calls */
        for (i = 0; i < cnt; i++) cms_add_locked(h, ps[i], ls[i], 1);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);  /* a call always counts, even an empty batch */
        cms_rwlock_wrunlock(h);
        RETVAL = (UV)cnt;                                /* every element is added (CMS add never fails) */
    }
  OUTPUT:
    RETVAL

UV
estimate(self, item)
    SV *self
    SV *item
  PREINIT:
    EXTRACT(self);
    const char *bytes;
    STRLEN nbytes;
  CODE:
    /* Return cms_estimate_locked()'s result for `item`, computed under the
     * read lock.
     *
     * The key bytes are fetched before locking: SvPVbyte() can croak on
     * wide characters, and that must happen while no lock is held. */
    bytes = SvPVbyte(item, nbytes);

    cms_rwlock_rdlock(h);
    RETVAL = (UV)cms_estimate_locked(h, bytes, nbytes);
    cms_rwlock_rdunlock(h);
  OUTPUT:
    RETVAL

void
merge(self, other)
    SV *self
    SV *other
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Fold the counters and total of `other` into `self`.
     *
     * Croaks when `other` is not a Data::CountMinSketch::Shared object,
     * when its stored handle pointer is 0 (already destroyed), or when the
     * two sketches differ in w or d.  Returns nothing.
     *
     * `h` is not declared here -- presumably it comes from EXTRACT(self). */
    if (!sv_isobject(other) || !sv_derived_from(other, "Data::CountMinSketch::Shared"))
        croak("Data::CountMinSketch::Shared->merge: expected a Data::CountMinSketch::Shared object");
    CmsHandle *o = INT2PTR(CmsHandle*, SvIV(SvRV(other)));
    if (!o) croak("Attempted to use a destroyed Data::CountMinSketch::Shared object");

    /* w and d are immutable after creation -- compare lock-free, croak BEFORE
     * allocating, so a mismatch holds no lock and leaks no buffer. */
    uint64_t ow = o->hdr->w;
    uint32_t od = o->hdr->d;
    /* The message prints self's geometry first, then other's. */
    if (ow != h->hdr->w || od != h->hdr->d)
        croak("Data::CountMinSketch::Shared->merge: geometry mismatch (w=%llu/d=%u vs w=%llu/d=%u)",
              (unsigned long long)h->hdr->w, h->hdr->d,
              (unsigned long long)ow, od);

    /* Snapshot the other's cells (+ its total) under its read lock into a temp
     * buffer, then release before taking self's write lock.  Copying to a temp
     * avoids holding two locks at once (deadlock-free regardless of acquisition
     * order between two processes merging each other). */
    uint64_t cells = (uint64_t)od * ow;     /* number of uint64_t counters to copy */
    uint64_t other_total;
    uint64_t *tmp;
    Newx(tmp, (size_t)cells, uint64_t);
    SAVEFREEPV(tmp);                 /* freed on normal return OR croak unwind */
    cms_rwlock_rdlock(o);
    memcpy(tmp, cms_counters(o), (size_t)cells * sizeof(uint64_t));
    other_total = o->hdr->total;
    cms_rwlock_rdunlock(o);

    /* Apply the snapshot to self under its write lock.  cms_merge_counters()
     * is defined elsewhere; presumably it combines the two arrays cell by
     * cell -- confirm against its definition. */
    cms_rwlock_wrlock(h);
    cms_merge_counters(cms_counters(h), tmp, cells);
    /* Saturate the total at UINT64_MAX instead of letting it wrap. */
    if (h->hdr->total > UINT64_MAX - other_total) h->hdr->total = UINT64_MAX;
    else h->hdr->total += other_total;
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    cms_rwlock_wrunlock(h);

void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Reset the sketch by calling cms_clear_locked() inside the write
     * lock; the call is also counted in the stat_ops counter. */
    {
        cms_rwlock_wrlock(h);
        cms_clear_locked(h);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        cms_rwlock_wrunlock(h);
    }

UV
total(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Return the header's `total` field, sampled while holding the read
     * lock. */
    cms_rwlock_rdlock(h);
    RETVAL = (UV)h->hdr->total;
    cms_rwlock_rdunlock(h);
  OUTPUT:
    RETVAL

UV
width(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    UV value;
  CODE:
    /* Return the header's `w` field.  No lock is taken for this read. */
    value = (UV)h->hdr->w;
    RETVAL = value;
  OUTPUT:
    RETVAL

UV
depth(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    UV value;
  CODE:
    /* Return the header's `d` field.  No lock is taken for this read. */
    value = (UV)h->hdr->d;
    RETVAL = value;
  OUTPUT:
    RETVAL

UV
cells(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    uint64_t rows;
    uint64_t cols;
  CODE:
    /* Return d * w from the header, multiplied in 64-bit arithmetic.
     * No lock is taken for these reads. */
    rows = (uint64_t)h->hdr->d;
    cols = (uint64_t)h->hdr->w;
    RETVAL = (UV)(rows * cols);
  OUTPUT:
    RETVAL

SV *
stats(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        uint64_t w, total, ops, cells;
        uint32_t d;
        /* Snapshot under the lock; do all (croak-capable) Perl allocation after
           releasing it -- so an OOM in newHV/newSVuv can never strand the lock. */
        cms_rwlock_rdlock(h);
        w     = h->hdr->w;



( run in 0.842 second using v1.01-cache-2.11-cpan-bbe5e583499 )