App-Oozie
view release on metacpan or search on metacpan
lib/App/Oozie/Deploy.pm view on Meta::CPAN
short => 'p',
doc => 'Prune obsolete files on HDFS',
);
# No "format" is given, so this is presumably a plain boolean switch.
option sla => (
    doc => 'Enable SLA under Oozie?',
    is  => 'rw',
);

# Local directory holding the workflows; lazily derived from the local code
# checkout as <local_oozie_code_path>/workflows unless given explicitly.
option oozie_workflows_base => (
    is      => 'rw',
    format  => 's',
    lazy    => 1,
    default => sub {
        my($self) = @_;
        return File::Spec->catdir( $self->local_oozie_code_path, 'workflows' );
    },
);

option dump_xml_to_json => (
    is     => 'rw',
    isa    => IsDir,
    format => 's',
    doc    => 'Specify a directory to convert and dump XML files in the workflow as JSON. This implies a dryrun.',
);

option hdfs_properties_file => (
    is     => 'rw',
    isa    => Str,
    format => 's',
    # e.g.: /share/oozie.properties
    doc    => 'The location of the optional properties file on HDFS',
);
#------------------------------------------------------------------------------#
# Template names that must be present in the template library; each object
# gets its own fresh array reference.
has required_tt_files => (
    is      => 'ro',
    isa     => ArrayRef[Str],
    default => sub {
        my @names = qw(
            coordinator_config_xml
            ttree.cfg
            workflow_global_xml_end
            workflow_global_xml_start
            workflow_parameters_xml_end
            workflow_parameters_xml_start
            workflow_sla_xml
            workflow_xmlns
        );
        return \@names;
    },
);
# Base directory of the template library. The first existing directory wins:
#   1. <local_oozie_code_path>/lib
#   2. the "ttlib" directory shipped next to this module file
# Dies if neither exists.
has ttlib_base_dir => (
    is      => 'rw',
    isa     => IsDir,
    lazy    => 1,
    default => sub {
        my($self) = @_;
        my $dir_type = App::Oozie::Types::Common->get_type(IsDir);

        # Strip the ".pm" suffix so the bundled ttlib sits beside the module.
        (my $module_base = __FILE__) =~ s{ [.]pm \z }{}xms;

        for my $candidate (
            File::Spec->catdir( $self->local_oozie_code_path, 'lib' ),
            File::Spec->catdir( $module_base, 'ttlib' ),
        ) {
            return $candidate if $dir_type->check( $candidate );
        }

        die 'Failed to locate the ttlib path!';
    },
);
# Name of the directory (located inside ttlib_base_dir) used for dynamic
# includes/directives.
has ttlib_dynamic_base_dir_name => (
    is      => 'ro',
    isa     => Str,
    default => sub { return '.oozie-deploy-lib' },
);

# Epoch seconds captured by the (non-lazy) default when the object is built.
has deploy_start => (
    is      => 'ro',
    default => sub { return time },
);

# Upper bound for node name lengths, taken lazily from the Oozie client.
has max_node_name_len => (
    is      => 'ro',
    isa     => StrictNum, # TODO: range check?
    lazy    => 1,
    default => sub {
        my($self) = @_;
        return $self->oozie->max_node_name_len;
    },
);

# Message template with two "%s" slots (presumably filled via sprintf with the
# property name and the list of offending actions -- confirm at call sites).
has spec_queue_is_missing_message => (
    is      => 'rw',
    isa     => Str,
    default => sub {
        return <<'NO_QUEUE_MSG';
The action configuration property "%s" is not
defined for these action(s):
%s
NO_QUEUE_MSG
    },
);

has deployment_meta_file_name => (
    is      => 'rw',
    default => sub { return '.deployment' },
);
# Property files loaded into the deployment configuration; by default only
# common.properties from the template library directory.
has configuration_files => (
    is      => 'rw',
    lazy    => 1,
    # TODO: type needs fixing for coercion
    # isa   => ArrayRef[IsFile],
    default => sub {
        my($self) = @_;
        my $common = File::Spec->catfile( $self->ttlib_base_dir, 'common.properties' );
        return [ $common ];
    },
);
lib/App/Oozie/Deploy.pm view on Meta::CPAN
# Left in place for historial reasons.
# All clusters should be under Kerberos.
# Possible removal in a future version.
#
# unsafe, but needed when uploading with mapred's uid or hdfs dfs cannot see the files
chmod oct( DEFAULT_FILE_MODE ), $config->{base_dest};
}
my $success = $self->upload_to_hdfs;
$self->maybe_update_coordinators( $update_coord ) if @{ $update_coord };
if ($self->prune) {
$logger->info( '--prune is set, checking workflow directories for old files' );
for my $workflow ( @{ $workflows } ) {
$self->prune_path(
File::Spec->catdir(
$config->{hdfs_dest},
basename $workflow
)
);
}
}
$logger->info(
sprintf '%s Completed successfully in %s (took %s) %s',
$log_marker,
sprintf( '%s%s', $self->cluster_name, ( $dryrun ? ' (dryrun is set)' : EMPTY_STRING ) ),
duration_exact( time - $run_start_epoch ),
$log_marker,
);
return $success;
}
# Validates the requested workflow list, runs the pre-flight checks, then
# processes every workflow and collects the running coordinators found for
# each one.
#
# Params : $workflows - arrayref of workflow names/patterns (must be non-empty)
# Returns: arrayref of whatever guess_running_coordinator() produced
# Dies   : (via logger->logdie) on an empty/invalid list, or when any error
#          was reported by process_workflow()
sub _verify_and_compile_all_workflows {
    my $self      = shift;
    my $workflows = shift;
    my $logger    = $self->logger;

    # Explicit parentheses so the check does not rely on is_arrayref()'s
    # prototype to bind tighter than "||".
    if ( ! is_arrayref( $workflows ) || ! @{ $workflows } ) {
        $logger->logdie( 'Please give one or several workflow name(s) on the command line (glob pattern accepted). Also see --help' );
    }

    $self->pre_verification( $workflows );
    $self->verify_temp_dir;

    if ( $self->gitfeatures && ! $self->gitforce ) {
        $self->verify_git_tag;
    }

    my $wfs = $self->collect_names_to_deploy( $workflows );

    my $total_errors      = 0;
    my $validation_errors = 0;
    my @update_coord;

    for my $workflow ( @{ $wfs } ) {
        # process_workflow() returns (validation errors, total errors, dest, cvc).
        my($t_validation_errors, $t_total_errors, $dest, $cvc) = $self->process_workflow( $workflow );
        push @update_coord, $self->guess_running_coordinator( $workflow, $cvc, $dest );

        # Each counter feeds its own total. They used to be crossed, which
        # made the abort below depend on validation errors only.
        $total_errors      += $t_total_errors      // 0;
        $validation_errors += $t_validation_errors // 0;
    }

    if ( $total_errors ) {
        $logger->fatal( sprintf 'ERROR: %s errors were encountered during this run. Please fix it!', $total_errors );
        $logger->fatal( sprintf '%s of these errors are validation errors.', $validation_errors )
            if $validation_errors;
        $logger->fatal( 'The --force option has been disabled, as not enough really paid attention.' );
        $logger->fatal( 'Fixing the errors is really your best and easiest option.' );
        $logger->logdie( 'Failed.' );
    }

    return \@update_coord;
}
# Per-workflow processing hook; the base class simply delegates to
# process_templates(). Sub-classes may override it to add extra steps.
#
# Returns exactly four values, in order:
#   (validation error count, total error count, destination, cvc)
sub process_workflow {
    my($self, $workflow) = @_;
    my @outcome = $self->process_templates( $workflow );
    return @outcome[ 0 .. 3 ];
}
# Hook run before any workflow is processed (it is passed the arrayref of
# requested workflows). The base class performs no checks and returns nothing;
# sub-classes may override it.
sub pre_verification {
    return;
}
# Resolves the HDFS destination path.
#
# Params : $path - optional; a false value falls back to default_hdfs_destination
# Returns: the path unchanged if it is an hdfs:// URI, otherwise the path rooted
#          at "/" and normalised by File::Spec (duplicate/trailing slashes removed)
sub destination_path {
    my($self, $path) = @_;
    $path ||= $self->default_hdfs_destination;

    return $path if $path =~ m{ \A hdfs:// }xms;

    my $rooted = File::Spec->catdir( q{/}, $path );
    return File::Spec->canonpath( $rooted );
}
# Loads optional configuration overrides from a properties file on HDFS.
#
# Returns: hashref of parsed settings; an empty hashref when no file is
#          configured or the file does not exist on HDFS.
sub __collect_internal_conf_hdfs {
    my($self) = @_;
    my $logger    = $self->logger;
    my $hdfs_file = $self->hdfs_properties_file;

    # Not configured at all: nothing to merge.
    if ( ! $hdfs_file ) {
        return {};
    }

    $logger->debug( sprintf 'If exists, fetching from HDFS: %s', $hdfs_file );

    # The file is optional, so its absence is not an error.
    if ( ! $self->_hdfs_exists_no_exception( $hdfs_file ) ) {
        return {};
    }

    # read() stays inside the argument list (list context), as before.
    my %settings = Config::General::ParseConfig(
        -String => $self->hdfs->read( $hdfs_file ),
    );
    return \%settings;
}
sub __collect_internal_conf {
my $self = shift;
my $keep = $self->keep_deploy_path;
my $logger = $self->logger;
# This will load static properties that we will reuse as variables in the
# template and merge it with the common.properties file
my $properties = Config::Properties->new;
lib/App/Oozie/Deploy.pm view on Meta::CPAN
if ( ! close $FH ) {
$logger->warn(
sprintf 'Failed to close %s: %s',,
$file,
$!,
);
}
$config = {
%{ $config },
$properties->properties,
};
}
}
$config = {
%{ $config },
%{ $self->__collect_internal_conf_hdfs },
};
my $base_dest = File::Temp::tempdir(
CLEANUP => ! $keep,
DIR => resolve_tmp_dir(),
);
$config->{base_dest} = $base_dest;
$self->logger->info(
"Output directory: `$base_dest`.",
( $keep ? ' You have decided to keep it after completion' : EMPTY_STRING )
);
# Override the paramters only when they are not set.
# For example, these keys might be set with an HDFS
# config file, collected before this point.
#
$config->{workflowsBaseDir} //= $self->oozie_basepath;
$config->{clusterName} //= $self->cluster_name;
$config->{hdfs_dest} //= $self->destination_path( $config->{workflowsBaseDir} );
# If YARN, use a different property.
# The oozie syntax doesn't change
# (still uses the jobtracker property)
$config->{jobTracker} //= $config->{resourceManager} || $self->resource_manager;
$config->{nameNode} //= $self->template_namenode;
$config->{has_sla} = $self->sla;
$self->logger->info( sprintf 'Upload directory: %s', $self->destination_path );
return $config;
}
# Returns the maximum workflow definition length configured on the Oozie
# server (oozie.service.WorkflowAppService.WorkflowDefinitionMaxLength), as
# read from the admin "configuration" endpoint.
#
# Dies (via logger->logdie) when the value is missing or false.
sub max_wf_xml_length {
    my $self      = shift;
    my $ooz_admin = $self->oozie->admin('configuration');
    my $conf_val  = $ooz_admin->{'oozie.service.WorkflowAppService.WorkflowDefinitionMaxLength'};

    # Fixed typo in the fatal message ("ooozie" -> "oozie").
    return $conf_val
        || $self->logger->logdie( 'Unable to fetch the oozie configuration WorkflowDefinitionMaxLength!' );
}
# Finds running coordinators whose job path ends with the HDFS location of the
# given workflow.
#
# Params : $workflow - local workflow path
#          $cvc, $dest - passed through untouched into each result record
# Returns: list of hashrefs { path, coord_id, job, cvc, workflow, dest }, one
#          per coordinator whose status is in OOZIE_STATES_RUNNING
sub guess_running_coordinator {
    # Lookup set of "running" states; a state variable, so built only once.
    state $is_running = { map { $_ => 1 } OOZIE_STATES_RUNNING };

    my($self, $workflow, $cvc, $dest) = @_;
    my $logger = $self->logger;

    $logger->info( 'Probing for existing coordinators ...' );

    # Strip the local workflows base to get a relative path, then map it onto
    # the HDFS destination.
    my $local_base = $self->oozie_workflows_base;
    my $rel_path   = $workflow;
    $rel_path =~ s{ \A \Q$local_base\E [/]? }{}xms;
    my $remote_path = File::Spec->catfile( $self->destination_path, $rel_path );

    my $paths = $self->oozie
                     ->active_job_paths(
                         coordinator => $self->destination_path
                     );

    my $ends_with_remote = qr{ \Q$remote_path\E \b \z }xms;
    my @matching_paths   = grep { $_ =~ $ends_with_remote } keys %{ $paths };

    my @found;
    for my $path ( @matching_paths ) {
        my $entries = $paths->{ $path };

        if ( @{ $entries } > 1 ) {
            # TODO: multi path
        }

        for my $jobs ( @{ $entries } ) {
            # One path can have several coordinators attached; keep running ones.
            my @running_ids = grep {
                my $job = $jobs->{ $_ };
                $is_running->{ $job->{status} };
            } keys %{ $jobs };

            push @found, map {
                +{
                    path     => $path,
                    coord_id => $_,
                    job      => $jobs->{ $_ },
                    cvc      => $cvc,
                    workflow => $workflow,
                    dest     => $dest,
                }
            } @running_ids;
        }
    }

    return @found;
}
# Hook for acting on the coordinators found during the deployment (the records
# produced by guess_running_coordinator). The base class does nothing with
# them and returns nothing; sub-classes are expected to override it.
sub maybe_update_coordinators {
    my($self, $coords) = @_;
    foreach my $coord ( @{ $coords } ) {
        # intentionally empty in the base class
    }
    return;
}
sub _get_spec_validator {
my($self, $dest) = @_;
my @pass_through = qw(
email_validator
max_node_name_len
lib/App/Oozie/Deploy.pm view on Meta::CPAN
);
}
$hdfs->chmod( $dest, DEFAULT_DIR_MODE );
}
my $d_rule = File::Find::Rule->new->directory->maxdepth(1)->mindepth(1);
my @folders = $d_rule->in($sourceFolder);
foreach my $folder (@folders)
{
my $foldername = basename($folder);
my $dest = File::Spec->catfile($destFolder, $foldername);
$self->_copy_to_hdfs_with_webhdfs($folder, $dest)
}
return 1;
}
1;
__END__
=pod
=encoding UTF-8
=head1 NAME
App::Oozie::Deploy
=head1 VERSION
version 0.020
=head1 SYNOPSIS
use App::Oozie::Deploy;
App::Oozie::Deploy->new_with_options->run;
=head1 DESCRIPTION
This is an action/program in the Oozie Tooling.
=for Pod::Coverage BUILD
=head1 NAME
App::Oozie::Deploy - The program to deploy Oozie workflows.
=head1 Methods
=head2 collect_data_for_deployment_meta_file
=head2 collect_names_to_deploy
=head2 compile_templates
=head2 create_deployment_meta_file
=head2 destination_path
=head2 guess_running_coordinator
=head2 max_wf_xml_length
=head2 maybe_update_coordinators
=head2 pre_verification
=head2 process_templates
=head2 process_workflow
=head2 prune_path
=head2 run
=head2 upload_to_hdfs
=head2 validate_meta_file
=head2 verify_temp_dir
=head2 write_deployment_meta_file
=head1 Accessors
=head2 Overridable from the command line
=head3 dump_xml_to_json
=head3 hdfs_dest
=head3 hdfs_properties_file
=head3 keep_deploy_path
=head3 oozie_workflows_base
=head3 prune
=head3 sla
=head3 write_ownership_to_workflow_xml
=head2 Overridable from sub-classes
=head3 configuration_files
=head3 deploy_start
=head3 deployment_meta_file_name
=head3 email_validator
=head3 internal_conf
=head3 max_node_name_len
=head3 process_coord_directive_varname
=head3 required_tt_files
=head3 spec_queue_is_missing_message
=head3 ttlib_base_dir
=head3 ttlib_dynamic_base_dir_name
( run in 2.323 seconds using v1.01-cache-2.11-cpan-5a3173703d6 )