Data-BloomFilter-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

add(self, item)
    SV *self
    SV *item
  PREINIT:
    EXTRACT(self);
    STRLEN n;
    const char *s;
  CODE:
    s = SvPVbyte(item, n);                 /* may croak (wide char) -- BEFORE the lock */
    bf_rwlock_wrlock(h);
    RETVAL = bf_add_locked(h, s, n);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    bf_rwlock_wrunlock(h);
  OUTPUT:
    RETVAL

UV
add_many(self, items)
    SV *self
    SV *items
  PREINIT:

Shared.xs  view on Meta::CPAN

        const char **ps = NULL; STRLEN *ls = NULL;
        if (cnt) {                                       /* resolve all bytes BEFORE locking */
            Newx(ps, cnt, const char *); SAVEFREEPV(ps); /* freed on return OR unwind */
            Newx(ls, cnt, STRLEN);       SAVEFREEPV(ls);
            for (i = 0; i < cnt; i++) {                  /* a croak here holds NO lock; SAVEFREEPV cleans up */
                SV **el = av_fetch(av, (SSize_t)i, 0);
                if (el && *el) ps[i] = SvPVbyte(*el, ls[i]);
                else { ps[i] = ""; ls[i] = 0; }
            }
        }
        bf_rwlock_wrlock(h);                             /* locked region: NO croak-capable calls */
        for (i = 0; i < cnt; i++) added += (UV)bf_add_locked(h, ps[i], ls[i]);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);  /* a call always counts, even an empty batch */
        bf_rwlock_wrunlock(h);
    }
    RETVAL = added;
  OUTPUT:
    RETVAL

int
contains(self, item)
    SV *self
    SV *item
  PREINIT:
    EXTRACT(self);
    STRLEN n;
    const char *s;
  CODE:
    /* Membership test. The item's bytes are resolved first, then the filter is
       probed while holding the read lock; the result of bf_contains_locked()
       is returned to Perl unchanged (1 or 0).
       NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    s = SvPVbyte(item, n);                 /* may croak (wide char) -- BEFORE the lock */
    bf_rwlock_rdlock(h);
    RETVAL = bf_contains_locked(h, s, n);
    bf_rwlock_rdunlock(h);
  OUTPUT:
    RETVAL

void
merge(self, other)
    SV *self
    SV *other
  PREINIT:
    EXTRACT(self);

Shared.xs  view on Meta::CPAN

    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    bf_rwlock_wrunlock(h);

void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Reset the filter: under the write lock, bf_clear_locked() zeroes the bit
       array, then the stat_ops counter in the header is bumped by one with a
       relaxed atomic add before the lock is released.
       NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    bf_rwlock_wrlock(h);
    bf_clear_locked(h);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    bf_rwlock_wrunlock(h);

UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    RETVAL = (UV)h->hdr->capacity;

Shared.xs  view on Meta::CPAN

    RETVAL

UV
count(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    UV n;
  CODE:
    /* Return the estimated number of distinct items added. The estimate is
       computed by bf_count_locked() while the read lock is held and copied
       into a local, so RETVAL is assigned only after the lock is released.
       NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    bf_rwlock_rdlock(h);
    n = (UV)bf_count_locked(h);
    bf_rwlock_rdunlock(h);
    RETVAL = n;
  OUTPUT:
    RETVAL

SV *
stats(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    {
        uint64_t X, m_bits, capacity, ops, n_est;
        uint32_t k;
        double   fp_rate;
        /* Snapshot under the lock; do all (croak-capable) Perl allocation after
           releasing it -- so an OOM in newHV/newSVuv can never strand the lock. */
        bf_rwlock_rdlock(h);
        X        = bf_popcount_locked(h);
        n_est    = bf_count_from_popcount(h, X);   /* reuse X -- no second scan */
        m_bits   = h->hdr->m_bits;
        k        = h->hdr->k;
        capacity = h->hdr->capacity;
        fp_rate  = h->hdr->fp_rate;
        ops      = h->hdr->stat_ops;
        bf_rwlock_rdunlock(h);

        HV *hv = newHV();
        hv_stores(hv, "capacity",   newSVuv((UV)capacity));

bloom.h  view on Meta::CPAN

/* Busy-wait hint used between spin iterations. Emits the architecture's
   spin-wait instruction where one is selected below ("yield" on AArch64,
   "pause" on x86) and an empty asm statement everywhere else. Every variant
   carries a "memory" clobber, so the compiler cannot cache memory values
   across the call. */
static inline void bf_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* Encoding of a write-locked rwlock word: bit 31 (BF_RWLOCK_WRITER_BIT) flags
 * a writer and the lower 31 bits (BF_RWLOCK_PID_MASK) carry the writer's PID.
 * BF_RWLOCK_WR(pid) builds such a word from a PID, truncating it to 31 bits;
 * masking a lock word with BF_RWLOCK_PID_MASK recovers the PID field. */
#define BF_RWLOCK_WRITER_BIT 0x80000000U
#define BF_RWLOCK_PID_MASK   0x7FFFFFFFU
#define BF_RWLOCK_WR(pid)    (BF_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & BF_RWLOCK_PID_MASK))

/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
 * lock-holder's PID is recycled to an unrelated live process before recovery
 * runs, this reports "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits. Robust detection would require
 * a per-slot process-start-time epoch (a header-layout/version change).

bloom.h  view on Meta::CPAN

            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < BF_RWLOCK_SPIN_LIMIT, 1)) {
            bf_rwlock_spin_pause();
            continue;
        }
        bf_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= BF_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &bf_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                bf_unpark_reader(h);
                bf_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }

bloom.h  view on Meta::CPAN


/* Hash `len` bytes at `item` once with 128-bit XXH3 and hand back the two
   64-bit halves used for double hashing: *h1 receives the low half (starting
   position), *h2 the high half with its lowest bit forced on (stride).
   The handle argument is accepted but not consulted. */
static inline void bf_indices(BfHandle *h, const void *item, size_t len,
                              uint64_t *h1, uint64_t *h2) {
    const XXH128_hash_t digest = XXH3_128bits(item, len);
    const uint64_t start  = digest.low64;
    /* an odd stride lets the k probes spread over the power-of-two table */
    const uint64_t stride = digest.high64 | 1ULL;
    (void)h;
    *h1 = start;
    *h2 = stride;
}

/* Insert an item: set its k probe bits in the bit array.
   Returns 1 when at least one of those bits was previously clear (the item
   was probably new), 0 when all of them were already set.
   A word is only written when the bit actually has to change. */
static int bf_add_locked(BfHandle *h, const void *item, size_t len) {
    uint64_t start, stride;
    bf_indices(h, item, len, &start, &stride);
    const uint64_t index_mask = h->hdr->m_mask;
    const uint32_t probes = h->hdr->k;
    uint64_t *table = bf_bits(h);
    int fresh = 0;
    /* pos walks start, start+stride, start+2*stride, ... (mod 2^64) */
    uint64_t pos = start;
    for (uint32_t round = 0; round < probes; round++, pos += stride) {
        const uint64_t slot = pos & index_mask;
        uint64_t *cell = &table[slot >> 6];            /* 64 bits per word */
        const uint64_t flag = 1ULL << (slot & 63);
        if (*cell & flag)
            continue;                                  /* already set: leave the word untouched */
        *cell |= flag;
        fresh = 1;
    }
    return fresh;
}

/* Query an item: returns 1 when every one of its k probe bits is set
   (probably present) and 0 as soon as a clear bit is found. */
static int bf_contains_locked(BfHandle *h, const void *item, size_t len) {
    uint64_t start, stride;
    bf_indices(h, item, len, &start, &stride);
    const uint64_t index_mask = h->hdr->m_mask;
    const uint32_t probes = h->hdr->k;
    const uint64_t *table = bf_bits(h);
    /* pos walks start, start+stride, start+2*stride, ... (mod 2^64) */
    uint64_t pos = start;
    for (uint32_t round = 0; round < probes; round++, pos += stride) {
        const uint64_t slot = pos & index_mask;
        const uint64_t flag = 1ULL << (slot & 63);
        if ((table[slot >> 6] & flag) == 0)
            return 0;
    }
    return 1;
}

/* Total number of set bits in the filter's bit array, summed one 64-bit word
   at a time over m_bits / 64 words. (caller holds a lock) */
static uint64_t bf_popcount_locked(BfHandle *h) {
    const uint64_t *cursor = bf_bits(h);
    const uint64_t *const stop = cursor + h->hdr->m_bits / 64;
    uint64_t total = 0;
    while (cursor != stop) {
        total += (uint64_t)__builtin_popcountll(*cursor);
        cursor++;
    }
    return total;
}

/* Estimate how many distinct items were added, given X = number of set bits:
       n_est = -(m / k) * ln(1 - X / m)
   with m = m_bits and k = hash count from the header. When every bit is set
   (X >= m) the formula is undefined and the configured capacity is returned
   instead. A negative estimate is clamped to 0 and the result is rounded to
   the nearest integer. (caller holds a lock) */
static uint64_t bf_count_from_popcount(BfHandle *h, uint64_t X) {
    const uint64_t total_bits = h->hdr->m_bits;
    const uint32_t hashes = h->hdr->k;
    if (X >= total_bits)
        return h->hdr->capacity;                  /* saturated */
    const double raw = -((double)total_bits / (double)hashes) * log(1.0 - (double)X / (double)total_bits);
    const double clamped = (raw < 0.0) ? 0.0 : raw;
    return (uint64_t)(clamped + 0.5);
}

/* Estimated number of distinct items added: popcount the whole bit array,
   then convert that popcount into an item estimate. (caller holds a lock) */
static uint64_t bf_count_locked(BfHandle *h) {
    const uint64_t set_bits = bf_popcount_locked(h);
    return bf_count_from_popcount(h, set_bits);
}

/* Union another filter's bit array into dst: each of dst's m_bits / 64 words
   is OR-ed with the word at the same position in src_words. The caller
   guarantees src_words holds at least that many words (equal m_bits). */
static void bf_merge_words(BfHandle *dst, const uint64_t *src_words) {
    uint64_t *out = bf_bits(dst);
    const uint64_t *in = src_words;
    for (uint64_t left = dst->hdr->m_bits / 64; left > 0; left--) {
        *out |= *in;
        out++;
        in++;
    }
}

/* Empty the filter by zero-filling its bit array, m_bits / 8 bytes in all.
   (caller holds the write lock) */
static inline void bf_clear_locked(BfHandle *h) {
    const size_t byte_len = (size_t)(h->hdr->m_bits / 8);
    memset(bf_bits(h), 0, byte_len);
}

#endif /* BLOOM_H */

t/01-basic.t  view on Meta::CPAN

    ok !$a->contains("B-1"), 'a does not contain b items before merge';
    $a->merge($b);
    ok $a->contains("A-1"), 'merge keeps a items';
    ok $a->contains("B-1"), 'merge unions in b items';
    my $miss = 0;
    $a->contains("B-$_") or $miss++ for 1 .. 500;
    is $miss, 0, 'merge: no false negatives for any merged-in item';
}

# self-merge is a no-op and must not deadlock (the snapshot releases the read
# lock before taking the write lock, so the same handle is locked sequentially)
{
    # NOTE(review): arguments are presumably (path, capacity, fp_rate), with
    # undef selecting an anonymous filter -- confirm against the constructor.
    my $s = Data::BloomFilter::Shared->new(undef, 1000, 0.01);
    $s->add_many([ map { "s-$_" } 1 .. 300 ]);
    # remember the set-bit count so any change caused by the merge is visible
    my $before = $s->stats->{bits_set};
    # watchdog: a 5-second alarm turns a hang inside merge() into a failed eval
    my $ok = eval { local $SIG{ALRM} = sub { die "deadlock\n" }; alarm 5; $s->merge($s); alarm 0; 1 };
    ok $ok, 'self-merge does not deadlock';
    is $s->stats->{bits_set}, $before, 'self-merge is a no-op (bits unchanged)';
    # every item added before the merge must still be reported as present
    my $sm = 0; $s->contains("s-$_") or $sm++ for 1 .. 300;
    is $sm, 0, 'self-merge keeps all items';
}



( run in 0.765 second using v1.01-cache-2.11-cpan-bbe5e583499 )