Data-PubSub-Shared

 view release on metacpan or  search on metacpan

pubsub.h  view on Meta::CPAN

}                                                                              \
                                                                                \
static int pubsub_##PFX##_poll_wait(PubSubSub *sub, VTYPE *value,              \
                                     double timeout) {                         \
    int r = pubsub_##PFX##_poll(sub, value);                                   \
    if (r != 0) return r;                                                      \
    if (timeout == 0.0) return 0;                                              \
    PubSubHeader *hdr = sub->hdr;                                              \
    struct timespec deadline, remaining;                                        \
    int has_deadline = (timeout > 0);                                          \
    if (has_deadline) pubsub_make_deadline(timeout, &deadline);                 \
    for (;;) {                                                                 \
        /* Increment waiters BEFORE loading fseq/polling. SEQ_CST pairs       \
         * with publisher's SEQ_CST fence in pubsub_wake_subscribers so a     \
         * publisher that races our poll() either sees waiters > 0 and       \
         * wakes us, or publishes data we observe in the post-increment      \
         * poll. Without this ordering, we could sleep forever on an         \
         * unchanged fseq while data sits in the ring. */                     \
        __atomic_add_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);           \
        uint32_t fseq = __atomic_load_n(&hdr->sub_futex, __ATOMIC_ACQUIRE);   \
        r = pubsub_##PFX##_poll(sub, value);                                   \
        if (r != 0) {                                                          \
            __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);       \
            return r;                                                          \
        }                                                                      \
        struct timespec *pts = NULL;                                           \
        if (has_deadline) {                                                     \
            if (!pubsub_remaining_time(&deadline, &remaining)) {               \
                __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);   \
                return 0;                                                      \
            }                                                                  \
            pts = &remaining;                                                  \
        }                                                                      \
        long rc = syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAIT,             \
                          fseq, pts, NULL, 0);                                 \
        __atomic_sub_fetch(&hdr->sub_waiters, 1, __ATOMIC_SEQ_CST);           \
        r = pubsub_##PFX##_poll(sub, value);                                   \
        if (r != 0) return r;                                                  \
        if (rc == -1 && errno == ETIMEDOUT) return 0;                          \
    }                                                                          \
}

/* Instantiate the integer pub/sub API once per element width. Each
 * DEFINE_INT_PUBSUB(PFX, ...) invocation emits a family of static
 * functions named pubsub_<PFX>_* (e.g. pubsub_<PFX>_poll_wait, defined in
 * the macro body ending just above). The macro's parameter list is outside
 * this view; the notes below assume the third and fourth arguments are the
 * value type and the sequence type respectively — TODO confirm. */

/* Int: 64-bit seq + 64-bit value = 16 bytes/slot. */
DEFINE_INT_PUBSUB(int, PubSubIntSlot, int64_t, uint64_t, int64_t)

/* Int32: 32-bit seq + 32-bit value = 8 bytes/slot. */
DEFINE_INT_PUBSUB(int32, PubSubInt32Slot, int32_t, uint32_t, int32_t)

/* Int16: 32-bit seq + 16-bit value = 6 bytes of payload, presumably padded
 * to 8 bytes/slot by struct alignment — confirm against PubSubInt16Slot.
 * NOTE(review): the fifth argument is int32_t (not int16_t) here; its role
 * is defined by the macro — confirm this is intentional. */
DEFINE_INT_PUBSUB(int16, PubSubInt16Slot, int16_t, uint32_t, int32_t)

/* ================================================================
 * Str: mutex-protected publish, lock-free subscribe
 *
 * Variable-length messages stored in a circular arena. Each slot
 * records the arena offset; the seqlock (sequence double-check)
 * guarantees readers see consistent data.
 * ================================================================ */

/* Publish one Str message while the caller already holds the header mutex.
 *
 * This function neither locks nor wakes subscribers; pubsub_str_publish()
 * wraps it with both. The message bytes are copied into the circular arena.
 * The ring slot at write_pos is then updated under a seqlock: its sequence
 * is zeroed while the slot metadata is rewritten, then set to pos + 1.
 *
 * Params:
 *   h    - publisher handle (header, slot array, arena).
 *   str  - message bytes (not NUL-terminated); may be NULL when len == 0.
 *   len  - message length in bytes.
 *   utf8 - recorded in the slot's packed length via PUBSUB_STR_UTF8_FLAG.
 *
 * Returns 1 on success. Returns -1 when the message does not fit the
 * length field, msg_size, or the arena; in that case no shared state
 * (slot, arena, header) has been modified.
 */
static inline int pubsub_str_publish_locked(PubSubHandle *h, const char *str,
                                             uint32_t len, bool utf8) {
    if (len > PUBSUB_STR_LEN_MASK) return -1;
    if (len > h->msg_size) return -1;

    /* Arena space is reserved in 8-byte units; an empty message still
     * consumes one unit. Validate BEFORE touching the slot. Otherwise a
     * rejected message would leave the slot's sequence zeroed, and a
     * lagging reader would see that slot as "not ready". */
    uint32_t alloc = (len + 7) & ~7u;
    if (alloc == 0) alloc = 8;
    if (alloc > h->arena_cap) return -1;

    PubSubHeader *hdr = h->hdr;
    PubSubStrSlot *slots = (PubSubStrSlot *)h->slots;

    uint64_t pos = __atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED);
    uint32_t idx = pos & h->cap_mask;
    PubSubStrSlot *slot = &slots[idx];

    /* Seqlock write side: mark the slot as being rewritten before its
     * metadata changes, so a concurrent reader's seq1/seq2 check fails. */
    __atomic_store_n(&slot->sequence, 0, __ATOMIC_RELAXED);
    __atomic_thread_fence(__ATOMIC_RELEASE);

    /* Wrap to the arena start when the message would run past the end.
     * NOTE(review): the bytes overwritten at apos may still be referenced
     * by an older slot whose sequence is not invalidated here. Correctness
     * presumably relies on arena_cap being large relative to
     * capacity * msg_size — confirm. */
    uint32_t apos = __atomic_load_n(&hdr->arena_wpos, __ATOMIC_RELAXED);
    if ((uint64_t)apos + alloc > h->arena_cap)
        apos = 0;

    /* memcpy with a NULL source is undefined even for zero bytes. */
    if (len > 0)
        memcpy(h->data + apos, str, len);

    slot->arena_off = apos;
    slot->packed_len = len | (utf8 ? PUBSUB_STR_UTF8_FLAG : 0);

    __atomic_store_n(&hdr->arena_wpos, apos + alloc, __ATOMIC_RELAXED);

    /* Publish the slot, then advance write_pos. The RELEASE store on
     * write_pos pairs with readers' ACQUIRE load of it. A reader that sees
     * pos + 1 is then guaranteed to also see the slot's new sequence,
     * rather than spuriously reporting the slot as not ready. */
    __atomic_store_n(&slot->sequence, pos + 1, __ATOMIC_RELEASE);
    __atomic_store_n(&hdr->write_pos, pos + 1, __ATOMIC_RELEASE);
    __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED);

    return 1;
}

/* Publish one Str message.
 *
 * Takes the header mutex, writes the message via
 * pubsub_str_publish_locked(), releases the mutex, and wakes waiting
 * subscribers if the write succeeded.
 *
 * Returns the result of pubsub_str_publish_locked(): 1 on success, or -1
 * when the message is rejected (including len > msg_size, which is checked
 * here without taking the lock). Subscribers are woken only on success. */
static inline int pubsub_str_publish(PubSubHandle *h, const char *str,
                                      uint32_t len, bool utf8) {
    PubSubHeader *hdr = h->hdr;

    /* Cheap size check first, so oversize messages never contend for the lock. */
    if (len > h->msg_size)
        return -1;

    pubsub_mutex_lock(hdr);
    int written = pubsub_str_publish_locked(h, str, len, utf8);
    pubsub_mutex_unlock(hdr);

    if (written != 1)
        return written;

    /* Wake sleepers only after the critical section has been left. */
    pubsub_wake_subscribers(hdr);
    return 1;
}

/* Returns: 1 = success, 0 = empty/not-ready */
static inline int pubsub_str_poll(PubSubSub *sub, const char **out_str,
                                   uint32_t *out_len, bool *out_utf8) {
    PubSubHeader *hdr = sub->hdr;
    PubSubStrSlot *slots = (PubSubStrSlot *)sub->slots;

    for (int attempt = 0; attempt < PUBSUB_POLL_RETRIES; attempt++) {
        uint64_t cursor = sub->cursor;
        uint64_t wp = __atomic_load_n(&hdr->write_pos, __ATOMIC_ACQUIRE);

        if (cursor >= wp) return 0;

        if (wp - cursor > sub->capacity) {
            sub->overflow_count += wp - cursor - sub->capacity;
            sub->cursor = wp - sub->capacity;
            continue;
        }

        uint32_t idx = cursor & sub->cap_mask;
        PubSubStrSlot *slot = &slots[idx];

        uint64_t seq1 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
        if (seq1 != cursor + 1) {
            if (seq1 > cursor + 1) {
                uint64_t new_cursor = wp > sub->capacity ? wp - sub->capacity : 0;
                if (new_cursor > cursor)
                    sub->overflow_count += new_cursor - cursor;
                sub->cursor = new_cursor;
                continue;
            }
            return 0;
        }

        uint32_t plen = slot->packed_len;
        uint32_t aoff = slot->arena_off;
        uint32_t len = plen & PUBSUB_STR_LEN_MASK;
        bool utf8 = (plen & PUBSUB_STR_UTF8_FLAG) != 0;

        /* Safety: if metadata looks corrupted, retry */
        if (len > sub->msg_size) continue;
        if ((uint64_t)aoff + len > sub->hdr->arena_cap) continue;

        if (!pubsub_ensure_copy_buf(sub, len + 1)) return 0;

        if (len > 0)
            memcpy(sub->copy_buf, sub->data + aoff, len);
        sub->copy_buf[len] = '\0';

        uint64_t seq2 = __atomic_load_n(&slot->sequence, __ATOMIC_ACQUIRE);
        if (seq2 != seq1) continue;

        *out_str = sub->copy_buf;
        *out_len = len;
        *out_utf8 = utf8;
        sub->cursor = cursor + 1;



( run in 1.957 second using v1.01-cache-2.11-cpan-e1769b4cff6 )