App-Oozie

 view release on metacpan or  search on metacpan

lib/App/Oozie/Deploy.pm  view on Meta::CPAN


# Callback used to validate a comma separated list of e-mail addresses.
#
# The callback is invoked as $cb->( $self, $emails ) and
#   - warns and returns nothing when no e-mail string was passed,
#   - returns 1 when every address is accepted by Email::Valid,
#   - warns once per rejected address and returns nothing otherwise.
#
# Before validation each entry is trimmed and everything from a "+" up to
# the following "@" is removed from it.
has email_validator => (
    is      => 'rw',
    isa     => CodeRef,
    lazy    => 1,
    default => sub {
        return sub {
            my($self, $emails) = @_;

            if ( ! $emails ) {
                $self->logger->warn( 'No email was set!' );
                return;
            }

            my @addresses =
                map { s/\+.+?@/@/r }    ## no critic (RequireDotMatchAnything,RequireExtendedFormatting,RequireLineBoundaryMatching,ProhibitEscapedMetacharacters)
                map { s/^\s+|\s+$//gr } ## no critic (RequireDotMatchAnything,RequireExtendedFormatting,RequireLineBoundaryMatching)
                split q{,}, $emails;    ## no critic (ProhibitStringySplit)

            my @rejected = grep { ! Email::Valid->address( $_ ) } @addresses;

            if ( ! @rejected ) {
                return 1;
            }

            foreach my $address ( @rejected ) {
                my $complaint = sprintf 'FIXME !!! errorEmailTo parameter in workflow.xml is not set to a proper address: %s',
                                        $address;
                $self->logger->warn( $complaint );
            }

            return;
        };
    },
);

# Internal configuration of the deployer.
# Read-only; the value is produced on first access (lazy) by the
# __collect_internal_conf() builder method. run() accesses it as a hash
# reference (e.g. the base_dest and hdfs_dest keys).
has internal_conf => (
    is      => 'ro',
    builder => '__collect_internal_conf',
    lazy    => 1,
);

# Callback receiving a name and returning the name to use instead of it.
# The default implementation is the identity function: the name is handed
# back untouched. Replace the code reference to alter the names.
has process_coord_directive_varname => (
    is      => 'rw',
    isa     => CodeRef,
    default => sub {
        return sub {
            my($name) = @_;
            return $name;
        };
    },
);

# Object construction hook.
#
# - Dies (through the logger) when one of the required_tt_files() is not
#   accepted by the IsFile type under ttlib_base_dir().
# - Logs the base paths in verbose mode.
# - Forces dryrun() on when dump_xml_to_json() is set without it.
sub BUILD {
    my($self, $args) = @_;

    my $logger         = $self->logger;
    my $oozie_base_dir = $self->local_oozie_code_path;
    my $ttlib_base_dir = $self->ttlib_base_dir;
    my $verbose        = $self->verbose;
    my $file_type      = IsFile->library->get_type( IsFile );

    TT_FILE: for my $tt_file ( @{ $self->required_tt_files } ) {
        my $absolute_path = File::Spec->catfile( $ttlib_base_dir, $tt_file );

        $logger->debug("Assert file: $absolute_path") if $verbose;

        # assert_valid() does not display the error message, so the error
        # returned by validate() is reported manually. No error: all good.
        my $problem = $file_type->validate( $absolute_path );
        next TT_FILE if ! $problem;

        $logger->logdie( sprintf 'required_tt_files(): %s', $problem );
    }

    if ( $verbose ) {
        for my $attribute ( qw( local_oozie_code_path ttlib_base_dir ) ) {
            $logger->debug( join q{=}, $attribute, $self->$attribute );
        }
    }

    # Dumping the XML as JSON is only meant to happen in a dry run.
    my $dump_without_dryrun = $self->dump_xml_to_json && ! $self->dryrun;

    if ( $dump_without_dryrun ) {
        $self->logger->info( 'dump_xml_to_json is enabled without a dryrun. Enabling dryrun as well.' );
        $self->dryrun( 1 );
    }

    return;
}

sub run {
    my $self      = shift;
    my $workflows = shift;
    my $logger    = $self->logger;
    my $config    = $self->internal_conf;
    my $dryrun    = $self->dryrun;
    my $verbose   = $self->verbose;

    my $run_start_epoch = time;
    my $log_marker = q{#} x TERMINAL_INFO_LINE_LEN;

    $logger->info(
        sprintf '%s Starting deployment in %s%s %s',
                    $log_marker,
                    $self->cluster_name,
                    $verbose ? EMPTY_STRING : '. Enable --verbose to see the underlying commands',
                    $log_marker,
    );

    $self->log_versions if $verbose;

    my($update_coord) = $self->_verify_and_compile_all_workflows( $workflows );

    if (!$self->secure_cluster) {
        # Left in place for historical reasons.
        # All clusters should be under Kerberos.
        # Possible removal in a future version.
        #
        # unsafe, but needed when uploading with mapred's uid or hdfs dfs cannot see the files
        chmod oct( DEFAULT_FILE_MODE ), $config->{base_dest};
    }

    my $success = $self->upload_to_hdfs;

    $self->maybe_update_coordinators( $update_coord ) if @{ $update_coord };

    if ($self->prune) {
        $logger->info( '--prune is set, checking workflow directories for old files' );
        for my $workflow ( @{ $workflows } ) {
            $self->prune_path(
                File::Spec->catdir(
                    $config->{hdfs_dest},

lib/App/Oozie/Deploy.pm  view on Meta::CPAN

                        path     => $path,
                        coord_id => $cid,
                        job      => $job,
                        cvc      => $cvc,
                        workflow => $workflow,
                        dest     => $dest,
                    };
            }
        }
    }

    return @rv;
}

# Hook called by run() with the list (array reference) of coordinators
# which may need an update. The base implementation walks the list without
# doing anything and returns nothing: override it in a subclass.
sub maybe_update_coordinators {
    my($self, $coords) = @_;

    foreach my $coordinator ( @{ $coords } ) {
        # stub: better override in a subclass
        next;
    }

    return;
}

# Builds the App::Oozie::Deploy::Validate::Spec object for the files under
# $dest. The values of a fixed set of attributes of this object are handed
# over to the constructor as they are and $dest becomes the local_path.
sub _get_spec_validator {
    my $self = shift;
    my $dest = shift;

    # name => value pairs copied from this object
    my @forwarded = map { $_ => $self->$_ } qw(
        email_validator
        max_node_name_len
        max_wf_xml_length
        oozie_cli
        oozie_client_jar
        oozie_uri
        spec_queue_is_missing_message
        timeout
        verbose
    );

    return App::Oozie::Deploy::Validate::Spec->new(
                @forwarded,
                local_path => $dest,
            );
}

# Dumps every local XML file known to the spec validator as a pretty printed
# JSON file inside the directory returned by dump_xml_to_json().
# Does nothing at all when dump_xml_to_json() is not set.
#
# Parameters:
#   $sv                    - the spec validator object (mandatory, the
#                            program dies through the logger without it)
#   $validation_errors_ref - reference to the validation error counter
#   $total_errors_ref      - reference to the total error counter
#
# Both counters are incremented for every XML file which can't be parsed
# and such files are skipped. Failing to create an output file is fatal
# while write and close failures are only reported as warnings.
#
# Returns nothing.
sub __maybe_dump_xml_to_json {
    my $self      = shift;
    my $dump_path = $self->dump_xml_to_json || return;
    my $logger    = $self->logger;

    require JSON;

    my $sv                    = shift || $logger->logdie( 'Spec validator not specified!' );
    my $validation_errors_ref = shift;
    my $total_errors_ref      = shift;

    # The output handle has no encoding layer, so the encoder has to return
    # UTF-8 octets instead of a character string. Otherwise non-ASCII data
    # ends up as Latin-1 bytes or triggers "Wide character" warnings.
    # NOTE(review): assumes the parsed XML holds decoded character strings.
    my $encoder = JSON->new->utf8->pretty;

    for my $xml_file ( $sv->local_xml_files ) {
        my $parsed = $sv->maybe_parse_xml( $xml_file );

        if ( my $error = $parsed->{error} ) {
            $logger->fatal(
                sprintf q{We can't validate %s since parsing failed: %s},
                            $parsed->{relative_file_name},
                            $error,
            );
            ${ $validation_errors_ref }++;
            ${ $total_errors_ref }++;
            next; # we don't even have a valid XML file at this point, so just skip it
        }

        $logger->info( sprintf 'Dumping xml to json within %s', $dump_path );
        my $json_filename = File::Spec->catfile(
            $dump_path,
            File::Basename::basename($xml_file, '.xml') . '.json'
        );

        # Created on demand: nothing is created when there is nothing to dump
        File::Path::make_path( $dump_path );

        open my $JSON_FH, '>', $json_filename
            or $logger->logdie( sprintf 'Failed to create %s: %s', $json_filename, $! );

        if ( ! print {$JSON_FH} $encoder->encode( $parsed->{xml_in} ) ) {
            $logger->warn(
                sprintf 'Failed to write %s: %s',
                            $json_filename,
                            $!,
            );
        }

        if ( ! close $JSON_FH ) {
            $logger->warn(
                sprintf 'Failed to close %s: %s',
                            $json_filename,
                            $!,
            );
        }
    }

    return;
}

# Compiles the templates of a single workflow directory through
# App::Oozie::Deploy::Template.
#
# Parameters:
#   $workflow              - path of the workflow directory (dies if it is
#                            not an existing directory)
#   $validation_errors_ref - reference to the validation error counter
#   $total_errors_ref      - reference to the total error counter
#
# The error counts reported by the template compiler are added to the
# counters behind the references.
#
# Returns the destination reported by the compiler and the coordinator
# directive variable cache of the template object.
sub compile_templates {
    my($self, $workflow, $validation_errors_ref, $total_errors_ref) = @_;

    die sprintf 'The workflow path `%s` either does not exist or not a directory',
                $workflow
        if ! -d $workflow;

    # Attributes handed over to the template object as they are
    state $forwarded_attributes = [
        qw(
            dryrun
            effective_username
            internal_conf
            oozie_workflows_base
            process_coord_directive_varname
            timeout
            ttlib_base_dir
            ttlib_dynamic_base_dir_name
            verbose
            write_ownership_to_workflow_xml
        )
    ];

    my @constructor_args = map { $_ => $self->$_ } @{ $forwarded_attributes };
    my $template         = App::Oozie::Deploy::Template->new( @constructor_args );

    my(
        $compile_validation_errors,
        $compile_total_errors,
        $dest,
    ) = $template->compile( $workflow );

    my $cvc = $template->coordinator_directive_var_cache;

    ${ $validation_errors_ref } += $compile_validation_errors;
    ${ $total_errors_ref }      += $compile_total_errors;

    return( $dest, $cvc );
}

# Compiles and validates a single workflow directory.
#
# Steps: compile the templates, run the extra meta file validation if
# write_ownership_to_workflow_xml() is set, optionally dump the XML files as
# JSON, verify the result with the spec validator, create the deployment
# meta file and finally log the overall validation status.
#
# Dies when no workflow path is passed or when it is not a directory.
#
# Returns the validation error count, the total error count, the
# destination of the compiled files and the coordinator directive variable
# cache.
sub process_templates {
    my($self, $workflow) = @_;

    die 'No workflow path specified!' if ! $workflow;

    die sprintf 'The workflow path %s either does not exist or not a directory',
                $workflow
        if ! -d $workflow;

    my $validation_errors;
    my $total_errors;

    # Every step below updates the very same counters
    my @counter_refs = ( \$validation_errors, \$total_errors );

    my($dest, $cvc) = $self->compile_templates( $workflow, @counter_refs );

    if ( $self->write_ownership_to_workflow_xml ) {
        my $meta_file = File::Spec->catfile( $workflow, $self->meta->file );
        $self->validate_meta_file( $meta_file, @counter_refs, {} );
    }

    my $sv = $self->_get_spec_validator( $dest );

    if ( $self->dump_xml_to_json ) {
        $self->__maybe_dump_xml_to_json( $sv, @counter_refs );
    }

    my($spec_validation_errors, $spec_total_errors) = $sv->verify( $workflow );

    $validation_errors += $spec_validation_errors;
    $total_errors      += $spec_total_errors;

    $self->create_deployment_meta_file( $dest, $workflow, $total_errors );

    if ( ! $validation_errors ) {
        $self->logger->info( 'Oozie deployment validation status: OK' );
    }
    else {
        $self->logger->error( 'Oozie deployment validation status: !!!!! FAILED !!!!!' );
    }

    return( $validation_errors, $total_errors, $dest, $cvc );
}

# Extra validation hook for the meta file of a workflow.
# The base implementation only logs the name of the file (the first
# parameter after the object) and returns nothing; any further parameters
# are ignored.
sub validate_meta_file {
    my($self, $file) = @_;

    my $logger = $self->logger;
    $logger->info( sprintf 'Extra validation for %s', $file );

    return;
}

# Sanity check for the TMPDIR environment variable.
#
# Nothing happens when TMPDIR is not set. Otherwise the setting is removed
# from the environment of this process, with a warning, when it is either
# not a directory or when the directory is not readable by both the group
# and the others.
#
# Returns nothing.
sub verify_temp_dir {
    my($self) = @_;

    my $tmpdir = $ENV{TMPDIR};
    return if ! $tmpdir;

    my $logger = $self->logger;

    # The path needs to be readable by mapred in order for the deploy to be
    # successful. Some users have this set in their environment to paths
    # lacking the relevant permissions, leading to failures.
    #
    # Removing a bogus setting locally in here will lead the temporary
    # directory to be created inside "/tmp" by default.
    #
    # It can still be pointed to elsewhere by the users by changing their
    # environment.
    #

    my $complaint;

    if ( -d $tmpdir ) {
        my $mode           = (stat $tmpdir)[STAT_MODE];
        my $group_readable = ( $mode & S_IRGRP ) >> MODE_BITSHIFT_READ;
        my $other_readable =   $mode & S_IROTH;

        if ( ! ( $group_readable && $other_readable ) ) {
            $complaint = sprintf q{You have TMPDIR=%s and it is not group/other readable (mode=%04o)! I will ignore/remove that setting!},
                                 $tmpdir,
                                 S_IMODE( $mode );
        }
    }
    else {
        $complaint = sprintf q{You have TMPDIR=%s but it doesn't exist! I will ignore/remove that setting!},
                             $tmpdir;
    }

    # No complaint: the setting is usable and stays as it is
    return if ! defined $complaint;

    $logger->warn( $complaint );
    delete $ENV{TMPDIR};

    return;
}

sub collect_names_to_deploy {
    my $self  = shift;
    my $names = shift || die 'No workflow names were specified!';

    my $owf_base = $self->oozie_workflows_base;
    my $logger   = $self->logger;
    my $verbose  = $self->verbose;

    if ( ! is_arrayref $names ) {
        die 'Workflow names need to be specified as an arrayref';

lib/App/Oozie/Deploy.pm  view on Meta::CPAN


__END__

=pod

=encoding UTF-8

=head1 NAME

App::Oozie::Deploy - The program to deploy Oozie workflows.

=head1 VERSION

version 0.020

=head1 SYNOPSIS

    use App::Oozie::Deploy;
    App::Oozie::Deploy->new_with_options->run;

=head1 DESCRIPTION

This is an action/program in the Oozie Tooling.

=for Pod::Coverage BUILD

=head1 NAME

App::Oozie::Deploy - The program to deploy Oozie workflows.

=head1 Methods

=head2 collect_data_for_deployment_meta_file

=head2 collect_names_to_deploy

=head2 compile_templates

=head2 create_deployment_meta_file

=head2 destination_path

=head2 guess_running_coordinator

=head2 max_wf_xml_length

=head2 maybe_update_coordinators

=head2 pre_verification

=head2 process_templates

=head2 process_workflow

=head2 prune_path

=head2 run

=head2 upload_to_hdfs

=head2 validate_meta_file

=head2 verify_temp_dir

=head2 write_deployment_meta_file

=head1 Accessors

=head2 Overridable from cli

=head3 dump_xml_to_json

=head3 hdfs_dest

=head3 keep_deploy_path

=head3 oozie_workflows_base

=head3 prune

=head3 sla

=head3 write_ownership_to_workflow_xml

=head2 Overridable from sub-classes

=head3 configuration_files

=head3 deploy_start

=head3 deployment_meta_file_name

=head3 email_validator

=head3 internal_conf

=head3 max_node_name_len

=head3 process_coord_directive_varname

=head3 required_tt_files

=head3 spec_queue_is_missing_message

=head3 ttlib_base_dir

=head3 ttlib_dynamic_base_dir_name

=head1 SEE ALSO

L<App::Oozie>.

=head1 AUTHORS

=over 4

=item *

David Morel

=item *



( run in 0.651 second using v1.01-cache-2.11-cpan-0bb4e1dffa6 )