Atomic-Pipe

 view release on metacpan or  search on metacpan

t/stress_fifo.t  view on Meta::CPAN

use Test2::V0;
use Test2::IPC;
use Atomic::Pipe;

# Alias Atomic::Pipe's PIPE_BUF sub into this package at compile time, so the
# bareword PIPE_BUF used further down resolves while the file is being parsed.
BEGIN {
    my $pipe_buf_sub = Atomic::Pipe->can('PIPE_BUF');
    *PIPE_BUF = $pipe_buf_sub;
}

# Load the helper file worker.pm that sits in the same directory as this test.
# This runs at compile time; presumably worker.pm is what provides worker()
# and cleanup() used below -- verify against worker.pm.
BEGIN {
    # Loaded locally: this block runs before the file's later `use` lines.
    require File::Basename;
    require File::Spec;

    # Resolve the helper from the absolute location of this file. Prefixing
    # "./" to a regex-edited __FILE__ breaks when the test is invoked by an
    # absolute path, or when the path does not use "/" as its separator.
    my $test_dir = File::Basename::dirname(File::Spec->rel2abs(__FILE__));
    my $worker   = File::Spec->catfile($test_dir, 'worker.pm');

    # An absolute path makes require load exactly this file, bypassing @INC.
    require $worker;
}

use POSIX qw/mkfifo/;
use File::Temp qw/tempdir/;
use File::Spec;

# Create a fifo inside a temporary directory that is removed on exit.
my $tempdir = tempdir(CLEANUP => 1);
my $fifo    = File::Spec->catfile($tempdir, 'fifo');

# mkfifo may either return false or throw; the eval turns both into a false
# result with the reason left in $@.
my $fifo_created = eval { mkfifo($fifo, 0700) or die "Failed to make fifo: $!" };

if (!$fifo_created) {
    my $error = $@;

    # A platform without mkfifo support is a reason to skip, not to fail.
    skip_all $error if $error =~ m/not implemented on this architecture/;

    # Any other failure is a real error and is re-thrown.
    die $error;
}

# The read end is opened before any writer is started.
my $r = Atomic::Pipe->read_fifo($fifo);

# Number of messages each worker writes.
my $COUNT = 10_000;

# Start five workers, one per payload pattern. Each worker opens its own write
# handle on the fifo and writes $COUNT messages, every message being its
# pattern repeated PIPE_BUF times.
for my $pattern (qw/aaa bbb ccc ddd eee/) {
    worker {
        my $w = Atomic::Pipe->write_fifo($fifo);
        $w->write_message($pattern x PIPE_BUF) for 1 .. $COUNT;
    };
}

# Without this windows blocks in the main thread and the other threads never do their work.
sleep 4 if $^O eq 'MSWin32';

# The five payloads a worker may send. They are built once here instead of on
# every loop iteration: each is 3 * PIPE_BUF bytes long and the loop below
# runs once per received message.
my @valid_payloads = (
    ("aaa" x PIPE_BUF),
    ("bbb" x PIPE_BUF),
    ("ccc" x PIPE_BUF),
    ("ddd" x PIPE_BUF),
    ("eee" x PIPE_BUF),
);

# Five workers, $COUNT messages each.
my $expected_total = 5 * $COUNT;

# %seen counts messages per sender, keyed by the first character of the
# message. The overall total is kept in its own counter, so %seen holds only
# per-sender keys.
my %seen;
my $received = 0;

while (my $msg = $r->read_message) {
    # Every message must equal one of the payloads exactly; anything else
    # means it was mangled on the way through the fifo.
    is($msg, in_set(@valid_payloads), "Message is valid, not mangled");

    $seen{substr($msg, 0, 1)}++;

    # Stop once every expected message has arrived.
    last if ++$received >= $expected_total;
}

# Each of the five senders must have delivered exactly $COUNT messages.
is(
    \%seen,
    {a => $COUNT, b => $COUNT, c => $COUNT, d => $COUNT, e => $COUNT},
    "Got all $COUNT messages from each process"
);

cleanup();
done_testing;



( run in 1.971 second using v1.01-cache-2.11-cpan-39bf76dae61 )