App-Oozie

 view release on metacpan or  search on metacpan

lib/App/Oozie/Update/Coordinator.pm  view on Meta::CPAN

        $current_coord_user,
        $current_xml,
        $oozie_build,
        $oozie_cdh_version,
        $oozie_version,
        $base_path,
        $meta_startTime,
        $meta_endTime,
        %job_properties
    );

    eval {
        my $oozie           = $self->oozie;
        my $job             = $oozie->job( $coord )       || die sprintf 'No configuration for the job: %s', $coord;
        $oozie_build        = $oozie->new->build_version  || die 'Failed to get the Oozie server version!';
        my @vtuple          = split m{ \Q-cdh\E }xms, $oozie_build;
        $oozie_version      = shift @vtuple               || die sprintf 'Unable to determine the Oozie server version from %s', $oozie_build;
        $oozie_cdh_version  = shift @vtuple               || die sprintf 'Unable to determine the Oozie server CDH version from %s', $oozie_build;
        $current_coord_user = $job->{user}                || die sprintf 'Failed to locate the user running %s', $coord;
        $current_xml        = $job->{conf}                || die sprintf 'No configuration for the job: %s', $coord;
        # When a coordinator is extended, the job's own start/end times are
        # updated, but the XML configuration keeps the old, now meaningless,
        # values. That is harmless for the startTime, but the endTime in the
        # XML will be bogus: our shifting logic/workaround would then do
        # nothing, and the server would respond with an "Error: E0803" even
        # when you want to update everything except the scheduling. For some
        # reason the XML conf does not get updated, so take both values from
        # the job record instead of the XML.
        #
        $meta_startTime     = $job->{startTime}           || die sprintf 'No startTime set for the job: %s', $coord;
        $meta_endTime       = $job->{endTime}             || die sprintf 'No endTime set for the job: %s', $coord;
        my $path            = $job->{coordJobPath}        || die sprintf 'No coordJobPath defined for the job: %s', $coord; # shouldn't happen
        my $hdfs_dest       = $self->default_hdfs_destination;
        ($base_path         = $path) =~ s{ \A $hdfs_dest [/]? }{}xms;
        my $jp_hdfs_path    = catfile $path,      'job.properties';
        my $jp_local_path   = catfile $base_path, 'job.properties';

        my $jp;
        my $hdfs = $self->hdfs;
        if ( my $meta = $hdfs->exists( $jp_hdfs_path ) ) {
            $logger->info( sprintf 'job.properties exists on HDFS. Fetching %s', $jp_hdfs_path );
            $jp = $hdfs->read( $jp_hdfs_path );
        }
        elsif ( -e $jp_local_path ) {
            $logger->info( sprintf 'job.properties exists on local file system. Fetching %s', $jp_local_path );
            open my $FH, '<', $jp_local_path or die sprintf q{Can't read %s: %s}, $jp_local_path, $!;
            $jp =  do { local $/; <$FH> };
            if ( ! close $FH ) {
                $logger->warn(
                    sprintf 'Failed to close %s: %s',,
                                $jp_local_path,
                                $!,
                );
            }
        }
        else {
            my $uh_oh = sprintf <<'FYI', Cwd::getcwd, $base_path;

No job.properties file neither on hdfs nor local file system.
There are no parameter overrides to collect.

This program looks at relative paths to search for local files.

Your current working directory is %s and search path is %s.

If this is not the directory for the local application files, please change
to the proper location and try again.

FYI
            $logger->warn( $uh_oh );
        }

        %job_properties = $jp ? ParseConfig( -String => $jp ) : ();

        for my $name ( keys %job_properties ) {
            my $val = $job_properties{ $name};
            if ( is_ref $val ) {
                require Data::Dumper;
                my $d = Data::Dumper->new([ $val ], [ $name ]);
                $logger->logdie(
                    sprintf 'You seem to have a double definition in %s for %s as %s',
                                'job.properties',
                                $name,
                                $d->Dump,
                );
            }
        }

        1;
    } or do {
        my $eval_error = $@ || 'Zombie error';
        $logger->fatal(
            sprintf 'Could not get config for job %s: %s',
                        $coord,
                        $eval_error,
        );
        die 'Failed.';
    };

    if ( $self->verbose ) {
        $logger->debug( 'Start Current XML configuration' );
        $logger->debug( $current_xml );
        $logger->debug( 'End Current XML configuration' );
    }

    my $show_cmd_output;
    if (   $oozie_cdh_version        ge '5.8.0'
        && $self->effective_username ne $current_coord_user
    ) {
        $logger->warn(
            sprintf 'Current user `%s` is not the same as the coordinator user: `%s`. Will attempt to impersonate.',
                        $self->effective_username,
                        $current_coord_user,
        );
    }

    my %job_meta = (
        current_coord_user => $current_coord_user,
        current_xml_ref    => \$current_xml,
        startTime          => $meta_startTime,
        endTime            => $meta_endTime,
        show_cmd_output    => $show_cmd_output || 0,



( run in 2.411 seconds using v1.01-cache-2.11-cpan-e1769b4cff6 )