Data-Pool-Shared
view release on metacpan or search on metacpan
t/02-multiprocess.t view on Meta::CPAN
my $s = $c->alloc;
$c->set($s, $$);
_exit(defined $s ? 0 : 1);
}
}
for (1..$N) { wait }
is $pool->used, $N, "$N children each allocated a slot";
# Verify all slots have valid PIDs
my @pids;
$pool->each_allocated(sub {
push @pids, $pool->get($_[0]);
});
is scalar @pids, $N, 'found N allocated slots';
$pool->reset;
# --- Stale recovery ---
my $idx2 = $pool->alloc;
$pool->set($idx2, 999);
$pid = fork;
die "fork: $!" unless defined $pid;
if ($pid == 0) {
my $c = Data::Pool::Shared::I64->new($path, 100);
# Alloc a slot and die without freeing
my $s = $c->alloc;
$c->set($s, 12345);
_exit(0);
}
waitpid($pid, 0);
is $pool->used, 2, '2 slots allocated (parent + dead child)';
my $recovered = $pool->recover_stale;
is $recovered, 1, 'recovered 1 stale slot';
is $pool->used, 1, '1 slot remains (parent)';
is $pool->get($idx2), 999, 'parent slot untouched';
$pool->free($idx2);
# --- Blocking alloc with futex wakeup ---
# Fill pool to capacity, fork child that frees after delay
$pool->reset;
my @fill;
for (1..100) {
push @fill, $pool->alloc;
}
is $pool->used, 100, 'pool filled to 100';
$pid = fork;
die "fork: $!" unless defined $pid;
if ($pid == 0) {
my $c = Data::Pool::Shared::I64->new($path, 100);
select(undef, undef, undef, 0.1); # sleep 100ms
$c->free($fill[0]); # free one slot
_exit(0);
}
# Parent blocks on alloc, should wake when child frees
my $blocked = $pool->alloc(2.0);
ok defined $blocked, 'blocking alloc succeeded after child freed';
waitpid($pid, 0);
$pool->reset;
# --- Timeout ---
for (1..100) { $pool->alloc }
my $t0 = time;
my $to = $pool->alloc(0.2);
ok !defined $to, 'alloc timed out';
ok time - $t0 < 10, 'timeout was reasonably fast';
$pool->reset;
# --- Anonymous pool across fork ---
my $anon = Data::Pool::Shared::I64->new(undef, 10);
my $ai = $anon->alloc;
$anon->set($ai, 77);
$pid = fork;
die "fork: $!" unless defined $pid;
if ($pid == 0) {
# Child inherits mmap from fork
_exit($anon->get($ai) == 77 ? 0 : 1);
}
waitpid($pid, 0);
is $? >> 8, 0, 'anonymous pool shared across fork';
$anon->free($ai);
# --- Atomic operations across processes ---
$pool->reset;
my $counter = $pool->alloc;
$pool->set($counter, 0);
my $WORKERS = 10;
my $ITERS = 1000;
for (1..$WORKERS) {
my $p = fork;
die "fork: $!" unless defined $p;
if ($p == 0) {
my $c = Data::Pool::Shared::I64->new($path, 100);
for (1..$ITERS) {
$c->add($counter, 1);
}
_exit(0);
}
}
for (1..$WORKERS) { wait }
is $pool->get($counter), $WORKERS * $ITERS, 'atomic add correct across processes';
$pool->free($counter);
done_testing;
( run in 2.401 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )