Future-Workflow
view release on metacpan or search on metacpan
lib/Future/Workflow/Pipeline.pm view on Meta::CPAN
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2021-2024 -- leonerd@leonerd.org.uk
use v5.26;
use warnings;
use Object::Pad 0.800;
package Future::Workflow::Pipeline 0.02;
class Future::Workflow::Pipeline;
use Carp;
use Future::AsyncAwait;
=head1 NAME
C<Future::Workflow::Pipeline> - a pipeline of processing stages
=head1 SYNOPSIS
=for highlighter language=perl
# 1: Make a pipeline
my $pipeline = Future::Workflow::Pipeline->new;
# 2: Add some stages to it
# An async stage; e.g. perform an HTTP fetch
my $ua = Net::Future::HTTP->new;
$pipeline->append_stage_async( async sub ($url) {
return await $ua->GET( $url );
});
# A synchronous (in-process) stage; e.g. some HTML parsing
$pipeline->append_stage_sync( sub ($response) {
my $dom = Mojo::DOM->new( $response->decoded_content );
return $dom->at('div[id="main"]')->text;
});
# A detached (out-of-process/thread) stage; e.g. some silly CPU-intensive task
$pipeline->append_stage_detached( sub ($text) {
my $iter = Algorithm::Permute->new([ split m/\s+/, $text ]);
my $best; my $bestscore;
while(my @words = $iter->next) {
my $str = join "\0", @words;
my $score = md5sum( $str );
next if defined $bestscore and $score ge $bestscore;
$best = $str;
$bestscore = $score;
}
return $best;
});
# 3: Give it an output
# These are alternatives:
# An asynchronous output
my $dbh = Database::Async->new( ... );
$pipeline->set_output_async( async sub ($best) {
await $dbh->do('INSERT INTO Results VALUES (?)', $best);
});
# A synchronous output
$pipeline->set_output_sync( sub ($best) {
print "MD5 minimized sort order is:\n";
print " $_\n" for split m/\0/, $best;
});
# 4: Now start it running on some input values
foreach my $url (slurp_lines("urls.txt")) {
await $pipeline->push_input($url);
}
# 5: Wait for it all to finish
await $pipeline->drain;
=head1 DESCRIPTION
Instances of this class implement a "pipeline", a sequence of data-processing
stages. Each stage is represented by a function that is passed a single
argument and should return a result. The pipeline itself stores a function
that will be passed each eventual result.
=head2 Queueing
In front of every stage there exists a queue of pending items. If the first
stage is currently busy when L</push_input> is called, the item is accepted
into its queue instead. Items will be taken from the queue in the order they
were pushed when the stage's work function finishes with prior items.
If the queue between stages is full, then items will remain pending in prior
stages. Ultimately this back-pressure will make its way back to the
L</push_input> method at the beginning of the pipeline.
=cut
=head1 CONSTRUCTOR
$pipeline = Future::Workflow::Pipeline->new;
The constructor takes no additional parameters.
=cut
# Per-instance state, declared as Object::Pad fields.

# Output destination for completed work items.
# NOTE(review): presumably assigned by set_output -- its body is not visible
# here; confirm.
field $_output;

# The pipeline's processing stages.
# NOTE(review): presumably kept in the order stages were appended -- confirm
# against the append_stage methods.
field @_stages;
=head1 METHODS
=cut
=head2 set_output
$pipeline->set_output( $code );
await $code->( $result );
Sets the destination output for the pipeline. Each completed work item will be
passed to the invoked function, which is expected to return a C<Future>.
=cut
method set_output ( $code )
( run in 1.083 second using v1.01-cache-2.11-cpan-71847e10f99 )