Data-Buffer-Shared

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

        $buf->unlink;           # remove backing file
        my $h = $buf->stats;    # diagnostic hashref

  API
    Replace "xx" with variant prefix: "i8", "u8", "i16", "u16", "i32",
    "u32", "i64", "u64", "f32", "f64", "str".

        buf_xx_set $buf, $idx, $value;    # set element (lock-free atomic for numeric)
        my $v = buf_xx_get $buf, $idx;    # get element (lock-free atomic for numeric)
        my @v = buf_xx_slice $buf, $from, $count;  # bulk read (seqlock)
        buf_xx_fill $buf, $value;         # fill all elements (write-locked)

    Integer variants also have:

        my $n = buf_xx_incr $buf, $idx;          # atomic increment, returns new value
        my $n = buf_xx_decr $buf, $idx;          # atomic decrement
        my $n = buf_xx_add $buf, $idx, $delta;   # atomic add
        my $ok = buf_xx_cas $buf, $idx, $old, $new;          # compare-and-swap
        my $p = buf_xx_cmpxchg $buf, $idx, $old, $new;       # CAS, returns prior value
        my $n = buf_xx_atomic_and $buf, $idx, $mask;         # atomic AND (integer variants)
        my $n = buf_xx_atomic_or  $buf, $idx, $mask;         # atomic OR
        my $n = buf_xx_atomic_xor $buf, $idx, $mask;         # atomic XOR

    Raw / bulk:

        my $raw = buf_xx_get_raw $buf, $from, $count;        # bulk bytes, seqlock-guarded
        buf_xx_set_raw $buf, $from, $raw;                    # bulk bytes, write-locked
        $buf->add_slice($from, \@deltas);                    # batch atomic add (integer variants)
        my $ptr = buf_xx_ptr $buf;           # raw pointer to data, for FFI use
        my $ptr = buf_xx_ptr_at $buf, $idx;  # pointer to element at index

    Zero-copy (numeric variants):

        my $sv = $buf->as_scalar;   # mmap-aliased read-only scalar ref

    Diagnostics:

bench/throughput.pl  view on Meta::CPAN

        my $v;
        for my $i (1..$n) { $v = $buf->get($i % $cap) }
    };
}

print "\n=== Str/16B ($n ops, $cap elements) ===\n";
{
    my $buf = Data::Buffer::Shared::Str->new_anon($cap, 16);

    my $val = "hello world!!!!";  # 15 bytes
    bench "str set (locked)" => sub {
        for my $i (1..$n) { $buf->set($i % $cap, $val) }
    };
    bench "str get (seqlock)" => sub {
        my $v;
        for my $i (1..$n) { $v = $buf->get($i % $cap) }
    };
}

print "\n=== Keyword vs Method ($n ops) ===\n";
{

buf_generic.h  view on Meta::CPAN

    uint32_t variant_id;      /* 8 */
    uint32_t elem_size;       /* 12 */
    uint64_t capacity;        /* 16: number of elements */
    uint64_t total_size;      /* 24: total mmap size */
    uint64_t data_off;        /* 32: offset to data array */
    uint64_t reader_slots_off;/* 40: offset to BufReaderSlot[BUF_READER_SLOTS] */
    uint8_t  _reserved0[16];  /* 48-63 */

    /* ---- Cache line 1 (64-127): seqlock + rwlock + mutable state ---- */
    uint32_t seq;             /* 64: seqlock counter, odd = writer active */
    uint32_t rwlock;          /* 68: 0=unlocked, readers=1..0x7FFFFFFF, writer=0x80000000|pid */
    uint32_t rwlock_waiters;  /* 72: wake-target counter (readers+writers) */
    uint32_t stat_recoveries; /* 76 */
    uint32_t rwlock_writers_waiting; /* 80: reader yield signal (writers only) */
    uint32_t _pad2;           /* 84 */
    uint64_t _reserved1[5];   /* 88-127 */
} BufHeader;

BUF_STATIC_ASSERT(sizeof(BufHeader) == 128, "BufHeader must be exactly 128 bytes (2 cache lines)");

