Data-Buffer-Shared
view release on metacpan or search on metacpan
$buf->unlink; # remove backing file
my $h = $buf->stats; # diagnostic hashref
API
Replace "xx" with variant prefix: "i8", "u8", "i16", "u16", "i32",
"u32", "i64", "u64", "f32", "f64", "str".
buf_xx_set $buf, $idx, $value; # set element (lock-free atomic for numeric)
my $v = buf_xx_get $buf, $idx; # get element (lock-free atomic for numeric)
my @v = buf_xx_slice $buf, $from, $count; # bulk read (seqlock)
buf_xx_fill $buf, $value; # fill all elements (write-locked)
Integer variants also have:
my $n = buf_xx_incr $buf, $idx; # atomic increment, returns new value
my $n = buf_xx_decr $buf, $idx; # atomic decrement
my $n = buf_xx_add $buf, $idx, $delta; # atomic add
my $ok = buf_xx_cas $buf, $idx, $old, $new; # compare-and-swap
my $p = buf_xx_cmpxchg $buf, $idx, $old, $new; # CAS, returns prior value
my $n = buf_xx_atomic_and $buf, $idx, $mask; # atomic AND (integer variants)
my $n = buf_xx_atomic_or $buf, $idx, $mask; # atomic OR
my $n = buf_xx_atomic_xor $buf, $idx, $mask; # atomic XOR
Raw / bulk:
my $raw = buf_xx_get_raw $buf, $from, $count; # bulk bytes, seqlock-guarded
buf_xx_set_raw $buf, $from, $raw; # bulk bytes, write-locked
$buf->add_slice($from, \@deltas); # batch atomic add (integer variants)
my $ptr = buf_xx_ptr $buf; # raw pointer to data, for FFI use
my $ptr = buf_xx_ptr_at $buf, $idx; # pointer to element at index
Zero-copy (numeric variants):
my $sv = $buf->as_scalar; # mmap-aliased read-only scalar ref
Diagnostics:
bench/throughput.pl view on Meta::CPAN
my $v;
for my $i (1..$n) { $v = $buf->get($i % $cap) }
};
}
print "\n=== Str/16B ($n ops, $cap elements) ===\n";
{
my $buf = Data::Buffer::Shared::Str->new_anon($cap, 16);
my $val = "hello world!!!!"; # 15 bytes
bench "str set (locked)" => sub {
for my $i (1..$n) { $buf->set($i % $cap, $val) }
};
bench "str get (seqlock)" => sub {
my $v;
for my $i (1..$n) { $v = $buf->get($i % $cap) }
};
}
print "\n=== Keyword vs Method ($n ops) ===\n";
{
buf_generic.h view on Meta::CPAN
uint32_t variant_id; /* 8 */
uint32_t elem_size; /* 12 */
uint64_t capacity; /* 16: number of elements */
uint64_t total_size; /* 24: total mmap size */
uint64_t data_off; /* 32: offset to data array */
uint64_t reader_slots_off;/* 40: offset to BufReaderSlot[BUF_READER_SLOTS] */
uint8_t _reserved0[16]; /* 48-63 */
/* ---- Cache line 1 (64-127): seqlock + rwlock + mutable state ---- */
uint32_t seq; /* 64: seqlock counter, odd = writer active */
uint32_t rwlock; /* 68: 0=unlocked, readers=1..0x7FFFFFFF, writer=0x80000000|pid */
uint32_t rwlock_waiters; /* 72: wake-target counter (readers+writers) */
uint32_t stat_recoveries; /* 76 */
uint32_t rwlock_writers_waiting; /* 80: reader yield signal (writers only) */
uint32_t _pad2; /* 84 */
uint64_t _reserved1[5]; /* 88-127 */
} BufHeader;
BUF_STATIC_ASSERT(sizeof(BufHeader) == 128, "BufHeader must be exactly 128 bytes (2 cache lines)");
/* ---- Process-local handle ---- */
buf_generic.h view on Meta::CPAN
BufHeader *hdr;
void *data; /* pointer to element array in mmap */
BufReaderSlot *reader_slots; /* in mmap, BUF_READER_SLOTS entries */
size_t mmap_size;
char *path; /* backing file path (strdup'd, NULL for anon) */
int fd; /* kept open for memfd, -1 otherwise */
int efd; /* eventfd for notifications, -1 if none */
uint32_t my_slot_idx; /* UINT32_MAX = unclaimed; per-process slot index */
uint32_t cached_pid; /* getpid() at claim time */
uint32_t cached_fork_gen; /* fork-generation at claim time */
uint8_t wr_locked; /* process-local: 1 if lock_wr is held */
uint8_t efd_owned; /* 1 if we created the eventfd (close on destroy) */
} BufHandle;
/* ---- Futex-based read-write lock ---- */
#define BUF_RWLOCK_SPIN_LIMIT 32
#define BUF_LOCK_TIMEOUT_SEC 2
static inline void buf_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
buf_generic.h view on Meta::CPAN
return buf_open_fd(fd, BUF_ELEM_SIZE, BUF_VARIANT_ID, errbuf);
}
#endif
/* ---- Raw byte access (for packed binary interop) ---- */
static int BUF_FN(get_raw)(BufHandle *h, uint64_t byte_off, uint64_t nbytes, void *out) {
uint64_t data_size = h->hdr->capacity * (uint64_t)h->hdr->elem_size;
if (nbytes > data_size || byte_off > data_size - nbytes) return 0;
char *data = (char *)h->data;
if (h->wr_locked) {
memcpy(out, data + byte_off, (size_t)nbytes);
} else {
uint32_t seq_start;
do {
seq_start = buf_seqlock_read_begin(h->hdr);
memcpy(out, data + byte_off, (size_t)nbytes);
} while (buf_seqlock_read_retry(&h->hdr->seq, seq_start));
}
return 1;
}
static int BUF_FN(set_raw)(BufHandle *h, uint64_t byte_off, uint64_t nbytes, const void *in) {
uint64_t data_size = h->hdr->capacity * (uint64_t)h->hdr->elem_size;
if (nbytes > data_size || byte_off > data_size - nbytes) return 0;
char *data = (char *)h->data;
int nested = h->wr_locked;
if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&h->hdr->seq); }
memcpy(data + byte_off, in, (size_t)nbytes);
if (!nested) { buf_seqlock_write_end(&h->hdr->seq); buf_rwlock_wrunlock(h); }
return 1;
}
/* ---- Clear (zero entire buffer) ---- */
static void BUF_FN(clear)(BufHandle *h) {
BufHeader *hdr = h->hdr;
int nested = h->wr_locked;
if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
memset(h->data, 0, (size_t)(hdr->capacity * hdr->elem_size));
if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
}
/* ---- Single-element atomic get (lock-free for numeric types) ---- */
#ifdef BUF_IS_FIXEDSTR
static int BUF_FN(get)(BufHandle *h, uint64_t idx, char *out, uint32_t *out_len) {
BufHeader *hdr = h->hdr;
uint32_t esz = hdr->elem_size;
if (idx >= hdr->capacity) return 0;
char *data = (char *)h->data;
if (h->wr_locked) {
memcpy(out, data + idx * esz, esz);
} else {
uint32_t seq_start;
do {
seq_start = buf_seqlock_read_begin(hdr);
memcpy(out, data + idx * esz, esz);
} while (buf_seqlock_read_retry(&hdr->seq, seq_start));
}
uint32_t len = esz;
while (len > 0 && out[len - 1] == '\0') len--;
*out_len = len;
return 1;
}
static int BUF_FN(set)(BufHandle *h, uint64_t idx, const char *val, uint32_t len) {
BufHeader *hdr = h->hdr;
uint32_t esz = hdr->elem_size;
if (idx >= hdr->capacity) return 0;
char *data = (char *)h->data;
int nested = h->wr_locked;
if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
memset(data + idx * esz, 0, esz);
uint32_t copy_len = len < esz ? len : esz;
memcpy(data + idx * esz, val, copy_len);
if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
return 1;
}
#elif defined(BUF_IS_FLOAT)
buf_generic.h view on Meta::CPAN
/* ---- Bulk operations (seqlock-guarded) ---- */
#ifdef BUF_IS_FIXEDSTR
static int BUF_FN(get_slice)(BufHandle *h, uint64_t from, uint64_t count,
void *out) {
BufHeader *hdr = h->hdr;
uint32_t esz = hdr->elem_size;
if (count > hdr->capacity || from > hdr->capacity - count) return 0;
char *data = (char *)h->data;
if (h->wr_locked) {
memcpy(out, data + from * esz, count * esz);
} else {
uint32_t seq_start;
do {
seq_start = buf_seqlock_read_begin(hdr);
memcpy(out, data + from * esz, count * esz);
} while (buf_seqlock_read_retry(&hdr->seq, seq_start));
}
return 1;
}
static int BUF_FN(set_slice)(BufHandle *h, uint64_t from, uint64_t count,
const void *in) {
BufHeader *hdr = h->hdr;
uint32_t esz = hdr->elem_size;
if (count > hdr->capacity || from > hdr->capacity - count) return 0;
char *data = (char *)h->data;
int nested = h->wr_locked;
if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
memcpy(data + from * esz, in, count * esz);
if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
return 1;
}
#else /* numeric */
static int BUF_FN(get_slice)(BufHandle *h, uint64_t from, uint64_t count,
BUF_ELEM_TYPE *out) {
BufHeader *hdr = h->hdr;
if (count > hdr->capacity || from > hdr->capacity - count) return 0;
BUF_ELEM_TYPE *data = (BUF_ELEM_TYPE *)h->data;
if (h->wr_locked) {
memcpy(out, &data[from], count * sizeof(BUF_ELEM_TYPE));
} else {
uint32_t seq_start;
do {
seq_start = buf_seqlock_read_begin(hdr);
memcpy(out, &data[from], count * sizeof(BUF_ELEM_TYPE));
} while (buf_seqlock_read_retry(&hdr->seq, seq_start));
}
return 1;
}
static int BUF_FN(set_slice)(BufHandle *h, uint64_t from, uint64_t count,
const BUF_ELEM_TYPE *in) {
BufHeader *hdr = h->hdr;
if (count > hdr->capacity || from > hdr->capacity - count) return 0;
BUF_ELEM_TYPE *data = (BUF_ELEM_TYPE *)h->data;
int nested = h->wr_locked;
if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
memcpy(&data[from], in, count * sizeof(BUF_ELEM_TYPE));
if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
return 1;
}
#endif /* BUF_IS_FIXEDSTR */
/* ---- Fill ---- */
#ifdef BUF_IS_FIXEDSTR
static void BUF_FN(fill)(BufHandle *h, const char *val, uint32_t len) {
BufHeader *hdr = h->hdr;
uint32_t esz = hdr->elem_size;
char *data = (char *)h->data;
int nested = h->wr_locked;
if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
uint32_t copy_len = len < esz ? len : esz;
memset(data, 0, (size_t)hdr->capacity * esz);
for (uint64_t i = 0; i < hdr->capacity; i++)
memcpy(data + i * esz, val, copy_len);
if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
}
#else
static void BUF_FN(fill)(BufHandle *h, BUF_ELEM_TYPE val) {
BufHeader *hdr = h->hdr;
BUF_ELEM_TYPE *data = (BUF_ELEM_TYPE *)h->data;
int nested = h->wr_locked;
if (!nested) { buf_rwlock_wrlock(h); buf_seqlock_write_begin(&hdr->seq); }
for (uint64_t i = 0; i < hdr->capacity; i++)
data[i] = val;
if (!nested) { buf_seqlock_write_end(&hdr->seq); buf_rwlock_wrunlock(h); }
}
#endif
/* ---- Atomic operations (integer types only) ---- */
buf_generic.h view on Meta::CPAN
static inline void *BUF_FN(ptr_at)(BufHandle *h, uint64_t idx) {
if (idx >= h->hdr->capacity) return NULL;
return (char *)h->data + idx * h->hdr->elem_size;
}
/* ---- Explicit locking for batch operations ---- */
static inline void BUF_FN(lock_wr)(BufHandle *h) {
buf_rwlock_wrlock(h);
buf_seqlock_write_begin(&h->hdr->seq);
h->wr_locked = 1;
}
static inline void BUF_FN(unlock_wr)(BufHandle *h) {
h->wr_locked = 0;
buf_seqlock_write_end(&h->hdr->seq);
buf_rwlock_wrunlock(h);
}
static inline void BUF_FN(lock_rd)(BufHandle *h) {
buf_rwlock_rdlock(h);
}
static inline void BUF_FN(unlock_rd)(BufHandle *h) {
buf_rwlock_rdunlock(h);
lib/Data/Buffer/Shared.pm view on Meta::CPAN
my $h = $buf->stats; # diagnostic hashref
=head2 API
Replace C<xx> with variant prefix: C<i8>, C<u8>, C<i16>, C<u16>,
C<i32>, C<u32>, C<i64>, C<u64>, C<f32>, C<f64>, C<str>.
buf_xx_set $buf, $idx, $value; # set element (lock-free atomic for numeric)
my $v = buf_xx_get $buf, $idx; # get element (lock-free atomic for numeric)
my @v = buf_xx_slice $buf, $from, $count; # bulk read (seqlock)
buf_xx_fill $buf, $value; # fill all elements (write-locked)
Integer variants also have:
my $n = buf_xx_incr $buf, $idx; # atomic increment, returns new value
my $n = buf_xx_decr $buf, $idx; # atomic decrement
my $n = buf_xx_add $buf, $idx, $delta; # atomic add
my $ok = buf_xx_cas $buf, $idx, $old, $new; # compare-and-swap
my $p = buf_xx_cmpxchg $buf, $idx, $old, $new; # CAS, returns prior value
my $n = buf_xx_atomic_and $buf, $idx, $mask; # atomic AND (integer variants)
my $n = buf_xx_atomic_or $buf, $idx, $mask; # atomic OR
my $n = buf_xx_atomic_xor $buf, $idx, $mask; # atomic XOR
Raw / bulk:
my $raw = buf_xx_get_raw $buf, $from, $count; # bulk bytes, seqlock-guarded
buf_xx_set_raw $buf, $from, $raw; # bulk bytes, write-locked
$buf->add_slice($from, \@deltas); # batch atomic add (integer variants)
my $ptr = buf_xx_ptr $buf; # raw pointer to data, for FFI use
my $ptr = buf_xx_ptr_at $buf, $idx; # pointer to element at index
Zero-copy (numeric variants):
my $sv = $buf->as_scalar; # mmap-aliased read-only scalar ref
Diagnostics:
t/12-dead-reader-recovery.t view on Meta::CPAN
"children held rdlock (rwlock=$rwlock_word)");
kill 'KILL', @pids;
waitpid($_, 0) for @pids;
# Parent's wrlock-path op must complete within ~3 s.
my $start = time;
my $ok = eval {
local $SIG{ALRM} = sub { die "alarm\n" };
alarm 10;
$b->fill(42); # write-locked
alarm 0;
1;
};
my $elapsed = time - $start;
ok($ok, sprintf('parent fill returned (elapsed %.2fs)', $elapsed))
or diag "stuck after ${elapsed}s: $@";
cmp_ok($elapsed, '<', 5, "recovery completed in <5s");
is($b->get(0), 42, "post-recovery value correct");
my $s = $b->stats;
( run in 1.468 second using v1.01-cache-2.11-cpan-e1769b4cff6 )