Data-NDArray-Shared
view release on metacpan or search on metacpan
uint32_t dtype; /* 8 enum NdaDtype */
uint32_t ndim; /* 12 1..NDA_MAX_DIMS */
/* ---- shape / strides (in ELEMENTS, row-major) ---- */
uint64_t shape[NDA_MAX_DIMS]; /* 16 .. 79 */
uint64_t strides[NDA_MAX_DIMS]; /* 80 .. 143 */
uint64_t size; /* 144 product of shape = total elements */
uint32_t itemsize; /* 152 bytes per element */
uint32_t _pad0; /* 156 */
/* ---- offsets / sizes ---- */
uint64_t total_size; /* 160 */
uint64_t reader_slots_off; /* 168 */
uint64_t data_off; /* 176 */
uint64_t array_id; /* 184 stable identity for set-op lock ordering */
/* ---- lock + stats ---- */
uint32_t rwlock; /* 192 */
uint32_t rwlock_waiters; /* 196 */
uint32_t rwlock_writers_waiting; /* 200 */
uint32_t _pad1; /* 204 */
uint64_t stat_ops; /* 208 */
uint8_t _pad[40]; /* 216..255 */
};
typedef struct NdaHeader NdaHeader;
_Static_assert(sizeof(struct NdaHeader) == 256, "NdaHeader must be 256 bytes");
/* ---- Process-local handle ---- */
/* One process's view of an attached array. The pointers refer into the mapped
 * region shared with other processes, while the slot/pid/fork-generation
 * fields are private to this process and are re-derived after fork() (see
 * cached_fork_gen). */
typedef struct NdaHandle {
NdaHeader *hdr; /* shared array header (dtype, shape, lock word, ...) */
NdaReaderSlot *reader_slots; /* NDA_READER_SLOTS entries */
void *base; /* mmap base */
size_t mmap_size; /* NOTE(review): presumably the mapping length used for munmap -- confirm */
char *path; /* backing file path (strdup'd) */
int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
uint32_t my_slot_idx; /* index into reader_slots; UINT32_MAX if all slots taken (no recovery for this handle) */
uint32_t cached_pid; /* getpid() cached at last slot claim */
uint32_t cached_fork_gen; /* nda_fork_gen value at last slot claim; a mismatch forces a re-claim */
} NdaHandle;
/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */
/* NOTE(review): presumably the number of busy-wait iterations a lock helper
 * performs before sleeping on the futex -- confirm against its users. */
#define NDA_RWLOCK_SPIN_LIMIT 32
/* Seconds; used to build nda_lock_timeout, the bounded FUTEX_WAIT that lets a
 * blocked waiter wake up and re-check for a lock held by a dead process. */
#define NDA_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
/* CPU hint emitted on each spin iteration: `pause` on x86, `yield` on
 * AArch64, nothing elsewhere. Every variant is also a compiler memory
 * barrier ("memory" clobber), so the spinning caller re-reads shared state. */
#if defined(__x86_64__) || defined(__i386__)
#  define NDA_SPIN_PAUSE_INSN "pause"
#elif defined(__aarch64__)
#  define NDA_SPIN_PAUSE_INSN "yield"
#else
#  define NDA_SPIN_PAUSE_INSN ""
#endif
static inline void nda_rwlock_spin_pause(void) {
    __asm__ volatile(NDA_SPIN_PAUSE_INSN ::: "memory");
}
/* Write-locked encoding of the rwlock word: bit 31 is the writer flag and the
 * low 31 bits hold the writer's PID (truncated to 31 bits).
 * NDA_RWLOCK_WR(pid) builds that word; the writer PID of a write-locked value v
 * is (v & NDA_RWLOCK_PID_MASK). */
#define NDA_RWLOCK_WRITER_BIT 0x80000000U
#define NDA_RWLOCK_PID_MASK 0x7FFFFFFFU
#define NDA_RWLOCK_WR(pid) (NDA_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & NDA_RWLOCK_PID_MASK))
/* Check whether a recorded owner PID is still alive, via kill(pid, 0).
 *
 * Returns 0 only when the kernel says the process definitely does not exist
 * (ESRCH). Returns 1 in every other case: the process exists (including
 * EPERM, where it exists but we may not signal it), no owner is recorded
 * (pid == 0), or the value cannot be a real PID (> INT32_MAX).
 *
 * The range guard is needed because such a value casts to a NEGATIVE pid_t,
 * which kill() interprets as a process-group target rather than a process.
 * For example, 0x80000000 becomes INT_MIN, for which Linux fails with ESRCH.
 * Without the guard, a non-PID would be reported as a dead owner.
 *
 * NOTE: cannot detect PID reuse. If a dead lock-holder's PID is recycled to
 * an unrelated live process before recovery runs, this reports "alive", and
 * that slot's orphaned contribution is not reclaimed until the recycled
 * process exits. Documented under "Crash Safety" in the POD. */
static inline int nda_pid_alive(uint32_t pid) {
    if (pid == 0) return 1;                   /* no owner recorded, assume alive */
    if (pid > (uint32_t)INT32_MAX) return 1;  /* not a valid PID: unknown, assume alive */
    return !(kill((pid_t)pid, 0) == -1 && errno == ESRCH);
}
/* Force-recover a stale write lock left behind by a dead process.
 *
 * observed_rwlock is the lock word the caller sampled (presumably a
 * write-locked value whose owner it judged dead -- confirm at call sites).
 * Atomically replace exactly that value with OUR write-locked word. If the
 * word has changed since it was sampled (released, or recovered by someone
 * else), do nothing. On success we briefly own the write lock. This module
 * keeps no extra shared state (no seqlock) that a crashed writer could leave
 * half-updated, so we release immediately. Then, if any waiter is registered,
 * we wake all futex sleepers on the lock word so they can retry. */
static inline void nda_recover_stale_lock(NdaHandle *h, uint32_t observed_rwlock) {
    NdaHeader *hdr = h->hdr;
    uint32_t self_word = NDA_RWLOCK_WR((uint32_t)getpid());
    int took_over = __atomic_compare_exchange_n(&hdr->rwlock, &observed_rwlock, self_word,
                                                0, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    if (took_over) {
        __atomic_store_n(&hdr->rwlock, 0, __ATOMIC_RELEASE);
        if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) != 0)
            syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}
/* Relative timeout for FUTEX_WAIT, so a blocked waiter eventually returns and
 * can re-check for a lock whose holder died. */
static const struct timespec nda_lock_timeout = {
    .tv_sec  = NDA_LOCK_TIMEOUT_SEC,
    .tv_nsec = 0,
};
/* Process-global fork generation. The pthread_atfork child hook bumps it, and
 * each handle compares it with its cached copy on the next lock call. That
 * detects "we are now in a forked child" without a getpid() syscall on the hot
 * path. It starts at 1 rather than 0, so a zero-initialised handle
 * (cached_fork_gen == 0) never matches. */
static uint32_t nda_fork_gen = 1;
/* Guards one-time registration of the atfork hook. */
static pthread_once_t nda_atfork_once = PTHREAD_ONCE_INIT;
/* pthread_atfork child hook: advance the fork generation so every handle
 * inherited across fork() sees a stale cached_fork_gen and re-claims its own
 * reader slot. The incremented value itself is not needed here. */
static void nda_on_fork_child(void) {
    (void)__atomic_fetch_add(&nda_fork_gen, 1, __ATOMIC_RELAXED);
}
/* pthread_once initialiser: install the child hook above exactly once per
 * process. NOTE(review): pthread_atfork's status is discarded; if registration
 * fails, forked children would not observe a new fork generation. */
static void nda_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, nda_on_fork_child);
}
/* Ensure this process owns a reader slot. Called from the lock helpers so
* that fork()'d children pick up their own slot lazily instead of sharing
* the parent's. */
static inline void nda_claim_reader_slot(NdaHandle *h) {
uint32_t cur_gen = __atomic_load_n(&nda_fork_gen, __ATOMIC_RELAXED);
if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
return;
/* Cold path -- register the atfork hook once per process, then claim. */
pthread_once(&nda_atfork_once, nda_atfork_init);
cur_gen = __atomic_load_n(&nda_fork_gen, __ATOMIC_RELAXED);
uint32_t now_pid = (uint32_t)getpid();
h->cached_pid = now_pid;
h->cached_fork_gen = cur_gen;
h->my_slot_idx = UINT32_MAX;
uint32_t start = now_pid % NDA_READER_SLOTS;
for (uint32_t i = 0; i < NDA_READER_SLOTS; i++) {
/* Address of element 0: the data region begins hdr->data_off bytes past the
 * mapping base recorded in the handle. */
static inline char *nda_data(NdaHandle *h) {
    char *const region = (char *)h->base;
    const uint64_t skip = h->hdr->data_off;
    return region + skip;
}
/* Flat element index of a multi-index: sum over d of idx[d] * strides[d],
 * with strides counted in ELEMENTS. No bounds checking is done here; the
 * caller must have checked every idx[d] against its dimension. */
static inline uint64_t nda_flat_offset(NdaHandle *h, const uint64_t *idx, uint32_t ndim) {
    const uint64_t *stride = h->hdr->strides;
    const uint64_t *ix = idx;
    const uint64_t *const stop = idx + ndim;
    uint64_t flat = 0;
    while (ix != stop)
        flat += *ix++ * *stride++;
    return flat;
}
/* ================================================================
 * Typed element loads. Callers hold a lock; `e` is a flat element index
 * into the data region.
 * ================================================================ */
/* Read element e, converted to double. Unknown dtypes read as 0.0. */
static inline double nda_load_nv(NdaHandle *h, uint64_t e) {
    const char *p = nda_data(h);
    double out = 0.0;
    switch (h->hdr->dtype) {
    case NDA_F64: memcpy(&out, p + e * sizeof out, sizeof out); break;
    case NDA_F32: { float    x; memcpy(&x, p + e * sizeof x, sizeof x); out = (double)x; break; }
    case NDA_I64: { int64_t  x; memcpy(&x, p + e * sizeof x, sizeof x); out = (double)x; break; }
    case NDA_I32: { int32_t  x; memcpy(&x, p + e * sizeof x, sizeof x); out = (double)x; break; }
    case NDA_I16: { int16_t  x; memcpy(&x, p + e * sizeof x, sizeof x); out = (double)x; break; }
    case NDA_I8:  out = (double)(int8_t)p[e]; break;
    case NDA_U64: { uint64_t x; memcpy(&x, p + e * sizeof x, sizeof x); out = (double)x; break; }
    case NDA_U32: { uint32_t x; memcpy(&x, p + e * sizeof x, sizeof x); out = (double)x; break; }
    case NDA_U16: { uint16_t x; memcpy(&x, p + e * sizeof x, sizeof x); out = (double)x; break; }
    case NDA_U8:  out = (double)(uint8_t)p[e]; break;
    default:      break;
    }
    return out;
}
/* Read element e of a signed-integer dtype (I64/I32/I16/I8), sign-extended to
 * int64_t. Any other dtype reads as 0. Caller holds a lock. */
static inline int64_t nda_load_i64(NdaHandle *h, uint64_t e) {
    const char *p = nda_data(h);
    int64_t out = 0;
    switch (h->hdr->dtype) {
    case NDA_I64: memcpy(&out, p + e * sizeof out, sizeof out); break;
    case NDA_I32: { int32_t x; memcpy(&x, p + e * sizeof x, sizeof x); out = x; break; }
    case NDA_I16: { int16_t x; memcpy(&x, p + e * sizeof x, sizeof x); out = x; break; }
    case NDA_I8:  out = (int8_t)p[e]; break;
    default:      break;
    }
    return out;
}
/* Read element e of an unsigned-integer dtype (U64/U32/U16/U8), zero-extended
 * to uint64_t. Any other dtype reads as 0. Caller holds a lock. */
static inline uint64_t nda_load_u64(NdaHandle *h, uint64_t e) {
    const char *p = nda_data(h);
    uint64_t out = 0;
    switch (h->hdr->dtype) {
    case NDA_U64: memcpy(&out, p + e * sizeof out, sizeof out); break;
    case NDA_U32: { uint32_t x; memcpy(&x, p + e * sizeof x, sizeof x); out = x; break; }
    case NDA_U16: { uint16_t x; memcpy(&x, p + e * sizeof x, sizeof x); out = x; break; }
    case NDA_U8:  out = (uint8_t)p[e]; break;
    default:      break;
    }
    return out;
}
/* Sum all hdr->size elements as doubles, accumulating in index order (plain
 * left-to-right summation, so rounding error can grow with size). Caller holds
 * the read lock. */
static inline double nda_sum_locked(NdaHandle *h) {
    const uint64_t count = h->hdr->size;
    double total = 0.0;
    uint64_t i = 0;
    while (i < count)
        total += nda_load_nv(h, i++);
    return total;
}
/* Find the flat index of the min (want_max=0) or max (want_max=1) element,
 * comparing in the element's NATIVE type so that i64/u64 values above 2^53
 * (which collapse/mis-order as doubles) are ranked exactly. Float dtypes
 * compare as double. Comparisons are strict, so ties keep the earliest index.
 *
 * NaN handling (float dtypes): NaNs are skipped, so the result is the index of
 * the extreme non-NaN element wherever NaNs appear. Without this, a NaN at
 * index 0 would seed the running best and be returned, because every
 * comparison against NaN is false, while NaNs at other positions would be
 * ignored. If every element is NaN, index 0 is returned.
 *
 * Caller holds the read lock; size >= 1 always. */
static inline uint64_t nda_argextreme_locked(NdaHandle *h, int want_max) {
    uint64_t size = h->hdr->size, e, best = 0;
    uint32_t dt = h->hdr->dtype;
    if (nda_is_float(dt)) {
        double bestv = 0.0;
        int have_best = 0;  /* becomes 1 at the first non-NaN element */
        for (e = 0; e < size; e++) {
            double v = nda_load_nv(h, e);
            if (__builtin_isnan(v)) continue;
            if (!have_best || (want_max ? (v > bestv) : (v < bestv))) {
                bestv = v;
                best = e;
                have_best = 1;
            }
        }
    } else if (nda_is_signed(dt)) {
        int64_t bestv = nda_load_i64(h, 0);
        for (e = 1; e < size; e++) {
            int64_t v = nda_load_i64(h, e);
            if (want_max ? (v > bestv) : (v < bestv)) { bestv = v; best = e; }
        }
    } else {
        uint64_t bestv = nda_load_u64(h, 0);
        for (e = 1; e < size; e++) {
            uint64_t v = nda_load_u64(h, e);
            if (want_max ? (v > bestv) : (v < bestv)) { bestv = v; best = e; }
        }
    }
    return best;
}
/* ================================================================
 * Validate create args + header init / setup / open / destroy
 * ================================================================ */
/* Generate a non-zero per-array identity, used ONLY at create time to order
 * element-wise set-op lock acquisition consistently across unrelated
 * processes.
 *
 * Uniqueness, not unpredictability, is what matters, so the kernel RNG is
 * queried with GRND_NONBLOCK. Without that flag, getrandom() blocks until the
 * urandom pool is initialised (early boot), which would stall array creation
 * for an id that has a perfectly good fallback. On any failure (EAGAIN,
 * ENOSYS, EINTR, ...) or short read, the id falls back to a mix of the PID, a
 * process-local counter, and hdr_addr.
 *
 * hdr_addr: only mixed into the fallback value, never dereferenced.
 * Returns: a non-zero 64-bit id. */
static inline uint64_t nda_gen_array_id(const void *hdr_addr) {
    static uint32_t nda_id_counter = 0;
    uint64_t id = 0;
    ssize_t r = getrandom(&id, sizeof id, GRND_NONBLOCK);
    if (r != (ssize_t)sizeof id) {
        uint32_t c = __atomic_add_fetch(&nda_id_counter, 1, __ATOMIC_RELAXED);
        id = ((uint64_t)(uint32_t)getpid() << 32)
           ^ ((uint64_t)c * 0x9E3779B97F4A7C15ull)
           ^ (uint64_t)(uintptr_t)hdr_addr;
    }
    if (id == 0) id = 0x9E3779B97F4A7C15ull; /* never 0 */
    return id;
}
/* Validate create args + compute derived shape/strides/size/itemsize.
* Single source of truth: the XS layer does NOT duplicate these checks.
* On success fills *out_* and returns 1; on failure writes errbuf, returns 0. */
static int nda_validate_create_args(int dtype, const uint64_t *shape, uint32_t ndim,
uint64_t *out_size, uint64_t out_strides[NDA_MAX_DIMS],
uint64_t *out_data_bytes, char *errbuf) {
if (errbuf) errbuf[0] = '\0';
if (dtype < 0 || dtype >= NDA_NTYPES) { NDA_ERR("unknown dtype"); return 0; }
if (ndim < 1) { NDA_ERR("ndim must be >= 1"); return 0; }
if (ndim > NDA_MAX_DIMS) { NDA_ERR("ndim must be <= %d", NDA_MAX_DIMS); return 0; }
uint64_t size = 1;
for (uint32_t d = 0; d < ndim; d++) {
if (shape[d] < 1) { NDA_ERR("shape[%u] must be >= 1", d); return 0; }
/* size *= shape[d] with overflow guard */
( run in 1.396 second using v1.01-cache-2.11-cpan-40ba7b3775d )