Data-ReqRep-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r = 0;
    int oom = 0;
    if (count > 0) {
        items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 0; i < count; i++) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }

Shared.xs  view on Meta::CPAN

    struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
    UV n = 0;
    int last_r2 = 0;
    int oom = 0;
    if (count > 1) {
        items_buf = (void *)malloc((size_t)(count - 1) * sizeof(*items_buf));
        if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
    }
    reqrep_mutex_lock(h->hdr);
    for (UV i = 1; i < count; i++) {
        last_r2 = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r2 <= 0) break;
        char *c = (char *)malloc(len ? len : 1);
        if (!c) { oom = 1; break; }
        if (len) memcpy(c, str, len);
        items_buf[n].buf = c;
        items_buf[n].len = len;
        items_buf[n].id = id;
        items_buf[n].utf8 = utf8;
        n++;
    }

Shared.xs  view on Meta::CPAN

    uint32_t max_count;
  PPCODE:
    max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
    /* Hoist SV construction out of the mutex (see recv_multi). */
    struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
    UV drained_n = 0;
    int last_r = 0;
    int oom = 0;
    reqrep_mutex_lock(h->hdr);
    while (max_count-- > 0) {
        last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
        if (last_r <= 0) break;
        struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
        char *c = (char *)malloc(len ? len : 1);
        if (!it || !c) { free(it); free(c); oom = 1; break; }
        if (len) memcpy(c, str, len);
        it->buf = c; it->len = len; it->id = id; it->utf8 = utf8; it->next = NULL;
        if (drained_tail) drained_tail->next = it; else drained_head = it;
        drained_tail = it;
        drained_n++;
    }

reqrep.h  view on Meta::CPAN

    uint32_t req_arena_off;   /* 28: offset to request arena */
    uint32_t req_arena_cap;   /* 32: arena byte capacity */
    uint32_t resp_slots;      /* 36: number of response slots */
    uint32_t resp_data_max;   /* 40: max response data bytes per slot */
    uint32_t resp_off;        /* 44: offset to response slot area */
    uint32_t resp_stride;     /* 48: bytes per response slot (cache-aligned) */
    uint8_t  _pad0[12];       /* 52-63 */

    /* ---- Cache line 1 (64-127): recv hot (server) ---- */
    uint64_t req_head;        /* 64: consumer position */
    uint32_t recv_waiters;    /* 72: blocked servers */
    uint32_t recv_futex;      /* 76: futex for recv wakeup */
    uint8_t  _pad1[48];       /* 80-127 */

    /* ---- Cache line 2 (128-191): send hot (client) ---- */
    uint64_t req_tail;        /* 128: producer position */
    uint32_t send_waiters;    /* 136: blocked clients */
    uint32_t send_futex;      /* 140: futex for send wakeup */
    uint8_t  _pad2[48];       /* 144-191 */

    /* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
    uint32_t mutex;           /* 192: futex-based mutex (0 or PID|0x80000000) */
    uint32_t mutex_waiters;   /* 196 */
    uint32_t arena_wpos;      /* 200: arena write position */
    uint32_t arena_used;      /* 204: arena bytes consumed */
    uint32_t resp_hint;       /* 208: hint for slot scan */
    uint32_t stat_recoveries; /* 212 */

reqrep.h  view on Meta::CPAN

    free(h->copy_buf);
    free(h->path);
    free(h);
}

/* ================================================================
 * Request queue operations (client -> server)
 * ================================================================ */

/* Push request while mutex is held. Returns 1=ok, 0=full, -2=too long. */
static inline int reqrep_send_locked(ReqRepHandle *h, const char *str,
                                      uint32_t len, bool utf8,
                                      uint32_t resp_slot_idx, uint32_t resp_gen) {
    ReqRepHeader *hdr = h->hdr;

    if (len > REQREP_STR_LEN_MASK) return -2;

    if (hdr->req_tail - hdr->req_head >= h->req_cap) {
        __atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
        return 0;
    }

reqrep.h  view on Meta::CPAN

 * On success, *out_id is the packed slot+generation ID. */
static int reqrep_try_send(ReqRepHandle *h, const char *str, uint32_t len,
                            bool utf8, uint64_t *out_id) {
    int32_t slot = reqrep_slot_acquire(h);
    if (slot < 0) return -3;

    RespSlotHeader *rslot = reqrep_resp_slot(h, (uint32_t)slot);
    uint32_t gen = __atomic_load_n(&rslot->generation, __ATOMIC_ACQUIRE);

    reqrep_mutex_lock(h->hdr);
    int r = reqrep_send_locked(h, str, len, utf8, (uint32_t)slot, gen);
    reqrep_mutex_unlock(h->hdr);

    if (r == 1) {
        reqrep_wake_consumers(h->hdr);
        *out_id = REQREP_MAKE_ID((uint32_t)slot, gen);
        return 1;
    }

    reqrep_slot_release(h, (uint32_t)slot);
    return r;

reqrep.h  view on Meta::CPAN

        long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
        __atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);

        r = reqrep_try_send(h, str, len, utf8, out_id);
        if (r == 1 || r == -2) return r;
        if (rc == -1 && errno == ETIMEDOUT) return r;
    }
}

/* Pop request while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
static inline int reqrep_recv_locked(ReqRepHandle *h, const char **out_str,
                                      uint32_t *out_len, bool *out_utf8,
                                      uint64_t *out_id) {
    ReqRepHeader *hdr = h->hdr;

    if (hdr->req_tail == hdr->req_head) {
        __atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
        return 0;
    }

    uint32_t idx = (uint32_t)(hdr->req_head & h->req_cap_mask);

reqrep.h  view on Meta::CPAN


    hdr->req_head++;
    return 1;
}

/* Pop request (server recv). Returns 1=ok, 0=empty, -1=OOM. */
static inline int reqrep_try_recv(ReqRepHandle *h, const char **out_str,
                                   uint32_t *out_len, bool *out_utf8,
                                   uint64_t *out_id) {
    reqrep_mutex_lock(h->hdr);
    int r = reqrep_recv_locked(h, out_str, out_len, out_utf8, out_id);
    reqrep_mutex_unlock(h->hdr);
    if (r == 1) reqrep_wake_producers(h->hdr);
    return r;
}

/* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM. */
static int reqrep_recv_wait(ReqRepHandle *h, const char **out_str,
                             uint32_t *out_len, bool *out_utf8,
                             uint64_t *out_id, double timeout) {
    int r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);

reqrep.h  view on Meta::CPAN

    uint32_t slot_idx = REQREP_ID_SLOT(id);
    uint32_t expected_gen = REQREP_ID_GEN(id);
    if (slot_idx >= h->resp_slots) return;
    RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
    if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return;
    uint32_t expected_state = RESP_ACQUIRED;
    if (__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
        __atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
        __atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
        /* Wake get_wait blocked on this slot's state futex */
        if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
        reqrep_wake_slot_waiters(h->hdr);
    }
}

/* Combined send + wait-for-reply with single deadline.
 * Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale. */
static int reqrep_request(ReqRepHandle *h, const char *req_str, uint32_t req_len,
                           bool req_utf8, const char **out_str, uint32_t *out_len,

xt/concurrent_cancel.t  view on Meta::CPAN

use warnings;
use Test::More;
use File::Temp 'tmpnam';
use Time::HiRes qw(time sleep);
use POSIX ();

use Data::ReqRep::Shared;
use Data::ReqRep::Shared::Client;

# ============================================================
# 1. cancel + get_wait race: cancel fires while get_wait is blocked
#    Verify get_wait unblocks promptly (does not hang).
# ============================================================
{
    my $path = tmpnam();
    my $srv = Data::ReqRep::Shared->new($path, 16, 4, 256);
    my $cli = Data::ReqRep::Shared::Client->new($path);

    for my $trial (1..10) {
        my $id = $cli->send("race$trial");
        ok defined $id, "cancel+get race trial $trial: send ok";

xt/concurrent_cancel.t  view on Meta::CPAN

            sleep(0.001 + rand() * 0.01);
            $cli->cancel($id);
            POSIX::_exit(0);
        }

        my $t0 = time;
        my $resp = $cli->get_wait($id, 2.0);
        my $dt = time - $t0;

        # get_wait should return within ~50ms (cancel delay + scheduling)
        ok $dt < 2.0, sprintf("cancel+get race trial %d: unblocked in %.3fs", $trial, $dt);
        ok !defined $resp, "cancel+get race trial $trial: returns undef";

        waitpid $pid, 0;
    }

    # drain all queued requests
    while (my ($r, $ri) = $srv->recv) { $srv->reply($ri, "ok") }
    $srv->unlink;
}

xt/concurrent_cancel.t  view on Meta::CPAN


    # Give children time to enter get_wait
    sleep(0.05);

    # Clear — should unblock all get_wait callers
    my $t0 = time;
    $srv->clear;

    for my $pid (@pids) {
        waitpid $pid, 0;
        is $? >> 8, 0, "clear race: child $pid unblocked and got undef";
    }
    my $dt = time - $t0;
    ok $dt < 2.0, sprintf("clear race: all children unblocked in %.3fs", $dt);

    $srv->unlink;
}

# ============================================================
# 4. Rapid cancel/send on same slot: verify generation prevents ABA
#    across many iterations with minimal slot count
# ============================================================
{
    my $path = tmpnam();



( run in 3.326 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )