Future-Workflow

 view release on metacpan or  search on metacpan

lib/Future/Workflow/Pipeline.pm  view on Meta::CPAN

#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2021-2024 -- leonerd@leonerd.org.uk

use v5.26;
use warnings;
use Object::Pad 0.800;

package Future::Workflow::Pipeline 0.02;
class Future::Workflow::Pipeline;

use Carp;

use Future::AsyncAwait;

=head1 NAME

C<Future::Workflow::Pipeline> - a pipeline of processing stages

=head1 SYNOPSIS

=for highlighter language=perl

   # 1: Make a pipeline
   my $pipeline = Future::Workflow::Pipeline->new;


   # 2: Add some stages to it
 
   # An async stage; e.g. perform an HTTP fetch
   my $ua = Net::Future::HTTP->new;
   $pipeline->append_stage_async( async sub ($url) {
      return await $ua->GET( $url );
   });

   # A synchronous (in-process) stage; e.g. some HTML parsing
   $pipeline->append_stage_sync( sub ($response) {
      my $dom = Mojo::DOM->new( $response->decoded_content );
      return $dom->at('div[id="main"]')->text;
   });

   # A detached (out-of-process/thread) stage; e.g. some silly CPU-intensive task
   $pipeline->append_stage_detached( sub ($text) {
      my $iter = Algorithm::Permute->new([ split m/\s+/, $text ]);

      my $best; my $bestscore;
      while(my @words = $iter->next) {
         my $str = join "\0", @words;
         my $score = md5sum( $str );
         next if defined $bestscore and $score ge $bestscore;

         $best      = $str;
         $bestscore = $score;
      }

      return $best;
   });


   # 3: Give it an output

   # These are alternatives:

   # An asynchronous output
   my $dbh = Database::Async->new( ... );
   $pipeline->set_output_async( async sub ($best) {
      await $dbh->do('INSERT INTO Results VALUES (?)', $best);
   });

   # A synchronous output
   $pipeline->set_output_sync( sub ($best) {
      print "MD5 minimized sort order is:\n";
      print "  $_\n" for split m/\0/, $best;
   });


   # 4: Now start it running on some input values

   foreach my $url (slurp_lines("urls.txt")) {
      await $pipeline->push_input($url);
   }


   # 5: Wait for it all to finish
   await $pipeline->drain;

=head1 DESCRIPTION

Instances of this class implement a "pipeline", a sequence of data-processing
stages. Each stage is represented by a function that is passed a single
argument and should return a result, which becomes the input to the next
stage. The pipeline also stores an output function, which is passed each
eventual result produced by the final stage.

=head2 Queueing

In front of every stage there exists a queue of pending items. If the first
stage is currently busy when L</push_input> is called, the item is accepted
into its queue instead. Items will be taken from the queue in the order they



( run in 0.436 second using v1.01-cache-2.11-cpan-e1769b4cff6 )