Data-RoaringBitmap-Shared
view release on metacpan or search on metacpan
croak("Data::RoaringBitmap::Shared->add: value %" UVuf " exceeds uint32 (max 4294967295)", x);
hi = (uint32_t)(x >> 16);
rb_rwlock_wrlock(h);
/* Need a free container slot only if this value touches an empty bucket. */
if (rb_buckets(h)[hi].type == RB_TYPE_NONE && rb_avail_slots(h) == 0) {
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED); /* a call that took the write lock counts (matches add_many) */
rb_rwlock_wrunlock(h); /* release BEFORE croak */
croak("Data::RoaringBitmap::Shared->add: container pool exhausted "
"(grow container_capacity)");
}
added = rb_add_locked(h, (uint32_t)x);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
rb_rwlock_wrunlock(h);
RETVAL = (IV)added;
OUTPUT:
RETVAL
IV
add_many(self, ints)
SV *self
SV *ints
total_added = 0;
rb_rwlock_wrlock(h);
for (i = 0; i < n; i++) {
uint32_t hi = (uint32_t)(vals[i] >> 16);
if (rb_buckets(h)[hi].type == RB_TYPE_NONE && rb_avail_slots(h) == 0) {
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
rb_rwlock_wrunlock(h); /* release BEFORE croak; partial adds remain (documented non-atomic) */
croak("Data::RoaringBitmap::Shared->add_many: container pool exhausted "
"after adding %" IVdf " element(s) (grow container_capacity)", total_added);
}
total_added += rb_add_locked(h, (uint32_t)vals[i]);
}
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
rb_rwlock_wrunlock(h);
RETVAL = total_added;
OUTPUT:
RETVAL
bool
contains(self, x)
    SV *self
    UV x
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Membership query. A value above RB_UINT32_MAX_UV is reported as
     * absent without taking any lock; otherwise the lookup runs under
     * the read lock and its result is normalised to 0/1. */
    RETVAL = 0;
    if (x <= RB_UINT32_MAX_UV) {
        rb_rwlock_rdlock(h);
        RETVAL = (rb_contains_locked(h, (uint32_t)x) != 0);
        rb_rwlock_rdunlock(h);
    }
  OUTPUT:
    RETVAL
IV
remove(self, x)
    SV *self
    UV x
  PREINIT:
    EXTRACT(self);
    int removed;
  CODE:
    /* Remove x from the set and return what rb_remove_locked reports.
     * A value above RB_UINT32_MAX_UV yields 0 without taking the write
     * lock, and therefore without bumping stat_ops. */
    removed = 0;
    if (x <= RB_UINT32_MAX_UV) {
        rb_rwlock_wrlock(h);
        removed = rb_remove_locked(h, (uint32_t)x);
        /* every call that took the write lock counts as one operation */
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        rb_rwlock_wrunlock(h);
    }
    RETVAL = (IV)removed;
  OUTPUT:
    RETVAL
UV
cardinality(self)
SV *self
SV *
min(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    uint32_t v;
    int found;
  CODE:
    /* Query the smallest element under the read lock. The lock is
     * released before any Perl value is built. Returns a new unsigned
     * SV when rb_min_locked reports a hit, otherwise undef. */
    rb_rwlock_rdlock(h);
    found = rb_min_locked(h, &v);
    rb_rwlock_rdunlock(h);
    if (found) {
        RETVAL = newSVuv((UV)v);
    } else {
        RETVAL = &PL_sv_undef;
    }
  OUTPUT:
    RETVAL
SV *
max(self)
    SV *self
  PREINIT:
    EXTRACT(self);
    uint32_t v;
    int found;
  CODE:
    /* Query the largest element under the read lock. The lock is
     * released before any Perl value is built. Returns a new unsigned
     * SV when rb_max_locked reports a hit, otherwise undef. */
    rb_rwlock_rdlock(h);
    found = rb_max_locked(h, &v);
    rb_rwlock_rdunlock(h);
    if (found) {
        RETVAL = newSVuv((UV)v);
    } else {
        RETVAL = &PL_sv_undef;
    }
  OUTPUT:
    RETVAL
SV *
to_array(self)
SV *self
PREINIT:
EXTRACT(self);
} else {
uint32_t need;
rb_lock_pair(h, o);
need = rb_union_new_slots_needed(h, o);
if (rb_avail_slots(h) < need) {
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
rb_unlock_pair(h, o); /* release BEFORE croak */
croak("Data::RoaringBitmap::Shared->union: container pool exhausted "
"(needs %u more container(s); grow container_capacity)", need);
}
rb_union_locked(h, o);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
rb_unlock_pair(h, o);
SvREFCNT_inc(self); /* return the receiver for chaining */
RETVAL = self;
}
}
OUTPUT:
RETVAL
SV *
if (!o) croak("Attempted to use a destroyed Data::RoaringBitmap::Shared object");
/* Same underlying bitmap (same handle, or two handles to one mapping
* sharing a bitmap_id) -> a &= a is a no-op. Short-circuit before
* locking to avoid self-deadlocking on the one shared rwlock. */
if (o == h || o->hdr->bitmap_id == h->hdr->bitmap_id) {
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
SvREFCNT_inc(self);
RETVAL = self;
} else {
rb_lock_pair(h, o); /* intersect never needs new slots */
rb_intersect_locked(h, o);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
rb_unlock_pair(h, o);
SvREFCNT_inc(self);
RETVAL = self;
}
}
OUTPUT:
RETVAL
void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Empty the bitmap: rb_clear_locked runs under the write lock, and
     * the call is counted in stat_ops before the lock is released. */
    {
        rb_rwlock_wrlock(h);
        rb_clear_locked(h);
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        rb_rwlock_wrunlock(h);
    }
SV *
stats(self)
SV *self
PREINIT:
EXTRACT(self);
CODE:
{
/* CPU relax hint, presumably for the rwlock's busy-wait loops (callers are
 * not visible here). Emits the architecture's spin-wait instruction where
 * one is selected ("yield" on AArch64, "pause" on x86/x86-64) and an empty
 * asm statement elsewhere. Every variant carries a "memory" clobber, so the
 * compiler will not cache memory values across the call. */
static inline void rb_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    /* no dedicated hint selected: compiler-level barrier only */
    __asm__ volatile("" ::: "memory");
#endif
}
/* Layout of the 32-bit rwlock word while it is write-locked:
 *   bit 31     - RB_RWLOCK_WRITER_BIT, set when a writer holds the lock
 *   bits 0..30 - the writer's PID, truncated to 31 bits (RB_RWLOCK_PID_MASK)
 * To extract the writer PID from a lock value, AND it with RB_RWLOCK_PID_MASK.
 * RB_RWLOCK_WR(pid) builds the write-locked word for the given PID. */
