Coro-DataPipe
view release on metacpan or search on metacpan
lib/Coro/DataPipe.pm view on Meta::CPAN
package Coro::DataPipe;
our $VERSION='0.02';
use 5.006; # Perl::MinimumVersion says that
use strict;
use warnings;
use List::Util qw(first);
use Coro; #qw(schedule async);
# run(\%params, ...): convenience entry point for a single parallel pipe.
# A lone pipe is just the trivial case of a pipeline, so everything -
# including the caller's list/scalar/void context - is delegated to
# pipeline().
sub run {
    return pipeline(@_);
}
# pipeline(\%pipe_params, ...) or Class->pipeline(\%pipe_params, ...):
# build one pipe per parameter hash (via new()) and run them as a chain.
# A pipe without an explicit 'input' reads the array output of the pipe
# built just before it.  Does not return until run_pipes() reports that the
# whole chain has drained.
# Return value, taken from the last pipe's output:
#   - list context:   the collected items (when that output is an array);
#   - scalar context: the array ref itself;
#   - void context, or when the last output is not an array: nothing.
# Dies when a pipe has no input and none can be inherited, or when no pipe
# parameters are given at all.
sub pipeline {
    my $class = shift;
    # Support both Coro::DataPipe::pipeline({...}, ...) (first argument is a
    # parameter hash) and Coro::DataPipe->pipeline({...}, ...).
    if (ref($class) eq 'HASH') {
        unshift @_, $class;
        $class = __PACKAGE__;
    }
    my @pipes;
    # Array output of the previously built pipe, if any; it becomes the
    # input of the next pipe when that pipe has no 'input' of its own.
    my $default_input;
    for my $param (@_) {
        unless (exists $param->{input}) {
            unless ($default_input) {
                die "You have to specify input for the first pipe" unless @pipes;
                die "You have to specify input for pipe #" . (@pipes + 1)
                    . ": the previous pipe does not collect its output into an array";
            }
            $param->{input} = $default_input;
        }
        my $pipe = $class->new($param);
        # Reset for non-array outputs (e.g. a code ref); keeping the older
        # value would make the next pipe silently share this pipe's input
        # queue and compete with it for items.
        $default_input = ref($pipe->{output}) eq 'ARRAY' ? $pipe->{output} : undef;
        push @pipes, $pipe;
    }
    die "No pipe parameters given" unless @pipes;
    run_pipes(0, @pipes);
    my $result = $pipes[-1]->{output};
    # Author's note: clearing @pipes in one go (@pipes = (), or letting it be
    # destroyed implicitly) was observed to kill the parent coroutine, so the
    # pipes are released one by one.
    # NOTE(review): the cause is not visible in this file.
    undef $_ for @pipes;
    return unless defined wantarray;
    return unless $result;
    return wantarray ? @$result : $result;
}
# run_pipes($prev_busy, $me, @next): drive pipe $me and, recursively, the
# pipes downstream of it until they run out of work.
#   $prev_busy - true while some upstream pipe is still busy, i.e. more input
#                for $me may still appear (pipeline() passes 0 for the first
#                pipe).
#   $me        - the pipe handled at this recursion level.
#   @next      - the pipes after $me, in order.
# Returns 0 once $me and everything after it are idle and $me loaded no data
# on the last pass.  While upstream is still busy, returns a true value as
# soon as $me has a free processor, so the caller can produce more input.
sub run_pipes {
my ($prev_busy,$me,@next) = @_;
while (1) {
# hand input items to any free processors of $me
my $data_loaded = $me->load_data;
my $me_busy = $data_loaded || $me->busy_processors;
# get processed data: switch to the processing coroutines.  Each of them
# calls ->ready on the coroutine captured in set_process_iterator when it
# finishes (assumed to be the one running this loop, as in pipeline()).
schedule if $me_busy;
# recompute the busy state after processing, then push results to the next pipe
$me_busy = $data_loaded || $me->busy_processors;
my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
# I am busy either when I am already busy or my downstream pipes are busy
$me_busy ||= $next_busy;
# this part of the pipeline is done when every pipe is idle and no more data was loaded
return 0 unless $me_busy || $data_loaded;
# upstream is still running and I can take more input: return so the caller can produce it
return $me_busy if $prev_busy && $me->free_processors;
}
# NOTE(review): unreachable - the loop above only exits via return
return 0;
}
# set_input_iterator($param): consume the 'input' parameter and install
# $self->{input_iterator}, a code ref returning the next data item.
#   - code ref:  used as the iterator unchanged;
#   - array ref: consumed in place, one element per call (shift), yielding
#                undef once empty;
#   - anything else: dies.
# NOTE: load_data() treats undef as end of input, so an undef element in an
# input array ends the stream early.
sub set_input_iterator {
    my ($self, $param) = @_;
    my ($source) = extract_param($param, qw(input));
    my $reader = $source;
    unless (ref($source) eq 'CODE') {
        ref($source) eq 'ARRAY'
            or die "array or code reference expected for input_iterator";
        $reader = sub { shift @$source };
    }
    return $self->{input_iterator} = $reader;
}
# set_output_iterator($param): consume the 'output' parameter and install
# $self->{output_iterator}.  It is invoked as ->($result, $item_number) for
# every processed item, with $_ also set to $result.
#   - code ref: used as the output iterator unchanged;
#   - otherwise (expected to be an array ref, or absent): results are pushed
#     onto that array (a fresh one when absent/false).  The array is also
#     exposed as $self->{output}, so pipeline() can return it or feed it to
#     the next pipe as input.
sub set_output_iterator {
    my ($self, $param) = @_;
    my ($output_iterator) = extract_param($param, qw(output));
    if (ref($output_iterator) ne 'CODE') {
        my $queue = $output_iterator || [];
        $self->{output} = $queue;
        # Collect the value actually passed in, instead of relying on the
        # caller having set the global $_ to it.
        $output_iterator = sub { push @$queue, $_[0] };
    }
    $self->{output_iterator} = $output_iterator;
}
# set_process_iterator($param): consume the 'process' callback and install
# $self->{process_iterator}, a closure that starts one Coro thread per data
# item.  Each thread:
#   1. runs the callback with $_ (and its first argument) set to the item;
#   2. passes the result and the item's sequence number to the output
#      iterator;
#   3. decrements the busy count;
#   4. wakes ($main->ready) the coroutine that constructed this pipe.
# In pipeline() that coroutine is also the one waiting in run_pipes().
#
# The stored closure keeps only a *weak* reference to $self.  A closure over
# a strong $self stored inside $self forms a reference cycle, and the pipe
# object would then never be freed.  While an item is in flight, its thread
# holds a strong reference, so the pipe stays alive until the thread ends.
sub set_process_iterator {
    my ($self, $param) = @_;
    my $process_data_callback = extract_param($param, qw(process));
    my $main = $Coro::current;
    require Scalar::Util;
    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);
    $self->{process_iterator} = sub {
        my $data = shift;
        # Copying a weak reference yields a strong one; it lives as long as
        # this call and the thread spawned below.
        my $pipe = $weak_self;
        my $item_number = $pipe->{item_number}++;
        $pipe->{busy}++;
        async {
            local $_ = $data;
            $_ = $process_data_callback->($data);
            $pipe->{output_iterator}->($_, $item_number);
            $pipe->{busy}--;
            $main->ready;
        };
    };
}
# load_data: hand items from the input iterator to this pipe's processors
# until either no processor is free or the input iterator returns undef.
# Returns the number of items dispatched during this call (0 if none).
sub load_data {
    my ($self) = @_;
    my $dispatched = 0;
    while ($self->free_processors) {
        my $item = $self->{input_iterator}->();
        last unless defined $item;
        $self->{process_iterator}->($item);
        ++$dispatched;
    }
    return $dispatched;
}
# extract_param(\%param, @names): delete every key listed in @names from
# %param, so that new() can later detect unsupported leftovers.  Returns the
# first defined value among them (in @names order), or undef if none is
# defined.
sub extract_param {
    my ($param, @alias) = @_;
    my @candidates = map { delete($param->{$_}) } @alias;
    return first { defined } @candidates;
}
# busy_processors: number of items this pipe currently has in flight (the
# counter is incremented when an item is dispatched and decremented when its
# processing thread finishes).
sub busy_processors { return $_[0]->{busy} }
# free_processors: true while fewer than number_of_data_processors items are
# in flight, i.e. at least one more item may be dispatched to this pipe.
sub free_processors {
    my ($self) = @_;
    my ($in_flight, $limit) = @{$self}{qw(busy number_of_data_processors)};
    return $limit > $in_flight;
}
# new(\%param): construct a single pipe from a parameter hash.
# Recognised keys: number_of_data_processors, input, output, process.
# Each recognised key is deleted from %param as it is consumed (the caller's
# hash is modified); any key still left afterwards makes new() die.
# When number_of_data_processors is missing or false, it defaults to 2 with
# a warning.
sub new {
    my ($class, $param) = @_;
    my $self = bless {}, $class;
    # Coro is cooperative, so the useful amount of concurrency depends on the
    # workload; callers are expected to choose it explicitly.
    my $number_of_data_processors =
        extract_param($param, 'number_of_data_processors');
    if (!$number_of_data_processors) {
        $number_of_data_processors = 2;
        warn "number_of_data_processors set to $number_of_data_processors";
    }
    $self->{number_of_data_processors} = $number_of_data_processors;
    # per-pipe counters: item_number (sequence number handed to the output
    # iterator) and busy (items in flight)
    @{$self}{qw(item_number busy)} = (0, 0);
    $self->set_input_iterator($param);
    $self->set_output_iterator($param);
    $self->set_process_iterator($param);
    my $not_supported = join ", ", keys %$param;
    die "Parameters are redundant or not supported:" . $not_supported
        if $not_supported;
    return $self;
}
1;

