App-Oozie
view release on metacpan or search on metacpan
lib/App/Oozie/Deploy.pm view on Meta::CPAN
has email_validator => (
is => 'rw',
default => sub {
my $self = shift;
sub {
my $self = shift;
my $emails = shift || do {
$self->logger->warn( 'No email was set!' );
return;
};
my @splits = map s/\+.+?@/@/r, map s/^\s+|\s+$//gr, split q{,}, $emails; ## no critic (ProhibitStringySplit,RequireDotMatchAnything,RequireExtendedFormatting,RequireLineBoundaryMatching,ProhibitEscapedMetacharacters)
my @invalids = grep { ! Email::Valid->address( $_ ) } @splits;
return 1 if ! @invalids;
for my $bogus ( @invalids ) {
$self->logger->warn(
sprintf 'FIXME !!! errorEmailTo parameter in workflow.xml is not set to a proper address: %s',
$bogus,
);
}
return;
},
},
isa => CodeRef,
lazy => 1,
);
has internal_conf => (
is => 'ro',
builder => '__collect_internal_conf',
lazy => 1,
);
has process_coord_directive_varname => (
is => 'rw',
isa => CodeRef,
default => sub {
sub {
my $name = shift;
return $name;
},
},
);
sub BUILD {
my ($self, $args) = @_;
my $logger = $self->logger;
my $oozie_base_dir = $self->local_oozie_code_path;
my $ttlib_base_dir = $self->ttlib_base_dir;
my $verbose = $self->verbose;
my $is_file = IsFile->library->get_type( IsFile );
foreach my $file ( @{ $self->required_tt_files } ) {
my $absolute_path = File::Spec->catfile( $ttlib_base_dir, $file );
if ( $verbose ) {
$logger->debug("Assert file: $absolute_path");
}
# assert_valid() does not display the error message, hence the manual check
my $error = $is_file->validate( $absolute_path ) || next;
$logger->logdie( sprintf 'required_tt_files(): %s', $error );
}
if ( $verbose ) {
$logger->debug( join q{=}, $_, $self->$_ ) for qw(
local_oozie_code_path
ttlib_base_dir
);
}
if ( $self->dump_xml_to_json && ! $self->dryrun ) {
$self->logger->info( 'dump_xml_to_json is enabled without a dryrun. Enabling dryrun as well.' );
$self->dryrun( 1 );
}
return;
}
sub run {
my $self = shift;
my $workflows = shift;
my $logger = $self->logger;
my $config = $self->internal_conf;
my $dryrun = $self->dryrun;
my $verbose = $self->verbose;
my $run_start_epoch = time;
my $log_marker = q{#} x TERMINAL_INFO_LINE_LEN;
$logger->info(
sprintf '%s Starting deployment in %s%s %s',
$log_marker,
$self->cluster_name,
$verbose ? EMPTY_STRING : '. Enable --verbose to see the underlying commands',
$log_marker,
);
$self->log_versions if $verbose;
my($update_coord) = $self->_verify_and_compile_all_workflows( $workflows );
if (!$self->secure_cluster) {
# Left in place for historial reasons.
# All clusters should be under Kerberos.
# Possible removal in a future version.
#
# unsafe, but needed when uploading with mapred's uid or hdfs dfs cannot see the files
chmod oct( DEFAULT_FILE_MODE ), $config->{base_dest};
}
my $success = $self->upload_to_hdfs;
$self->maybe_update_coordinators( $update_coord ) if @{ $update_coord };
if ($self->prune) {
$logger->info( '--prune is set, checking workflow directories for old files' );
for my $workflow ( @{ $workflows } ) {
$self->prune_path(
File::Spec->catdir(
$config->{hdfs_dest},
lib/App/Oozie/Deploy.pm view on Meta::CPAN
path => $path,
coord_id => $cid,
job => $job,
cvc => $cvc,
workflow => $workflow,
dest => $dest,
};
}
}
}
return @rv;
}
sub maybe_update_coordinators {
my $self = shift;
my $coords = shift;
for my $e ( @{ $coords } ) {
# stub: better override in a subclass
}
return;
}
sub _get_spec_validator {
my($self, $dest) = @_;
my @pass_through = qw(
email_validator
max_node_name_len
max_wf_xml_length
oozie_cli
oozie_client_jar
oozie_uri
spec_queue_is_missing_message
timeout
verbose
);
return App::Oozie::Deploy::Validate::Spec->new(
( map { $_ => $self->$_ } @pass_through ),
local_path => $dest,
);
}
sub __maybe_dump_xml_to_json {
my $self = shift;
my $dump_path = $self->dump_xml_to_json || return;
my $logger = $self->logger;
require JSON;
my $sv = shift || $logger->logdie( 'Spec validator not specified!' );
my $validation_errors_ref = shift;
my $total_errors_ref = shift;
for my $xml_file ( $sv->local_xml_files ) {
my $parsed = $sv->maybe_parse_xml( $xml_file );
if ( my $error = $parsed->{error} ) {
$logger->fatal(
sprintf q{We can't validate %s since parsing failed: %s},
$parsed->{relative_file_name},
$error,
);
${ $validation_errors_ref }++;
${ $total_errors_ref }++;
next; #we don't even have valid XML file at this point, so just skip it
};
$logger->info( sprintf 'Dumping xml to json within %s', $dump_path );
my $json_filename = File::Spec->catfile(
$dump_path,
File::Basename::basename($xml_file, '.xml') . '.json'
);
File::Path::make_path( $dump_path );
open my $JSON_FH, '>', $json_filename or $logger->logdie( sprintf 'Failed to create %s: %s', $json_filename, $! );
print $JSON_FH JSON->new->pretty->encode( $parsed->{xml_in} );
if ( ! close $JSON_FH ) {
$logger->warn(
sprintf 'Failed to close %s: %s',,
$json_filename,
$!,
);
}
}
return;
}
sub compile_templates {
my $self = shift;
my $workflow = shift;
my $validation_errors_ref = shift;
my $total_errors_ref = shift;
if ( ! -d $workflow ) {
die sprintf 'The workflow path `%s` either does not exist or not a directory',
$workflow;
}
state $pass_through = [
qw(
dryrun
effective_username
internal_conf
oozie_workflows_base
process_coord_directive_varname
timeout
ttlib_base_dir
ttlib_dynamic_base_dir_name
verbose
write_ownership_to_workflow_xml
)
];
my $t = App::Oozie::Deploy::Template->new(
map { $_ => $self->$_ }
@{ $pass_through }
);
my($template_validation_errors,
$template_total_errors,
$dest,
) = $t->compile( $workflow );
my $cvc = $t->coordinator_directive_var_cache;
${ $validation_errors_ref } += $template_validation_errors;
${ $total_errors_ref } += $template_total_errors;
return $dest, $cvc;
}
sub process_templates {
my $self = shift;
my $workflow = shift || die 'No workflow path specified!';
if ( ! -d $workflow ) {
die sprintf 'The workflow path %s either does not exist or not a directory',
$workflow,
;
}
my($validation_errors, $total_errors);
my($dest, $cvc) = $self->compile_templates(
$workflow,
\$validation_errors,
\$total_errors,
);
if ( $self->write_ownership_to_workflow_xml ) {
$self->validate_meta_file(
File::Spec->catfile( $workflow, $self->meta->file ),
\$validation_errors,
\$total_errors,
{},
);
}
my $sv = $self->_get_spec_validator( $dest );
$self->__maybe_dump_xml_to_json(
$sv,
\$validation_errors,
\$total_errors,
) if $self->dump_xml_to_json;
my($spec_validation_errors, $spec_total_errors) = $sv->verify( $workflow );
$validation_errors += $spec_validation_errors;
$total_errors += $spec_total_errors;
$self->create_deployment_meta_file( $dest, $workflow, $total_errors );
if ( $validation_errors ) {
$self->logger->error( 'Oozie deployment validation status: !!!!! FAILED !!!!!' );
}
else {
$self->logger->info( 'Oozie deployment validation status: OK' );
}
return $validation_errors, $total_errors, $dest, $cvc;
}
sub validate_meta_file {
my $self = shift;
my $file = shift;
$self->logger->info( sprintf 'Extra validation for %s', $file );
return;
}
sub verify_temp_dir {
my $self = shift;
my $user_setting = $ENV{TMPDIR} || return;
my $logger = $self->logger;
# The path needs to be readable by mapred in order the deploy to be successful
# Some users have this set in their environment to paths lacking relevant
# permissions leading to failures.
#
# If the path is bogus, then by removing the setting locally in here
# will lead the temporary directory to be created inside "/tmp" by default.
#
# Otherwise it can still be altered to elsewhere by changing the
# environment by the users.
#
my $remove;
if ( ! -d $user_setting ) {
$logger->warn( sprintf q{You have TMPDIR=%s but it doesn't exist! I will ignore/remove that setting!}, $user_setting );
$remove = 1;
}
else {
my $mode = (stat $user_setting)[STAT_MODE];
my $group_read = ( $mode & S_IRGRP ) >> MODE_BITSHIFT_READ;
my $other_read = $mode & S_IROTH;
if ( ! $group_read || ! $other_read ) {
$logger->warn(
sprintf q{You have TMPDIR=%s and it is not group/other readable (mode=%04o)! I will ignore/remove that setting!},
$user_setting,
S_IMODE( $mode ),
);
$remove = 1;
}
}
delete $ENV{TMPDIR} if $remove;
return;
}
sub collect_names_to_deploy {
my $self = shift;
my $names = shift || die 'No workflow names were specified!';
my $owf_base = $self->oozie_workflows_base;
my $logger = $self->logger;
my $verbose = $self->verbose;
if ( ! is_arrayref $names ) {
die 'Workflow names need to be specified as an arrayref';
lib/App/Oozie/Deploy.pm view on Meta::CPAN
__END__
=pod
=encoding UTF-8
=head1 NAME
App::Oozie::Deploy
=head1 VERSION
version 0.020
=head1 SYNOPSIS
use App::Oozie::Deploy;
App::Oozie::Deploy->new_with_options->run;
=head1 DESCRIPTION
This is an action/program in the Oozie Tooling.
=for Pod::Coverage BUILD
=head1 NAME
App::Oozie::Deploy - The program to deploy Oozie workflows.
=head1 Methods
=head2 collect_data_for_deployment_meta_file
=head2 collect_names_to_deploy
=head2 compile_templates
=head2 create_deployment_meta_file
=head2 destination_path
=head2 guess_running_coordinator
=head2 max_wf_xml_length
=head2 maybe_update_coordinators
=head2 pre_verification
=head2 process_templates
=head2 process_workflow
=head2 prune_path
=head2 run
=head2 upload_to_hdfs
=head2 validate_meta_file
=head2 verify_temp_dir
=head2 write_deployment_meta_file
=head1 Accessors
=head2 Overridable from cli
=head3 dump_xml_to_json
=head3 hdfs_dest
=head3 keep_deploy_path
=head3 oozie_workflows_base
=head3 prune
=head3 sla
=head3 write_ownership_to_workflow_xml
=head2 Overridable from sub-classes
=head3 configuration_files
=head3 deploy_start
=head3 deployment_meta_file_name
=head3 email_validator
=head3 internal_conf
=head3 max_node_name_len
=head3 process_coord_directive_varname
=head3 required_tt_files
=head3 spec_queue_is_missing_message
=head3 ttlib_base_dir
=head3 ttlib_dynamic_base_dir_name
=head1 SEE ALSO
L<App::Oozie>.
=head1 AUTHORS
=over 4
=item *
David Morel
=item *
( run in 0.651 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )