App-Oozie

 view release on metacpan or  search on metacpan

lib/App/Oozie/Update/Coordinator.pm  view on Meta::CPAN


    TRY:
    for my $try ( 1..$max_retry ) {
        ($twig, $state) = $self->_modify_xml(
                                $job_meta->{current_xml_ref},
                                $job_properties,
                                $state,
                                $job_meta->{startTime},
                                $job_meta->{endTime},
                            );

        my $command = [
            $self->oozie_cli,
            '-Doozie.auth.token.cache=false',
            '-Duser.name=' . $job_meta->{current_coord_user},
            job => -update => $self->coord,
                   -config => $self->_dump_twig_to_temp_file( $twig ),
            -oozie => $self->oozie_uri,
            ($self->doas ? (-doas => $self->doas) : ()), #impersonation
        ];

        push @{ $command }, '-dryrun' if $self->dryrun;

        $logger->info(
            sprintf 'Updating the coordinator (%s) attempt: %s',
                        $self->coord,
                        $try,
        );

        $success = IPC::Cmd::run(
                        buffer  => \my $out,
                        command => $command,
                        timeout => $self->timeout,
                        verbose => $self->verbose || $job_meta->{show_cmd_output},
                    );

        $last_out = $out;

        if ( ! $success ) {
            if ( $out ) {
                if ( $out =~ m{ \QStart time can't be changed\E }xms ) {
                    $state->{fix_starttime} = 1;
                }

                if ( $out =~ m{ \QEnd time can't be changed\E }xms ) {
                    $state->{fix_endtime} = 1;
                }
            }
            else {
                $logger->warn(
                    sprintf 'Coordinator %s update failed (%s): %s',
                                $self->coord,
                                $self->dryrun ? ' (dryrun)' : EMPTY_STRING,
                                $out // '[no output]',
                );
            }
            next TRY;
        }

        $logger->info(
            sprintf 'Coordinator %s updated%s',
                        $self->coord,
                        $self->dryrun ? ' (dryrun)' : EMPTY_STRING,
        );

        last TRY;
    }

    if ( ! $success ) {
        $logger->fatal(
            sprintf 'Coordinator %s was NOT updated%s.',
                        $self->coord,
                        $self->dryrun ? ' (dryrun)' : EMPTY_STRING,
        );
        if ( $last_out ) {
            $logger->fatal( $last_out );
            if ( $last_out =~ m{ \QFrequency can't be changed\E }xms ) {
                $logger->fatal('Your running coordinator and the local coordinator.xml seems to have out of sync frequency settings. Please update coordinator.xml before continuing to reflect the scheduled job settings.');
            }
        }
    }

    return $success;
}

sub collect_current_conf {
    my $self   = shift;
    my $logger = $self->logger;
    my $coord  = $self->coord;

    my(
        $current_coord_user,
        $current_xml,
        $oozie_build,
        $oozie_cdh_version,
        $oozie_version,
        $base_path,
        $meta_startTime,
        $meta_endTime,
        %job_properties
    );

    eval {
        my $oozie           = $self->oozie;
        my $job             = $oozie->job( $coord )       || die sprintf 'No configuration for the job: %s', $coord;
        $oozie_build        = $oozie->new->build_version  || die 'Failed to get the Oozie server version!';
        my @vtuple          = split m{ \Q-cdh\E }xms, $oozie_build;
        $oozie_version      = shift @vtuple               || die sprintf 'Unable to determine the Oozie server version from %s', $oozie_build;
        $oozie_cdh_version  = shift @vtuple               || die sprintf 'Unable to determine the Oozie server CDH version from %s', $oozie_build;
        $current_coord_user = $job->{user}                || die sprintf 'Failed to locate the user running %s', $coord;
        $current_xml        = $job->{conf}                || die sprintf 'No configuration for the job: %s', $coord;
        # If the coordinator has been extended, the job metadata (the
        # startTime and endTime read below) gets updated, but the XML config
        # keeps the old and now meaningless values. That is fine for the
        # startTime, but the endTime in the XML will be bogus: our shifting
        # logic/workaround will then not do anything, and in fact the server
        # will respond with an "Error: E0803" even when you want to update
        # everything but the scheduling. For some reason the XML conf does
        # not get updated.
        #
        $meta_startTime     = $job->{startTime}           || die sprintf 'No startTime set for the job: %s', $coord;
        $meta_endTime       = $job->{endTime}             || die sprintf 'No endTime set for the job: %s', $coord;
        my $path            = $job->{coordJobPath}        || die sprintf 'No coordJobPath defined for the job: %s', $coord; # shouldn't happen
        my $hdfs_dest       = $self->default_hdfs_destination;
        ($base_path         = $path) =~ s{ \A $hdfs_dest [/]? }{}xms;
        my $jp_hdfs_path    = catfile $path,      'job.properties';
        my $jp_local_path   = catfile $base_path, 'job.properties';

        my $jp;
        my $hdfs = $self->hdfs;
        if ( my $meta = $hdfs->exists( $jp_hdfs_path ) ) {
            $logger->info( sprintf 'job.properties exists on HDFS. Fetching %s', $jp_hdfs_path );
            $jp = $hdfs->read( $jp_hdfs_path );
        }
        elsif ( -e $jp_local_path ) {
            $logger->info( sprintf 'job.properties exists on local file system. Fetching %s', $jp_local_path );
            open my $FH, '<', $jp_local_path or die sprintf q{Can't read %s: %s}, $jp_local_path, $!;
            $jp =  do { local $/; <$FH> };
            if ( ! close $FH ) {
                $logger->warn(
                    sprintf 'Failed to close %s: %s',,
                                $jp_local_path,
                                $!,
                );
            }
        }
        else {
            my $uh_oh = sprintf <<'FYI', Cwd::getcwd, $base_path;

No job.properties file neither on hdfs nor local file system.
There are no parameter overrides to collect.

This program looks at relative paths to search for local files.

Your current working directory is %s and search path is %s.

If this is not the directory for the local application files, please change
to the proper location and try again.

FYI
            $logger->warn( $uh_oh );
        }

        %job_properties = $jp ? ParseConfig( -String => $jp ) : ();

        for my $name ( keys %job_properties ) {
            my $val = $job_properties{ $name};
            if ( is_ref $val ) {
                require Data::Dumper;
                my $d = Data::Dumper->new([ $val ], [ $name ]);
                $logger->logdie(
                    sprintf 'You seem to have a double definition in %s for %s as %s',
                                'job.properties',
                                $name,
                                $d->Dump,
                );
            }
        }



( run in 2.404 seconds using v1.01-cache-2.11-cpan-5a3173703d6 )