Data-RingBuffer-Shared

 view release on metacpan or  search on metacpan

ring.h  view on Meta::CPAN

/*
 * ring.h -- Shared-memory fixed-size ring buffer for Linux
 *
 * Lock-free circular buffer: writes overwrite oldest when full.
 * Readers access by relative position (0=latest) or absolute sequence.
 * No consumer tracking — data persists until overwritten.
 *
 * v2 layout adds per-slot publication sequence (seqlock-per-slot), so
 * readers never observe a partially-written or cross-epoch torn slot:
 * read_seq / read_latest return 0 if the slot is mid-write or has been
 * overwritten to a different epoch. Safe under MPMC writers.
 *
 * Unlike Queue (consumed on read) or PubSub (subscription tracking),
 * RingBuffer is a simple overwriting circular window.
 */

#ifndef RING_H
#define RING_H

#include <stdint.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>

#define RING_MAGIC       0x524E4732U  /* "RNG2" — v2 layout: per-slot publication seq */
#define RING_VERSION     2
#define RING_ERR_BUFLEN  256

#define RING_VAR_INT  0
#define RING_VAR_F64  1

/* ================================================================
 * Header (128 bytes)
 * ================================================================ */

/* Shared-memory header, mapped at offset 0 of the backing file.
 * Field order and padding are the on-disk/on-shm ABI (128 bytes total,
 * enforced by the _Static_assert below) — do not reorder or resize.
 * The hot fields (head, count, waiters, wake_seq) start at offset 64,
 * i.e. on a separate 64-byte cache line from the immutable geometry. */
typedef struct {
    uint32_t magic;            /*  0: RING_MAGIC ("RNG2"), identifies a v2 ring */
    uint32_t version;          /*  4: RING_VERSION */
    uint32_t elem_size;        /*  8: bytes per slot */
    uint32_t variant_id;       /* 12: RING_VAR_INT / RING_VAR_F64 */
    uint64_t capacity;         /* 16: number of slots (ring wraps modulo this) */
    uint64_t total_size;       /* 24: NOTE(review): presumably full mapping size in bytes — confirm against creator */
    uint64_t data_off;         /* 32: offset to data array */
    uint64_t seq_off;          /* 40: offset to per-slot publication seq array */
    uint8_t  _pad0[16];        /* 48-63 */

    uint64_t head;             /* 64: monotonic write cursor (next write position) */
    uint64_t count;            /* 72: total writes (for overwrite detection) */
    uint32_t waiters;          /* 80: blocked on new data */
    uint32_t wake_seq;         /* 84: FUTEX_WAIT target (avoids 64-bit count wraparound) */
    uint64_t stat_writes;      /* 88 */
    uint64_t stat_overwrites;  /* 96 */
    uint8_t  _pad2[24];        /* 104-127 */
} RingHeader;

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(RingHeader) == 128, "RingHeader must be 128 bytes");
#endif

/* Per-process handle to an open ring: cached pointers into the shared
 * mapping plus process-local resources. Not itself shared between
 * processes. */
typedef struct {
    RingHeader *hdr;           /* mapped shared header (start of mapping) */
    uint8_t    *data;          /* base of the slot data array (see ring_slot) */
    uint64_t   *seq;           /* per-slot publication sequence (cap entries) */
    size_t      mmap_size;     /* length of the mapping — presumably for munmap; confirm */
    uint32_t    elem_size;     /* bytes per slot; NOTE(review): looks like a cache of hdr->elem_size — confirm */
    char       *path;          /* backing file path — TODO confirm ownership (who frees?) */
    int         notify_fd;     /* NOTE(review): presumably an eventfd (<sys/eventfd.h> is included) — confirm */
    int         backing_fd;    /* fd of the shared backing file */
} RingHandle;

/* ================================================================
 * Slot access
 * ================================================================ */

/* Map a monotonic sequence number to the address of its slot.
 * The ring wraps: slot index = seq mod capacity; each slot is
 * elem_size bytes into the shared data region. */
static inline uint8_t *ring_slot(RingHandle *h, uint64_t seq) {
    uint64_t idx = seq % h->hdr->capacity;
    return &h->data[idx * h->elem_size];
}

/* ================================================================
 * Write — overwrites oldest when full, always succeeds
 *
 * Per-slot seq encoding (uint64_t, initial 0):
 *   bit 0 = 1 (odd): writer in progress for pos = (seq >> 1) - 1
 *   bit 0 = 0 (even): data for pos = (seq >> 1) - 1 is published and stable
 * Writers serialize on the slot via CAS (two writers at pos N and pos N+cap
 * racing on the same slot index). Readers use a seqlock-style double-load
 * to detect mid-write tearing.
 * ================================================================ */

static inline uint64_t ring_write(RingHandle *h, const void *val, uint32_t vlen) {
    RingHeader *hdr = h->hdr;
    /* Claim a unique position via fetch_add — ring overwrites, no capacity check. */
    uint64_t pos = __atomic_fetch_add(&hdr->head, 1, __ATOMIC_ACQ_REL);
    uint32_t slot_idx = (uint32_t)(pos % hdr->capacity);
    uint64_t my_writing = ((pos + 1) << 1) | 1;   /* odd: writing for pos */
    uint64_t my_done    = (pos + 1) << 1;         /* even: pos is committed */

    /* CAS per-slot seq from a committed (even) mark to our writing-mark.
     * If another writer is in progress (odd), spin until they commit —
     * otherwise we'd race data writes to the same slot. If a newer writer
     * has already committed (seq >> 1 > pos+1), skip: their data wins. */
    uint64_t cur = __atomic_load_n(&h->seq[slot_idx], __ATOMIC_ACQUIRE);
    int wrote = 0;
    for (;;) {
        if (cur & 1) {
            /* Another writer owns the slot; wait for them to publish. */
            cur = __atomic_load_n(&h->seq[slot_idx], __ATOMIC_ACQUIRE);
            continue;
        }



( run in 1.171 second using v1.01-cache-2.11-cpan-39bf76dae61 )