=head1 NAME

C<Coro::DataPipe> - parallel data processing conveyor

=encoding utf-8


=head1 SYNOPSIS

    use Coro::AnyEvent;
    use Coro::DataPipe;
    Coro::DataPipe::run {
        input => [1..100],
        process => sub { Coro::AnyEvent::sleep(1);$_*2 },
        number_of_data_processors => 100,
        output => sub { print "$_\n" },
    };

Saved as F<test.t> and timed, the script above takes about one second,
because all 100 one-second sleeps run concurrently:

    time perl test.t >/dev/null
    # 1 second, not 100!


=head1 DESCRIPTION

This is an implementation of the L<Parallel::DataPipe> algorithm using
cooperative threads (Coro). See L<Parallel::DataPipe> for a description of
the algorithm and of the subroutines.

This module uses cooperative threads, so all threads share the same memory
and no processes are forked.

A good use case is making long-lasting queries to a database or the web and
then processing the data asynchronously. In that case you gain even with a
single processor, because Coro keeps the processor busy with other threads
while one of them is waiting.


=head1 SEE ALSO

L<Coro>


=head1 DEPENDENCIES

It requires L<Coro> 6.31. It may also work with older versions, but 6.31 is
the only version the author has used.


=head1 BUGS

Please report all bugs by email to okharch@gmail.com.


=head1 SOURCE REPOSITORY

The git source is on GitHub:
L<https://github.com/okharch/Coro-DataPipe>


=head1 COPYRIGHT

Copyright (c) 2013 Oleksandr Kharchenko <okharch@gmail.com>

All rights reserved. This library is free software; you can redistribute it
and/or modify it under the same terms as Perl itself.


=head1 AUTHOR

Oleksandr Kharchenko <okharch@gmail.com>

=cut

( run in 1.921 second using v1.01-cache-2.11-cpan-39bf76dae61 )