Data-Sync-Shared

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

        my $s  = $obj->stats;          # diagnostic hashref

    Stats keys vary by type. All counters are approximate under concurrency.

    Semaphore: "value", "max", "waiters", "mmap_size", "acquires",
    "releases", "waits", "timeouts", "recoveries".

    Barrier: "parties", "arrived", "generation", "waiters", "mmap_size",
    "waits", "releases", "timeouts".

    RWLock: "state" ("unlocked", "read_locked", "write_locked"), "readers",
    "waiters", "mmap_size", "acquires", "releases", "recoveries".

    Condvar: "waiters", "signals", "mmap_size", "acquires", "releases",
    "waits", "timeouts", "recoveries".

    Once: "state" ("init", "running", "done"), "is_done", "waiters",
    "mmap_size", "acquires", "releases", "waits", "timeouts", "recoveries".

   eventfd Integration
        my $fd = $obj->eventfd;        # create eventfd, returns fd

Shared.xs  view on Meta::CPAN

SV *
stats(self)
    SV *self
  PREINIT:
    EXTRACT_HANDLE("Data::Sync::Shared::RWLock", self);
  CODE:
    HV *hv = newHV();
    SyncHeader *hdr = h->hdr;
    uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
    const char *state;
    if (val == 0) state = "unlocked";
    else if (val < SYNC_RWLOCK_WRITER_BIT) state = "read_locked";
    else state = "write_locked";
    hv_store(hv, "state", 5, newSVpv(state, 0), 0);
    hv_store(hv, "readers", 7,
        newSVuv(val < SYNC_RWLOCK_WRITER_BIT ? val : 0), 0);
    hv_store(hv, "waiters", 7, newSVuv((UV)__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED)), 0);
    hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
    hv_store(hv, "acquires", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_acquires, __ATOMIC_RELAXED)), 0);
    hv_store(hv, "releases", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_releases, __ATOMIC_RELAXED)), 0);
    hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
    RETVAL = newRV_noinc((SV *)hv);
  OUTPUT:

eg/guard.pl  view on Meta::CPAN

}

# ---- Condvar guard ----
{
    my $cv = Data::Sync::Shared::Condvar->new(undef);

    {
        my $g = $cv->lock_guard;
        print "cv: lock_guard held\n";
    }
    # Prove it unlocked by locking again
    $cv->lock;
    $cv->unlock;
    print "cv: lock_guard released correctly\n\n";
}

# ---- Exception safety ----
{
    my $sem = Data::Sync::Shared::Semaphore->new(undef, 5);

    for my $i (1..5) {

lib/Data/Sync/Shared.pm  view on Meta::CPAN

    my $s  = $obj->stats;          # diagnostic hashref

Stats keys vary by type. All counters are approximate under concurrency.

B<Semaphore:> C<value>, C<max>, C<waiters>, C<mmap_size>, C<acquires>,
C<releases>, C<waits>, C<timeouts>, C<recoveries>.

B<Barrier:> C<parties>, C<arrived>, C<generation>, C<waiters>,
C<mmap_size>, C<waits>, C<releases>, C<timeouts>.

B<RWLock:> C<state> (C<"unlocked">, C<"read_locked">, C<"write_locked">),
C<readers>, C<waiters>, C<mmap_size>, C<acquires>, C<releases>,
C<recoveries>.

B<Condvar:> C<waiters>, C<signals>, C<mmap_size>, C<acquires>,
C<releases>, C<waits>, C<timeouts>, C<recoveries>.

B<Once:> C<state> (C<"init">, C<"running">, C<"done">), C<is_done>,
C<waiters>, C<mmap_size>, C<acquires>, C<releases>, C<waits>,
C<timeouts>, C<recoveries>.

sync.h  view on Meta::CPAN

    /* ---- Cache line 0 (0-63): immutable after create ---- */
    uint32_t magic;          /* 0 */
    uint32_t version;        /* 4 */
    uint32_t type;           /* 8: SYNC_TYPE_* */
    uint32_t param;          /* 12: type-specific (sem max, barrier count, etc.) */
    uint64_t total_size;     /* 16: mmap size */
    uint8_t  _pad0[40];      /* 24-63 */

    /* ---- Cache line 1 (64-127): mutable state ---- */

    /* Semaphore: value = current count, waiters = blocked acquirers */
    /* Barrier: value = arrived count, waiters = blocked at barrier,
                generation = increments each time barrier trips */
    /* RWLock: value = rwlock word (0=free, N=N readers, 0x80000000|pid=writer),
               waiters = blocked lockers */
    /* Condvar: value = signal counter (futex word), waiters = blocked waiters,
                mutex = associated mutex for predicate protection */
    /* Once: value = state (0=INIT, 1=RUNNING|pid, 2=DONE),
             waiters = blocked on completion */

    uint32_t value;          /* 64: primary state word (futex target) */
    uint32_t waiters;        /* 68: waiter count */
    uint32_t generation;     /* 72: barrier generation / condvar epoch */
    uint32_t mutex;          /* 76: condvar mutex (0 or PID|0x80000000) */
    uint32_t mutex_waiters;  /* 80: condvar mutex waiter count */
    uint32_t stat_recoveries;/* 84 */
    uint64_t stat_acquires;  /* 88 */
    uint64_t stat_releases;  /* 96 */
    uint64_t stat_waits;     /* 104 */

sync.h  view on Meta::CPAN


static inline void sync_mutex_unlock(SyncHeader *hdr) {
    __atomic_store_n(&hdr->mutex, 0, __ATOMIC_RELEASE);
    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) > 0)
        syscall(SYS_futex, &hdr->mutex, FUTEX_WAKE, 1, NULL, NULL, 0);
}

/* ================================================================
 * RWLock helpers (for SYNC_TYPE_RWLOCK)
 *
 * value == 0:                  unlocked
 * value  1..0x7FFFFFFF:        N active readers
 * value  0x80000000 | pid:     write-locked by pid
 * ================================================================ */

#define SYNC_RWLOCK_WRITER_BIT 0x80000000U
#define SYNC_RWLOCK_PID_MASK   0x7FFFFFFFU
#define SYNC_RWLOCK_WR(pid)    (SYNC_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SYNC_RWLOCK_PID_MASK))

static inline int sync_rwlock_try_rdlock(SyncHeader *hdr);
static inline int sync_rwlock_try_wrlock(SyncHeader *hdr);

static inline void sync_recover_stale_rwlock(SyncHeader *hdr, uint32_t observed) {

sync.h  view on Meta::CPAN

            if (__atomic_compare_exchange_n(lock, &cur, 1,
                    1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
                return;
        }
        if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
            sync_spin_pause();
            continue;
        }
        __atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
        cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
        /* Sleep when write-locked OR yielding to parked writers (cur==0) */
        if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
            long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
                              &sync_lock_timeout, NULL, 0);
            if (rc == -1 && errno == ETIMEDOUT) {
                __atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
                if (cur >= SYNC_RWLOCK_WRITER_BIT) {
                    uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
                    if (val >= SYNC_RWLOCK_WRITER_BIT) {
                        uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
                        if (!sync_pid_alive(pid))

t/03-rwlock.t  view on Meta::CPAN

my $path = tmpnam() . '.shm';
END { unlink $path if $path && -f $path }

# Basic create
my $rw = Data::Sync::Shared::RWLock->new($path);
ok $rw, 'created rwlock';

# Read lock
$rw->rdlock;
my $s = $rw->stats;
is $s->{state}, 'read_locked', 'state is read_locked';
is $s->{readers}, 1, '1 reader';

# Multiple readers
$rw->rdlock;
$s = $rw->stats;
is $s->{readers}, 2, '2 readers';
$rw->rdunlock;
$rw->rdunlock;

$s = $rw->stats;
is $s->{state}, 'unlocked', 'unlocked after rdunlock';

# Write lock
$rw->wrlock;
$s = $rw->stats;
is $s->{state}, 'write_locked', 'state is write_locked';
is $s->{readers}, 0, '0 readers during write';
$rw->wrunlock;

$s = $rw->stats;
is $s->{state}, 'unlocked', 'unlocked after wrunlock';

# try_rdlock / try_wrlock
ok $rw->try_rdlock, 'try_rdlock succeeds when free';
ok !$rw->try_wrlock, 'try_wrlock fails when readers hold';
$rw->rdunlock;

ok $rw->try_wrlock, 'try_wrlock succeeds when free';
ok !$rw->try_rdlock, 'try_rdlock fails when writer holds';
ok !$rw->try_wrlock, 'try_wrlock fails when writer holds';
$rw->wrunlock;

# Downgrade: wrlock -> rdlock
$rw->wrlock;
$s = $rw->stats;
is $s->{state}, 'write_locked', 'write_locked before downgrade';
$rw->downgrade;
$s = $rw->stats;
is $s->{state}, 'read_locked', 'read_locked after downgrade';
is $s->{readers}, 1, '1 reader after downgrade';
ok $rw->try_rdlock, 'can rdlock after downgrade';
$rw->rdunlock;
$rw->rdunlock;

# Path
is $rw->path, $path, 'path correct';

# Reopen existing
my $rw2 = Data::Sync::Shared::RWLock->new($path);

t/07-new-features.t  view on Meta::CPAN


# ============================================================
# RWLock: rdlock_guard / wrlock_guard
# ============================================================

{
    my $rw = Data::Sync::Shared::RWLock->new(undef);
    {
        my $g = $rw->rdlock_guard;
        my $s = $rw->stats;
        is $s->{state}, 'read_locked', 'rdlock_guard locks';
    }
    my $s = $rw->stats;
    is $s->{state}, 'unlocked', 'rdlock_guard unlocks on scope exit';

    {
        my $g = $rw->wrlock_guard;
        $s = $rw->stats;
        is $s->{state}, 'write_locked', 'wrlock_guard locks';
    }
    $s = $rw->stats;
    is $s->{state}, 'unlocked', 'wrlock_guard unlocks on scope exit';
}

# ============================================================
# Condvar: lock_guard
# ============================================================

{
    my $cv = Data::Sync::Shared::Condvar->new(undef);
    {
        my $g = $cv->lock_guard;

xt/crash_recovery.t  view on Meta::CPAN

# Recovery should happen within ~2s (LOCK_TIMEOUT_SEC).
# ============================================================
{
    my $rw = Data::Sync::Shared::RWLock->new(undef);

    my $pipe = IO::Pipe->new;
    my $pid = fork // die "fork: $!";
    if ($pid == 0) {
        $pipe->writer;
        $rw->wrlock;
        print $pipe "locked\n";
        $pipe->close;
        sleep(60);  # hold forever — parent will kill us
        _exit(0);
    }
    $pipe->reader;
    <$pipe>;  # wait until child holds the lock
    $pipe->close;

    kill 9, $pid;
    waitpid($pid, 0);

xt/crash_recovery.t  view on Meta::CPAN

# ============================================================
{
    my $rw = Data::Sync::Shared::RWLock->new(undef);

    for my $round (1..3) {
        my $pipe = IO::Pipe->new;
        my $pid = fork // die "fork: $!";
        if ($pid == 0) {
            $pipe->writer;
            $rw->wrlock;
            print $pipe "locked\n";
            $pipe->close;
            sleep(60);
            _exit(0);
        }
        $pipe->reader;
        <$pipe>;
        $pipe->close;

        kill 9, $pid;
        waitpid($pid, 0);

xt/crash_recovery.t  view on Meta::CPAN

# and be able to lock.
# ============================================================
{
    my $cv = Data::Sync::Shared::Condvar->new(undef);

    my $pipe = IO::Pipe->new;
    my $pid = fork // die "fork: $!";
    if ($pid == 0) {
        $pipe->writer;
        $cv->lock;
        print $pipe "locked\n";
        $pipe->close;
        sleep(60);  # hold forever
        _exit(0);
    }
    $pipe->reader;
    <$pipe>;
    $pipe->close;

    kill 9, $pid;
    waitpid($pid, 0);

xt/guard_stress.t  view on Meta::CPAN


# ============================================================
# 4. RWLock rdlock_guard: rapid create/destroy
# ============================================================
{
    my $rw = Data::Sync::Shared::RWLock->new(undef);
    for (1..$CYCLES) {
        my $g = $rw->rdlock_guard;
    }
    my $s = $rw->stats;
    is $s->{state}, 'unlocked', "rdlock_guard: unlocked after $CYCLES cycles";
}

# ============================================================
# 5. RWLock wrlock_guard: rapid create/destroy
# ============================================================
{
    my $rw = Data::Sync::Shared::RWLock->new(undef);
    for (1..$CYCLES) {
        my $g = $rw->wrlock_guard;
    }
    my $s = $rw->stats;
    is $s->{state}, 'unlocked', "wrlock_guard: unlocked after $CYCLES cycles";
}

# ============================================================
# 6. RWLock guard under fork contention (mixed rd/wr)
# ============================================================
{
    my $rw = Data::Sync::Shared::RWLock->new(undef);
    my $per = int($CYCLES / $WORKERS);

    my @pids;

xt/guard_stress.t  view on Meta::CPAN

            _exit(0);
        }
        push @pids, $pid;
    }

    my $ok = 1;
    for (@pids) { waitpid($_, 0); $ok = 0 if $? }

    ok $ok, "rwlock guard fork: $WORKERS workers x $per mixed cycles";
    my $s = $rw->stats;
    is $s->{state}, 'unlocked', "rwlock guard fork: unlocked after contention";
}

# ============================================================
# 7. Condvar lock_guard: rapid create/destroy
# ============================================================
{
    my $cv = Data::Sync::Shared::Condvar->new(undef);
    for (1..$CYCLES) {
        my $g = $cv->lock_guard;
    }
    ok 1, "condvar lock_guard: survived $CYCLES cycles";
    # verify not deadlocked
    $cv->lock;
    $cv->unlock;
    ok 1, "condvar lock_guard: lock still acquirable";
}

# ============================================================
# 8. Guard exception safety: die inside guarded scope
# ============================================================
{
    my $sem = Data::Sync::Shared::Semaphore->new(undef, 5);

xt/guard_stress.t  view on Meta::CPAN

    is $sem->value, 5, "sem guard exception: value intact after 1000 die cycles";

    my $rw = Data::Sync::Shared::RWLock->new(undef);
    for (1..1000) {
        eval {
            my $g = $rw->wrlock_guard;
            die "boom" if $_ % 3 == 0;
        };
    }
    my $s = $rw->stats;
    is $s->{state}, 'unlocked', "rwlock guard exception: unlocked after 1000 die cycles";
}

# ============================================================
# 9. Nested guards
# ============================================================
{
    my $sem = Data::Sync::Shared::Semaphore->new(undef, 10);
    {
        my $g1 = $sem->acquire_guard(3);
        is $sem->value, 7, 'nested guard: outer took 3';

xt/persistence.t  view on Meta::CPAN

        print \$s->value, "\\n";
        print \$s->max, "\\n";
    ' 2>&1`;
    chomp $out;
    my @lines = split /\n/, $out;
    is $lines[0], '7', 'child process: sem value persisted (7)';
    is $lines[1], '10', 'child process: sem max persisted (10)';
    unlink $path;
}

# 2. RWLock: unlocked state persists (no lock held across exec)
{
    my $path = tmpnam() . '.shm';
    my $rw = Data::Sync::Shared::RWLock->new($path);
    $rw->wrlock;
    $rw->wrunlock;
    $rw->sync;

    my $out = `$perl -Mblib -MData::Sync::Shared -e '
        my \$rw = Data::Sync::Shared::RWLock->new("\Q$path\E");
        my \$s = \$rw->stats;
        print \$s->{state}, "\\n";
        print \$s->{acquires}, "\\n";
    ' 2>&1`;
    chomp $out;
    my @lines = split /\n/, $out;
    is $lines[0], 'unlocked', 'child: rwlock state persisted (unlocked)';
    ok $lines[1] > 0, 'child: rwlock acquires counter persisted';
    unlink $path;
}

# 3. Once: done state persists
{
    my $path = tmpnam() . '.shm';
    my $once = Data::Sync::Shared::Once->new($path);
    $once->enter;
    $once->done;

xt/signal_safety.t  view on Meta::CPAN


    # fire SIGALRM every 10ms
    ualarm(10_000, 10_000);
    $rw->wrlock(5.0);
    ualarm(0);
    $rw->wrunlock;

    waitpid($pid, 0);
    ok $alarms > 0, "rwlock: received $alarms SIGALRM during wait";
    my $s = $rw->stats;
    is $s->{state}, 'unlocked', 'rwlock state clean after SIGALRM';
}

# 3. Condvar: SIGALRM during wait
{
    my $cv = Data::Sync::Shared::Condvar->new(undef);
    my $alarms = 0;

    local $SIG{ALRM} = sub { $alarms++ };

    my $pid = fork // die "fork: $!";

xt/sigterm_cleanup.t  view on Meta::CPAN

    $c->acquire;
    # Suspend here; parent will SIGTERM us
    sleep 60;
    _exit(0);
}

select undef, undef, undef, 0.2;
kill TERM => $pid;
waitpid $pid, 0;

# Parent should still operate. If DESTROY deadlocked holding a lock, we'd hang.
my $t0 = time;
my $got = $sem->acquire(2.0);
my $dt = time - $t0;
ok $got, 'acquire succeeded after child SIGTERM';
ok $dt < 3, sprintf('no deadlock (%.2fs)', $dt);

unlink $path;
done_testing;



( run in 1.517 second using v1.01-cache-2.11-cpan-39bf76dae61 )