ETLp

 view release on metacpan or  search on metacpan

lib/ETLp/Execute/Iteration.pm  view on Meta::CPAN

package ETLp::Execute::Iteration;

use MooseX::Declare;

=head1 NAME

ETLp::Execute::Iteration - Execute an Iterative Pipeline

=head1 DESCRIPTION

Takes the array ref of Pipeline items and executes each item in turn

=head1 METHODS

=head2 new

=head3 parameters

All of the following parameters are mandatory

    * pipeline. an arrayref pipeline items
    * config. The application configuration
    
=head3 retuns

    * void

=cut

class ETLp::Execute::Iteration with ETLp::Role::Config {
    use Try::Tiny;
    use Data::Dumper;
    use File::LocalizeNewlines;
    use File::Basename;
    use File::Copy;
    use Carp;
    use ETLp::Exception;
    
    has 'pipeline' => (is => 'ro', isa => 'ArrayRef', required => 1);
    has 'config'   => (is => 'ro', isa => 'HashRef',  required => 1);
    
    $Data::Dumper::Deparse = 1;
    
    method run {
        my $warning = 0;
        
        try {
            FILE_LOOP: foreach my $filename ($self->_get_source_files) {
                ITEM_LOOP:
                foreach my $item (@{$self->pipeline}) {
                    $self->logger->debug("Item: " . Dumper($item));
    
                    # Create an audit entry for the item
                    my $audit_item = $self->audit->create_item(
                        name  => $item->{name},
                        type  => $item->{type},
                        phase => $item->{phase},
                    );
    
                    # The action to be taken when an error is encountered
                    my $on_error = $item->{on_error}
                        || $self->config->{config}->{on_error}
                        || 'die';
                        
                    $self->logger->debug("On error: $on_error");
                        
                    if ($item->{localize}) {
                        my $f = File::LocalizeNewlines->new();
                        my $count = $f->localize($filename);
                    }
    
                    my $next_file_loop = 0;
                    my $next_item_loop = 0;
                    
                    # Execute the item
                    try {
                        $self->logger->debug("Processing file: $filename");
                        $filename = $item->{runner}($filename);
                        $self->logger->debug("New file name: $filename");
                    } catch {
                        my $error = "Error processing $filename: $_";
                        my $fail_dir = $self->config->{config}->{fail_dir};
                        
                        move $filename, $fail_dir ||
                            ETLpException->throw(error =>
                                "Cannot move $filename to $fail_dir: $!");
                            
                        $filename = $fail_dir .'/' . basename($filename);
                        
                        $audit_item->update_message(''.$error);
                        $audit_item->update_status('failed');
                        
                        if ($on_error eq 'die') {
                            ETLpException->throw(error => $error);
                        }
                        $warning = 1;
                        
                        $self->logger->error($error);
                        
                        # Can't go to next loop item from within the catch



( run in 0.853 second using v1.01-cache-2.11-cpan-5b529ec07f3 )