Data-PubSub-Shared
view release on metacpan or search on metacpan
Newx(args, count, struct psm_arg);
SAVEFREEPV(args);
for (uint32_t i = 0; i < count; i++) {
SV *val = ST(i + 1);
args[i].utf8 = SvUTF8(val) ? true : false;
args[i].str = args[i].utf8 ? SvPVutf8(val, args[i].len)
: SvPV(val, args[i].len);
}
pubsub_mutex_lock(h->hdr);
for (uint32_t i = 0; i < count; i++) {
int r = pubsub_str_publish_locked(h, args[i].str, (uint32_t)args[i].len, args[i].utf8);
if (r == -1) {
pubsub_mutex_unlock(h->hdr);
croak("publish_multi: message too long (%u > %u)", (unsigned)args[i].len, h->msg_size);
}
RETVAL++;
}
pubsub_mutex_unlock(h->hdr);
pubsub_wake_subscribers(h->hdr);
}
OUTPUT:
/* ================================================================
* Str: mutex-protected publish, lock-free subscribe
*
* Variable-length messages stored in a circular arena. Each slot
* records the arena offset; the seqlock (sequence double-check)
* guarantees readers see consistent data.
* ================================================================ */
/* Publish one Str message while mutex is already held (no lock/wake).
 *
 * This function neither takes the header mutex nor wakes subscribers; the
 * caller is responsible for both.
 *
 * Returns 1 on success, or -1 when len exceeds PUBSUB_STR_LEN_MASK or
 * h->msg_size. Both rejections happen before any shared state is touched.
 *
 * NOTE(review): in the lines visible here, str, utf8 and slot are set up but
 * never used, so the step that stores the payload into the slot/arena looks
 * to be absent from this listing -- confirm against the full source. */
static inline int pubsub_str_publish_locked(PubSubHandle *h, const char *str,
uint32_t len, bool utf8) {
/* Length must not exceed PUBSUB_STR_LEN_MASK
 * (presumably the widest length the slot encoding can hold -- TODO confirm). */
if (len > PUBSUB_STR_LEN_MASK) return -1;
/* Length must also not exceed this handle's msg_size limit. */
if (len > h->msg_size) return -1;
PubSubHeader *hdr = h->hdr;
PubSubStrSlot *slots = (PubSubStrSlot *)h->slots;
/* Read the current write position with a relaxed atomic load. */
uint64_t pos = __atomic_load_n(&hdr->write_pos, __ATOMIC_RELAXED);
/* Slot index is the write position masked with cap_mask. */
uint32_t idx = pos & h->cap_mask;
PubSubStrSlot *slot = &slots[idx];
/* Advance the write position by one and count the successful publish. */
__atomic_store_n(&hdr->write_pos, pos + 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&hdr->stat_publish_ok, 1, __ATOMIC_RELAXED);
return 1;
}
/* Publish a single Str message, handling locking and wake-up.
 *
 * Oversized messages (len > h->msg_size) are rejected with -1 before the
 * mutex is taken. Otherwise the header mutex is held around
 * pubsub_str_publish_locked(), and subscribers are woken only when that
 * call reports success (1). The locked call's result is returned as-is. */
static inline int pubsub_str_publish(PubSubHandle *h, const char *str,
                                     uint32_t len, bool utf8) {
    int status = -1;

    if (len <= h->msg_size) {
        pubsub_mutex_lock(h->hdr);
        status = pubsub_str_publish_locked(h, str, len, utf8);
        pubsub_mutex_unlock(h->hdr);

        /* Wake only after the mutex has been released. */
        if (status == 1) {
            pubsub_wake_subscribers(h->hdr);
        }
    }

    return status;
}
/* Returns: 1 = success, 0 = empty/not-ready */
static inline int pubsub_str_poll(PubSubSub *sub, const char **out_str,
uint32_t *out_len, bool *out_utf8) {
PubSubHeader *hdr = sub->hdr;
PubSubStrSlot *slots = (PubSubStrSlot *)sub->slots;
( run in 1.987 second using v1.01-cache-2.11-cpan-e1769b4cff6 )