Data-Buffer-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

#include "buf_u32.h"
#include "buf_i64.h"
#include "buf_u64.h"
#include "buf_f32.h"
#include "buf_f64.h"
#include "buf_str.h"

#include "XSParseKeyword.h"

/* ---- as_scalar magic: prevent use-after-free by preventing buffer DESTROY
 * while the returned scalar ref is alive. We attach magic to the inner SV
 * that holds a reference to the buffer object. When the inner SV is freed,
 * the magic destructor releases the reference. ---- */

static int buf_scalar_magic_free(pTHX_ SV *sv, MAGIC *mg) {
    PERL_UNUSED_ARG(sv);
    if (mg->mg_obj) SvREFCNT_dec(mg->mg_obj);
    return 0;
}

static const MGVTBL buf_scalar_magic_vtbl = {

buf_generic.h  view on Meta::CPAN

    __asm__ volatile("yield" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

#define BUF_RWLOCK_WRITER_BIT 0x80000000U
#define BUF_RWLOCK_PID_MASK   0x7FFFFFFFU
#define BUF_RWLOCK_WR(pid)    (BUF_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & BUF_RWLOCK_PID_MASK))

static inline int buf_pid_alive(uint32_t pid) {
    if (pid == 0) return 1;
    return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}

/* ---- Per-process slot lifecycle (dead-reader recovery) ----
 * Each process claims one BufReaderSlot lazily on first lock op so that
 * its contribution to the shared rwlock counter can be reclaimed by other
 * processes if it dies (SIGKILL'd worker no longer pins the counter). */
static uint32_t buf_fork_gen = 0;
static pthread_once_t buf_atfork_once = PTHREAD_ONCE_INIT;

buf_generic.h  view on Meta::CPAN

    int any_recovery = 0;

    /* Pass 1: scan; classify; immediate-wipe dead slots with sc==0 (no
     * rwlock contribution to lose). Defer wiping dead-with-sc>0 slots
     * until force-reset can fire — otherwise we'd lose the only record
     * of the orphan rwlock contribution while a live reader is present. */
    for (uint32_t i = 0; i < BUF_READER_SLOTS; i++) {
        uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
        if (pid == 0) continue;
        uint32_t sc = __atomic_load_n(&h->reader_slots[i].subcount, __ATOMIC_RELAXED);
        if (buf_pid_alive(pid)) {
            if (sc > 0) any_live_reader = 1;
            continue;
        }
        if (sc > 0) { found_dead_reader = 1; continue; }
        if (buf_drain_dead_slot(h, i, pid)) any_recovery = 1;
    }

    /* Pass 2: only if force-reset will fire.  Issue the rwlock CAS first
     * to keep the race window with new readers narrow, then wipe the
     * deferred dead slots. */

buf_generic.h  view on Meta::CPAN

            if (__atomic_compare_exchange_n(&hdr->rwlock, &cur, 0,
                    0, __ATOMIC_RELEASE, __ATOMIC_RELAXED)) {
                any_recovery = 1;
                if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
                    syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
            }
        }
        for (uint32_t i = 0; i < BUF_READER_SLOTS; i++) {
            uint32_t pid = __atomic_load_n(&h->reader_slots[i].pid, __ATOMIC_ACQUIRE);
            if (pid == 0) continue;
            if (buf_pid_alive(pid)) continue;
            if (buf_drain_dead_slot(h, i, pid)) any_recovery = 1;
        }
    }
    if (any_recovery)
        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
}

/* Park/unpark helpers — keep global rwlock_waiters/writers_waiting and
 * per-slot mirror counters in sync so recovery can drain them. */
static inline void buf_park_reader(BufHandle *h) {

buf_generic.h  view on Meta::CPAN

 * value here (rather than trusting a stale snapshot from the futex
 * caller) so that (a) a writer that died after our futex_wait started
 * is detected on the same timeout, and (b) phantom waiter/writers_waiting
 * contributions left by a dead parked writer are drained even when the
 * lock word itself is now 0. */
static inline void buf_recover_after_timeout(BufHandle *h) {
    BufHeader *hdr = h->hdr;
    uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
    if (val >= BUF_RWLOCK_WRITER_BIT) {
        uint32_t pid = val & BUF_RWLOCK_PID_MASK;
        if (!buf_pid_alive(pid))
            buf_recover_stale_lock(hdr, val);
    } else {
        buf_recover_dead_readers(h);
    }
}

static inline void buf_rwlock_rdlock(BufHandle *h) {
    BufHeader *hdr = h->hdr;
    buf_claim_reader_slot(h);
    uint32_t *lock = &hdr->rwlock;

buf_generic.h  view on Meta::CPAN

        uint32_t s = __atomic_load_n(&hdr->seq, __ATOMIC_ACQUIRE);
        if (__builtin_expect((s & 1) == 0, 1)) return s;
        if (__builtin_expect(spin < 100000, 1)) {
            buf_spin_pause();
            spin++;
            continue;
        }
        uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
        if (val >= BUF_RWLOCK_WRITER_BIT) {
            uint32_t pid = val & BUF_RWLOCK_PID_MASK;
            if (!buf_pid_alive(pid)) {
                buf_recover_stale_lock(hdr, val);
                spin = 0;
                continue;
            }
        }
        struct timespec ts = {0, 1000000};
        nanosleep(&ts, NULL);
        spin = 0;
    }
}

t/09-review-fixes.t  view on Meta::CPAN

    my $buf = Data::Buffer::Shared::Str->new_anon(5, 16);
    $buf->set(0, "hello");

    $buf->lock_wr;
    is($buf->get(0), "hello", 'str get under lock_wr');
    my @vals = $buf->slice(0, 1);
    is($vals[0], "hello", 'str slice under lock_wr');
    $buf->unlock_wr;
}

# === as_scalar keeps buffer alive (prevents use-after-free) ===
{
    my $ref;
    {
        my $buf = Data::Buffer::Shared::I64->new_anon(10);
        $buf->set(0, 12345);
        $ref = $buf->as_scalar;
        # $buf goes out of scope here — but magic ref prevents DESTROY
    }
    # buffer should still be alive because $ref holds a backref
    my @vals = unpack("q<", $$ref);
    is($vals[0], 12345, 'as_scalar keeps buffer alive after scope exit');
}

done_testing;



( run in 1.668 second using v1.01-cache-2.11-cpan-2398b32b56e )