App-Oozie
view release on metacpan or search on metacpan
lib/App/Oozie/Deploy.pm view on Meta::CPAN
},
},
);
# Post-construction hook (presumably invoked by the object system after
# new() -- confirm against the class setup, which is not visible here).
#
# - Asserts that every entry of required_tt_files() is a file below
#   ttlib_base_dir(); the first failure is fatal through the logger.
# - Logs the base paths when verbose is set.
# - Forces dryrun on when dump_xml_to_json was requested without it.
#
# Returns nothing.
sub BUILD {
    my ($self, $args) = @_;

    my $logger    = $self->logger;
    # NOTE(review): this value is not used below; the call is kept in case
    # the accessor has side effects (e.g. a lazy builder) -- confirm.
    my $code_base = $self->local_oozie_code_path;
    my $tt_base   = $self->ttlib_base_dir;
    my $verbose   = $self->verbose;
    my $file_type = IsFile->library->get_type( IsFile );

    for my $tt_file ( @{ $self->required_tt_files } ) {
        my $path = File::Spec->catfile( $tt_base, $tt_file );
        $logger->debug("Assert file: $path") if $verbose;

        # assert_valid() does not display the error message, so validate()
        # is used and its message is reported by hand. A false return value
        # means the path passed the check.
        my $error = $file_type->validate( $path );
        next if ! $error;

        $logger->logdie( sprintf 'required_tt_files(): %s', $error );
    }

    if ( $verbose ) {
        for my $attribute ( qw( local_oozie_code_path ttlib_base_dir ) ) {
            $logger->debug( join q{=}, $attribute, $self->$attribute );
        }
    }

    my $needs_dryrun = $self->dump_xml_to_json && ! $self->dryrun;
    if ( $needs_dryrun ) {
        $self->logger->info( 'dump_xml_to_json is enabled without a dryrun. Enabling dryrun as well.' );
        $self->dryrun( 1 );
    }

    return;
}
# Entry point of a deployment.
#
# Parameters:
#   $workflows - array reference of workflow names/paths; it is handed to
#                _verify_and_compile_all_workflows() and, when prune is
#                set, each entry's basename is pruned below hdfs_dest.
#
# Steps, in order: log a start banner, verify and compile the workflows,
# optionally chmod the local destination (non-secure clusters only), upload
# to HDFS, update coordinators if any were collected, optionally prune, and
# log a completion banner with the elapsed time.
#
# Returns whatever upload_to_hdfs() returned.
sub run {
    my ($self, $workflows) = @_;

    my $logger     = $self->logger;
    my $config     = $self->internal_conf;
    my $dryrun     = $self->dryrun;
    my $verbose    = $self->verbose;
    my $started_at = time;
    my $banner     = q{#} x TERMINAL_INFO_LINE_LEN;

    my $verbose_hint = $verbose
                     ? EMPTY_STRING
                     : '. Enable --verbose to see the underlying commands';

    $logger->info(
        sprintf '%s Starting deployment in %s%s %s',
                $banner,
                $self->cluster_name,
                $verbose_hint,
                $banner,
    );

    if ( $verbose ) {
        $self->log_versions;
    }

    my($update_coord) = $self->_verify_and_compile_all_workflows( $workflows );

    if ( ! $self->secure_cluster ) {
        # Left in place for historical reasons.
        # All clusters should be under Kerberos.
        # Possible removal in a future version.
        #
        # Unsafe, but needed when uploading with mapred's uid, otherwise
        # hdfs dfs cannot see the files. The result of chmod is not checked.
        chmod oct( DEFAULT_FILE_MODE ), $config->{base_dest};
    }

    my $success = $self->upload_to_hdfs;

    if ( @{ $update_coord } ) {
        $self->maybe_update_coordinators( $update_coord );
    }

    if ( $self->prune ) {
        $logger->info( '--prune is set, checking workflow directories for old files' );
        foreach my $wf_path ( @{ $workflows } ) {
            my $hdfs_dir = File::Spec->catdir( $config->{hdfs_dest}, basename $wf_path );
            $self->prune_path( $hdfs_dir );
        }
    }

    my $where = sprintf '%s%s',
                    $self->cluster_name,
                    ( $dryrun ? ' (dryrun is set)' : EMPTY_STRING );

    $logger->info(
        sprintf '%s Completed successfully in %s (took %s) %s',
                $banner,
                $where,
                duration_exact( time - $started_at ),
                $banner,
    );

    return $success;
}
# Runs all pre-deployment checks and compiles every requested workflow.
#
# Parameters:
#   $workflows - non-empty array reference of workflow names; anything else
#                is fatal through the logger.
#
# Steps: pre_verification(), verify_temp_dir(), verify_git_tag() (only when
# gitfeatures is set and gitforce is not), collect_names_to_deploy(), then
# process_workflow() + guess_running_coordinator() for each collected name.
#
# Returns an array reference holding everything guess_running_coordinator()
# returned across all workflows.
#
# Dies (via logdie) when any workflow reported errors.
sub _verify_and_compile_all_workflows {
    my $self      = shift;
    my $workflows = shift;
    my $logger    = $self->logger;

    if ( ! is_arrayref( $workflows ) || ! @{ $workflows } ) {
        $logger->logdie( 'Please give one or several workflow name(s) on the command line (glob pattern accepted). Also see --help' );
    }

    $self->pre_verification( $workflows );
    $self->verify_temp_dir;

    if ( $self->gitfeatures && ! $self->gitforce ) {
        $self->verify_git_tag;
    }

    my $wfs = $self->collect_names_to_deploy( $workflows );

    my $total_errors      = 0;
    my $validation_errors = 0;
    my @update_coord;

    for my $workflow ( @{ $wfs } ) {
        my($t_validation_errors, $t_total_errors, $dest, $cvc) = $self->process_workflow( $workflow );
        push @update_coord, $self->guess_running_coordinator( $workflow, $cvc, $dest );

        # Each per-workflow counter goes into its own accumulator (they used
        # to be cross-wired). "|| 0" guards against a counter that was never
        # incremented and is still undefined.
        $validation_errors += $t_validation_errors || 0;
        $total_errors      += $t_total_errors      || 0;
    }

    # Abort when either counter is non-zero, so that no run which used to be
    # rejected can slip through, and report the larger of the two counts.
    if ( $total_errors || $validation_errors ) {
        my $error_count = $total_errors > $validation_errors
                        ? $total_errors
                        : $validation_errors;
        $logger->fatal( sprintf 'ERROR: %s errors were encountered during this run. Please fix it!', $error_count );
        $logger->fatal( 'The --force option has been disabled, as not enough really paid attention.' );
        $logger->fatal( 'Fixing the errors is really your best and easiest option.' );
        $logger->logdie( 'Failed.' );
    }

    return \@update_coord;
}
# Processes a single workflow by delegating to process_templates().
#
# Parameters:
#   $workflow - passed through to process_templates() unchanged.
#
# Returns the first four values produced by process_templates(), in order:
# validation error count, total error count, destination, and the
# coordinator variable cache.
sub process_workflow {
    my ($self, $workflow) = @_;

    my ($validation_count, $total_count, $deploy_dest, $coord_vars)
        = $self->process_templates( $workflow );

    return ( $validation_count, $total_count, $deploy_dest, $coord_vars );
}
# Hook called by _verify_and_compile_all_workflows() with the workflow list
# before any other check. Intentionally a no-op in this class.
sub pre_verification {
    return;
}
# Normalises an HDFS destination.
#
# Parameters:
#   $default - optional destination; when false, default_hdfs_destination()
#              is used instead.
#
# Returns the value untouched when it starts with "hdfs://", otherwise the
# value anchored at "/" and cleaned up by File::Spec->canonpath().
sub destination_path {
    my ($self, $path) = @_;

    $path ||= $self->default_hdfs_destination;

    # Fully qualified URIs are passed through as they are.
    return $path if $path =~ m{ \A hdfs:// }xms;

    my $rooted = File::Spec->catdir( q{/}, $path );
    return File::Spec->canonpath( $rooted );
}
# Loads the optional properties file from HDFS.
#
# Returns a hash reference: empty when hdfs_properties_file() is not set or
# the file is not found by _hdfs_exists_no_exception(), otherwise the
# key/value list produced by Config::General::ParseConfig() from the file
# content.
sub __collect_internal_conf_hdfs {
    my ($self) = @_;

    my $logger     = $self->logger;
    my $props_file = $self->hdfs_properties_file;

    # Not specified at all
    if ( ! $props_file ) {
        return {};
    }

    $logger->debug( sprintf 'If exists, fetching from HDFS: %s', $props_file );

    if ( ! $self->_hdfs_exists_no_exception( $props_file ) ) {
        return {};
    }

    my %parsed = Config::General::ParseConfig(
                    -String => $self->hdfs->read( $props_file ),
                 );

    return \%parsed;
}
lib/App/Oozie/Deploy.pm view on Meta::CPAN
ttlib_base_dir
ttlib_dynamic_base_dir_name
verbose
write_ownership_to_workflow_xml
)
];
my $t = App::Oozie::Deploy::Template->new(
map { $_ => $self->$_ }
@{ $pass_through }
);
my($template_validation_errors,
$template_total_errors,
$dest,
) = $t->compile( $workflow );
my $cvc = $t->coordinator_directive_var_cache;
${ $validation_errors_ref } += $template_validation_errors;
${ $total_errors_ref } += $template_total_errors;
return $dest, $cvc;
}
# Compiles and validates one workflow directory.
#
# Parameters:
#   $workflow - path of the workflow; must be an existing directory.
#
# Steps, in order: compile_templates(), validate_meta_file() (only when
# write_ownership_to_workflow_xml is set), build the spec validator,
# __maybe_dump_xml_to_json() (only when dump_xml_to_json is set), verify
# the spec, create the deployment meta file, and log the validation status.
#
# Returns ( validation error count, total error count, destination,
# coordinator variable cache ).
#
# Dies when no path is given or the path is not a directory.
sub process_templates {
    my ($self, $workflow) = @_;

    if ( ! $workflow ) {
        die 'No workflow path specified!';
    }

    -d $workflow
        or die sprintf 'The workflow path %s either does not exist or not a directory',
                        $workflow;

    # Both counters are updated by the callees through the references.
    my ($validation_count, $total_count);

    my ($dest, $cvc) = $self->compile_templates(
                            $workflow,
                            \$validation_count,
                            \$total_count,
                        );

    if ( $self->write_ownership_to_workflow_xml ) {
        my $meta_path = File::Spec->catfile( $workflow, $self->meta->file );
        $self->validate_meta_file(
            $meta_path,
            \$validation_count,
            \$total_count,
            {},
        );
    }

    my $sv = $self->_get_spec_validator( $dest );

    if ( $self->dump_xml_to_json ) {
        $self->__maybe_dump_xml_to_json(
            $sv,
            \$validation_count,
            \$total_count,
        );
    }

    my ($spec_validation_count, $spec_total_count) = $sv->verify( $workflow );
    $validation_count += $spec_validation_count;
    $total_count      += $spec_total_count;

    $self->create_deployment_meta_file( $dest, $workflow, $total_count );

    if ( ! $validation_count ) {
        $self->logger->info( 'Oozie deployment validation status: OK' );
    }
    else {
        $self->logger->error( 'Oozie deployment validation status: !!!!! FAILED !!!!!' );
    }

    return ( $validation_count, $total_count, $dest, $cvc );
}
# Base implementation of the meta file validation: it only logs the file
# name at info level. Any further arguments handed in by the caller are
# ignored here.
#
# Parameters:
#   $file - path of the meta file.
#
# Returns nothing.
sub validate_meta_file {
    my ($self, $file) = @_;

    my $message = sprintf 'Extra validation for %s', $file;
    $self->logger->info( $message );

    return;
}
# Sanity-checks the TMPDIR environment variable and removes it from %ENV
# (with a warning) when it points to a missing directory or to a directory
# that is not readable by both group and others.
#
# Does nothing when TMPDIR is unset or false. Returns nothing.
sub verify_temp_dir {
    my ($self) = @_;

    my $tmpdir = $ENV{TMPDIR};
    return if ! $tmpdir;

    my $logger = $self->logger;

    # The path needs to be readable by mapred for the deploy to succeed.
    # Some users have this set in their environment to paths lacking the
    # relevant permissions, which leads to failures.
    #
    # If the path is bogus, removing the setting locally in here makes the
    # temporary directory get created inside "/tmp" by default.
    #
    # Users can still point it elsewhere by changing their environment.
    #
    if ( ! -d $tmpdir ) {
        $logger->warn( sprintf q{You have TMPDIR=%s but it doesn't exist! I will ignore/remove that setting!}, $tmpdir );
        delete $ENV{TMPDIR};
        return;
    }

    my $mode       = (stat $tmpdir)[STAT_MODE];
    my $group_read = ( $mode & S_IRGRP ) >> MODE_BITSHIFT_READ;
    my $other_read = $mode & S_IROTH;

    # Both read bits are required for the setting to be kept.
    return if $group_read && $other_read;

    $logger->warn(
        sprintf q{You have TMPDIR=%s and it is not group/other readable (mode=%04o)! I will ignore/remove that setting!},
                $tmpdir,
                S_IMODE( $mode ),
    );
    delete $ENV{TMPDIR};

    return;
}
sub collect_names_to_deploy {
my $self = shift;
my $names = shift || die 'No workflow names were specified!';
my $owf_base = $self->oozie_workflows_base;
my $logger = $self->logger;
my $verbose = $self->verbose;
if ( ! is_arrayref $names ) {
die 'Workflow names need to be specified as an arrayref';
}
my(@firstLevelMatchingPatterns, @secondLevelMatchingPatterns);
my @workflow = map { trim_slashes( $_ ) } @{ $names };
my $workflowPatternCount = @workflow;
for my $w (@workflow) {
my $separators = () = $w =~ m{ [/] }xmsg;
lib/App/Oozie/Deploy.pm view on Meta::CPAN
=pod
=encoding UTF-8
=head1 NAME
App::Oozie::Deploy - The program to deploy Oozie workflows.
=head1 VERSION
version 0.020
=head1 SYNOPSIS
use App::Oozie::Deploy;
App::Oozie::Deploy->new_with_options->run;
=head1 DESCRIPTION
This is an action/program in the Oozie Tooling.
=for Pod::Coverage BUILD
=head1 NAME
App::Oozie::Deploy - The program to deploy Oozie workflows.
=head1 Methods
=head2 collect_data_for_deployment_meta_file
=head2 collect_names_to_deploy
=head2 compile_templates
=head2 create_deployment_meta_file
=head2 destination_path
=head2 guess_running_coordinator
=head2 max_wf_xml_length
=head2 maybe_update_coordinators
=head2 pre_verification
=head2 process_templates
=head2 process_workflow
=head2 prune_path
=head2 run
=head2 upload_to_hdfs
=head2 validate_meta_file
=head2 verify_temp_dir
=head2 write_deployment_meta_file
=head1 Accessors
=head2 Overridable from cli
=head3 dump_xml_to_json
=head3 hdfs_dest
=head3 keep_deploy_path
=head3 oozie_workflows_base
=head3 prune
=head3 sla
=head3 write_ownership_to_workflow_xml
=head2 Overridable from sub-classes
=head3 configuration_files
=head3 deploy_start
=head3 deployment_meta_file_name
=head3 email_validator
=head3 internal_conf
=head3 max_node_name_len
=head3 process_coord_directive_varname
=head3 required_tt_files
=head3 spec_queue_is_missing_message
=head3 ttlib_base_dir
=head3 ttlib_dynamic_base_dir_name
=head1 SEE ALSO
L<App::Oozie>.
=head1 AUTHORS
=over 4
=item *
David Morel
=item *
Burak Gursoy
( run in 1.115 second using v1.01-cache-2.11-cpan-e1769b4cff6 )