Data-PubSub-Shared

 view release on metacpan or  search on metacpan

Shared.xs  view on Meta::CPAN

        Newx(args, count, struct psm_arg);
        SAVEFREEPV(args);
        for (uint32_t i = 0; i < count; i++) {
            SV *val = ST(i + 1);
            args[i].utf8 = SvUTF8(val) ? true : false;
            args[i].str = args[i].utf8 ? SvPVutf8(val, args[i].len)
                                       : SvPV(val, args[i].len);
        }
        pubsub_mutex_lock(h->hdr);
        for (uint32_t i = 0; i < count; i++) {
            int r = pubsub_str_publish_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
            if (r == -1) {
                pubsub_mutex_unlock(h->hdr);
                croak("publish_multi: message too long (%u > %u)", (unsigned)args[i].len, h->msg_size);
            }
            RETVAL++;
        }
        pubsub_mutex_unlock(h->hdr);
        pubsub_wake_subscribers(h->hdr);
    }
  OUTPUT:

pubsub.h  view on Meta::CPAN


/* ================================================================
 * Str: mutex-protected publish, lock-free subscribe
 *
 * Variable-length messages stored in a circular arena. Each slot
 * records the arena offset; the seqlock (sequence double-check)
 * guarantees readers see consistent data.
 * ================================================================ */

/* Publish one Str message while mutex is already held (no lock/wake). */
static inline int pubsub_str_publish_locked(PubSubHandle *h, const char *str,
                                             uint32_t len, bool utf8) {
    if (len > PUBSUB_STR_LEN_MASK) return -1;
    if (len > h->msg_size) return -1;

    PubSubHeader *hdr = h->hdr;
    PubSubStrSlot *slots = (PubSubStrSlot *)h->slots;

    uint64_t pos = __atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED);
    uint32_t idx = pos & h->cap_mask;
    PubSubStrSlot *slot = &slots[idx];

pubsub.h  view on Meta::CPAN

    __atomic_store_n(&hdr->write_pos, pos + 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED);

    return 1;
}

/* Publish one Str message, handling locking and subscriber wake-up.
 *
 * Rejects the message with -1 (without taking the mutex) when len exceeds
 * h->msg_size. Otherwise the mutex is held only around the call to
 * pubsub_str_publish_locked(), and subscribers are woken after the mutex
 * has been released, and only when that call reported success (1).
 *
 * Returns: the value from pubsub_str_publish_locked(), or -1 on the
 * early size rejection. */
static inline int pubsub_str_publish(PubSubHandle *h, const char *str,
                                      uint32_t len, bool utf8) {
    int status = -1;

    if (len <= h->msg_size) {
        pubsub_mutex_lock(h->hdr);
        status = pubsub_str_publish_locked(h, str, len, utf8);
        pubsub_mutex_unlock(h->hdr);

        /* Wake outside the critical section, and only for a real publish. */
        if (status == 1) {
            pubsub_wake_subscribers(h->hdr);
        }
    }

    return status;
}

/* Returns: 1 = success, 0 = empty/not-ready */
static inline int pubsub_str_poll(PubSubSub *sub, const char **out_str,
                                   uint32_t *out_len, bool *out_utf8) {
    PubSubHeader *hdr = sub->hdr;
    PubSubStrSlot *slots = (PubSubStrSlot *)sub->slots;



( run in 2.225 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )