view release on metacpan or search on metacpan
my $s = $obj->stats; # diagnostic hashref
Stats keys vary by type. All counters are approximate under concurrency.
Semaphore: "value", "max", "waiters", "mmap_size", "acquires",
"releases", "waits", "timeouts", "recoveries".
Barrier: "parties", "arrived", "generation", "waiters", "mmap_size",
"waits", "releases", "timeouts".
RWLock: "state" ("unlocked", "read_locked", "write_locked"), "readers",
"waiters", "mmap_size", "acquires", "releases", "recoveries".
Condvar: "waiters", "signals", "mmap_size", "acquires", "releases",
"waits", "timeouts", "recoveries".
Once: "state" ("init", "running", "done"), "is_done", "waiters",
"mmap_size", "acquires", "releases", "waits", "timeouts", "recoveries".
eventfd Integration
my $fd = $obj->eventfd; # create eventfd, returns fd
SV *
stats(self)
SV *self
PREINIT:
EXTRACT_HANDLE("Data::Sync::Shared::RWLock", self);
CODE:
/* Build the diagnostic hash for an RWLock: state, readers, waiters,
 * mmap_size and the acquire/release/recovery counters.
 * NOTE(review): `h` is presumably declared by EXTRACT_HANDLE above -- confirm. */
HV *hv = newHV();
SyncHeader *hdr = h->hdr;
/* Load the lock word once so "state" and "readers" describe the same snapshot. */
uint32_t val = __atomic_load_n(&hdr->value, __ATOMIC_RELAXED);
const char *state;
/* 0 = free, below the writer bit = reader count, otherwise write-locked. */
if (val == 0) state = "unlocked";
else if (val < SYNC_RWLOCK_WRITER_BIT) state = "read_locked";
else state = "write_locked";
/* The third hv_store argument is the key length; newSVpv with len 0 uses strlen. */
hv_store(hv, "state", 5, newSVpv(state, 0), 0);
/* A write-locked word is reported as 0 readers. */
hv_store(hv, "readers", 7,
newSVuv(val < SYNC_RWLOCK_WRITER_BIT ? val : 0), 0);
/* Each remaining field is read independently with relaxed ordering, so the
 * values are not a single consistent snapshot. */
hv_store(hv, "waiters", 7, newSVuv((UV)__atomic_load_n(&hdr->waiters, __ATOMIC_RELAXED)), 0);
hv_store(hv, "mmap_size", 9, newSVuv((UV)h->mmap_size), 0);
hv_store(hv, "acquires", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_acquires, __ATOMIC_RELAXED)), 0);
hv_store(hv, "releases", 8, newSVuv((UV)__atomic_load_n(&hdr->stat_releases, __ATOMIC_RELAXED)), 0);
hv_store(hv, "recoveries", 10, newSVuv((UV)__atomic_load_n(&hdr->stat_recoveries, __ATOMIC_RELAXED)), 0);
/* newRV_noinc: the returned reference takes over the hash's only refcount. */
RETVAL = newRV_noinc((SV *)hv);
OUTPUT:
eg/guard.pl view on Meta::CPAN
}
# ---- Condvar guard ----
{
    # A guard created in an inner scope must give the condvar mutex back
    # as soon as that scope ends.
    my $condvar = Data::Sync::Shared::Condvar->new(undef);

    {
        my $guard = $condvar->lock_guard;
        print "cv: lock_guard held\n";
    }

    # A plain lock/unlock round trip only completes if the guard let go.
    $condvar->lock;
    $condvar->unlock;

    print "cv: lock_guard released correctly\n\n";
}
# ---- Exception safety ----
{
my $sem = Data::Sync::Shared::Semaphore->new(undef, 5);
for my $i (1..5) {
lib/Data/Sync/Shared.pm view on Meta::CPAN
my $s = $obj->stats; # diagnostic hashref
Stats keys vary by type. All counters are approximate under concurrency.
B<Semaphore:> C<value>, C<max>, C<waiters>, C<mmap_size>, C<acquires>,
C<releases>, C<waits>, C<timeouts>, C<recoveries>.
B<Barrier:> C<parties>, C<arrived>, C<generation>, C<waiters>,
C<mmap_size>, C<waits>, C<releases>, C<timeouts>.
B<RWLock:> C<state> (C<"unlocked">, C<"read_locked">, C<"write_locked">),
C<readers>, C<waiters>, C<mmap_size>, C<acquires>, C<releases>,
C<recoveries>.
B<Condvar:> C<waiters>, C<signals>, C<mmap_size>, C<acquires>,
C<releases>, C<waits>, C<timeouts>, C<recoveries>.
B<Once:> C<state> (C<"init">, C<"running">, C<"done">), C<is_done>,
C<waiters>, C<mmap_size>, C<acquires>, C<releases>, C<waits>,
C<timeouts>, C<recoveries>.
/* ---- Cache line 0 (0-63): immutable after create ---- */
uint32_t magic; /* 0 */
uint32_t version; /* 4 */
uint32_t type; /* 8: SYNC_TYPE_* */
uint32_t param; /* 12: type-specific (sem max, barrier count, etc.) */
uint64_t total_size; /* 16: mmap size */
uint8_t _pad0[40]; /* 24-63 */
/* ---- Cache line 1 (64-127): mutable state ---- */
/* Semaphore: value = current count, waiters = blocked acquirers */
/* Barrier: value = arrived count, waiters = blocked at barrier,
generation = increments each time barrier trips */
/* RWLock: value = rwlock word (0=free, N=N readers, 0x80000000|pid=writer),
waiters = blocked lockers */
/* Condvar: value = signal counter (futex word), waiters = blocked waiters,
mutex = associated mutex for predicate protection */
/* Once: value = state (0=INIT, 1=RUNNING|pid, 2=DONE),
waiters = blocked on completion */
uint32_t value; /* 64: primary state word (futex target) */
uint32_t waiters; /* 68: waiter count */
uint32_t generation; /* 72: barrier generation / condvar epoch */
uint32_t mutex; /* 76: condvar mutex (0 or PID|0x80000000) */
uint32_t mutex_waiters; /* 80: condvar mutex waiter count */
uint32_t stat_recoveries;/* 84 */
uint64_t stat_acquires; /* 88 */
uint64_t stat_releases; /* 96 */
uint64_t stat_waits; /* 104 */
/* Release the condvar mutex and wake at most one sleeper.
 *
 * The mutex word is cleared with release ordering, then mutex_waiters is
 * sampled; the FUTEX_WAKE syscall is skipped when that count is zero.
 *
 * NOTE(review): the waiter count is read with relaxed ordering directly after
 * the release store. Whether a waiter that is just about to sleep can be
 * missed depends on the lock path, which is not visible here -- confirm.
 */
static inline void sync_mutex_unlock(SyncHeader *hdr) {
    uint32_t *word = &hdr->mutex;

    __atomic_store_n(word, 0, __ATOMIC_RELEASE);

    if (__atomic_load_n(&hdr->mutex_waiters, __ATOMIC_RELAXED) == 0)
        return; /* nobody registered as waiting: no syscall needed */

    syscall(SYS_futex, word, FUTEX_WAKE, 1, NULL, NULL, 0);
}
/* ================================================================
* RWLock helpers (for SYNC_TYPE_RWLOCK)
*
* value == 0: unlocked
* value 1..0x7FFFFFFF: N active readers
* value 0x80000000 | pid: write-locked by pid
* ================================================================ */
#define SYNC_RWLOCK_WRITER_BIT 0x80000000U
#define SYNC_RWLOCK_PID_MASK 0x7FFFFFFFU
#define SYNC_RWLOCK_WR(pid) (SYNC_RWLOCK_WRITER_BIT | ((uint32_t)(pid) & SYNC_RWLOCK_PID_MASK))
static inline int sync_rwlock_try_rdlock(SyncHeader *hdr);
static inline int sync_rwlock_try_wrlock(SyncHeader *hdr);
static inline void sync_recover_stale_rwlock(SyncHeader *hdr, uint32_t observed) {
if (__atomic_compare_exchange_n(lock, &cur, 1,
1, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED))
return;
}
if (__builtin_expect(spin < SYNC_SPIN_LIMIT, 1)) {
sync_spin_pause();
continue;
}
__atomic_add_fetch(w, 1, __ATOMIC_RELAXED);
cur = __atomic_load_n(lock, __ATOMIC_RELAXED);
/* Sleep when write-locked OR yielding to parked writers (cur==0) */
if (cur >= SYNC_RWLOCK_WRITER_BIT || cur == 0) {
long rc = syscall(SYS_futex, lock, FUTEX_WAIT, cur,
&sync_lock_timeout, NULL, 0);
if (rc == -1 && errno == ETIMEDOUT) {
__atomic_sub_fetch(w, 1, __ATOMIC_RELAXED);
if (cur >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t val = __atomic_load_n(lock, __ATOMIC_RELAXED);
if (val >= SYNC_RWLOCK_WRITER_BIT) {
uint32_t pid = val & SYNC_RWLOCK_PID_MASK;
if (!sync_pid_alive(pid))
t/03-rwlock.t view on Meta::CPAN
# Path handed to the constructor (presumably the file backing the shared
# lock -- confirm); the END block removes it if it exists at exit.
my $path = tmpnam() . '.shm';
END { unlink $path if $path && -f $path }
# Basic create
my $rw = Data::Sync::Shared::RWLock->new($path);
ok $rw, 'created rwlock';
# Read lock: one holder is reported as state read_locked with readers == 1.
# stats is re-fetched after every state change below.
$rw->rdlock;
my $s = $rw->stats;
is $s->{state}, 'read_locked', 'state is read_locked';
is $s->{readers}, 1, '1 reader';
# Multiple readers: a second rdlock raises the count to 2, and two
# rdunlock calls are needed to get back to unlocked.
$rw->rdlock;
$s = $rw->stats;
is $s->{readers}, 2, '2 readers';
$rw->rdunlock;
$rw->rdunlock;
$s = $rw->stats;
is $s->{state}, 'unlocked', 'unlocked after rdunlock';
# Write lock: the reader count is reported as 0 while a writer holds the lock.
$rw->wrlock;
$s = $rw->stats;
is $s->{state}, 'write_locked', 'state is write_locked';
is $s->{readers}, 0, '0 readers during write';
$rw->wrunlock;
$s = $rw->stats;
is $s->{state}, 'unlocked', 'unlocked after wrunlock';
# try_rdlock / try_wrlock: success is a true value, a conflicting holder
# makes the call return false. Each successful try is paired with an unlock.
ok $rw->try_rdlock, 'try_rdlock succeeds when free';
ok !$rw->try_wrlock, 'try_wrlock fails when readers hold';
$rw->rdunlock;
ok $rw->try_wrlock, 'try_wrlock succeeds when free';
ok !$rw->try_rdlock, 'try_rdlock fails when writer holds';
ok !$rw->try_wrlock, 'try_wrlock fails when writer holds';
$rw->wrunlock;
# Downgrade: wrlock -> rdlock. Afterwards the holder counts as one reader,
# so another reader can join and two rdunlock calls release everything.
$rw->wrlock;
$s = $rw->stats;
is $s->{state}, 'write_locked', 'write_locked before downgrade';
$rw->downgrade;
$s = $rw->stats;
is $s->{state}, 'read_locked', 'read_locked after downgrade';
is $s->{readers}, 1, '1 reader after downgrade';
ok $rw->try_rdlock, 'can rdlock after downgrade';
$rw->rdunlock;
$rw->rdunlock;
# Path: the accessor returns the path given to the constructor.
is $rw->path, $path, 'path correct';
# Reopen existing: second handle constructed on the same path.
my $rw2 = Data::Sync::Shared::RWLock->new($path);
t/07-new-features.t view on Meta::CPAN
# ============================================================
# RWLock: rdlock_guard / wrlock_guard
# ============================================================
{
    my $lock = Data::Sync::Shared::RWLock->new(undef);

    # Shared guard: held inside the inner scope, gone once it closes.
    {
        my $read_guard = $lock->rdlock_guard;
        is $lock->stats->{state}, 'read_locked', 'rdlock_guard locks';
    }
    is $lock->stats->{state}, 'unlocked', 'rdlock_guard unlocks on scope exit';

    # Exclusive guard: the same check for the write side.
    {
        my $write_guard = $lock->wrlock_guard;
        is $lock->stats->{state}, 'write_locked', 'wrlock_guard locks';
    }
    is $lock->stats->{state}, 'unlocked', 'wrlock_guard unlocks on scope exit';
}
# ============================================================
# Condvar: lock_guard
# ============================================================
{
my $cv = Data::Sync::Shared::Condvar->new(undef);
{
my $g = $cv->lock_guard;
xt/crash_recovery.t view on Meta::CPAN
# Recovery should happen within ~2s (LOCK_TIMEOUT_SEC).
# ============================================================
{
my $rw = Data::Sync::Shared::RWLock->new(undef);
my $pipe = IO::Pipe->new;
my $pid = fork // die "fork: $!";
# Child: take the write lock, report it over the pipe, then idle while
# still holding the lock so the parent can kill this process.
if ($pid == 0) {
$pipe->writer;
$rw->wrlock;
print $pipe "locked\n";
$pipe->close;
sleep(60); # hold forever -- parent will kill us
_exit(0); # only reached if the sleep ends before the parent's kill
}
$pipe->reader;
<$pipe>; # wait until child holds the lock
$pipe->close;
kill 9, $pid;
waitpid($pid, 0);
xt/crash_recovery.t view on Meta::CPAN
# ============================================================
{
my $rw = Data::Sync::Shared::RWLock->new(undef);
for my $round (1..3) {
my $pipe = IO::Pipe->new;
my $pid = fork // die "fork: $!";
if ($pid == 0) {
$pipe->writer;
$rw->wrlock;
print $pipe "locked\n";
$pipe->close;
sleep(60);
_exit(0);
}
$pipe->reader;
<$pipe>;
$pipe->close;
kill 9, $pid;
waitpid($pid, 0);
xt/crash_recovery.t view on Meta::CPAN
# and be able to lock.
# ============================================================
{
my $cv = Data::Sync::Shared::Condvar->new(undef);
my $pipe = IO::Pipe->new;
my $pid = fork // die "fork: $!";
if ($pid == 0) {
$pipe->writer;
$cv->lock;
print $pipe "locked\n";
$pipe->close;
sleep(60); # hold forever
_exit(0);
}
$pipe->reader;
<$pipe>;
$pipe->close;
kill 9, $pid;
waitpid($pid, 0);
xt/guard_stress.t view on Meta::CPAN
# ============================================================
# 4. RWLock rdlock_guard: rapid create/destroy
# ============================================================
{
    my $lock = Data::Sync::Shared::RWLock->new(undef);

    # Every pass creates a read guard and lets it fall out of scope at once.
    foreach my $cycle (1 .. $CYCLES) {
        my $guard = $lock->rdlock_guard;
    }

    is $lock->stats->{state}, 'unlocked',
        "rdlock_guard: unlocked after $CYCLES cycles";
}
# ============================================================
# 5. RWLock wrlock_guard: rapid create/destroy
# ============================================================
{
    my $lock = Data::Sync::Shared::RWLock->new(undef);

    # Every pass creates a write guard and lets it fall out of scope at once.
    foreach my $cycle (1 .. $CYCLES) {
        my $guard = $lock->wrlock_guard;
    }

    is $lock->stats->{state}, 'unlocked',
        "wrlock_guard: unlocked after $CYCLES cycles";
}
# ============================================================
# 6. RWLock guard under fork contention (mixed rd/wr)
# ============================================================
{
my $rw = Data::Sync::Shared::RWLock->new(undef);
my $per = int($CYCLES / $WORKERS);
my @pids;
xt/guard_stress.t view on Meta::CPAN
_exit(0);
}
push @pids, $pid;
}
my $ok = 1;
for (@pids) { waitpid($_, 0); $ok = 0 if $? }
ok $ok, "rwlock guard fork: $WORKERS workers x $per mixed cycles";
my $s = $rw->stats;
is $s->{state}, 'unlocked', "rwlock guard fork: unlocked after contention";
}
# ============================================================
# 7. Condvar lock_guard: rapid create/destroy
# ============================================================
{
    my $condvar = Data::Sync::Shared::Condvar->new(undef);

    # Create and drop a mutex guard once per cycle.
    foreach my $cycle (1 .. $CYCLES) {
        my $guard = $condvar->lock_guard;
    }
    ok 1, "condvar lock_guard: survived $CYCLES cycles";

    # If any guard had kept the mutex, this lock call would not return.
    $condvar->lock;
    $condvar->unlock;
    ok 1, "condvar lock_guard: lock still acquirable";
}
# ============================================================
# 8. Guard exception safety: die inside guarded scope
# ============================================================
{
my $sem = Data::Sync::Shared::Semaphore->new(undef, 5);
xt/guard_stress.t view on Meta::CPAN
is $sem->value, 5, "sem guard exception: value intact after 1000 die cycles";
my $rw = Data::Sync::Shared::RWLock->new(undef);

# Take a write guard 1000 times; every third round dies while the guard is
# held, so the guard has to be released while the eval unwinds.
foreach my $round (1 .. 1000) {
    eval {
        my $guard = $rw->wrlock_guard;
        die "boom" unless $round % 3;
    };
}

is $rw->stats->{state}, 'unlocked',
    "rwlock guard exception: unlocked after 1000 die cycles";
}
# ============================================================
# 9. Nested guards
# ============================================================
{
my $sem = Data::Sync::Shared::Semaphore->new(undef, 10);
{
my $g1 = $sem->acquire_guard(3);
is $sem->value, 7, 'nested guard: outer took 3';
xt/persistence.t view on Meta::CPAN
print \$s->value, "\\n";
print \$s->max, "\\n";
' 2>&1`;
chomp $out;
my @lines = split /\n/, $out;
is $lines[0], '7', 'child process: sem value persisted (7)';
is $lines[1], '10', 'child process: sem max persisted (10)';
unlink $path;
}
# 2. RWLock: unlocked state persists (no lock held across exec)
{
# Create the lock on a named path, run one write lock/unlock cycle so the
# acquires counter is non-zero, then call sync before another process looks.
# NOTE(review): sync presumably flushes the mapping to the file -- confirm.
my $path = tmpnam() . '.shm';
my $rw = Data::Sync::Shared::RWLock->new($path);
$rw->wrlock;
$rw->wrunlock;
$rw->sync;
# Run a separate perl that reopens the same path and prints state and
# acquires, one per line. Backticks interpolate like a double-quoted string:
# \$ keeps the child's variables literal, \Q...\E quotemeta-escapes the path,
# and 2>&1 folds the child's stderr into the captured output.
my $out = `$perl -Mblib -MData::Sync::Shared -e '
my \$rw = Data::Sync::Shared::RWLock->new("\Q$path\E");
my \$s = \$rw->stats;
print \$s->{state}, "\\n";
print \$s->{acquires}, "\\n";
' 2>&1`;
chomp $out;
# Line 0 is the state, line 1 the acquires counter.
my @lines = split /\n/, $out;
is $lines[0], 'unlocked', 'child: rwlock state persisted (unlocked)';
ok $lines[1] > 0, 'child: rwlock acquires counter persisted';
unlink $path;
}
# 3. Once: done state persists
{
my $path = tmpnam() . '.shm';
my $once = Data::Sync::Shared::Once->new($path);
$once->enter;
$once->done;
xt/signal_safety.t view on Meta::CPAN
# fire SIGALRM every 10ms
ualarm(10_000, 10_000);
$rw->wrlock(5.0);
ualarm(0);
$rw->wrunlock;
waitpid($pid, 0);
ok $alarms > 0, "rwlock: received $alarms SIGALRM during wait";
my $s = $rw->stats;
is $s->{state}, 'unlocked', 'rwlock state clean after SIGALRM';
}
# 3. Condvar: SIGALRM during wait
{
my $cv = Data::Sync::Shared::Condvar->new(undef);
my $alarms = 0;
local $SIG{ALRM} = sub { $alarms++ };
my $pid = fork // die "fork: $!";
xt/sigterm_cleanup.t view on Meta::CPAN
$c->acquire;
# Suspend here; parent will SIGTERM us
sleep 60;
_exit(0);
}
# Four-argument select with no handles is a 0.2 s sleep.
# NOTE(review): the fixed delay assumes the child has reached its acquire
# by now -- confirm this is long enough on slow hosts.
select undef, undef, undef, 0.2;
kill TERM => $pid;
waitpid $pid, 0;
# Parent should still operate. If DESTROY deadlocked holding a lock, we'd hang.
# NOTE(review): the %.2f format suggests time is Time::HiRes::time -- confirm
# the import; core time() only has one-second resolution.
my $t0 = time;
my $got = $sem->acquire(2.0);
my $dt = time - $t0;
ok $got, 'acquire succeeded after child SIGTERM';
# The acquire above is given 2.0, so under 3 seconds means it did not stall.
ok $dt < 3, sprintf('no deadlock (%.2fs)', $dt);
unlink $path;
done_testing;