Data-ReqRep-Shared
view release on metacpan or search on metacpan
struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
UV n = 0;
int last_r = 0;
int oom = 0;
if (count > 0) {
items_buf = (void *)malloc((size_t)count * sizeof(*items_buf));
if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
}
reqrep_mutex_lock(h->hdr);
for (UV i = 0; i < count; i++) {
last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
if (last_r <= 0) break;
char *c = (char *)malloc(len ? len : 1);
if (!c) { oom = 1; break; }
if (len) memcpy(c, str, len);
items_buf[n].buf = c;
items_buf[n].len = len;
items_buf[n].id = id;
items_buf[n].utf8 = utf8;
n++;
}
struct { char *buf; uint32_t len; uint64_t id; bool utf8; } *items_buf = NULL;
UV n = 0;
int last_r2 = 0;
int oom = 0;
if (count > 1) {
items_buf = (void *)malloc((size_t)(count - 1) * sizeof(*items_buf));
if (!items_buf) croak("Data::ReqRep::Shared: out of memory");
}
reqrep_mutex_lock(h->hdr);
for (UV i = 1; i < count; i++) {
last_r2 = reqrep_recv_locked(h, &str, &len, &utf8, &id);
if (last_r2 <= 0) break;
char *c = (char *)malloc(len ? len : 1);
if (!c) { oom = 1; break; }
if (len) memcpy(c, str, len);
items_buf[n].buf = c;
items_buf[n].len = len;
items_buf[n].id = id;
items_buf[n].utf8 = utf8;
n++;
}
uint32_t max_count;
PPCODE:
max_count = (items > 1) ? (uint32_t)SvUV(ST(1)) : UINT32_MAX;
/* Hoist SV construction out of the mutex (see recv_multi). */
struct drain_item { char *buf; uint32_t len; uint64_t id; bool utf8; struct drain_item *next; } *drained_head = NULL, *drained_tail = NULL;
UV drained_n = 0;
int last_r = 0;
int oom = 0;
reqrep_mutex_lock(h->hdr);
while (max_count-- > 0) {
last_r = reqrep_recv_locked(h, &str, &len, &utf8, &id);
if (last_r <= 0) break;
struct drain_item *it = (struct drain_item *)malloc(sizeof(*it));
char *c = (char *)malloc(len ? len : 1);
if (!it || !c) { free(it); free(c); oom = 1; break; }
if (len) memcpy(c, str, len);
it->buf = c; it->len = len; it->id = id; it->utf8 = utf8; it->next = NULL;
if (drained_tail) drained_tail->next = it; else drained_head = it;
drained_tail = it;
drained_n++;
}
uint32_t req_arena_off; /* 28: offset to request arena */
uint32_t req_arena_cap; /* 32: arena byte capacity */
uint32_t resp_slots; /* 36: number of response slots */
uint32_t resp_data_max; /* 40: max response data bytes per slot */
uint32_t resp_off; /* 44: offset to response slot area */
uint32_t resp_stride; /* 48: bytes per response slot (cache-aligned) */
uint8_t _pad0[12]; /* 52-63 */
/* ---- Cache line 1 (64-127): recv hot (server) ---- */
uint64_t req_head; /* 64: consumer position */
uint32_t recv_waiters; /* 72: blocked servers */
uint32_t recv_futex; /* 76: futex for recv wakeup */
uint8_t _pad1[48]; /* 80-127 */
/* ---- Cache line 2 (128-191): send hot (client) ---- */
uint64_t req_tail; /* 128: producer position */
uint32_t send_waiters; /* 136: blocked clients */
uint32_t send_futex; /* 140: futex for send wakeup */
uint8_t _pad2[48]; /* 144-191 */
/* ---- Cache line 3 (192-255): mutex + arena state + stats ---- */
uint32_t mutex; /* 192: futex-based mutex (0 or PID|0x80000000) */
uint32_t mutex_waiters; /* 196 */
uint32_t arena_wpos; /* 200: arena write position */
uint32_t arena_used; /* 204: arena bytes consumed */
uint32_t resp_hint; /* 208: hint for slot scan */
uint32_t stat_recoveries; /* 212 */
free(h->copy_buf);
free(h->path);
free(h);
}
/* ================================================================
* Request queue operations (client -> server)
* ================================================================ */
/* Push request while mutex is held. Returns 1=ok, 0=full, -2=too long. */
static inline int reqrep_send_locked(ReqRepHandle *h, const char *str,
uint32_t len, bool utf8,
uint32_t resp_slot_idx, uint32_t resp_gen) {
ReqRepHeader *hdr = h->hdr;
if (len > REQREP_STR_LEN_MASK) return -2;
if (hdr->req_tail - hdr->req_head >= h->req_cap) {
__atomic_add_fetch(&hdr->stat_send_full, 1, __ATOMIC_RELAXED);
return 0;
}
* On success, *out_id is the packed slot+generation ID. */
static int reqrep_try_send(ReqRepHandle *h, const char *str, uint32_t len,
bool utf8, uint64_t *out_id) {
int32_t slot = reqrep_slot_acquire(h);
if (slot < 0) return -3;
RespSlotHeader *rslot = reqrep_resp_slot(h, (uint32_t)slot);
uint32_t gen = __atomic_load_n(&rslot->generation, __ATOMIC_ACQUIRE);
reqrep_mutex_lock(h->hdr);
int r = reqrep_send_locked(h, str, len, utf8, (uint32_t)slot, gen);
reqrep_mutex_unlock(h->hdr);
if (r == 1) {
reqrep_wake_consumers(h->hdr);
*out_id = REQREP_MAKE_ID((uint32_t)slot, gen);
return 1;
}
reqrep_slot_release(h, (uint32_t)slot);
return r;
long rc = syscall(SYS_futex, futex_word, FUTEX_WAIT, fseq, pts, NULL, 0);
__atomic_sub_fetch(waiter_cnt, 1, __ATOMIC_RELEASE);
r = reqrep_try_send(h, str, len, utf8, out_id);
if (r == 1 || r == -2) return r;
if (rc == -1 && errno == ETIMEDOUT) return r;
}
}
/* Pop request while mutex is held. Returns 1=ok, 0=empty, -1=OOM. */
static inline int reqrep_recv_locked(ReqRepHandle *h, const char **out_str,
uint32_t *out_len, bool *out_utf8,
uint64_t *out_id) {
ReqRepHeader *hdr = h->hdr;
/* head == tail means the ring is empty; the stat counter uses a relaxed
 * atomic because the caller already holds the queue mutex. */
if (hdr->req_tail == hdr->req_head) {
__atomic_add_fetch(&hdr->stat_recv_empty, 1, __ATOMIC_RELAXED);
return 0;
}
/* Ring index: req_cap_mask implies the capacity is a power of two. */
uint32_t idx = (uint32_t)(hdr->req_head & h->req_cap_mask);
/* NOTE(review): in this excerpt `idx` is unused and none of the out_*
 * parameters are assigned before the success return — the copy-out of
 * the queued entry appears to have been elided by the page extractor.
 * Verify against the full source before relying on this listing. */
hdr->req_head++;
return 1;
}
/* Non-blocking server-side pop: take the queue mutex, attempt a single
 * dequeue, release the mutex, and — on success — wake producers that are
 * blocked waiting for free queue space.
 * Returns 1=ok, 0=empty, -1=OOM (propagated from reqrep_recv_locked). */
static inline int reqrep_try_recv(ReqRepHandle *h, const char **out_str,
uint32_t *out_len, bool *out_utf8,
uint64_t *out_id) {
    ReqRepHeader *hdr = h->hdr;
    reqrep_mutex_lock(hdr);
    const int rc = reqrep_recv_locked(h, out_str, out_len, out_utf8, out_id);
    reqrep_mutex_unlock(hdr);
    /* A successful pop freed a queue entry, so notify blocked senders. */
    if (rc == 1) {
        reqrep_wake_producers(hdr);
    }
    return rc;
}
/* Blocking recv with timeout. Returns 1=ok, 0=timeout, -1=OOM. */
static int reqrep_recv_wait(ReqRepHandle *h, const char **out_str,
uint32_t *out_len, bool *out_utf8,
uint64_t *out_id, double timeout) {
int r = reqrep_try_recv(h, out_str, out_len, out_utf8, out_id);
uint32_t slot_idx = REQREP_ID_SLOT(id);
uint32_t expected_gen = REQREP_ID_GEN(id);
if (slot_idx >= h->resp_slots) return;
RespSlotHeader *slot = reqrep_resp_slot(h, slot_idx);
if (__atomic_load_n(&slot->generation, __ATOMIC_ACQUIRE) != expected_gen) return;
uint32_t expected_state = RESP_ACQUIRED;
if (__atomic_compare_exchange_n(&slot->state, &expected_state, RESP_FREE,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
__atomic_store_n(&slot->owner_pid, 0, __ATOMIC_RELAXED);
__atomic_add_fetch(&slot->generation, 1, __ATOMIC_RELEASE);
/* Wake get_wait blocked on this slot's state futex */
if (__atomic_load_n(&slot->waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &slot->state, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
reqrep_wake_slot_waiters(h->hdr);
}
}
/* Combined send + wait-for-reply with single deadline.
* Returns 1=ok, 0=timeout, -2=too long, -3=no slots, -4=stale. */
static int reqrep_request(ReqRepHandle *h, const char *req_str, uint32_t req_len,
bool req_utf8, const char **out_str, uint32_t *out_len,
xt/concurrent_cancel.t view on Meta::CPAN
use warnings;
use Test::More;
use File::Temp 'tmpnam';
use Time::HiRes qw(time sleep);
use POSIX ();
use Data::ReqRep::Shared;
use Data::ReqRep::Shared::Client;
# ============================================================
# 1. cancel + get_wait race: cancel fires while get_wait is blocked
# Verify get_wait unblocks promptly (does not hang).
# ============================================================
{
my $path = tmpnam();
my $srv = Data::ReqRep::Shared->new($path, 16, 4, 256);
my $cli = Data::ReqRep::Shared::Client->new($path);
for my $trial (1..10) {
my $id = $cli->send("race$trial");
ok defined $id, "cancel+get race trial $trial: send ok";
xt/concurrent_cancel.t view on Meta::CPAN
sleep(0.001 + rand() * 0.01);
$cli->cancel($id);
POSIX::_exit(0);
}
my $t0 = time;
my $resp = $cli->get_wait($id, 2.0);
my $dt = time - $t0;
# get_wait should return within ~50ms (cancel delay + scheduling)
ok $dt < 2.0, sprintf("cancel+get race trial %d: unblocked in %.3fs", $trial, $dt);
ok !defined $resp, "cancel+get race trial $trial: returns undef";
waitpid $pid, 0;
}
# drain all queued requests
while (my ($r, $ri) = $srv->recv) { $srv->reply($ri, "ok") }
$srv->unlink;
}
xt/concurrent_cancel.t view on Meta::CPAN
# Give children time to enter get_wait
sleep(0.05);
# Clear -- should unblock all get_wait callers
my $t0 = time;
$srv->clear;
for my $pid (@pids) {
waitpid $pid, 0;
is $? >> 8, 0, "clear race: child $pid unblocked and got undef";
}
my $dt = time - $t0;
ok $dt < 2.0, sprintf("clear race: all children unblocked in %.3fs", $dt);
$srv->unlink;
}
# ============================================================
# 4. Rapid cancel/send on same slot: verify generation prevents ABA
# across many iterations with minimal slot count
# ============================================================
{
my $path = tmpnam();
( run in 3.326 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )