Data-Histogram-Shared

 view release on metacpan or  search on metacpan

hist.h  view on Meta::CPAN

    int32_t  sub_bucket_half_count;            /* 52 */
    int64_t  sub_bucket_mask;                  /* 56 */
    int32_t  bucket_count;                     /* 64 */
    int32_t  _pad2;                            /* 68 */
    int64_t  counts_len;                       /* 72  number of int64 counts    */

    /* ---- recorded data ---- */
    int64_t  total_count;             /* 80  sum of all recorded counts         */
    int64_t  min_value;               /* 88  min recorded value (INT64_MAX init)*/
    int64_t  max_value;               /* 96  max recorded value (0 init)        */

    /* ---- offsets / size ---- */
    uint64_t total_size;              /* 104 */
    uint64_t reader_slots_off;        /* 112 */
    uint64_t counts_off;              /* 120 */

    /* ---- lock + stats ---- */
    uint32_t rwlock;                  /* 128 */
    uint32_t rwlock_waiters;          /* 132 */
    uint32_t rwlock_writers_waiting;  /* 136 */
    uint32_t _pad3;                   /* 140 */
    uint64_t stat_ops;                /* 144 */
    uint8_t  _pad[104];               /* 152..255 */
};
typedef struct HistHeader HistHeader;

/* The header layout is pinned at exactly 256 bytes: fields are placed at the
 * fixed offsets noted in the struct, and new fields must be carved out of the
 * trailing _pad so the size (and every existing offset) stays unchanged. */
_Static_assert(sizeof(HistHeader) == 256, "HistHeader must be 256 bytes");

/* ---- Process-local handle ---- */

/* Per-process view of one histogram.  hdr and reader_slots presumably point
 * into the mapping that starts at base (mmap_size bytes); the open/create
 * code that sets them up is not shown here -- confirm there. */
typedef struct HistHandle {
    HistHeader     *hdr;           /* shared header (lock word, geometry, totals) */
    HistReaderSlot *reader_slots;  /* HIST_READER_SLOTS entries */
    void           *base;          /* mmap base */
    size_t          mmap_size;     /* length of the mapping at base */
    char           *path;          /* backing file path (strdup'd) */
    int             backing_fd;    /* memfd or reopened-fd to close on destroy, -1 for file/anon */
    uint32_t        my_slot_idx;   /* UINT32_MAX if all slots taken (no recovery for this handle) */
    uint32_t        cached_pid;    /* getpid() cached at last slot claim; encoded into the rwlock word by wrlock */
    uint32_t        cached_fork_gen; /* hist_fork_gen value at last slot claim */
} HistHandle;

/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */

/* Failed acquire attempts (each followed by hist_rwlock_spin_pause()) that the
 * lock helpers make before parking on the futex. */
#define HIST_RWLOCK_SPIN_LIMIT 32
/* Seconds per FUTEX_WAIT; on ETIMEDOUT the lock helpers run
 * hist_recover_after_timeout() to look for a dead lock holder. */
#define HIST_LOCK_TIMEOUT_SEC  2  /* FUTEX_WAIT timeout for stale lock detection */

/* Busy-wait hint issued between failed lock attempts.  Emits the CPU's
 * spin-loop instruction where one is known (x86 "pause", AArch64 "yield") and
 * an empty instruction elsewhere; in every case the "memory" clobber makes it
 * a compiler barrier so the lock word is re-read on the next attempt. */
static inline void hist_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
#  define HIST_SPIN_PAUSE_INSN "pause"
#elif defined(__aarch64__)
#  define HIST_SPIN_PAUSE_INSN "yield"
#else
#  define HIST_SPIN_PAUSE_INSN ""
#endif
    __asm__ volatile(HIST_SPIN_PAUSE_INSN ::: "memory");
#undef HIST_SPIN_PAUSE_INSN
}

/* rwlock word encoding:
 *   0                                unlocked
 *   1 .. 0x7FFFFFFF                  number of active readers
 *   WRITER_BIT | (pid & PID_MASK)    write-locked; lower 31 bits are the writer's PID
 * Putting the writer's PID in the lock word itself means there is no window
 * between acquiring the lock and recording its owner. */
#define HIST_RWLOCK_WRITER_BIT 0x80000000U
#define HIST_RWLOCK_PID_MASK   0x7FFFFFFFU
#define HIST_RWLOCK_WR(pid)    (HIST_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & HIST_RWLOCK_PID_MASK))

/* Liveness probe for a recorded lock owner.
 *
 * Returns 0 only when the kernel positively reports "no such process"
 * (kill(pid, 0) fails with ESRCH).  Every other outcome -- pid 0 (no owner
 * recorded), the process exists, or it exists but we may not signal it
 * (EPERM) -- returns 1, i.e. "assume alive".
 *
 * Limitation: kill(pid, 0) cannot see PID reuse.  If a dead holder's PID is
 * recycled to an unrelated live process before recovery runs, it reads as
 * alive and that slot's orphaned contribution is not reclaimed until the
 * recycled process exits.  Fixing that needs a per-slot process-start-time
 * epoch (a header-layout/version change).  See "Crash Safety" in the POD. */
static inline int hist_pid_alive(uint32_t pid) {
    if (pid != 0 && kill((pid_t)pid, 0) == -1 && errno == ESRCH)
        return 0;   /* definitely gone */
    return 1;       /* alive, unsignalable, or no owner recorded */
}

/* Force-recover a stale write lock left by a dead process.
 * CAS to OUR pid to hold the lock while fixing shared state, then release.
 * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
 * recovering process can detect and re-recover if we crash mid-recovery.
 *
 * observed_rwlock must be the exact lock word the caller saw when it decided
 * the owner is dead (that decision is made by the caller, not shown here).
 * If the word has changed since, the CAS fails and we return without touching
 * anything: another process already released or recovered the lock.  On
 * success the lock ends up released (0) and, if any process has registered
 * itself as parked, all futex waiters are woken. */
static inline void hist_recover_stale_lock(HistHandle *h, uint32_t observed_rwlock) {
    HistHeader *hdr = h->hdr;
    uint32_t mypid = HIST_RWLOCK_WR((uint32_t)getpid());
    if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
            mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
        return;
    /* We now hold the write lock as mypid.  No additional shared state needs
     * repair here (this module has no seqlock); just release the lock. */
    __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
    /* Same wake condition as the normal unlock paths: skip the syscall when
     * nobody is parked. */
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/* Relative timeout passed to the FUTEX_WAIT calls in the rdlock/wrlock
 * helpers; on ETIMEDOUT they call hist_recover_after_timeout() and retry. */
static const struct timespec hist_lock_timeout = { HIST_LOCK_TIMEOUT_SEC, 0 };

/* Process-global fork generation.  Starts at 1 and is bumped in a forked
 * child by the pthread_atfork child handler below, so each open handle can
 * spot a fork transition by comparing its cached_fork_gen on the next lock
 * call, without paying for a getpid() syscall on the hot path. */
static uint32_t hist_fork_gen = 1;

/* One-time guard for installing the atfork handler. */
static pthread_once_t hist_atfork_once = PTHREAD_ONCE_INIT;

/* atfork "child" handler: runs in the child immediately after fork(). */
static void hist_on_fork_child(void) {
    (void)__atomic_fetch_add(&hist_fork_gen, 1u, __ATOMIC_RELAXED);
}

/* pthread_once() callback that installs hist_on_fork_child as the only
 * (child-side) atfork handler.
 * NOTE(review): the pthread_atfork() result is discarded; if registration
 * fails, forks are not reflected in hist_fork_gen. */
static void hist_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, hist_on_fork_child);
}

/* Ensure this process owns a reader slot.  Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's.  Hot-path is a single relaxed load + compare; only on a
 * fork-generation mismatch do we touch getpid() and scan slots. */
static inline void hist_claim_reader_slot(HistHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&hist_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path -- register the atfork hook once per process, then claim. */
    pthread_once(&hist_atfork_once, hist_atfork_init);
    /* Re-read after pthread_once: hist_on_fork_child may have bumped it. */
    cur_gen = __atomic_load_n(&hist_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();

hist.h  view on Meta::CPAN

    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Undo hist_park_reader(): drop the shared waiter count first, then this
 * process's per-slot mirror of it (the reverse of the park order).  A handle
 * without a slot (my_slot_idx == UINT32_MAX) only touches the shared count. */
static inline void hist_unpark_reader(HistHandle *h) {
    __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return;
    HistReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
    __atomic_sub_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Register this process as a parked writer.  The per-slot mirrors
 * (waiters_parked, writers_parked) are bumped before the shared counters
 * (rwlock_waiters, rwlock_writers_waiting), matching the slot-first order the
 * reader park path uses.  A handle without a slot only updates the shared
 * counters.  Presumably the slot mirrors let recovery subtract a dead
 * process's contribution -- hist_recover_after_timeout is not shown here. */
static inline void hist_park_writer(HistHandle *h) {
    HistHeader *hdr = h->hdr;
    if (h->my_slot_idx != UINT32_MAX) {
        HistReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
        __atomic_add_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(&slot->writers_parked, 1, __ATOMIC_RELAXED);
    }
    __atomic_add_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Undo hist_park_writer(): shared counters first, then this process's
 * per-slot mirrors, which is the exact reverse of the park order. */
static inline void hist_unpark_writer(HistHandle *h) {
    HistHeader *hdr = h->hdr;
    __atomic_sub_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return;
    HistReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
    __atomic_sub_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&slot->writers_parked, 1, __ATOMIC_RELAXED);
}

/* Acquire the lock in shared (read) mode; returns only once the lock is held.
 *
 * Each round loads the lock word and tries one CAS:
 *   - readers already active (1..0x7FFFFFFF): join by incrementing the count;
 *   - unlocked (0) and no parked writer: take it by CAS 0 -> 1;
 *   - write-locked, or unlocked with a parked writer: do not try (yield).
 * After HIST_RWLOCK_SPIN_LIMIT failed rounds the caller parks and sleeps on
 * the futex for at most hist_lock_timeout.  On timeout it runs
 * hist_recover_after_timeout() (defined elsewhere) before retrying. */
static inline void hist_rwlock_rdlock(HistHandle *h) {
    hist_claim_reader_slot(h);
    HistHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter.  This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS.  Without this, a reader
     * killed between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
     * our eventual rdunlock dec. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < HIST_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        /* The CAS is the weak form, so a spurious failure just costs one
         * more round. */
        if (__builtin_expect(spin < HIST_RWLOCK_SPIN_LIMIT, 1)) {
            hist_rwlock_spin_pause();
            continue;
        }
        /* Park (publish ourselves in the waiter counters) BEFORE re-reading
         * the word, so an unlocker that changes the word after our re-read
         * sees rwlock_waiters > 0 and issues a FUTEX_WAKE. */
        hist_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR when yielding to waiting writers */
        if (cur >= HIST_RWLOCK_WRITER_BIT || cur == 0) {
            /* FUTEX_WAIT returns immediately if *lock != cur; otherwise it
             * sleeps until woken or the timeout expires. */
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &hist_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                /* Nobody woke us within the timeout: possibly a dead holder
                 * or a dead parked writer -- let recovery inspect. */
                hist_unpark_reader(h);
                hist_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        hist_unpark_reader(h);
        spin = 0;
    }
}

/* Release one shared (read) hold taken by hist_rwlock_rdlock().
 * The reader whose decrement brings the count to 0 wakes every futex waiter
 * (FUTEX_WAKE with INT_MAX), but only if some process is registered as
 * parked in rwlock_waiters. */
static inline void hist_rwlock_rdunlock(HistHandle *h) {
    HistHeader *hdr = h->hdr;
    /* Release the shared counter BEFORE dropping our subcount so that
     * "any live PID with subcount > 0" is a reliable in-flight indicator
     * for the writer-side recovery scan.  Inverting these would create a
     * window where we still own a unit of rwlock but our slot subcount is
     * 0, letting recovery force-reset rwlock underneath us. */
    uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

static inline void hist_rwlock_wrlock(HistHandle *h) {
    hist_claim_reader_slot(h);  /* refresh cached_pid across fork */
    HistHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = HIST_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < HIST_RWLOCK_SPIN_LIMIT, 1)) {
            hist_rwlock_spin_pause();
            continue;
        }
        hist_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &hist_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                hist_unpark_writer(h);
                hist_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        hist_unpark_writer(h);
        spin = 0;
    }

hist.h  view on Meta::CPAN

/* Sub-bucket index of v inside bucket bi: v shifted right by the bucket
 * number plus unit_magnitude.  The shift is done on the unsigned bit pattern
 * of v. */
static inline int32_t hist_sub_bucket_index(HistHandle *h, int64_t v, int32_t bi) {
    const uint64_t bits = (uint64_t)v;
    return (int32_t)(bits >> (h->hdr->unit_magnitude + bi));
}

/* Flatten (bucket, sub-bucket) into a position in the counts array.  Bucket
 * bi starts at (bi + 1) << sub_bucket_half_count_magnitude; within it, the
 * offset is sbi minus sub_bucket_half_count. */
static inline int64_t hist_counts_index(HistHandle *h, int32_t bi, int32_t sbi) {
    const HistHeader *hdr = h->hdr;
    const int64_t bucket_base = (int64_t)(bi + 1) << hdr->sub_bucket_half_count_magnitude;
    const int32_t offset_in_bucket = sbi - hdr->sub_bucket_half_count;
    return bucket_base + offset_in_bucket;
}

/* Counts-array position that value v is recorded into (no range check). */
static inline int64_t hist_counts_index_for(HistHandle *h, int64_t v) {
    const int32_t bucket = hist_bucket_index(h, v);
    return hist_counts_index(h, bucket, hist_sub_bucket_index(h, v, bucket));
}

/* Inverse of hist_counts_index: the lowest value that maps to counts[index].
 * Indices below one half-bucket (bucket number computes as -1) belong to
 * bucket 0's lower half, which has no implicit half-count offset. */
static inline int64_t hist_value_at_index(HistHandle *h, int64_t index) {
    const HistHeader *hdr = h->hdr;
    const int32_t half = hdr->sub_bucket_half_count;
    int32_t bucket = (int32_t)(index >> hdr->sub_bucket_half_count_magnitude) - 1;
    int32_t sub = (int32_t)(index & (half - 1));
    if (bucket < 0)
        bucket = 0;
    else
        sub += half;
    return (int64_t)sub << (bucket + hdr->unit_magnitude);
}

/* Width of the value range that shares v's counts cell:
 * 1 << (unit_magnitude + bucket).  A sub-bucket index at or past
 * sub_bucket_count is treated as belonging to the next bucket. */
static inline int64_t hist_size_of_equiv_range(HistHandle *h, int64_t v) {
    int32_t bucket = hist_bucket_index(h, v);
    if (hist_sub_bucket_index(h, v, bucket) >= h->hdr->sub_bucket_count)
        bucket += 1;
    return (int64_t)1 << (h->hdr->unit_magnitude + bucket);
}

/* Smallest value recorded into the same counts cell as v. */
static inline int64_t hist_lowest_equiv(HistHandle *h, int64_t v) {
    const int64_t cell = hist_counts_index_for(h, v);
    return hist_value_at_index(h, cell);
}

/* Largest value recorded into the same counts cell as v (inclusive). */
static inline int64_t hist_highest_equiv(HistHandle *h, int64_t v) {
    const int64_t lo = hist_lowest_equiv(h, v);
    const int64_t width = hist_size_of_equiv_range(h, v);
    return lo + width - 1;
}

/* Midpoint of v's counts cell: lowest equivalent value plus half the width. */
static inline int64_t hist_median_equiv(HistHandle *h, int64_t v) {
    const int64_t lo = hist_lowest_equiv(h, v);
    const int64_t half_width = hist_size_of_equiv_range(h, v) >> 1;
    return lo + half_width;
}

/* Range check used by the XS layer before taking the lock (it takes no lock
 * itself).  Returns v's counts index, or -1 when v's bucket falls outside
 * [0, bucket_count) or the resulting index falls outside [0, counts_len).
 * Precondition: v >= 0. */
static inline int64_t hist_index_for(HistHandle *h, int64_t v) {
    const int32_t bucket = hist_bucket_index(h, v);
    if (bucket >= 0 && bucket < h->hdr->bucket_count) {
        const int64_t idx = hist_counts_index(h, bucket, hist_sub_bucket_index(h, v, bucket));
        if (idx >= 0 && idx < h->hdr->counts_len)
            return idx;
    }
    return -1;
}

/* ================================================================
 * HdrHistogram operations (callers hold the lock)
 * ================================================================ */

/* Return a + b clamped to [INT64_MIN, INT64_MAX] instead of overflowing
 * (signed integer overflow is undefined behaviour in C). */
static inline int64_t hist_sat_add_i64(int64_t a, int64_t b) {
    if (b > 0 && a > INT64_MAX - b) return INT64_MAX;
    if (b < 0 && a < INT64_MIN - b) return INT64_MIN;
    return a + b;
}

/* Record `count` occurrences of `value`.  The XS caller has ALREADY range-
 * checked 0 <= value <= highest and idx < counts_len before locking.
 *
 * The target cell and total_count saturate instead of overflowing, the same
 * policy hist_merge_counts() applies to cells, so a very large `count` can
 * never wrap a counter negative.  min_value/max_value are updated even when
 * count is 0.  Caller holds the write lock. */
static void hist_record_locked(HistHandle *h, int64_t value, int64_t count) {
    HistHeader *hdr = h->hdr;
    int64_t idx = hist_counts_index_for(h, value);
    int64_t *counts = hist_counts(h);
    counts[idx] = hist_sat_add_i64(counts[idx], count);
    hdr->total_count = hist_sat_add_i64(hdr->total_count, count);
    if (value < hdr->min_value) hdr->min_value = value;
    if (value > hdr->max_value) hdr->max_value = value;
}

/* Highest equivalent value at or below which `p` percent of recorded values
 * lie.  Returns 0 for an empty histogram.
 *
 * p <= 0 and NaN select the first recorded value; p >= 100 and +inf select
 * the last.  Returns 0 if the counts sum to less than total_count, which is an
 * inconsistent state.  Caller holds the lock. */
static int64_t hist_value_at_percentile_locked(HistHandle *h, double p) {
    int64_t total = h->hdr->total_count;
    if (total == 0) return 0;
    /* Target rank = ceil(p% of total), clamped to [1, total].  Clamp while
     * still in double: converting NaN, an infinity or any value outside
     * int64_t's range to int64_t is undefined behaviour. */
    double rank = ceil((p / 100.0) * (double)total);
    int64_t want;
    if (!(rank >= 1.0))                /* p <= 0, or NaN */
        want = 1;
    else if (rank >= (double)total)    /* p >= 100 or +inf; also keeps the cast below in range */
        want = total;
    else
        want = (int64_t)rank;
    int64_t *counts = hist_counts(h);
    int64_t running = 0;               /* invariant: running < want */
    int64_t len = h->hdr->counts_len;
    for (int64_t idx = 0; idx < len; idx++) {
        if (!counts[idx]) continue;            /* skip empty cells (sparse); a 0 cell can never be the first to reach want */
        /* Same as running + counts[idx] >= want, but cannot overflow when
         * cells have saturated at INT64_MAX (want - running > 0). */
        if (counts[idx] >= want - running)
            return hist_highest_equiv(h, hist_value_at_index(h, idx));
        running += counts[idx];
    }
    return 0;
}

/* Mean of the recorded values, approximating each value by the midpoint
 * (median-equivalent value) of its counts cell and weighting by the cell's
 * count.  An empty histogram (total_count == 0) yields 0.0.  Caller holds
 * the lock. */
static double hist_mean_locked(HistHandle *h) {
    const int64_t total = h->hdr->total_count;
    if (!total)
        return 0.0;
    const int64_t *cell = hist_counts(h);
    const int64_t n_cells = h->hdr->counts_len;
    double weighted = 0.0;
    for (int64_t i = 0; i < n_cells; ++i) {
        if (cell[i] == 0)
            continue;
        const double rep = (double)hist_median_equiv(h, hist_value_at_index(h, i));
        weighted += (double)cell[i] * rep;
    }
    return weighted / (double)total;
}

/* Cell-wise add counts_len cells of src into dst.  Cells of src that are <= 0
 * are skipped.  Any sum that would exceed INT64_MAX is stored as INT64_MAX
 * (saturation).  Both arrays must have the same geometry; the caller holds
 * dst's write lock.  dst and src may be the same array: each source cell is
 * read before the matching destination cell is written. */
static void hist_merge_counts(int64_t *dst, const int64_t *src, int64_t counts_len) {
    int64_t i = 0;
    while (i < counts_len) {
        const int64_t add = src[i];
        if (add > 0) {
            const int64_t headroom = INT64_MAX - add;
            dst[i] = (dst[i] <= headroom) ? dst[i] + add : INT64_MAX;
        }
        ++i;
    }
}

/* Return the histogram to its initial recorded-data state.  Every counts
 * cell is zeroed and total_count is set to 0.  min_value and max_value go
 * back to their sentinels (INT64_MAX and 0), so the next recorded value
 * replaces them.  Caller holds the write lock. */
static inline void hist_reset_locked(HistHandle *h) {
    HistHeader *hdr = h->hdr;
    const size_t bytes = (size_t)((uint64_t)hdr->counts_len * sizeof(int64_t));
    memset(hist_counts(h), 0, bytes);
    hdr->total_count = 0;
    hdr->min_value   = INT64_MAX;
    hdr->max_value   = 0;
}

#endif /* HIST_H */



( run in 0.457 second using v1.01-cache-2.11-cpan-bbe5e583499 )