Data-PubSub-Shared
view release on metacpan or search on metacpan
}
/* ================================================================
* Futex helpers
* ================================================================ */
/* Top bit of the 32-bit mutex word. The lock path treats any value with this
 * bit set (val >= PUBSUB_MUTEX_WRITER_BIT) as carrying an owner PID. */
#define PUBSUB_MUTEX_WRITER_BIT 0x80000000U
/* Low 31 bits of the mutex word: the PID stored alongside the writer bit. */
#define PUBSUB_MUTEX_PID_MASK 0x7FFFFFFFU
/* Build a mutex word for `pid`: writer bit set, pid truncated to 31 bits. */
#define PUBSUB_MUTEX_VAL(pid) (PUBSUB_MUTEX_WRITER_BIT | ((uint32_t)(pid) & PUBSUB_MUTEX_PID_MASK))
/*
 * Report whether the process with the given PID still appears to exist.
 *
 * Returns 0 only when kill() fails with ESRCH (no such process). Every other
 * outcome -- success, or a failure such as EPERM -- is reported as 1, so a
 * process we merely lack permission to signal is still counted as alive.
 *
 * A pid of 0 is reported as alive without probing (presumably 0 means "no
 * owner recorded"; kill(0, ...) would target the caller's process group).
 */
static inline int pubsub_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    /* Signal 0 runs the existence/permission checks without delivering anything. */
    if (kill((pid_t)pid, 0) != -1)
        return 1;
    return errno != ESRCH;
}
/* Timeout handed to the FUTEX_WAIT call in the lock path:
 * PUBSUB_LOCK_TIMEOUT_SEC whole seconds, no nanosecond part. */
static const struct timespec pubsub_lock_timeout = { PUBSUB_LOCK_TIMEOUT_SEC, 0 };
static inline void pubsub_recover_stale_mutex(PubSubHeader *hdr, uint32_t observed) {
if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
return;
__atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
&pubsub_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
if (val >= PUBSUB_MUTEX_WRITER_BIT) {
uint32_t pid = val & PUBSUB_MUTEX_PID_MASK;
if (!pubsub_pid_alive(pid))
pubsub_recover_stale_mutex(hdr, val);
}
spin = 0;
continue;
}
}
__atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
spin = 0;
}
}
xt/detached_sub.t view on Meta::CPAN
use strict;
use warnings;
use Test::More;
# Detached subscriber: publisher handle destroyed, but an existing
# subscriber keeps the memfd alive and can drain pending messages.
use Data::PubSub::Shared::Int;
my $sub;
{
# Create the publisher in an inner scope so that only $sub survives it.
my $p = Data::PubSub::Shared::Int->new_memfd("detached", 64);
$p->publish($_) for 1..10;
$sub = $p->subscribe_all;
# $p goes out of scope — its handle is destroyed
}
xt/lifecycle.t view on Meta::CPAN
{
# Int variant: preload 100 values, then create and drop 5000 subscribers.
my $ps = Data::PubSub::Shared::Int->new(undef, 1024);
$ps->publish($_) for 1..100;
for my $round (1..5000) {
my $sub = $ps->subscribe_all;
my $v = $sub->poll;
# $sub goes out of scope — DESTROY fires, owner_rv refcount decremented
}
# handle should still be alive and functional
$ps->publish(999);
my $sub = $ps->subscribe_all;
my @got = $sub->drain;
ok scalar @got > 0, 'int: handle alive after 5000 subscribe/destroy cycles';
is $got[-1], 999, 'int: last published value correct';
}
# ============================================================
# 2. Rapid subscribe/destroy cycles — Str
# ============================================================
{
# Str variant: preload 100 messages, then create and drop 5000 subscribers.
my $ps = Data::PubSub::Shared::Str->new(undef, 1024);
$ps->publish("msg$_") for 1..100;
for my $round (1..5000) {
# $sub is lexical to the loop body, so it is released at the end of each iteration.
my $sub = $ps->subscribe_all;
my $v = $sub->poll;
}
# The handle must still accept publishes and hand out working subscribers.
$ps->publish("final");
my $sub = $ps->subscribe_all;
my @got = $sub->drain;
ok scalar @got > 0, 'str: handle alive after 5000 subscribe/destroy cycles';
is $got[-1], 'final', 'str: last published value correct';
}
# ============================================================
# 3. Multiple subscribers alive simultaneously
# ============================================================
{
my $ps = Data::PubSub::Shared::Int->new(undef, 256);
$ps->publish($_) for 1..50;
my @subs;
for (1..1000) {
push @subs, $ps->subscribe_all;
}
xt/lifecycle.t view on Meta::CPAN
# 4. Subscriber outlives explicit handle undef
# ============================================================
{
# Declared in the outer scope so the subscriber survives the handle's block.
my $sub;
{
my $ps = Data::PubSub::Shared::Int->new(undef, 64);
$ps->publish(77);
$sub = $ps->subscribe_all;
# $ps goes out of scope here, but $sub holds a reference via owner_rv
}
# $sub should still work — owner_rv keeps the handle alive
is $sub->poll, 77, 'subscriber works after handle goes out of scope';
is $sub->lag, 0, 'subscriber lag correct after handle scope exit';
}
# ============================================================
# 5. Subscriber outlives handle — Str
# ============================================================
{
my $sub;
{
( run in 2.086 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )