Data-DisjointSet-Shared

 view release on metacpan or  search on metacpan

dsu.h  view on Meta::CPAN

    uint32_t magic, version;          /* 0,4 */
    uint32_t _pad0;                   /* 8 */
    uint32_t _pad1;                   /* 12 */

    /* ---- configuration / partition state ---- */
    uint32_t n;                       /* 16  number of elements = capacity     */
    uint32_t num_sets;                /* 20  current count of disjoint sets     */
    uint32_t _pad2;                   /* 24 */
    uint32_t _pad3;                   /* 28 */

    /* ---- offsets / size ---- */
    uint64_t total_size;              /* 32 */
    uint64_t reader_slots_off;        /* 40 */
    uint64_t parent_off;              /* 48 */
    uint64_t size_off;                /* 56 */

    /* ---- lock + stats ---- */
    uint32_t rwlock;                  /* 64 */
    uint32_t rwlock_waiters;          /* 68 */
    uint32_t rwlock_writers_waiting;  /* 72 */
    uint32_t _pad4;                   /* 76 */
    uint64_t stat_ops;                /* 80 */
    uint8_t  _pad[168];               /* 88..255 */
};
typedef struct DsuHeader DsuHeader;

/* The shared header occupies a fixed 256-byte block at offset 0 of the
 * mapping.  dsu_layout() places the reader-slot array right after
 * sizeof(DsuHeader), so changing the header size shifts every stored offset;
 * this assertion catches accidental layout changes at compile time. */
_Static_assert(sizeof(DsuHeader) == 256, "DsuHeader must be 256 bytes");

/* ---- Process-local handle ---- */

/* Per-process handle onto one shared DSU mapping.  The handle itself is
 * ordinary heap memory (calloc'd in dsu_setup); hdr, reader_slots and base
 * point into the shared mmap'd region, while path is a private strdup'd copy.
 * Because calloc zeroes cached_fork_gen and dsu_setup sets my_slot_idx to
 * UINT32_MAX, the first lock call on a fresh handle claims a reader slot. */
typedef struct DsuHandle {
    DsuHeader     *hdr;           /* shared header at offset 0 (same address as base) */
    DsuReaderSlot *reader_slots;  /* DSU_READER_SLOTS entries, at base + hdr->reader_slots_off */
    void          *base;          /* mmap base */
    size_t         mmap_size;     /* length passed to munmap() */
    char          *path;          /* backing file path (strdup'd) */
    int            backing_fd;    /* memfd or reopened-fd to close on destroy, -1 for file/anon */
    uint32_t       my_slot_idx;   /* UINT32_MAX if all slots taken (no recovery for this handle) */
    uint32_t       cached_pid;    /* getpid() cached at last slot claim; encoded into the write-lock word */
    uint32_t       cached_fork_gen; /* dsu_fork_gen value at last slot claim */
} DsuHandle;

/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */

#define DSU_RWLOCK_SPIN_LIMIT 32
#define DSU_LOCK_TIMEOUT_SEC  2  /* FUTEX_WAIT timeout for stale lock detection */

/* CPU relax hint used between busy-wait attempts: "pause" on x86/x86-64,
 * "yield" on AArch64, and an empty instruction elsewhere.  Every variant
 * carries a "memory" clobber, so it also acts as a compiler memory barrier. */
static inline void dsu_rwlock_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
#  define DSU_SPIN_PAUSE_INSN "pause"
#elif defined(__aarch64__)
#  define DSU_SPIN_PAUSE_INSN "yield"
#else
#  define DSU_SPIN_PAUSE_INSN ""
#endif
    __asm__ volatile(DSU_SPIN_PAUSE_INSN ::: "memory");
#undef DSU_SPIN_PAUSE_INSN
}

/* Write-lock encoding of the rwlock word.  Bit 31 (DSU_RWLOCK_WRITER_BIT)
 * marks "write-locked" and the low 31 bits carry the writer's PID, so
 * DSU_RWLOCK_WR(pid) builds the value a writer stores and
 * (value & DSU_RWLOCK_PID_MASK) extracts the PID again.  Values in
 * 1..DSU_RWLOCK_WRITER_BIT-1 are a plain count of active readers; 0 is free. */
#define DSU_RWLOCK_WRITER_BIT 0x80000000U
#define DSU_RWLOCK_PID_MASK   0x7FFFFFFFU
#define DSU_RWLOCK_WR(pid)    (DSU_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & DSU_RWLOCK_PID_MASK))

