Data-SortedSet-Shared
view release on metacpan or search on metacpan
for my $k (1 .. 4) {
unless (fork) { # child
$z->add($k * 1_000_000 + $_, rand) for 1 .. 1000;
exit;
}
}
1 while wait != -1; # reap children
print $z->count, "\n"; # 4000
Every operation is serialized by the rwlock, so concurrent writers do
not corrupt the tree. A writer can wake readers blocked in other
processes through the eventfd interface: it calls "notify" after a
batch, and a reader selects on "fileno" then drains the count with
"eventfd_consume".
COMPLEXITY
"score"/"exists"/"peek_*" are O(1); "add"/"remove"/"incr"/"rank"/
"at_rank"/"pop_*" and locating a range bound are O(log n); a range or
iteration of "k" members is O(log n + k), scanning sequentially through
the linked leaves.
OUTPUT:
RETVAL
void
clear(self)
SV *self
PREINIT:
EXTRACT(self);
CODE:
ss_rwlock_wrlock(h);
ss_clear_locked(h);
ss_rwlock_wrunlock(h);
SV *
add(self, member, score)
SV *self
IV member
NV score
PREINIT:
EXTRACT(self);
int rc;
CODE:
if (score != score) croak("add: score must not be NaN");
ss_rwlock_wrlock(h);
rc = ss_add_locked(h, (int64_t)member, (double)score);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
ss_rwlock_wrunlock(h);
RETVAL = (rc < 0) ? &PL_sv_undef : newSViv(rc);
OUTPUT:
RETVAL
SV *
score(self, member)
SV *self
IV member
RETVAL
bool
remove(self, member)
SV *self
IV member
PREINIT:
EXTRACT(self);
CODE:
ss_rwlock_wrlock(h);
RETVAL = ss_remove_locked(h, (int64_t)member);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
ss_rwlock_wrunlock(h);
OUTPUT:
RETVAL
NV
incr(self, member, delta)
SV *self
IV member
NV delta
PREINIT:
EXTRACT(self);
double out;
int rc;
CODE:
if (delta != delta) croak("incr: delta must not be NaN");
ss_rwlock_wrlock(h);
rc = ss_incr_locked(h, (int64_t)member, (double)delta, &out);
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
ss_rwlock_wrunlock(h);
if (rc == -1) croak("incr: max_entries exhausted");
if (rc == -2) croak("incr: result is NaN");
RETVAL = out;
OUTPUT:
RETVAL
void
pop_min(self)
SV *self
PREINIT:
EXTRACT(self);
PPCODE:
{
int64_t m; double s; int ok;
ss_rwlock_wrlock(h);
ok = ss_pop_locked(h, 0, &m, &s);
if (ok) __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
ss_rwlock_wrunlock(h);
if (ok) { EXTEND(SP, 2); PUSHs(sv_2mortal(newSViv((IV)m))); PUSHs(sv_2mortal(newSVnv(s))); }
}
void
pop_max(self)
SV *self
PREINIT:
EXTRACT(self);
PPCODE:
{
int64_t m; double s; int ok;
ss_rwlock_wrlock(h);
ok = ss_pop_locked(h, 1, &m, &s);
if (ok) __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
ss_rwlock_wrunlock(h);
if (ok) { EXTEND(SP, 2); PUSHs(sv_2mortal(newSViv((IV)m))); PUSHs(sv_2mortal(newSVnv(s))); }
}
bool
_validate(self)
SV *self
PREINIT:
EXTRACT(self);
ss_rwlock_wrlock(h);
for (SSize_t i = 0; i < nr; i++) {
SV **rv = av_fetch(av, i, 0);
if (!rv || !SvROK(*rv) || SvTYPE(SvRV(*rv)) != SVt_PVAV) continue; /* skip malformed */
AV *row = (AV *)SvRV(*rv);
if (av_len(row) + 1 < 2) continue;
SV **ms = av_fetch(row, 0, 0), **sv = av_fetch(row, 1, 0);
if (!ms || !sv) continue;
double score = SvNV(*sv);
if (score != score) continue; /* skip NaN */
int rc = ss_add_locked(h, (int64_t)SvIV(*ms), score);
if (rc == 1) added++;
else if (rc == -1) break; /* pool full */
}
__atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
ss_rwlock_wrunlock(h);
}
RETVAL = added;
OUTPUT:
RETVAL
lib/Data/SortedSet/Shared.pm view on Meta::CPAN
for my $k (1 .. 4) {
unless (fork) { # child
$z->add($k * 1_000_000 + $_, rand) for 1 .. 1000;
exit;
}
}
1 while wait != -1; # reap children
print $z->count, "\n"; # 4000
Every operation is serialized by the rwlock, so concurrent writers do not corrupt
the tree. A writer can wake readers blocked in other processes through the
eventfd interface: it calls C<notify> after a batch, and a reader selects on
C<fileno> then drains the count with C<eventfd_consume>.
=head1 COMPLEXITY
C<score>/C<exists>/C<peek_*> are O(1); C<add>/C<remove>/C<incr>/C<rank>/
C<at_rank>/C<pop_*> and locating a range bound are O(log n); a range or iteration
of C<k> members is O(log n + k), scanning sequentially through the linked leaves.
=head1 STATS
sortedset.h view on Meta::CPAN
static inline void ss_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
__asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
__asm__ volatile("yield" ::: "memory");
#else
__asm__ volatile("" ::: "memory");
#endif
}
/* Extract writer PID from rwlock value (lower 31 bits when write-locked). */
#define SS_RWLOCK_WRITER_BIT 0x80000000U
#define SS_RWLOCK_PID_MASK 0x7FFFFFFFU
#define SS_RWLOCK_WR(pid) (SS_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SS_RWLOCK_PID_MASK))
/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
* lock-holder's PID is recycled to an unrelated live process before recovery
* runs, this reports "alive" and that slot's orphaned contribution is not
* reclaimed until the recycled process exits. Robust detection would require
* a per-slot process-start-time epoch (a header-layout/version change).
sortedset.h view on Meta::CPAN
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
ss_rwlock_spin_pause();
continue;
}
ss_park_reader(h);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR when yielding to waiting writers */
if (cur >= SS_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&ss_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
ss_unpark_reader(h);
ss_recover_after_timeout(h);
spin = 0;
continue;
}
}
sortedset.h view on Meta::CPAN
uint64_t v = 0;
if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
return (int64_t)v;
}
/* ================================================================
* Sorted set: node pool, member index, B+tree (callers hold the lock)
* ================================================================ */
/* reset to the empty set (caller holds the write lock) */
static inline void ss_clear_locked(SsHandle *h) {
SsHeader *hdr = h->hdr;
hdr->count = 0;
hdr->root = SS_NONE;
hdr->height = 0;
hdr->leftmost = SS_NONE;
hdr->rightmost = SS_NONE;
for (uint32_t i = 0; i < hdr->node_capacity; i++)
h->nodes[i].parent = (i + 1 < hdr->node_capacity) ? (i + 1) : SS_NONE;
hdr->node_free_head = 0;
memset(h->index, 0, (size_t)hdr->index_slots * sizeof(SsIdxSlot));
sortedset.h view on Meta::CPAN
}
} else if (root->num == 1) {
uint32_t child = root->children[0];
ss_node_free(h, hdr->root);
hdr->root = child; h->nodes[child].parent = SS_NONE; hdr->height--;
}
hdr->count--;
}
/* add: 1 (new), 0 (existing -- score updated if changed), -1 (full) */
static int ss_add_locked(SsHandle *h, int64_t member, double score) {
double old;
if (ss_idx_get(h, member, &old)) {
if (old != score) { ss_tree_del(h, old, member); ss_tree_add(h, score, member); ss_idx_set(h, member, score); }
return 0;
}
if (h->hdr->count >= h->hdr->max_entries) return -1;
ss_tree_add(h, score, member);
ss_idx_set(h, member, score);
return 1;
}
/* remove: 1 if removed, 0 if absent */
static int ss_remove_locked(SsHandle *h, int64_t member) {
double old;
if (!ss_idx_get(h, member, &old)) return 0;
ss_tree_del(h, old, member);
ss_idx_del(h, member);
return 1;
}
/* incr by delta. *out = new score. returns 1 (created), 0 (updated), -1 (full),
-2 (result is NaN) */
static int ss_incr_locked(SsHandle *h, int64_t member, double delta, double *out) {
double old;
if (ss_idx_get(h, member, &old)) {
double ns = old + delta; *out = ns;
if (ns != ns) return -2;
if (ns != old) { ss_tree_del(h, old, member); ss_tree_add(h, ns, member); ss_idx_set(h, member, ns); }
return 0;
}
if (h->hdr->count >= h->hdr->max_entries) return -1;
*out = delta;
if (delta != delta) return -2;
ss_tree_add(h, delta, member); ss_idx_set(h, member, delta);
return 1;
}
/* pop the min (max=0) or max (max=1): 0 if empty, else 1 with *m,*s */
static int ss_pop_locked(SsHandle *h, int max, int64_t *m, double *s) {
if (h->hdr->root == SS_NONE) return 0;
SsNode *nd = &h->nodes[max ? h->hdr->rightmost : h->hdr->leftmost];
int pos = max ? nd->num - 1 : 0;
*m = nd->members[pos]; *s = nd->scores[pos];
ss_tree_del(h, *s, *m);
ss_idx_del(h, *m);
return 1;
}
/* ---- structural validator (debug / tests) ---- */
( run in 0.775 second using v1.01-cache-2.11-cpan-bbe5e583499 )