Data-PubSub-Shared

 view release on metacpan or  search on metacpan

pubsub.h  view on Meta::CPAN

}

/* ================================================================
 * Futex helpers
 * ================================================================ */

#define PUBSUB_MUTEX_WRITER_BIT 0x80000000U
#define PUBSUB_MUTEX_PID_MASK   0x7FFFFFFFU
#define PUBSUB_MUTEX_VAL(pid)   (PUBSUB_MUTEX_WRITER_BIT | ((uint32_t)(pid) & PUBSUB_MUTEX_PID_MASK))

/* Report whether the process `pid` still appears to exist.
 *
 * Probes with kill(pid, 0), which performs the existence/permission checks
 * without delivering a signal.  Only an ESRCH failure counts as "dead"; any
 * other outcome (success, EPERM, ...) is reported as alive.
 *
 * pid == 0 is never probed and is reported as alive.
 * NOTE(review): presumably 0 means "no owner recorded" - confirm with callers.
 *
 * Returns 1 if the process is (or may be) alive, 0 if it does not exist. */
static inline int pubsub_pid_alive(uint32_t pid) {
    int probe;

    if (pid == 0)
        return 1;

    probe = kill((pid_t)pid, 0);
    if (probe != -1)
        return 1;               /* signal could be delivered: process exists */

    return errno != ESRCH;      /* failed: dead only if "no such process" */
}

/* Timeout handed to FUTEX_WAIT when blocking on hdr->mutex:
 * PUBSUB_LOCK_TIMEOUT_SEC seconds, zero nanoseconds.  When the wait expires
 * (ETIMEDOUT) the waiter checks whether the recorded lock holder is dead. */
static const struct timespec pubsub_lock_timeout = { PUBSUB_LOCK_TIMEOUT_SEC, 0 };

static inline void pubsub_recover_stale_mutex(PubSubHeader *hdr, uint32_t observed) {
    if (!__atomic_compare_exchange_n(&hdr->mutex, &observed, 0,
            0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED))
        return;

pubsub.h  view on Meta::CPAN

        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &pubsub_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= PUBSUB_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & PUBSUB_MUTEX_PID_MASK;
                    if (!pubsub_pid_alive(pid))
                        pubsub_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

xt/detached_sub.t  view on Meta::CPAN

use strict;
use warnings;
use Test::More;

# Detached subscriber: publisher handle destroyed, but an existing
# subscriber keeps the memfd alive and can drain pending messages.

use Data::PubSub::Shared::Int;

# Declared outside the block so the subscriber outlives the publisher handle.
my $sub;
{
    my $publisher = Data::PubSub::Shared::Int->new_memfd("detached", 64);

    # Publish ten values, then attach the subscriber.
    for my $value (1 .. 10) {
        $publisher->publish($value);
    }

    $sub = $publisher->subscribe_all;

    # $publisher goes out of scope here, so its handle is destroyed.
}

xt/lifecycle.t  view on Meta::CPAN

{
    my $pubsub = Data::PubSub::Shared::Int->new(undef, 1024);
    for my $n (1 .. 100) {
        $pubsub->publish($n);
    }

    # Churn: create a subscriber, poll once, and let it fall out of scope at
    # the end of every iteration (DESTROY fires, owner_rv refcount decremented).
    my $cycles = 5000;
    for (1 .. $cycles) {
        my $transient = $pubsub->subscribe_all;
        my $polled    = $transient->poll;
    }

    # The handle must still be alive and functional after the churn.
    $pubsub->publish(999);
    my $reader   = $pubsub->subscribe_all;
    my @received = $reader->drain;
    ok(scalar(@received) > 0, 'int: handle alive after 5000 subscribe/destroy cycles');
    is($received[-1], 999, 'int: last published value correct');
}

# ============================================================
# 2. Rapid subscribe/destroy cycles — Str
# ============================================================
{
    my $pubsub = Data::PubSub::Shared::Str->new(undef, 1024);
    for my $n (1 .. 100) {
        $pubsub->publish("msg$n");
    }

    # Same churn as the Int variant: each iteration's subscriber is polled
    # once and then destroyed when the loop body ends.
    my $cycles = 5000;
    for (1 .. $cycles) {
        my $transient = $pubsub->subscribe_all;
        my $polled    = $transient->poll;
    }

    # Publishing and draining must still work on the original handle.
    $pubsub->publish("final");
    my $reader   = $pubsub->subscribe_all;
    my @received = $reader->drain;
    ok(scalar(@received) > 0, 'str: handle alive after 5000 subscribe/destroy cycles');
    is($received[-1], 'final', 'str: last published value correct');
}

# ============================================================
# 3. Multiple subscribers alive simultaneously
# ============================================================
{
    my $ps = Data::PubSub::Shared::Int->new(undef, 256);
    $ps->publish($_) for 1..50;

    my @subs;
    for (1..1000) {
        push @subs, $ps->subscribe_all;
    }

xt/lifecycle.t  view on Meta::CPAN

# 4. Subscriber outlives explicit handle undef
# ============================================================
{
    my $survivor;
    {
        my $owner = Data::PubSub::Shared::Int->new(undef, 64);
        $owner->publish(77);
        $survivor = $owner->subscribe_all;

        # $owner leaves scope at the closing brace; $survivor still holds a
        # reference to the handle via owner_rv.
    }

    # The subscriber must keep working because owner_rv keeps the handle alive.
    my $value = $survivor->poll;
    is($value, 77, 'subscriber works after handle goes out of scope');

    my $lag = $survivor->lag;
    is($lag, 0, 'subscriber lag correct after handle scope exit');
}

# ============================================================
# 5. Subscriber outlives handle — Str
# ============================================================
{
    my $sub;
    {



( run in 2.086 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )