App-Oozie

 view release on metacpan or  search on metacpan

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

    short   => 'p',
    doc     => 'Prune obsolete files on HDFS',
);

# Command line switch. Its value is copied into the deployment
# configuration under the "has_sla" key.
option sla => (
    doc => 'Enable SLA under Oozie?',
    is  => 'rw',
);

# Base path of the workflows. When it is not supplied, it is computed on
# first access as the "workflows" directory under local_oozie_code_path.
option oozie_workflows_base => (
    is      => 'rw',
    format  => 's',
    lazy    => 1,
    default => sub {
        my ($self) = @_;
        return File::Spec->catdir( $self->local_oozie_code_path, 'workflows' );
    },
);

# Optional string option; the value has to pass the IsDir type check.
option dump_xml_to_json => (
    doc     => 'Specify a directory to convert and dump XML files in the workflow as JSON. This implies a dryrun.',
    format  => 's',
    isa     => IsDir,
    is      => 'rw',
);

# Optional path of a properties file stored on HDFS,
# for example: /share/oozie.properties
# When set, the file is read and parsed into the deployment configuration.
option hdfs_properties_file => (
    doc     => 'The location of the optional properties file on HDFS',
    format  => 's',
    isa     => Str,
    is      => 'rw',
);


#------------------------------------------------------------------------------#

# Names of the template files which are expected to be available.
# A fresh array reference is built every time the default is evaluated.
has required_tt_files => (
    is      => 'ro',
    isa     => ArrayRef[Str],
    default => sub {
        my @names = qw(
            coordinator_config_xml
            ttree.cfg
            workflow_global_xml_end
            workflow_global_xml_start
            workflow_parameters_xml_end
            workflow_parameters_xml_start
            workflow_sla_xml
            workflow_xmlns
        );
        return \@names;
    },
);

# Directory of the template library. Two candidates are probed in order:
#   1. the "lib" directory under local_oozie_code_path
#   2. the "ttlib" directory next to this module (the module path
#      with the ".pm" suffix removed)
# The first one passing the IsDir check wins; otherwise the default dies.
has ttlib_base_dir => (
    is       => 'rw',
    isa      => IsDir,
    lazy     => 1,
    default  => sub {
        my ($self) = @_;

        my $in_code_path = File::Spec->catdir( $self->local_oozie_code_path, 'lib' );
        if ( App::Oozie::Types::Common->get_type(IsDir)->check( $in_code_path ) ) {
            return $in_code_path;
        }

        my $module_dir = __FILE__;
        $module_dir =~ s{ [.]pm \z }{}xms;

        my $bundled = File::Spec->catdir( $module_dir, 'ttlib' );
        if ( App::Oozie::Types::Common->get_type(IsDir)->check( $bundled ) ) {
            return $bundled;
        }

        die 'Failed to locate the ttlib path!';
    },
);

# Name of the directory reserved for the dynamic includes/directives etc.
# The directory itself is located inside ttlib_base_dir.
has ttlib_dynamic_base_dir_name => (
    is      => 'ro',
    isa     => Str,
    default => sub { return q{.oozie-deploy-lib} },
);

# Epoch timestamp, taken at the moment the attribute default is evaluated.
has deploy_start => (
    is      => 'ro',
    default => sub { return time },
);

# Upper limit for the node names. The value is fetched lazily from the
# object returned by the oozie accessor.
has max_node_name_len => (
    is      => 'ro',
    isa     => StrictNum, # range check?
    lazy    => 1,
    default => sub {
        my ($self) = @_;
        return $self->oozie->max_node_name_len;
    },
);

# Message template with two "%s" placeholders: the first one is for the
# name of the configuration property, the second one for the action(s).
has spec_queue_is_missing_message => (
    is      => 'rw',
    isa     => Str,
    default => sub {
        return <<'MISSING_QUEUE_TEMPLATE';
The action configuration property "%s" is not
defined for these action(s):

%s
MISSING_QUEUE_TEMPLATE
    },
);

# File name used for the deployment meta file.
has deployment_meta_file_name => (
    is      => 'rw',
    default => sub { return q{.deployment} },
);

# List of the configuration files. The default contains a single entry:
# the common.properties file inside ttlib_base_dir.
has configuration_files => (
    is      => 'rw',
    # TODO: type needs fixing for coercion
    # isa     => ArrayRef[IsFile],
    lazy    => 1,
    default => sub {
        my ($self) = @_;
        my $template_lib = $self->ttlib_base_dir;
        my @files        = (
            File::Spec->catfile( $template_lib, 'common.properties' ),
        );
        return \@files;
    },
);

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

        # Left in place for historical reasons.
        # All clusters should be under Kerberos.
        # Possible removal in a future version.
        #
        # unsafe, but needed when uploading with mapred's uid or hdfs dfs cannot see the files
        chmod oct( DEFAULT_FILE_MODE ), $config->{base_dest};
    }

    my $success = $self->upload_to_hdfs;

    $self->maybe_update_coordinators( $update_coord ) if @{ $update_coord };

    if ($self->prune) {
        $logger->info( '--prune is set, checking workflow directories for old files' );
        for my $workflow ( @{ $workflows } ) {
            $self->prune_path(
                File::Spec->catdir(
                    $config->{hdfs_dest},
                    basename $workflow
                )
            );
        }
    }

    $logger->info(
        sprintf '%s Completed successfully in %s (took %s) %s',
                    $log_marker,
                    sprintf( '%s%s', $self->cluster_name, ( $dryrun ? ' (dryrun is set)' : EMPTY_STRING ) ),
                    duration_exact( time - $run_start_epoch ),
                    $log_marker,
    );

    return $success;
}

# Runs the pre-deployment checks and processes every requested workflow.
#
# Parameters:
#   $workflows - array reference with the workflow name(s) to deploy
#
# Returns:
#   An array reference with everything guess_running_coordinator()
#   returned for the processed workflows.
#
# Dies (through the logger) when no workflow was given, or when at least
# one error was reported while processing the workflows.
sub _verify_and_compile_all_workflows {
    my $self      = shift;
    my $workflows = shift;

    my $logger    = $self->logger;

    # The call is parenthesised so the parsing does not depend on the
    # prototype of is_arrayref().
    if ( ! is_arrayref( $workflows ) || ! @{ $workflows } ) {
        $logger->logdie( 'Please give one or several workflow name(s) on the command line (glob pattern accepted). Also see --help' );
    }

    $self->pre_verification( $workflows );
    $self->verify_temp_dir;

    # The git tag is only verified when the git features are enabled
    # and the check is not overridden.
    if (   $self->gitfeatures
        && ! $self->gitforce
    ) {
        $self->verify_git_tag;
    }

    my $wfs               = $self->collect_names_to_deploy( $workflows );
    my $total_errors      = 0;
    my $validation_errors = 0;

    my @update_coord;
    for my $workflow ( @{ $wfs } ) {
        my($t_validation_errors, $t_total_errors, $dest, $cvc) = $self->process_workflow( $workflow );
        push @update_coord, $self->guess_running_coordinator( $workflow, $cvc, $dest );
        # Every counter accumulates its own value. The total is the one
        # deciding below whether the run has to be aborted.
        $total_errors      += $t_total_errors      // 0;
        $validation_errors += $t_validation_errors // 0;
    }

    if ($total_errors) {
        $logger->fatal( sprintf 'ERROR: %s errors were encountered during this run. Please fix it!', $total_errors );
        $logger->fatal( 'The --force option has been disabled, as not enough really paid attention.' );
        $logger->fatal( 'Fixing the errors is really your best and easiest option.' );
        $logger->logdie( 'Failed.' );
    }

    return \@update_coord;
}

# Processes a single workflow by delegating to process_templates().
#
# Returns the first four values handed back by process_templates(), which
# the callers unpack as: validation errors, total errors, dest and cvc.
sub process_workflow {
    my ($self, $workflow) = @_;

    my @outcome = $self->process_templates( $workflow );

    # Exactly four slots are passed on, regardless of the size of the list.
    return @outcome[ 0 .. 3 ];
}

# Hook called before the verification steps.
# Does nothing and returns nothing in this class.
sub pre_verification {
    # stub
    return;
}

# Computes the destination path.
#
# Parameters:
#   $path - optional; any false value selects default_hdfs_destination
#
# Returns the value untouched when it begins with "hdfs://". Anything else
# is anchored under the root directory and canonicalised.
sub destination_path {
    my ($self, $path) = @_;
    $path ||= $self->default_hdfs_destination;

    if ( $path =~ m{ \A hdfs:// }xms ) {
        return $path;
    }

    my $rooted = File::Spec->catdir( q{/}, $path );
    return File::Spec->canonpath( $rooted );
}

# Loads the optional properties file from HDFS.
#
# Returns a hash reference, which is empty when hdfs_properties_file is
# not set or when the file does not exist. Otherwise the content of the
# file is parsed with Config::General.
sub __collect_internal_conf_hdfs {
    my ($self) = @_;

    my $logger    = $self->logger;
    my $hdfs_file = $self->hdfs_properties_file;

    # not specified at all
    if ( ! $hdfs_file ) {
        return {};
    }

    $logger->debug( sprintf 'If exists, fetching from HDFS: %s', $hdfs_file );

    if ( ! $self->_hdfs_exists_no_exception( $hdfs_file ) ) {
        return {};
    }

    my %parsed = Config::General::ParseConfig(
                    -String => $self->hdfs->read( $hdfs_file ),
                 );

    return \%parsed;
}

sub __collect_internal_conf {
    my $self  = shift;
    my $keep  = $self->keep_deploy_path;
    my $logger = $self->logger;

    # This will load static properties that we will reuse as variables in the
    # template and merge it with the common.properties file
    my $properties = Config::Properties->new;

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

            if ( ! close $FH ) {
                $logger->warn(
                    sprintf 'Failed to close %s: %s',,
                                $file,
                                $!,
                );
            }
            $config = {
                %{ $config },
                $properties->properties,
            };
        }
    }

    $config = {
        %{ $config },
        %{ $self->__collect_internal_conf_hdfs },
    };

    my $base_dest = File::Temp::tempdir(
                        CLEANUP => ! $keep,
                        DIR     => resolve_tmp_dir(),
                    );

    $config->{base_dest} = $base_dest;

    $self->logger->info(
        "Output directory: `$base_dest`.",
        ( $keep ? ' You have decided to keep it after completion' : EMPTY_STRING )
    );

    # Override the parameters only when they are not set.
    # For example, these keys might be set with an HDFS
    # config file, collected before this point.
    #
    $config->{workflowsBaseDir} //= $self->oozie_basepath;
    $config->{clusterName}      //= $self->cluster_name;
    $config->{hdfs_dest}        //= $self->destination_path( $config->{workflowsBaseDir} );

    # If YARN, use a different property.
    # The oozie syntax doesn't change
    # (still uses the jobtracker property)
    $config->{jobTracker}       //= $config->{resourceManager} || $self->resource_manager;
    $config->{nameNode}         //= $self->template_namenode;
    $config->{has_sla}            = $self->sla;

    $self->logger->info( sprintf 'Upload directory: %s', $self->destination_path );

    return $config;
}

# Looks up the WorkflowDefinitionMaxLength setting in the Oozie server
# configuration.
#
# Returns the configured value when it is true. Otherwise logdie() is
# called on the logger.
sub max_wf_xml_length {
    my ($self) = @_;

    my $server_conf = $self->oozie->admin('configuration');
    my $max_length  = $server_conf->{'oozie.service.WorkflowAppService.WorkflowDefinitionMaxLength'};

    if ( $max_length ) {
        return $max_length;
    }

    return $self->logger->logdie( 'Unable to fetch the ooozie configuration WorkflowDefinitionMaxLength!' );
}

# Collects the running coordinators whose path matches the given workflow.
#
# Parameters:
#   $workflow - workflow path; the oozie_workflows_base prefix is removed
#               from it to build the remote path
#   $cvc      - copied as-is into every returned entry
#   $dest     - copied as-is into every returned entry
#
# Returns a list of hash references with the keys path, coord_id, job,
# cvc, workflow and dest. Only the jobs with a status listed in
# OOZIE_STATES_RUNNING are included.
sub guess_running_coordinator {
    state $is_running = { map { $_ => 1 } OOZIE_STATES_RUNNING };

    my ($self, $workflow, $cvc, $dest) = @_;

    my $logger = $self->logger;
    $logger->info( 'Probing for existing coordinators ...' );

    my $local_base = $self->oozie_workflows_base;
    my $rel_path   = $workflow;
    $rel_path      =~ s{ \A \Q$local_base\E [/]? }{}xms;

    my $remote_path = File::Spec->catfile( $self->destination_path, $rel_path );
    my $paths       = $self->oozie->active_job_paths(
                          coordinator => $self->destination_path,
                      );

    my @rv;
    PATH: for my $path ( keys %{ $paths } ) {
        # Only the paths ending with the remote path are of interest.
        next PATH if $path !~ m{ \Q$remote_path\E \b \z }xms;

        my $entries = $paths->{ $path };
        if ( @{ $entries } > 1 ) {
            # TODO: multi path
        }

        for my $jobs ( @{ $entries } ) {
            # multiple coordinators
            for my $cid ( keys %{ $jobs } ) {
                my $job = $jobs->{ $cid };
                if ( $is_running->{ $job->{status} } ) {
                    push @rv, {
                        workflow => $workflow,
                        path     => $path,
                        coord_id => $cid,
                        job      => $job,
                        cvc      => $cvc,
                        dest     => $dest,
                    };
                }
            }
        }
    }

    return @rv;
}

# Hook receiving the collected coordinator entries as an array reference.
# The loop body is empty in this class, and nothing is returned.
sub maybe_update_coordinators {
    my ($self, $coords) = @_;

    foreach my $entry ( @{ $coords } ) {
        # stub: better override in a subclass
    }

    return;
}

sub _get_spec_validator {
    my($self, $dest) = @_;

    my @pass_through = qw(
        email_validator
        max_node_name_len

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

            );
        }
        $hdfs->chmod( $dest, DEFAULT_DIR_MODE );
    }

    my $d_rule = File::Find::Rule->new->directory->maxdepth(1)->mindepth(1);
    my @folders = $d_rule->in($sourceFolder);

    foreach my $folder (@folders)
    {
        my $foldername = basename($folder);
        my $dest = File::Spec->catfile($destFolder, $foldername);
        $self->_copy_to_hdfs_with_webhdfs($folder, $dest)
    }
    return 1;
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

App::Oozie::Deploy

=head1 VERSION

version 0.020

=head1 SYNOPSIS

    use App::Oozie::Deploy;
    App::Oozie::Deploy->new_with_options->run;

=head1 DESCRIPTION

This is an action/program in the Oozie Tooling.

=for Pod::Coverage BUILD

=head1 NAME

App::Oozie::Deploy - The program to deploy Oozie workflows.

=head1 Methods

=head2 collect_data_for_deployment_meta_file

=head2 collect_names_to_deploy

=head2 compile_templates

=head2 create_deployment_meta_file

=head2 destination_path

=head2 guess_running_coordinator

=head2 max_wf_xml_length

=head2 maybe_update_coordinators

=head2 pre_verification

=head2 process_templates

=head2 process_workflow

=head2 prune_path

=head2 run

=head2 upload_to_hdfs

=head2 validate_meta_file

=head2 verify_temp_dir

=head2 write_deployment_meta_file

=head1 Accessors

=head2 Overridable from cli

=head3 dump_xml_to_json

=head3 hdfs_dest

=head3 keep_deploy_path

=head3 oozie_workflows_base

=head3 prune

=head3 sla

=head3 write_ownership_to_workflow_xml

=head2 Overridable from sub-classes

=head3 configuration_files

=head3 deploy_start

=head3 deployment_meta_file_name

=head3 email_validator

=head3 internal_conf

=head3 max_node_name_len

=head3 process_coord_directive_varname

=head3 required_tt_files

=head3 spec_queue_is_missing_message



( run in 2.323 seconds using v1.01-cache-2.11-cpan-5a3173703d6 )