App-Oozie

 view release on metacpan or  search on metacpan

lib/App/Oozie/Run.pm  view on Meta::CPAN

    my $CWD = getcwd() || die "Can't happen: unable to get cwd: $!";
    if ( ! chdir $wf_dir ) {
        die sprintf 'Cannot chdir to %s: %s -- Current dir: %s', $wf_dir, $!, $CWD;
    }

    if ( ! $self->appname ) {
        my $guess = basename getcwd;
        $logger->info( 'appname is not set, using the basedir=' . $guess );
        $self->appname( $guess );
    }

    # move to constructor?
    (my $appname = $self->appname) =~ s{ [/]+ \z }{}xms;
    $self->appname( $appname );

    $self->logger->info( sprintf 'Job name: %s',            $self->appname );
    $self->logger->info( sprintf 'Job path (HDFS dir): %s', $self->path    );

    $self->setup_dates;

    my($cmd_tmpl, $cmd_param) = $self->collect_oozie_cmd_args;

    # Are we alone or do we need to kill our brothers and sisters?
    $self->check_current_instances;

    Template->new
            ->process(
                \join( SPACE_CHAR, @{ $cmd_tmpl } ),
                $cmd_param,
                \my $command,
            );
    my $success = $self->execute( $command );
    if(!$success){
        return $success;
    }
    # go back where we started!
    chdir $CWD if $CWD;


    $logger->info(
        sprintf 'Completed successfully in %s (took %s)',
                    sprintf( '%s%s', $self->cluster_name, ( $self->dryrun ? ' (dryrun is set)' : EMPTY_STRING ) ),
                    duration_exact( time - $run_start_epoch ),
    );

    return $success;
}

sub collect_oozie_cmd_args {
    my $self   = shift;
    my $logger = $self->logger;

    my @extra_oozie_args;

    my @define = @{ $self->define };

    my %extra_def = ();
    # We are not supporting sla for bundles (yet)
    if ( !($self->type eq 'bundle') ) {
      %extra_def = (
          $self->verify_sla,
      );
    }

    if ( $self->type eq 'wf' ) {
        %extra_def = (
            %extra_def,
            $self->check_coordinator_function_calls({
                map { (split m{ [=] }xms, $_)[0] => 1 } @define
            }),
        );
    }

    if (@{ $self->errors } ) {
        $logger->error(
            'Overridable errors encountered',
            ( $self->force ? EMPTY_STRING : ' (relaunch using --force to proceed)' )
        );
        $logger->error( '- ' . $_ ) for @{ $self->errors };
        die if !$self->force && !$self->dryrun;
    }

    my $hash_to_def = sub {
        my($h, $no_quote) = @_;
        my $tmpl = $no_quote ? q{-D%s=%s} : q{-D'%s=%s'};
        map { sprintf $tmpl, $_, $h->{$_} } keys %{ $h }
    };

    # IMPORTANT ! keep this in sync with the sudoers file,
    # if you have a corresponding setting in such a place

    my %def = (
        appName                             => '[% app_name      %]',
        startTime                           => '[% start_time    %]',
        endTime                             => '[% end_time      %]',
        workflowPath                        => '[% workflow_path %]',
        'oozie.[% type %].application.path' => '[% workflow_path %]',
        path                                => '[% path %]',
        nameNode                            => '[% name_node     %]',
        'oozie.use.system.libpath'          => 'true',
        ( @define ? ( map { split m{ [=] }xms, $_, 2 } @define ) : () ),
    );

    my %prop  = $self->probe_settings;
    my %owner = $self->probe_meta;

    $self->logger->info( 'Combining owner info into job.properties' );

    my $override_file = File::Temp->new(
                            SUFFIX => '.properties',
                            DIR    => resolve_tmp_dir(),
                        );
    my $original = EMPTY_STRING;
    my $orig_filename = 'job.properties';

    if ( open my $ORIG_FH, '<', $orig_filename ) {
        local $/;
        $original = <$ORIG_FH>;
        if ( ! close $ORIG_FH ) {
            $logger->warn(
                sprintf 'Failed to close %s: %s',

lib/App/Oozie/Run.pm  view on Meta::CPAN

        ($self->secure_cluster ? ($hash_to_def->({ 'oozie.auth.token.cache' => 'false'}, 1)) : ()),
        @username_override,
        job => ( $self->dryrun ? '-dryrun' : '-run' ),
        @args,
    );

    if ( $self->notify ) {
        # TODO: check if this whole section can be removed
        my %ndef =  map {
                        $_ => $prop{ $_ }
                    }
                    grep {
                        m{ notification[.]url }xms
                    }
                    keys %prop
                    ;

        #if ( ! %ndef ) {
        #    die "--notify is set but the required settings are not in your configuration";
        #}

        push @cmd_tmpl, $hash_to_def->( \%ndef );
    }

    push @cmd_tmpl,'-oozie=[% oozie_uri %]';

    my $end_time   = $prop{endTime}
                        || sprintf FORMAT_ZULU_TIME,
                                    map { $self->$_ }
                                    qw(
                                        enddate
                                        endhour
                                        endmin
                                    );

    my $start_time = $prop{startTime}
                        || sprintf FORMAT_ZULU_TIME,
                                    map { $self->$_ }
                                    qw(
                                        startdate
                                        starthour
                                        startmin
                                    );

    my $nameNode = $prop{nameNode} || $self->template_namenode;

    my %cmd_param = (
        app_name      => $self->appname,
        end_time      => $end_time,
        name_node     => $nameNode,
        oozie_uri     => $self->oozie_uri,
        start_time    => $start_time,
        type          => $self->type,
        workflow_path => $self->path . ($self->type eq 'bundle'? '/bundle.xml' : EMPTY_STRING) ,
        path          => $nameNode . $self->path,
    );

    return \@cmd_tmpl, \%cmd_param;
}

# Cross-check the --sla-duration setting against the deployed workflow.
#
# Reads workflow.xml from the job path through $self->hdfs->read and looks
# for the "sla:info" marker in its content.
#
# Returns (in list context) the key/value pairs slaDuration and
# slaEmailErrors when the workflow has an SLA block, or an empty list when
# it has none.
#
# Dies when:
#   - workflow.xml cannot be read (the underlying error is appended),
#   - the workflow has an SLA block but sla_duration is not set,
#   - sla_duration is set but the workflow has no SLA block.
sub verify_sla {
    my $self = shift;
    my %rv;

    # Only the retrieval is guarded by eval: a validation failure further
    # down must not be reported as "cannot retrieve the workflow.xml",
    # since at that point the file has been read successfully.
    my $raw;
    eval {
        $raw = $self->hdfs->read(
                    File::Spec->catfile( $self->path, 'workflow.xml' )
                );
        1;
    } or do {
        my $eval_error = $@ || 'Zombie error';
        die sprintf 'Cannot retrieve the workflow.xml off HDFS; did you deploy the workflow? %s', $eval_error;
    };

    # An undefined read result is treated as "no SLA block" instead of
    # triggering an uninitialized-value warning in the match.
    my $has_sla_block = defined $raw && $raw =~ m{ sla[:]info }xms;

    # check the SLA parameter is provided if the workflow has an SLA block
    if ( $has_sla_block ) {
        if ( ! $self->sla_duration ) {
            die 'The workflow contains an SLA block, please provide an --sla-duration parameter in minutes';
        }
        %rv = (
            slaDuration    => $self->sla_duration,
            slaEmailErrors => 'duration_miss',
        );
    }
    elsif ( $self->sla_duration ) {
        die q{You've specified an SLA duration, but the workflow does not contain the sla:info block! There will be no SLA events};
    }

    return %rv;
}

sub check_current_instances {
    my $self = shift;
    my $logger = $self->logger;
    $logger->info( 'Duplicates check' );

    if ( $self->type eq 'wf' ) {
        $logger->warn( q{Please note that this program doesn't check the existence of duplicate workflows yet, only coordinators} );
        return;
    }

    my @running = @{
        $self->oozie->coordinators(
            filter => {
                name   => $self->appname,
                status => [ OOZIE_STATES_RUNNING ],
            }
        )->{coordinatorjobs} || []
    };

    return if !@running;

    $logger->warn( 'There are coordinator(s) already running under the same name on the server.' );

    my $meta_tmpl = <<'META';

* coordinator: %s
  start date : %s
  frequency  : %s %s
  console URL: %s

META

lib/App/Oozie/Run.pm  view on Meta::CPAN


=encoding UTF-8

=head1 NAME

App::Oozie::Run

=head1 VERSION

version 0.020

=head1 SYNOPSIS

    use App::Oozie::Run;
    App::Oozie::Run->new_with_options->run;

=head1 DESCRIPTION

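For this to work, the coordinator.xml and workflow.xml must make use of the
variables defined by this program (such as C<appName>, C<startTime> and
C<endTime>). For example: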

    <coordinator-app timezone="UTC" xmlns="uri:oozie:coordinator:0.1"
        name      = "${appName}"
        frequency = "${if frequency}"
        start     = "${startTime}"
        end       = "${endTime}"
        >
        <action>
            <workflow>
                <!-- HDFS base dir -->
                <app-path>${workflowsBase}/${appName}</app-path>
            </workflow>
        </action>
    </coordinator-app>

=head1 NAME

App::Oozie::Run - Schedule Oozie Coordinators and Workflows.

=head1 Methods

=head3 ask

=head3 check_coordinator_function_calls

=head3 check_current_instances

=head3 collect_oozie_cmd_args

=head3 collect_properties

=head3 execute

=head3 log_console_url

=head3 probe_settings

=head3 run

=head3 setup_dates

=head3 verify_sla

=head1 Accessors

=head2 Overridable from cli

=head3 appname

=head3 dates_from_properties

=head3 define

=head3 doas

=head3 enddate

=head3 endhour

=head3 endmin

=head3 notify

=head3 path

=head3 sla_duration

=head3 startdate

=head3 starthour

=head3 startmin

=head3 type

=head2 Overridable from sub-classes

=head3 basedir

=head3 errors

=head1 SEE ALSO

L<App::Oozie>.

=head1 AUTHORS

=over 4

=item *

David Morel

=item *

Burak Gursoy

=back

=head1 COPYRIGHT AND LICENSE

This software is copyright (c) 2023 by Booking.com.



( run in 0.399 second using v1.01-cache-2.11-cpan-13bb782fe5a )