/* ---- Process-local handle ---- */

buf_generic.h  view on Meta::CPAN

    BufHeader *hdr;
    void      *data;         /* pointer to element array in mmap */
    BufReaderSlot *reader_slots; /* in mmap, BUF_READER_SLOTS entries */
    size_t     mmap_size;
    char      *path;         /* backing file path (strdup'd, NULL for anon) */
    int        fd;           /* kept open for memfd, -1 otherwise */
    int        efd;          /* eventfd for notifications, -1 if none */
    uint32_t   my_slot_idx;  /* UINT32_MAX = unclaimed; per-process slot index */
    uint32_t   cached_pid;   /* getpid() at claim time */
    uint32_t   cached_fork_gen; /* fork-generation at claim time */
    uint8_t    wr_locked;    /* process-local: 1 if lock_wr is held */
    uint8_t    efd_owned;    /* 1 if we created the eventfd (close on destroy) */
} BufHandle;

/* ---- Futex-based read-write lock ---- */

#define BUF_RWLOCK_SPIN_LIMIT 32
#define BUF_LOCK_TIMEOUT_SEC  2

static inline void buf_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)

buf_generic.h  view on Meta::CPAN

    return buf_open_fd(fd, BUF_ELEM_SIZE, BUF_VARIANT_ID, errbuf);
}
#endif

/* ---- Raw byte access (for packed binary interop) ---- */

static int BUF_FN(get_raw)(BufHandle *h, uint64_t byte_off, uint64_t nbytes, void *out) {
    uint64_t data_size = h->hdr->capacity * (uint64_t)h->hdr->elem_size;
    if (nbytes > data_size || byte_off > data_size - nbytes) return 0;
    char *data = (char *)h->data;
    if (h->wr_locked) {
        memcpy(out, data + byte_off, (size_t)nbytes);
    } else {
        uint32_t seq_start;
        do {
            seq_start = buf_seqlock_read_begin(h->hdr);
            memcpy(out, data + byte_off, (size_t)nbytes);
        } while (buf_seqlock_read_retry(&h->hdr->seq, seq_start));
    }
    return 1;
}

static int BUF_FN(set_raw)(BufHandle *h, uint64_t byte_off, uint64_t nbytes, const void *in) {
    uint64_t data_size = h->hdr->capacity * (uint64_t)h->hdr->elem_size;
    if (nbytes > data_size || byte_off > data_size - nbytes) return 0;
    char *data = (char *)h->data;
    int nested = h->wr_locked;
    if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&h->hdr->seq); }
    memcpy(data + byte_off, in, (size_t)nbytes);
    if (!nested) { buf_seqlock_write_end(&h->hdr->seq); buf_rwlock_wrunlock(h); }
    return 1;
}

/* ---- Clear (zero entire buffer) ---- */

static void BUF_FN(clear)(BufHandle *h) {
    BufHeader *hdr = h->hdr;
    int nested = h->wr_locked;
    if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
    memset(h->data, 0, (size_t)(hdr->capacity * hdr->elem_size));
    if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
}

/* ---- Single-element atomic get (lock-free for numeric types) ---- */

#ifdef BUF_IS_FIXEDSTR

static int BUF_FN(get)(BufHandle *h, uint64_t idx, char *out, uint32_t *out_len) {
    BufHeader *hdr = h->hdr;
    uint32_t esz = hdr->elem_size;
    if (idx >= hdr->capacity) return 0;
    char *data = (char *)h->data;
    if (h->wr_locked) {
        memcpy(out, data + idx * esz, esz);
    } else {
        uint32_t seq_start;
        do {
            seq_start = buf_seqlock_read_begin(hdr);
            memcpy(out, data + idx * esz, esz);
        } while (buf_seqlock_read_retry(&hdr->seq, seq_start));
    }
    uint32_t len = esz;
    while (len > 0 && out[len - 1] == '\0') len--;
    *out_len = len;
    return 1;
}

