Data-RingBuffer-Shared
view release on metacpan or search on metacpan
* v2 layout adds per-slot publication sequence (seqlock-per-slot), so
* readers never observe a partially-written or cross-epoch torn slot:
* read_seq / read_latest return 0 if the slot is mid-write or has been
* overwritten to a different epoch. Safe under MPMC writers.
*
* Unlike Queue (consumed on read) or PubSub (subscription tracking),
* RingBuffer is a simple overwriting circular window.
*/
#ifndef RING_H
#define RING_H
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>
#include <time.h>
#include <limits.h>
#define RING_MAGIC 0x524E4732U /* "RNG2" â v2 layout: per-slot publication seq */
#define RING_VERSION 2
#define RING_ERR_BUFLEN 256
#define RING_VAR_INT 0
#define RING_VAR_F64 1
/* Bounded wait for an abandoned WRITING mark (~2s, matches sister-module
* SHM_LOCK_TIMEOUT_SEC). Without per-slot PID tracking we can't detect a
* dead writer directly; instead, a writer that lands on a slot still odd
* after this many monotonic-coarse ticks (seconds) assumes the prior
* writer is dead/abandoned and CAS-takes the slot, overwriting any
* partial data. Readers detect the epoch change via the seqlock retry. */
#define RING_WRITER_RECOVERY_SEC 2
/* ================================================================
* Header (128 bytes)
* ================================================================ */
typedef struct {
uint32_t magic;
uint32_t version;
uint32_t elem_size;
uint32_t variant_id;
uint64_t capacity;
uint64_t total_size;
uint64_t data_off; /* 32: offset to data array */
uint64_t seq_off; /* 40: offset to per-slot publication seq array */
uint8_t _pad0[16]; /* 48-63 */
uint64_t head; /* 64: monotonic write cursor (next write position) */
uint64_t count; /* 72: total writes (for overwrite detection) */
uint32_t waiters; /* 80: blocked on new data */
uint32_t wake_seq; /* 84: FUTEX_WAIT target (avoids 64-bit count wraparound) */
uint64_t stat_writes; /* 88 */
uint64_t stat_overwrites; /* 96 */
uint8_t _pad2[24]; /* 104-127 */
} RingHeader;
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(RingHeader) == 128, "RingHeader must be 128 bytes");
#endif
typedef struct {
RingHeader *hdr;
uint8_t *data;
uint64_t *seq; /* per-slot publication sequence (cap entries) */
size_t mmap_size;
uint32_t elem_size;
char *path;
int notify_fd;
int backing_fd;
} RingHandle;
/* ================================================================
* Slot access
* ================================================================ */
static inline uint8_t *ring_slot(RingHandle *h, uint64_t seq) {
return h->data + (seq % h->hdr->capacity) * h->elem_size;
}
/* ================================================================
* Write â overwrites oldest when full, always succeeds
*
* Per-slot seq encoding (uint64_t, initial 0):
* bit 0 = 1 (odd): writer in progress for pos = (seq >> 1) - 1
* bit 0 = 0 (even): data for pos = (seq >> 1) - 1 is published and stable
* Writers serialize on the slot via CAS (two writers at pos N and pos N+cap
* racing on the same slot index). Readers use a seqlock-style double-load
* to detect mid-write tearing.
* ================================================================ */
static inline uint64_t ring_write(RingHandle *h, const void *val, uint32_t vlen) {
RingHeader *hdr = h->hdr;
/* Claim a unique position via fetch_add â ring overwrites, no capacity check. */
uint64_t pos = __atomic_fetch_add(&hdr->head, 1, __ATOMIC_ACQ_REL);
uint64_t slot_idx = pos % hdr->capacity;
uint64_t my_writing = ((pos + 1) << 1) | 1; /* odd: writing for pos */
uint64_t my_done = (pos + 1) << 1; /* even: pos is committed */
/* CAS per-slot seq from a committed (even) mark to our writing-mark.
* If another writer is in progress (odd), spin until they commit â
* otherwise we'd race data writes to the same slot. If a newer writer
* has already committed (seq >> 1 > pos+1), skip: their data wins.
*
* Bounded WRITING-wait for abandoned slots: a writer that CAS'd seq
* evenâodd but died before publishing would otherwise strand this
* slot forever (every capacity-th write thereafter would spin). We
* track wall-clock seconds via CLOCK_MONOTONIC_COARSE; if the slot
* stays odd past RING_WRITER_RECOVERY_SEC the prior writer is treated
* as abandoned and we CAS over its odd mark, overwriting any partial
* data. Readers detect the epoch change via seqlock retry. */
( run in 1.094 second using v1.01-cache-2.11-cpan-40ba7b3775d )