Data-Sync-Shared

 view release on MetaCPAN or search on MetaCPAN

sync.h  view on Meta::CPAN

/* Hint to the CPU that we are in a spin-wait loop.
 *
 * On x86 this issues PAUSE, and on AArch64 YIELD, to reduce power use and
 * pipeline contention while busy-waiting.  On other architectures it falls
 * back to an empty asm statement.  In every case the "memory" clobber acts
 * as a compiler barrier, forcing the spinning load to be re-read from
 * memory on each iteration instead of being hoisted out of the loop. */
static inline void sync_spin_pause(void) {
#if defined(__x86_64__) || defined(__i386__)
    __asm__ volatile("pause" ::: "memory");
#elif defined(__aarch64__)
    __asm__ volatile("yield" ::: "memory");
#else
    /* No spin hint available; keep only the compiler barrier. */
    __asm__ volatile("" ::: "memory");
#endif
}

/* Return non-zero when the process recorded in `pid` should be treated
 * as alive.  A pid of 0 means "no owner recorded" and always counts as
 * alive.  Liveness is probed with kill(pid, 0): only an ESRCH failure
 * proves the process is gone — any other outcome (success, or EPERM
 * because we may not signal it) means it still exists. */
static inline int sync_pid_alive(uint32_t pid) {
    if (pid == 0)
        return 1;
    int probe = kill((pid_t)pid, 0);
    if (probe == -1 && errno == ESRCH)
        return 0;
    return 1;
}

/* Convert timeout in seconds (double) to absolute deadline */
static inline void sync_make_deadline(double timeout, struct timespec *deadline) {
    clock_gettime(CLOCK_MONOTONIC, deadline);
    deadline->tv_sec += (time_t)timeout;
    deadline->tv_nsec += (long)((timeout - (double)(time_t)timeout) * 1e9);
    if (deadline->tv_nsec >= 1000000000L) {

sync.h  view on Meta::CPAN

        __atomic_add_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        uint32_t cur = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, &hdr->mutex, FUTEX_WAIT, cur,
                              &sync_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(&hdr->mutex, __ATOMIC_RELAXED);
                if (val >= SYNC_MUTEX_WRITER_BIT) {
                    uint32_t pid = val & SYNC_MUTEX_PID_MASK;
                    if (!sync_pid_alive(pid))
                        sync_recover_stale_mutex(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(&hdr->mutex_waiters, 1, __ATOMIC_RELAXED);
        spin = 0;
    }
}

sync.h  view on Meta::CPAN

        /* Sleep when write-locked OR yielding to parked writers (cur==0) */
        if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &sync_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
                if (cur >= SYNC_RWLOCK_WRITER_BIT) {
                    uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                    if (val >= SYNC_RWLOCK_WRITER_BIT) {
                        uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
                        if (!sync_pid_alive(pid))
                            sync_recover_stale_rwlock(hdr, val);
                    }
                } else {
                    /* Yielding to writers timed out — optimistically drop one
                     * writers_waiting to recover from potentially-crashed
                     * parked writer. A live writer just re-increments. */
                    uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
                    while (wc > 0 && !__atomic_compare_exchange_n(
                            writers_waiting, &wc, wc - 1,
                            1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}

sync.h  view on Meta::CPAN

            } else {
                pts = (struct timespec *)&sync_lock_timeout;
            }
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
                if (cur >= SYNC_RWLOCK_WRITER_BIT) {
                    uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                    if (val >= SYNC_RWLOCK_WRITER_BIT) {
                        uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
                        if (!sync_pid_alive(pid))
                            sync_recover_stale_rwlock(hdr, val);
                    }
                } else {
                    /* Yielding to writer timed out — drop one writers_waiting
                     * to recover from a potentially-crashed parked writer. */
                    uint32_t wc = __atomic_load_n(writers_waiting, __ATOMIC_RELAXED);
                    while (wc > 0 && !__atomic_compare_exchange_n(
                            writers_waiting, &wc, wc - 1,
                            1, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {}
                }

sync.h  view on Meta::CPAN

        uint32_t cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        if (cur != 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &sync_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
                __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                if (val >= SYNC_RWLOCK_WRITER_BIT) {
                    uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
                    if (!sync_pid_alive(pid))
                        sync_recover_stale_rwlock(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
        __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
        spin = 0;
    }

sync.h  view on Meta::CPAN

            } else {
                pts = (struct timespec *)&sync_lock_timeout;
            }
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur, pts, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
                __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
                uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                if (val >= SYNC_RWLOCK_WRITER_BIT) {
                    uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
                    if (!sync_pid_alive(pid))
                        sync_recover_stale_rwlock(hdr, val);
                }
                spin = 0;
                continue;
            }
        }
        __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
        __atomic_sub_fetch(writers_waiting, 1, __ATOMIC_RELAXED);
        spin = 0;
    }

sync.h  view on Meta::CPAN

        if (r == 0) return 0;   /* already done */

        /* r == -1: someone else is running. Wait or detect stale. */
        uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_ACQUIRE);
        if (val == SYNC_ONCE_DONE) return 0;
        if (val == SYNC_ONCE_INIT) continue;  /* race: was reset, retry */

        /* Check stale initializer */
        if (val >= SYNC_MUTEX_WRITER_BIT) {
            uint32_t pid = val & SYNC_MUTEX_PID_MASK;
            if (!sync_pid_alive(pid)) {
                if (__atomic_compare_exchange_n(&hdr->value, &val, SYNC_ONCE_INIT,
                        0, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
                    __atomic_add_fetch(&hdr->stat_recoveries, 1, __ATOMIC_RELAXED);
                    if (__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED) > 0)
                        syscall(SYS_futex, &hdr->value, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
                }
                continue;
            }
        }



( run in 2.367 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )