Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

reqrep.h  view on Meta::CPAN

#include <errno.h>
#include <time.h>
#include <limits.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/eventfd.h>

/* ================================================================
 * Constants
 * ================================================================ */

#define REQREP_MAGIC           0x52525331U  /* "RRS1" */
#define REQREP_VERSION         1
#define REQREP_ERR_BUFLEN      256
#define REQREP_SPIN_LIMIT      32
#define REQREP_LOCK_TIMEOUT_SEC 2

#define REQREP_UTF8_FLAG       0x80000000U
#define REQREP_STR_LEN_MASK    0x7FFFFFFFU

#define RESP_FREE              0
#define RESP_ACQUIRED          1
#define RESP_READY             2

#define REQREP_MODE_STR        0
#define REQREP_MODE_INT        1

/* Pack/unpack slot index + generation into a 64-bit request ID.
 * Prevents ABA: cancelled slot re-acquired by another client
 * won't match the generation stored in the original request. */
#define REQREP_MAKE_ID(slot, gen) (((uint64_t)(gen) << 32) | (uint64_t)(slot))
#define REQREP_ID_SLOT(id)  ((uint32_t)((id) & 0xFFFFFFFFULL))
#define REQREP_ID_GEN(id)   ((uint32_t)((id) >> 32))

/* ================================================================
 * Header (256 bytes = 4 cache lines, lives at start of mmap)
 * ================================================================ */

/* Shared-segment header. Lives at offset 0 of the mmap and is shared by
 * every attached process, so the layout is ABI: fields may only be added
 * by consuming pad bytes and bumping REQREP_VERSION. Hot producer and
 * consumer state sit on separate cache lines to avoid false sharing.
 * All futex words are naturally 4-byte aligned as FUTEX_WAIT requires. */
typedef struct {
    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;           /* 0 */
    uint32_t version;         /* 4 */
    uint32_t mode;            /* 8: REQREP_MODE_STR or REQREP_MODE_INT */
    uint32_t req_cap;         /* 12: request queue capacity (power of 2) */
    uint64_t total_size;      /* 16: mmap size */
    uint32_t req_slots_off;   /* 24: offset to request slot array */
    uint32_t req_arena_off;   /* 28: offset to request arena */
    uint32_t req_arena_cap;   /* 32: arena byte capacity */
    uint32_t resp_slots;      /* 36: number of response slots */
    uint32_t resp_data_max;   /* 40: max response data bytes per slot */
    uint32_t resp_off;        /* 44: offset to response slot area */
    uint32_t resp_stride;     /* 48: bytes per response slot (cache-aligned) */
    uint8_t  _pad0[12];       /* 52-63 */

    /* ---- Cache line 1 (64-127): recv hot (server) ---- */
    uint64_t req_head;        /* 64: consumer position (mutex-protected) */
    uint32_t recv_waiters;    /* 72: blocked servers */
    uint32_t recv_futex;      /* 76: futex for recv wakeup */
    uint8_t  _pad1[48];       /* 80-127 */

    /* ---- Cache line 2 (128-191): send hot (client) ---- */
    uint64_t req_tail;        /* 128: producer position (mutex-protected) */
    uint32_t send_waiters;    /* 136: blocked clients */
    uint32_t send_futex;      /* 140: futex for send wakeup */
    uint8_t  _pad2[48];       /* 144-191 */

    /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
    uint32_t mutex;           /* 192: futex-based mutex (0 or PID|0x80000000) */
    uint32_t mutex_waiters;   /* 196 */
    uint32_t arena_wpos;      /* 200: arena write position */
    uint32_t arena_used;      /* 204: arena bytes consumed */
    uint32_t resp_hint;       /* 208: hint for slot scan */
    uint32_t stat_recoveries; /* 212 */
    uint32_t slot_futex;      /* 216: futex for slot availability */
    uint32_t slot_waiters;    /* 220: threads waiting for free slot */
    uint64_t stat_requests;   /* 224 */
    uint64_t stat_replies;    /* 232 */
    uint64_t stat_send_full;  /* 240 */
    uint32_t stat_recv_empty; /* 248 */
    uint32_t _pad3;           /* 252-255 */
} ReqRepHeader;

/* Compile-time ABI guard: the byte offsets commented above only hold if
 * the struct is exactly 4 cache lines. */
#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(ReqRepHeader) == 256, "ReqRepHeader must be 256 bytes");
#endif

/* ================================================================
 * Slot types
 * ================================================================ */

/* One entry of the request ring (Str mode). Written by the client under
 * the shared mutex; the payload bytes live in the arena at arena_off. */
typedef struct {
    uint32_t arena_off;
    uint32_t packed_len;   /* bit 31 = UTF-8, bits 0-30 = byte length */
    uint32_t arena_skip;   /* bytes to release from arena on recv
                            * (alloc size + any skipped tail before a wrap) */
    uint32_t resp_slot;    /* response slot index */
    uint32_t resp_gen;     /* generation at time of slot acquire (ABA guard) */
    uint32_t _rpad;
} ReqSlot;  /* 24 bytes (Str mode) */

/* Int request slot: Vyukov sequence + value + response routing */
typedef struct {
    uint64_t sequence;
    int64_t  value;
    uint32_t resp_slot;
    uint32_t resp_gen;
} ReqIntSlot;  /* 24 bytes (Int mode, lock-free) */

/* Per-slot reply mailbox; the reply payload follows immediately after
 * this header (resp_stride bytes per slot in total). `state` doubles as
 * the futex word that clients sleep on while waiting for their reply. */
typedef struct {
    uint32_t state;        /* futex: RESP_FREE=0, RESP_ACQUIRED=1, RESP_READY=2 */
    uint32_t waiters;      /* futex waiters on this slot */
    uint32_t owner_pid;    /* PID of client that acquired (for stale recovery) */
    uint32_t resp_len;     /* response data length */
    uint32_t resp_flags;   /* bit 0 = UTF-8 */
    uint32_t generation;   /* incremented on each acquire (ABA guard) */
    uint32_t _rpad[2];     /* pad to 32 bytes */
} RespSlotHeader;  /* 32 bytes + data */

#if defined(__STDC_VERSION__) && __STDC_VERSION__ >= 201112L
_Static_assert(sizeof(RespSlotHeader) == 32, "RespSlotHeader must be 32 bytes");
#endif

/* ================================================================
 * Process-local handle

reqrep.h  view on Meta::CPAN

    ReqRepHandle *h = reqrep_setup_handle(base, (size_t)total_size, NULL, fd);
    if (!h) { munmap(base, (size_t)total_size); close(fd); return NULL; }
    return h;
}

/* Attach to an existing reqrep segment via an already-open fd.
 * The fd is duplicated with CLOEXEC, so the caller retains ownership of
 * `fd`. Returns a fresh handle, or NULL with a message in errbuf. */
static ReqRepHandle *reqrep_open_fd(int fd, uint32_t mode, char *errbuf) {
    void  *base = MAP_FAILED;
    size_t map_size = 0;
    int    dup_fd = -1;

    if (errbuf) errbuf[0] = '\0';

    struct stat st;
    if (fstat(fd, &st) < 0) {
        REQREP_ERR("fstat(fd=%d): %s", fd, strerror(errno));
        goto fail;
    }
    if ((uint64_t)st.st_size < sizeof(ReqRepHeader)) {
        REQREP_ERR("fd %d: too small (%lld)", fd, (long long)st.st_size);
        goto fail;
    }

    map_size = (size_t)st.st_size;
    base = mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        REQREP_ERR("mmap(fd=%d): %s", fd, strerror(errno));
        goto fail;
    }

    if (!reqrep_validate_header((ReqRepHeader *)base, map_size, mode)) {
        REQREP_ERR("fd %d: invalid or incompatible reqrep", fd);
        goto fail;
    }

    dup_fd = fcntl(fd, F_DUPFD_CLOEXEC, 0);
    if (dup_fd < 0) {
        REQREP_ERR("fcntl(F_DUPFD_CLOEXEC): %s", strerror(errno));
        goto fail;
    }

    ReqRepHandle *h = reqrep_setup_handle(base, map_size, NULL, dup_fd);
    if (h) return h;

fail:
    if (base != MAP_FAILED) munmap(base, map_size);
    if (dup_fd >= 0) close(dup_fd);
    return NULL;
}

/* Release all process-local resources held by a handle: the three fds,
 * the mapping, and heap-owned buffers. NULL-safe. Does not modify the
 * shared segment itself, so other attached processes are unaffected. */
static void reqrep_destroy(ReqRepHandle *h) {
    if (h == NULL)
        return;

    int fds[3];
    fds[0] = h->notify_fd;
    fds[1] = h->reply_fd;
    fds[2] = h->backing_fd;
    for (int i = 0; i < 3; i++) {
        if (fds[i] >= 0)
            close(fds[i]);
    }

    if (h->hdr)
        munmap(h->hdr, h->mmap_size);

    free(h->copy_buf);   /* free(NULL) is a no-op */
    free(h->path);
    free(h);
}

/* ================================================================
 * Request queue operations (client -> server)
 * ================================================================ */

/* Push request while mutex is held. Returns 1=ok, 0=full, -2=too long. */
/* Push request while mutex is held. Returns 1=ok, 0=full, -2=too long.
 * Copies `str` into the shared bump arena and publishes a ReqSlot that
 * routes the eventual reply to (resp_slot_idx, resp_gen). Caller must
 * hold the shared mutex and wake consumers after a successful push. */
static inline int reqrep_send_locked(ReqRepHandle *h, const char *str,
                                      uint32_t len, bool utf8,
                                      uint32_t resp_slot_idx, uint32_t resp_gen) {
    ReqRepHeader *hdr = h->hdr;

    /* Length must fit the 31-bit packed_len field. */
    if (len > REQREP_STR_LEN_MASK) return -2;

    /* Ring full: tail has run a full capacity ahead of head. */
    if (hdr->req_tail - hdr->req_head >= h->req_cap) {
        __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
        return 0;
    }

    /* Round the arena allocation up to 8 bytes, minimum 8 (len may be 0). */
    uint32_t alloc = (len + 7) & ~7u;
    if (alloc == 0) alloc = 8;
    /* Single message must fit arena; else overflow into response slots. */
    if (alloc > h->req_arena_cap) return -2;
    uint32_t pos = hdr->arena_wpos;
    uint64_t skip = alloc;   /* bytes recv will release for this entry */

    /* Not enough room before the arena end: wrap to offset 0 and charge
     * the skipped tail bytes to this entry so recv releases them too. */
    if ((uint64_t)pos + alloc > h->req_arena_cap) {
        skip += h->req_arena_cap - pos;
        pos = 0;
    }

    if ((uint64_t)hdr->arena_used + skip > h->req_arena_cap) {
        if (hdr->req_tail == hdr->req_head) {
            /* Ring is empty, so no queued entry references the arena:
             * safe to reset the bump allocator and start from 0. */
            hdr->arena_wpos = 0;
            hdr->arena_used = 0;
            pos = 0;
            skip = alloc;
        } else {
            __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
            return 0;
        }
    }

    memcpy(h->req_arena + pos, str, len);

    /* Capacity is a power of two, so the mask is a cheap modulo. */
    uint32_t idx = (uint32_t)(hdr->req_tail & h->req_cap_mask);
    ReqSlot *slot = &h->req_slots[idx];
    slot->arena_off = pos;
    slot->packed_len = len | (utf8 ? REQREP_UTF8_FLAG : 0);
    slot->arena_skip = (uint32_t)skip;
    slot->resp_slot = resp_slot_idx;
    slot->resp_gen = resp_gen;

    hdr->arena_wpos = pos + alloc;
    hdr->arena_used += (uint32_t)skip;
    hdr->req_tail++;   /* publish the entry (consumers also take the mutex) */
    __atomic_add_fetch(&hdr->stat_requests, 1, __ATOMIC_RELAXED);
    return 1;
}

/* Non-blocking send: acquire slot + push request.
 * Returns 1=ok, 0=full, -2=too long, -3=no slots.
 * On success, *out_id is the packed slot+generation ID. */
/* Non-blocking send: acquire slot + push request.
 * Returns 1=ok, 0=full, -2=too long, -3=no slots.
 * On success, *out_id is the packed slot+generation ID. */
static int reqrep_try_send(ReqRepHandle *h, const char *str, uint32_t len,
                            bool utf8, uint64_t *out_id) {
    int32_t acquired = reqrep_slot_acquire(h);
    if (acquired < 0)
        return -3;

    uint32_t slot_idx = (uint32_t)acquired;
    /* Snapshot the slot generation before queueing; it is baked into the
     * request ID and the ReqSlot so stale replies can be rejected. */
    RespSlotHeader *rslot = reqrep_resp_slot(h, slot_idx);
    uint32_t gen = __atomic_load_n(&rslot->generation, __ATOMIC_ACQUIRE);

    reqrep_mutex_lock(h->hdr);
    int rc = reqrep_send_locked(h, str, len, utf8, slot_idx, gen);
    reqrep_mutex_unlock(h->hdr);

    if (rc != 1) {
        /* Queue full or message too long: hand the response slot back. */
        reqrep_slot_release(h, slot_idx);
        return rc;
    }

    reqrep_wake_consumers(h->hdr);
    *out_id = REQREP_MAKE_ID(slot_idx, gen);
    return 1;
}

/* Blocking send with timeout. Returns 1=ok, 0=timeout, -2=too long, -3=no slots (timeout). */
/* Blocking send with timeout. Returns 1=ok, 0=timeout, -2=too long, -3=no slots (timeout).
 * timeout semantics: 0 = non-blocking, >0 = seconds, otherwise wait forever.
 * Waits on one of two futexes depending on why the last attempt failed:
 * slot_futex when no response slot was free (-3), send_futex when the
 * queue/arena was full (0). Standard futex protocol: sample the word,
 * retry the operation, register as waiter, then FUTEX_WAIT against the
 * sampled value so a wake between retry and sleep is never missed. */
static int reqrep_send_wait(ReqRepHandle *h, const char *str, uint32_t len,
                             bool utf8, uint64_t *out_id, double timeout) {
    int r = reqrep_try_send(h, str, len, utf8, out_id);
    if (r == 1 || r == -2) return r;   /* success, or permanent failure */
    if (timeout == 0) return r;        /* non-blocking mode */

    ReqRepHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) reqrep_make_deadline(timeout, &deadline);

    for (;;) {
        /* Pick the futex that matches the most recent failure reason. */
        uint32_t *futex_word  = (r == -3) ? &hdr->slot_futex : &hdr->send_futex;
        uint32_t *waiter_cnt  = (r == -3) ? &hdr->slot_waiters : &hdr->send_waiters;

        uint32_t fseq = __atomic_load_n(futex_word, __ATOMIC_ACQUIRE);
        r = reqrep_try_send(h, str, len, utf8, out_id);
        if (r == 1 || r == -2) return r;

        __atomic_add_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!reqrep_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
                return r;   /* deadline expired: report last failure code */
            }
            pts = &remaining;
        }
        /* Sleeps only while *futex_word still equals fseq (FUTEX_WAIT
         * timeout is relative, hence `remaining` not `deadline`). */
        long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
        __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);

        /* One more attempt before classifying a wake as a timeout. */
        r = reqrep_try_send(h, str, len, utf8, out_id);
        if (r == 1 || r == -2) return r;
        if (rc == -1 && errno == ETIMEDOUT) return r;
    }
}

/* Pop request while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
/* Pop request while mutex is held. Returns 1=ok, 0=empty, -1=OOM.
 * The payload is copied into the handle's private copy_buf and
 * NUL-terminated, so *out_str stays valid only until the next recv on
 * this handle. *out_id packs the response routing (slot + generation)
 * for the eventual reply(). */
static inline int reqrep_recv_locked(ReqRepHandle *h, const char **out_str,
                                      uint32_t *out_len, bool *out_utf8,
                                      uint64_t *out_id) {
    ReqRepHeader *hdr = h->hdr;

    if (hdr->req_tail == hdr->req_head) {
        __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
        return 0;
    }

    uint32_t idx = (uint32_t)(hdr->req_head & h->req_cap_mask);
    ReqSlot *slot = &h->req_slots[idx];

    uint32_t len = slot->packed_len & REQREP_STR_LEN_MASK;
    *out_utf8 = (slot->packed_len & REQREP_UTF8_FLAG) != 0;
    *out_id = REQREP_MAKE_ID(slot->resp_slot, slot->resp_gen);

    /* Copy out before releasing arena bytes; +1 for the terminator.
     * On OOM the entry stays queued, so the caller may retry later. */
    if (!reqrep_ensure_copy_buf(h, len + 1))
        return -1;
    if (len > 0)
        memcpy(h->copy_buf, h->req_arena + slot->arena_off, len);
    h->copy_buf[len] = '\0';
    *out_str = h->copy_buf;
    *out_len = len;

    /* Release this entry's arena bytes (clamped defensively at zero). */
    if (hdr->arena_used >= slot->arena_skip)
        hdr->arena_used -= slot->arena_skip;
    else
        hdr->arena_used = 0;
    /* Arena fully drained: rewind so future writes get maximal headroom. */
    if (hdr->arena_used == 0)
        hdr->arena_wpos = 0;

    hdr->req_head++;
    return 1;
}

/* Pop request (server recv). Returns 1=ok, 0=empty, -1=OOM. */
/* Pop request (server recv). Returns 1=ok, 0=empty, -1=OOM. */
static inline int reqrep_try_recv(ReqRepHandle *h, const char **out_str,
                                   uint32_t *out_len, bool *out_utf8,
                                   uint64_t *out_id) {
    ReqRepHeader *hdr = h->hdr;

    reqrep_mutex_lock(hdr);
    int rc = reqrep_recv_locked(h, out_str, out_len, out_utf8, out_id);
    reqrep_mutex_unlock(hdr);

    /* A queue entry and its arena bytes were freed: unblock senders. */
    if (rc == 1)
        reqrep_wake_producers(hdr);

    return rc;
}

/* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM. */
/* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM.
 * timeout semantics: 0 = non-blocking, >0 = seconds, otherwise wait
 * forever. Uses the standard futex protocol on recv_futex: sample the
 * word, retry, register as waiter, then FUTEX_WAIT against the sampled
 * value so a wake between retry and sleep is never missed. */
static int reqrep_recv_wait(ReqRepHandle *h, const char **out_str,
                             uint32_t *out_len, bool *out_utf8,
                             uint64_t *out_id, double timeout) {
    int r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
    if (r != 0) return r;       /* got one, or OOM */
    if (timeout == 0) return 0; /* non-blocking mode */

    ReqRepHeader *hdr = h->hdr;
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) reqrep_make_deadline(timeout, &deadline);

    for (;;) {
        uint32_t fseq = __atomic_load_n(&hdr->recv_futex, __ATOMIC_ACQUIRE);
        r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
        if (r != 0) return r;

        __atomic_add_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!reqrep_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);
                return 0;   /* deadline expired while re-checking */
            }
            pts = &remaining;
        }
        /* Sleeps only while recv_futex still equals fseq (FUTEX_WAIT
         * timeout is relative, hence `remaining`). */
        long rc = syscall(SYS_futex, &hdr->recv_futex, FUTEX_WAIT, fseq, pts, NULL, 0);
        __atomic_sub_fetch(&hdr->recv_waiters, 1, __ATOMIC_RELEASE);

        /* One more attempt before classifying a wake as a timeout. */
        r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
        if (r != 0) return r;
        if (rc == -1 && errno == ETIMEDOUT) return 0;
    }
}

/* ================================================================
 * Response operations (server -> client)
 * ================================================================ */

/* Write response to a response slot.
 * Returns 1=ok, -1=bad slot, -2=stale (cancelled/recycled), -3=too long. */
static int reqrep_reply(ReqRepHandle *h, uint64_t id,
                         const char *str, uint32_t len, bool utf8) {
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return -1;
    if (len > h->resp_data_max) return -3;

    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
    if (state != RESP_ACQUIRED) return -2;
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return -2;

    uint8_t *data = (uint8_t *)slot + sizeof(RespSlotHeader);

reqrep.h  view on Meta::CPAN

                            double timeout) {
    int r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
    if (r != 0) return r;
    if (timeout == 0) return 0;

    uint32_t slot_idx = REQREP_ID_SLOT(id);
    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    struct timespec deadline, remaining;
    int has_deadline = (timeout > 0);
    if (has_deadline) reqrep_make_deadline(timeout, &deadline);

    for (;;) {
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state == RESP_READY)
            return reqrep_try_get(h, id, out_str, out_len, out_utf8);

        __atomic_add_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);

        /* Re-check: cancel may have fired between try_get and waiter registration */
        if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != REQREP_ID_GEN(id)) {
            __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
            return -4;
        }

        state = __atomic_load_n(&slot->state, __ATOMIC_ACQUIRE);
        if (state == RESP_READY) {
            __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
            return reqrep_try_get(h, id, out_str, out_len, out_utf8);
        }

        struct timespec *pts = NULL;
        if (has_deadline) {
            if (!reqrep_remaining_time(&deadline, &remaining)) {
                __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);
                return 0;
            }
            pts = &remaining;
        }

        syscall(SYS_futex, &slot->state, FUTEX_WAIT, state, pts, NULL, 0);
        __atomic_sub_fetch(&slot->waiters, 1, __ATOMIC_RELEASE);

        r = reqrep_try_get(h, id, out_str, out_len, out_utf8);
        if (r != 0) return r;
    }
}

/* Cancel a pending request — CAS ACQUIRED→FREE only if generation matches.
 * If the reply already arrived (READY), cancel is a no-op — call get() to drain. */
/* Cancel a pending request — CAS ACQUIRED→FREE only if generation matches.
 * If the reply already arrived (READY), cancel is a no-op — call get() to drain. */
static void reqrep_cancel(ReqRepHandle *h, uint64_t id) {
    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return;
    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    /* Generation mismatch: the slot was already recycled for another
     * request, so this ID has nothing left to cancel. */
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return;
    uint32_t expected_state = RESP_ACQUIRED;
    if (__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
        /* Bump generation so a late reply() addressed to the old ID is
         * rejected by its generation check. */
        __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
        /* Wake get_wait blocked on this slot's state futex */
        if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
        reqrep_wake_slot_waiters(h->hdr);
    }
}

/* Combined send + wait-for-reply with single deadline.
 * Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale. */
/* Combined send + wait-for-reply with single deadline.
 * Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale. */
static int reqrep_request(ReqRepHandle *h, const char *req_str, uint32_t req_len,
                           bool req_utf8, const char **out_str, uint32_t *out_len,
                           bool *out_utf8, double timeout) {
    uint64_t id;
    struct timespec deadline;
    int bounded = (timeout > 0);
    if (bounded)
        reqrep_make_deadline(timeout, &deadline);

    int rc = reqrep_send_wait(h, req_str, req_len, req_utf8, &id, timeout);
    if (rc != 1)
        return rc;

    /* Charge the time spent sending against the remaining get() budget
     * so send + get share one overall deadline. */
    double budget = timeout;
    if (bounded) {
        struct timespec now;
        clock_gettime(CLOCK_MONOTONIC, &now);
        budget = (double)(deadline.tv_sec - now.tv_sec) +
                 (double)(deadline.tv_nsec - now.tv_nsec) / 1e9;
        if (budget <= 0) {
            reqrep_cancel(h, id);
            return 0;
        }
    }

    rc = reqrep_get_wait(h, id, out_str, out_len, out_utf8, budget);
    if (rc != 1) {
        reqrep_cancel(h, id);
        /* If the reply landed between timeout and cancel, drain it so
         * the response slot is returned to the free pool. */
        const char *junk; uint32_t junk_len; bool junk_utf8;
        reqrep_try_get(h, id, &junk, &junk_len, &junk_utf8);
    }
    return rc;
}

/* Count response slots owned by this process */
static uint32_t reqrep_pending(ReqRepHandle *h) {
    uint32_t mypid = (uint32_t)getpid();
    uint32_t count = 0;
    for (uint32_t i = 0; i < h->resp_slots; i++) {
        RespSlotHeader *slot = reqrep_resp_slot(h, i);
        uint32_t state = __atomic_load_n(&slot->state, __ATOMIC_RELAXED);
        if ((state == RESP_ACQUIRED || state == RESP_READY) &&
            __atomic_load_n(&slot->owner_pid, __ATOMIC_RELAXED) == mypid)
            count++;
    }
    return count;
}

/* ================================================================
 * Queue state
 * ================================================================ */

static inline uint64_t reqrep_size(ReqRepHandle *h) {



( run in 0.700 second using v1.01-cache-2.11-cpan-39bf76dae61 )