Atomic-Pipe
view release on metacpan or search on metacpan
lib/Atomic/Pipe.pm view on Meta::CPAN
package Atomic::Pipe;
use strict;
use warnings;
our $VERSION = '0.023';
use IO();
use Fcntl();
use bytes();
use Carp qw/croak confess/;
use Config qw/%Config/;
use List::Util qw/min/;
use Scalar::Util qw/blessed/;
use Errno qw/EINTR EAGAIN EPIPE/;
# Lookup table of errno values, keyed by the numeric errno, with a true value
# for each entry. EINTR is always present; ERESTART is added only when the
# Errno module on this platform defines it.
my %RETRY_ERRNO;

BEGIN {
    my @retryable = (EINTR());
    push @retryable => Errno->ERESTART if Errno->can('ERESTART');

    %RETRY_ERRNO = map { $_ => 1 } @retryable;
}
BEGIN {
    # Compile-time feature detection. Installs the following subs into the
    # current package: PIPE_BUF, SSIZE_MAX, DEFAULT_READ_SIZE, _get_tid and
    # IS_WIN32.
    require POSIX;

    # POSIX says writes of 512 or less are atomic, but some platforms allow
    # for larger ones. Use the platform value when POSIX exposes a usable
    # one, otherwise fall back to 512.
    *PIPE_BUF = (POSIX->can('PIPE_BUF') && eval { POSIX::PIPE_BUF() })
        ? \&POSIX::PIPE_BUF
        : sub() { 512 };

    # Same strategy, and the same 512 fallback, for SSIZE_MAX.
    *SSIZE_MAX = (POSIX->can('SSIZE_MAX') && eval { POSIX::SSIZE_MAX() })
        ? \&POSIX::SSIZE_MAX
        : sub() { 512 };

    {
        # Using the default pipe size as a read size is significantly faster
        # than a larger value on the original author's test machine, so the
        # default is capped at 65_536 (or SSIZE_MAX if that is smaller).
        my $default_size = min(SSIZE_MAX(), 65_536);
        *DEFAULT_READ_SIZE = sub() { $default_size };
    }

    # Thread support needs perl 5.8.1+ built with ithreads.
    my $threads_ok = $] >= 5.008001 && $Config{'useithreads'};

    # Threads are broken on perl 5.10.0 built with gcc 4.8+
    if ($threads_ok && $] == 5.010000 && $Config{'ccname'} eq 'gcc' && $Config{'gccversion'}) {
        my ($major, $minor) = split /\./, $Config{'gccversion'};
        $threads_ok = 0 if $major > 4 || ($major == 4 && $minor >= 8);
    }

    # Thread support is also switched off whenever Devel::Cover is loaded.
    $threads_ok = 0 if $INC{'Devel/Cover.pm'};

    # _get_tid: always 0 without thread support; asks threads.pm directly if
    # it is already loaded; otherwise re-checks %INC on every call so that a
    # later load of threads.pm is picked up.
    *_get_tid =
          !$threads_ok       ? sub() { 0 }
        : $INC{'threads.pm'} ? sub() { threads->tid() }
        :                      sub() { $INC{'threads.pm'} ? threads->tid() : 0 };

    # On Windows both Win32::API and Win32API::File must load, or compilation
    # dies right here.
    my $on_windows = $^O eq 'MSWin32';
    if ($on_windows) {
        local $@;
        eval { require Win32::API;     1 } or die "non-blocking on windows requires Win32::API please install it.\n$@";
        eval { require Win32API::File; 1 } or die "non-blocking on windows requires Win32API::File please install it.\n$@";
    }
    *IS_WIN32 = $on_windows ? sub() { 1 } : sub() { 0 };
}
# Names of the hash keys used for this class's per-object fields. They are
# written as $self->{+NAME} so that a misspelled field name fails at compile
# time instead of silently creating a new key.
use constant {
    READ_SIZE      => 'read_size',
    RH             => 'rh',
    WH             => 'wh',
    EOF            => 'eof',
    STATE          => 'state',
    OUT_BUFFER     => 'out_buffer',
    IN_BUFFER      => 'in_buffer',
    IN_BUFFER_SIZE => 'in_buffer_size',
    READ_BLOCKING  => 'read_blocking',
    WRITE_BLOCKING => 'write_blocking',
    BURST_PREFIX   => 'burst_prefix',
    BURST_POSTFIX  => 'burst_postfix',
    ADJUSTED_DSIZE => 'adjusted_dsize',
    MESSAGE_KEY    => 'message_key',
    MIXED_BUFFER   => 'mixed_buffer',
    DELIMITER_SIZE => 'delimiter_size',
    INVALID_STATE  => 'invalid_state',
    HIT_EPIPE      => 'hit_epipe',
};
# Accessor: returns the value stored in the object's WH ('wh') field, or
# undef if the field is not set. (Presumably the write handle, by symmetry
# with rh.)
sub wh {
    my ($self) = @_;
    return $self->{+WH};
}
# Accessor: returns the value stored in the object's RH ('rh') field, i.e.
# the read handle, or undef if the field is not set.
sub rh {
    my ($self) = @_;
    return $self->{+RH};
}
# Marks the pipe as being in an invalid state and dies with a full stack
# trace (Carp::confess).
#
# Arguments: an optional reason string. The first reason ever recorded on
# the object wins; later calls keep reporting that reason. When no reason is
# supplied and none has been recorded yet, 'Unknown Error' is used.
#
# Never returns.
sub throw_invalid {
    my $self = shift;

    # Only consume the reason argument when nothing has been recorded yet.
    unless (defined $self->{+INVALID_STATE}) {
        $self->{+INVALID_STATE} = @_ ? shift(@_) : 'Unknown Error';
    }

    confess "Pipe is in an invalid state '$self->{+INVALID_STATE}'";
}
# Combined getter/setter for the object's read size.
#
# With an argument, stores it as the new read size. In every case a false
# stored value (unset, undef, 0, '') is replaced by DEFAULT_READ_SIZE()
# before the current value is returned.
sub read_size {
    my ($self, @new) = @_;

    $self->{+READ_SIZE} = $new[0] if @new;
    $self->{+READ_SIZE} ||= DEFAULT_READ_SIZE();

    return $self->{+READ_SIZE};
}
sub fill_buffer {
my $self = shift;
$self->throw_invalid() if $self->{+INVALID_STATE};
my $rh = $self->{+RH} or die "Not a read handle";
return 0 if $self->{+EOF};
$self->{+IN_BUFFER_SIZE} //= 0;
my $to_read = $self->{+READ_SIZE} || DEFAULT_READ_SIZE();
if (IS_WIN32 && defined($self->{+READ_BLOCKING}) && !$self->{+READ_BLOCKING}) {
lib/Atomic/Pipe.pm view on Meta::CPAN
my %params = @_;
my $state = $self->{+STATE} //= {};
while (1) {
unless ($state->{key}) {
my $key_bytes = $self->_get_from_buffer($psize) or return;
my %key;
@key{qw/pid tid id size/} = unpack('l2L2', $key_bytes);
$state->{key} = \%key;
}
my $key = $state->{key};
my $data = $self->_get_from_buffer($key->{size}, eof_invalid => "EOF before end of message") // return;
my $id = $key->{id};
my $tag = join ':' => @{$key}{qw/pid tid/};
push @{$state->{parts}->{$tag} //= []} => $id;
$state->{buffers}->{$tag} = $state->{buffers}->{$tag} ? $state->{buffers}->{$tag} . $data : $data;
delete $state->{key};
unless ($id == 0) {
return ($id, undef) if $params{one_part_only};
next;
}
my $message = delete $state->{buffers}->{$tag};
my $parts = delete $state->{parts}->{$tag};
return ($id, $message) unless $params{debug};
return (
$id,
{
message => $message,
parts => $parts,
pid => $key->{pid},
tid => $key->{tid},
},
);
}
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
Atomic::Pipe - Send atomic messages from multiple writers across a POSIX pipe.
=head1 DESCRIPTION
Normally if you write to a pipe from multiple processes/threads, the messages
will come mixed together unpredictably. Some messages may be interrupted by
parts of messages from other writers. This module takes advantage of some POSIX
specifications to allow multiple writers to send arbitrary data down a pipe in
atomic chunks to avoid the issue.
B<NOTE:> This only works for POSIX compliant pipes on POSIX compliant systems.
Also some features may not be available on older systems, or some platforms.
Also: L<https://man7.org/linux/man-pages/man7/pipe.7.html>
POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be
atomic: the output data is written to the pipe as a contiguous
sequence. Writes of more than PIPE_BUF bytes may be nonatomic: the
kernel may interleave the data with data written by other processes.
POSIX.1 requires PIPE_BUF to be at least 512 bytes. (On Linux,
PIPE_BUF is 4096 bytes.) [...]
Under the hood this module will split your message into small sections, each
slightly smaller than the PIPE_BUF limit. Each section will be sent as 1 atomic
chunk with a fixed-size prefix of four 32-bit integers: the process id it came
from, the thread id it came from, a chunk ID (in descending order, so if there
are 3 chunks the first will have id 2, the second 1, and the final chunk is
always 0 allowing a flush as it knows it is done) and the length of the data
section to follow.
On the receiving end this module will read chunks and re-assemble them based on
the header data. So the reader will always get complete messages. Note that
message order is not guarenteed when messages are sent from multiple processes
or threads. Though all messages from any given thread/process should be in
order.
=head1 SYNOPSIS
use Atomic::Pipe;
my ($r, $w) = Atomic::Pipe->pair;
# Chunks will be set to the number of atomic chunks the message was split
# into. It is fine to ignore the value returned, it will always be an
# integer 1 or larger.
my $chunks = $w->write_message("Hello");
# $msg now contains "Hello";
my $msg = $r->read_message;
# Note, you can set the reader to be non-blocking:
$r->blocking(0);
# Writer too (but buffers unwritten items until your next write_burst(),
# write_message(), or flush(), or will do a blocking write when the pipe
# instance is destroyed).
$w->blocking(0);
# $msg2 will be undef as no messages were sent, and blocking is turned off.
my $msg2 = $r->read_message;
Fork example from tests:
use Test2::V0;
use Test2::Require::RealFork;
use Test2::IPC;
use Atomic::Pipe;
my ($r, $w) = Atomic::Pipe->pair;
# For simplicity
$SIG{CHLD} = 'IGNORE';
# Forks and runs your coderef, then exits.
sub worker(&) { ... }
worker { is($w->write_message("aa" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
worker { is($w->write_message("bb" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
worker { is($w->write_message("cc" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
my @messages = ();
push @messages => $r->read_message for 1 .. 3;
is(
[sort @messages],
[sort(('aa' x PIPE_BUF), ('bb' x PIPE_BUF), ('cc' x PIPE_BUF))],
"Got all 3 long messages, not mangled or mixed, order not guaranteed"
);
done_testing;
=head1 MIXED DATA MODE
Mixed data mode is a special use-case for Atomic::Pipe. In this mode the
( run in 1.217 second using v1.01-cache-2.11-cpan-39bf76dae61 )