ETLp
view release on metacpan or search on metacpan
lib/ETLp/Execute/Serial.pm view on Meta::CPAN
package ETLp::Execute::Serial;
use MooseX::Declare;
=head1 NAME
ETLp::Execute::Serial - Execute a Serial Pipeline
=head1 DESCRIPTION
Takes the array ref of Pipeline items and executes each item in turn
=head1 METHODS
=head2 new
=head3 parameters
All of the following parameters are mandatory
* pipeline. an arrayref pipeline items
* config. The application configuration
=head3 retuns
* void
=cut
class ETLp::Execute::Serial with ETLp::Role::Config {
use Try::Tiny;
use Carp;
use ETLp::Exception;
use Data::Dumper;
has 'pipeline' => (is => 'ro', isa => 'ArrayRef', required => 1);
has 'config' => (is => 'ro', isa => 'HashRef', required => 1);
method run {
my $warning = 0;
try {
ITEM_LOOP: foreach my $item (@{$self->pipeline}) {
$self->logger->debug("Item: " . Dumper($item));
# Create an audit entry for the item
my $audit_item = $self->audit->create_item(
name => $item->{name},
type => $item->{type},
phase => $item->{phase},
);
# The action to be taken when an error is encountered
my $on_error = (defined $item->{item}->{on_error}) ?
$item->{item}->{on_error} :
$self->config->{config}->{on_error} || 'die';
my $run_ok = 1;
# Execute the item
try {
my $res = $item->{runner}();
}
catch {
# Yes, we encountered an error. Set the item status to
# failed
my $error = $_;
$audit_item->update_message(''.$error);
$audit_item->update_status('failed');
if ($on_error ne 'ignore') {
$error->rethrow if ref $error;
ETLpException->throw(error => $error);
}
$self->logger->error(''.$error);
$run_ok = 0;
$warning = 1;
};
$audit_item->update_status('succeeded') if $run_ok;
}
}
catch {
my $error = $_;
$self->audit->update_message(''.$error);
$self->audit->update_status('failed');
ETLpException->throw(error => ''.$error);
};
# We may have a warning is we encountered an error, but the on_error
# flag was set to ignore.
if ($warning) {
$self->audit->update_status('warning');
} else {
$self->audit->update_status('succeeded');
}
( run in 2.198 seconds using v1.01-cache-2.11-cpan-437f7b0c052 )