#define RB_RWLOCK_WRITER_BIT 0x80000000U
#define RB_RWLOCK_PID_MASK 0x7FFFFFFFU
#define RB_RWLOCK_WR(pid) (RB_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & RB_RWLOCK_PID_MASK))
/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
* lock-holder's PID is recycled to an unrelated live process before recovery
* runs, this reports "alive" and that slot's orphaned contribution is not
* reclaimed until the recycled process exits. Documented under "Crash Safety". */
static inline int rb_pid_alive(uint32_t pid) {
uint32_t mid = lo_i + ((hi_i - lo_i) >> 1);
if (vals[mid] < lo) lo_i = mid + 1;
else hi_i = mid;
}
*pos = lo_i;
return (lo_i < card && vals[lo_i] == lo);
}
/* ================================================================
* Single-bitmap operations. Callers hold the WRITE lock (mutators) or the
* READ lock (read-only queries: rb_contains_locked, rb_min_locked,
* rb_max_locked).
* ================================================================ */
/* Convert bucket `hi`'s array container to a bitmap, in place (same slot).
* The slot is reinterpreted, so the array values are copied to a C-stack temp
* first. The array is at most RB_ARRAY_MAX entries (a full array container). */
static inline void rb_array_to_bitmap(RbHandle *h, uint32_t hi) {
RbBucket *bt = &rb_buckets(h)[hi];
uint32_t card = bt->cardinality;
uint16_t tmp[RB_ARRAY_MAX];
uint16_t *vals = rb_array(h, bt->container_off);
for (uint32_t i = 0; i < card; i++) {
uint16_t lo = tmp[i];
bits[lo >> 6] |= (uint64_t)1 << (lo & 63);
}
bt->type = RB_TYPE_BITMAP;
}
/* Add x to the set. Returns 1 if newly added, 0 if already present. The
* caller has verified (under the lock) that a free slot exists when the target
* bucket is currently empty. */
static inline int rb_add_locked(RbHandle *h, uint32_t x) {
uint32_t hi = x >> 16;
uint16_t lo = (uint16_t)(x & 0xffff);
RbBucket *bt = &rb_buckets(h)[hi];
if (bt->type == RB_TYPE_NONE) {
uint32_t s = rb_alloc_slot(h); /* guaranteed available by caller */
bt->container_off = s;
bt->type = RB_TYPE_ARRAY;
bt->cardinality = 1;
rb_array(h, s)[0] = lo;
uint64_t b = (uint64_t)1 << (lo & 63);
if (bits[w] & b) return 0;
bits[w] |= b;
bt->cardinality++;
h->hdr->cardinality++;
return 1;
}
}
/* Membership test: returns non-zero when x is in the set, 0 otherwise.
 * Read-only. The high 16 bits of x select the bucket and the low 16 bits
 * are looked up inside that bucket's container: a search over the value
 * array for an array bucket, a single bit probe for any other non-empty
 * bucket. */
static inline int rb_contains_locked(RbHandle *h, uint32_t x) {
    const uint32_t bucket_idx = x >> 16;
    const uint16_t low = (uint16_t)(x & 0xffff);
    RbBucket *bucket = &rb_buckets(h)[bucket_idx];

    if (bucket->type == RB_TYPE_NONE) {
        return 0;                       /* nothing stored under this prefix */
    }
    if (bucket->type != RB_TYPE_ARRAY) {
        /* bitmap container: word = low / 64, bit = low % 64 */
        uint64_t *words = rb_bitmap(h, bucket->container_off);
        return (words[low >> 6] >> (low & 63)) & 1;
    }
    {
        uint32_t pos;                   /* insertion index, unused here */
        return rb_array_search(rb_array(h, bucket->container_off),
                               bucket->cardinality, low, &pos);
    }
}
/* Remove x from the set. Returns 1 if removed, 0 if absent. Frees the slot
* (and clears the bucket) when the last element of a bucket is removed. v1
* does NOT down-convert a bitmap to an array. */
static inline int rb_remove_locked(RbHandle *h, uint32_t x) {
uint32_t hi = x >> 16;
uint16_t lo = (uint16_t)(x & 0xffff);
RbBucket *bt = &rb_buckets(h)[hi];
if (bt->type == RB_TYPE_NONE) return 0;
if (bt->type == RB_TYPE_ARRAY) {
uint16_t *vals = rb_array(h, bt->container_off);
uint32_t pos;
if (!rb_array_search(vals, bt->cardinality, lo, &pos)) return 0;
memmove(&vals[pos], &vals[pos + 1], (size_t)(bt->cardinality - pos - 1) * sizeof(uint16_t));
bt->cardinality--;
rb_free_slot(h, bt->container_off);
bt->container_off = 0;
bt->type = RB_TYPE_NONE;
}
return 1;
}
}
/* Reset the bitmap to the empty set. Caller holds the write lock.
 * Zeroes the whole bucket table, then rewinds the container pool
 * bookkeeping in the header: container_used back to 1 (slot 0 stays
 * reserved), an empty free list, and a total cardinality of 0. */
static inline void rb_clear_locked(RbHandle *h) {
    RbHeader *header = h->hdr;
    RbBucket *buckets = rb_buckets(h);

    memset(buckets, 0, sizeof(*buckets) * (size_t)RB_NUM_BUCKETS);

    header->container_used = 1;   /* slot 0 reserved */
    header->free_head      = 0;
    header->free_count     = 0;
    header->cardinality    = 0;
}
/* Smallest set element. Returns 1 and sets *out, else 0 (empty set). */
static inline int rb_min_locked(RbHandle *h, uint32_t *out) {
RbBucket *bt = rb_buckets(h);
for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++) {
if (bt[hi].type == RB_TYPE_NONE) continue;
if (bt[hi].type == RB_TYPE_ARRAY) {
*out = (hi << 16) | rb_array(h, bt[hi].container_off)[0];
return 1;
}
{
uint64_t *bits = rb_bitmap(h, bt[hi].container_off);
for (uint32_t w = 0; w < 1024; w++) {
*out = (hi << 16) | lo;
return 1;
}
}
}
}
return 0;
}
/* Largest set element. Returns 1 and sets *out, else 0 (empty set). */
static inline int rb_max_locked(RbHandle *h, uint32_t *out) {
RbBucket *bt = rb_buckets(h);
for (uint32_t hi = RB_NUM_BUCKETS; hi-- > 0; ) {
if (bt[hi].type == RB_TYPE_NONE) continue;
if (bt[hi].type == RB_TYPE_ARRAY) {
*out = (hi << 16) | rb_array(h, bt[hi].container_off)[bt[hi].cardinality - 1];
return 1;
}
{
uint64_t *bits = rb_bitmap(h, bt[hi].container_off);
for (uint32_t w = 1024; w-- > 0; ) {
/* Rebuild the header's total cardinality (used after a set operation) by
 * summing the per-bucket cardinalities of every non-empty bucket in `abt`,
 * and store the sum in a->hdr->cardinality. */
static inline void rb_recompute_cardinality(RbHandle *a, RbBucket *abt) {
    uint64_t sum = 0;
    const RbBucket *const end = abt + RB_NUM_BUCKETS;

    for (const RbBucket *cur = abt; cur != end; cur++) {
        if (cur->type == RB_TYPE_NONE) {
            continue;               /* empty bucket contributes nothing */
        }
        sum += cur->cardinality;
    }
    a->hdr->cardinality = sum;
}
/* a |= b. Caller has verified rb_avail_slots(a) >= rb_union_new_slots_needed.
* Every bucket combination is handled in place. */
static inline void rb_union_locked(RbHandle *a, RbHandle *b) {
RbBucket *abt = rb_buckets(a);
RbBucket *bbt = rb_buckets(b);
for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++) {
if (bbt[hi].type == RB_TYPE_NONE) continue;
if (abt[hi].type == RB_TYPE_NONE) {
/* a lacks this bucket -> copy b's container wholesale. */
uint32_t s = rb_alloc_slot(a); /* guaranteed available */
memcpy(rb_slot(a, s), rb_slot(b, bbt[hi].container_off), RB_CONTAINER_BYTES);
abt[hi].container_off = s;
uint64_t *abits = rb_bitmap(a, abt[hi].container_off);
abt[hi].cardinality = rb_or_into_bitmap(abits, b, &bbt[hi]);
}
}
rb_recompute_cardinality(a, abt);
}
/* a &= b. Never needs new slots (intersection only shrinks or frees). Caller
* holds a's write lock and b's read lock. */
static inline void rb_intersect_locked(RbHandle *a, RbHandle *b) {
RbBucket *abt = rb_buckets(a);
RbBucket *bbt = rb_buckets(b);
for (uint32_t hi = 0; hi < RB_NUM_BUCKETS; hi++) {
if (abt[hi].type == RB_TYPE_NONE) continue;
if (bbt[hi].type == RB_TYPE_NONE) {
rb_free_slot(a, abt[hi].container_off);
abt[hi].container_off = 0;
abt[hi].type = RB_TYPE_NONE;
abt[hi].cardinality = 0;
( run in 0.333 second using v1.01-cache-2.11-cpan-bbe5e583499 )