App-Oozie

 view release on metacpan or  search on metacpan

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

        },
    },
);

sub BUILD {
    my ($self, $args)  = @_;

    my $logger         = $self->logger;
    my $oozie_base_dir = $self->local_oozie_code_path;
    my $ttlib_base_dir = $self->ttlib_base_dir;
    my $verbose        = $self->verbose;
    my $is_file        = IsFile->library->get_type( IsFile );

    foreach my $file ( @{ $self->required_tt_files } ) {
        my $absolute_path = File::Spec->catfile( $ttlib_base_dir, $file );
        if ( $verbose ) {
            $logger->debug("Assert file: $absolute_path");
        }
        # assert_valid() does not display the error message, hence the manual check

        my $error = $is_file->validate( $absolute_path ) || next;
        $logger->logdie( sprintf 'required_tt_files(): %s', $error );
    }

    if ( $verbose ) {
        $logger->debug( join q{=}, $_, $self->$_ ) for qw(
            local_oozie_code_path
            ttlib_base_dir
        );
    }

    if ( $self->dump_xml_to_json && ! $self->dryrun ) {
        $self->logger->info( 'dump_xml_to_json is enabled without a dryrun. Enabling dryrun as well.' );
        $self->dryrun( 1 );
    }

    return;
}

sub run {
    my $self      = shift;
    my $workflows = shift;
    my $logger    = $self->logger;
    my $config    = $self->internal_conf;
    my $dryrun    = $self->dryrun;
    my $verbose   = $self->verbose;

    my $run_start_epoch = time;
    my $log_marker = q{#} x TERMINAL_INFO_LINE_LEN;

    $logger->info(
        sprintf '%s Starting deployment in %s%s %s',
                    $log_marker,
                    $self->cluster_name,
                    $verbose ? EMPTY_STRING : '. Enable --verbose to see the underlying commands',
                    $log_marker,
    );

    $self->log_versions if $verbose;

    my($update_coord) = $self->_verify_and_compile_all_workflows( $workflows );

    if (!$self->secure_cluster) {
        # Left in place for historial reasons.
        # All clusters should be under Kerberos.
        # Possible removal in a future version.
        #
        # unsafe, but needed when uploading with mapred's uid or hdfs dfs cannot see the files
        chmod oct( DEFAULT_FILE_MODE ), $config->{base_dest};
    }

    my $success = $self->upload_to_hdfs;

    $self->maybe_update_coordinators( $update_coord ) if @{ $update_coord };

    if ($self->prune) {
        $logger->info( '--prune is set, checking workflow directories for old files' );
        for my $workflow ( @{ $workflows } ) {
            $self->prune_path(
                File::Spec->catdir(
                    $config->{hdfs_dest},
                    basename $workflow
                )
            );
        }
    }

    $logger->info(
        sprintf '%s Completed successfully in %s (took %s) %s',
                    $log_marker,
                    sprintf( '%s%s', $self->cluster_name, ( $dryrun ? ' (dryrun is set)' : EMPTY_STRING ) ),
                    duration_exact( time - $run_start_epoch ),
                    $log_marker,
    );

    return $success;
}

sub _verify_and_compile_all_workflows {
    my $self = shift;
    my $workflows = shift;

    my $logger    = $self->logger;

    if ( ! is_arrayref $workflows || ! @{ $workflows } ) {
        $logger->logdie( 'Please give one or several workflow name(s) on the command line (glob pattern accepted). Also see --help' );
    }

    $self->pre_verification( $workflows );
    $self->verify_temp_dir;

    if (   $self->gitfeatures
        && ! $self->gitforce
    ) {
        $self->verify_git_tag;
    }

    my $wfs = $self->collect_names_to_deploy( $workflows );
    my($total_errors, $validation_errors);

    my @update_coord;
    for my $workflow ( @{ $wfs } ) {
        my($t_validation_errors, $t_total_errors, $dest, $cvc) =  $self->process_workflow( $workflow );
        push @update_coord, $self->guess_running_coordinator( $workflow, $cvc, $dest );
        $total_errors      += $t_validation_errors;
        $validation_errors += $t_total_errors;
    }

    if ($total_errors) {
        $logger->fatal( sprintf 'ERROR: %s errors were encountered during this run. Please fix it!', $total_errors );
        $logger->fatal( 'The --force option has been disabled, as not enough really paid attention.' );
        $logger->fatal( 'Fixing the errors is really your best and easiest option.' );
        $logger->logdie( 'Failed.' );
    }

    return \@update_coord;
}

sub process_workflow {
    my $self = shift;
    my $workflow = shift;
    my($t_validation_errors, $t_total_errors, $dest, $cvc) = $self->process_templates( $workflow );
    return $t_validation_errors, $t_total_errors, $dest, $cvc;
}

sub pre_verification {
    # stub
}

sub destination_path {
    my $self    = shift;
    my $default = shift || $self->default_hdfs_destination;
    return $default =~ m{ \A hdfs:// }xms
            ? $default
            : File::Spec->canonpath( File::Spec->catdir( q{/}, $default ) )
            ;
}

sub __collect_internal_conf_hdfs {
    my $self   = shift;
    my $logger = $self->logger;
    my $file   = $self->hdfs_properties_file;

    return {} if ! $file; # not specified at all

    $logger->debug( sprintf 'If exists, fetching from HDFS: %s', $file );

    return {} if ! $self->_hdfs_exists_no_exception( $file );

    return {
        Config::General::ParseConfig(
            -String => $self->hdfs->read( $file ),
        )
    };
}

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

            ttlib_base_dir
            ttlib_dynamic_base_dir_name
            verbose
            write_ownership_to_workflow_xml
        )
    ];

    my $t = App::Oozie::Deploy::Template->new(
                map { $_ => $self->$_ }
                    @{ $pass_through }
            );

    my($template_validation_errors,
       $template_total_errors,
       $dest,
    ) = $t->compile( $workflow );

    my $cvc = $t->coordinator_directive_var_cache;

    ${ $validation_errors_ref } += $template_validation_errors;
    ${ $total_errors_ref }      += $template_total_errors;

    return $dest, $cvc;
}

sub process_templates {
    my $self = shift;
    my $workflow = shift || die 'No workflow path specified!';

    if ( ! -d $workflow ) {
        die sprintf 'The workflow path %s either does not exist or not a directory',
                    $workflow,
        ;
    }

    my($validation_errors, $total_errors);

    my($dest, $cvc) = $self->compile_templates(
                            $workflow,
                            \$validation_errors,
                            \$total_errors,
                        );

    if ( $self->write_ownership_to_workflow_xml ) {
        $self->validate_meta_file(
            File::Spec->catfile( $workflow, $self->meta->file ),
            \$validation_errors,
            \$total_errors,
            {},
        );
    }

    my $sv = $self->_get_spec_validator( $dest );

    $self->__maybe_dump_xml_to_json(
        $sv,
        \$validation_errors,
        \$total_errors,
    ) if $self->dump_xml_to_json;

    my($spec_validation_errors, $spec_total_errors) = $sv->verify( $workflow );
    $validation_errors += $spec_validation_errors;
    $total_errors      += $spec_total_errors;

    $self->create_deployment_meta_file( $dest, $workflow, $total_errors );

    if ( $validation_errors ) {
        $self->logger->error( 'Oozie deployment validation status: !!!!! FAILED !!!!!' );
    }
    else {
        $self->logger->info( 'Oozie deployment validation status: OK' );
    }

    return $validation_errors, $total_errors, $dest, $cvc;
}

sub validate_meta_file {
    my $self = shift;
    my $file = shift;

    $self->logger->info( sprintf 'Extra validation for %s', $file );

    return;
}

sub verify_temp_dir {
    my $self         = shift;
    my $user_setting = $ENV{TMPDIR} || return;
    my $logger       = $self->logger;

    # The path needs to be readable by mapred in order the deploy to be successful
    # Some users have this set in their environment to paths lacking relevant
    # permissions leading to failures.
    #
    # If the path is bogus, then by removing the setting locally in here
    # will lead the temporary directory to be created inside "/tmp" by default.
    #
    # Otherwise it can still be altered to elsewhere by changing the
    # environment by the users.
    #

    my $remove;

    if ( ! -d $user_setting ) {
        $logger->warn( sprintf q{You have TMPDIR=%s but it doesn't exist! I will ignore/remove that setting!}, $user_setting );
        $remove = 1;
    }
    else {
        my $mode       = (stat $user_setting)[STAT_MODE];
        my $group_read = ( $mode & S_IRGRP ) >> MODE_BITSHIFT_READ;
        my $other_read =   $mode & S_IROTH;

        if ( ! $group_read || ! $other_read ) {
            $logger->warn(
                sprintf q{You have TMPDIR=%s and it is not group/other readable (mode=%04o)! I will ignore/remove that setting!},
                             $user_setting,
                             S_IMODE( $mode ),
            );
            $remove = 1;
        }
    }

    delete $ENV{TMPDIR} if $remove;

    return;
}

sub collect_names_to_deploy {
    my $self  = shift;
    my $names = shift || die 'No workflow names were specified!';

    my $owf_base = $self->oozie_workflows_base;
    my $logger   = $self->logger;
    my $verbose  = $self->verbose;

    if ( ! is_arrayref $names ) {
        die 'Workflow names need to be specified as an arrayref';
    }

    my(@firstLevelMatchingPatterns, @secondLevelMatchingPatterns);

    my @workflow = map { trim_slashes( $_ ) } @{ $names };
    my $workflowPatternCount = @workflow;

    for my $w (@workflow) {
        my $separators = () = $w =~ m{ [/] }xmsg;

lib/App/Oozie/Deploy.pm  view on Meta::CPAN


=pod

=encoding UTF-8

=head1 NAME

App::Oozie::Deploy

=head1 VERSION

version 0.020

=head1 SYNOPSIS

    use App::Oozie::Deploy;
    App::Oozie::Deploy->new_with_options->run;

=head1 DESCRIPTION

This is an action/program in the Oozie Tooling.

=for Pod::Coverage BUILD

=head1 NAME

App::Oozie::Deploy - The program to deploy Oozie workflows.

=head1 Methods

=head2 collect_data_for_deployment_meta_file

=head2 collect_names_to_deploy

=head2 compile_templates

=head2 create_deployment_meta_file

=head2 destination_path

=head2 guess_running_coordinator

=head2 max_wf_xml_length

=head2 maybe_update_coordinators

=head2 pre_verification

=head2 process_templates

=head2 process_workflow

=head2 prune_path

=head2 run

=head2 upload_to_hdfs

=head2 validate_meta_file

=head2 verify_temp_dir

=head2 write_deployment_meta_file

=head1 Accessors

=head2 Overridable from cli

=head3 dump_xml_to_json

=head3 hdfs_dest

=head3 keep_deploy_path

=head3 oozie_workflows_base

=head3 prune

=head3 sla

=head3 write_ownership_to_workflow_xml

=head2 Overridable from sub-classes

=head3 configuration_files

=head3 deploy_start

=head3 deployment_meta_file_name

=head3 email_validator

=head3 internal_conf

=head3 max_node_name_len

=head3 process_coord_directive_varname

=head3 required_tt_files

=head3 spec_queue_is_missing_message

=head3 ttlib_base_dir

=head3 ttlib_dynamic_base_dir_name

=head1 SEE ALSO

L<App::Oozie>.

=head1 AUTHORS

=over 4

=item *

David Morel

=item *

Burak Gursoy



( run in 1.115 second using v1.01-cache-2.11-cpan-e1769b4cff6 )