Data-Queue-Shared

 view release on metacpan or  search on metacpan

t/03-multiprocess.t  view on Meta::CPAN

    my @received;
    while (@received < 50) {
        my $v = $q->pop_wait(30);
        last unless defined $v;
        push @received, $v;
    }

    waitpid($pid, 0);
    is scalar @received, 50, 'received all 50 strings';
    my @sorted = sort @received;
    my @sorted_exp = sort map { "message_$_" } 1..50;
    is_deeply \@sorted, \@sorted_exp, 'all string values correct';

    $q->unlink;
};

# Str: blocking pop across fork.
# The parent blocks in pop_wait() on an empty queue while a forked child
# attaches to the same backing file and pushes one message after a short delay.
my $path4 = tmpnam() . '.shm';

# Safety-net cleanup of the backing file. The pid guard keeps any forked
# child that does run END blocks from deleting the file under the parent
# ($main_pid is set outside this section).
END { unlink $path4 if $$ == $main_pid && $path4 && -f $path4 }

subtest 'str blocking pop' => sub {
    # NOTE(review): 64 / 8192 are presumably a slot capacity and a byte-size
    # argument -- confirm against the Data::Queue::Shared::Str documentation.
    my $q = Data::Queue::Shared::Str->new($path4, 64, 8192);

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: trap every exception so a failure cannot unwind into the
        # test code inherited from the parent; the outcome is reported
        # through the exit status instead.
        my $ok = eval {
            my $cq = Data::Queue::Shared::Str->new($path4, 64, 8192);
            select(undef, undef, undef, 0.1);    # let the parent block first
            $cq->push("delayed_message");
            1;
        };
        warn "child failed: $@" unless $ok;

        # _exit() leaves without running END blocks or destructors.
        POSIX::_exit($ok ? 0 : 1);
    }

    # Parent: wait for the child's message (30 is presumably a timeout).
    my $val = $q->pop_wait(30);

    # Reap the child and keep its status before anything else can reset $?.
    waitpid($pid, 0);
    my $child_status = $?;

    is $val, "delayed_message", 'str blocking pop received value';
    is $child_status, 0, 'child exited cleanly';

    $q->unlink;
};

# Str: clear() unblocks push_wait.
# The parent fills the queue and then blocks in push_wait(); a forked child
# clears the queue, which should let the blocked push go through.
my $path5 = tmpnam() . '.shm';

# Safety-net cleanup of the backing file. The pid guard keeps any forked
# child that does run END blocks from deleting the file under the parent
# ($main_pid is set outside this section).
END { unlink $path5 if $$ == $main_pid && $path5 && -f $path5 }

subtest 'str clear unblocks push_wait' => sub {
    my $q = Data::Queue::Shared::Str->new($path5, 4, 4096);
    $q->push("x") for 1..4;  # fill

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: trap every exception so a failure cannot unwind into the
        # test code inherited from the parent; the outcome is reported
        # through the exit status instead.
        my $ok = eval {
            my $cq = Data::Queue::Shared::Str->new($path5, 4, 4096);
            select(undef, undef, undef, 0.1);    # let the parent block first
            $cq->clear;
            1;
        };
        warn "child failed: $@" unless $ok;

        # _exit() leaves without running END blocks or destructors.
        POSIX::_exit($ok ? 0 : 1);
    }

    # Parent: the queue is full, so this push has to wait for the clear().
    my $t0 = time;
    ok $q->push_wait("after_clear", 30), 'push_wait succeeded after clear()';
    cmp_ok time - $t0, '<', 20, 'push_wait unblocked (not full-timeout hang)';

    # Reap the child and keep its status before anything else can reset $?.
    waitpid($pid, 0);
    my $child_status = $?;
    is $child_status, 0, 'child exited cleanly';

    $q->unlink;
};

# Str: clear() wakes blocked pop_wait consumers.
# The parent blocks in pop_wait() on an empty queue; a forked child first
# clears the queue and only afterwards pushes a real value.
my $path6 = tmpnam() . '.shm';

# Safety-net cleanup of the backing file. The pid guard keeps any forked
# child that does run END blocks from deleting the file under the parent
# ($main_pid is set outside this section).
END { unlink $path6 if $$ == $main_pid && $path6 && -f $path6 }

subtest 'str clear wakes pop_wait' => sub {
    my $q = Data::Queue::Shared::Str->new($path6, 4, 4096);

    my $pid = fork();
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {
        # Child: trap every exception so a failure cannot unwind into the
        # test code inherited from the parent; the outcome is reported
        # through the exit status instead.
        my $ok = eval {
            my $cq = Data::Queue::Shared::Str->new($path6, 4, 4096);
            select(undef, undef, undef, 0.1);
            $cq->clear;  # should wake the blocked consumer
            select(undef, undef, undef, 0.1);
            $cq->push("after_clear");  # then push something
            1;
        };
        warn "child failed: $@" unless $ok;

        # _exit() leaves without running END blocks or destructors.
        POSIX::_exit($ok ? 0 : 1);
    }

    # Parent blocks in pop_wait. Intended sequence: the clear wakes it, it
    # finds nothing and blocks again, then the push wakes it with real data.
    my $t0      = time;
    my $val     = $q->pop_wait(30);
    my $elapsed = time - $t0;    # measured as soon as pop_wait returns

    is $val, "after_clear", 'got value pushed after clear';
    cmp_ok $elapsed, '<', 20, 'pop_wait returned (not full-timeout hang)';

    # Reap the child and keep its status before anything else can reset $?.
    waitpid($pid, 0);
    my $child_status = $?;
    is $child_status, 0, 'child exited cleanly';

    $q->unlink;
};

# End of the script: tell the test harness that every test has been run.
done_testing();



( run in 0.646 second using v1.01-cache-2.11-cpan-e1769b4cff6 )