static int BUF_FN(set)(BufHandle *h, uint64_t idx, const char *val, uint32_t len) {
    BufHeader *hdr = h->hdr;
    uint32_t esz = hdr->elem_size;
    if (idx >= hdr->capacity) return 0;
    char *data = (char *)h->data;
    int nested = h->wr_locked;
    if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
    memset(data + idx * esz, 0, esz);
    uint32_t copy_len = len < esz ? len : esz;
    memcpy(data + idx * esz, val, copy_len);
    if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
    return 1;
}

#elif defined(BUF_IS_FLOAT)

buf_generic.h  view on Meta::CPAN

/* ---- Bulk operations (seqlock-guarded) ---- */

#ifdef BUF_IS_FIXEDSTR

static int BUF_FN(get_slice)(BufHandle *h, uint64_t from, uint64_t count,
                              void *out) {
    BufHeader *hdr = h->hdr;
    uint32_t esz = hdr->elem_size;
    if (count > hdr->capacity || from > hdr->capacity - count) return 0;
    char *data = (char *)h->data;
    if (h->wr_locked) {
        memcpy(out, data + from * esz, count * esz);
    } else {
        uint32_t seq_start;
        do {
            seq_start = buf_seqlock_read_begin(hdr);
            memcpy(out, data + from * esz, count * esz);
        } while (buf_seqlock_read_retry(&hdr->seq, seq_start));
    }
    return 1;
}

static int BUF_FN(set_slice)(BufHandle *h, uint64_t from, uint64_t count,
                              const void *in) {
    BufHeader *hdr = h->hdr;
    uint32_t esz = hdr->elem_size;
    if (count > hdr->capacity || from > hdr->capacity - count) return 0;
    char *data = (char *)h->data;
    int nested = h->wr_locked;
    if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
    memcpy(data + from * esz, in, count * esz);
    if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
    return 1;
}

#else /* numeric */

static int BUF_FN(get_slice)(BufHandle *h, uint64_t from, uint64_t count,
                              BUF_ELEM_TYPE *out) {
    BufHeader *hdr = h->hdr;
    if (count > hdr->capacity || from > hdr->capacity - count) return 0;
    BUF_ELEM_TYPE *data = (BUF_ELEM_TYPE *)h->data;
    if (h->wr_locked) {
        memcpy(out, &data[from], count * sizeof(BUF_ELEM_TYPE));
    } else {
        uint32_t seq_start;
        do {
            seq_start = buf_seqlock_read_begin(hdr);
            memcpy(out, &data[from], count * sizeof(BUF_ELEM_TYPE));
        } while (buf_seqlock_read_retry(&hdr->seq, seq_start));
    }
    return 1;
}

static int BUF_FN(set_slice)(BufHandle *h, uint64_t from, uint64_t count,
                              const BUF_ELEM_TYPE *in) {
    BufHeader *hdr = h->hdr;
    if (count > hdr->capacity || from > hdr->capacity - count) return 0;
    BUF_ELEM_TYPE *data = (BUF_ELEM_TYPE *)h->data;
    int nested = h->wr_locked;
    if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
    memcpy(&data[from], in, count * sizeof(BUF_ELEM_TYPE));
    if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
    return 1;
}

#endif /* BUF_IS_FIXEDSTR */

/* ---- Fill ---- */

#ifdef BUF_IS_FIXEDSTR

static void BUF_FN(fill)(BufHandle *h, const char *val, uint32_t len) {
    BufHeader *hdr = h->hdr;
    uint32_t esz = hdr->elem_size;
    char *data = (char *)h->data;
    int nested = h->wr_locked;
    if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
    uint32_t copy_len = len < esz ? len : esz;
    memset(data, 0, (size_t)hdr->capacity * esz);
    for (uint64_t i = 0; i < hdr->capacity; i++)
        memcpy(data + i * esz, val, copy_len);
    if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
}

#else

static void BUF_FN(fill)(BufHandle *h, BUF_ELEM_TYPE val) {
    BufHeader *hdr = h->hdr;
    BUF_ELEM_TYPE *data = (BUF_ELEM_TYPE *)h->data;
    int nested = h->wr_locked;
    if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
    for (uint64_t i = 0; i < hdr->capacity; i++)
        data[i] = val;
    if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
}

