Data-Histogram-Shared
view release on metacpan or search on metacpan
int32_t sub_bucket_half_count; /* 52 */
int64_t sub_bucket_mask; /* 56 */
int32_t bucket_count; /* 64 */
int32_t _pad2; /* 68 */
int64_t counts_len; /* 72 number of int64 counts */
/* ---- recorded data ---- */
int64_t total_count; /* 80 sum of all recorded counts */
int64_t min_value; /* 88 min recorded value (INT64_MAX init)*/
int64_t max_value; /* 96 max recorded value (0 init) */
/* ---- offsets / size ---- */
uint64_t total_size; /* 104 */
uint64_t reader_slots_off; /* 112 */
uint64_t counts_off; /* 120 */
/* ---- lock + stats ---- */
uint32_t rwlock; /* 128 */
uint32_t rwlock_waiters; /* 132 */
uint32_t rwlock_writers_waiting; /* 136 */
uint32_t _pad3; /* 140 */
uint64_t stat_ops; /* 144 */
uint8_t _pad[104]; /* 152..255 */
};
/* Alias so the header can be named without the `struct` keyword. */
typedef struct HistHeader HistHeader;
/* Compile-time layout guard: the build fails if a field change makes the
 * header anything other than exactly 256 bytes. */
_Static_assert(sizeof(HistHeader) == 256, "HistHeader must be 256 bytes");
/* ---- Process-local handle ---- */
/* Per-process handle through which the lock helpers and histogram
 * operations reach the header, the reader slots and the counts. */
typedef struct HistHandle {
HistHeader *hdr; /* histogram header (lock word, geometry, totals) */
HistReaderSlot *reader_slots; /* HIST_READER_SLOTS entries */
void *base; /* mmap base */
size_t mmap_size; /* size recorded alongside the mmap base */
char *path; /* backing file path (strdup'd) */
int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
uint32_t my_slot_idx; /* index into reader_slots; UINT32_MAX if all slots taken (no recovery for this handle) */
uint32_t cached_pid; /* getpid() cached at last slot claim; also encoded into the lock word by the write lock */
uint32_t cached_fork_gen; /* hist_fork_gen value at last slot claim */
} HistHandle;
/* ================================================================
* Futex-based write-preferring read-write lock
* with reader-slot dead-process recovery
* ================================================================ */
/* Number of failed acquisition attempts (with a CPU pause between each)
 * a locker makes before it parks on the futex. */
#define HIST_RWLOCK_SPIN_LIMIT 32
#define HIST_LOCK_TIMEOUT_SEC 2 /* seconds; FUTEX_WAIT timeout for stale lock detection */
/* One busy-wait step for the lock spin loops: issues the architecture's
 * spin hint where one is known ("yield" on AArch64, "pause" on x86) and a
 * plain compiler barrier elsewhere. Every variant carries a "memory"
 * clobber, so the compiler re-reads memory afterwards. */
static inline void hist_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}
/* Write-locked encoding of the rwlock word: the top bit flags a writer and
 * the lower 31 bits carry the writer's PID, so the owner can be extracted
 * from the lock value itself with HIST_RWLOCK_PID_MASK. */
#define HIST_RWLOCK_WRITER_BIT 0x80000000U
#define HIST_RWLOCK_PID_MASK 0x7FFFFFFFU
/* Build the write-locked lock word for `pid` (PID truncated to 31 bits). */
#define HIST_RWLOCK_WR(pid) (HIST_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & HIST_RWLOCK_PID_MASK))
/* Report whether `pid` may still be running: returns 0 only when
 * kill(pid, 0) fails with ESRCH (definitely dead), and 1 otherwise (alive
 * or unknown). A pid of 0 means no owner was recorded and is treated as
 * alive.
 *
 * NOTE: kill(pid, 0) cannot detect PID reuse. If a dead lock-holder's PID
 * is recycled to an unrelated live process before recovery runs, this
 * reports "alive" and that slot's orphaned contribution is not reclaimed
 * until the recycled process exits. Robust detection would require a
 * per-slot process-start-time epoch (a header-layout/version change).
 * Documented under "Crash Safety" in the POD. */
static inline int hist_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;               /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1;               /* signal 0 could be delivered: process exists */
    /* kill failed: only ESRCH proves the process is gone */
    return errno != ESRCH;
}
/* Force-recover a stale write lock left by a dead process.
 * CAS to OUR pid to hold the lock while fixing shared state, then release.
 * Using our pid (not a bare WRITER_BIT sentinel) means a subsequent
 * recovering process can detect and re-recover if we crash mid-recovery.
 *
 * h               - handle whose header holds the lock word.
 * observed_rwlock - lock value the caller saw and judged stale; recovery
 *                   proceeds only if the word still holds exactly this
 *                   value, otherwise the function returns without effect. */
static inline void hist_recover_stale_lock(HistHandle *h, uint32_t observed_rwlock) {
HistHeader *hdr = h->hdr;
/* Write-locked lock word carrying this process's PID. */
uint32_t mypid = HIST_RWLOCK_WR((uint32_t)getpid());
/* Strong CAS (weak flag 0): if it fails, the word has changed since the
 * caller observed it, so the lock is left alone. */
if (!__atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock,
mypid, 0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
/* We now hold the write lock as mypid. No additional shared state needs
 * repair here (this module has no seqlock); just release the lock. */
__atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
/* Wake every futex waiter, but only pay for the syscall if someone has
 * registered as waiting. */
if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* Timeout handed to FUTEX_WAIT by the lock loops:
 * HIST_LOCK_TIMEOUT_SEC seconds, 0 nanoseconds. */
static const struct timespec hist_lock_timeout = { HIST_LOCK_TIMEOUT_SEC, 0 };
/* Process-global fork-generation counter. Incremented in the pthread_atfork
 * child callback so every open handle detects a fork transition on the next
 * lock call without paying a getpid() syscall on the hot path. Handles
 * compare it against their cached_fork_gen. */
static uint32_t hist_fork_gen = 1;
/* Guards the one-time registration of the atfork hook (used with
 * pthread_once on the slot-claim cold path). */
static pthread_once_t hist_atfork_once = PTHREAD_ONCE_INIT;
/* pthread_atfork child hook: advance the process-global fork generation by
 * one so handles holding an older cached generation notice the fork. */
static void hist_on_fork_child(void) {
    (void)__atomic_fetch_add(&hist_fork_gen, 1, __ATOMIC_RELAXED);
}
/* One-time initializer (run via pthread_once): installs the fork hook.
 * No prepare or parent handlers are registered, only the child handler. */
static void hist_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, hist_on_fork_child);
}
/* Ensure this process owns a reader slot. Called from the lock helpers so
* that fork()'d children pick up their own slot lazily instead of sharing
* the parent's. Hot-path is a single relaxed load + compare; only on a
* fork-generation mismatch do we touch getpid() and scan slots. */
static inline void hist_claim_reader_slot(HistHandle *h) {
uint32_t cur_gen = __atomic_load_n(&hist_fork_gen, __ATOMIC_RELAXED);
if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
return;
/* Cold path -- register the atfork hook once per process, then claim. */
pthread_once(&hist_atfork_once, hist_atfork_init);
/* Re-read after pthread_once: hist_on_fork_child may have bumped it. */
cur_gen = __atomic_load_n(&hist_fork_gen, __ATOMIC_RELAXED);
uint32_t now_pid = (uint32_t)getpid();
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Undo a reader's park registration: decrement the header-wide waiter
 * count, then this handle's per-slot waiter count when it owns a slot
 * (my_slot_idx != UINT32_MAX). */
static inline void hist_unpark_reader(HistHandle *h) {
    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);

    const uint32_t slot = h->my_slot_idx;
    if (slot == UINT32_MAX)
        return;                 /* slot-less handle: nothing per-slot to undo */
    (void)__atomic_fetch_sub(&h->reader_slots[slot].waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Register the caller as a parked writer before it sleeps on the futex.
 * Per-slot counters (waiters_parked, writers_parked) are bumped first when
 * the handle owns a slot, then the header-wide rwlock_waiters and
 * rwlock_writers_waiting counters. */
static inline void hist_park_writer(HistHandle *h) {
    const uint32_t slot = h->my_slot_idx;
    if (slot != UINT32_MAX) {
        HistReaderSlot *mine = &h->reader_slots[slot];
        (void)__atomic_fetch_add(&mine->waiters_parked, 1, __ATOMIC_RELAXED);
        (void)__atomic_fetch_add(&mine->writers_parked, 1, __ATOMIC_RELAXED);
    }

    HistHeader *hdr = h->hdr;
    (void)__atomic_fetch_add(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Reverse hist_park_writer: drop the header-wide rwlock_waiters and
 * rwlock_writers_waiting counters first, then the per-slot waiters_parked
 * and writers_parked counters when the handle owns a slot. */
static inline void hist_unpark_writer(HistHandle *h) {
    HistHeader *hdr = h->hdr;
    (void)__atomic_fetch_sub(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);

    const uint32_t slot = h->my_slot_idx;
    if (slot == UINT32_MAX)
        return;                 /* slot-less handle: no per-slot counters */

    HistReaderSlot *mine = &h->reader_slots[slot];
    (void)__atomic_fetch_sub(&mine->waiters_parked, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&mine->writers_parked, 1, __ATOMIC_RELAXED);
}
/* Acquire the lock in shared (read) mode; does not return until it is held.
 *
 * Lock word as used here: 0 = free, 1 .. HIST_RWLOCK_WRITER_BIT-1 = number
 * of active readers, >= HIST_RWLOCK_WRITER_BIT = write-locked.
 *
 * Strategy: try a CAS; on failure spin (with a CPU pause) for up to
 * HIST_RWLOCK_SPIN_LIMIT attempts, then register as parked and FUTEX_WAIT
 * with hist_lock_timeout. A timed-out wait runs
 * hist_recover_after_timeout() before retrying. */
static inline void hist_rwlock_rdlock(HistHandle *h) {
/* Make sure this process has its own reader slot (matters after fork). */
hist_claim_reader_slot(h);
HistHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
/* Claim subcount BEFORE bumping the shared rwlock counter. This way
 * a concurrent writer-side recovery scan that sees our PID alive with
 * subcount > 0 will (correctly) defer force-reset, even while we are
 * still spinning trying to win the rwlock CAS. Without this, a reader
 * killed between rwlock CAS-success and subcount++ would let recovery
 * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
 * our eventual rdunlock dec. */
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
for (int spin = 0; ; spin++) {
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Write-preferring: when lock is free (cur==0) and writers are
 * waiting, yield to let the writer acquire. When readers are
 * already active (cur>=1), new readers may join freely. */
if (cur > 0 && cur < HIST_RWLOCK_WRITER_BIT) {
/* Readers already hold the lock: join by bumping the reader count.
 * The CAS is weak (4th argument 1), so a spurious failure simply
 * falls through to another loop iteration. */
if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
} else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
/* Lock is free and no writer is queued: become the first reader. */
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
/* Not acquired. Stay in the spin phase until the limit is reached. */
if (__builtin_expect(spin < HIST_RWLOCK_SPIN_LIMIT, 1)) {
hist_rwlock_spin_pause();
continue;
}
/* Spin budget exhausted: register as a parked waiter so that unlockers
 * know a FUTEX_WAKE is needed, then re-read the word to wait on. */
hist_park_reader(h);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR when yielding to waiting writers */
if (cur >= HIST_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&hist_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
/* Nobody woke us within the timeout: the holder may be dead.
 * Unregister first, then run the recovery check and retry. */
hist_unpark_reader(h);
hist_recover_after_timeout(h);
spin = 0;
continue;
}
}
/* Woken, or the word no longer warranted sleeping: unregister and start
 * a fresh spin phase. */
hist_unpark_reader(h);
spin = 0;
}
}
/* Release a shared (read) hold on the lock.
 *
 * Ordering is deliberate: the shared rwlock counter is released BEFORE our
 * slot's subcount is dropped, so that "any live PID with subcount > 0"
 * stays a reliable in-flight indicator for the writer-side recovery scan.
 * Doing it the other way round would open a window where we still own a
 * unit of rwlock while our subcount reads 0, letting recovery force-reset
 * rwlock underneath us. */
static inline void hist_rwlock_rdunlock(HistHandle *h) {
    HistHeader *hdr = h->hdr;

    /* Step 1: give back our unit of the shared reader count. */
    const uint32_t remaining = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);

    /* Step 2: only now drop the per-slot in-flight marker. */
    const uint32_t slot = h->my_slot_idx;
    if (slot != UINT32_MAX)
        __atomic_sub_fetch(&h->reader_slots[slot].subcount, 1, __ATOMIC_RELAXED);

    /* Step 3: if we were the last reader out and someone is parked,
     * wake every waiter. */
    if (remaining != 0)
        return;
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;
    syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
static inline void hist_rwlock_wrlock(HistHandle *h) {
hist_claim_reader_slot(h); /* refresh cached_pid across fork */
HistHeader *hdr = h->hdr;
uint32_t *lock = &hdr->rwlock;
/* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
* any crash window between acquiring the lock and storing the owner. */
uint32_t mypid = HIST_RWLOCK_WR(h->cached_pid);
for (int spin = 0; ; spin++) {
uint32_t expected = 0;
if (__atomic_compare_exchange_n(lock, &expected, mypid,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
if (__builtin_expect(spin < HIST_RWLOCK_SPIN_LIMIT, 1)) {
hist_rwlock_spin_pause();
continue;
}
hist_park_writer(h);
uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (cur != 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&hist_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
hist_unpark_writer(h);
hist_recover_after_timeout(h);
spin = 0;
continue;
}
}
hist_unpark_writer(h);
spin = 0;
}
/* Sub-bucket index of value `v` inside bucket `bi`: `v`, taken as
 * unsigned, shifted right by (bi + unit_magnitude) and narrowed to
 * 32 bits. */
static inline int32_t hist_sub_bucket_index(HistHandle *h, int64_t v, int32_t bi) {
    const uint64_t bits = (uint64_t)v;
    return (int32_t)(bits >> (bi + h->hdr->unit_magnitude));
}
/* Flatten a (bucket, sub-bucket) pair into an index into the counts array:
 * (bi + 1) shifted left by sub_bucket_half_count_magnitude, plus the
 * sub-bucket index relative to sub_bucket_half_count. */
static inline int64_t hist_counts_index(HistHandle *h, int32_t bi, int32_t sbi) {
    const HistHeader *hdr = h->hdr;
    const int64_t bucket_base = (int64_t)(bi + 1) << hdr->sub_bucket_half_count_magnitude;
    const int32_t offset_in_bucket = sbi - hdr->sub_bucket_half_count;
    return bucket_base + offset_in_bucket;
}
/* Counts-array index for value `v`: resolve the bucket, then the
 * sub-bucket within it, then flatten the pair. Performs no range check. */
static inline int64_t hist_counts_index_for(HistHandle *h, int64_t v) {
    const int32_t bucket = hist_bucket_index(h, v);
    return hist_counts_index(h, bucket, hist_sub_bucket_index(h, v, bucket));
}
/* Inverse of the index mapping: the lowest value stored at counts[index].
 * The bucket is (index >> sub_bucket_half_count_magnitude) - 1; a result of
 * -1 denotes the part of the array that maps onto bucket 0 without the
 * half-count offset. */
static inline int64_t hist_value_at_index(HistHandle *h, int64_t index) {
    const HistHeader *hdr = h->hdr;
    const int32_t half = hdr->sub_bucket_half_count;

    int32_t bucket = (int32_t)(index >> hdr->sub_bucket_half_count_magnitude) - 1;
    int32_t sub = (int32_t)(index & (half - 1));

    if (bucket < 0)
        bucket = 0;             /* lower half of bucket 0: no offset */
    else
        sub += half;            /* upper half of the bucket's sub-bucket range */

    return (int64_t)sub << (bucket + hdr->unit_magnitude);
}
/* Width of the range of values that share a counts cell with `v`:
 * 1 << (unit_magnitude + bucket), using the next bucket up when the
 * sub-bucket index is at or beyond sub_bucket_count. */
static inline int64_t hist_size_of_equiv_range(HistHandle *h, int64_t v) {
    const int32_t bucket = hist_bucket_index(h, v);
    const int32_t sub = hist_sub_bucket_index(h, v, bucket);

    int32_t effective_bucket = bucket;
    if (sub >= h->hdr->sub_bucket_count)
        effective_bucket = bucket + 1;

    return (int64_t)1 << (h->hdr->unit_magnitude + effective_bucket);
}
/* Lowest value that maps to the same counts cell as `v`. */
static inline int64_t hist_lowest_equiv(HistHandle *h, int64_t v) {
    const int64_t cell = hist_counts_index_for(h, v);
    return hist_value_at_index(h, cell);
}
/* Upper end of `v`'s equivalent range: the lowest equivalent value plus
 * the size of the equivalent range, minus one. */
static inline int64_t hist_highest_equiv(HistHandle *h, int64_t v) {
    const int64_t low = hist_lowest_equiv(h, v);
    const int64_t span = hist_size_of_equiv_range(h, v);
    return low + span - 1;
}
/* Midpoint of `v`'s equivalent range: the lowest equivalent value plus
 * half of the range size. */
static inline int64_t hist_median_equiv(HistHandle *h, int64_t v) {
    const int64_t low = hist_lowest_equiv(h, v);
    const int64_t half_span = hist_size_of_equiv_range(h, v) >> 1;
    return low + half_span;
}
/* Lock-free index resolver, meant for the XS range-check done before the
 * lock is taken. Returns the counts index for `v`, or -1 when `v` is not
 * trackable (bucket outside [0, bucket_count) or index outside
 * [0, counts_len)). `v` must be >= 0. */
static inline int64_t hist_index_for(HistHandle *h, int64_t v) {
    const int32_t bucket = hist_bucket_index(h, v);
    if (bucket >= 0 && bucket < h->hdr->bucket_count) {
        const int32_t sub = hist_sub_bucket_index(h, v, bucket);
        const int64_t cell = hist_counts_index(h, bucket, sub);
        if (cell >= 0 && cell < h->hdr->counts_len)
            return cell;
    }
    return -1;
}
/* ================================================================
* HdrHistogram operations (callers hold the lock)
* ================================================================ */
/* Add `count` occurrences of `value` to the histogram and fold `value`
 * into the running min/max. No validation happens here: the XS caller has
 * ALREADY range-checked 0 <= value <= highest and idx < counts_len before
 * locking. */
static void hist_record_locked(HistHandle *h, int64_t value, int64_t count) {
    const int64_t cell = hist_counts_index_for(h, value);
    int64_t *cells = hist_counts(h);
    HistHeader *hdr = h->hdr;

    cells[cell] += count;
    hdr->total_count += count;

    if (hdr->min_value > value)
        hdr->min_value = value;
    if (hdr->max_value < value)
        hdr->max_value = value;
}
/* Highest equivalent value at or below which `p` percent of recorded values
 * lie. Returns 0 for an empty histogram.
 *
 * h - handle; the caller holds the lock.
 * p - percentile in percent. Values below 0 (and NaN) behave like 0, values
 *     above 100 behave like 100.
 *
 * The target rank is ceil(p/100 * total_count), limited to
 * [1, total_count]. Cells are scanned in index order and the first cell
 * whose running total reaches the rank determines the result. Returns 0 as
 * well if the scan ends without reaching the rank (cell sum smaller than
 * total_count). */
static int64_t hist_value_at_percentile_locked(HistHandle *h, double p) {
    int64_t total = h->hdr->total_count;
    if (total == 0) return 0;

    /* Clamp in the floating-point domain BEFORE converting to an integer:
     * casting NaN or an out-of-range double to int64_t is undefined
     * behaviour. !(p > 0.0) is also true for NaN. */
    if (!(p > 0.0)) p = 0.0;
    else if (p > 100.0) p = 100.0;

    double target = ceil((p / 100.0) * (double)total);
    /* (double)total can round up to 2^63, which int64_t cannot hold. */
    int64_t want = (target >= 9223372036854775808.0) ? INT64_MAX : (int64_t)target;
    if (want < 1) want = 1;
    if (want > total) want = total;

    int64_t *counts = hist_counts(h);
    int64_t running = 0;
    int64_t len = h->hdr->counts_len;
    for (int64_t idx = 0; idx < len; idx++) {
        if (!counts[idx]) continue; /* skip empty cells (sparse); a 0 cell can never be the first to reach want */
        running += counts[idx];
        if (running >= want)
            return hist_highest_equiv(h, hist_value_at_index(h, idx));
    }
    return 0;
}
/* Mean of the recorded values, approximated by weighting each non-empty
 * cell's median-equivalent value with its count and dividing by
 * total_count. Returns 0.0 for an empty histogram. The caller holds the
 * lock. */
static double hist_mean_locked(HistHandle *h) {
    const int64_t recorded = h->hdr->total_count;
    if (recorded == 0)
        return 0.0;

    const int64_t *cells = hist_counts(h);
    const int64_t ncells = h->hdr->counts_len;
    double weighted = 0.0;

    for (int64_t i = 0; i < ncells; i++) {
        const int64_t hits = cells[i];
        if (!hits)
            continue;           /* empty cell contributes nothing */
        weighted += (double)hits * (double)hist_median_equiv(h, hist_value_at_index(h, i));
    }
    return weighted / (double)recorded;
}
/* Cell-wise add of `src` into `dst` over `counts_len` cells (the caller
 * guarantees both arrays share the same geometry and holds dst's write
 * lock). Source cells <= 0 are skipped, since counts are non-negative. A
 * sum that would exceed INT64_MAX saturates at INT64_MAX instead of
 * overflowing. */
static void hist_merge_counts(int64_t *dst, const int64_t *src, int64_t counts_len) {
    for (int64_t cell = 0; cell < counts_len; cell++) {
        const int64_t add = src[cell];
        if (add <= 0)
            continue;
        /* headroom = largest dst value that can still absorb `add` */
        const int64_t headroom = INT64_MAX - add;
        dst[cell] = (dst[cell] > headroom) ? INT64_MAX : dst[cell] + add;
    }
}
/* Return the histogram to its empty state: zero all counts_len cells and
 * restore total_count = 0, min_value = INT64_MAX, max_value = 0. The
 * caller holds the write lock. */
static inline void hist_reset_locked(HistHandle *h) {
    HistHeader *hdr = h->hdr;
    const uint64_t nbytes = (uint64_t)hdr->counts_len * sizeof(int64_t);

    memset(hist_counts(h), 0, (size_t)nbytes);

    hdr->total_count = 0;
    hdr->min_value = INT64_MAX;
    hdr->max_value = 0;
}
#endif /* HIST_H */
( run in 0.457 second using v1.01-cache-2.11-cpan-bbe5e583499 )