Atomic-Pipe
view release on metacpan or search on metacpan
- Add non-blocking write (*nix, and windows too!)
0.012 2020-12-13 18:30:33-08:00 America/Los_Angeles
- Fix some tests that were breaking Test2's IPC
- Fix missing PIPE->autoflush() on some platforms
0.011 2020-12-12 22:01:30-08:00 America/Los_Angeles
- Fix non-blocking mode in windows
- Single implementation of IPC tests where fork or threads are picked as
needed.
0.010 2020-12-12 15:18:53-08:00 America/Los_Angeles
- Fix win32
0.009 2020-12-10 22:46:18-08:00 America/Los_Angeles
- Add fifo support
NAME
Atomic::Pipe - Send atomic messages from multiple writers across a
POSIX pipe.
DESCRIPTION
Normally if you write to a pipe from multiple processes/threads, the
messages will come mixed together unpredictably. Some messages may be
interrupted by parts of messages from other writers. This module takes
advantage of some POSIX specifications to allow multiple writers to
send arbitrary data down a pipe in atomic chunks to avoid the issue.
NOTE: This only works for POSIX compliant pipes on POSIX compliant
systems. Also some features may not be available on older systems, or
some platforms.
Also: https://man7.org/linux/man-pages/man7/pipe.7.html
POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be
atomic: the output data is written to the pipe as a contiguous
sequence. Writes of more than PIPE_BUF bytes may be nonatomic: the
kernel may interleave the data with data written by other processes.
POSIX.1 requires PIPE_BUF to be at least 512 bytes. (On Linux,
PIPE_BUF is 4096 bytes.) [...]
Under the hood this module will split your message into small sections
of slightly smaller than the PIPE_BUF limit. Each message will be sent
as 1 atomic chunk with a 4 byte prefix indicating what process id it
came from, what thread id it came from, a chunk ID (in descending
order, so if there are 3 chunks the first will have id 2, the second 1,
and the final chunk is always 0 allowing a flush as it knows it is
done) and then 1 byte with the length of the data section to follow.
On the receiving end this module will read chunks and re-assemble them
based on the header data. So the reader will always get complete
messages. Note that message order is not guarenteed when messages are
sent from multiple processes or threads. Though all messages from any
given thread/process should be in order.
SYNOPSIS
use Atomic::Pipe;
my ($r, $w) = Atomic::Pipe->pair;
# Chunks will be set to the number of atomic chunks the message was split
# into. It is fine to ignore the value returned, it will always be an
# integer 1 or larger.
# NAME
Atomic::Pipe - Send atomic messages from multiple writers across a POSIX pipe.
# DESCRIPTION
Normally if you write to a pipe from multiple processes/threads, the messages
will come mixed together unpredictably. Some messages may be interrupted by
parts of messages from other writers. This module takes advantage of some POSIX
specifications to allow multiple writers to send arbitrary data down a pipe in
atomic chunks to avoid the issue.
**NOTE:** This only works for POSIX compliant pipes on POSIX compliant systems.
Also some features may not be available on older systems, or some platforms.
Also: [https://man7.org/linux/man-pages/man7/pipe.7.html](https://man7.org/linux/man-pages/man7/pipe.7.html)
POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be
atomic: the output data is written to the pipe as a contiguous
sequence. Writes of more than PIPE_BUF bytes may be nonatomic: the
kernel may interleave the data with data written by other processes.
POSIX.1 requires PIPE_BUF to be at least 512 bytes. (On Linux,
PIPE_BUF is 4096 bytes.) [...]
Under the hood this module will split your message into small sections of
slightly smaller than the PIPE\_BUF limit. Each message will be sent as 1 atomic
chunk with a 4 byte prefix indicating what process id it came from, what thread
id it came from, a chunk ID (in descending order, so if there are 3 chunks the
first will have id 2, the second 1, and the final chunk is always 0 allowing a
flush as it knows it is done) and then 1 byte with the length of the data
section to follow.
On the receiving end this module will read chunks and re-assemble them based on
the header data. So the reader will always get complete messages. Note that
message order is not guarenteed when messages are sent from multiple processes
or threads. Though all messages from any given thread/process should be in
order.
# SYNOPSIS
use Atomic::Pipe;
my ($r, $w) = Atomic::Pipe->pair;
# Chunks will be set to the number of atomic chunks the message was split
# into. It is fine to ignore the value returned, it will always be an
lib/Atomic/Pipe.pm view on Meta::CPAN
*SSIZE_MAX = sub() { 512 };
}
{
# Using the default pipe size as a read size is significantly faster
# than a larger value on my test machine.
my $read_size = min(SSIZE_MAX(), 65_536);
*DEFAULT_READ_SIZE = sub() { $read_size };
}
my $can_thread = 1;
$can_thread &&= $] >= 5.008001;
$can_thread &&= $Config{'useithreads'};
# Threads are broken on perl 5.10.0 built with gcc 4.8+
if ($can_thread && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
my @parts = split /\./, $Config{'gccversion'};
$can_thread = 0 if $parts[0] > 4 || ($parts[0] == 4 && $parts[1] >= 8);
}
$can_thread &&= !$INC{'Devel/Cover.pm'};
if (!$can_thread) {
*_get_tid = sub() { 0 };
}
elsif ($INC{'threads.pm'}) {
*_get_tid = sub() { threads->tid() };
}
else {
*_get_tid = sub() { $INC{'threads.pm'} ? threads->tid() : 0 };
}
if ($^O eq 'MSWin32') {
local $@;
eval { require Win32::API; 1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
*IS_WIN32 = sub() { 1 };
}
else {
*IS_WIN32 = sub() { 0 };
lib/Atomic/Pipe.pm view on Meta::CPAN
=pod
=encoding UTF-8
=head1 NAME
Atomic::Pipe - Send atomic messages from multiple writers across a POSIX pipe.
=head1 DESCRIPTION
Normally if you write to a pipe from multiple processes/threads, the messages
will come mixed together unpredictably. Some messages may be interrupted by
parts of messages from other writers. This module takes advantage of some POSIX
specifications to allow multiple writers to send arbitrary data down a pipe in
atomic chunks to avoid the issue.
B<NOTE:> This only works for POSIX compliant pipes on POSIX compliant systems.
Also some features may not be available on older systems, or some platforms.
Also: L<https://man7.org/linux/man-pages/man7/pipe.7.html>
POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be
atomic: the output data is written to the pipe as a contiguous
sequence. Writes of more than PIPE_BUF bytes may be nonatomic: the
kernel may interleave the data with data written by other processes.
POSIX.1 requires PIPE_BUF to be at least 512 bytes. (On Linux,
PIPE_BUF is 4096 bytes.) [...]
Under the hood this module will split your message into small sections of
slightly smaller than the PIPE_BUF limit. Each message will be sent as 1 atomic
chunk with a 4 byte prefix indicating what process id it came from, what thread
id it came from, a chunk ID (in descending order, so if there are 3 chunks the
first will have id 2, the second 1, and the final chunk is always 0 allowing a
flush as it knows it is done) and then 1 byte with the length of the data
section to follow.
On the receiving end this module will read chunks and re-assemble them based on
the header data. So the reader will always get complete messages. Note that
message order is not guarenteed when messages are sent from multiple processes
or threads. Though all messages from any given thread/process should be in
order.
=head1 SYNOPSIS
use Atomic::Pipe;
my ($r, $w) = Atomic::Pipe->pair;
# Chunks will be set to the number of atomic chunks the message was split
# into. It is fine to ignore the value returned, it will always be an
my $COUNT = 10_000;
diag("Using count: $COUNT");
worker { $w->write_message("aaa" x PIPE_BUF) for 1 .. $COUNT };
worker { $w->write_message("bbb" x PIPE_BUF) for 1 .. $COUNT };
worker { $w->write_message("ccc" x PIPE_BUF) for 1 .. $COUNT };
worker { $w->write_message("ddd" x PIPE_BUF) for 1 .. $COUNT };
worker { $w->write_message("eee" x PIPE_BUF) for 1 .. $COUNT };
# Without this windows blocks in the main thread and the other threads never do their work.
sleep 2 if $^O eq 'MSWin32';
my %seen;
while (my $msg = $r->read_message) {
is(
$msg,
in_set(
("aaa" x PIPE_BUF),
("bbb" x PIPE_BUF),
("ccc" x PIPE_BUF),
$seen{substr($msg, 0, 1)}++;
last if ++$seen{TOTAL} >= (5 * $COUNT);
}
delete $seen{TOTAL};
is(
\%seen,
{a => $COUNT, b => $COUNT, c => $COUNT, d => $COUNT, e => $COUNT},
"Got all $COUNT messages from each thread"
);
cleanup();
done_testing;
t/stress_fifo.t view on Meta::CPAN
my $r = Atomic::Pipe->read_fifo($fifo);
my $COUNT = 10_000;
worker { my $w = Atomic::Pipe->write_fifo($fifo); $w->write_message("aaa" x PIPE_BUF) for 1 .. $COUNT };
worker { my $w = Atomic::Pipe->write_fifo($fifo); $w->write_message("bbb" x PIPE_BUF) for 1 .. $COUNT };
worker { my $w = Atomic::Pipe->write_fifo($fifo); $w->write_message("ccc" x PIPE_BUF) for 1 .. $COUNT };
worker { my $w = Atomic::Pipe->write_fifo($fifo); $w->write_message("ddd" x PIPE_BUF) for 1 .. $COUNT };
worker { my $w = Atomic::Pipe->write_fifo($fifo); $w->write_message("eee" x PIPE_BUF) for 1 .. $COUNT };
# Without this windows blocks in the main thread and the other threads never do their work.
sleep 4 if $^O eq 'MSWin32';
my %seen;
while (my $msg = $r->read_message) {
is(
$msg,
in_set(
("aaa" x PIPE_BUF),
("bbb" x PIPE_BUF),
("ccc" x PIPE_BUF),
t/worker.pm view on Meta::CPAN
use Test2::V0;
use Test2::IPC;
use Test2::Util qw/CAN_REALLY_FORK CAN_THREAD/;
skip_all "This test requires either forking or threads"
unless CAN_REALLY_FORK || CAN_THREAD;
if (CAN_REALLY_FORK) {
diag "Using fork()...\n";
my @pids;
*cleanup = sub() { waitpid($_, 0) for @pids; @pids = () };
*worker = sub(&) {
my ($code) = @_;
t/worker.pm view on Meta::CPAN
return push @pids => $pid if $pid;
my $ok = eval { $code->(); 1 };
my $err = $@;
exit(0) if $ok;
warn $err;
exit 255;
}
}
else {
diag "Using threads...\n";
require threads;
my @threads;
*cleanup = sub() { $_->join for @threads; @threads = () };
*worker = sub(&) {
my ($code) = @_;
# Prevent weird deadlock on win32
if ($^O eq 'MSWin32') {
my $inner = $code;
$code = sub { sleep 2; $inner->() };
}
push @threads => threads->create($code);
};
}
sub note_sleep {
my ($end) = @_;
for (1 .. 10) {
print "# " . ($end - $_ + 1) . "\n";
sleep 1;
}
}
( run in 1.101 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )