Data-Queue-Shared
view release on metacpan or search on metacpan
t/03-multiprocess.t view on Meta::CPAN
my @received;
while (@received < 50) {
my $v = $q->pop_wait(30);
last unless defined $v;
push @received, $v;
}
waitpid($pid, 0);
is scalar @received, 50, 'received all 50 strings';
my @sorted = sort @received;
my @sorted_exp = sort map { "message_$_" } 1..50;
is_deeply \@sorted, \@sorted_exp, 'all string values correct';
$q->unlink;
};
# Str: blocking pop across fork
# Backing file for the 'str blocking pop' subtest: a fresh temporary name
# with a '.shm' suffix.
my $path4 = tmpnam() . '.shm';

# Remove the backing file at interpreter exit, but only in the process whose
# pid equals $main_pid (set elsewhere in this file; presumably the original
# test process) and only if the file is still present.
END {
    if ($$ == $main_pid && $path4 && -f $path4) {
        unlink $path4;
    }
}
# Checks that a parent waiting in pop_wait() receives a string that a forked
# child pushes through a second handle opened on the same path.
subtest 'str blocking pop' => sub {
    my $q = Data::Queue::Shared::Str->new($path4, 64, 8192);

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child.  Everything runs inside eval so an exception cannot unwind
        # into the parent's test code and turn the child into a second test
        # process.  The child always leaves through POSIX::_exit, which skips
        # END blocks and object destructors.
        my $ok = eval {
            my $cq = Data::Queue::Shared::Str->new($path4, 64, 8192);
            # Short delay before pushing; intended to let the parent reach
            # pop_wait first.
            select(undef, undef, undef, 0.1);
            $cq->push("delayed_message");
            1;
        };
        POSIX::_exit($ok ? 0 : 1);
    }

    # Parent: wait for the child's message (30 is presumably a timeout in
    # seconds -- confirm against the module docs), then reap the child and
    # remember its exit status before any other call can overwrite $?.
    my $val = $q->pop_wait(30);
    waitpid($pid, 0);
    my $child_status = $?;

    is $val, "delayed_message", 'str blocking pop received value';
    is $child_status, 0, 'child exited cleanly';
    $q->unlink;
};
# Str: clear() unblocks push_wait
# Backing file for the 'str clear unblocks push_wait' subtest: a fresh
# temporary name with a '.shm' suffix.
my $path5 = tmpnam() . '.shm';

# Remove the backing file at interpreter exit, but only in the process whose
# pid equals $main_pid (set elsewhere in this file; presumably the original
# test process) and only if the file is still present.
END {
    if ($$ == $main_pid && $path5 && -f $path5) {
        unlink $path5;
    }
}
# Checks that push_wait() on a full queue succeeds once a forked child calls
# clear() on the same queue, and that it does so well before the wait limit.
subtest 'str clear unblocks push_wait' => sub {
    my $q = Data::Queue::Shared::Str->new($path5, 4, 4096);
    $q->push("x") for 1..4;    # fill

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child.  Everything runs inside eval so an exception cannot unwind
        # into the parent's test code and turn the child into a second test
        # process.  The child always leaves through POSIX::_exit, which skips
        # END blocks and object destructors.
        my $ok = eval {
            my $cq = Data::Queue::Shared::Str->new($path5, 4, 4096);
            # Short delay before clearing; intended to let the parent reach
            # push_wait first.
            select(undef, undef, undef, 0.1);
            $cq->clear;
            1;
        };
        POSIX::_exit($ok ? 0 : 1);
    }

    # Parent: measure how long push_wait takes *before* reaping the child, so
    # the elapsed time is not inflated by waitpid.
    my $t0      = time;
    my $pushed  = $q->push_wait("after_clear", 30);
    my $elapsed = time - $t0;

    # Reap the child before asserting so it is collected even if an assertion
    # below fails; save $? immediately.
    waitpid($pid, 0);
    my $child_status = $?;

    ok $pushed, 'push_wait succeeded after clear()';
    cmp_ok $elapsed, '<', 20, 'push_wait unblocked (not full-timeout hang)';
    is $child_status, 0, 'child exited cleanly';
    $q->unlink;
};
# Str: clear() wakes blocked pop_wait consumers
# Backing file for the 'str clear wakes pop_wait' subtest: a fresh temporary
# name with a '.shm' suffix.
my $path6 = tmpnam() . '.shm';

# Remove the backing file at interpreter exit, but only in the process whose
# pid equals $main_pid (set elsewhere in this file; presumably the original
# test process) and only if the file is still present.
END {
    if ($$ == $main_pid && $path6 && -f $path6) {
        unlink $path6;
    }
}
# Checks that a parent waiting in pop_wait() on an empty queue ends up with
# the value a forked child pushes *after* first calling clear(), and that it
# returns well before the wait limit.
subtest 'str clear wakes pop_wait' => sub {
    my $q = Data::Queue::Shared::Str->new($path6, 4, 4096);

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child.  Everything runs inside eval so an exception cannot unwind
        # into the parent's test code and turn the child into a second test
        # process.  The child always leaves through POSIX::_exit, which skips
        # END blocks and object destructors.
        my $ok = eval {
            my $cq = Data::Queue::Shared::Str->new($path6, 4, 4096);
            select(undef, undef, undef, 0.1);
            $cq->clear;                   # should wake the blocked consumer
            select(undef, undef, undef, 0.1);
            $cq->push("after_clear");     # then push something
            1;
        };
        POSIX::_exit($ok ? 0 : 1);
    }

    # Parent blocks in pop_wait -- clear wakes it, then it re-blocks,
    # then the push wakes it again with actual data.
    my $t0      = time;
    my $val     = $q->pop_wait(30);
    my $elapsed = time - $t0;    # captured before waitpid so reaping is not counted

    # Reap the child before asserting so it is collected even if an assertion
    # below fails; save $? immediately.
    waitpid($pid, 0);
    my $child_status = $?;

    is $val, "after_clear", 'got value pushed after clear';
    cmp_ok $elapsed, '<', 20, 'pop_wait returned (not full-timeout hang)';
    is $child_status, 0, 'child exited cleanly';
    $q->unlink;
};
# No fixed plan: declare the test run complete once every subtest has run.
done_testing();
( run in 0.646 second using v1.01-cache-2.11-cpan-e1769b4cff6 )