Data-SortedSet-Shared

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

        for my $k (1 .. 4) {
            unless (fork) {                                  # child
                $z->add($k * 1_000_000 + $_, rand) for 1 .. 1000;
                exit;
            }
        }
        1 while wait != -1;                                  # reap children
        print $z->count, "\n";                               # 4000

    Every operation is serialized by the rwlock, so concurrent writers do
    not corrupt the tree. A writer can wake readers blocked in other
    processes through the eventfd interface: it calls "notify" after a
    batch, and a reader selects on "fileno" then drains the count with
    "eventfd_consume".

COMPLEXITY
    "score"/"exists"/"peek_*" are O(1); "add"/"remove"/"incr"/"rank"/
    "at_rank"/"pop_*" and locating a range bound are O(log n); a range or
    iteration of "k" members is O(log n + k), scanning sequentially through
    the linked leaves.

Shared.xs  view on Meta::CPAN

  OUTPUT:
    RETVAL

void
clear(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Empty the set.  The reset runs entirely under the write lock; this
     * XSUB does not bump stat_ops.
     * NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    ss_rwlock_wrlock(h);
    ss_clear_locked(h);
    ss_rwlock_wrunlock(h);

SV *
add(self, member, score)
    SV *self
    IV member
    NV score
  PREINIT:
    EXTRACT(self);
    int rc;
  CODE:
    /* Insert `member` with `score`, or re-score it if it already exists.
     * Returns the ss_add_locked() result as an integer (1 = newly added,
     * 0 = already present), or undef when that result is negative (set full).
     * NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    /* x != x is true only for NaN; reject it before taking the lock. */
    if (score != score) croak("add: score must not be NaN");
    ss_rwlock_wrlock(h);
    rc = ss_add_locked(h, (int64_t)member, (double)score);
    /* stat_ops is bumped for every call, including the "full" failure. */
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    ss_rwlock_wrunlock(h);
    RETVAL = (rc < 0) ? &PL_sv_undef : newSViv(rc);
  OUTPUT:
    RETVAL

SV *
score(self, member)
    SV *self
    IV member

Shared.xs  view on Meta::CPAN

    RETVAL

bool
remove(self, member)
    SV *self
    IV member
  PREINIT:
    EXTRACT(self);
  CODE:
    /* Delete `member` under the write lock.  RETVAL is the
     * ss_remove_locked() result: true if it was removed, false if absent.
     * NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    ss_rwlock_wrlock(h);
    RETVAL = ss_remove_locked(h, (int64_t)member);
    /* stat_ops is bumped whether or not the member existed. */
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    ss_rwlock_wrunlock(h);
  OUTPUT:
    RETVAL

NV
incr(self, member, delta)
    SV *self
    IV member
    NV delta
  PREINIT:
    EXTRACT(self);
    double out;
    int rc;
  CODE:
    /* Add `delta` to the score of `member` and return the new score.
     * Croaks if `delta` is NaN, if ss_incr_locked() reports -1 (set full)
     * or -2 (the resulting score is NaN).
     * NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    /* x != x is true only for NaN; reject it before taking the lock. */
    if (delta != delta) croak("incr: delta must not be NaN");
    ss_rwlock_wrlock(h);
    rc = ss_incr_locked(h, (int64_t)member, (double)delta, &out);
    /* stat_ops is bumped even when the operation fails. */
    __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
    ss_rwlock_wrunlock(h);
    /* Error reporting happens only after the write lock has been released. */
    if (rc == -1) croak("incr: max_entries exhausted");
    if (rc == -2) croak("incr: result is NaN");
    RETVAL = out;
  OUTPUT:
    RETVAL

void
pop_min(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  PPCODE:
    /* Remove the lowest-ranked entry and return it as a (member, score)
     * pair; returns an empty list when the set is empty.
     * NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    {
        int64_t m; double s; int ok;
        ss_rwlock_wrlock(h);
        /* second argument 0 selects the minimum end */
        ok = ss_pop_locked(h, 0, &m, &s);
        /* only a successful pop counts towards stat_ops */
        if (ok) __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        ss_rwlock_wrunlock(h);
        /* the two mortal SVs are built after the lock has been released */
        if (ok) { EXTEND(SP, 2); PUSHs(sv_2mortal(newSViv((IV)m))); PUSHs(sv_2mortal(newSVnv(s))); }
    }

void
pop_max(self)
    SV *self
  PREINIT:
    EXTRACT(self);
  PPCODE:
    /* Remove the highest-ranked entry and return it as a (member, score)
     * pair; returns an empty list when the set is empty.
     * NOTE(review): `h` is presumably declared by EXTRACT(self) -- confirm. */
    {
        int64_t m; double s; int ok;
        ss_rwlock_wrlock(h);
        /* second argument 1 selects the maximum end */
        ok = ss_pop_locked(h, 1, &m, &s);
        /* only a successful pop counts towards stat_ops */
        if (ok) __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        ss_rwlock_wrunlock(h);
        /* the two mortal SVs are built after the lock has been released */
        if (ok) { EXTEND(SP, 2); PUSHs(sv_2mortal(newSViv((IV)m))); PUSHs(sv_2mortal(newSVnv(s))); }
    }

bool
_validate(self)
    SV *self
  PREINIT:
    EXTRACT(self);

Shared.xs  view on Meta::CPAN

        ss_rwlock_wrlock(h);
        for (SSize_t i = 0; i < nr; i++) {
            SV **rv = av_fetch(av, i, 0);
            if (!rv || !SvROK(*rv) || SvTYPE(SvRV(*rv)) != SVt_PVAV) continue;   /* skip malformed */
            AV *row = (AV *)SvRV(*rv);
            if (av_len(row) + 1 < 2) continue;
            SV **ms = av_fetch(row, 0, 0), **sv = av_fetch(row, 1, 0);
            if (!ms || !sv) continue;
            double score = SvNV(*sv);
            if (score != score) continue;                                       /* skip NaN */
            int rc = ss_add_locked(h, (int64_t)SvIV(*ms), score);
            if (rc == 1) added++;
            else if (rc == -1) break;                                           /* pool full */
        }
        __atomic_fetch_add(&h->hdr->stat_ops, 1, __ATOMIC_RELAXED);
        ss_rwlock_wrunlock(h);
    }
    RETVAL = added;
  OUTPUT:
    RETVAL

