Coro-DataPipe

 view release on metacpan or  search on metacpan

lib/Coro/DataPipe.pm  view on Meta::CPAN

    my $result = $pipes[$#pipes]->{output};
    # @pipes=() kills parent
    # as well as its implicit destroying
    # destroy pipes one by one if you want to survive!!! 
    undef $_ for @pipes;
    return unless defined(wantarray);
    return unless $result;
    return wantarray?@$result:$result;
}

# Drive one pipe of the pipeline and, recursively, every pipe after it.
#
# Arguments:
#   $prev_busy - true when some pipe before this one still has work going on
#   $me        - the pipe object handled by this call
#   @next      - the remaining pipes after $me, in order
#
# Returns 0 when this pipe and all following pipes are idle and the last
# load_data call loaded nothing; returns a true value when control is handed
# back to the caller while work is still in flight.
sub run_pipes {
    my ($prev_busy,$me,@next) = @_;
    while (1) {
        # Fill this pipe's free processors from its input; the result is the
        # number of items just loaded.
        my $data_loaded = $me->load_data;        
        my $me_busy = $data_loaded || $me->busy_processors;
        # Give up control so that pending work can make progress.
        # NOTE(review): schedule is presumably Coro's schedule, resumed by the
        # $main->ready call issued when a worker finishes - confirm.
        schedule if $me_busy;
        # Re-read the busy state after the yield; workers may have finished.
        $me_busy = $data_loaded || $me->busy_processors;
        # Run the rest of the pipeline, telling it whether anything before it
        # (this pipe included) is still busy.
        my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
        # This pipe counts as busy when it is busy itself or any pipe after it is.
        $me_busy ||= $next_busy;
        # Everything from this pipe onwards is idle and nothing was loaded: done.
        return 0 unless $me_busy || $data_loaded;
        # A pipe before this one is still busy and this pipe has a free
        # processor: return to the caller so that pipe can be serviced.
        return $me_busy if $prev_busy && $me->free_processors;
    }
    # Not reached: the loop above is only left through one of its returns.
    return 0;
}

# Install the input source for this pipe.
#
# Consumes (deletes) the 'input' key of $param.  A code reference is stored
# as-is; an array reference is wrapped in a closure that shifts one element
# off that same array per call, so the caller's array is emptied as items are
# read.  Anything else is fatal.
sub set_input_iterator {
    my ($self, $param) = @_;
    my ($source) = extract_param($param, qw(input));
    unless (ref($source) eq 'CODE') {
        ref($source) eq 'ARRAY'
            or die "array or code reference expected for input_iterator";
        my $items = $source;
        # shift on an exhausted array yields undef, which marks end of input
        $source = sub { shift(@$items) };
    }
    $self->{input_iterator} = $source;
}

# Install the output sink for this pipe.
#
# Consumes (deletes) the 'output' key of $param.
#   * code reference  - stored as-is and called for every processed item
#   * array reference - processed items are pushed onto that array, which is
#                       also stored in $self->{output}
#   * missing / false - a fresh array is created and used as above
# Any other value is fatal, mirroring the check done for the input parameter.
sub set_output_iterator {
    my ($self,$param) = @_;
    my ($output_iterator) = extract_param($param, qw(output));
    if (ref($output_iterator) ne 'CODE') {
        my $queue = $output_iterator || [];
        # Fail at construction time.  Without this check a bad value only
        # blows up at the first push, which runs inside a worker after the
        # busy counter was incremented and before it is decremented again.
        # reftype is used so that blessed arrays keep being accepted.
        require Scalar::Util;
        my $type = Scalar::Util::reftype($queue);
        die "array or code reference expected for output_iterator"
            unless defined($type) && $type eq 'ARRAY';
        $self->{output} = $queue;
        # The processed item is read from $_, which the caller localizes
        # before invoking the output iterator.
        $output_iterator = sub {push @$queue,$_};
    }
    $self->{output_iterator} = $output_iterator;
}

# Install the closure that starts processing of a single input item.
#
# Consumes (deletes) the 'process' key of $param and uses it as the per-item
# callback.  Each call of the installed closure numbers the item, marks one
# processor busy and starts an async block that runs the callback, passes the
# result (in $_ and as first argument) together with the item number to the
# output iterator, releases the processor and readies the coroutine that was
# current when this method ran.
sub set_process_iterator {
    my ($self, $param) = @_;
    my $callback = extract_param($param, qw(process));
    # coroutine to wake up whenever a worker has finished an item
    my $owner = $Coro::current;
    my $start_item = sub {
        my ($item) = @_;
        my $sequence = $self->{item_number}++;
        $self->{busy}++;
        my $worker = async {
            local $_ = $item;
            $_ = $callback->($item);
            $self->{output_iterator}->($_, $sequence);
            $self->{busy}--;
            $owner->ready;
        };
    };
    $self->{process_iterator} = $start_item;
}

# Hand input items to processors for as long as one is free.
# Stops early when the input iterator returns undef (no more data).
# Returns the number of items handed over by this call.
sub load_data {
    my ($self) = @_;
    my $started = 0;
    while ($self->free_processors) {
        my $item = $self->{input_iterator}->();
        # undef from the iterator means the input is exhausted
        last unless defined $item;
        $self->{process_iterator}->($item);
        $started++;
    }
    return $started;
}

# Remove every key listed in @alias from the hash referenced by $param and
# return the first defined value among them, in @alias order (undef when
# none of them holds a defined value).
sub extract_param {
    my ($param, @alias) = @_;
    # All aliases are deleted, not just the one whose value is returned.
    my @candidates = map { delete $param->{$_} } @alias;
    return first { defined $_ } @candidates;
}

# Return the current value of the pipe's busy counter.
sub busy_processors {
    my ($self) = @_;
    my $in_flight = $self->{busy};
    return $in_flight;
}

# Return true while the busy counter is below number_of_data_processors,
# i.e. while at least one more item may be started.
sub free_processors {
    my ($self) = @_;
    my ($in_flight, $limit) = @{$self}{qw(busy number_of_data_processors)};
    return $in_flight < $limit;
}

# Construct a pipe object.
#
# Arguments:
#   $class - class to bless the new object into
#   $param - hash ref of options; every recognised key is removed from it:
#              number_of_data_processors - how many items may be in flight;
#                                          defaults to 2 (with a warning)
#                                          when missing or false
#              input, output, process    - consumed by the set_*_iterator
#                                          methods
#
# Dies when $param still contains keys after all recognised ones were
# consumed, or when one of the set_*_iterator methods rejects its value.
sub new {
    my ($class, $param) = @_;
    my $self = bless {}, $class;
    # this is cooperative, so it is better to set an explicit number of
    # processors - the caller knows best which value pays off
    my $number_of_data_processors = extract_param($param,'number_of_data_processors');
    unless ($number_of_data_processors) {
        $number_of_data_processors = 2;
        warn "number_of_data_processors set to $number_of_data_processors";
    }
    $self->{number_of_data_processors} = $number_of_data_processors;
    # counters: items started so far, and items currently being processed
    $self->{$_} = 0 for qw(item_number busy);
    $self->set_input_iterator($param);
    $self->set_output_iterator($param);
    $self->set_process_iterator($param);
    # Whatever is left was not recognised.  Count the keys instead of testing
    # the joined string, so that a stray key such as '0' or '' is reported
    # too; sort them to make the message deterministic.
    my @not_supported = sort keys %$param;
    die "Parameters are redundant or not supported:" . join(", ", @not_supported)
        if @not_supported;
    return $self;
}

1;



( run in 1.106 second using v1.01-cache-2.11-cpan-e1769b4cff6 )