Parallel-DataPipe


lib/Parallel/DataPipe.pm

package Parallel::DataPipe;

our $VERSION='0.12';
use 5.8.0;
use strict;
use warnings;
use IO::Select;
use List::Util qw(first min);
use constant PIPE_MAX_CHUNK_SIZE => $^O =~ m{linux|cygwin}? 16*1024 : 1024;
use constant _EOF_ => (-(1<<31));

sub run {
    my $param = {};
    my ($input,$map,$output) = @_;
    if (ref($input) eq 'HASH') {
        $param = $input;
    } else {
        $param = {input=>$input, process=>$map, output=>$output };
    }
    pipeline($param);
}
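
# Usage sketch (hash form and positional form are equivalent; names here are illustrative):
#   my @squares;
#   Parallel::DataPipe::run({
#       input   => [1..100],
#       process => sub { $_ * $_ },            # executed in forked children
#       output  => sub { push @squares, $_ },  # executed in the parent
#   });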

sub pipeline {
    my $class=shift;
    if (ref($class) eq 'HASH') {
        unshift @_, $class;
        $class = __PACKAGE__;
    }
    my @pipes;
    # init pipes
    my $default_input;
    for my $param (@_) {
        unless (exists $param->{input}) {
            $param->{input} = $default_input or die "You have to specify input for the first pipe";
        }
        my $pipe = $class->new($param);
        if (ref($pipe->{output}) eq 'ARRAY') {
            $default_input = $pipe->{output};
        }
        push @pipes, $pipe;
    }
    run_pipes(0,@pipes);
    my $result = $pipes[$#pipes]->{output};
    # @pipes=() would kill the parent, and so would its implicit destruction at scope exit:
    # destroy the pipes one by one if you want to survive!!!
    undef $_ for @pipes;
    return unless defined(wantarray);
    return unless $result;
    return wantarray?@$result:$result;
}
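
# Pipeline sketch: when a pipe's output is an array reference it becomes the
# default input of the next pipe, so input may be omitted for later pipes:
#   Parallel::DataPipe::pipeline(
#       { input => [1..10], process => sub { $_ * 2 } },
#       { process => sub { $_ + 1 }, output => \my @result },
#   );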

sub run_pipes {
    my ($prev_busy,$me,@next) = @_;
    my $me_busy = $me->load_data || $me->busy_processors;
    while ($me_busy) {
        $me->receive_and_merge_data;
        $me_busy = $me->load_data || $me->busy_processors;
        my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
        $me_busy ||= $next_busy;
        # get data from pipe if we have free_processors
        return $me_busy if $prev_busy && $me->free_processors;
    }
    return 0;
}
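
# Note on run_pipes above: each pipe keeps itself loaded while recursing into
# the downstream pipes, and returns control to its upstream caller as soon as
# that caller is busy and this pipe has free processors, so no stage of the
# chain starves another.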

# input_iterator is either an array reference or a subroutine reference which
# gets data from a queue (or some other source) and returns it;
# it returns undef when there is no more data
sub input_iterator {
    my $self = shift;
    $self->{input_iterator}->(@_);
}

sub output_iterator {
    my $self = shift;
    $self->{output_iterator}->(@_);
}

# this sets or creates the input iterator
sub set_input_iterator {
    my ($self,$param) = @_;
    my $old_behaviour = $param->{input_iterator};
    my ($input_iterator) = extract_param($param, qw(input_iterator input queue data));
    unless (ref($input_iterator) eq 'CODE') {
        die "array or code reference expected for input_iterator" unless ref($input_iterator) eq 'ARRAY';
        my $queue = $input_iterator;
        $self->{input} = $queue;
        if ($old_behaviour) {
            my $l = @$queue;
            my $i = 0;
            $input_iterator = sub {$i<$l?$queue->[$i++]:undef};
        } else {
            # this behaviour was introduced in 0.06
            $input_iterator = sub {$queue?shift(@$queue):undef};
        }
    }
    $self->{input_iterator} = $input_iterator;
}
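
# A code-ref input source must return undef when exhausted, e.g. a line-by-line
# file reader (sketch; $fh is assumed to be an already opened handle):
#   input => sub { my $line = <$fh>; $line },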

sub set_output_iterator {
    my ($self,$param) = @_;
    my ($output_iterator) = extract_param($param, qw(merge_data output_iterator output output_queue output_data merge reduce));
    unless (ref($output_iterator) eq 'CODE') {
        my $queue = $output_iterator || [];
        $self->{output} = $queue;
        $output_iterator = sub {push @$queue,$_}; # $_ is localized by receive_and_merge_data
    }
    $self->{output_iterator} = $output_iterator;
}
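
# A code-ref output iterator receives the processed item and its ordinal number
# (see receive_and_merge_data), e.g. to restore the original input order (sketch):
#   my @ordered;
#   output => sub { my ($item, $n) = @_; $ordered[$n] = $item },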

# loads all free processors with data from input
# returns the number of loaded processors
sub load_data {
    my $self = shift;
    my @free_processors = $self->free_processors;
    my $result = 0;
    for my $processor (@free_processors) {
        my $data = $self->input_iterator;
        # return number of processors loaded
        return $result unless defined($data);
        $result++;
        $self->load_data_processor($data,$processor);
    }
    return $result;
}

sub _fork_data_processor {
    my ($data_processor_callback, $init_data_processor) = @_;
    # create the data processor as a forked child
    my $pid = fork();
    unless (defined $pid) {
        #print "say goodbye - can't fork!\n"; <>;
        die "can't fork!";
    }
    if ($pid == 0) {
        local $SIG{TERM} = sub {
            exit;
        }; # exit silently from data processors
        # the data processor is an eternal loop which waits for raw data on the pipe from the parent;
        # it is killed by _kill_data_processors when it is not needed anymore
        $init_data_processor->() if ref($init_data_processor) eq 'CODE';
        $data_processor_callback->() while (1);
        exit;
    }
    return $pid;
}

sub _create_data_processor {
    my ($self,$process_data_callback, $init_data_processor) = @_;

    # parent <=> child pipes
    my ($parent_read, $parent_write) = pipely();
    my ($child_read, $child_write) = pipely();

    my $data_processor = sub {
        local $_ = $self->_get_data($child_read);
        unless (defined($_)) {
            exit 0;
        }
        $_ = $process_data_callback->($_);
        $self->_put_data($parent_write,$_);
    };

    # return data processor record
    return {
        pid => _fork_data_processor($data_processor,$init_data_processor),  # needed to kill processor when there is no more data to process
        child_write => $child_write,                 # pipe to write raw data from the parent to the data processor
        parent_read => $parent_read,                 # pipe to read processed data from the data processor back to the parent
    };
}

sub extract_param {
    my ($param, @alias) = @_;
    return first {defined($_)} map delete($param->{$_}), @alias;
}

sub create_data_processors {
    my ($self,$param) = @_;
    my $process_data_callback = extract_param($param,qw(process_data process processor map));
    my $init_data_processor = extract_param($param,qw(init_data_processor));
    my $number_of_data_processors = extract_param($param,qw(number_of_data_processors number_of_processors));
    $number_of_data_processors = $self->number_of_cpu_cores unless $number_of_data_processors;
    die "process_data parameter should be code ref" unless ref($process_data_callback) eq 'CODE';
	die "\$number_of_data_processors:undefined" unless defined($number_of_data_processors);
    return [map $self->_create_data_processor($process_data_callback, $init_data_processor, $_), 0..$number_of_data_processors-1];
}

sub load_data_processor {
    my ($self,$data,$processor) = @_;
    $processor->{item_number} = $self->{item_number}++;
    die "no support of data processing for undef items!" unless defined($data);
    $processor->{busy} = 1;
    $self->_put_data($processor->{child_write},$data);
}

sub busy_processors {
    my $self = shift;
    return grep $_->{busy}, @{$self->{processors}};
}

sub free_processors {
    my $self = shift;
    return grep !$_->{busy}, @{$self->{processors}};
}

sub receive_and_merge_data {
    my $self = shift;
    my ($processors,$ready) = @{$self}{qw(processors ready)};
    $self->{ready} = $ready = [] unless $ready;
    @$ready = IO::Select->new(map $_->{busy} && $_->{parent_read},@$processors)->can_read() unless @$ready;
    my $fh = shift(@$ready);
    my $processor = first {$_->{parent_read} == $fh} @$processors;
    local $_ = $self->_get_data($fh);
    $processor->{busy} = undef; # make processor free
    $self->output_iterator($_,$processor->{item_number});
}

sub _kill_data_processors {
    my ($self) = @_;
    my $processors = $self->{processors};
    my @pid_to_kill = map $_->{pid}, @$processors;
    my %pid_to_wait = map {$_=>undef} @pid_to_kill;
    # put undef to input of data_processor - they know it's time to exit
    $self->_put_data($_->{child_write}) for @$processors;
    while (@pid_to_kill) {
        my $pid = wait;
        delete $pid_to_wait{$pid};
        @pid_to_kill = keys %pid_to_wait;
    }
}

sub new {
    my ($class, $param) = @_;
    my $self = {mypid=>$$};
    bless $self,$class;
    $self->set_input_iterator($param);
    # item_number for merge implementation
    $self->{item_number} = 0;
    # check if the user wants to use alternative serialization routines
    $self->init_serializer($param);
    # @$processors is array with data processor info
    $self->{processors} = $self->create_data_processors($param);
    # data_merge is a sub which merges all processed data inside the parent process
    # it is called each time process_data returns a new portion of data
    $self->set_output_iterator($param);
    my $not_supported = join ", ", keys %$param;
    die "Parameters are redundant or not supported:". $not_supported if $not_supported;
    return $self;
}

sub DESTROY {
    my $self = shift;
    return unless $self->{mypid} == $$;
    $self->_kill_data_processors;
    #semctl($self->{sem_id},0,IPC_RMID,0);
}

=begin comment

Why did I copy IO::Pipely::pipely instead of using IO::Pipely qw(pipely)?
1. To avoid depending on the installation of an additional module.
2. I don't know (yet) how to win this race condition:
A) In Makefile.PL I would like to check that fork & pipe work on the platform before creating the Makefile,
but I am not sure it is OK to use pipely to create pipes at that moment.
B) So to use pipely I would have to create the Makefile first.
For now I decided to just copy the pipely code into this module.
Once I know how to win that race condition I will get rid of this code,
use IO::Pipely qw(pipely) instead, and add a dependency on it.

=end comment

=cut

to produce the input data for the next pipe.
This is applied recursively along the whole chain of pipes.

Here is a parallel grep implemented in about 40 lines of Perl code:

  use List::MoreUtils qw(part);
  use Parallel::DataPipe;
  my @dirs = '.';
  my @files;
  Parallel::DataPipe::pipeline(
    # this pipe looks (recursively) for all files in specified @dirs
    {
        input => \@dirs,
        process => sub {
            my ($files,$dirs) = part { -d $_ ? 1 : 0 } glob("$_/*");
            return [$files||[], $dirs||[]];
        },
        output => sub {
            my ($files,$dirs) = @$_;
            push @dirs,@$dirs;# recursion is here
            push @files,@$files;
        },
    },
    # this pipe greps files for the word 'hello'
    {
        input => \@files,
        process => sub {
            my $file = $_;
            open my $fh, '<', $file or die "can't open $file: $!";
            my @lines;
            while (<$fh>) {
                # line_number : line
                push @lines,"$.:$_" if m{hello};
            }
            return [$file,\@lines];
        },
        output => sub {
            my ($file,$lines) = @$_;
            # print filename, line_number , line
            print "$file:$_" for @$lines;
        }
    }
  );

=head1 HOW parallel pipe (run) WORKS

1) The main process (parent) forks C<number_of_data_processors> children for processing data.

2) As soon as data comes from C<input_iterator>, the parent sends it to the next child using
the pipe mechanism.

3) The child processes the data and returns the result back to the parent using the pipe.

4) The parent first fills up all the pipes to the children with data and then
starts to expect processed data on the pipes from the children.

5) When it receives a result from a child, it sends the processed data to the C<data_merge> subroutine
and starts loop 2) again.

6) Loop 2) continues until the input data is exhausted (the end of the C<input_iterator> array is reached or the C<input_iterator> sub returns undef).

7) In the end the parent collects processed data from all remaining busy children and passes it to C<data_merge>.

8) After all the children have sent their processed data, they are killed and run returns to the caller.

Note:
 If C<input_iterator> or C<process_data> returns a reference, the data is serialized/deserialized
 before/after the pipe. That way you have full control over whether data is serialized for IPC.
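
Here is a minimal sketch of processing references (the hashes are serialized to the
children and the results deserialized back transparently):

  Parallel::DataPipe::run({
      input   => [ map { {n => $_} } 1..10 ],
      process => sub { {n => $_->{n}, square => $_->{n} ** 2} },
      output  => sub { printf "%d^2 = %d\n", $_->{n}, $_->{square} },
  });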

=head1 SEE ALSO

L<fork|http://perldoc.perl.org/functions/fork.html>

L<subs::parallel>

L<Parallel::Loops>

L<MCE>

L<IO::Pipely> - pipes that work almost everywhere

L<POE> - portable multitasking and networking framework for any event loop

L<forks>

L<threads>

=head1 DEPENDENCIES

Only core modules are used.

If the Sereal module is found, it is used for serialization instead of Storable, as the former is more efficient.

=head1 BUGS

For all bugs please send an email to okharch@gmail.com.

=head1 SOURCE REPOSITORY

See the git source on github
 L<https://github.com/okharch/Parallel-DataPipe>

=head1 COPYRIGHT

Copyright (c) 2013 Oleksandr Kharchenko <okharch@gmail.com>

All rights reserved. This library is free software; you can redistribute it
and/or modify it under the same terms as Perl itself.

=head1 AUTHOR

  Oleksandr Kharchenko <okharch@gmail.com>

=cut