lib/Data/SortedSet/Shared.pm  view on Meta::CPAN

    for my $k (1 .. 4) {
        unless (fork) {                                  # child
            $z->add($k * 1_000_000 + $_, rand) for 1 .. 1000;
            exit;
        }
    }
    1 while wait != -1;                                  # reap children
    print $z->count, "\n";                               # 4000

Every operation is serialized by the rwlock, so concurrent writers do not corrupt
the tree.  A writer can wake readers blocked in other processes through the
eventfd interface: it calls C<notify> after a batch, and a reader selects on
C<fileno> then drains the count with C<eventfd_consume>.

=head1 COMPLEXITY

C<score>/C<exists>/C<peek_*> are O(1); C<add>/C<remove>/C<incr>/C<rank>/
C<at_rank>/C<pop_*> and locating a range bound are O(log n); a range or iteration
of C<k> members is O(log n + k), scanning sequentially through the linked leaves.

=head1 STATS

sortedset.h  view on Meta::CPAN

/* Busy-wait hint for the rwlock spin loop.  Issues the architecture's
 * spin-hint instruction where one is known; every variant also carries a
 * "memory" clobber, so it acts as a compiler barrier as well. */
static inline void ss_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ __volatile__("yield" : : : "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ __volatile__("pause" : : : "memory");
#else
    /* no hint instruction selected for this target: barrier only */
    __asm__ __volatile__("" : : : "memory");
#endif
}

/* Layout of the rwlock word while write-locked: the top bit flags the write
 * lock and the lower 31 bits carry the writer's PID. */
#define SS_RWLOCK_WRITER_BIT 0x80000000U  /* top bit: write-locked flag */
#define SS_RWLOCK_PID_MASK   0x7FFFFFFFU  /* lower 31 bits: writer PID */
/* Build the write-locked word for `pid`; the PID is truncated to 31 bits. */
#define SS_RWLOCK_WR(pid)    (SS_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SS_RWLOCK_PID_MASK))

/* Check if a PID is alive. Returns 1 if alive or unknown, 0 if definitely dead. */
/* Liveness via kill(pid,0). NOTE: cannot detect PID reuse -- if a dead
 * lock-holder's PID is recycled to an unrelated live process before recovery
 * runs, this reports "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits. Robust detection would require
 * a per-slot process-start-time epoch (a header-layout/version change).

sortedset.h  view on Meta::CPAN

            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < SS_RWLOCK_SPIN_LIMIT, 1)) {
            ss_rwlock_spin_pause();
            continue;
        }
        ss_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= SS_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &ss_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                ss_unpark_reader(h);
                ss_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }

sortedset.h  view on Meta::CPAN

    uint64_t v = 0;
    if (read(h->notify_fd, &v, sizeof(v)) != sizeof(v)) return -1;
    return (int64_t)v;
}

/* ================================================================
 * Sorted set: node pool, member index, B+tree (callers hold the lock)
 * ================================================================ */

/* reset to the empty set (caller holds the write lock) */
static inline void ss_clear_locked(SsHandle *h) {
    SsHeader *hdr = h->hdr;
    hdr->count     = 0;
    hdr->root      = SS_NONE;
    hdr->height    = 0;
    hdr->leftmost  = SS_NONE;
    hdr->rightmost = SS_NONE;
    for (uint32_t i = 0; i < hdr->node_capacity; i++)
        h->nodes[i].parent = (i + 1 < hdr->node_capacity) ? (i + 1) : SS_NONE;
    hdr->node_free_head = 0;
    memset(h->index, 0, (size_t)hdr->index_slots * sizeof(SsIdxSlot));

sortedset.h  view on Meta::CPAN

        }
    } else if (root->num == 1) {
        uint32_t child = root->children[0];
        ss_node_free(h, hdr->root);
        hdr->root = child; h->nodes[child].parent = SS_NONE; hdr->height--;
    }
    hdr->count--;
}

/* Insert `member` with `score`, or re-score it when it is already present.
 * Returns 1 if a new member was inserted, 0 if the member already existed
 * (tree and index are rewritten only when the score differs), and -1 if the
 * member is new but count has reached max_entries. */
static int ss_add_locked(SsHandle *h, int64_t member, double score) {
    double prev;

    if (!ss_idx_get(h, member, &prev)) {
        /* unknown member: needs a free entry */
        if (h->hdr->count >= h->hdr->max_entries)
            return -1;
        ss_tree_add(h, score, member);
        ss_idx_set(h, member, score);
        return 1;
    }

    if (prev != score) {
        /* move the entry: drop the old key, insert the new one, then
         * record the new score in the member index */
        ss_tree_del(h, prev, member);
        ss_tree_add(h, score, member);
        ss_idx_set(h, member, score);
    }
    return 0;
}

/* Delete `member` from both the tree and the member index.
 * Returns 1 when the member was present and has been removed, 0 when the
 * index has no such member (nothing is modified in that case). */
static int ss_remove_locked(SsHandle *h, int64_t member) {
    double score;
    int found = ss_idx_get(h, member, &score) ? 1 : 0;

    if (found) {
        /* the tree is keyed by (score, member), so the stored score is
         * looked up first */
        ss_tree_del(h, score, member);
        ss_idx_del(h, member);
    }
    return found;
}

/* Add `delta` to the score of `member`, creating the member with score
 * `delta` when it does not exist yet.
 * Returns 1 (created), 0 (existing member updated), -1 (member is new and
 * count has reached max_entries; *out is not written), or -2 (the resulting
 * score is NaN; *out holds that NaN and the set is left unchanged).
 * On 0 and 1, *out holds the member's new score. */
static int ss_incr_locked(SsHandle *h, int64_t member, double delta, double *out) {
    double current;

    if (!ss_idx_get(h, member, &current)) {
        /* new member: the capacity check comes before the NaN check */
        if (h->hdr->count >= h->hdr->max_entries)
            return -1;
        *out = delta;
        if (delta != delta)          /* NaN is the only value unequal to itself */
            return -2;
        ss_tree_add(h, delta, member);
        ss_idx_set(h, member, delta);
        return 1;
    }

    double updated = current + delta;
    *out = updated;
    if (updated != updated)          /* e.g. +inf plus -inf */
        return -2;
    if (updated != current) {
        /* re-key the entry only when the score actually moved */
        ss_tree_del(h, current, member);
        ss_tree_add(h, updated, member);
        ss_idx_set(h, member, updated);
    }
    return 0;
}

/* Remove and report one end of the set: the minimum when `max` is 0, the
 * maximum when `max` is non-zero.
 * Returns 0 (and leaves *m / *s untouched) when the tree has no root;
 * otherwise stores the popped member in *m and its score in *s, deletes
 * the entry from the tree and the member index, and returns 1. */
static int ss_pop_locked(SsHandle *h, int max, int64_t *m, double *s) {
    if (h->hdr->root == SS_NONE)
        return 0;

    SsNode *leaf;
    int slot;
    if (max) {
        /* largest entry: last slot of the rightmost node */
        leaf = &h->nodes[h->hdr->rightmost];
        slot = leaf->num - 1;
    } else {
        /* smallest entry: first slot of the leftmost node */
        leaf = &h->nodes[h->hdr->leftmost];
        slot = 0;
    }

    /* copy the entry out before the delete touches the node */
    *m = leaf->members[slot];
    *s = leaf->scores[slot];
    ss_tree_del(h, *s, *m);
    ss_idx_del(h, *m);
    return 1;
}

/* ---- structural validator (debug / tests) ---- */



( run in 0.411 second using v1.01-cache-2.11-cpan-bbe5e583499 )