Atomic-Pipe

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

NAME

    Atomic::Pipe - Send atomic messages from multiple writers across a
    POSIX pipe.

DESCRIPTION

    Normally if you write to a pipe from multiple processes/threads, the
    messages will come mixed together unpredictably. Some messages may be
    interrupted by parts of messages from other writers. This module takes
    advantage of some POSIX specifications to allow multiple writers to
    send arbitrary data down a pipe in atomic chunks to avoid the issue.

    NOTE: This only works for POSIX compliant pipes on POSIX compliant
    systems. Also some features may not be available on older systems, or
    some platforms.

    Also: https://man7.org/linux/man-pages/man7/pipe.7.html

        POSIX.1 says that write(2)s of less than PIPE_BUF bytes must be
        atomic: the output data is written to the pipe as a contiguous
        sequence.  Writes of more than PIPE_BUF bytes may be nonatomic: the
        kernel may interleave the data with data written by other processes.
        POSIX.1 requires PIPE_BUF to be at least 512 bytes.  (On Linux,
        PIPE_BUF is 4096 bytes.) [...]

    Under the hood this module will split your message into small sections
    of slightly smaller than the PIPE_BUF limit. Each message will be sent
    as 1 atomic chunk with a 4 byte prefix indicating what process id it
    came from, what thread id it came from, a chunk ID (in descending
    order, so if there are 3 chunks the first will have id 2, the second 1,
    and the final chunk is always 0 allowing a flush as it knows it is
    done) and then 1 byte with the length of the data section to follow.

    On the receiving end this module will read chunks and re-assemble them
    based on the header data. So the reader will always get complete
    messages. Note that message order is not guarenteed when messages are
    sent from multiple processes or threads. Though all messages from any
    given thread/process should be in order.

SYNOPSIS

        use Atomic::Pipe;
    
        my ($r, $w) = Atomic::Pipe->pair;
    
        # Chunks will be set to the number of atomic chunks the message was split
        # into. It is fine to ignore the value returned, it will always be an
        # integer 1 or larger.
        my $chunks = $w->write_message("Hello");
    
        # $msg now contains "Hello";
        my $msg = $r->read_message;
    
        # Note, you can set the reader to be non-blocking:
        $r->blocking(0);
    
        # Writer too (but buffers unwritten items until your next write_burst(),
        # write_message(), or flush(), or will do a writing block when the pipe
        # instance is destroyed.
        $w->blocking(0);
    
        # $msg2 will be undef as no messages were sent, and blocking is turned off.
        my $msg2 = $r->read_message;

    Fork example from tests:

        use Test2::V0;
        use Test2::Require::RealFork;
        use Test2::IPC;
        use Atomic::Pipe;
    
        my ($r, $w) = Atomic::Pipe->pair;
    
        # For simplicty
        $SIG{CHLD} = 'IGNORE';
    
        # Forks and runs your coderef, then exits.
        sub worker(&) { ... }
    
        worker { is($w->write_message("aa" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
        worker { is($w->write_message("bb" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
        worker { is($w->write_message("cc" x $w->PIPE_BUF), 3, "$$ Wrote 3 chunks") };
    
        my @messages = ();
        push @messages => $r->read_message for 1 .. 3;
    
        is(
            [sort @messages],
            [sort(('aa' x PIPE_BUF), ('bb' x PIPE_BUF), ('cc' x PIPE_BUF))],
            "Got all 3 long messages, not mangled or mixed, order not guarenteed"
        );
    
        done_testing;

MIXED DATA MODE

    Mixed data mode is a special use-case for Atomic::Pipe. In this mode
    the assumption is that the writer end of the pipe uses the pipe as



( run in 0.617 second using v1.01-cache-2.11-cpan-39bf76dae61 )