/* Data-ReqRep-Shared — shared-memory request/reply transport (excerpt). */
size_t mmap_size;
uint32_t req_cap;
uint32_t req_cap_mask; /* req_cap - 1 */
uint32_t req_arena_cap;
uint32_t resp_slots;
uint32_t resp_data_max;
uint32_t resp_stride;
char *copy_buf;
uint32_t copy_buf_cap;
char *path;
int notify_fd; /* request notification eventfd, -1 if unset */
int reply_fd; /* reply notification eventfd, -1 if unset */
int backing_fd; /* memfd fd, -1 for file-backed/anonymous */
} ReqRepHandle;
/* ================================================================
* Utility
* ================================================================ */
/* Round `v` up to the next power of two.
 * Returns 2 for v < 2, and 0 when the result would not fit in 32 bits
 * (v > 2^31). A value already a power of two is returned unchanged. */
static inline uint32_t reqrep_next_pow2(uint32_t v) {
    if (v < 2)
        return 2;
    if (v > 0x80000000U)
        return 0; /* would need 2^32 — unrepresentable */
    uint32_t p = 2;
    while (p < v)
        p <<= 1;
    return p;
}
/* CPU-relax hint for busy-wait loops: `pause` on x86, `yield` on aarch64,
 * otherwise a plain compiler barrier. The "memory" clobber prevents the
 * compiler from hoisting shared-memory loads out of the spin loop. */
static inline void reqrep_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
/* Make sure h->copy_buf can hold at least `needed` bytes.
 * Grows by doubling (starting at 64); if doubling would overflow uint32_t,
 * clamps the capacity to exactly `needed`. Returns 1 on success, 0 if
 * realloc fails (the existing buffer is left intact in that case). */
static inline int reqrep_ensure_copy_buf(ReqRepHandle *h, uint32_t needed) {
    if (needed <= h->copy_buf_cap)
        return 1;
    uint32_t cap = h->copy_buf_cap;
    if (cap == 0)
        cap = 64;
    while (cap < needed) {
        uint32_t doubled = cap * 2;
        if (doubled <= cap) { /* wrapped: take exactly what was asked for */
            cap = needed;
            break;
        }
        cap = doubled;
    }
    char *grown = realloc(h->copy_buf, cap);
    if (grown == NULL)
        return 0;
    h->copy_buf = grown;
    h->copy_buf_cap = cap;
    return 1;
}
/* Address of response slot `idx`: slots are packed back-to-back in the
 * response area, resp_stride bytes apart (64-bit math avoids overflow). */
static inline RespSlotHeader *reqrep_resp_slot(ReqRepHandle *h, uint32_t idx) {
    uint64_t offset = (uint64_t)idx * h->resp_stride;
    return (RespSlotHeader *)(h->resp_area + offset);
}
/* ================================================================
* Futex helpers
* ================================================================ */
/* Lock-word encoding for the shared header mutex: 0 means unlocked;
 * while held, the word is (WRITER_BIT | holder pid) so a waiter can probe
 * whether the holder is still alive and recover a stale lock. */
#define REQREP_MUTEX_WRITER_BIT 0x80000000U
#define REQREP_MUTEX_PID_MASK 0x7FFFFFFFU
/* Value written into hdr->mutex while held by `pid`. */
#define REQREP_MUTEX_VAL(pid) (REQREP_MUTEX_WRITER_BIT | ((uint32_t)(pid) & REQREP_MUTEX_PID_MASK))
/* Best-effort liveness probe via kill(pid, 0).
 * Returns 0 only when the kernel positively reports the pid gone (ESRCH);
 * pid 0 (unknown owner) and permission errors (EPERM) count as alive,
 * erring on the side of not reclaiming a live process's resources. */
static inline int reqrep_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1; /* never probe pid 0 — kill(0, ...) targets our group */
    if (kill((pid_t)pid, 0) == 0)
        return 1;
    return errno != ESRCH;
}
/* Relative FUTEX_WAIT timeout: bounds how long a blocked locker sleeps
 * before waking up to check whether the lock holder has died. */
static const struct timespec reqrep_lock_timeout = { REQREP_LOCK_TIMEOUT_SEC, 0 };
/* Reclaim the header mutex from a dead owner.
 * `observed` is the lock word we saw (WRITER_BIT | dead pid); the CAS to 0
 * makes recovery race-safe — if another process recovered first, or the
 * owner released/re-acquired in the meantime, the CAS fails and we do
 * nothing. On success, bump the recovery counter and wake one waiter. */
static inline void reqrep_recover_stale_mutex(ReqRepHeader *hdr, uint32_t observed) {
    if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
                                     0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return; /* lost the race: someone else already handled it */
    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* Acquire the shared header mutex; does not return until it is held.
 * Fast path: spin up to REQREP_SPIN_LIMIT attempting CAS 0 -> (our pid
 * tagged with WRITER_BIT). Slow path: register in mutex_waiters and
 * FUTEX_WAIT on the current lock word with a bounded timeout. On timeout,
 * probe the recorded owner pid; if the owner died while holding the lock,
 * recover it via reqrep_recover_stale_mutex() and retry. */
static inline void reqrep_mutex_lock(ReqRepHeader *hdr) {
    uint32_t mypid = REQREP_MUTEX_VAL((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        /* weak CAS: spurious failure just costs one more spin iteration */
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                                        1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < REQREP_SPIN_LIMIT, 1)) {
            reqrep_spin_pause();
            continue;
        }
        /* Spin budget exhausted — sleep in the kernel. */
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            /* FUTEX_WAIT blocks only while the word still equals `cur`,
             * so an unlock between the load and the wait is not missed. */
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &reqrep_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                /* Timed out: the holder may have died while locked. */
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= REQREP_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & REQREP_MUTEX_PID_MASK;
                    if (!reqrep_pid_alive(pid))
                        reqrep_recover_stale_mutex(hdr, val);
                }
                spin = 0; /* waiter count already dropped above */
                continue;
            }
        }
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0; /* woken (or lock was free): resume spinning */
    }
}
/* Release the shared header mutex and wake one blocked locker, if any.
 * The release store publishes all writes made inside the critical section. */
static inline void reqrep_mutex_unlock(ReqRepHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
    uint32_t waiters = __atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED);
    if (waiters != 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* Notify one blocked receiver, if any, that a request arrived.
 * Bumping recv_futex (release) before FUTEX_WAKE changes the word a
 * sleeping consumer waited on, so the wakeup cannot be lost. */
static inline void reqrep_wake_consumers(ReqRepHeader *hdr) {
    if (__atomic_load_n(&hdr->recv_waiters, __ATOMIC_RELAXED) == 0)
        return;
    __atomic_add_fetch(&hdr->recv_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* Notify one blocked sender, if any, that queue space opened up.
 * Same bump-then-wake protocol as reqrep_wake_consumers. */
static inline void reqrep_wake_producers(ReqRepHeader *hdr) {
    if (__atomic_load_n(&hdr->send_waiters, __ATOMIC_RELAXED) == 0)
        return;
    __atomic_add_fetch(&hdr->send_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->send_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* Notify one process blocked waiting for a free response slot, if any.
 * Same bump-then-wake protocol as the other wake helpers. */
static inline void reqrep_wake_slot_waiters(ReqRepHeader *hdr) {
    if (__atomic_load_n(&hdr->slot_waiters, __ATOMIC_RELAXED) == 0)
        return;
    __atomic_add_fetch(&hdr->slot_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->slot_futex, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* Compute *remaining = *deadline - now (CLOCK_MONOTONIC), normalized so
 * tv_nsec is in [0, 1e9). Returns 1 while the deadline has not passed
 * (tv_sec >= 0), 0 once it has. */
static inline int reqrep_remaining_time(const struct timespec *deadline,
                                        struct timespec *remaining) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    time_t sec = deadline->tv_sec - now.tv_sec;
    long nsec = deadline->tv_nsec - now.tv_nsec;
    if (nsec < 0) { /* borrow one second */
        sec -= 1;
        nsec += 1000000000L;
    }
    remaining->tv_sec = sec;
    remaining->tv_nsec = nsec;
    return sec >= 0;
}
/* Turn a relative timeout in (possibly fractional) seconds into an
 * absolute CLOCK_MONOTONIC deadline. A single carry normalizes tv_nsec:
 * both addends are below 1e9, so their sum is below 2e9. */
static inline void reqrep_make_deadline(double timeout, struct timespec *deadline) {
    clock_gettime(CLOCK_MONOTONIC, deadline);
    time_t whole = (time_t)timeout;
    long frac_ns = (long)((timeout - (double)whole) * 1e9);
    deadline->tv_sec += whole;
    deadline->tv_nsec += frac_ns;
    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_sec += 1;
        deadline->tv_nsec -= 1000000000L;
    }
}
/* ================================================================
* Response slot operations
* ================================================================ */
/* Claim a free response slot; returns its index, or -1 if none available.
 * Pass 1: scan from the shared rotating hint and CAS FREE -> ACQUIRED.
 * Pass 2 (table looked full): reclaim slots whose recorded owner process
 * has died — stuck in either ACQUIRED or READY — then try to take the
 * freshly freed slot ourselves. On success, owner_pid is set to our pid
 * and the slot generation is bumped with release ordering so a reader can
 * detect that the slot was reused. */
static int32_t reqrep_slot_acquire(ReqRepHandle *h) {
    uint32_t n = h->resp_slots;
    uint32_t hint = __atomic_load_n(&h->hdr->resp_hint, __ATOMIC_RELAXED);
    uint32_t mypid = (uint32_t)getpid();
    for (uint32_t i = 0; i < n; i++) {
        uint32_t idx = (hint + i) % n;
        RespSlotHeader *slot = reqrep_resp_slot(h, idx);
        uint32_t expected = RESP_FREE;
        if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
                                        0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
            __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
            __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
            /* advance the shared hint so the next caller starts past us */
            __atomic_store_n(&h->hdr->resp_hint, (idx + 1) % n, __ATOMIC_RELAXED);
            return (int32_t)idx;
        }
    }
    /* Recover stale slots from dead processes (both ACQUIRED and READY) */
    for (uint32_t i = 0; i < n; i++) {
        RespSlotHeader *slot = reqrep_resp_slot(h, i);
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state == RESP_ACQUIRED || state == RESP_READY) {
            uint32_t pid = __atomic_load_n(&slot->owner_pid, __ATOMIC_RELAXED);
            if (pid && !reqrep_pid_alive(pid)) {
                /* CAS guards against the state changing since we read it */
                if (__atomic_compare_exchange_n(&slot->state, &state, RESP_FREE,
                                                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
                    __atomic_add_fetch(&h->hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
                    uint32_t expected = RESP_FREE;
                    /* try to grab the slot we just freed before anyone else */
                    if (__atomic_compare_exchange_n(&slot->state, &expected, RESP_ACQUIRED,
                                                    0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
                        __atomic_store_n(&slot->owner_pid, mypid, __ATOMIC_RELAXED);
                        __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
                        return (int32_t)i;
                    }
                    /* another process took it: let slot waiters retry */
                    reqrep_wake_slot_waiters(h->hdr);
                }
            }
        }
    }
    return -1; /* genuinely full */
}
/* Return slot `idx` to the free pool and notify anyone waiting for one.
 * Ownership is cleared first; the release store of RESP_FREE then
 * publishes the slot as reusable. */
static inline void reqrep_slot_release(ReqRepHandle *h, uint32_t idx) {
    RespSlotHeader *s = reqrep_resp_slot(h, idx);
    __atomic_store_n(&s->owner_pid, 0, __ATOMIC_RELAXED);
    __atomic_store_n(&s->state, RESP_FREE, __ATOMIC_RELEASE);
    reqrep_wake_slot_waiters(h->hdr);
}
/* ================================================================
* Create / Open / Close
* ================================================================ */
/* Format an error message into the caller's buffer; no-op when errbuf is
 * NULL. NOTE: expands in place, so a variable named `errbuf` must be in
 * scope at every use site. */
#define REQREP_ERR(fmt, ...) do { if (errbuf) snprintf(errbuf, REQREP_ERR_BUFLEN, fmt, ##__VA_ARGS__); } while(0)
/* Build a per-process handle over an already-mapped shared region.
 * `base` points at a (validated) ReqRepHeader; sizing fields are copied
 * out of the header so hot paths avoid re-reading shared memory.
 * `path` is duplicated (may be NULL for anonymous mappings); `backing_fd`
 * is stored as-is (-1 for file-backed/anonymous — caller keeps ownership
 * of the mapping itself).
 * Returns NULL on allocation failure, including strdup failure, which the
 * original silently ignored — callers would otherwise get a handle whose
 * path was lost. */
static ReqRepHandle *reqrep_setup_handle(void *base, size_t map_size,
                                         const char *path, int backing_fd) {
    ReqRepHeader *hdr = (ReqRepHeader *)base;
    ReqRepHandle *h = calloc(1, sizeof *h);
    if (!h)
        return NULL;
    h->hdr = hdr;
    h->req_slots = (ReqSlot *)((char *)base + hdr->req_slots_off);
    h->req_arena = (char *)base + hdr->req_arena_off;
    h->resp_area = (uint8_t *)base + hdr->resp_off;
    h->mmap_size = map_size;
    h->req_cap = hdr->req_cap;
    h->req_cap_mask = hdr->req_cap - 1; /* req_cap is a power of two */
    h->req_arena_cap = hdr->req_arena_cap;
    h->resp_slots = hdr->resp_slots;
    h->resp_data_max = hdr->resp_data_max;
    h->resp_stride = hdr->resp_stride;
    h->notify_fd = -1;
    h->reply_fd = -1;
    h->backing_fd = backing_fd;
    if (path) {
        h->path = strdup(path);
        if (!h->path) { /* fail cleanly instead of dropping the path */
            free(h);
            return NULL;
        }
    }
    return h;
}
static int reqrep_validate_header(ReqRepHeader *hdr, size_t file_size, uint32_t expected_mode) {
if (hdr->magic != REQREP_MAGIC) return 0;
if (hdr->version != REQREP_VERSION) return 0;
/* NOTE(review): source excerpt truncated here — reqrep_validate_header continues. */