App-Oozie

 view release on metacpan or  search on metacpan

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

    my $oozie_base_dir = $self->local_oozie_code_path;
    my $ttlib_base_dir = $self->ttlib_base_dir;
    my $verbose        = $self->verbose;
    my $is_file        = IsFile->library->get_type( IsFile );

    foreach my $file ( @{ $self->required_tt_files } ) {
        my $absolute_path = File::Spec->catfile( $ttlib_base_dir, $file );
        if ( $verbose ) {
            $logger->debug("Assert file: $absolute_path");
        }
        # assert_valid() does not display the error message, hence the manual check

        my $error = $is_file->validate( $absolute_path ) || next;
        $logger->logdie( sprintf 'required_tt_files(): %s', $error );
    }

    if ( $verbose ) {
        $logger->debug( join q{=}, $_, $self->$_ ) for qw(
            local_oozie_code_path
            ttlib_base_dir
        );
    }

    if ( $self->dump_xml_to_json && ! $self->dryrun ) {
        $self->logger->info( 'dump_xml_to_json is enabled without a dryrun. Enabling dryrun as well.' );
        $self->dryrun( 1 );
    }

    return;
}

# Entry point of the deployment: verifies and compiles the requested
# workflows, uploads the result to HDFS, optionally updates coordinators
# and prunes old files, and logs a start and a completion banner.
#
# Parameters:
#   $workflows - array reference of workflow names. It is handed to
#                _verify_and_compile_all_workflows() and, when --prune is
#                set, each entry's basename is appended to the configured
#                hdfs_dest to build the path to prune.
#
# Returns the value returned by upload_to_hdfs().
sub run {
    my $self      = shift;
    my $workflows = shift;
    my $logger    = $self->logger;
    my $config    = $self->internal_conf;
    my $dryrun    = $self->dryrun;
    my $verbose   = $self->verbose;

    # Used for the "took ..." part of the completion message.
    my $run_start_epoch = time;
    my $log_marker = q{#} x TERMINAL_INFO_LINE_LEN;

    $logger->info(
        sprintf '%s Starting deployment in %s%s %s',
                    $log_marker,
                    $self->cluster_name,
                    $verbose ? EMPTY_STRING : '. Enable --verbose to see the underlying commands',
                    $log_marker,
    );

    $self->log_versions if $verbose;

    my($update_coord) = $self->_verify_and_compile_all_workflows( $workflows );

    if (!$self->secure_cluster) {
        # Left in place for historical reasons.
        # All clusters should be under Kerberos.
        # Possible removal in a future version.
        #
        # unsafe, but needed when uploading with mapred's uid or hdfs dfs cannot see the files
        my $base_dest = $config->{base_dest};
        if ( ! chmod( oct( DEFAULT_FILE_MODE ), $base_dest ) ) {
            # chmod() returns the number of files changed. This step stays
            # best-effort, but the failure is reported instead of being
            # silently ignored so that a later upload problem can be traced.
            $logger->warn(
                sprintf 'Unable to chmod %s: %s',
                            $base_dest,
                            $!,
            );
        }
    }

    my $success = $self->upload_to_hdfs;

    $self->maybe_update_coordinators( $update_coord ) if @{ $update_coord };

    if ($self->prune) {
        $logger->info( '--prune is set, checking workflow directories for old files' );
        for my $workflow ( @{ $workflows } ) {
            $self->prune_path(
                File::Spec->catdir(
                    $config->{hdfs_dest},
                    basename $workflow
                )
            );
        }
    }

    $logger->info(
        sprintf '%s Completed successfully in %s (took %s) %s',
                    $log_marker,
                    sprintf( '%s%s', $self->cluster_name, ( $dryrun ? ' (dryrun is set)' : EMPTY_STRING ) ),
                    duration_exact( time - $run_start_epoch ),
                    $log_marker,
    );

    return $success;
}

sub _verify_and_compile_all_workflows {
    my $self = shift;
    my $workflows = shift;

    my $logger    = $self->logger;

    if ( ! is_arrayref $workflows || ! @{ $workflows } ) {
        $logger->logdie( 'Please give one or several workflow name(s) on the command line (glob pattern accepted). Also see --help' );
    }

    $self->pre_verification( $workflows );
    $self->verify_temp_dir;

    if (   $self->gitfeatures
        && ! $self->gitforce
    ) {
        $self->verify_git_tag;
    }

    my $wfs = $self->collect_names_to_deploy( $workflows );
    my($total_errors, $validation_errors);

    my @update_coord;
    for my $workflow ( @{ $wfs } ) {
        my($t_validation_errors, $t_total_errors, $dest, $cvc) =  $self->process_workflow( $workflow );
        push @update_coord, $self->guess_running_coordinator( $workflow, $cvc, $dest );
        $total_errors      += $t_validation_errors;
        $validation_errors += $t_total_errors;
    }

    if ($total_errors) {

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

    my $hdfs = $self->hdfs;
    my $rv;

    eval {
        $rv = $hdfs->exists( $path );
        1;
    } or do {
        my $eval_error = $@ || 'Zombie error';
        if ( $self->verbose ) {
            $self->logger->debug(
                sprintf 'WebHDFS exists() failed with exception, however since this is a silent call, it is ignored: %s',
                            $eval_error,
            )
        }
    };

    return $rv;
}

# Recursively copies a local directory into HDFS through the WebHDFS client.
#
# Positional arguments (after the invocant):
#   1. local source directory
#   2. HDFS destination directory
#
# Steps:
#   - create the destination when it is not there yet,
#   - upload every plain file located directly inside the source,
#   - call itself for every immediate sub-directory of the source.
#
# Returns 1. Dies through the logger when create() reports a failure.
sub _copy_to_hdfs_with_webhdfs {
    my($self, $local_dir, $remote_dir) = @_;

    my $hdfs    = $self->hdfs;
    my $logger  = $self->logger;
    my $verbose = $self->verbose;

    $logger->info(
        sprintf 'copying from `%s` to `%s`', $local_dir, $remote_dir
    );

    if ( $self->_hdfs_exists_no_exception( $remote_dir ) ) {
        $logger->debug( sprintf 'HDFS destination %s exists', $remote_dir )
            if $verbose;
    }
    else {
        $logger->debug( sprintf 'HDFS destination %s does not exist', $remote_dir )
            if $verbose;

        # splitpath() returns (volume, directories, file); the volume is dropped.
        # NOTE(review): this leaves only the directory portion and the last
        # element, not one chunk per path component -- confirm that is intended.
        my @pieces = File::Spec->splitpath( $remote_dir );
        shift @pieces;

        my $current;
        PIECE: for my $piece ( @pieces ) {
            $current = $current
                     ? File::Spec->catdir( $current, $piece )
                     : $piece;

            next PIECE if $self->_hdfs_exists_no_exception( $current );

            $logger->debug( sprintf 'Attempting to mkdir HDFS destination %s', $current )
                if $verbose;

            $hdfs->mkdir( $current );
            $hdfs->chmod( $current, DEFAULT_DIR_MODE );
        }

        # The existence probes above swallow exceptions; this direct call does
        # not, so any underlying problem surfaces here.
        if ( $hdfs->exists( $remote_dir ) && $verbose ) {
            $logger->debug( sprintf 'HDFS destination %s exists', $remote_dir );
        }
    }

    # Plain files sitting directly in the source directory.
    my @local_files = File::Find::Rule->new->file->maxdepth(1)->mindepth(1)->in( $local_dir );

    for my $local_file ( @local_files ) {
        my $target  = File::Spec->catfile( $remote_dir, basename( $local_file ) );
        my $content = path( $local_file )->slurp_raw;

        $logger->debug("Creating $target") if $verbose;

        $hdfs->touchz( $target );

        $hdfs->create( $target, $content, overwrite => 'true' )
            or $logger->logdie(
                    sprintf 'Failed to create %s through WebHDFS', $target
               );

        # NOTE(review): files receive DEFAULT_DIR_MODE here -- confirm that
        # the directory mode is really the one wanted for uploaded files.
        $hdfs->chmod( $target, DEFAULT_DIR_MODE );
    }

    # Immediate sub-directories are mirrored by recursion.
    my @local_subdirs = File::Find::Rule->new->directory->maxdepth(1)->mindepth(1)->in( $local_dir );

    for my $subdir ( @local_subdirs ) {
        $self->_copy_to_hdfs_with_webhdfs(
            $subdir,
            File::Spec->catfile( $remote_dir, basename( $subdir ) ),
        );
    }

    return 1;
}

1;

__END__

=pod

=encoding UTF-8

=head1 NAME

App::Oozie::Deploy - The program to deploy Oozie workflows.

=head1 VERSION

version 0.020

=head1 SYNOPSIS

    use App::Oozie::Deploy;
    App::Oozie::Deploy->new_with_options->run;

=head1 DESCRIPTION

This is an action/program in the Oozie Tooling.

=for Pod::Coverage BUILD

=head1 NAME

App::Oozie::Deploy - The program to deploy Oozie workflows.

=head1 Methods

=head2 collect_data_for_deployment_meta_file

=head2 collect_names_to_deploy

=head2 compile_templates

=head2 create_deployment_meta_file

=head2 destination_path

=head2 guess_running_coordinator

=head2 max_wf_xml_length



( run in 0.744 second using v1.01-cache-2.11-cpan-cdf2f3d4e48 )