/* Liveness probe for a recorded lock owner, via kill(pid, 0).
 *
 * Returns 0 only when the process is definitely gone (kill fails with
 * ESRCH).  Returns 1 in every other case: the process exists, we merely
 * lack permission to signal it, or no owner was recorded (pid == 0).
 *
 * NOTE: cannot detect PID reuse -- if a dead lock-holder's PID is recycled
 * to an unrelated live process before recovery runs, this reports "alive"
 * and that slot's orphaned contribution is not reclaimed until the recycled
 * process exits.  Robust detection would require a per-slot process-start-
 * time epoch (a header-layout/version change).  Documented under
 * "Crash Safety" in the POD. */
static inline int dsu_pid_alive(uint32_t pid) {
    if (pid != 0 && kill((pid_t)pid, 0) == -1 && errno == ESRCH)
        return 0;
    return 1;
}

/* Force-recover a stale write lock left by a dead process.
 *
 * `observed_rwlock` is the lock word value the caller saw.  We try to CAS it
 * to OUR OWN write-lock encoding (not a bare WRITER_BIT sentinel).  If we
 * crash mid-recovery, a later process then sees a dead owner PID and can
 * recover again.  If the CAS fails, the word changed under us and we leave
 * it alone.
 *
 * This module keeps no seqlock or other lock-protected state that needs
 * repair, so once the lock is ours we simply release it and wake any parked
 * waiters. */
static inline void dsu_recover_stale_lock(DsuHandle *h, uint32_t observed_rwlock) {
    uint32_t *word = &h->hdr->rwlock;
    const uint32_t self_wr = DSU_RWLOCK_WR((uint32_t)getpid());
    uint32_t expected = observed_rwlock;

    if (__atomic_compare_exchange_n(word, &expected, self_wr, 0,
                                    __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
        __atomic_store_n(word, 0, __ATOMIC_RELEASE);
        if (__atomic_load_n(&h->hdr->rwlock_waiters, __ATOMIC_RELAXED) != 0)
            syscall(SYS_futex, word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}

/* Timeout passed to every FUTEX_WAIT in the rdlock/wrlock slow paths.  The
 * waits are bounded so that a waiter periodically wakes up (ETIMEDOUT) and
 * runs dead-owner recovery instead of blocking forever behind a crashed
 * lock holder. */
static const struct timespec dsu_lock_timeout = { DSU_LOCK_TIMEOUT_SEC, 0 };

/* Process-global fork-generation counter.  The pthread_atfork child hook
 * bumps it, so every open handle notices the fork on its next lock call
 * (cached_fork_gen mismatch) without a getpid() syscall on the hot path.
 * It starts at 1 so that a freshly calloc'd handle (cached_fork_gen == 0)
 * always takes the slot-claiming cold path on its first lock. */
static uint32_t dsu_fork_gen = 1;
static pthread_once_t dsu_atfork_once = PTHREAD_ONCE_INIT;

/* Child-side atfork hook: runs in the child right after fork(). */
static void dsu_on_fork_child(void) {
    (void)__atomic_fetch_add(&dsu_fork_gen, 1u, __ATOMIC_RELAXED);
}

/* One-time hook registration, driven by pthread_once(&dsu_atfork_once, ...).
 * The pthread_atfork() result is not checked. */
static void dsu_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, dsu_on_fork_child);
}

/* Ensure this process owns a reader slot.  Called from the lock helpers so
 * that fork()'d children pick up their own slot lazily instead of sharing
 * the parent's.  Hot-path is a single relaxed load + compare; only on a
 * fork-generation mismatch do we touch getpid() and scan slots. */
static inline void dsu_claim_reader_slot(DsuHandle *h) {
    uint32_t cur_gen = __atomic_load_n(&dsu_fork_gen, __ATOMIC_RELAXED);
    if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
        return;
    /* Cold path -- register the atfork hook once per process, then claim. */
    pthread_once(&dsu_atfork_once, dsu_atfork_init);
    /* Re-read after pthread_once: dsu_on_fork_child may have bumped it. */
    cur_gen = __atomic_load_n(&dsu_fork_gen, __ATOMIC_RELAXED);
    uint32_t now_pid = (uint32_t)getpid();

dsu.h  view on Meta::CPAN

    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Undo dsu_park_reader(): drop the shared waiter count first, then this
 * handle's per-slot parked-waiter count (only if it owns a slot). */
static inline void dsu_unpark_reader(DsuHandle *h) {
    const uint32_t idx = h->my_slot_idx;
    __atomic_sub_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    if (idx == UINT32_MAX)
        return;
    __atomic_sub_fetch(&h->reader_slots[idx].waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Announce this handle as a parked writer.  Its own slot's waiter/writer
 * counters are bumped first (when it owns a slot), then the shared header's
 * rwlock_waiters and rwlock_writers_waiting.  Readers check
 * rwlock_writers_waiting to yield a free lock to writers. */
static inline void dsu_park_writer(DsuHandle *h) {
    DsuHeader *hdr = h->hdr;
    if (h->my_slot_idx != UINT32_MAX) {
        DsuReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
        __atomic_add_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
        __atomic_add_fetch(&slot->writers_parked, 1, __ATOMIC_RELAXED);
    }
    __atomic_add_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_add_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Undo dsu_park_writer() in mirror order: shared header counters first,
 * then this handle's own slot counters (when it owns a slot). */
static inline void dsu_unpark_writer(DsuHandle *h) {
    DsuHeader *hdr = h->hdr;
    __atomic_sub_fetch(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return;
    DsuReaderSlot *slot = &h->reader_slots[h->my_slot_idx];
    __atomic_sub_fetch(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
    __atomic_sub_fetch(&slot->writers_parked, 1, __ATOMIC_RELAXED);
}

/* Acquire the shared (read) side of the rwlock.
 *
 * Write-preferring: a reader joins freely while other readers hold the lock
 * (1 <= rwlock < DSU_RWLOCK_WRITER_BIT).  When the lock is free but a writer
 * is parked, the reader does not take it, so the writer goes first.  After
 * DSU_RWLOCK_SPIN_LIMIT failed attempts the reader parks on the futex with a
 * DSU_LOCK_TIMEOUT_SEC timeout.  On timeout it runs dead-owner recovery and
 * starts over. */
static inline void dsu_rwlock_rdlock(DsuHandle *h) {
    dsu_claim_reader_slot(h);
    DsuHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter.  This way
     * a concurrent writer-side recovery scan that sees our PID alive with
     * subcount > 0 will (correctly) defer force-reset, even while we are
     * still spinning trying to win the rwlock CAS.  Without this, a reader
     * killed between rwlock CAS-success and subcount++ would let recovery
     * force-reset rwlock to 0 underneath us, causing a UINT32_MAX wrap on
     * our eventual rdunlock dec. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when lock is free (cur==0) and writers are
         * waiting, yield to let the writer acquire. When readers are
         * already active (cur>=1), new readers may join freely. */
        if (cur > 0 && cur < DSU_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < DSU_RWLOCK_SPIN_LIMIT, 1)) {
            dsu_rwlock_spin_pause();
            continue;
        }
        dsu_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep only when write-locked, or when the lock is free but we are
         * yielding to a parked writer (whose unlock will wake us).  A free
         * lock with NO waiting writer must not be slept on.  Nobody is
         * obliged to FUTEX_WAKE a free lock (the last unlocker may have
         * checked rwlock_waiters before we parked), so the reader would stall
         * for the whole timeout on an uncontended lock.  In that case, fall
         * through and retry immediately. */
        if (cur >= DSU_RWLOCK_WRITER_BIT ||
            (cur == 0 && __atomic_load_n(writers_waiting, __ATOMIC_RELAXED))) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &dsu_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                dsu_unpark_reader(h);
                dsu_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        dsu_unpark_reader(h);
        spin = 0;
    }
}

/* Release the shared (read) side of the rwlock.
 *
 * Decrements the shared reader count, then this handle's slot subcount (if
 * it owns a slot).  The reader that brings the count to 0 wakes all futex
 * waiters, provided any are parked. */
static inline void dsu_rwlock_rdunlock(DsuHandle *h) {
    DsuHeader *hdr = h->hdr;
    /* Release the shared counter BEFORE dropping our subcount so that
     * "any live PID with subcount > 0" is a reliable in-flight indicator
     * for the writer-side recovery scan.  Inverting these would create a
     * window where we still own a unit of rwlock but our slot subcount is
     * 0, letting recovery force-reset rwlock underneath us. */
    uint32_t after = __atomic_sub_fetch(&hdr->rwlock, 1, __ATOMIC_RELEASE);
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_sub_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    /* Only the last reader out wakes anyone: while readers remain, a writer
     * still cannot acquire.  INT_MAX wakes every waiter on the word so
     * parked writers and parked readers all re-check the now-free lock. */
    if (after == 0 && __atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->rwlock, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

static inline void dsu_rwlock_wrlock(DsuHandle *h) {
    dsu_claim_reader_slot(h);  /* refresh cached_pid across fork */
    DsuHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = DSU_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < DSU_RWLOCK_SPIN_LIMIT, 1)) {
            dsu_rwlock_spin_pause();
            continue;
        }
        dsu_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &dsu_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                dsu_unpark_writer(h);
                dsu_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        dsu_unpark_writer(h);
        spin = 0;
    }

dsu.h  view on Meta::CPAN


/* Release the exclusive (write) lock.  Publishes rwlock = 0 with release
 * ordering, then wakes every futex waiter if any are parked. */
static inline void dsu_rwlock_wrunlock(DsuHandle *h) {
    uint32_t *word = &h->hdr->rwlock;
    __atomic_store_n(word, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&h->hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;
    syscall(SYS_futex, word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}

/* ================================================================
 * Layout math + create / open / destroy
 *
 * Layout: Header -> reader_slots[1024] -> parent[n] (uint32) -> size[n] (uint32)
 * Both arrays are 4-byte words; the reader-slot region is a multiple of 4
 * bytes, so parent_off and size_off are naturally 4-byte aligned.
 * ================================================================ */

/* Single source of truth for the mmap region layout offsets.
 * Every field is a byte offset from the start of the mapping:
 *   reader_slots -- start of the DsuReaderSlot array (right after the header)
 *   parent       -- start of the uint32 parent[n] array
 *   size         -- start of the uint32 size[n] array */
typedef struct { uint64_t reader_slots, parent, size; } DsuLayout;

static inline DsuLayout dsu_layout(uint32_t n) {
    DsuLayout L;
    L.reader_slots = sizeof(DsuHeader);
    L.parent       = L.reader_slots + (uint64_t)DSU_READER_SLOTS * sizeof(DsuReaderSlot);
    L.size         = L.parent + (uint64_t)n * sizeof(uint32_t);
    return L;
}

/* Total bytes the mapping needs for n elements: the offset where size[]
 * begins, plus the n uint32 entries of size[] itself. */
static inline uint64_t dsu_total_size(uint32_t n) {
    const uint64_t size_array_bytes = (uint64_t)n * sizeof(uint32_t);
    return dsu_layout(n).size + size_array_bytes;
}

/* Shared parent[] array, located at base + hdr->parent_off.
 * parent[i] == i marks i as the root of its set. */
static inline uint32_t *dsu_parent(DsuHandle *h) {
    char *region = (char *)h->base;
    return (uint32_t *)(region + h->hdr->parent_off);
}

/* Shared size[] array, located at base + hdr->size_off.  Only entries for
 * current roots are maintained by union; non-root entries go stale. */
static inline uint32_t *dsu_size(DsuHandle *h) {
    char *region = (char *)h->base;
    return (uint32_t *)(region + h->hdr->size_off);
}

/* ================================================================
 * Union-find core (callers hold the WRITE lock -- find compresses)
 * ================================================================ */

/* Return the root of x's set using path halving.  Each visited node is
 * re-pointed at its grandparent, and the walk continues from that
 * grandparent.  MUTATING -- the caller must hold the write lock.  x must
 * already be range-checked (< n) by the XS layer. */
static inline uint32_t dsu_find(DsuHandle *h, uint32_t x) {
    uint32_t *parent = dsu_parent(h);
    uint32_t node = x;
    while (parent[node] != node) {
        const uint32_t grand = parent[parent[node]];
        parent[node] = grand;
        node = grand;
    }
    return node;
}

/* Merge the sets containing a and b, union by size: the root of the larger
 * set stays a root, and on a tie a's root wins.  Returns 1 if a merge
 * happened (num_sets drops by one), 0 if a and b were already together.
 * Caller holds the write lock; a and b are range-checked. */
static inline int dsu_union_locked(DsuHandle *h, uint32_t a, uint32_t b) {
    const uint32_t root_a = dsu_find(h, a);
    const uint32_t root_b = dsu_find(h, b);
    if (root_a == root_b)
        return 0;

    uint32_t *parent = dsu_parent(h);
    uint32_t *sizes  = dsu_size(h);
    const int b_is_bigger = sizes[root_a] < sizes[root_b];
    const uint32_t keep  = b_is_bigger ? root_b : root_a;
    const uint32_t child = b_is_bigger ? root_a : root_b;

    parent[child] = keep;
    sizes[keep] += sizes[child];
    h->hdr->num_sets -= 1;
    return 1;
}

/* 1 if a and b currently share a root, else 0.  Both finds compress paths,
 * so this mutates shared state -- the caller holds the write lock. */
static inline int dsu_connected_locked(DsuHandle *h, uint32_t a, uint32_t b) {
    const uint32_t root_a = dsu_find(h, a);
    const uint32_t root_b = dsu_find(h, b);
    return root_a == root_b;
}

/* Number of elements in x's set, read from size[] at x's root.  The find
 * compresses paths, so the caller must hold the write lock. */
static inline uint32_t dsu_set_size_locked(DsuHandle *h, uint32_t x) {
    const uint32_t root = dsu_find(h, x);
    return dsu_size(h)[root];
}

/* Return every element to its own singleton set: parent[i] = i,
 * size[i] = 1, and num_sets = n.  Caller holds the write lock. */
static inline void dsu_reset_locked(DsuHandle *h) {
    DsuHeader *hdr = h->hdr;
    uint32_t *parent = dsu_parent(h);
    uint32_t *sizes  = dsu_size(h);
    const uint32_t count = hdr->n;
    uint32_t i = 0;
    while (i < count) {
        parent[i] = i;
        sizes[i]  = 1;
        ++i;
    }
    hdr->num_sets = count;
}

/* ================================================================
 * Validate args + header init / setup / open / destroy
 * ================================================================ */

/* Validate create args: n must lie in [1, DSU_MAX_N].  Returns 1 when
 * valid.  Otherwise it reports the problem through DSU_ERR and returns 0.
 * errbuf (may be NULL) is cleared up front.  Single source of truth: the
 * XS layer does NOT duplicate this range check. */
static int dsu_validate_create_args(uint64_t n, char *errbuf) {
    if (errbuf != NULL)
        *errbuf = '\0';
    if (n == 0) {
        DSU_ERR("n must be >= 1");
        return 0;
    }
    if (n > DSU_MAX_N) {
        DSU_ERR("n must be <= %u", (unsigned)DSU_MAX_N);
        return 0;
    }
    return 1;
}

/* Initialize a freshly mapped region for n elements.
 *
 * Steps:
 *  1. Zero the header plus the reader-slot region (lock/recovery state).
 *  2. Fill in the header's magic/version, sizing and offset fields.
 *  3. Make every element its own singleton (parent[i] = i, size[i] = 1).
 *  4. Issue a full sequentially-consistent fence after all these stores. */
static inline void dsu_init_header(void *base, uint32_t n, uint64_t total_size) {
    const DsuLayout L = dsu_layout(n);
    char *bytes = (char *)base;
    DsuHeader *hdr = (DsuHeader *)base;

    /* Everything before parent[] starts zeroed; parent[]/size[] are written
     * element by element below, so they are not memset. */
    memset(bytes, 0, (size_t)L.parent);

    hdr->magic            = DSU_MAGIC;
    hdr->version          = DSU_VERSION;
    hdr->n                = n;
    hdr->num_sets         = n;
    hdr->total_size       = total_size;
    hdr->reader_slots_off = L.reader_slots;
    hdr->parent_off       = L.parent;
    hdr->size_off         = L.size;

    uint32_t *parent = (uint32_t *)(bytes + L.parent);
    uint32_t *sizes  = (uint32_t *)(bytes + L.size);
    for (uint32_t i = 0; i < n; ++i) {
        parent[i] = i;
        sizes[i]  = 1;
    }

    __atomic_thread_fence(__ATOMIC_SEQ_CST);
}

/* Wrap an already-mapped, already-initialized region in a process-local
 * handle.
 *
 * On success the handle owns the mapping (base/map_size) and backing_fd
 * (when >= 0), and keeps its own strdup'd copy of path (path may be NULL).
 * No reader slot is claimed yet (my_slot_idx == UINT32_MAX, and calloc
 * leaves cached_fork_gen at 0), so the first lock call claims one.
 *
 * On allocation failure -- of the handle itself or of the path copy -- the
 * mapping is unmapped, backing_fd is closed, and NULL is returned.  The
 * caller must not touch base or backing_fd afterwards. */
static inline DsuHandle *dsu_setup(void *base, size_t map_size,
                                   const char *path, int backing_fd) {
    DsuHeader *hdr = (DsuHeader *)base;
    DsuHandle *h = (DsuHandle *)calloc(1, sizeof(DsuHandle));
    char *path_copy = NULL;

    /* A failed path copy is treated exactly like a failed calloc.  Otherwise
     * a file-backed handle would silently end up with path == NULL. */
    if (h && path) {
        path_copy = strdup(path);
        if (!path_copy) {
            free(h);
            h = NULL;
        }
    }
    if (!h) {
        munmap(base, map_size);
        if (backing_fd >= 0) close(backing_fd);
        return NULL;
    }
    h->hdr          = hdr;
    h->base         = base;
    h->reader_slots = (DsuReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
    h->mmap_size    = map_size;
    h->path         = path_copy;
    h->backing_fd   = backing_fd;
    h->my_slot_idx  = UINT32_MAX;
    return h;
}



( run in 0.462 second using v1.01-cache-2.11-cpan-bbe5e583499 )