Consumer-NonBlock

 view release on metacpan or  search on metacpan

README  view on Meta::CPAN

    
        $writer = undef; # Close the output
        $reader = undef; # Close the input, and delete the temp dir

SYNOPSYS WITH FORK

    Normally when the reader is closed it will delete the data. Normally
    when the writer is closed it will mark the data stream as complete. If
    you fork then either of these actions can happen in either process.

    The weaken() method can be used to prevent a reader or writer from
    taking these actions. So in the producer process you want to weaken the
    reader object, and in the consumer process you want to weaken the
    writer object.

        use Consumer::NonBlock;
    
        my ($reader, $writer) = Consumer::NonBlock->pair(batch_size => 100);
    
        my $pid = fork // die "Could not fork: $!";
    
        if ($pid) { # Parent
            # Make sure this process does not delete the temp data
            $reader->weaken;
            $reader->close;
    
            $writer->write_line("Line from the parent");
        }
        else { # Child
            # Make sure this process does not mark the IO as complete
            $writer->weaken;
            $writer->close;
    
            my $line = $reader->read_line;
            print "Got line: $line\n";
        }

IMPLEMENTATION DETAILS

    This module works by having the producer write to temporary files. It
    will rotate files after a specified batch limit. The consumer will

README  view on Meta::CPAN

    $bool = $handle->is_reader()

      Check if handle is a reader.

    $bool = $handle->is_writer()

      Check if handle is a writer.

    $bool = $handle->is_weak()

      Check if the handle has been weakened.

    $handle->weaken()

      Weaken the handle so it will not delete the data dir or close the IO.

    $handle->set_env_var()

      Sets the $ENV{CONSUMER_NONBLOCK_DIR} env var to the temporary data
      dir. This can then be used in a child process with
      Consumer::NonBlock->reader_from_env() to create a reader instance in
      another process.

README.md  view on Meta::CPAN


    $writer = undef; # Close the output
    $reader = undef; # Close the input, and delete the temp dir

# SYNOPSYS WITH FORK

Normally when the reader is closed it will delete the data. Normally when the
writer is closed it will mark the data stream as complete. If you fork then
either of these actions can happen in either process.

The weaken() method can be used to prevent a reader or writer from taking these
actions. So in the producer process you want to weaken the reader object, and
in the consumer process you want to weaken the writer object.

    use Consumer::NonBlock;

    my ($reader, $writer) = Consumer::NonBlock->pair(batch_size => 100);

    my $pid = fork // die "Could not fork: $!";

    if ($pid) { # Parent
        # Make sure this process does not delete the temp data
        $reader->weaken;
        $reader->close;

        $writer->write_line("Line from the parent");
    }
    else { # Child
        # Make sure this process does not mark the IO as complete
        $writer->weaken;
        $writer->close;

        my $line = $reader->read_line;
        print "Got line: $line\n";
    }

# IMPLEMENTATION DETAILS

This module works by having the producer write to temporary files. It will
rotate files after a specified batch limit. The consumer will delete the files

README.md  view on Meta::CPAN

- $bool = $handle->is\_reader()

    Check if handle is a reader.

- $bool = $handle->is\_writer()

    Check if handle is a writer.

- $bool = $handle->is\_weak()

    Check if the handle has been weakened.

- $handle->weaken()

    Weaken the handle so it will not delete the data dir or close the IO.

- $handle->set\_env\_var()

    Sets the `$ENV{CONSUMER_NONBLOCK_DIR}` env var to the temporary data dir. This
    can then be used in a child process with
    `Consumer::NonBlock->reader_from_env()` to create a reader instance in
    another process.

lib/Consumer/NonBlock.pm  view on Meta::CPAN

    <is_writer

    <is_weak

    +fh
    +batch
    +batch_item
    +buffer
};

sub weaken { $_[0]->{+IS_WEAK} = 1 };

sub init {
    my $self = shift;

    croak "Must be either a reader or a writer" unless $self->{+IS_READER} || $self->{+IS_WRITER};
    croak "Must only be a reader or a writer, not both" if $self->{+IS_READER} && $self->{+IS_WRITER};

    my $dir = $self->{+DIR} or croak "'dir' is a required attribute";
    croak "Invalid directory '$dir'" unless -d $dir;

lib/Consumer/NonBlock.pm  view on Meta::CPAN


    $writer = undef; # Close the output
    $reader = undef; # Close the input, and delete the temp dir

=head1 SYNOPSYS WITH FORK

Normally when the reader is closed it will delete the data. Normally when the
writer is closed it will mark the data stream as complete. If you fork then
either of these actions can happen in either process.

The weaken() method can be used to prevent a reader or writer from taking these
actions. So in the producer process you want to weaken the reader object, and
in the consumer process you want to weaken the writer object.

    use Consumer::NonBlock;

    my ($reader, $writer) = Consumer::NonBlock->pair(batch_size => 100);

    my $pid = fork // die "Could not fork: $!";

    if ($pid) { # Parent
        # Make sure this process does not delete the temp data
        $reader->weaken;
        $reader->close;

        $writer->write_line("Line from the parent");
    }
    else { # Child
        # Make sure this process does not mark the IO as complete
        $writer->weaken;
        $writer->close;

        my $line = $reader->read_line;
        print "Got line: $line\n";
    }

=head1 IMPLEMENTATION DETAILS

This module works by having the producer write to temporary files. It will
rotate files after a specified batch limit. The consumer will delete the files

lib/Consumer/NonBlock.pm  view on Meta::CPAN

=item $bool = $handle->is_reader()

Check if handle is a reader.

=item $bool = $handle->is_writer()

Check if handle is a writer.

=item $bool = $handle->is_weak()

Check if the handle has been weakened.

=item $handle->weaken()

Weaken the handle so it will not delete the data dir or close the IO.

=item $handle->set_env_var()

Sets the C<$ENV{CONSUMER_NONBLOCK_DIR}> env var to the temporary data dir. This
can then be used in a child process with
C<< Consumer::NonBlock->reader_from_env() >> to create a reader instance in
another process.

t/simple.t  view on Meta::CPAN

use Test2::V0 -target => 'Consumer::NonBlock';
use ok $CLASS;

my ($r, $w) = Consumer::NonBlock->pair(batch_size => 5);

$w->write_line("foo $_") for 1 .. 12;

my $w2 = bless({%$w}, $CLASS);

$w->weaken;
$w = undef;
ok($w2->_update->{open}, "Still open");
$w2->close();
ok(!$w2, "Closed w2");
ok(!$r->_update->{open}, "Not open");

my $dir = $r->dir;
opendir(my $dh, $dir) or die "Could not open dir '$dir': $!";
my @files = sort grep { $_ !~ m/\./ } readdir($dh);
is(\@files, [0 .. 2, 'data'], "Got all expected files");



( run in 0.877 second using v1.01-cache-2.11-cpan-65fba6d93b7 )