Coro-DataPipe
view release on metacpan or search on metacpan
lib/Coro/DataPipe.pm view on Meta::CPAN
my $result = $pipes[$#pipes]->{output};
# @pipes=() kills parent
# as well as its implicit destroying
# destroy pipes one by one if you want to survive!!!
undef $_ for @pipes;
return unless defined(wantarray);
return unless $result;
return wantarray?@$result:$result;
}
sub run_pipes {
my ($prev_busy,$me,@next) = @_;
while (1) {
my $data_loaded = $me->load_data;
my $me_busy = $data_loaded || $me->busy_processors;
# get processed data
schedule if $me_busy;
# push it to next pipe
$me_busy = $data_loaded || $me->busy_processors;
my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
# I am busy either when I am already busy or my child are busy
$me_busy ||= $next_busy;
# pipeline is free if every pipe is free and no more data to process
return 0 unless $me_busy || $data_loaded;
# get data from pipe if we have free_processors
return $me_busy if $prev_busy && $me->free_processors;
}
return 0;
}
sub set_input_iterator {
my ($self,$param) = @_;
my ($input_iterator) = extract_param($param, qw(input));
if (ref($input_iterator) ne 'CODE') {
die "array or code reference expected for input_iterator" unless ref($input_iterator) eq 'ARRAY';
my $queue = $input_iterator;
$input_iterator = sub {$queue?shift(@$queue):undef};
}
$self->{input_iterator} = $input_iterator;
}
sub set_output_iterator {
my ($self,$param) = @_;
my ($output_iterator) = extract_param($param, qw(output));
if (ref($output_iterator) ne 'CODE') {
my $queue = $output_iterator || [];
$self->{output} = $queue;
$output_iterator = sub {push @$queue,$_};
}
$self->{output_iterator} = $output_iterator;
}
sub set_process_iterator {
my ($self,$param) = @_;
my $process_data_callback = extract_param($param,qw(process));
my $main = $Coro::current;
$self->{process_iterator} = sub {
my $data = shift;
my $item_number = $self->{item_number}++;
$self->{busy}++;
my $coro = async {
local $_ = $data;
$_ = $process_data_callback->($data);
$self->{output_iterator}->($_,$item_number);
$self->{busy}--;
$main->ready;
};
};
}
# loads all free processor with data from input
# return the number of loaded processors
sub load_data {
my $self = shift;
my $result = 0;
while ($self->free_processors) {
my $data = $self->{input_iterator}->();
return $result unless defined($data);
$self->{process_iterator}->($data);
$result++;
}
return $result;
}
sub extract_param {
my ($param, @alias) = @_;
return first {defined($_)} map delete($param->{$_}), @alias;
}
sub busy_processors {
my $self = shift;
return $self->{busy};
}
sub free_processors {
my $self = shift;
return $self->{busy} < $self->{number_of_data_processors};
}
sub new {
my ($class, $param) = @_;
my $self = {};
bless $self,$class;
# this is cooperative, so it's better to set explicit number of processor - your better know when it wins
my $number_of_data_processors = extract_param($param,'number_of_data_processors');
unless ($number_of_data_processors) {
$number_of_data_processors = 2;
warn "number_of_data_processors set to $number_of_data_processors";
}
$self->{number_of_data_processors} = $number_of_data_processors;
# item_number & busy
$self->{$_} = 0 for qw(item_number busy);
$self->set_input_iterator($param);
$self->set_output_iterator($param);
$self->set_process_iterator($param);
my $not_supported = join ", ", keys %$param;
die "Parameters are redundant or not supported:". $not_supported if $not_supported;
return $self;
}
1;
( run in 1.106 second using v1.01-cache-2.11-cpan-e1769b4cff6 )