Data-RingBuffer-Shared
/*
* ring.h -- Shared-memory fixed-size ring buffer for Linux
*
* Lock-free circular buffer: writes overwrite oldest when full.
* Readers access by relative position (0=latest) or absolute sequence.
 * No consumer tracking -- data persists until overwritten.
*
* v2 layout adds per-slot publication sequence (seqlock-per-slot), so
* readers never observe a partially-written or cross-epoch torn slot:
* read_seq / read_latest return 0 if the slot is mid-write or has been
* overwritten to a different epoch. Safe under MPMC writers.
*
* Unlike Queue (consumed on read) or PubSub (subscription tracking),
* RingBuffer is a simple overwriting circular window.
*/
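/*
 * Usage sketch (illustrative only -- ring_open / ring_read_latest and their
 * exact signatures are assumptions; they are not shown in this excerpt):
 *
 *   RingHandle *h = ring_open("/dev/shm/sensor.ring");     // map an existing buffer
 *   double sample = 42.0;
 *   uint64_t pos = ring_write(h, &sample, sizeof sample);  // overwrites oldest when full
 *
 *   double latest;
 *   if (ring_read_latest(h, &latest, sizeof latest))       // 0 => slot torn or overwritten
 *       printf("latest = %f (written at pos %llu)\n", latest, (unsigned long long)pos);
 */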
#ifndef RING_H
#define RING_H
#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>
#define RING_MAGIC 0x524E4732U /* "RNG2" -- v2 layout: per-slot publication seq */
#define RING_VERSION 2
#define RING_ERR_BUFLEN 256
#define RING_VAR_INT 0
#define RING_VAR_F64 1
/* ================================================================
* Header (128 bytes)
* ================================================================ */
typedef struct {
uint32_t magic;
uint32_t version;
uint32_t elem_size;
uint32_t variant_id;
uint64_t capacity;
uint64_t total_size;
uint64_t data_off; /* 32: offset to data array */
uint64_t seq_off; /* 40: offset to per-slot publication seq array */
uint8_t _pad0[16]; /* 48-63 */
uint64_t head; /* 64: monotonic write cursor (next write position) */
uint64_t count; /* 72: total writes (for overwrite detection) */
uint32_t waiters; /* 80: blocked on new data */
uint32_t wake_seq; /* 84: FUTEX_WAIT target (avoids 64-bit count wraparound) */
uint64_t stat_writes; /* 88 */
uint64_t stat_overwrites; /* 96 */
uint8_t _pad2[24]; /* 104-127 */
} RingHeader;
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(RingHeader) == 128, "RingHeader must be 128 bytes");
#endif
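/*
 * One plausible layout of the mapping (the create/attach path that fills in
 * these offsets is not part of this excerpt, so treat the ordering and the
 * arithmetic below as an assumption):
 *
 *   [ RingHeader, 128 bytes                     ]  offset 0
 *   [ per-slot seq: capacity * sizeof(uint64_t) ]  hdr->seq_off
 *   [ data:         capacity * elem_size        ]  hdr->data_off
 *
 * e.g. capacity = 1024, elem_size = 8 (RING_VAR_F64):
 *   seq_off    = 128
 *   data_off   = 128  + 1024 * 8 = 8320
 *   total_size = 8320 + 1024 * 8 = 16512
 */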
typedef struct {
RingHeader *hdr;
uint8_t *data;
uint64_t *seq; /* per-slot publication sequence (cap entries) */
size_t mmap_size;
uint32_t elem_size;
char *path;
int notify_fd;
int backing_fd;
} RingHandle;
/* ================================================================
* Slot access
* ================================================================ */
static inline uint8_t *ring_slot(RingHandle *h, uint64_t seq) {
return h->data + (seq % h->hdr->capacity) * h->elem_size;
}
/* ================================================================
 * Write -- overwrites oldest when full, always succeeds
*
* Per-slot seq encoding (uint64_t, initial 0):
* bit 0 = 1 (odd): writer in progress for pos = (seq >> 1) - 1
* bit 0 = 0 (even): data for pos = (seq >> 1) - 1 is published and stable
* Writers serialize on the slot via CAS (two writers at pos N and pos N+cap
* racing on the same slot index). Readers use a seqlock-style double-load
* to detect mid-write tearing.
* ================================================================ */
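/*
 * Worked example: for pos = 5 a writer stores ((5+1) << 1) | 1 = 13 (odd,
 * "writing"), copies the element, then stores (5+1) << 1 = 12 (even,
 * "pos 5 published"). The matching reader-side double-load, sketched below,
 * is illustrative only -- the real read path lives elsewhere in this header
 * and the helper name is an assumption:
 *
 *   static inline int ring_read_pos_sketch(RingHandle *h, uint64_t pos, void *out) {
 *       uint32_t idx = (uint32_t)(pos % h->hdr->capacity);
 *       uint64_t want = (pos + 1) << 1;                    // even mark for this pos
 *       if (__atomic_load_n(&h->seq[idx], __ATOMIC_ACQUIRE) != want)
 *           return 0;                                      // mid-write or different epoch
 *       memcpy(out, ring_slot(h, pos), h->elem_size);
 *       __atomic_thread_fence(__ATOMIC_ACQUIRE);           // order the copy before the re-check
 *       return __atomic_load_n(&h->seq[idx], __ATOMIC_RELAXED) == want;
 *   }
 */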
static inline uint64_t ring_write(RingHandle *h, const void *val, uint32_t vlen) {
RingHeader *hdr = h->hdr;
/* Claim a unique position via fetch_add -- ring overwrites, no capacity check. */
uint64_t pos = __atomic_fetch_add(&hdr->head, 1, __ATOMIC_ACQ_REL);
uint32_t slot_idx = (uint32_t)(pos % hdr->capacity);
uint64_t my_writing = ((pos + 1) << 1) | 1; /* odd: writing for pos */
uint64_t my_done = (pos + 1) << 1; /* even: pos is committed */
/* CAS per-slot seq from a committed (even) mark to our writing-mark.
 * If another writer is in progress (odd), spin until they commit --
* otherwise we'd race data writes to the same slot. If a newer writer
* has already committed (seq >> 1 > pos+1), skip: their data wins. */
uint64_t cur = __atomic_load_n(&h->seq[slot_idx], __ATOMIC_ACQUIRE);
int wrote = 0;
for (;;) {
if (cur & 1) {
/* Another writer owns the slot; wait for them to publish. */
cur = __atomic_load_n(&h->seq[slot_idx], __ATOMIC_ACQUIRE);
continue;
}