Data-PubSub-Shared

 view release on metacpan or  search on metacpan

pubsub.h  view on Meta::CPAN

/*
 * Subscriber-side handle.
 *
 * Only copy_buf / copy_buf_cap are managed by code visible in this part of
 * the file (see pubsub_ensure_copy_buf). The notes on the other fields are
 * inferred from their names and must be confirmed against the code that
 * actually reads and writes them.
 */
typedef struct {
    PubSubHeader *hdr;            /* presumably the channel header the futex helpers operate on -- TODO confirm */
    void         *slots;          /* presumably the base of the slot array -- TODO confirm */
    char         *data;           /* presumably the payload area -- TODO confirm */
    uint64_t      cursor;         /* presumably this subscriber's read position -- TODO confirm */
    uint32_t      capacity;       /* presumably the slot count -- TODO confirm */
    uint32_t      cap_mask;       /* presumably capacity - 1, for index masking -- TODO confirm */
    uint32_t      msg_size;       /* presumably a per-message size -- TODO confirm */
    char         *copy_buf;       /* realloc()-managed scratch buffer; grown by pubsub_ensure_copy_buf */
    uint32_t      copy_buf_cap;   /* current allocation size of copy_buf in bytes (0 = not yet allocated) */
    uint64_t      overflow_count; /* presumably counts overrun events -- TODO confirm */
    int           notify_fd;      /* presumably a notification file descriptor -- TODO confirm */
    void         *userdata;       /* opaque pointer; not touched by the code visible here */
} PubSubSub;

/* ================================================================
 * Utility
 * ================================================================ */

/*
 * Round v up to the nearest power of two.
 *
 * The smallest value ever returned is 2 (inputs 0 and 1 both map to 2).
 * Inputs above 2^31 cannot be represented in 32 bits and yield 0, which
 * callers can treat as "too large".
 */
static inline uint32_t pubsub_next_pow2(uint32_t v) {
    if (v > 0x80000000U)
        return 0;
    if (v <= 1)
        return 2;

    /* Smear the highest set bit of (v - 1) into every lower position,
     * then add one to land on the next power of two. */
    uint32_t bits = v - 1;
    for (unsigned shift = 1; shift <= 16; shift *= 2)
        bits |= bits >> shift;

    return bits + 1;
}

/*
 * Busy-wait hint for spin loops.
 *
 * Emits the architecture's spin-wait instruction where one is known
 * ("yield" on AArch64, "pause" on x86/x86-64); on any other target the
 * statement is empty. In every variant the "memory" clobber makes the
 * statement a compiler-level memory barrier.
 */
static inline void pubsub_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    /* No known hint instruction: compiler barrier only. */
    __asm__ volatile("" ::: "memory");
#endif
}

/*
 * Make sure sub->copy_buf can hold at least `needed` bytes.
 *
 * The buffer grows geometrically: starting from the current capacity (or
 * 64 bytes when nothing is allocated yet) the size is doubled until it
 * covers the request. If another doubling would not fit in 32 bits, the
 * exact requested size is used instead.
 *
 * Returns 1 on success (including when no growth was necessary) and 0 if
 * realloc() fails, in which case the existing buffer and its recorded
 * capacity are left untouched.
 */
static inline int pubsub_ensure_copy_buf(PubSubSub *sub, uint32_t needed) {
    uint32_t have = sub->copy_buf_cap;
    if (have >= needed)
        return 1;

    uint32_t want = (have != 0) ? have : 64;
    while (want < needed) {
        if (want >= 0x80000000U) {
            /* Doubling would wrap around; settle for the exact size. */
            want = needed;
            break;
        }
        want *= 2;
    }

    char *grown = (char *)realloc(sub->copy_buf, want);
    if (grown == NULL)
        return 0;

    sub->copy_buf_cap = want;
    sub->copy_buf = grown;
    return 1;
}

/* ================================================================
 * Futex helpers
 * ================================================================ */

/*
 * Encoding of the 32-bit hdr->mutex word.
 *
 *   0                      -> unlocked (pubsub_mutex_lock swaps 0 for an
 *                             owner value, pubsub_mutex_unlock stores 0)
 *   WRITER_BIT | pid       -> locked; bit 31 is set and the low 31 bits
 *                             carry the owner's process id
 *
 * PUBSUB_MUTEX_VAL(pid) builds the "locked by pid" value; the pid is
 * truncated to 31 bits by PUBSUB_MUTEX_PID_MASK.
 */
#define PUBSUB_MUTEX_WRITER_BIT 0x80000000U
#define PUBSUB_MUTEX_PID_MASK   0x7FFFFFFFU
#define PUBSUB_MUTEX_VAL(pid)   (PUBSUB_MUTEX_WRITER_BIT | ((uint32_t)(pid) & PUBSUB_MUTEX_PID_MASK))

/*
 * Report whether the process `pid` should be treated as still existing.
 *
 * Probes with kill(pid, 0), which delivers no signal. Only a definite
 * "no such process" answer (ESRCH) counts as dead; any other outcome,
 * including a failure such as EPERM, is reported as alive. A pid of 0 is
 * always reported as alive without probing.
 *
 * Returns 1 for alive, 0 for dead.
 */
static inline int pubsub_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;

    if (kill((pid_t)pid, 0) != -1)
        return 1;

    return errno != ESRCH;
}

/*
 * Timeout handed to FUTEX_WAIT by pubsub_mutex_lock. When a wait expires,
 * the waiter checks whether the recorded lock owner still exists before
 * trying again. PUBSUB_LOCK_TIMEOUT_SEC is defined outside this section.
 */
static const struct timespec pubsub_lock_timeout = { PUBSUB_LOCK_TIMEOUT_SEC, 0 };

/*
 * Forcibly release a mutex whose recorded owner is gone.
 *
 * `observed` is the lock word the caller saw. The word is reset to 0 only
 * if it still holds exactly that value; if it changed in the meantime
 * (someone else already recovered it, or the lock changed hands) nothing
 * happens. On a successful reset the recovery counter is incremented and,
 * when at least one waiter is registered, one of them is woken.
 */
static inline void pubsub_recover_stale_mutex(PubSubHeader *hdr, uint32_t observed) {
    uint32_t stale = observed;
    int reclaimed = __atomic_compare_exchange_n(&hdr->mutex, &stale, 0,
                                                0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED);

    if (reclaimed) {
        __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);

        if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0) {
            syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
        }
    }
}

/*
 * Acquire the mutex stored in hdr->mutex, blocking until it is obtained.
 *
 * Each loop iteration:
 *   1. Tries to swap the lock word from 0 (free) to
 *      PUBSUB_MUTEX_VAL(getpid()); success returns immediately.
 *   2. For the first PUBSUB_SPIN_LIMIT failures, pauses briefly and retries.
 *   3. After that, registers in hdr->mutex_waiters and sleeps in FUTEX_WAIT
 *      on the lock word with pubsub_lock_timeout.
 *   4. If the wait timed out and the word still names an owner whose
 *      process no longer exists, resets the lock on its behalf via
 *      pubsub_recover_stale_mutex.
 * After any wait the spin counter is reset and the cycle starts over.
 * There is no failure return.
 *
 * NOTE(review): the waiter count is updated here, and read in
 * pubsub_mutex_unlock, with RELAXED ordering. Whether a wake-up can be
 * missed under that ordering should be confirmed; the FUTEX_WAIT timeout
 * does bound how long a waiter can sleep before re-checking.
 */
static inline void pubsub_mutex_lock(PubSubHeader *hdr) {
    /* Value written into the lock word while this process holds the lock. */
    uint32_t mypid = PUBSUB_MUTEX_VAL((uint32_t)getpid());
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        /* Weak CAS (4th argument 1): may fail spuriously, the loop retries. */
        if (__atomic_compare_exchange_n(&hdr->mutex, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        /* Spin phase: stay in user space for the first few failures. */
        if (__builtin_expect(spin < PUBSUB_SPIN_LIMIT, 1)) {
            pubsub_spin_pause();
            continue;
        }
        /* Sleep phase: announce ourselves so unlock/recovery issue a wake. */
        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            /* Sleeps only while the word still equals cur. */
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &pubsub_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                /* Writer bit set means the word names an owner PID. */
                if (val >= PUBSUB_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & PUBSUB_MUTEX_PID_MASK;
                    if (!pubsub_pid_alive(pid))
                        pubsub_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        /* Woken, interrupted, word changed, or lock already free: retry. */
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

/*
 * Release the mutex in hdr->mutex.
 *
 * The lock word is cleared with release semantics. The futex wake system
 * call is only issued when at least one waiter is registered in
 * hdr->mutex_waiters, and then wakes a single waiter.
 */
static inline void pubsub_mutex_unlock(PubSubHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);

    /* Nobody registered as sleeping: skip the system call. */
    if (!(__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0))
        return;

    syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/*
 * Wake everything sleeping on hdr->sub_futex.
 *
 * Does nothing unless hdr->sub_waiters is positive. Otherwise the futex
 * word is incremented (release ordering) and FUTEX_WAKE is issued for up
 * to INT_MAX waiters. The waiting side is not part of this section;
 * presumably subscribers sleep on sub_futex and register themselves in
 * sub_waiters -- confirm against that code.
 */
static inline void pubsub_wake_subscribers(PubSubHeader *hdr) {
    if (!(__atomic_load_n(&hdr->sub_waiters, __ATOMIC_RELAXED) > 0))
        return;

    __atomic_add_fetch(&hdr->sub_futex, 1, __ATOMIC_RELEASE);
    syscall(SYS_futex, &hdr->sub_futex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/*
 * Compute how much time is left until `deadline`.
 *
 * deadline  - absolute CLOCK_MONOTONIC time
 * remaining - receives (deadline - now), with one borrow applied so that
 *             tv_nsec is non-negative for normalized inputs
 *
 * Returns 1 while the deadline has not passed (remaining->tv_sec >= 0)
 * and 0 once it has.
 */
static inline int pubsub_remaining_time(const struct timespec *deadline,
                                         struct timespec *remaining) {
    struct timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);

    time_t sec_left = deadline->tv_sec - now.tv_sec;
    long nsec_left = deadline->tv_nsec - now.tv_nsec;

    /* Borrow one second when the nanosecond difference went negative. */
    if (nsec_left < 0) {
        nsec_left += 1000000000L;
        sec_left -= 1;
    }

    remaining->tv_sec = sec_left;
    remaining->tv_nsec = nsec_left;
    return sec_left >= 0;
}

/*
 * Turn a relative timeout into an absolute CLOCK_MONOTONIC deadline.
 *
 * timeout  - seconds from now; the fractional part is converted to
 *            nanoseconds
 * deadline - receives now + timeout, with a single carry from tv_nsec
 *            into tv_sec when the nanosecond sum reaches one second
 */
static inline void pubsub_make_deadline(double timeout, struct timespec *deadline) {
    clock_gettime(CLOCK_MONOTONIC, deadline);

    /* Split the timeout into whole seconds and leftover nanoseconds. */
    time_t whole_sec = (time_t)timeout;
    long frac_nsec = (long)((timeout - (double)whole_sec) * 1e9);

    deadline->tv_sec += whole_sec;
    deadline->tv_nsec += frac_nsec;

    if (deadline->tv_nsec >= 1000000000L) {
        deadline->tv_nsec -= 1000000000L;
        deadline->tv_sec += 1;
    }
}

/* ================================================================
 * Header validation
 * ================================================================ */

static inline int pubsub_validate_header(PubSubHeader *hdr, uint32_t mode,
                                          uint64_t file_size) {
    if (hdr->magic != PUBSUB_MAGIC ||
        hdr->version != PUBSUB_VERSION ||
        hdr->mode != mode ||
        hdr->capacity == 0 ||
        (hdr->capacity & (hdr->capacity - 1)) != 0 ||
        hdr->total_size != file_size ||
        hdr->slots_off != sizeof(PubSubHeader))



( run in 1.344 second using v1.01-cache-2.11-cpan-39bf76dae61 )