Data-PubSub-Shared
view release on metacpan or search on metacpan
} \
\
/* pubsub_<PFX>_poll_wait: like pubsub_<PFX>_poll, but may block on \
* the shared hdr->sub_futex word until a value becomes available. \
* \
*   timeout == 0 : one non-blocking poll, then return. \
*   timeout >  0 : wait at most `timeout` (units are whatever \
*                  pubsub_make_deadline expects, presumably seconds). \
*   timeout <  0 : no deadline; keep waiting until a poll succeeds. \
* \
* Returns the first non-zero result of pubsub_<PFX>_poll unchanged, \
* or 0 if no value arrived before the deadline. Wakeups that do not \
* yield a value (EINTR, futex word changed, spurious) just loop. */ \
static int pubsub_##PFX##_poll_wait(PubSubSub *sub, VTYPE *value, \
double timeout) { \
/* Fast path: no waiting needed if a value is already there. */ \
int r = pubsub_##PFX##_poll(sub, value); \
if (r != 0) return r; \
if (timeout == 0.0) return 0; \
PubSubHeader *hdr = sub->hdr; \
struct timespec deadline, remaining; \
int has_deadline = (timeout > 0); \
if (has_deadline) pubsub_make_deadline(timeout, &deadline); \
for (;;) { \
/* Increment waiters BEFORE loading fseq/polling. SEQ_CST pairs \
* with publisher's SEQ_CST fence in pubsub_wake_subscribers so a \
* publisher that races our poll() either sees waiters > 0 and \
* wakes us, or publishes data we observe in the post-increment \
* poll. Without this ordering, we could sleep forever on an \
* unchanged fseq while data sits in the ring. */ \
__atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE); \
r = pubsub_##PFX##_poll(sub, value); \
if (r != 0) { \
__atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
return r; \
} \
struct timespec *pts = NULL; \
if (has_deadline) { \
/* FUTEX_WAIT takes a relative timeout, so recompute what is left \
* of the absolute deadline each iteration. Stop (undoing our \
* waiter registration) once pubsub_remaining_time returns false \
* (presumably: the deadline has already passed). */ \
if (!pubsub_remaining_time(&deadline, &remaining)) { \
__atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
return 0; \
} \
pts = &remaining; \
} \
/* Sleeps only while sub_futex still equals fseq; if it changed \
* after our load, FUTEX_WAIT returns at once (EAGAIN). \
* pts == NULL means wait with no timeout. */ \
long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT, \
fseq, pts, NULL, 0); \
__atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST); \
/* Re-poll BEFORE honoring ETIMEDOUT so a value that landed just \
* as the wait expired is still delivered. */ \
r = pubsub_##PFX##_poll(sub, value); \
if (r != 0) return r; \
if (rc == -1 && errno == ETIMEDOUT) return 0; \
} \
}
/* Stamp out the integer ring variants. Each expansion defines the
 * pubsub_<prefix>_* functions (e.g. pubsub_<prefix>_poll_wait).
 * Going by the per-line notes below, the arguments appear to be
 * (name prefix, slot struct, value type, sequence type, ...).
 * NOTE(review): the role of the fifth argument (int64_t / int32_t)
 * is not visible here - confirm against the macro definition. */
/* Instantiate for Int (64-bit seq + 64-bit value = 16 bytes/slot) */
DEFINE_INT_PUBSUB(int, PubSubIntSlot, int64_t, uint64_t, int64_t)
/* Instantiate for Int32 (32-bit seq + 32-bit value = 8 bytes/slot) */
DEFINE_INT_PUBSUB(int32, PubSubInt32Slot, int32_t, uint32_t, int32_t)
/* Instantiate for Int16 (32-bit seq + 16-bit value; presumably padded
 * to 8 bytes/slot) */
DEFINE_INT_PUBSUB(int16, PubSubInt16Slot, int16_t, uint32_t, int32_t)
/* ================================================================
* Str: mutex-protected publish, lock-free subscribe
*
* Variable-length messages are stored in a circular byte arena. Each
* slot records the message's arena offset and its packed length
* (with a UTF-8 flag bit). A per-slot seqlock (the sequence is read
* before and after copying the bytes) lets readers detect a slot that
* was rewritten during the copy and retry.
* ================================================================ */
/* Publish one Str message while the mutex is already held.
 *
 * Neither locks nor wakes subscribers; the caller is responsible for
 * both. The bytes are copied into the circular arena (allocation is
 * rounded up to a multiple of 8, at least 8, and wraps to offset 0
 * when the tail cannot hold it). The slot at write_pos is then
 * updated under its seqlock: sequence is 0 while the slot is being
 * rewritten and pos + 1 once it is complete.
 *
 * Returns 1 on success. Returns -1 when len exceeds the length field
 * (PUBSUB_STR_LEN_MASK) or h->msg_size, or when the allocation cannot
 * fit in the arena at all. Every check runs BEFORE the slot is
 * invalidated, so a rejected message leaves the slot's previous,
 * still-readable contents intact (readers would otherwise stall on a
 * slot stuck at sequence 0).
 *
 * NOTE(review): readers only re-check their own slot's sequence, so
 * consistency also relies on the arena being large enough that bytes
 * of a still-live slot are not overwritten by newer messages - confirm
 * the arena sizing chosen at creation time.
 */
static inline int pubsub_str_publish_locked(PubSubHandle *h, const char *str,
                                            uint32_t len, bool utf8) {
    if (len > PUBSUB_STR_LEN_MASK) return -1;
    if (len > h->msg_size) return -1;

    /* Arena footprint: round up to 8 bytes; an empty message still
     * reserves one 8-byte unit. */
    uint32_t alloc = (len + 7) & ~7u;
    if (alloc == 0) alloc = 8;
    if (alloc > h->arena_cap) return -1;

    PubSubHeader *hdr = h->hdr;
    PubSubStrSlot *slots = (PubSubStrSlot *)h->slots;
    uint64_t pos = __atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED);
    uint32_t idx = pos & h->cap_mask;
    PubSubStrSlot *slot = &slots[idx];

    /* Seqlock write side: mark the slot in-progress, and fence so the
     * marker is ordered before the data/metadata stores below. */
    __atomic_store_n(&slot->sequence, 0, __ATOMIC_RELAXED);
    __atomic_thread_fence(__ATOMIC_RELEASE);

    uint32_t apos = __atomic_load_n(&hdr->arena_wpos, __ATOMIC_RELAXED);
    if ((uint64_t)apos + alloc > h->arena_cap)
        apos = 0; /* tail too small for this allocation: wrap */
    /* memcpy with a NULL source is undefined even for 0 bytes. */
    if (len > 0)
        memcpy(h->data + apos, str, len);
    slot->arena_off = apos;
    slot->packed_len = len | (utf8 ? PUBSUB_STR_UTF8_FLAG : 0);
    __atomic_store_n(&hdr->arena_wpos, apos + alloc, __ATOMIC_RELAXED);

    /* Complete the slot, then advance write_pos. Both are RELEASE so a
     * reader that acquire-loads write_pos also observes this slot's
     * final sequence and contents. */
    __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
    __atomic_store_n(&hdr->write_pos, pos + 1, __ATOMIC_RELEASE);
    __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED);
    return 1;
}
/* Publish one Str message: take the header mutex, append via
 * pubsub_str_publish_locked, release the mutex and, only on success,
 * wake subscribers blocked in the futex wait.
 *
 * Returns 1 on success, -1 if the message is rejected (too long for
 * msg_size, or refused by the locked publish). */
static inline int pubsub_str_publish(PubSubHandle *h, const char *str,
                                     uint32_t len, bool utf8) {
    /* Cheap size rejection before contending for the lock. */
    if (len > h->msg_size)
        return -1;

    PubSubHeader *header = h->hdr;
    int result;

    pubsub_mutex_lock(header);
    result = pubsub_str_publish_locked(h, str, len, utf8);
    pubsub_mutex_unlock(header);

    /* Wake after unlocking so woken readers don't find the lock held. */
    if (result == 1)
        pubsub_wake_subscribers(header);

    return result;
}
/* Returns: 1 = success, 0 = empty/not-ready */
static inline int pubsub_str_poll(PubSubSub *sub, const char **out_str,
uint32_t *out_len, bool *out_utf8) {
PubSubHeader *hdr = sub->hdr;
PubSubStrSlot *slots = (PubSubStrSlot *)sub->slots;
for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) {
uint64_t cursor = sub->cursor;
uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE);
if (cursor >= wp) return 0;
if (wp - cursor > sub->capacity) {
sub->overflow_count += wp - cursor - sub->capacity;
sub->cursor = wp - sub->capacity;
continue;
}
uint32_t idx = cursor & sub->cap_mask;
PubSubStrSlot *slot = &slots[idx];
uint64_t seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
if (seq1 != cursor + 1) {
if (seq1 > cursor + 1) {
uint64_t new_cursor = wp > sub->capacity ? wp - sub->capacity : 0;
if (new_cursor > cursor)
sub->overflow_count += new_cursor - cursor;
sub->cursor = new_cursor;
continue;
}
return 0;
}
uint32_t plen = slot->packed_len;
uint32_t aoff = slot->arena_off;
uint32_t len = plen & PUBSUB_STR_LEN_MASK;
bool utf8 = (plen & PUBSUB_STR_UTF8_FLAG) != 0;
/* Safety: if metadata looks corrupted, retry */
if (len > sub->msg_size) continue;
if ((uint64_t)aoff + len > sub->hdr->arena_cap) continue;
if (!pubsub_ensure_copy_buf(sub, len + 1)) return 0;
if (len > 0)
memcpy(sub->copy_buf, sub->data + aoff, len);
sub->copy_buf[len] = '\0';
uint64_t seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
if (seq2 != seq1) continue;
*out_str = sub->copy_buf;
*out_len = len;
*out_utf8 = utf8;
sub->cursor = cursor + 1;
( run in 1.957 second using v1.01-cache-2.11-cpan-e1769b4cff6 )