/* buf_generic.h — Data-Buffer-Shared
 * Shared-memory buffer primitives: mmap'd header layout, a futex-based
 * process-shared read-write lock with dead-owner recovery, and a seqlock
 * for lock-free readers. */
/* Compile-time layout checks: expands to _Static_assert when available
 * (the guarding #if line sits above this excerpt — presumably a C11 /
 * compiler-support probe; confirm against the full file), else a no-op. */
#define BUF_STATIC_ASSERT(cond, msg) _Static_assert(cond, msg)
#else
#define BUF_STATIC_ASSERT(cond, msg)
#endif
/* On-mmap buffer header shared between processes.
 *
 * The layout is a cross-process ABI: exactly two 64-byte cache lines
 * (asserted below).  Line 0 carries identity/geometry fields that never
 * change after creation; line 1 carries the mutable seqlock/rwlock state,
 * so lock traffic never bounces the cache line holding the metadata. */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;          /* 0 */
    uint32_t version;        /* 4 */
    uint32_t variant_id;     /* 8 */
    uint32_t elem_size;      /* 12: bytes per element */
    uint64_t capacity;       /* 16: number of elements */
    uint64_t total_size;     /* 24: total mmap size */
    uint64_t data_off;       /* 32: offset to data array */
    uint8_t _reserved0[24];  /* 40-63 */
    /* ---- Cache line 1 (64-127): seqlock + rwlock + mutable state ---- */
    uint32_t seq;            /* 64: seqlock counter, odd = writer active */
    uint32_t rwlock;         /* 68: 0=unlocked, readers=1..0x7FFFFFFF, writer=0x80000000|pid */
    uint32_t rwlock_waiters; /* 72: wake-target counter (readers+writers) */
    uint32_t stat_recoveries;        /* 76: count of stale-lock recoveries performed */
    uint32_t rwlock_writers_waiting; /* 80: reader yield signal (writers only) */
    uint32_t _pad2;          /* 84 */
    uint64_t _reserved1[5];  /* 88-127 */
} BufHeader;
BUF_STATIC_ASSERT(sizeof(BufHeader) == 128, "BufHeader must be exactly 128 bytes (2 cache lines)");
/* ---- Process-local handle ---- */
/* Per-process view of one shared buffer.  Lives in private memory only;
 * never stored inside the shared mapping. */
typedef struct {
    BufHeader *hdr;    /* mapped header (start of the mmap region) */
    void *data;        /* pointer to element array in mmap */
    size_t mmap_size;  /* total bytes mapped */
    char *path;        /* backing file path (strdup'd, NULL for anon) */
    int fd;            /* kept open for memfd, -1 otherwise */
    int efd;           /* eventfd for notifications, -1 if none */
    uint8_t wr_locked; /* process-local: 1 if lock_wr is held */
    uint8_t efd_owned; /* 1 if we created the eventfd (close on destroy) */
} BufHandle;
/* ---- Futex-based read-write lock ---- */
#define BUF_RWLOCK_SPIN_LIMIT 32 /* CAS attempts before parking on the futex */
#define BUF_LOCK_TIMEOUT_SEC 2   /* futex wait timeout; each timeout triggers a dead-owner probe */
/* One iteration of a busy-wait loop: emit the architecture's spin-loop hint
 * (x86 PAUSE / aarch64 YIELD, or nothing elsewhere).  The "memory" clobber
 * doubles as a compiler barrier so spinning loads are re-issued each pass. */
static inline void buf_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
/* rwlock word encoding: high bit set => exclusive writer, with the low
 * 31 bits holding the writer's pid (used for dead-owner detection);
 * otherwise the whole word is the shared-reader count. */
#define BUF_RWLOCK_WRITER_BIT 0x80000000U
#define BUF_RWLOCK_PID_MASK 0x7FFFFFFFU
#define BUF_RWLOCK_WR(pid) (BUF_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & BUF_RWLOCK_PID_MASK))
/* Return non-zero if the process with the given pid still exists.
 * pid 0 means "no recorded owner" and is conservatively treated as alive
 * so we never trigger a bogus recovery.  Any kill(2) failure other than
 * ESRCH (e.g. EPERM) also means the process exists. */
static inline int buf_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}
/* Steal a lock whose writer-owner appears to have died.
 *
 * CASes the observed (stale) lock word to "writer = this process"; only one
 * racing recoverer can win, losers return immediately.  While holding the
 * stolen lock it closes any seqlock write section the dead writer left open
 * (odd seq), bumps the recovery statistic, releases, and wakes all waiters. */
static inline void buf_recover_stale_lock(BufHeader *hdr, uint32_t observed_rwlock) {
    uint32_t mypid = BUF_RWLOCK_WR((uint32_t)getpid());
    /* If the word changed since the caller observed it, someone else already
     * recovered (or the owner finished) — do nothing. */
    if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
            mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
        return;
    /* Force the seqlock even so lock-free readers stop spinning forever. */
    uint32_t seq = __atomic_load_n(&hdr->seq, __ATOMIC_ACQUIRE);
    if (seq & 1)
        __atomic_store_n(&hdr->seq, seq + 1, __ATOMIC_RELEASE);
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* Relative FUTEX_WAIT timeout: bounds every park so the dead-owner probe
 * in the lock slow paths gets a chance to run. */
static const struct timespec buf_lock_timeout = { BUF_LOCK_TIMEOUT_SEC, 0 };
/* Acquire shared (reader) access.
 *
 * Fast path: spin up to BUF_RWLOCK_SPIN_LIMIT CAS attempts.  New readers
 * yield to queued writers (writers_waiting) to prevent writer starvation,
 * but readers already inside may be joined regardless.  Slow path: park on
 * the futex with a BUF_LOCK_TIMEOUT_SEC timeout; on timeout either probe
 * the writer owner's pid and recover the lock if that process died, or
 * decrement a presumed-stale writers_waiting left by a crashed writer. */
static inline void buf_rwlock_rdlock(BufHeader *hdr) {
    uint32_t *lock = &hdr->rwlock;
    uint32_t *waiters = &hdr->rwlock_waiters;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur > 0 && cur < BUF_RWLOCK_WRITER_BIT) {
            /* Readers already hold the lock: join them. */
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            /* Unlocked and no writer queued: become the first reader. */
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < BUF_RWLOCK_SPIN_LIMIT, 1)) {
            buf_spin_pause();
            continue;
        }
        /* Slow path: register as a waiter and park on the futex. */
        __atomic_add_fetch(waiters, 1, __ATOMIC_RELAXED);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur >= BUF_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                &buf_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
                if (cur >= BUF_RWLOCK_WRITER_BIT) {
                    /* Timed out behind a writer: if its process is gone,
                     * steal and repair the lock. */
                    uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                    if (val >= BUF_RWLOCK_WRITER_BIT) {
                        uint32_t pid = val & BUF_RWLOCK_PID_MASK;
                        if (!buf_pid_alive(pid))
                            buf_recover_stale_lock(hdr, val);
                    }
                } else {
                    /* Yielding to a writer timed out — drop one writers_waiting
                     * to recover from a potentially-crashed parked writer. */
                    uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
                    while (wc > 0 && !__atomic_compare_exchange_n(
                        writers_waiting, &wc, wc - 1,
                        1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
                }
                spin = 0;
                continue;
            }
        }
        /* Woken (or lock changed before the wait): retry the fast path. */
        __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}
/* Release shared (reader) access.
 *
 * Decrements the reader count; when the count reaches zero (we were the
 * last reader out) and waiters are parked, wake them so a queued writer
 * can proceed.  Caller must actually hold a read lock — an unbalanced
 * call underflows the count. */
static inline void buf_rwlock_rdunlock(BufHeader *hdr) {
    /* NB: __atomic_sub_fetch returns the NEW value, not the previous one
     * (the old local name 'prev' was misleading): zero => last reader. */
    uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
    if (remaining == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* Acquire exclusive (writer) access; the lock word records our pid so other
 * processes can detect and recover from a writer that dies mid-hold.
 *
 * Spin-then-park like rdlock: after BUF_RWLOCK_SPIN_LIMIT CAS attempts,
 * advertise intent via writers_waiting (which makes new readers yield),
 * park on the futex with a timeout, and on timeout probe whether the
 * current writer owner is still alive, stealing the lock if not. */
static inline void buf_rwlock_wrlock(BufHeader *hdr) {
    uint32_t *lock = &hdr->rwlock;
    uint32_t *waiters = &hdr->rwlock_waiters;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    uint32_t mypid = BUF_RWLOCK_WR((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        /* Writers only acquire from the fully-unlocked state. */
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < BUF_RWLOCK_SPIN_LIMIT, 1)) {
            buf_spin_pause();
            continue;
        }
        /* Slow path: count ourselves both as a generic waiter and as a
         * queued writer (the latter stops new readers from entering). */
        __atomic_add_fetch(waiters, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                &buf_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
                __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
                /* Timed out: if the holder is a writer whose process died,
                 * steal and repair the lock, then retry from the spin phase. */
                uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                if (val >= BUF_RWLOCK_WRITER_BIT) {
                    uint32_t pid = val & BUF_RWLOCK_PID_MASK;
                    if (!buf_pid_alive(pid))
                        buf_recover_stale_lock(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        /* Woken (or lock freed before the wait): deregister and retry. */
        __atomic_sub_fetch(waiters, 1, __ATOMIC_RELAXED);
        __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}
/* Release exclusive (writer) access: clear the lock word with release
 * ordering, then wake every parked waiter if any are registered. */
static inline void buf_rwlock_wrunlock(BufHeader *hdr) {
    uint32_t *lock = &hdr->rwlock;
    __atomic_store_n(lock, 0, __ATOMIC_RELEASE);
    uint32_t pending = __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED);
    if (pending > 0)
        syscall(SYS_futex, lock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* ---- Seqlock ---- */
/* Begin a lock-free read section: spin until the sequence is even (no
 * writer active) and return it; the caller validates its reads afterwards
 * with buf_seqlock_read_retry.  If the sequence stays odd for ~100k spins,
 * the writer may have crashed mid-update: probe its pid via the rwlock word
 * and recover (which also forces the sequence even again); otherwise sleep
 * 1 ms and keep waiting. */
static inline uint32_t buf_seqlock_read_begin(BufHeader *hdr) {
    int spin = 0;
    for (;;) {
        uint32_t s = __atomic_load_n(&hdr->seq, __ATOMIC_ACQUIRE);
        if (__builtin_expect((s & 1) == 0, 1)) return s;
        if (__builtin_expect(spin < 100000, 1)) {
            buf_spin_pause();
            spin++;
            continue;
        }
        /* Long-odd sequence: check for a dead writer holding the rwlock. */
        uint32_t val = __atomic_load_n(&hdr->rwlock, __ATOMIC_RELAXED);
        if (val >= BUF_RWLOCK_WRITER_BIT) {
            uint32_t pid = val & BUF_RWLOCK_PID_MASK;
            if (!buf_pid_alive(pid)) {
                buf_recover_stale_lock(hdr, val);
                spin = 0;
                continue;
            }
        }
        struct timespec ts = {0, 1000000}; /* 1 ms back-off */
        nanosleep(&ts, NULL);
        spin = 0;
    }
}
/* Return non-zero if a seqlock read section must be retried, i.e. the
 * sequence no longer matches the value sampled by read_begin (a writer
 * started and/or finished in between). */
static inline int buf_seqlock_read_retry(uint32_t *seq, uint32_t start) {
    uint32_t now = __atomic_load_n(seq, __ATOMIC_ACQUIRE);
    return now != start;
}
/* Enter a seqlock write section: make the sequence odd.
 *
 * Fix: the original used a single RELEASE increment, but RELEASE only
 * orders memory operations that come BEFORE it; the data stores that
 * FOLLOW write_begin could be reordered ahead of the increment (by the
 * compiler or CPU), letting a lock-free reader observe torn data while
 * the sequence still looks even.  The canonical seqlock writer entry is
 * "increment, then store barrier" (cf. Linux write_seqcount_begin +
 * smp_wmb); a full fence after a relaxed increment guarantees the odd
 * sequence is globally visible before any data write. */
static inline void buf_seqlock_write_begin(uint32_t *seq) {
    __atomic_add_fetch(seq, 1, __ATOMIC_RELAXED);
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
}
/* Leave a seqlock write section: make the sequence even again.  The
 * RELEASE ordering ensures all data stores inside the section are visible
 * before the new (even) sequence value is. */
static inline void buf_seqlock_write_end(uint32_t *seq) {
    (void)__atomic_fetch_add(seq, 1, __ATOMIC_RELEASE);
}
/* ---- mmap create/open ---- */
static BufHandle *buf_create_map(const char *path, uint64_t capacity,
uint32_t elem_size, uint32_t variant_id,
char *errbuf) {
errbuf[0] = '\0';
int created = 0;
int fd = open(path, O_RDWR | O_CREAT | O_EXCL, 0666);
if (fd >= 0) {
created = 1;
} else if (errno == EEXIST) {
fd = open(path, O_RDWR);
}
if (fd < 0) {
snprintf(errbuf, BUF_ERR_BUFLEN, "open(%s): %s", path, strerror(errno));
return NULL;
}
/* Lock file for init race prevention */
if (flock(fd, LOCK_EX) < 0) {
snprintf(errbuf, BUF_ERR_BUFLEN, "flock(%s): %s", path, strerror(errno));
close(fd);
return NULL;
}
uint64_t data_off = sizeof(BufHeader); /* 128, already cache-line aligned */
if (elem_size > 0 && capacity > (UINT64_MAX - data_off) / elem_size) {
snprintf(errbuf, BUF_ERR_BUFLEN, "buffer size overflow");
flock(fd, LOCK_UN);
close(fd);
return NULL;
}
uint64_t total_size = data_off + capacity * elem_size;
struct stat st;
if (fstat(fd, &st) < 0) {
snprintf(errbuf, BUF_ERR_BUFLEN, "fstat(%s): %s", path, strerror(errno));
/* NOTE: remainder of buf_create_map() truncated in this excerpt. */