Data-NDArray-Shared

 view release on metacpan or  search on metacpan

ndarray.h  view on Meta::CPAN

    uint32_t dtype;                   /* 8   enum NdaDtype */
    uint32_t ndim;                    /* 12  1..NDA_MAX_DIMS */

    /* ---- shape / strides (in ELEMENTS, row-major) ---- */
    uint64_t shape[NDA_MAX_DIMS];     /* 16  .. 79 */
    uint64_t strides[NDA_MAX_DIMS];   /* 80  .. 143 */
    uint64_t size;                    /* 144 product of shape = total elements */
    uint32_t itemsize;                /* 152 bytes per element */
    uint32_t _pad0;                   /* 156 */

    /* ---- offsets / sizes ---- */
    uint64_t total_size;              /* 160 */
    uint64_t reader_slots_off;        /* 168 */
    uint64_t data_off;                /* 176 */
    uint64_t array_id;                /* 184 stable identity for set-op lock ordering */

    /* ---- lock + stats ---- */
    uint32_t rwlock;                  /* 192 */
    uint32_t rwlock_waiters;          /* 196 */
    uint32_t rwlock_writers_waiting;  /* 200 */
    uint32_t _pad1;                   /* 204 */
    uint64_t stat_ops;                /* 208 */
    uint8_t  _pad[40];                /* 216..255 */
};
/* Alias so the header can be named NdaHeader without the struct keyword. */
typedef struct NdaHeader NdaHeader;

/* Compile-time layout guard: the build fails unless the header struct is
 * exactly 256 bytes. */
_Static_assert(sizeof(struct NdaHeader) == 256, "NdaHeader must be 256 bytes");

/* ---- Process-local handle ---- */

/* Per-process view of one array: pointers into the mapping plus the
 * bookkeeping this process needs for its reader slot and for cleanup.
 * NOTE(review): hdr and reader_slots presumably point inside the region that
 * starts at base -- confirm against the open/create code, which is not shown
 * in this chunk. */
typedef struct NdaHandle {
    NdaHeader     *hdr;           /* array header (lock word, shape, offsets) */
    NdaReaderSlot *reader_slots;  /* NDA_READER_SLOTS entries */
    void          *base;          /* mmap base */
    size_t         mmap_size;     /* size associated with the mapping at base */
    char          *path;          /* backing file path (strdup'd) */
    int            backing_fd;    /* memfd or reopened-fd to close on destroy, -1 for file/anon */
    uint32_t       my_slot_idx;   /* UINT32_MAX if all slots taken (no recovery for this handle) */
    uint32_t       cached_pid;    /* getpid() cached at last slot claim */
    uint32_t       cached_fork_gen; /* nda_fork_gen value at last slot claim */
} NdaHandle;

/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */

/* Lock tuning constants.
 * NDA_RWLOCK_SPIN_LIMIT: presumably the number of spin iterations a contended
 *   acquire attempts before blocking -- its use is outside this chunk; confirm.
 * NDA_LOCK_TIMEOUT_SEC: whole seconds; supplies tv_sec of nda_lock_timeout. */
#define NDA_RWLOCK_SPIN_LIMIT 32
#define NDA_LOCK_TIMEOUT_SEC  2  /* FUTEX_WAIT timeout for stale lock detection */

/* One polite busy-wait step: emit the CPU's spin hint where one is known
 * (x86 "pause", AArch64 "yield"); on any other target emit only a compiler
 * memory barrier.  Every variant carries the "memory" clobber, so the
 * compiler may not cache shared values across the call. */
static inline void nda_rwlock_spin_pause(void) {
#if defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#elif defined(__i386__) || defined(__x86_64__)
    __asm__ volatile("pause" ::: "memory");
#else
    __asm__ volatile("" ::: "memory");
#endif
}

/* Encoding of a write-locked rwlock word:
 *   NDA_RWLOCK_WRITER_BIT - top bit, set in every value NDA_RWLOCK_WR builds.
 *   NDA_RWLOCK_PID_MASK   - lower 31 bits, which carry the writer's PID
 *                           (mask a lock value with this to extract the PID).
 *   NDA_RWLOCK_WR(pid)    - build the write-locked value for pid; the PID is
 *                           truncated to 31 bits. */
#define NDA_RWLOCK_WRITER_BIT 0x80000000U
#define NDA_RWLOCK_PID_MASK   0x7FFFFFFFU
#define NDA_RWLOCK_WR(pid)    (NDA_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & NDA_RWLOCK_PID_MASK))

/* Probe whether process `pid` still exists, using the null signal
 * (kill(pid, 0)).
 *
 * Returns 0 only when the probe fails with ESRCH, i.e. the process is
 * definitely gone.  Returns 1 in every other case: pid == 0 (no owner
 * recorded), a successful probe, or a failure with any other errno.
 *
 * Limitation: PID reuse is invisible to this check.  If a dead lock-holder's
 * PID has been recycled to an unrelated live process before recovery runs,
 * the answer is "alive" and that slot's orphaned contribution is not
 * reclaimed until the recycled process exits.  Documented under
 * "Crash Safety" in the POD. */
static inline int nda_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;                 /* no owner recorded, assume alive */
    if (kill((pid_t)pid, 0) != -1)
        return 1;                 /* probe succeeded: process exists */
    return errno != ESRCH;        /* only "no such process" counts as dead */
}

/* Break a write lock whose recorded owner has died.
 *
 * observed_rwlock is the lock word the caller saw.  A strong CAS swaps it for
 * this process's own write-locked value; if the word has changed in the
 * meantime the CAS fails and nothing is done.  On success this process
 * briefly owns the write lock, which it immediately releases (this module has
 * no seqlock, so no other shared state needs repair), then wakes every futex
 * waiter if any are registered. */
static inline void nda_recover_stale_lock(NdaHandle *h, uint32_t observed_rwlock) {
    NdaHeader *header = h->hdr;
    const uint32_t self_locked = NDA_RWLOCK_WR((uint32_t)getpid());

    int took_over = __atomic_compare_exchange_n(&header->rwlock, &observed_rwlock,
                                                self_locked, 0,
                                                __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    if (took_over) {
        /* Held as ourselves; drop it straight away. */
        __atomic_store_n(&header->rwlock, 0, __ATOMIC_RELEASE);

        uint32_t waiting = __atomic_load_n(&header->rwlock_waiters, __ATOMIC_RELAXED);
        if (waiting != 0)
            syscall(SYS_futex, &header->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

/* Timeout of NDA_LOCK_TIMEOUT_SEC seconds and 0 nanoseconds.
 * Presumably passed to FUTEX_WAIT by the lock helpers (not visible in this
 * chunk) so blocked waiters wake periodically and can check for a stale
 * lock -- confirm against the callers. */
static const struct timespec nda_lock_timeout = { NDA_LOCK_TIMEOUT_SEC, 0 };

/* Fork detection without a syscall on the hot path.
 *
 * nda_fork_gen is a process-wide generation number starting at 1.  The
 * pthread_atfork child hook bumps it, so a handle that remembers the
 * generation it last saw can tell, on its next lock call, that it is now
 * running in a fork()'d child -- no getpid() needed. */
static uint32_t nda_fork_gen = 1;

/* Once-control used to register the hook a single time per process. */
static pthread_once_t nda_atfork_once = PTHREAD_ONCE_INIT;

/* atfork child hook: advance the generation (relaxed atomic increment). */
static void nda_on_fork_child(void) {
    (void)__atomic_fetch_add(&nda_fork_gen, 1, __ATOMIC_RELAXED);
}

/* pthread_once target: install the child hook; no prepare/parent hooks. */
static void nda_atfork_init(void) {
    pthread_atfork(NULL, NULL, nda_on_fork_child);
}

/* Ensure this process owns a reader slot.  Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's. */
static inline void nda_claim_reader_slot(NdaHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&nda_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path -- register the atfork hook once per process, then claim. */
    pthread_once(&nda_atfork_once, nda_atfork_init);
    cur_gen = __atomic_load_n(&nda_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();
    h->cached_pid = now_pid;
    h->cached_fork_gen = cur_gen;
    h->my_slot_idx = UINT32_MAX;
    uint32_t start = now_pid % NDA_READER_SLOTS;
    for (uint32_t i = 0; i < NDA_READER_SLOTS; i++) {

ndarray.h  view on Meta::CPAN

/* Address of the element storage: the mapping base advanced by the header's
 * data_off bytes. */
static inline char *nda_data(NdaHandle *h) {
    char *mapping = (char *)h->base;
    return mapping + h->hdr->data_off;
}

/* Convert a multi-index into a flat element index by summing
 * idx[d] * strides[d] over the first ndim dimensions.  Strides are in
 * elements.  No bounds checking happens here: the caller has already
 * validated every idx[d] against the shape. */
static inline uint64_t nda_flat_offset(NdaHandle *h, const uint64_t *idx, uint32_t ndim) {
    const uint64_t *stride = h->hdr->strides;
    uint64_t flat = 0;
    uint32_t dim = 0;

    while (dim < ndim) {
        flat += idx[dim] * stride[dim];
        dim++;
    }
    return flat;
}

/* ================================================================
 * Typed element load (callers hold a lock).  Read element e as a double.
 * ================================================================ */

/* Read element e and return it converted to double, dispatching on the
 * header's dtype.  Multi-byte values are copied out with memcpy rather than
 * read through a cast pointer.  An unrecognised dtype yields 0.0.  Caller
 * holds a lock. */
static inline double nda_load_nv(NdaHandle *h, uint64_t e) {
    const char *data = nda_data(h);
    double out = 0.0;

    switch (h->hdr->dtype) {
        case NDA_F64: { double   x; memcpy(&x, data + e*8, 8); out = x;         break; }
        case NDA_F32: { float    x; memcpy(&x, data + e*4, 4); out = (double)x; break; }
        case NDA_I64: { int64_t  x; memcpy(&x, data + e*8, 8); out = (double)x; break; }
        case NDA_I32: { int32_t  x; memcpy(&x, data + e*4, 4); out = (double)x; break; }
        case NDA_I16: { int16_t  x; memcpy(&x, data + e*2, 2); out = (double)x; break; }
        case NDA_I8:  { int8_t   x = (int8_t)data[e];          out = (double)x; break; }
        case NDA_U64: { uint64_t x; memcpy(&x, data + e*8, 8); out = (double)x; break; }
        case NDA_U32: { uint32_t x; memcpy(&x, data + e*4, 4); out = (double)x; break; }
        case NDA_U16: { uint16_t x; memcpy(&x, data + e*2, 2); out = (double)x; break; }
        case NDA_U8:  { uint8_t  x = (uint8_t)data[e];         out = (double)x; break; }
        default: break;
    }
    return out;
}

/* Read element e of a signed-integer array and sign-extend it to int64_t.
 * Handles NDA_I64, NDA_I32, NDA_I16 and NDA_I8; any other dtype returns 0.
 * Caller holds a lock. */
static inline int64_t nda_load_i64(NdaHandle *h, uint64_t e) {
    const char *data = nda_data(h);
    int64_t wide = 0;

    switch (h->hdr->dtype) {
        case NDA_I64: { int64_t x; memcpy(&x, data + e*8, 8); wide = x;          break; }
        case NDA_I32: { int32_t x; memcpy(&x, data + e*4, 4); wide = (int64_t)x; break; }
        case NDA_I16: { int16_t x; memcpy(&x, data + e*2, 2); wide = (int64_t)x; break; }
        case NDA_I8:  { int8_t  x = (int8_t)data[e];          wide = (int64_t)x; break; }
        default: break;
    }
    return wide;
}

/* Read element e of an unsigned-integer array and zero-extend it to
 * uint64_t.  Handles NDA_U64, NDA_U32, NDA_U16 and NDA_U8; any other dtype
 * returns 0.  Caller holds a lock. */
static inline uint64_t nda_load_u64(NdaHandle *h, uint64_t e) {
    const char *data = nda_data(h);
    uint64_t wide = 0;

    switch (h->hdr->dtype) {
        case NDA_U64: { uint64_t x; memcpy(&x, data + e*8, 8); wide = x;           break; }
        case NDA_U32: { uint32_t x; memcpy(&x, data + e*4, 4); wide = (uint64_t)x; break; }
        case NDA_U16: { uint16_t x; memcpy(&x, data + e*2, 2); wide = (uint64_t)x; break; }
        case NDA_U8:  { uint8_t  x = (uint8_t)data[e];         wide = (uint64_t)x; break; }
        default: break;
    }
    return wide;
}

/* Add up all hdr->size elements in flat-index order, each converted to
 * double by nda_load_nv, and return the running total.  Caller holds the
 * read lock. */
static inline double nda_sum_locked(NdaHandle *h) {
    const uint64_t count = h->hdr->size;
    double total = 0.0;
    uint64_t i = 0;

    while (i < count) {
        total += nda_load_nv(h, i);
        i++;
    }
    return total;
}

/* Return the flat index of the smallest (want_max == 0) or largest
 * (want_max != 0) element.
 *
 * The comparison runs in the element's own numeric family -- double for float
 * dtypes, int64_t for signed ints, uint64_t for everything else -- so 64-bit
 * integers beyond 2^53, which would collapse or mis-order as doubles, are
 * ranked exactly.  Comparisons are strict, so among equal extremes the
 * lowest index wins.  Element 0 is read unconditionally: the array must hold
 * at least one element.  Caller holds the read lock. */
static inline uint64_t nda_argextreme_locked(NdaHandle *h, int want_max) {
    const uint64_t count = h->hdr->size;
    const uint32_t dtype = h->hdr->dtype;
    uint64_t winner = 0;
    uint64_t i;

    if (nda_is_float(dtype)) {
        double top = nda_load_nv(h, 0);
        for (i = 1; i < count; i++) {
            double cur = nda_load_nv(h, i);
            int better = want_max ? (cur > top) : (cur < top);
            if (!better) continue;
            top = cur;
            winner = i;
        }
        return winner;
    }

    if (nda_is_signed(dtype)) {
        int64_t top = nda_load_i64(h, 0);
        for (i = 1; i < count; i++) {
            int64_t cur = nda_load_i64(h, i);
            int better = want_max ? (cur > top) : (cur < top);
            if (!better) continue;
            top = cur;
            winner = i;
        }
        return winner;
    }

    /* Remaining dtypes are compared as unsigned 64-bit values. */
    {
        uint64_t top = nda_load_u64(h, 0);
        for (i = 1; i < count; i++) {
            uint64_t cur = nda_load_u64(h, i);
            int better = want_max ? (cur > top) : (cur < top);
            if (!better) continue;
            top = cur;
            winner = i;
        }
    }
    return winner;
}

/* ================================================================
 * Validate create args + header init / setup / open / destroy
 * ================================================================ */

/* Produce a per-array identity that is never 0.  It is used only at create
 * time, to give element-wise set-ops a consistent lock-acquisition order
 * across unrelated processes.
 *
 * Preferred source is getrandom().  If that fails or returns fewer than 8
 * bytes, the id is rebuilt from scratch as a mix of the PID (high 32 bits),
 * a process-local counter scaled by a 64-bit odd constant, and the header
 * address.  A result of 0 from either path is replaced by that same
 * constant. */
static inline uint64_t nda_gen_array_id(const void *hdr_addr) {
    static uint32_t nda_id_counter = 0;
    uint64_t id = 0;

    if (getrandom(&id, sizeof id, 0) != (ssize_t)sizeof id) {
        /* Fallback mix; overwrites whatever a short read left in id. */
        uint32_t seq = __atomic_add_fetch(&nda_id_counter, 1, __ATOMIC_RELAXED);
        uint64_t pid_part  = (uint64_t)(uint32_t)getpid() << 32;
        uint64_t seq_part  = (uint64_t)seq * 0x9E3779B97F4A7C15ull;
        uint64_t addr_part = (uint64_t)(uintptr_t)hdr_addr;
        id = pid_part ^ seq_part ^ addr_part;
    }

    return id != 0 ? id : 0x9E3779B97F4A7C15ull;   /* never 0 */
}

/* Validate create args + compute derived shape/strides/size/itemsize.
 * Single source of truth: the XS layer does NOT duplicate these checks.
 * On success fills *out_* and returns 1; on failure writes errbuf, returns 0. */
static int nda_validate_create_args(int dtype, const uint64_t *shape, uint32_t ndim,
                                    uint64_t *out_size, uint64_t out_strides[NDA_MAX_DIMS],
                                    uint64_t *out_data_bytes, char *errbuf) {
    if (errbuf) errbuf[0] = '\0';
    if (dtype < 0 || dtype >= NDA_NTYPES) { NDA_ERR("unknown dtype"); return 0; }
    if (ndim < 1) { NDA_ERR("ndim must be >= 1"); return 0; }
    if (ndim > NDA_MAX_DIMS) { NDA_ERR("ndim must be <= %d", NDA_MAX_DIMS); return 0; }
    uint64_t size = 1;
    for (uint32_t d = 0; d < ndim; d++) {
        if (shape[d] < 1) { NDA_ERR("shape[%u] must be >= 1", d); return 0; }
        /* size *= shape[d] with overflow guard */



( run in 1.396 second using v1.01-cache-2.11-cpan-40ba7b3775d )