Future-Workflow

lib/Future/Workflow/Pipeline.pm

   $pipeline->append_stage_sync( sub ($response) {
      my $dom = Mojo::DOM->new( $response->decoded_content );
      return $dom->at('div[id="main"]')->text;
   });

   # A detached (out-of-process/thread) stage; e.g. some silly CPU-intensive task
   $pipeline->append_stage_detached( sub ($text) {
      my $iter = Algorithm::Permute->new([ split m/\s+/, $text ]);

      my $best; my $bestscore;
      while(my @words = $iter->next) {
         my $str = join "\0", @words;
         my $score = md5_hex( $str );   # Digest::MD5
         next if defined $bestscore and $score ge $bestscore;

         $best      = $str;
         $bestscore = $score;
      }

      return $best;
   });


   # 3: Give it an output

   # These are alternatives:

   # An asynchronous output
   my $dbh = Database::Async->new( ... );
   $pipeline->set_output_async( async sub ($best) {
      await $dbh->do('INSERT INTO Results VALUES (?)', $best);
   });

   # A synchronous output
   $pipeline->set_output_sync( sub ($best) {
      print "MD5 minimized sort order is:\n";
      print "  $_\n" for split m/\0/, $best;
   });


   # 4: Now start it running on some input values

   foreach my $url (slurp_lines("urls.txt")) {
      await $pipeline->push_input($url);
   }


   # 5: Wait for it all to finish
   await $pipeline->drain;
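The detached stage's computation can be tried on its own with core Perl. The sketch below is not part of Future::Workflow. C<permutations> and C<md5_minimized> are hypothetical helpers. A small recursive generator stands in for Algorithm::Permute, and C<md5_hex> from Digest::MD5 is assumed as the digest. Like the stage above, it keeps the word order whose MD5 hex digest sorts lowest as a string.

```perl
use strict;
use warnings;
use Digest::MD5 qw( md5_hex );

# Hypothetical helper: return every permutation of @items as array refs.
# (Stands in for Algorithm::Permute, which the SYNOPSIS uses.)
sub permutations {
   my @items = @_;
   return ( [] ) unless @items;
   my @result;
   for my $i ( 0 .. $#items ) {
      my @rest = @items;
      my ( $head ) = splice @rest, $i, 1;
      push @result, [ $head, @$_ ] for permutations( @rest );
   }
   return @result;
}

# Hypothetical helper with the same logic as the detached stage: keep the
# ordering whose MD5 hex digest sorts lowest as a string.
sub md5_minimized {
   my ( $text ) = @_;
   my ( $best, $bestscore );
   for my $perm ( permutations( split m/\s+/, $text ) ) {
      my $str   = join "\0", @$perm;
      my $score = md5_hex( $str );
      next if defined $bestscore and $score ge $bestscore;
      ( $best, $bestscore ) = ( $str, $score );
   }
   return $best;
}

my $best = md5_minimized( "alpha beta gamma" );
print "  $_\n" for split m/\0/, $best;
```

Because C<md5_hex> always returns 32 lowercase hex digits, comparing digests with C<ge> gives the same ordering as comparing them numerically.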

=head1 DESCRIPTION

Instances of this class implement a "pipeline", a sequence of data-processing
stages. Each stage is represented by a function that is passed a single
argument and should return a result, which becomes the argument to the next
stage. The pipeline itself stores an output function that will be passed each
result of the final stage.
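If concurrency is ignored, this data flow amounts to feeding each input through every stage function in turn and then handing the final value to the output function. The following is a minimal synchronous sketch of that flow only. C<run_pipeline> is a hypothetical helper, not a method of this class, and it has none of the queueing or asynchronous behaviour of the real pipeline.

```perl
use strict;
use warnings;

# Hypothetical, synchronous model of a pipeline: each stage is a function
# of one argument, and its result becomes the argument of the next stage.
# The real class runs stages asynchronously; this only shows the data flow.
sub run_pipeline {
   my ( $stages, $output, @inputs ) = @_;
   for my $item ( @inputs ) {
      my $value = $item;
      $value = $_->( $value ) for @$stages;
      $output->( $value );   # the output function sees each final result
   }
}

my @results;
run_pipeline(
   [ sub { $_[0] * 2 }, sub { $_[0] + 1 } ],
   sub { push @results, $_[0] },
   1, 2, 3,
);
print "@results\n";   # prints "3 5 7"
```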

=head2 Queueing

In front of every stage there exists a queue of pending items. If the first
stage is currently busy when L</push_input> is called, the item is accepted
into its queue instead. As the stage's work function finishes with earlier
items, further items are taken from the queue in the order they were pushed.
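The queueing behaviour of a single stage can be modelled with a plain array. In this hypothetical sketch, C<push_input>, C<start_item> and C<finish_current> are illustrative subs, not this class's API. C<finish_current> stands in for an asynchronous work function completing. The sketch shows that items arriving while the stage is busy are processed in the order they were pushed.

```perl
use strict;
use warnings;

# Hypothetical model of one stage with a pending-item queue in front of it.
# Work is started by start_item and finished later by finish_current,
# standing in for an asynchronous work function completing.
my @queue;        # pending items, oldest first
my $current;      # item being worked on, or undef if the stage is idle
my @processed;    # completion order, for inspection

sub start_item { $current = shift }

sub push_input {
   my ( $item ) = @_;
   if( defined $current ) { push @queue, $item }   # busy: accept into queue
   else                   { start_item( $item ) }  # idle: start straight away
}

sub finish_current {
   push @processed, $current;
   undef $current;
   start_item( shift @queue ) if @queue;   # take the oldest pending item
}

push_input( $_ ) for qw( a b c );   # "a" starts; "b" and "c" wait in the queue
finish_current() for 1 .. 3;
print "@processed\n";   # prints "a b c"
```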

If the queue between two stages is full, items remain pending in the earlier
stage. Ultimately this back-pressure propagates back to the L</push_input>
method at the beginning of the pipeline.
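Back-pressure between bounded queues can be sketched the same way. This sketch makes three assumptions. First, each queue holds at most two items. Second, stage 2 is stalled until an item is removed by hand. Third, the hypothetical C<try_push_input> simply returns false when the first queue is full. The real L</push_input> is awaited in the SYNOPSIS, so it presumably waits rather than refusing.

```perl
use strict;
use warnings;

# Hypothetical model of back-pressure between two bounded queues. Each
# queue holds at most $CAPACITY items, and an item only moves forward
# when the next queue has room; otherwise it stays pending where it is.
my $CAPACITY = 2;
my @q1;   # queue in front of stage 1
my @q2;   # queue in front of stage 2 (nothing consumes it automatically)

# Returns false when the first queue is full; the caller must try again later.
sub try_push_input {
   my ( $item ) = @_;
   return 0 if @q1 >= $CAPACITY;
   push @q1, $item;
   return 1;
}

# Stage 1 hands items on only while stage 2's queue has room.
sub step_stage1 {
   push @q2, shift @q1 while @q1 and @q2 < $CAPACITY;
}

try_push_input( $_ ) for 1 .. 2;   # q1 = (1, 2)
step_stage1();                     # q1 = (),     q2 = (1, 2)
try_push_input( $_ ) for 3 .. 4;   # q1 = (3, 4)
step_stage1();                     # q2 is full: 3 and 4 stay pending in q1
my $accepted = try_push_input( 5 );        # false: back-pressure reached the input

shift @q2;                         # stage 2 finishes one item...
step_stage1();                     # ...so 3 moves on: q1 = (4), q2 = (2, 3)
my $accepted_later = try_push_input( 5 );  # true: there is room again

printf "first=%d later=%d q1=(%s) q2=(%s)\n",
   $accepted, $accepted_later, "@q1", "@q2";
```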

=cut

=head1 CONSTRUCTOR

   $pipeline = Future::Workflow::Pipeline->new;

The constructor takes no additional parameters.

=cut

field $_output;   # output function given to set_output
field @_stages;   # stage objects, first to last

=head1 METHODS

=cut

=head2 set_output

   $pipeline->set_output( $code );

      await $code->( $result );

Sets the destination output for the pipeline. Each completed work item will be
passed to the given function, which is expected to return a C<Future>.
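The method stores the function and, if any stages already exist, passes it on to the last one. The following hypothetical plain-Perl model shows that bookkeeping. It uses the stand-in classes C<SketchStage> and C<SketchPipeline> rather than the real classes.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a stage object: it only remembers its output.
package SketchStage {
   sub new        { bless { output => undef }, shift }
   sub set_output { $_[0]{output} = $_[1] }
}

# Hypothetical stand-in for the pipeline, mirroring the two lines of
# set_output: remember the function, and pass it to the last stage if any.
package SketchPipeline {
   sub new { bless { output => undef, stages => [] }, shift }
   sub set_output {
      my ( $self, $code ) = @_;
      $self->{output} = $code;
      $self->{stages}[-1]->set_output( $code ) if @{ $self->{stages} };
   }
}

package main;

my $pipeline = SketchPipeline->new;
push @{ $pipeline->{stages} }, SketchStage->new, SketchStage->new;

my $out = sub { print "got @_\n" };
$pipeline->set_output( $out );

# Only the last stage is given the output function.
print defined $pipeline->{stages}[0]{output} ? "first: set\n" : "first: unset\n";
print $pipeline->{stages}[-1]{output} == $out ? "last: set\n" : "last: other\n";
```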

=cut

method set_output ( $code )
{
   $_output = $code;
   $_stages[-1]->set_output( $_output ) if @_stages;
}

=head2 set_output_sync

   $pipeline->set_output_sync( $code );

      $code->( $result );

Similar to L</set_output>, but the output function is an ordinary synchronous
function rather than one returning a C<Future>. Each item is considered
finished once the call returns.

=cut

method set_output_sync ( $code )
{
   $self->set_output( async sub ( $result ) { $code->( $result ) } );
}

=head2 append_stage

   $pipeline->append_stage( $code, %args );


