Data-HyperLogLog-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

    if (precision < HLL_MIN_PRECISION || precision > HLL_MAX_PRECISION)
        croak("Data::HyperLogLog::Shared->new: precision must be between %d and %d",
              HLL_MIN_PRECISION, HLL_MAX_PRECISION);
    HllHandle *h = hll_create(p, (uint32_t)precision, errbuf);
    if (!h) croak("Data::HyperLogLog::Shared->new: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_memfd(class, name = &PL_sv_undef, precision = 14)
    const char *class
    SV *name
    UV precision
  PREINIT:
    char errbuf[HLL_ERR_BUFLEN];
  CODE:
    /* Constructor backed by hll_create_memfd().
     *   name      - optional label; an undefined value is forwarded as NULL
     *               so the default label is used.
     *   precision - must lie in [HLL_MIN_PRECISION, HLL_MAX_PRECISION].
     * Croaks on an out-of-range precision, or with the text left in errbuf
     * when hll_create_memfd() returns NULL. */
    const char *label = NULL;
    HllHandle *h;

    /* Resolve the label first, exactly as before the range check. */
    if (SvOK(name))
        label = SvPV_nolen(name);

    if (!(precision >= HLL_MIN_PRECISION && precision <= HLL_MAX_PRECISION))
        croak("Data::HyperLogLog::Shared->new_memfd: precision must be between %d and %d",
              HLL_MIN_PRECISION, HLL_MAX_PRECISION);

    h = hll_create_memfd(label, (uint32_t)precision, errbuf);
    if (h == NULL)
        croak("Data::HyperLogLog::Shared->new_memfd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

SV *
new_from_fd(class, fd)
    const char *class
    int fd
  PREINIT:
    char errbuf[HLL_ERR_BUFLEN];
  CODE:
    /* Constructor that hands the caller-supplied descriptor to hll_open_fd().
     * Croaks with the text left in errbuf when no handle is returned. */
    HllHandle *h;

    h = hll_open_fd(fd, errbuf);
    if (h == NULL)
        croak("Data::HyperLogLog::Shared->new_from_fd: %s", errbuf);
    MAKE_OBJ(class, h);
  OUTPUT:
    RETVAL

void
DESTROY(self)
    SV *self
  CODE:
    /* Destructor.  Anything that is not a Data::HyperLogLog::Shared object,
     * or whose stored pointer is already zero, is silently ignored. */
    HllHandle *h = NULL;

    if (sv_isobject(self) && sv_derived_from(self, "Data::HyperLogLog::Shared"))
        h = INT2PTR(HllHandle*, SvIV(SvRV(self)));

    if (h != NULL) {
        /* Zero the stored pointer BEFORE destroying the handle: this is what
         * activates EXTRACT's use-after-destroy croak and turns a second
         * DESTROY into a no-op. */
        sv_setiv(SvRV(self), 0);
        hll_destroy(h);
    }

int
add(self, item)
    SV *self
    SV *item
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Feed one item (taken as bytes) to the sketch and return whatever
     * hll_add_locked() reports.  The byte conversion is done before the
     * write lock is taken, so a croak from SvPVbyte holds no lock. */
    STRLEN len;
    const char *bytes = SvPVbyte(item, len);

    hll_rwlock_wrlock(h);
    RETVAL = hll_add_locked(h, bytes, len);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);   /* one op per call */
    hll_rwlock_wrunlock(h);
  OUTPUT:
    RETVAL

UV
add_many(self, items)
    SV *self
    SV *items
  PREINIT:
    EXTRACT(self);
    AV *list;
    IV  last_idx;
    UV  total_added = 0;
  CODE:
    /* Batch insert: items must be an array reference.  Returns the sum of the
     * hll_add_locked() results over every element.  Missing slots are added
     * as the empty string. */
    if (!(SvROK(items) && SvTYPE(SvRV(items)) == SVt_PVAV))
        croak("Data::HyperLogLog::Shared->add_many: expected an array reference");
    list = (AV *)SvRV(items);
    last_idx = av_len(list);              /* highest index; -1 for an empty array */
    {
        const char **ptrs = NULL;
        STRLEN      *lens = NULL;
        STRLEN       total = 0, idx;

        if (last_idx >= 0)
            total = (STRLEN)(last_idx + 1);

        /* Phase 1 (unlocked): turn every element into a (pointer, length)
         * pair.  Anything that can croak happens here, where no lock is held;
         * SAVEFREEPV releases both arrays on normal return or on unwind.
         * NOTE(review): the pointers are borrowed from the element SVs --
         * presumably they stay valid until phase 2 finishes; confirm for
         * elements with overloaded or magical stringification. */
        if (total != 0) {
            Newx(ptrs, total, const char *); SAVEFREEPV(ptrs);
            Newx(lens, total, STRLEN);       SAVEFREEPV(lens);
            idx = 0;
            while (idx < total) {
                SV **slot = av_fetch(list, (SSize_t)idx, 0);
                if (slot == NULL || *slot == NULL) {
                    ptrs[idx] = "";
                    lens[idx] = 0;
                } else {
                    ptrs[idx] = SvPVbyte(*slot, lens[idx]);
                }
                idx++;
            }
        }

        /* Phase 2 (write-locked): nothing in here may croak. */
        hll_rwlock_wrlock(h);
        for (idx = 0; idx < total; idx++)
            total_added += (UV)hll_add_locked(h, ptrs[idx], lens[idx]);
        /* The call counts as exactly one op, even when the batch is empty. */
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        hll_rwlock_wrunlock(h);
    }
    RETVAL = total_added;
  OUTPUT:
    RETVAL

UV
count(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Return the estimate from hll_count_locked(), taken under the read
     * lock and rounded to the nearest integer by adding 0.5 and truncating. */
    double estimate;

    hll_rwlock_rdlock(h);
    estimate = hll_count_locked(h);
    hll_rwlock_rdunlock(h);

    RETVAL = (UV)(estimate + 0.5);
  OUTPUT:
    RETVAL

void
merge(self, other)
    SV *self
    SV *other
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Fold the registers of `other` into `self`.
     * Croaks when `other` is not a Data::HyperLogLog::Shared object, when it
     * has already been destroyed, or when the two register counts differ. */
    HllHandle *src;
    uint32_t   src_m;
    uint8_t   *snapshot;

    if (!(sv_isobject(other) && sv_derived_from(other, "Data::HyperLogLog::Shared")))
        croak("Data::HyperLogLog::Shared->merge: expected a Data::HyperLogLog::Shared object");
    src = INT2PTR(HllHandle*, SvIV(SvRV(other)));
    if (src == NULL)
        croak("Attempted to use a destroyed Data::HyperLogLog::Shared object");

    /* m never changes after creation, so it is compared without a lock. */
    src_m = src->hdr->m;
    if (h->hdr->m != src_m)
        croak("Data::HyperLogLog::Shared->merge: precision mismatch (%u vs %u registers)",
              h->hdr->m, src_m);

    /* Two-step merge so that only one lock is ever held at a time:
     *   1. copy the source registers into a private buffer under the
     *      source's read lock, then drop that lock;
     *   2. take self's write lock and merge from the private copy.
     * This stays deadlock-free even if two processes merge each other
     * concurrently, whatever order they acquire locks in. */
    Newx(snapshot, (size_t)src_m, uint8_t);
    SAVEFREEPV(snapshot);            /* released on normal return or croak unwind */

    hll_rwlock_rdlock(src);
    memcpy(snapshot, hll_regs(src), (size_t)src_m);
    hll_rwlock_rdunlock(src);

    hll_rwlock_wrlock(h);
    hll_merge_regs(h, snapshot);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    hll_rwlock_wrunlock(h);

void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Run hll_clear_locked() on this handle while holding its write lock,
     * and bump the shared stat_ops counter by one for the call. */
    hll_rwlock_wrlock(h);
    hll_clear_locked(h);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    hll_rwlock_wrunlock(h);

UV
precision(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Accessor: return the precision field of the shared header.
     * Read without taking the lock. */
    RETVAL = h->hdr->precision;
  OUTPUT:
    RETVAL

UV
registers(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Accessor: return the header's m field (the register count).
     * Read without taking the lock. */
    RETVAL = h->hdr->m;
  OUTPUT:
    RETVAL

SV *
stats(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Return a reference to a fresh hash with the keys
     * precision, registers, count, ops and mmap_size. */
    {
        uint32_t hdr_precision, hdr_m;
        uint64_t op_total;
        double   estimate;
        UV       rounded;
        HV      *out;

        /* Read every shared value in one read-locked section.  All Perl
         * allocation (which may croak) is deferred until the lock has been
         * released, so an out-of-memory croak cannot leave it held. */
        hll_rwlock_rdlock(h);
        estimate      = hll_count_locked(h);
        op_total      = h->hdr->stat_ops;
        hdr_precision = h->hdr->precision;
        hdr_m         = h->hdr->m;
        hll_rwlock_rdunlock(h);

        rounded = (UV)(estimate + 0.5);   /* same rounding as count() */

        out = newHV();
        hv_stores(out, "precision",  newSVuv(hdr_precision));
        hv_stores(out, "registers",  newSVuv(hdr_m));
        hv_stores(out, "count",      newSVuv(rounded));
        hv_stores(out, "ops",        newSVuv(op_total));
        hv_stores(out, "mmap_size",  newSVuv((UV)h->mmap_size));
        RETVAL = newRV_noinc((SV *)out);
    }
  OUTPUT:
    RETVAL

SV *
path(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Return the handle's path as a new string, or undef when the handle
     * has no path set. */
    if (h->path)
        RETVAL = newSVpv(h->path, 0);    /* length 0: let Perl strlen() it */
    else
        RETVAL = &PL_sv_undef;
  OUTPUT:
    RETVAL

int
memfd(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Accessor: return the handle's backing_fd field unchanged.
     * NOTE(review): the value stored for handles without a backing
     * descriptor is not visible here -- confirm against the header. */
    RETVAL = h->backing_fd;
  OUTPUT:
    RETVAL

void
sync(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Call hll_msync() on the handle; a non-zero result croaks with the
     * strerror() text for the current errno. */
    if (hll_msync(h) != 0) {
        const char *reason = strerror(errno);
        croak("sync: %s", reason);
    }

void
unlink(self, ...)
    SV *self
  CODE:
    /* Remove a backing file.  Two call shapes are handled:
     *   - on an object: the handle's own path is removed, if it has one;
     *   - otherwise:    a defined second argument is taken as the path.
     * Nothing happens when no path can be determined, and the result of the
     * libc unlink() call is not inspected. */
    const char *target = NULL;

    if (sv_isobject(self) && sv_derived_from(self, "Data::HyperLogLog::Shared")) {
        HllHandle *h = INT2PTR(HllHandle*, SvIV(SvRV(self)));
        if (h != NULL)
            target = h->path;
    } else if (items >= 2 && SvOK(ST(1))) {
        target = SvPV_nolen(ST(1));
    }

    if (target != NULL)
        unlink(target);



( run in 0.434 second using v1.01-cache-2.11-cpan-bbe5e583499 )