Future-Workflow
view release on metacpan or search on metacpan
lib/Future/Workflow/Pipeline.pm view on Meta::CPAN
# You may distribute under the terms of either the GNU General Public License
# or the Artistic License (the same terms as Perl itself)
#
# (C) Paul Evans, 2021-2024 -- leonerd@leonerd.org.uk
use v5.26;
use warnings;
use Object::Pad 0.800;
package Future::Workflow::Pipeline 0.02;
class Future::Workflow::Pipeline;
use Carp;
use Future::AsyncAwait;
=head1 NAME
C<Future::Workflow::Pipeline> - a pipeline of processing stages
=head1 SYNOPSIS
=for highlighter language=perl
# 1: Make a pipeline
my $pipeline = Future::Workflow::Pipeline->new;
# 2: Add some stages to it
# An async stage; e.g. perform an HTTP fetch
my $ua = Net::Future::HTTP->new;
$pipeline->append_stage_async( async sub ($url) {
return await $ua->GET( $url );
});
# A synchronous (in-process) stage; e.g. some HTML parsing
$pipeline->append_stage_sync( sub ($response) {
my $dom = Mojo::DOM->new( $response->decoded_content );
return $dom->at('div[id="main"]')->text;
});
# A detached (out-of-process/thread) stage; e.g. some silly CPU-intensive task
$pipeline->append_stage_detached( sub ($text) {
my $iter = Algorithm::Permute->new([ split m/\s+/, $text ]);
my $best; my $bestscore;
while(my @words = $iter->next) {
my $str = join "\0", @words;
my $score = md5sum( $str );
next if defined $bestscore and $score ge $bestscore;
$best = $str;
$bestscore = $score;
}
return $best;
});
# 3: Give it an output
# These are alternatives:
# An asynchronous output
my $dbh = Database::Async->new( ... );
$pipeline->set_output_async( async sub ($best) {
await $dbh->do('INSERT INTO Results VALUES (?)', $best);
});
# A synchronous output
$pipeline->set_output_sync( sub ($best) {
print "MD5 minimized sort order is:\n";
print " $_\n" for split m/\0/, $best;
});
# 4: Now start it running on some input values
foreach my $url (slurp_lines("urls.txt")) {
await $pipeline->push_input($url);
}
# 5: Wait for it all to finish
await $pipeline->drain;
=head1 DESCRIPTION
Instances of this class implement a "pipeline", a sequence of data-processing
stages. Each stage is represented by a function that is passed a single
argument and should return a result; that result becomes the argument passed
to the next stage. The pipeline itself also stores an output function, which
is passed each eventual result produced by the final stage.
=head2 Queueing
In front of every stage there exists a queue of pending items. If the first
stage is currently busy when L</push_input> is called, the item is accepted
into its queue instead. Items will be taken from the queue in the order they
( run in 0.436 second using v1.01-cache-2.11-cpan-e1769b4cff6 )