ETLp
view release on metacpan or search on metacpan
lib/ETLp/Execute/Iteration.pm view on Meta::CPAN
package ETLp::Execute::Iteration;
use MooseX::Declare;
=head1 NAME
ETLp::Execute::Iteration - Execute an Iterative Pipeline
=head1 DESCRIPTION
Takes the array ref of Pipeline items and executes each item in turn
=head1 METHODS
=head2 new
=head3 parameters
All of the following parameters are mandatory
* pipeline. an arrayref pipeline items
* config. The application configuration
=head3 retuns
* void
=cut
class ETLp::Execute::Iteration with ETLp::Role::Config {
use Try::Tiny;
use Data::Dumper;
use File::LocalizeNewlines;
use File::Basename;
use File::Copy;
use Carp;
use ETLp::Exception;
has 'pipeline' => (is => 'ro', isa => 'ArrayRef', required => 1);
has 'config' => (is => 'ro', isa => 'HashRef', required => 1);
$Data::Dumper::Deparse = 1;
method run {
my $warning = 0;
try {
FILE_LOOP: foreach my $filename ($self->_get_source_files) {
ITEM_LOOP:
foreach my $item (@{$self->pipeline}) {
$self->logger->debug("Item: " . Dumper($item));
# Create an audit entry for the item
my $audit_item = $self->audit->create_item(
name => $item->{name},
type => $item->{type},
phase => $item->{phase},
);
# The action to be taken when an error is encountered
my $on_error = $item->{on_error}
|| $self->config->{config}->{on_error}
|| 'die';
$self->logger->debug("On error: $on_error");
if ($item->{localize}) {
my $f = File::LocalizeNewlines->new();
my $count = $f->localize($filename);
}
my $next_file_loop = 0;
my $next_item_loop = 0;
# Execute the item
try {
$self->logger->debug("Processing file: $filename");
$filename = $item->{runner}($filename);
$self->logger->debug("New file name: $filename");
} catch {
my $error = "Error processing $filename: $_";
my $fail_dir = $self->config->{config}->{fail_dir};
move $filename, $fail_dir ||
ETLpException->throw(error =>
"Cannot move $filename to $fail_dir: $!");
$filename = $fail_dir .'/' . basename($filename);
$audit_item->update_message(''.$error);
$audit_item->update_status('failed');
if ($on_error eq 'die') {
ETLpException->throw(error => $error);
}
$warning = 1;
$self->logger->error($error);
# Can't go to next loop item from within the catch
( run in 0.853 second using v1.01-cache-2.11-cpan-5b529ec07f3 )