/* Data-ReqRep-Shared — shared-memory request/reply channel (see release on metacpan) */
#include <errno.h>
#include <time.h>
#include <limits.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>
/* ================================================================
* Constants
* ================================================================ */
#define REQREP_MAGIC 0x52525331U /* "RRS1" */
#define REQREP_VERSION 1
#define REQREP_ERR_BUFLEN 256
#define REQREP_SPIN_LIMIT 32
#define REQREP_LOCK_TIMEOUT_SEC 2
#define REQREP_UTF8_FLAG 0x80000000U
#define REQREP_STR_LEN_MASK 0x7FFFFFFFU
#define RESP_FREE 0
#define RESP_ACQUIRED 1
#define RESP_READY 2
#define REQREP_MODE_STR 0
#define REQREP_MODE_INT 1
/* Pack/unpack slot index + generation into a 64-bit request ID.
* Prevents ABA: cancelled slot re-acquired by another client
* won't match the generation stored in the original request. */
#define REQREP_MAKE_ID(slot, gen) (((uint64_t)(gen) << 32) | (uint64_t)(slot))
#define REQREP_ID_SLOT(id) ((uint32_t)((id) & 0xFFFFFFFFULL))
#define REQREP_ID_GEN(id) ((uint32_t)((id) >> 32))
/* ================================================================
 * Header (256 bytes = 4 cache lines, lives at start of mmap)
 *
 * Cache-line layout is deliberate to limit false sharing between
 * the two sides of the channel:
 *   line 0: geometry/offsets, immutable after create;
 *   line 1: consumer-side hot words (req_head + recv futex);
 *   line 2: producer-side hot words (req_tail + send futex);
 *   line 3: futex mutex, arena cursors, slot futex and stat counters.
 * The numbers in the field comments are byte offsets from the start
 * of the mapping; the _Static_assert below pins sizeof == 256.
 * This struct is a shared-memory ABI: do not reorder or resize fields.
 * ================================================================ */
typedef struct {
/* ---- Cache line 0 (0-63): immutable after create ---- */
uint32_t magic; /* 0 */
uint32_t version; /* 4 */
uint32_t mode; /* 8: REQREP_MODE_STR or REQREP_MODE_INT */
uint32_t req_cap; /* 12: request queue capacity (power of 2) */
uint64_t total_size; /* 16: mmap size */
uint32_t req_slots_off; /* 24: offset to request slot array */
uint32_t req_arena_off; /* 28: offset to request arena */
uint32_t req_arena_cap; /* 32: arena byte capacity */
uint32_t resp_slots; /* 36: number of response slots */
uint32_t resp_data_max; /* 40: max response data bytes per slot */
uint32_t resp_off; /* 44: offset to response slot area */
uint32_t resp_stride; /* 48: bytes per response slot (cache-aligned) */
uint8_t _pad0[12]; /* 52-63 */
/* ---- Cache line 1 (64-127): recv hot (server) ---- */
uint64_t req_head; /* 64: consumer position */
uint32_t recv_waiters; /* 72: blocked servers */
uint32_t recv_futex; /* 76: futex for recv wakeup */
uint8_t _pad1[48]; /* 80-127 */
/* ---- Cache line 2 (128-191): send hot (client) ---- */
uint64_t req_tail; /* 128: producer position */
uint32_t send_waiters; /* 136: blocked clients */
uint32_t send_futex; /* 140: futex for send wakeup */
uint8_t _pad2[48]; /* 144-191 */
/* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
uint32_t mutex_waiters; /* 196 */
uint32_t arena_wpos; /* 200: arena write position */
uint32_t arena_used; /* 204: arena bytes consumed */
uint32_t resp_hint; /* 208: hint for slot scan */
uint32_t stat_recoveries; /* 212 */
uint32_t slot_futex; /* 216: futex for slot availability */
uint32_t slot_waiters; /* 220: threads waiting for free slot */
uint64_t stat_requests; /* 224 */
uint64_t stat_replies; /* 232 */
uint64_t stat_send_full; /* 240 */
uint32_t stat_recv_empty; /* 248 */
uint32_t _pad3; /* 252-255 */
} ReqRepHeader;
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(ReqRepHeader) == 256, "ReqRepHeader must be 256 bytes");
#endif
/* ================================================================
 * Slot types
 *
 * All three structs live in the shared mapping and are a cross-process
 * ABI: do not reorder or resize fields.
 * ================================================================ */
/* Str-mode request slot: points at payload bytes in the shared arena
 * and routes the eventual reply to a response slot (index + generation). */
typedef struct {
uint32_t arena_off;
uint32_t packed_len; /* bit 31 = UTF-8, bits 0-30 = byte length */
uint32_t arena_skip; /* bytes to release from arena on recv */
uint32_t resp_slot; /* response slot index */
uint32_t resp_gen; /* generation at time of slot acquire (ABA guard) */
uint32_t _rpad;
} ReqSlot; /* 24 bytes (Str mode) */
/* Int request slot: Vyukov sequence + value + response routing */
typedef struct {
uint64_t sequence;
int64_t value;
uint32_t resp_slot;
uint32_t resp_gen;
} ReqIntSlot; /* 24 bytes (Int mode, lock-free) */
/* Response slot: this fixed header is followed by the response data
 * bytes within the slot's resp_stride. `state` doubles as the futex
 * word clients block on while waiting for the reply. */
typedef struct {
uint32_t state; /* futex: RESP_FREE=0, RESP_ACQUIRED=1, RESP_READY=2 */
uint32_t waiters; /* futex waiters on this slot */
uint32_t owner_pid; /* PID of client that acquired (for stale recovery) */
uint32_t resp_len; /* response data length */
uint32_t resp_flags; /* bit 0 = UTF-8 */
uint32_t generation; /* incremented on each acquire (ABA guard) */
uint32_t _rpad[2]; /* pad to 32 bytes */
} RespSlotHeader; /* 32 bytes + data */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(RespSlotHeader) == 32, "RespSlotHeader must be 32 bytes");
#endif
/* ================================================================
* Process-local handle
ReqRepHandle *h = reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
return h;
}
/* Attach to an existing reqrep region through an already-open fd.
 * Validates size and header, maps the region, and duplicates the fd
 * (CLOEXEC) so the caller keeps ownership of the original.
 * Returns a new handle, or NULL with a message in errbuf. */
static ReqRepHandle *reqrep_open_fd(int fd, uint32_t mode, char *errbuf) {
    struct stat st;
    if (errbuf) errbuf[0] = '\0';

    if (fstat(fd, &st) < 0) {
        REQREP_ERR("fstat(fd=%d): %s", fd, strerror(errno));
        return NULL;
    }
    if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
        REQREP_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
        return NULL;
    }

    size_t map_size = (size_t)st.st_size;
    void *base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        REQREP_ERR("mmap(fd=%d): %s", fd, strerror(errno));
        return NULL;
    }

    if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
        REQREP_ERR("fd %d: invalid or incompatible reqrep", fd);
        goto fail_unmap;
    }

    /* Private dup: handle lifetime is decoupled from the caller's fd. */
    int myfd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
    if (myfd < 0) {
        REQREP_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
        goto fail_unmap;
    }

    ReqRepHandle *h = reqrep_setup_handle(base, map_size, NULL, myfd);
    if (!h) {
        close(myfd);
        goto fail_unmap;
    }
    return h;

fail_unmap:
    munmap(base, map_size);
    return NULL;
}
/* Tear down a process-local handle: close every owned fd, unmap the
 * shared region, and free the handle's heap allocations. Safe on NULL. */
static void reqrep_destroy(ReqRepHandle *h) {
    if (h == NULL)
        return;
    /* Close only fds that were actually opened (-1 means "not open"). */
    int owned_fds[3] = { h->notify_fd, h->reply_fd, h->backing_fd };
    for (int i = 0; i < 3; i++) {
        if (owned_fds[i] >= 0)
            close(owned_fds[i]);
    }
    if (h->hdr != NULL)
        munmap(h->hdr, h->mmap_size);
    /* free(NULL) is a no-op, so no guards needed here. */
    free(h->copy_buf);
    free(h->path);
    free(h);
}
/* ================================================================
 * Request queue operations (client -> server)
 * ================================================================ */
/* Push request while mutex is held. Returns 1=ok, 0=full, -2=too long.
 *
 * Payload bytes are bump-allocated from a circular arena:
 *  - `alloc` is the size actually reserved (len rounded up to 8, min 8);
 *  - a message never straddles the arena end: the tail gap is wasted and
 *    charged to `skip`, which the consumer releases on recv (arena_skip);
 *  - if the arena is exhausted but the queue is empty, both cursors are
 *    reset to zero instead of failing (nothing references old bytes).
 * Caller must hold hdr->mutex and, on success, wake consumers. */
