App-Oozie
view release on metacpan or search on metacpan
lib/App/Oozie/Deploy.pm view on Meta::CPAN
my $oozie_base_dir = $self->local_oozie_code_path;
my $ttlib_base_dir = $self->ttlib_base_dir;
my $verbose = $self->verbose;
my $is_file = IsFile->library->get_type( IsFile );
foreach my $file ( @{ $self->required_tt_files } ) {
my $absolute_path = File::Spec->catfile( $ttlib_base_dir, $file );
if ( $verbose ) {
$logger->debug("Assert file: $absolute_path");
}
# assert_valid() does not display the error message, hence the manual check
my $error = $is_file->validate( $absolute_path ) || next;
$logger->logdie( sprintf 'required_tt_files(): %s', $error );
}
if ( $verbose ) {
$logger->debug( join q{=}, $_, $self->$_ ) for qw(
local_oozie_code_path
ttlib_base_dir
);
}
if ( $self->dump_xml_to_json && ! $self->dryrun ) {
$self->logger->info( 'dump_xml_to_json is enabled without a dryrun. Enabling dryrun as well.' );
$self->dryrun( 1 );
}
return;
}
# Deploys the given workflows and returns whatever upload_to_hdfs() returned.
#
# Arguments (positional):
#   1. $workflows - array reference of workflows; it is handed to
#                   _verify_and_compile_all_workflows() and, when pruning,
#                   iterated to derive one HDFS directory per entry.
#
# Steps, in order:
#   1. log a start banner (plus log_versions() when verbose),
#   2. call _verify_and_compile_all_workflows(),
#   3. when secure_cluster is false, chmod the local base_dest,
#   4. call upload_to_hdfs(),
#   5. call maybe_update_coordinators() if step 2 produced a non-empty list,
#   6. when prune is set, call prune_path() for every workflow,
#   7. log a completion banner including the elapsed time.
#
# NOTE(review): the completion message is logged regardless of the value
# held in $success; presumably upload_to_hdfs() dies on failure -- confirm.
sub run {
    my ( $self, $workflows ) = @_;

    my $logger  = $self->logger;
    my $config  = $self->internal_conf;
    my $dryrun  = $self->dryrun;
    my $verbose = $self->verbose;

    # Taken before any work starts so that the reported duration covers
    # the whole deployment.
    my $run_start_epoch = time;
    my $banner          = q{#} x TERMINAL_INFO_LINE_LEN;

    my $start_message = sprintf '%s Starting deployment in %s%s %s',
                            $banner,
                            $self->cluster_name,
                            ( $verbose ? EMPTY_STRING : '. Enable --verbose to see the underlying commands' ),
                            $banner;
    $logger->info( $start_message );

    if ( $verbose ) {
        $self->log_versions;
    }

    # Only the first element of the returned list is of interest here.
    my($update_coord) = $self->_verify_and_compile_all_workflows( $workflows );

    unless ( $self->secure_cluster ) {
        # Left in place for historical reasons.
        # All clusters should be under Kerberos.
        # Possible removal in a future version.
        #
        # Unsafe, but needed when uploading with mapred's uid,
        # or hdfs dfs cannot see the files.
        #
        # NOTE(review): the result of chmod is not checked.
        chmod( oct( DEFAULT_FILE_MODE ), $config->{base_dest} );
    }

    my $success = $self->upload_to_hdfs;

    if ( @{ $update_coord } ) {
        $self->maybe_update_coordinators( $update_coord );
    }

    if ( $self->prune ) {
        $logger->info( '--prune is set, checking workflow directories for old files' );
        foreach my $wf ( @{ $workflows } ) {
            # The remote directory is hdfs_dest plus the last path
            # component of the workflow.
            my $remote_dir = File::Spec->catdir(
                                $config->{hdfs_dest},
                                basename( $wf ),
                             );
            $self->prune_path( $remote_dir );
        }
    }

    my $target = sprintf '%s%s',
                    $self->cluster_name,
                    ( $dryrun ? ' (dryrun is set)' : EMPTY_STRING );

    $logger->info(
        sprintf(
            '%s Completed successfully in %s (took %s) %s',
            $banner,
            $target,
            duration_exact( time - $run_start_epoch ),
            $banner,
        )
    );

    return $success;
}
sub _verify_and_compile_all_workflows {
my $self = shift;
my $workflows = shift;
my $logger = $self->logger;
if ( ! is_arrayref $workflows || ! @{ $workflows } ) {
$logger->logdie( 'Please give one or several workflow name(s) on the command line (glob pattern accepted). Also see --help' );
}
$self->pre_verification( $workflows );
$self->verify_temp_dir;
if ( $self->gitfeatures
&& ! $self->gitforce
) {
$self->verify_git_tag;
}
my $wfs = $self->collect_names_to_deploy( $workflows );
my($total_errors, $validation_errors);
my @update_coord;
for my $workflow ( @{ $wfs } ) {
my($t_validation_errors, $t_total_errors, $dest, $cvc) = $self->process_workflow( $workflow );
push @update_coord, $self->guess_running_coordinator( $workflow, $cvc, $dest );
$total_errors += $t_validation_errors;
$validation_errors += $t_total_errors;
}
if ($total_errors) {
lib/App/Oozie/Deploy.pm view on Meta::CPAN
my $hdfs = $self->hdfs;
my $rv;
eval {
$rv = $hdfs->exists( $path );
1;
} or do {
my $eval_error = $@ || 'Zombie error';
if ( $self->verbose ) {
$self->logger->debug(
sprintf 'WebHDFS exists() failed with exception, however since this is a silent call, it is ignored: %s',
$eval_error,
)
}
};
return $rv;
}
# Recursively copies a local folder into an HDFS folder through the client
# object returned by $self->hdfs.
#
# Arguments (positional):
#   1. local source folder; its direct child files are uploaded and its
#      direct child folders are processed by a recursive call.
#   2. HDFS destination folder; created first when it does not exist.
#
# Returns 1. Dies (via the logger) when the client reports a failed create.
sub _copy_to_hdfs_with_webhdfs {
    my ( $self, $source_dir, $dest_dir ) = @_;

    my $hdfs    = $self->hdfs;
    my $logger  = $self->logger;
    my $verbose = $self->verbose;

    $logger->info( sprintf 'copying from `%s` to `%s`', $source_dir, $dest_dir );

    if ( $self->_hdfs_exists_no_exception( $dest_dir ) ) {
        $logger->debug( sprintf 'HDFS destination %s exists', $dest_dir )
            if $verbose;
    }
    else {
        $logger->debug( sprintf 'HDFS destination %s does not exist', $dest_dir )
            if $verbose;

        # The volume part is discarded; what remains is the directory
        # portion followed by the last path component.
        # NOTE(review): splitpath() does not split the directory portion
        # into individual segments -- presumably mkdir creates missing
        # parents; confirm against the client in use.
        my( undef, @segments ) = File::Spec->splitpath( $dest_dir );

        my $hdfs_path;
        SEGMENT: for my $segment ( @segments ) {
            $hdfs_path = $hdfs_path
                       ? File::Spec->catdir( $hdfs_path, $segment )
                       : $segment;

            next SEGMENT if $self->_hdfs_exists_no_exception( $hdfs_path );

            $logger->debug( sprintf 'Attempting to mkdir HDFS destination %s', $hdfs_path )
                if $verbose;

            $hdfs->mkdir( $hdfs_path );
            $hdfs->chmod( $hdfs_path, DEFAULT_DIR_MODE );
        }

        # The existence probes above swallow exceptions, so call the
        # client directly once to let any real error surface.
        if ( $hdfs->exists( $dest_dir ) && $verbose ) {
            $logger->debug( sprintf 'HDFS destination %s exists', $dest_dir );
        }
    }

    # Regular files located directly inside the source folder.
    my @local_files = File::Find::Rule
                        ->new
                        ->file
                        ->maxdepth(1)
                        ->mindepth(1)
                        ->in( $source_dir );

    for my $local_file ( @local_files ) {
        my $leaf        = basename( $local_file );
        my $remote_file = File::Spec->catfile( $dest_dir, $leaf );
        my $content     = path( $local_file )->slurp_raw;

        $logger->debug("Creating $remote_file") if $verbose;

        $hdfs->touchz( $remote_file );

        my $created = $hdfs->create(
                            $remote_file,
                            $content,
                            overwrite => 'true',
                      );
        if ( ! $created ) {
            $logger->logdie( sprintf 'Failed to create %s through WebHDFS', $remote_file );
        }

        # NOTE(review): the directory mode constant is applied to a
        # regular file here -- confirm this is intended.
        $hdfs->chmod( $remote_file, DEFAULT_DIR_MODE );
    }

    # Folders located directly inside the source folder; each one is
    # copied by a recursive call.
    my @local_dirs = File::Find::Rule
                        ->new
                        ->directory
                        ->maxdepth(1)
                        ->mindepth(1)
                        ->in( $source_dir );

    for my $local_dir ( @local_dirs ) {
        my $leaf       = basename( $local_dir );
        my $remote_dir = File::Spec->catfile( $dest_dir, $leaf );
        $self->_copy_to_hdfs_with_webhdfs( $local_dir, $remote_dir );
    }

    return 1;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
App::Oozie::Deploy
=head1 VERSION
version 0.020
=head1 SYNOPSIS
use App::Oozie::Deploy;
App::Oozie::Deploy->new_with_options->run;
=head1 DESCRIPTION
This is an action/program in the Oozie Tooling.
=for Pod::Coverage BUILD
=head1 NAME
App::Oozie::Deploy - The program to deploy Oozie workflows.
=head1 Methods
=head2 collect_data_for_deployment_meta_file
=head2 collect_names_to_deploy
=head2 compile_templates
=head2 create_deployment_meta_file
=head2 destination_path
=head2 guess_running_coordinator
=head2 max_wf_xml_length
( run in 0.744 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )