Data-BloomFilter-Shared
view release on metacpan or search on metacpan
add(self, item)
SV *self
SV *item
PREINIT:
EXTRACT(self);
STRLEN n;
const char *s;
CODE:
s = SvPVbyte(item, n); /* may croak (wide char) -- BEFORE the lock */
bf_rwlock_wrlock(h);
RETVAL = bf_add_locked(h, s, n);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
bf_rwlock_wrunlock(h);
OUTPUT:
RETVAL
UV
add_many(self, items)
SV *self
SV *items
PREINIT:
const char **ps = NULL; STRLEN *ls = NULL;
if (cnt) { /* resolve all bytes BEFORE locking */
Newx(ps, cnt, const char *); SAVEFREEPV(ps); /* freed on return OR unwind */
Newx(ls, cnt, STRLEN); SAVEFREEPV(ls);
for (i = 0; i < cnt; i++) { /* a croak here holds NO lock; SAVEFREEPV cleans up */
SV **el = av_fetch(av, (SSize_t)i, 0);
if (el && *el) ps[i] = SvPVbyte(*el, ls[i]);
else { ps[i] = ""; ls[i] = 0; }
}
}
bf_rwlock_wrlock(h); /* locked region: NO croak-capable calls */
for (i = 0; i < cnt; i++) added += (UV)bf_add_locked(h, ps[i], ls[i]);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED); /* a call always counts, even an empty batch */
bf_rwlock_wrunlock(h);
}
RETVAL = added;
OUTPUT:
RETVAL
int
contains(self, item)
    SV *self
    SV *item
  PREINIT:
    EXTRACT(self);
    STRLEN item_len;
    const char *item_buf;
  CODE:
    /* Probabilistic membership test: true if the item was probably added,
       false if it definitely was not.
       Stringify BEFORE locking: SvPVbyte may croak (e.g. on wide characters),
       and a croak must never unwind while the read lock is held. */
    item_buf = SvPVbyte(item, item_len);
    bf_rwlock_rdlock(h);
    RETVAL = bf_contains_locked(h, item_buf, item_len);
    bf_rwlock_rdunlock(h);
  OUTPUT:
    RETVAL
void
merge(self, other)
SV *self
SV *other
PREINIT:
EXTRACT(self);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
bf_rwlock_wrunlock(h);
void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Reset every bit of the shared filter and count the call as one op.
       Both happen under the write lock; nothing in this region can croak,
       so the lock is always released. */
    bf_rwlock_wrlock(h);
    bf_clear_locked(h);
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    bf_rwlock_wrunlock(h);
UV
capacity(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Return the configured capacity stored in the shared header.
       Read without taking the rwlock, as before.
       NOTE(review): assumes capacity never changes after creation -- confirm. */
    RETVAL = (UV)h->hdr->capacity;
  OUTPUT:
    RETVAL
UV
count(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Estimated number of distinct items added. The estimate popcounts the
       whole bit array, so it runs under the read lock to avoid racing a
       writer; nothing between lock and unlock can croak. */
    bf_rwlock_rdlock(h);
    RETVAL = (UV)bf_count_locked(h);
    bf_rwlock_rdunlock(h);
  OUTPUT:
    RETVAL
SV *
stats(self)
SV *self
PREINIT:
EXTRACT(self);
CODE:
{
uint64_t X, m_bits, capacity, ops, n_est;
uint32_t k;
double fp_rate;
/* Snapshot under the lock; do all (croak-capable) Perl allocation after
releasing it -- so an OOM in newHV/newSVuv can never strand the lock. */
bf_rwlock_rdlock(h);
X = bf_popcount_locked(h);
n_est = bf_count_from_popcount(h, X); /* reuse X -- no second scan */
m_bits = h->hdr->m_bits;
k = h->hdr->k;
capacity = h->hdr->capacity;
fp_rate = h->hdr->fp_rate;
ops = h->hdr->stat_ops;
bf_rwlock_rdunlock(h);
HV *hv = newHV();
hv_stores(hv, "capacity", newSVuv((UV)capacity));
/* CPU relax hint used inside lock spin loops: "pause" on x86/x86-64,
   "yield" on AArch64, and an empty instruction elsewhere. Every variant
   also carries a "memory" clobber, which acts as a compiler barrier. */
#if defined(__x86_64__) || defined(__i386__)
#  define BF_SPIN_PAUSE_INSN "pause"
#elif defined(__aarch64__)
#  define BF_SPIN_PAUSE_INSN "yield"
#else
#  define BF_SPIN_PAUSE_INSN ""
#endif
static inline void bf_rwlock_spin_pause(void) {
    __asm__ volatile(BF_SPIN_PAUSE_INSN ::: "memory");
}
#undef BF_SPIN_PAUSE_INSN
/* Write-locked rwlock word encoding: the top bit (BF_RWLOCK_WRITER_BIT) is
 * set and the low 31 bits (BF_RWLOCK_PID_MASK) hold the writer's PID.
 * BF_RWLOCK_WR(pid) builds that value; PID bits above bit 30 are masked off.
 * A word >= BF_RWLOCK_WRITER_BIT therefore means "write-locked". */
#define BF_RWLOCK_WRITER_BIT 0x80000000U
#define BF_RWLOCK_PID_MASK 0x7FFFFFFFU
#define BF_RWLOCK_WR(pid) (BF_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & BF_RWLOCK_PID_MASK))
/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
* lock-holder's PID is recycled to an unrelated live process before recovery
* runs, this reports "alive" and that slot's orphaned contribution is not
* reclaimed until the recycled process exits. Robust detection would require
* a per-slot process-start-time epoch (a header-layout/version change).
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
if (__builtin_expect(spin < BF_RWLOCK_SPIN_LIMIT, 1)) {
bf_rwlock_spin_pause();
continue;
}
bf_park_reader(h);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR when yielding to waiting writers */
if (cur >= BF_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&bf_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
bf_unpark_reader(h);
bf_recover_after_timeout(h);
spin = 0;
continue;
}
}
/* Derive the two double-hashing bases for an item from a single 128-bit
 * XXH3 digest: *h1 is the start position, *h2 the per-probe stride.
 * The stride is forced odd so that, modulo the power-of-two table size,
 * the k probes do not collapse onto a short cycle. The handle is unused. */
static inline void bf_indices(BfHandle *h, const void *item, size_t len,
                              uint64_t *h1, uint64_t *h2) {
    const XXH128_hash_t digest = XXH3_128bits(item, len);
    (void)h;
    *h1 = digest.low64;
    *h2 = digest.high64 | 1ULL;
}
/* Insert an item: set its k probe bits (positions h1 + i*h2, masked to the
 * table). Returns 1 if at least one of those bits was previously clear
 * (the item was probably new), else 0. Caller holds the write lock. */
static int bf_add_locked(BfHandle *h, const void *item, size_t len) {
    uint64_t pos, step;
    bf_indices(h, item, len, &pos, &step);
    const uint64_t mask = h->hdr->m_mask;
    const uint32_t nhash = h->hdr->k;
    uint64_t *const words = bf_bits(h);
    int fresh = 0;
    /* pos advances by step each probe: equals h1 + j*h2 modulo 2^64 */
    for (uint32_t j = 0; j < nhash; j++, pos += step) {
        const uint64_t slot = pos & mask;
        uint64_t *const w = &words[slot >> 6];
        const uint64_t bit = 1ULL << (slot & 63);
        if ((*w & bit) == 0) {
            *w |= bit;
            fresh = 1;
        }
    }
    return fresh;
}
/* Membership probe: returns 1 when all k probe bits are set (item probably
 * present), and 0 as soon as any probe bit is clear (item not in the filter).
 * Caller holds a lock. */
static int bf_contains_locked(BfHandle *h, const void *item, size_t len) {
    uint64_t pos, step;
    bf_indices(h, item, len, &pos, &step);
    const uint64_t mask = h->hdr->m_mask;
    const uint64_t *const words = bf_bits(h);
    /* pos advances by step each probe: equals h1 + i*h2 modulo 2^64 */
    for (uint32_t remaining = h->hdr->k; remaining > 0; remaining--, pos += step) {
        const uint64_t slot = pos & mask;
        if ((words[slot >> 6] & (1ULL << (slot & 63))) == 0)
            return 0;
    }
    return 1;
}
/* Total number of set bits over the first m_bits/64 words of the bit array.
 * Caller holds a lock. */
static uint64_t bf_popcount_locked(BfHandle *h) {
    const uint64_t *w = bf_bits(h);
    const uint64_t *const end = w + h->hdr->m_bits / 64;
    uint64_t total = 0;
    while (w != end)
        total += (uint64_t)__builtin_popcountll(*w++);
    return total;
}
/* Estimate the number of distinct items added, from a pre-computed popcount X
 * of the bit array (caller holds a lock):
 *     n_est = -(m/k) * ln(1 - X/m),   m = hdr->m_bits, k = hdr->k
 * A saturated filter (X >= m, which also covers m == 0) reports hdr->capacity.
 *
 * Converting NaN, infinity, or a value >= 2^64 to uint64_t is undefined
 * behaviour in C. The k == 0 case (NaN / inf) and out-of-range results are
 * therefore handled explicitly before the final conversion. */
static uint64_t bf_count_from_popcount(BfHandle *h, uint64_t X) {
    uint64_t m_bits = h->hdr->m_bits;
    uint32_t k = h->hdr->k;
    if (X >= m_bits) return h->hdr->capacity; /* saturated */
    if (k == 0) return 0;                     /* no probe bits can ever be set */
    /* log1p(-x) == ln(1 - x), but stays accurate when X/m is tiny */
    double n_est = -((double)m_bits / (double)k) * log1p(-(double)X / (double)m_bits);
    if (!(n_est > 0.0)) return 0;             /* also rejects NaN */
    if (n_est >= 18446744073709551615.0)      /* 2^64 as a double */
        return UINT64_MAX;
    return (uint64_t)(n_est + 0.5);
}
/* Estimated distinct-item count; does a full popcount of the bit array and
 * feeds it to bf_count_from_popcount. Caller holds a lock. */
static uint64_t bf_count_locked(BfHandle *h) {
    const uint64_t set_bits = bf_popcount_locked(h);
    return bf_count_from_popcount(h, set_bits);
}
/* Union another filter's bit words into dst by bitwise OR over m_bits/64
 * words. The caller guarantees both filters have the same m_bits. ORing a
 * filter with its own words leaves it unchanged. */
static void bf_merge_words(BfHandle *dst, const uint64_t *src_words) {
    uint64_t *out = bf_bits(dst);
    uint64_t remaining = dst->hdr->m_bits / 64;
    while (remaining--)
        *out++ |= *src_words++;
}
/* Zero the whole bit array (m_bits/8 bytes). Caller holds the write lock. */
static inline void bf_clear_locked(BfHandle *h) {
    const size_t nbytes = (size_t)(h->hdr->m_bits / 8);
    void *base = bf_bits(h);
    memset(base, 0, nbytes);
}
#endif /* BLOOM_H */
t/01-basic.t view on Meta::CPAN
ok !$a->contains("B-1"), 'a does not contain b items before merge';
$a->merge($b);
ok $a->contains("A-1"), 'merge keeps a items';
ok $a->contains("B-1"), 'merge unions in b items';
my $miss = 0;
$a->contains("B-$_") or $miss++ for 1 .. 500;
is $miss, 0, 'merge: no false negatives for any merged-in item';
}
# self-merge is a no-op and must not deadlock (the snapshot releases the read
# lock before taking the write lock, so the same handle is locked sequentially)
{
    my $s = Data::BloomFilter::Shared->new(undef, 1000, 0.01);
    $s->add_many([ map { "s-$_" } 1 .. 300 ]);
    my $before = $s->stats->{bits_set};
    # Watchdog: a deadlocked merge is turned into a die via SIGALRM. The alarm
    # is cancelled again after the eval. If merge() dies for any other reason,
    # the in-eval "alarm 0" is skipped, and a still-pending alarm would fire
    # after the local handler is gone, killing the whole test script.
    my $ok = eval {
        local $SIG{ALRM} = sub { die "deadlock\n" };
        alarm 5;
        $s->merge($s);
        alarm 0;
        1;
    };
    alarm 0;
    ok $ok, 'self-merge does not deadlock'
        or diag "merge died: $@";
    is $s->stats->{bits_set}, $before, 'self-merge is a no-op (bits unchanged)';
    my $sm = 0; $s->contains("s-$_") or $sm++ for 1 .. 300;
    is $sm, 0, 'self-merge keeps all items';
}
( run in 0.765 second using v1.01-cache-2.11-cpan-bbe5e583499 )