Data-DisjointSet-Shared
view release on metacpan or search on metacpan
uint32_t magic, version; /* 0,4 */
uint32_t _pad0; /* 8 */
uint32_t _pad1; /* 12 */
/* ---- configuration / partition state ---- */
uint32_t n; /* 16 number of elements = capacity */
uint32_t num_sets; /* 20 current count of disjoint sets */
uint32_t _pad2; /* 24 */
uint32_t _pad3; /* 28 */
/* ---- offsets / size ---- */
uint64_t total_size; /* 32 */
uint64_t reader_slots_off; /* 40 */
uint64_t parent_off; /* 48 */
uint64_t size_off; /* 56 */
/* ---- lock + stats ---- */
uint32_t rwlock; /* 64 */
uint32_t rwlock_waiters; /* 68 */
uint32_t rwlock_writers_waiting; /* 72 */
uint32_t _pad4; /* 76 */
uint64_t stat_ops; /* 80 */
uint8_t _pad[168]; /* 88..255 */
};
typedef struct DsuHeader DsuHeader;
_Static_assert(sizeof(DsuHeader) == 256, "DsuHeader must be 256 bytes");
/* ---- Process-local handle ----
 * Heap-allocated (calloc) by dsu_setup(); hdr, base and reader_slots point
 * into the mmap'd region, while this struct itself is private to the process. */
typedef struct DsuHandle {
DsuHeader *hdr; /* header at the start of the mapping (same address as base) */
DsuReaderSlot *reader_slots; /* DSU_READER_SLOTS entries, at base + hdr->reader_slots_off */
void *base; /* mmap base */
size_t mmap_size; /* length of the mapping at base (the size passed to munmap) */
char *path; /* backing file path (strdup'd), or NULL when none was given */
int backing_fd; /* memfd or reopened-fd to close on destroy, -1 for file/anon */
uint32_t my_slot_idx; /* this process's reader slot; UINT32_MAX until one is claimed, or if all slots are taken (no recovery for this handle) */
uint32_t cached_pid; /* getpid() cached at last slot claim; dsu_rwlock_wrlock encodes it into the rwlock word */
uint32_t cached_fork_gen; /* dsu_fork_gen value at last slot claim; a mismatch sends the lock helpers down the slot-claim cold path */
} DsuHandle;
/* ================================================================
 * Futex-based write-preferring read-write lock
 * with reader-slot dead-process recovery
 * ================================================================ */
#define DSU_RWLOCK_SPIN_LIMIT 32
#define DSU_LOCK_TIMEOUT_SEC 2 /* FUTEX_WAIT timeout for stale lock detection */
/* Architecture-specific spin-wait hint: "pause" on x86/x86-64, "yield" on
 * AArch64, and an empty instruction elsewhere. */
#if defined(__x86_64__) || defined(__i386__)
#define DSU_SPIN_HINT_INSN "pause"
#elif defined(__aarch64__)
#define DSU_SPIN_HINT_INSN "yield"
#else
#define DSU_SPIN_HINT_INSN ""
#endif
/* Issue the CPU's spin-wait hint between failed lock attempts. The
 * "memory" clobber also makes every variant a compiler memory barrier. */
static inline void dsu_rwlock_spin_pause(void) {
    __asm__ volatile(DSU_SPIN_HINT_INSN ::: "memory");
}
/* Encoding of the shared rwlock word:
 *   0                                  unlocked
 *   1 .. DSU_RWLOCK_WRITER_BIT - 1     number of active readers
 *   DSU_RWLOCK_WRITER_BIT | pid        write-locked; low 31 bits hold the
 *                                      writing process's PID
 * Storing the PID in the word itself lets acquisition and owner recording
 * happen in a single CAS. */
#define DSU_RWLOCK_WRITER_BIT 0x80000000U
#define DSU_RWLOCK_PID_MASK 0x7FFFFFFFU
/* Build the write-locked word for `pid` (the PID is truncated to 31 bits). */
#define DSU_RWLOCK_WR(pid) (DSU_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & DSU_RWLOCK_PID_MASK))
/* Probe whether `pid` names a live process via kill(pid, 0).
 * Returns 0 only when kill() fails with ESRCH ("no such process"). Every
 * other outcome returns 1, meaning "alive or unknown": success, any other
 * errno such as EPERM, or pid == 0 (no owner recorded).
 *
 * Limitation: PID reuse is invisible to this probe. Suppose a dead
 * lock-holder's PID is recycled to an unrelated live process before
 * recovery runs. The probe then reports "alive", and that slot's orphaned
 * contribution is not reclaimed until the recycled process exits. Robust
 * detection would need a per-slot process-start-time epoch, which is a
 * header-layout/version change. Documented under "Crash Safety" in the POD. */
static inline int dsu_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    if (kill((pid_t)pid, 0) != -1)
        return 1;
    return errno != ESRCH;
}
/* Force-recover a stale write lock left behind by a dead process.
 *
 * `observed_rwlock` is the stale word the caller saw.
 * - It is CAS'd (strong CAS) to a write-locked word carrying OUR pid rather
 *   than a bare WRITER_BIT sentinel. If we crash mid-recovery, a later
 *   recoverer can therefore detect and re-recover.
 * - If the CAS fails, the word has already changed and nothing is done.
 * - On success, the lock is released straight away. No other shared state
 *   needs repair because this module has no seqlock. Futex waiters are woken
 *   if any are registered. */
static inline void dsu_recover_stale_lock(DsuHandle *h, uint32_t observed_rwlock) {
    DsuHeader *hdr = h->hdr;
    uint32_t *word = &hdr->rwlock;
    const uint32_t claim = DSU_RWLOCK_WR((uint32_t)getpid());
    int won = __atomic_compare_exchange_n(word, &observed_rwlock, claim, 0,
                                          __ATOMIC_ACQUIRE, __ATOMIC_RELAXED);
    if (won) {
        __atomic_store_n(word, 0, __ATOMIC_RELEASE);
        if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) > 0)
            syscall(SYS_futex, word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
    }
}
/* Relative FUTEX_WAIT timeout used by the lock loops. When it expires, the
 * loops call dsu_recover_after_timeout() before retrying. */
static const struct timespec dsu_lock_timeout = { .tv_sec = DSU_LOCK_TIMEOUT_SEC, .tv_nsec = 0 };
/* Process-global fork generation. The pthread_atfork child handler bumps
 * it. Each handle compares it with its cached copy on every lock call, so a
 * fork is noticed without a getpid() syscall on the hot path. The initial
 * value of 1 differs from the 0 in a freshly calloc'd handle. */
static uint32_t dsu_fork_gen = 1;
static pthread_once_t dsu_atfork_once = PTHREAD_ONCE_INIT;
/* Child-side atfork handler: runs in the child right after fork(). */
static void dsu_on_fork_child(void) {
    (void)__atomic_fetch_add(&dsu_fork_gen, 1, __ATOMIC_RELAXED);
}
/* One-time hook registration, run through pthread_once(). The result of
 * pthread_atfork() is not checked. */
static void dsu_atfork_init(void) {
    (void)pthread_atfork(NULL, NULL, dsu_on_fork_child);
}
/* Ensure this process owns a reader slot. The lock helpers call this so
* that fork()'d children pick up their own slot lazily instead of sharing
* the parent's. The hot path is one relaxed load plus a compare against the
* handle's cached fork generation. Only on a mismatch, or when no slot is
* held, does it fall through to the cold path (getpid() and, presumably,
* the slot scan -- not visible here).
*
* NOTE(review): as shown, this body looks truncated/spliced:
*  - cur_gen (re-read) and now_pid are computed but never used.
*  - h->cached_fork_gen, h->cached_pid and h->my_slot_idx are never updated.
*  - The two trailing increments (slot waiters_parked, header
*    rwlock_waiters) are exactly the counters dsu_unpark_reader()
*    decrements. They look like the body of dsu_park_reader(), which
*    dsu_rwlock_rdlock calls but which is not defined in this view.
* If this were the real code, every call would take the cold path and leak
* one rwlock_waiters increment. Verify against the upstream source. */
static inline void dsu_claim_reader_slot(DsuHandle *h) {
uint32_t cur_gen = __atomic_load_n(&dsu_fork_gen, __ATOMIC_RELAXED);
if (__builtin_expect(cur_gen == h->cached_fork_gen && h->my_slot_idx != UINT32_MAX, 1))
return;
/* Cold path -- register the atfork hook once per process, then claim. */
pthread_once(&dsu_atfork_once, dsu_atfork_init);
/* Re-read after pthread_once: dsu_on_fork_child may have bumped it. */
cur_gen = __atomic_load_n(&dsu_fork_gen, __ATOMIC_RELAXED);
uint32_t now_pid = (uint32_t)getpid();
/* NOTE(review): from here on the code parks a reader rather than claiming
 * a slot -- see the note above. */
if (h->my_slot_idx != UINT32_MAX)
__atomic_add_fetch(&h->reader_slots[h->my_slot_idx].waiters_parked, 1, __ATOMIC_RELAXED);
__atomic_add_fetch(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
}
/* Leave the parked state a reader enters before futex-waiting. Drops the
 * header-wide rwlock_waiters count first, then, if this handle owns a reader
 * slot, that slot's waiters_parked count. All updates are relaxed. */
static inline void dsu_unpark_reader(DsuHandle *h) {
    (void)__atomic_fetch_sub(&h->hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    uint32_t idx = h->my_slot_idx;
    if (idx == UINT32_MAX)
        return;
    (void)__atomic_fetch_sub(&h->reader_slots[idx].waiters_parked, 1, __ATOMIC_RELAXED);
}
/* Register this handle as a writer about to futex-wait.
 * - If it owns a reader slot, that slot's waiters_parked and writers_parked
 *   counters are bumped first.
 * - Then the header-wide rwlock_waiters and rwlock_writers_waiting counters
 *   are bumped. A non-zero rwlock_writers_waiting makes new readers yield a
 *   free lock.
 * All updates are relaxed. */
static inline void dsu_park_writer(DsuHandle *h) {
    if (h->my_slot_idx != UINT32_MAX) {
        DsuReaderSlot *slot = h->reader_slots + h->my_slot_idx;
        (void)__atomic_fetch_add(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
        (void)__atomic_fetch_add(&slot->writers_parked, 1, __ATOMIC_RELAXED);
    }
    DsuHeader *hdr = h->hdr;
    (void)__atomic_fetch_add(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_add(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
}
/* Reverse dsu_park_writer(). The header-wide counters (rwlock_waiters, then
 * rwlock_writers_waiting) are dropped first, then the owned slot's
 * waiters_parked and writers_parked, if a slot is held. All updates are
 * relaxed. */
static inline void dsu_unpark_writer(DsuHandle *h) {
    DsuHeader *hdr = h->hdr;
    (void)__atomic_fetch_sub(&hdr->rwlock_waiters, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&hdr->rwlock_writers_waiting, 1, __ATOMIC_RELAXED);
    if (h->my_slot_idx == UINT32_MAX)
        return;
    DsuReaderSlot *slot = h->reader_slots + h->my_slot_idx;
    (void)__atomic_fetch_sub(&slot->waiters_parked, 1, __ATOMIC_RELAXED);
    (void)__atomic_fetch_sub(&slot->writers_parked, 1, __ATOMIC_RELAXED);
}
/* Acquire the rwlock for reading; returns once the lock is held.
 *
 * Lock-word states:
 *   0                                  free
 *   1 .. DSU_RWLOCK_WRITER_BIT - 1     number of active readers
 *   >= DSU_RWLOCK_WRITER_BIT           write-locked; writer PID in the low bits
 *
 * A new reader may join a lock that readers already hold. It will not take
 * a FREE lock while a parked writer is waiting (rwlock_writers_waiting != 0).
 * After DSU_RWLOCK_SPIN_LIMIT failed attempts the reader parks and
 * futex-waits with a DSU_LOCK_TIMEOUT_SEC timeout. On timeout it calls
 * dsu_recover_after_timeout() and starts over.
 */
static inline void dsu_rwlock_rdlock(DsuHandle *h) {
    dsu_claim_reader_slot(h);
    DsuHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    uint32_t *writers_waiting = &hdr->rwlock_writers_waiting;
    /* Claim subcount BEFORE bumping the shared rwlock counter.
     * - A concurrent writer-side recovery scan that sees our PID alive with
     *   subcount > 0 will (correctly) defer force-reset, even while we are
     *   still spinning to win the rwlock CAS.
     * - Without this, a reader killed between rwlock CAS-success and
     *   subcount++ would let recovery force-reset rwlock to 0 underneath us.
     *   Our eventual rdunlock dec would then wrap to UINT32_MAX. */
    if (h->my_slot_idx != UINT32_MAX)
        __atomic_add_fetch(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    for (int spin = 0; ; spin++) {
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Write-preferring: when the lock is free (cur == 0) and writers are
         * waiting, yield so the writer can acquire. When readers are
         * already active (cur >= 1), new readers may join freely. */
        if (cur > 0 && cur < DSU_RWLOCK_WRITER_BIT) {
            if (__atomic_compare_exchange_n(lock, &cur, cur + 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        } else if (cur == 0 && !__atomic_load_n(writers_waiting, __ATOMIC_RELAXED)) {
            if (__atomic_compare_exchange_n(lock, &cur, 1,
                                            1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < DSU_RWLOCK_SPIN_LIMIT, 1)) {
            dsu_rwlock_spin_pause();
            continue;
        }
        dsu_park_reader(h);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep only when write-locked, or when the lock is free but we are
         * yielding to a waiting writer (whose wrunlock will wake us).
         * A free lock with NO waiting writer must not be slept on. The
         * 0 -> reader and 0 -> writer transitions never issue FUTEX_WAKE,
         * so such a sleep could last until some later unlock or the full
         * timeout, followed by a needless recovery pass. Retry instead. */
        int write_locked = cur >= DSU_RWLOCK_WRITER_BIT;
        int yielding_to_writer = cur == 0 &&
            __atomic_load_n(writers_waiting, __ATOMIC_RELAXED) != 0;
        if (write_locked || yielding_to_writer) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &dsu_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                dsu_unpark_reader(h);
                dsu_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Woken, value changed (EAGAIN), EINTR, or no sleep needed: retry. */
        dsu_unpark_reader(h);
        spin = 0;
    }
}
/* Release one read hold.
 *
 * Ordering: the shared counter is decremented BEFORE this slot's subcount.
 * That keeps "any live PID with subcount > 0" a reliable in-flight
 * indicator for the writer-side recovery scan. With the opposite order there
 * would be a window in which we still own a unit of rwlock while our slot
 * subcount is already 0, letting recovery force-reset rwlock underneath us.
 *
 * The last reader out (counter reaches 0) wakes every futex waiter if any
 * are registered. */
static inline void dsu_rwlock_rdunlock(DsuHandle *h) {
    uint32_t *word = &h->hdr->rwlock;
    uint32_t remaining = __atomic_sub_fetch(word, 1, __ATOMIC_RELEASE);
    if (h->my_slot_idx != UINT32_MAX)
        (void)__atomic_fetch_sub(&h->reader_slots[h->my_slot_idx].subcount, 1, __ATOMIC_RELAXED);
    if (remaining != 0)
        return;
    if (__atomic_load_n(&h->hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;
    syscall(SYS_futex, word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* Acquire the rwlock for writing; returns once the lock is held.
 *
 * The lock word is CAS'd from 0 directly to DSU_RWLOCK_WR(cached_pid) (PID
 * encoded in the word), so acquisition and owner recording happen in one
 * step. There is no crash window in which the lock is held without an owner.
 * After DSU_RWLOCK_SPIN_LIMIT failed attempts the writer parks, which bumps
 * rwlock_writers_waiting so new readers stop taking a free lock. It then
 * futex-waits with a DSU_LOCK_TIMEOUT_SEC timeout. On timeout it calls
 * dsu_recover_after_timeout() and starts over.
 */
static inline void dsu_rwlock_wrlock(DsuHandle *h) {
    dsu_claim_reader_slot(h); /* refresh cached_pid across fork */
    DsuHeader *hdr = h->hdr;
    uint32_t *lock = &hdr->rwlock;
    /* Encode PID in the rwlock word itself (0x80000000 | pid) to eliminate
     * any crash window between acquiring the lock and storing the owner. */
    uint32_t mypid = DSU_RWLOCK_WR(h->cached_pid);
    for (int spin = 0; ; spin++) {
        uint32_t expected = 0;
        if (__atomic_compare_exchange_n(lock, &expected, mypid,
                                        1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
            return;
        if (__builtin_expect(spin < DSU_RWLOCK_SPIN_LIMIT, 1)) {
            dsu_rwlock_spin_pause();
            continue;
        }
        dsu_park_writer(h);
        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &dsu_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                dsu_unpark_writer(h);
                dsu_recover_after_timeout(h);
                spin = 0;
                continue;
            }
        }
        /* Woken, value changed (EAGAIN), EINTR, or lock already free: retry. */
        dsu_unpark_writer(h);
        spin = 0;
    }
}
/* Release the write lock: publish the free state (0) with release ordering,
 * then wake every futex waiter if any are registered. */
static inline void dsu_rwlock_wrunlock(DsuHandle *h) {
    DsuHeader *hdr = h->hdr;
    uint32_t *word = &hdr->rwlock;
    __atomic_store_n(word, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->rwlock_waiters, __ATOMIC_RELAXED) == 0)
        return;
    syscall(SYS_futex, word, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
}
/* ================================================================
 * Layout math + create / open / destroy
 *
 * Mapping layout, in order:
 *   DsuHeader | reader_slots[DSU_READER_SLOTS] | parent[n] (uint32) | size[n] (uint32)
 * Both arrays hold 4-byte words. parent_off and size_off are 4-byte aligned
 * as long as the header (asserted to be 256 bytes) and the reader-slot
 * region are multiples of 4 bytes.
 * ================================================================ */
/* Byte offsets of each region from the mapping base; computed only here. */
typedef struct { uint64_t reader_slots, parent, size; } DsuLayout;
static inline DsuLayout dsu_layout(uint32_t n) {
    const uint64_t header_bytes = sizeof(DsuHeader);
    const uint64_t slot_bytes = (uint64_t)DSU_READER_SLOTS * sizeof(DsuReaderSlot);
    const uint64_t parent_bytes = (uint64_t)n * sizeof(uint32_t);
    DsuLayout out;
    out.reader_slots = header_bytes;
    out.parent = header_bytes + slot_bytes;
    out.size = out.parent + parent_bytes;
    return out;
}
/* Total mapping size for n elements: the offset of size[] plus the n
 * 4-byte words of size[] itself, which is the last region. */
static inline uint64_t dsu_total_size(uint32_t n) {
    const uint64_t size_array_bytes = (uint64_t)n * sizeof(uint32_t);
    return dsu_layout(n).size + size_array_bytes;
}
/* Start of the shared parent[] array: hdr->parent_off bytes past the mapping base. */
static inline uint32_t *dsu_parent(DsuHandle *h) {
    char *bytes = (char *)h->base;
    return (uint32_t *)(bytes + h->hdr->parent_off);
}
/* Start of the shared size[] array: hdr->size_off bytes past the mapping base. */
static inline uint32_t *dsu_size(DsuHandle *h) {
    char *bytes = (char *)h->base;
    return (uint32_t *)(bytes + h->hdr->size_off);
}
/* ================================================================
 * Union-find core (callers hold the WRITE lock -- find compresses)
 * ================================================================ */
/* Return the root of x's tree, applying path halving on the way up. Each
 * visited node is re-pointed at its grandparent, and the walk then jumps to
 * that grandparent, so every other node on the path is relinked.
 * MUTATING -- the caller must hold the write lock. x must already be
 * range-checked (< n) by the XS layer. */
static inline uint32_t dsu_find(DsuHandle *h, uint32_t x) {
    uint32_t *parent = dsu_parent(h);
    uint32_t up;
    while ((up = parent[x]) != x) {
        parent[x] = parent[up]; /* link to grandparent */
        x = parent[x];          /* ...and continue from there */
    }
    return x;
}
/* Merge the sets containing a and b using union by size: the root of the
 * larger set survives, which keeps trees shallow. On a size tie, a's root
 * survives. Returns 1 if two distinct sets were merged (num_sets drops by
 * one), or 0 if a and b were already in the same set. The caller holds the
 * write lock, and a and b are range-checked. */
static inline int dsu_union_locked(DsuHandle *h, uint32_t a, uint32_t b) {
    uint32_t root_a = dsu_find(h, a);
    uint32_t root_b = dsu_find(h, b);
    if (root_a == root_b)
        return 0;
    uint32_t *sz = dsu_size(h);
    uint32_t keep = root_a, absorb = root_b;
    if (sz[root_a] < sz[root_b]) {
        keep = root_b;
        absorb = root_a;
    }
    dsu_parent(h)[absorb] = keep;
    sz[keep] += sz[absorb];
    h->hdr->num_sets--;
    return 1;
}
/* 1 if a and b share a root, 0 otherwise. Both finds compress paths, so the
 * write lock is required. The two lookups are explicitly sequenced (a, then
 * b); the answer does not depend on the order. */
static inline int dsu_connected_locked(DsuHandle *h, uint32_t a, uint32_t b) {
    const uint32_t root_a = dsu_find(h, a);
    const uint32_t root_b = dsu_find(h, b);
    return root_a == root_b;
}
/* Number of elements in x's set: the size[] entry of x's root. The find
 * compresses paths, so the write lock is required. */
static inline uint32_t dsu_set_size_locked(DsuHandle *h, uint32_t x) {
    const uint32_t root = dsu_find(h, x);
    return dsu_size(h)[root];
}
/* Return every element to its own singleton set: parent[i] = i,
 * size[i] = 1, and num_sets = n. The caller holds the write lock. */
static inline void dsu_reset_locked(DsuHandle *h) {
    const uint32_t n = h->hdr->n;
    uint32_t *parent = dsu_parent(h);
    uint32_t *sizes = dsu_size(h);
    for (uint32_t i = 0; i != n; ++i) {
        parent[i] = i;
        sizes[i] = 1;
    }
    h->hdr->num_sets = n;
}
/* ================================================================
 * Validate args + header init / setup / open / destroy
 * ================================================================ */
/* Check the requested element count: 1 <= n <= DSU_MAX_N. Clears errbuf
 * (when non-NULL) first, and reports a failure through DSU_ERR. Returns 1
 * if valid, 0 otherwise. This is the single source of truth: the XS layer
 * does NOT duplicate this range check. */
static int dsu_validate_create_args(uint64_t n, char *errbuf) {
    if (errbuf != NULL)
        *errbuf = '\0';
    if (n == 0) {
        DSU_ERR("n must be >= 1");
        return 0;
    }
    if (n <= DSU_MAX_N)
        return 1;
    DSU_ERR("n must be <= %u", (unsigned)DSU_MAX_N);
    return 0;
}
/* Initialize a fresh mapping at `base` for n elements:
 * - zero the header and the reader-slot region (the lock-recovery state);
 * - fill in magic/version, counts, total size and region offsets;
 * - make every element a singleton (parent[i] = i, size[i] = 1).
 * It finishes with a full fence, presumably so these plain stores are
 * ordered before whatever the caller does next with the mapping. */
static inline void dsu_init_header(void *base, uint32_t n, uint64_t total_size) {
    const DsuLayout lay = dsu_layout(n);
    char *bytes = (char *)base;
    DsuHeader *hdr = (DsuHeader *)base;
    memset(bytes, 0, (size_t)lay.parent);
    hdr->magic = DSU_MAGIC;
    hdr->version = DSU_VERSION;
    hdr->n = n;
    hdr->num_sets = n;
    hdr->total_size = total_size;
    hdr->reader_slots_off = lay.reader_slots;
    hdr->parent_off = lay.parent;
    hdr->size_off = lay.size;
    uint32_t *parent = (uint32_t *)(bytes + lay.parent);
    uint32_t *sizes = (uint32_t *)(bytes + lay.size);
    for (uint32_t i = 0; i < n; ++i) {
        parent[i] = i;
        sizes[i] = 1;
    }
    __atomic_thread_fence(__ATOMIC_SEQ_CST);
}
/* Wrap an already-mapped region in a new process-local handle.
 *
 * Ownership: this function takes the mapping (base, map_size) and
 * backing_fd (when >= 0).
 * - On failure, both are released here (munmap / close) and NULL is
 *   returned with errno preserved from the failed allocation. The caller
 *   must not release them again.
 * - On success, they belong to the returned handle.
 * `path` may be NULL; otherwise it is copied with strdup().
 *
 * The handle starts with no reader slot (my_slot_idx = UINT32_MAX) and with
 * zero cached_pid / cached_fork_gen, so the first lock call takes the cold
 * path of dsu_claim_reader_slot(). */
static inline DsuHandle *dsu_setup(void *base, size_t map_size,
                                   const char *path, int backing_fd) {
    DsuHeader *hdr = (DsuHeader *)base;
    char *path_copy = NULL;
    DsuHandle *h = calloc(1, sizeof *h);
    if (!h)
        goto fail;
    if (path) {
        /* Previously a failed copy was ignored, leaving a path-backed handle
         * with path == NULL. Treat it as an allocation failure instead. */
        path_copy = strdup(path);
        if (!path_copy)
            goto fail;
    }
    h->hdr = hdr;
    h->base = base;
    h->reader_slots = (DsuReaderSlot *)((uint8_t *)base + hdr->reader_slots_off);
    h->mmap_size = map_size;
    h->path = path_copy;
    h->backing_fd = backing_fd;
    h->my_slot_idx = UINT32_MAX;
    return h;
fail: {
        int saved_errno = errno; /* munmap/close may overwrite ENOMEM */
        free(h);
        munmap(base, map_size);
        if (backing_fd >= 0)
            close(backing_fd);
        errno = saved_errno;
        return NULL;
    }
}
( run in 0.462 second using v1.01-cache-2.11-cpan-bbe5e583499 )