Parallel-DataPipe
view release on metacpan or search on metacpan
lib/Parallel/DataPipe.pm view on Meta::CPAN
# @pipes=() kills parent
# as well as its implicit destroying
# destroy pipes one by one if you want to survive!!!
undef $_ for @pipes;
return unless defined(wantarray);
return unless $result;
return wantarray?@$result:$result;
}
# Drive a chain of pipeline stages, recursively.
#   $prev_busy - true while the upstream stage still has work in flight
#   $me        - the current stage object
#   @next      - downstream stages, pumped by the recursive call
# Returns this stage's busy state (true while work remains), 0 when drained.
sub run_pipes {
    my ($prev_busy,$me,@next) = @_;
    # Busy when there is still input to load or children are processing.
    my $me_busy = $me->load_data || $me->busy_processors;
    while ($me_busy) {
        # Collect one processed item and hand it to the merge callback;
        # this also marks the child that produced it as free.
        $me->receive_and_merge_data;
        # Re-evaluate after merging: load_data may queue more work onto
        # the freed child.
        $me_busy = $me->load_data || $me->busy_processors;
        # Pump the downstream stages; this stage counts as busy while
        # any of them still are.
        my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
        $me_busy ||= $next_busy;
        # get data from pipe if we have free_processors
        # Yield back to the upstream stage as soon as this stage can
        # accept more input, so every stage of the pipeline stays fed.
        return $me_busy if $prev_busy && $me->free_processors;
    }
    # Fully drained: no pending input and no busy children.
    return 0;
}
# Fetch the next input item. $self->{input_iterator} is a code ref that
# wraps either the user-supplied array or subroutine; it returns the next
# piece of data, or undef when the input is exhausted.
sub input_iterator {
    my $self = shift;
    my $iterator = $self->{input_iterator};
    return $iterator->(@_);
}
lib/Parallel/DataPipe.pm view on Meta::CPAN
$number_of_data_processors = $self->number_of_cpu_cores unless $number_of_data_processors;
die "process_data parameter should be code ref" unless ref($process_data_callback) eq 'CODE';
die "\$number_of_data_processors:undefined" unless defined($number_of_data_processors);
return [map $self->_create_data_processor($process_data_callback, $init_data_processor, $_), 0..$number_of_data_processors-1];
}
# Send one input item to a child process and mark that child busy.
# Dies when $data is undef, because undef on the pipe is the
# end-of-input sentinel and cannot be used as a payload.
sub load_data_processor {
    my ($self,$data,$processor) = @_;
    # Validate BEFORE mutating any state: the original checked after
    # consuming an item number, so a rejected call (e.g. caught in an
    # eval by the caller) would skip a sequence number and desynchronize
    # ordered merging.
    die "no support of data processing for undef items!" unless defined($data);
    # Remember which input item this child works on, so the result can
    # be merged back in the original order.
    $processor->{item_number} = $self->{item_number}++;
    $processor->{busy} = 1;
    $self->_put_data($processor->{child_write},$data);
}
# Processors currently working on an item.
# Returns the matching records in list context, their count in scalar context.
sub busy_processors {
    my ($self) = @_;
    return grep { $_->{busy} } @{ $self->{processors} };
}
# Processors with no item in flight, i.e. ready to accept new data.
# Returns the matching records in list context, their count in scalar context.
sub free_processors {
    my ($self) = @_;
    return grep { !$_->{busy} } @{ $self->{processors} };
}
# Wait for one busy child to finish, read its result off the pipe and
# pass it (with the original item number) to the output/merge callback.
# Frees the child so it can be loaded with new data.
sub receive_and_merge_data {
    my $self = shift;
    my ($processors,$ready) = @{$self}{qw(processors ready)};
    # Lazily create the queue of handles that already reported data.
    $self->{ready} = $ready = [] unless $ready;
    # When nothing is pending, block until at least one busy child's
    # read handle becomes readable; cache all readable handles so
    # subsequent calls drain them without re-selecting.
    # NOTE(review): the map yields a false value (undef/'') for idle
    # processors instead of filtering them out — presumably IO::Select
    # tolerates that; confirm against IO::Select->new/add semantics.
    @$ready = IO::Select->new(map $_->{busy} && $_->{parent_read},@$processors)->can_read() unless @$ready;
    my $fh = shift(@$ready);
    # Map the readable handle back to its processor record.
    my $processor = first {$_->{parent_read} == $fh} @$processors;
    # Localize $_ so the callback sees the item without clobbering the
    # caller's $_.
    local $_ = $self->_get_data($fh);
    $processor->{busy} = undef; # make processor free
    $self->output_iterator($_,$processor->{item_number});
}
sub _kill_data_processors {
my ($self) = @_;
my $processors = $self->{processors};
my @pid_to_kill = map $_->{pid}, @$processors;
my %pid_to_wait = map {$_=>undef} @pid_to_kill;
# put undef to input of data_processor - they know it's time to exit
$self->_put_data($_->{child_write}) for @$processors;
lib/Parallel/DataPipe.pm view on Meta::CPAN
3) Child processes data and returns result back to parent using pipe.
4) Parent firstly fills up all the pipes to children with data and then
starts to expect processed data on pipes from children.
5) If it receives a result from a child it sends the processed data to the C<data_merge> subroutine,
and starts loop 2) again.
6) loop 2) continues until input data is ended (end of C<input_iterator> array or C<input_iterator> sub returned undef).
7) In the end the parent collects processed data from all still-busy children and passes it to C<data_merge>
8) After all the children have sent their processed data they are killed and C<run> returns to the caller.
Note:
If C<input_iterator> or C<process_data> returns a reference, the data is serialized/deserialized before/after the pipe.
That way you have full control whether data will be serialized on IPC.
=head1 SEE ALSO
L<fork|http://perldoc.perl.org/functions/fork.html>
( run in 0.237 second using v1.01-cache-2.11-cpan-3cd7ad12f66 )