Event-ExecFlow

 view release on metacpan or  search on metacpan

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN

    $job->set_group($self);
    weaken($job->{group});

    my $job_cnt = $job->get_job_cnt;
    $self->increase_progress_max($job_cnt) if $job_cnt != 0;

    if ( $self->get_state eq 'finished' ||
         $self->get_state eq 'error' ) {
        $self->set_state("waiting");
    }

    $self->add_child_post_callback($job);

    $self->get_frontend->report_job_added($job);

    1;
}

# Removes $job from this group's job list, lowers the group's maximum
# progress by the job count of the removed job and reports the removal
# to the frontend.
#
# Dies if $job is not a member of this group. Returns 1 on success.
sub remove_job {
    my ($self, $job) = @_;

    my $members = $self->get_jobs;

    # Search for the position of the job. Members are compared with 'eq',
    # i.e. by their string representation.
    my $pos = 0;
    while ( $pos < @{$members} ) {
        last if $members->[$pos] eq $job;
        ++$pos;
    }

    # Running off the end of the list means the job was not found.
    if ( $pos == @{$members} ) {
        die "Job with ID ".$job->get_id." no member of this group";
    }

    splice @{$members}, $pos, 1;

    # Only touch the progress maximum if the job contributed to it.
    my $removed_cnt = $job->get_job_cnt;
    if ( $removed_cnt != 0 ) {
        $self->decrease_progress_max($removed_cnt);
    }

    $self->get_frontend->report_job_removed($job);

    return 1;
}

# Returns the first member job of this group whose name equals $job_name.
# Dies if no member carries that name.
sub get_job_by_name {
    my ($self, $job_name) = @_;

    for my $member ( @{ $self->get_jobs } ) {
        next unless $member->get_name eq $job_name;
        return $member;
    }

    # No member matched.
    die "Job '$job_name' not member of group '".$self->get_name."'";
}

# Drives execution of this group's member jobs.
#
# Named parameters:
#   skip - optional job which must not be picked by this call; it is
#          compared with candidates by string representation.
#
# Each loop iteration first checks whether the group is done, then asks
# get_next_job() for a startable member, optionally consults the scheduler
# and finally starts the job via start_child_job().
#
# Returns nothing (bare return) when the group finished or when the
# scheduler is exclusive or reported 'sched-blocked'; returns 1 otherwise.
# Dies if the scheduler reports a state other than 'ok', 'job-blocked'
# or 'sched-blocked'.
sub execute {
    my $self = shift;
    my %par = @_;
    my ($skip) = $par{'skip'};
    
    # Default to an empty string so the string comparison below is safe.
    $skip = "" if ! defined $skip;

    # The job the scheduler refused most recently with 'job-blocked';
    # handed to get_next_job() so it is not returned again right away.
    my $blocked_job;
    while ( 1 ) {
        # The group is done when it was cancelled, when all members are
        # finished, or when an error occurred and stop_on_failure is set.
        if (      $self->get_cancelled
             ||   $self->all_jobs_finished
             || ( $self->get_error_message &&
                  $self->get_stop_on_failure ) ) {
            $self->execution_finished;
            # With an exclusive scheduler, hand control back to it.
            if ( $self->get_scheduler &&
                 $self->get_scheduler->is_exclusive ) {
                $self->get_scheduler->run;
            }
            return;
        }

        # While the scheduler is exclusive no job is started from here.
        return if $self->get_scheduler &&
                  $self->get_scheduler->is_exclusive;
    
        my $job = $self->get_next_job(blocked=>$blocked_job);
        # NOTE(review): nothing visible here changes state before the next
        # iteration, so this would spin if get_next_job() kept returning
        # the skipped job - presumably the skipped job is never 'waiting';
        # confirm against callers.
        next if defined $job && "$job" eq "$skip";

        # Nothing startable in this group: let running parallel child
        # groups (or the parent group) try to start something instead.
        if ( !$job ) {
            $self->try_reschedule_jobs(skip => $skip);
            last;
        }

        if ( $self->get_scheduler ) {
            my $state = $self->get_scheduler->schedule_job($job);
            return if $state eq 'sched-blocked';
            # NOTE(review): only the most recently blocked job is
            # remembered; with two alternately blocked jobs this loop may
            # not terminate - verify against the scheduler implementation.
            if ( $state eq 'job-blocked' ) {
                $blocked_job = $job;
                next;
            }
            die "Illegal scheduler state '$state'"
                unless $state eq 'ok';
        }

        $self->start_child_job($job);

        # Non-parallel groups start only one job per call.
        last if !$self->get_parallel;
    }
    
    1;    
}

# Tries to get more work started when this group itself has no job to
# start right now.
#
# Named parameters:
#   skip - optional job which must be left alone; it is compared with the
#          members by string representation.
#
# Every member which is a running parallel group (except the skipped one)
# gets its execute() method called. If no such member was found and this
# group is itself member of a parent group, the parent's execute() is
# called with this group as the job to skip.
#
# Always returns 1.
sub try_reschedule_jobs {
    my $self = shift;
    my %par = @_;
    my ($skip) = $par{'skip'};

    # 'skip' is optional. Default it to an empty string, the same way
    # execute() does, so the string comparison below does not emit an
    # "uninitialized value" warning when no job is to be skipped.
    $skip = "" if ! defined $skip;

    my $executed = 0;
    foreach my $job ( @{$self->get_jobs} ) {
        next if "$job" eq "$skip";

        # Parallel execution groups which are running now
        # probably can execute more jobs, so give it a try.
        if ( $job->get_type  eq 'group'   &&
             $job->get_state eq 'running' &&
             $job->get_parallel ) {
            $job->execute;
            $executed = 1;
        }
    }
    
    # Nothing could be triggered on this level: pass the request up to
    # the parent group, telling it not to pick this group again.
    if ( !$executed && $self->get_group ) {
        $self->get_group->execute(skip => $self);
    }
    
    1;
}

# Marks this group as cancelled and cancels every member job which is
# currently in state 'running'. Always returns 1.
sub cancel {
    my ($self) = @_;

    $self->set_cancelled(1);

    for my $member ( @{ $self->get_jobs } ) {
        next unless $member->get_state eq 'running';
        $member->cancel;
    }

    return 1;
}

# Calls pause() on every member job which is currently in state 'running'.
# Always returns 1.
sub pause_job {
    my ($self) = @_;

    for my $member ( @{ $self->get_jobs } ) {
        next unless $member->get_state eq 'running';
        $member->pause;
    }

    return 1;
}

sub reset {
    my $self = shift;
    

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN


    $self->set_progress_cnt(0) unless defined $self->get_progress_cnt;
    $self->get_frontend->report_job_progress($self);

    $job->start;

    1;
}

# Handles the end of a member job.
#
# Executes the member-finished callbacks (if any), turns this group into
# state 'error' when the job failed without being cancelled and the group
# is configured to fail with its members, informs the scheduler (if any)
# and finally calls execute() to continue with the remaining jobs.
#
# Always returns 1.
sub child_job_finished {
    my ($self, $job) = @_;

    if ( $Event::ExecFlow::DEBUG ) {
        print "Group->child_job_finished(".$job->get_info.")\n";
    }

    if ( $self->get_member_finished_callbacks ) {
        $self->get_member_finished_callbacks->execute();
    }

    # A failed (not cancelled) member makes the whole group fail,
    # but only if fail_with_members is set.
    if (    $job->get_error_message
         && !$job->get_cancelled
         && $self->get_fail_with_members ) {
        $self->set_state("error");
        $self->add_job_error_message($job);
        $self->get_frontend->report_job_error($self);
    }

    $self->get_scheduler->job_finished($job)
        if $self->get_scheduler;

    # Continue with the next job(s) or finish the group.
    $self->execute;

    return 1;
}

# Appends a report about the failed $job to this group's error message.
#
# The report consists of the job's info string, its error message and a
# separator line of 80 dashes. An already present group error message is
# kept and the report is added behind it. Always returns 1.
sub add_job_error_message {
    my ($self, $job) = @_;

    my $previous  = $self->get_error_message || "";
    my $job_info  = $job->get_info;
    my $job_error = $job->get_error_message;
    my $separator = "-" x 80;

    $self->set_error_message(
        $previous
        ."Job '$job_info' failed with error message:\n"
        ."$job_error\n"
        ."$separator\n"
    );

    return 1;
}

# Returns the first job of this group's job list
# (undef if the list is empty).
sub get_first_job {
    my ($self) = @_;

    my $members = $self->get_jobs;

    return $members->[0];
}

# Returns the first member job which is in state 'waiting' and whose
# dependencies are fulfilled, or undef if there is no such job.
#
# Named parameters:
#   blocked - optional job which must not be returned; it is compared
#             with the members by string representation.
sub get_next_job {
    my ($self, %par) = @_;

    my $blocked = defined $par{'blocked'} ? $par{'blocked'} : "";

    my $candidate;
    JOB:
    for my $member ( @{ $self->get_jobs } ) {
        next JOB if defined $member && "$member" eq "$blocked";

        if ( $Event::ExecFlow::DEBUG ) {
            print "Group(".$self->get_info.")->get_next_job: check ".$member->get_info."=>".$member->get_state."\n";
        }

        # Only waiting jobs with satisfied dependencies may start.
        next JOB unless $member->get_state eq 'waiting';
        next JOB unless $self->dependencies_ok($member);

        $candidate = $member;
        last JOB;
    }

    if ( $Event::ExecFlow::DEBUG ) {
        print "Group(".$self->get_info.")->get_next_job=".
            ($candidate ? $candidate->get_info : "NOJOB")."\n";
    }

    return $candidate;
}

# Checks whether all jobs $job depends on are in state 'finished'.
#
# The names listed by the job's depends_on attribute are resolved with
# get_job_by_name(), which dies for names unknown to this group.
#
# Returns 1 if every dependency is finished, otherwise nothing
# (undef in scalar context, an empty list in list context).
sub dependencies_ok {
    my ($self, $job) = @_;

    for my $required_name ( @{ $job->get_depends_on } ) {
        my $required_job = $self->get_job_by_name($required_name);

        print "Job(".$job->get_info.")->dependencies_ok: check ".$required_job->get_info." =>".$required_job->get_state."\n"
            if $Event::ExecFlow::DEBUG;

        # The first unfinished dependency settles the question.
        return unless $required_job->get_state eq 'finished';
    }

    return 1;
}

# Tells whether no member job is left to be processed.
#
# Returns 0 as soon as a member is found in state 'waiting', 'error' or
# 'running'; returns 1 otherwise (also for a group without members).
sub all_jobs_finished {
    my ($self) = @_;

    for my $member ( @{ $self->get_jobs } ) {
        my $unfinished = $member->get_state eq 'waiting'
                      || $member->get_state eq 'error'
                      || $member->get_state eq 'running';

        return 0 if $unfinished;
    }

    return 1;
}

# Threads the pair (currently consumed, maximum consumed) through the
# get_max_diskspace_consumed() method of every member job, in job list
# order, feeding each job the values returned by its predecessor.
#
# Parameters: the two starting values.
# Returns:    the list (currently consumed, maximum consumed) as left
#             behind by the last member job.
sub get_max_diskspace_consumed {
    my ($self, $current, $peak) = @_;

    my $members = $self->get_jobs;

    for my $member ( @{$members} ) {
        ($current, $peak) = $member->get_max_diskspace_consumed($current, $peak);
    }

    return ($current, $peak);
}

sub backup_state {
    my $self = shift;
    
    my $data_href = $self->SUPER::backup_state();
    
    delete $data_href->{jobs};
    delete $data_href->{scheduler};
    delete $data_href->{member_finished_callbacks};



( run in 0.654 second using v1.01-cache-2.11-cpan-39bf76dae61 )