static inline int reqrep_send_locked(ReqRepHandle *h, const char *str,
uint32_t len, bool utf8,
uint32_t resp_slot_idx, uint32_t resp_gen) {
ReqRepHeader *hdr = h->hdr;
if (len > REQREP_STR_LEN_MASK) return -2;
/* Queue full: tail has lapped head by the full capacity. */
if (hdr->req_tail - hdr->req_head >= h->req_cap) {
__atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
return 0;
}
uint32_t alloc = (len + 7) & ~7u;
if (alloc == 0) alloc = 8;
/* Single message must fit arena; else overflow into response slots. */
if (alloc > h->req_arena_cap) return -2;
uint32_t pos = hdr->arena_wpos;
uint64_t skip = alloc;
/* Wrap: waste the gap at the end and start the message at offset 0. */
if ((uint64_t)pos + alloc > h->req_arena_cap) {
skip += h->req_arena_cap - pos;
pos = 0;
}
if ((uint64_t)hdr->arena_used + skip > h->req_arena_cap) {
if (hdr->req_tail == hdr->req_head) {
/* Queue empty: safe to reset the arena and retry from 0. */
hdr->arena_wpos = 0;
hdr->arena_used = 0;
pos = 0;
skip = alloc;
} else {
__atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
return 0;
}
}
memcpy(h->req_arena + pos, str, len);
uint32_t idx = (uint32_t)(hdr->req_tail & h->req_cap_mask);
ReqSlot *slot = &h->req_slots[idx];
slot->arena_off = pos;
slot->packed_len = len | (utf8 ? REQREP_UTF8_FLAG : 0);
slot->arena_skip = (uint32_t)skip;
slot->resp_slot = resp_slot_idx;
slot->resp_gen = resp_gen;
hdr->arena_wpos = pos + alloc;
hdr->arena_used += (uint32_t)skip;
/* Publish last: consumer only reads the slot once tail moves past it. */
hdr->req_tail++;
__atomic_add_fetch(&hdr->stat_requests, 1, __ATOMIC_RELAXED);
return 1;
}
/* Non-blocking send: acquire a response slot, then push the request.
 * Returns 1=ok, 0=full, -2=too long, -3=no slots.
 * On success, *out_id is the packed slot+generation ID; on any failure
 * the acquired response slot is released again. */
static int reqrep_try_send(ReqRepHandle *h, const char *str, uint32_t len,
                           bool utf8, uint64_t *out_id) {
    int32_t acquired = reqrep_slot_acquire(h);
    if (acquired < 0)
        return -3;
    uint32_t slot_idx = (uint32_t)acquired;

    /* Snapshot the generation before queuing: it becomes part of the ID
     * and guards against the slot being recycled (ABA). */
    RespSlotHeader *rslot = reqrep_resp_slot(h, slot_idx);
    uint32_t gen = __atomic_load_n(&rslot->generation, __ATOMIC_ACQUIRE);

    reqrep_mutex_lock(h->hdr);
    int rc = reqrep_send_locked(h, str, len, utf8, slot_idx, gen);
    reqrep_mutex_unlock(h->hdr);

    if (rc != 1) {
        /* Queue full or message too long: give the slot back. */
        reqrep_slot_release(h, slot_idx);
        return rc;
    }

    reqrep_wake_consumers(h->hdr);
    *out_id = REQREP_MAKE_ID(slot_idx, gen);
    return 1;
}
/* Blocking send with timeout. Returns 1=ok, 0=timeout, -2=too long, -3=no slots (timeout).
 * timeout semantics: 0 = single non-blocking try, <0 = wait forever,
 * >0 = wait up to that many seconds against a monotonic deadline.
 * Lost-wakeup-safe futex protocol: load the futex word, retry the send,
 * and only then FUTEX_WAIT on the loaded value -- if a waker bumped the
 * word in between, the wait returns immediately instead of sleeping.
 * Which futex we wait on depends on WHY the try failed: -3 means no free
 * response slot (slot_futex), 0 means queue/arena full (send_futex). */
static int reqrep_send_wait(ReqRepHandle *h, const char *str, uint32_t len,
bool utf8, uint64_t *out_id, double timeout) {
int r = reqrep_try_send(h, str, len, utf8, out_id);
if (r == 1 || r == -2) return r;
if (timeout == 0) return r;
ReqRepHeader *hdr = h->hdr;
struct timespec deadline, remaining;
int has_deadline = (timeout > 0);
if (has_deadline) reqrep_make_deadline(timeout, &deadline);
for (;;) {
/* Pick the futex matching the failure mode of the last attempt. */
uint32_t *futex_word = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
uint32_t *waiter_cnt = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;
uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
/* Retry after loading fseq: closes the race with a concurrent waker. */
r = reqrep_try_send(h, str, len, utf8, out_id);
if (r == 1 || r == -2) return r;
__atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
struct timespec *pts = NULL;
if (has_deadline) {
if (!reqrep_remaining_time(&deadline, &remaining)) {
__atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
return r;
}
pts = &remaining;
}
/* Sleeps only if *futex_word still equals fseq. */
long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
__atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
r = reqrep_try_send(h, str, len, utf8, out_id);
if (r == 1 || r == -2) return r;
if (rc == -1 && errno == ETIMEDOUT) return r;
}
}
/* Pop request while mutex is held. Returns 1=ok, 0=empty, -1=OOM.
 * The payload is copied into the handle's process-local copy_buf and
 * NUL-terminated, so the returned pointer remains valid after the mutex
 * is released and the arena bytes are reclaimed. *out_id carries the
 * response routing (slot + generation) the server replies to.
 * Arena reclaim: arena_skip (payload + any wasted wrap gap) is released
 * from arena_used; when the arena fully drains, the write cursor resets. */
static inline int reqrep_recv_locked(ReqRepHandle *h, const char **out_str,
uint32_t *out_len, bool *out_utf8,
uint64_t *out_id) {
ReqRepHeader *hdr = h->hdr;
if (hdr->req_tail == hdr->req_head) {
__atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
return 0;
}
uint32_t idx = (uint32_t)(hdr->req_head & h->req_cap_mask);
ReqSlot *slot = &h->req_slots[idx];
uint32_t len = slot->packed_len & REQREP_STR_LEN_MASK;
*out_utf8 = (slot->packed_len & REQREP_UTF8_FLAG) != 0;
*out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);
/* +1 for the NUL terminator appended below. */
if (!reqrep_ensure_copy_buf(h, len + 1))
return -1;
if (len > 0)
memcpy(h->copy_buf, h->req_arena + slot->arena_off, len);
h->copy_buf[len] = '\0';
*out_str = h->copy_buf;
*out_len = len;
/* Clamp rather than underflow if counters ever disagree. */
if (hdr->arena_used >= slot->arena_skip)
hdr->arena_used -= slot->arena_skip;
else
hdr->arena_used = 0;
if (hdr->arena_used == 0)
hdr->arena_wpos = 0;
hdr->req_head++;
return 1;
}
/* Pop one request (server side, non-blocking).
 * Returns 1=ok, 0=empty, -1=OOM. On success, producers blocked on a
 * full queue/arena are woken since space was just reclaimed. */
static inline int reqrep_try_recv(ReqRepHandle *h, const char **out_str,
                                  uint32_t *out_len, bool *out_utf8,
                                  uint64_t *out_id) {
    ReqRepHeader *hdr = h->hdr;

    reqrep_mutex_lock(hdr);
    int got = reqrep_recv_locked(h, out_str, out_len, out_utf8, out_id);
    reqrep_mutex_unlock(hdr);

    if (got == 1)
        reqrep_wake_producers(hdr);
    return got;
}
/* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM.
 * timeout semantics: 0 = single non-blocking try, <0 = wait forever,
 * >0 = wait up to that many seconds against a deadline.
 * Same lost-wakeup-safe futex protocol as reqrep_send_wait: load the
 * futex word, retry, then FUTEX_WAIT on the loaded value so a wakeup
 * that raced the retry is never missed. */
static int reqrep_recv_wait(ReqRepHandle *h, const char **out_str,
uint32_t *out_len, bool *out_utf8,
uint64_t *out_id, double timeout) {
int r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
if (r != 0) return r;
if (timeout == 0) return 0;
ReqRepHeader *hdr = h->hdr;
struct timespec deadline, remaining;
int has_deadline = (timeout > 0);
if (has_deadline) reqrep_make_deadline(timeout, &deadline);
for (;;) {
uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
/* Retry after loading fseq: closes the race with a concurrent sender. */
r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
if (r != 0) return r;
__atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
struct timespec *pts = NULL;
if (has_deadline) {
if (!reqrep_remaining_time(&deadline, &remaining)) {
__atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
return 0;
}
pts = &remaining;
}
/* Sleeps only if recv_futex still equals fseq. */
long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
__atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
if (r != 0) return r;
if (rc == -1 && errno == ETIMEDOUT) return 0;
}
}
/* ================================================================
* Response operations (server -> client)
* ================================================================ */
/* Write response to a response slot.
* Returns 1=ok, -1=bad slot, -2=stale (cancelled/recycled), -3=too long. */
static int reqrep_reply(ReqRepHandle *h, uint64_t id,
const char *str, uint32_t len, bool utf8) {
uint32_t slot_idx = REQREP_ID_SLOT(id);
uint32_t expected_gen = REQREP_ID_GEN(id);
if (slot_idx >= h->resp_slots) return -1;
if (len > h->resp_data_max) return -3;
RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
if (state != RESP_ACQUIRED) return -2;
if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;
uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);
double timeout) {
int r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
if (r != 0) return r;
if (timeout == 0) return 0;
uint32_t slot_idx = REQREP_ID_SLOT(id);
RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
struct timespec deadline, remaining;
int has_deadline = (timeout > 0);
if (has_deadline) reqrep_make_deadline(timeout, &deadline);
for (;;) {
uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
if (state == RESP_READY)
return reqrep_try_get(h, id, out_str, out_len, out_utf8);
__atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
/* Re-check: cancel may have fired between try_get and waiter registration */
if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
__atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
return -4;
}
state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
if (state == RESP_READY) {
__atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
return reqrep_try_get(h, id, out_str, out_len, out_utf8);
}
struct timespec *pts = NULL;
if (has_deadline) {
if (!reqrep_remaining_time(&deadline, &remaining)) {
__atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
return 0;
}
pts = &remaining;
}
syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
__atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
if (r != 0) return r;
}
}
/* Cancel a pending request: CAS the slot ACQUIRED -> FREE, but only if
 * the generation still matches the one packed into `id` (ABA guard).
 * If the reply already arrived (state READY), cancel is a no-op -- the
 * caller should drain with get() to free the slot. */
static void reqrep_cancel(ReqRepHandle *h, uint64_t id) {
uint32_t slot_idx = REQREP_ID_SLOT(id);
uint32_t expected_gen = REQREP_ID_GEN(id);
if (slot_idx >= h->resp_slots) return;
RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
/* Stale id: the slot was already recycled by another request. */
if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return;
uint32_t expected_state = RESP_ACQUIRED;
if (__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
__atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
/* Bump generation so a late reply/get using the old id is rejected. */
__atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
/* Wake get_wait blocked on this slot's state futex */
if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
/* Slot is free again: wake senders waiting for a response slot. */
reqrep_wake_slot_waiters(h->hdr);
}
}
/* Combined send + wait-for-reply with single deadline.
 * Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale.
 * The send and the reply wait share one budget: time consumed by the
 * send is subtracted from the timeout handed to get_wait. timeout<=0
 * passes through unchanged (0 = poll, <0 = wait forever).
 * On any get failure the request is cancelled; if the reply raced the
 * cancel, a final try_get drains it so the response slot is freed. */
static int reqrep_request(ReqRepHandle *h, const char *req_str, uint32_t req_len,
bool req_utf8, const char **out_str, uint32_t *out_len,
bool *out_utf8, double timeout) {
uint64_t id;
struct timespec deadline;
int has_deadline = (timeout > 0);
if (has_deadline) reqrep_make_deadline(timeout, &deadline);
int r = reqrep_send_wait(h, req_str, req_len, req_utf8, &id, timeout);
if (r != 1) return r;
double get_timeout = timeout;
if (has_deadline) {
/* NOTE(review): assumes reqrep_make_deadline is based on
 * CLOCK_MONOTONIC -- confirm against its definition. */
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
get_timeout = (double)(deadline.tv_sec - now.tv_sec) +
(double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
if (get_timeout <= 0) {
reqrep_cancel(h, id);
return 0;
}
}
r = reqrep_get_wait(h, id, out_str, out_len, out_utf8, get_timeout);
if (r != 1) {
reqrep_cancel(h, id);
/* If reply arrived between timeout and cancel, drain to free the slot */
const char *discard; uint32_t dlen; bool dutf8;
reqrep_try_get(h, id, &discard, &dlen, &dutf8);
}
return r;
}
/* Count response slots currently owned by the calling process, i.e.
 * requests in flight (ACQUIRED) or replied but not yet drained (READY).
 * Lock-free relaxed scan: the result is a point-in-time estimate. */
static uint32_t reqrep_pending(ReqRepHandle *h) {
    uint32_t self_pid = (uint32_t)getpid();
    uint32_t owned = 0;
    for (uint32_t i = 0; i < h->resp_slots; i++) {
        RespSlotHeader *s = reqrep_resp_slot(h, i);
        uint32_t st = __atomic_load_n(&s->state, __ATOMIC_RELAXED);
        if (st != RESP_ACQUIRED && st != RESP_READY)
            continue;
        if (__atomic_load_n(&s->owner_pid, __ATOMIC_RELAXED) == self_pid)
            owned++;
    }
    return owned;
}
/* ================================================================
* Queue state
* ================================================================ */
static inline uint64_t reqrep_size(ReqRepHandle *h) {
( run in 0.700 second using v1.01-cache-2.11-cpan-39bf76dae61 )