#endif

/* ---- Atomic operations (integer types only) ---- */

buf_generic.h  view on Meta::CPAN

static inline void *BUF_FN(ptr_at)(BufHandle *h, uint64_t idx) {
    if (idx >= h->hdr->capacity) return NULL;
    return (char *)h->data + idx * h->hdr->elem_size;
}

/* ---- Explicit locking for batch operations ---- */

static inline void BUF_FN(lock_wr)(BufHandle *h) {
    buf_rwlock_wrlock(h);
    buf_seqlock_write_begin(&h->hdr->seq);
    h->wr_locked = 1;
}

static inline void BUF_FN(unlock_wr)(BufHandle *h) {
    h->wr_locked = 0;
    buf_seqlock_write_end(&h->hdr->seq);
    buf_rwlock_wrunlock(h);
}

static inline void BUF_FN(lock_rd)(BufHandle *h) {
    buf_rwlock_rdlock(h);
}

static inline void BUF_FN(unlock_rd)(BufHandle *h) {
    buf_rwlock_rdunlock(h);

lib/Data/Buffer/Shared.pm  view on Meta::CPAN

    my $h = $buf->stats;    # diagnostic hashref

=head2 API

Replace C<xx> with variant prefix: C<i8>, C<u8>, C<i16>, C<u16>,
C<i32>, C<u32>, C<i64>, C<u64>, C<f32>, C<f64>, C<str>.

    buf_xx_set $buf, $idx, $value;    # set element (lock-free atomic for numeric)
    my $v = buf_xx_get $buf, $idx;    # get element (lock-free atomic for numeric)
    my @v = buf_xx_slice $buf, $from, $count;  # bulk read (seqlock)
    buf_xx_fill $buf, $value;         # fill all elements (write-locked)

Integer variants also have:

    my $n = buf_xx_incr $buf, $idx;          # atomic increment, returns new value
    my $n = buf_xx_decr $buf, $idx;          # atomic decrement
    my $n = buf_xx_add $buf, $idx, $delta;   # atomic add
    my $ok = buf_xx_cas $buf, $idx, $old, $new;          # compare-and-swap
    my $p = buf_xx_cmpxchg $buf, $idx, $old, $new;       # CAS, returns prior value
    my $n = buf_xx_atomic_and $buf, $idx, $mask;         # atomic AND (integer variants)
    my $n = buf_xx_atomic_or  $buf, $idx, $mask;         # atomic OR
    my $n = buf_xx_atomic_xor $buf, $idx, $mask;         # atomic XOR

Raw / bulk:

    my $raw = buf_xx_get_raw $buf, $from, $count;        # bulk bytes, seqlock-guarded
    buf_xx_set_raw $buf, $from, $raw;                    # bulk bytes, write-locked
    $buf->add_slice($from, \@deltas);                    # batch atomic add (integer variants)
    my $ptr = buf_xx_ptr $buf;           # raw pointer to data, for FFI use
    my $ptr = buf_xx_ptr_at $buf, $idx;  # pointer to element at index

Zero-copy (numeric variants):

    my $sv = $buf->as_scalar;   # mmap-aliased read-only scalar ref

Diagnostics:

t/12-dead-reader-recovery.t  view on Meta::CPAN

       "children held rdlock (rwlock=$rwlock_word)");

    kill 'KILL', @pids;
    waitpid($_, 0) for @pids;

    # Parent's wrlock-path op must complete within ~3 s.
    my $start = time;
    my $ok = eval {
        local $SIG{ALRM} = sub { die "alarm\n" };
        alarm 10;
        $b->fill(42);  # write-locked
        alarm 0;
        1;
    };
    my $elapsed = time - $start;
    ok($ok, sprintf('parent fill returned (elapsed %.2fs)', $elapsed))
        or diag "stuck after ${elapsed}s: $@";
    cmp_ok($elapsed, '<', 5, "recovery completed in <5s");
    is($b->get(0), 42, "post-recovery value correct");

    my $s = $b->stats;



( run in 2.168 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )