Event-ExecFlow
view release on metacpan or search on metacpan
lib/Event/ExecFlow/Job/Group.pm view on Meta::CPAN
$job->set_group($self);
weaken($job->{group});
my $job_cnt = $job->get_job_cnt;
$self->increase_progress_max($job_cnt) if $job_cnt != 0;
if ( $self->get_state eq 'finished' ||
$self->get_state eq 'error' ) {
$self->set_state("waiting");
}
$self->add_child_post_callback($job);
$self->get_frontend->report_job_added($job);
1;
}
# Removes $job from this group's job list.
#
# The job is located by comparing each list entry with $job using 'eq';
# the first match is spliced out. The group's progress maximum is reduced
# by the job's job count (when non-zero) and the frontend is told about
# the removal.
#
# Dies if $job is not found in the list. Returns 1.
sub remove_job {
    my ($self, $job) = @_;

    my $members = $self->get_jobs;

    # Find the index of the first entry matching $job.
    my $position;
    foreach my $idx ( 0 .. $#{$members} ) {
        next unless $members->[$idx] eq $job;
        $position = $idx;
        last;
    }

    unless ( defined $position ) {
        die "Job with ID ".$job->get_id." no member of this group";
    }

    splice @{$members}, $position, 1;

    # Only touch the progress maximum if the job actually contributed to it.
    my $job_cnt = $job->get_job_cnt;
    if ( $job_cnt != 0 ) {
        $self->decrease_progress_max($job_cnt);
    }

    $self->get_frontend->report_job_removed($job);

    return 1;
}
# Looks up a direct member job by its name.
#
# Returns the first job in this group's job list whose get_name equals
# $job_name. Dies if no such job exists.
sub get_job_by_name {
    my ($self, $job_name) = @_;

    for my $candidate ( @{ $self->get_jobs } ) {
        next if $candidate->get_name ne $job_name;
        return $candidate;
    }

    die "Job '$job_name' not member of group '".$self->get_name."'";
}
# Drives this group: repeatedly asks get_next_job for a candidate job and
# starts it through start_child_job.
#
# Named parameters:
#   skip - optional job; a candidate whose stringification equals that of
#          'skip' is not started by this call.
#
# Returns nothing (bare return) on the early-exit paths below, 1 otherwise.
# Dies if the scheduler answers with a state other than 'ok',
# 'sched-blocked' or 'job-blocked'.
sub execute {
my $self = shift;
my %par = @_;
my ($skip) = $par{'skip'};
# Normalise so the string comparison against the candidate is always defined.
$skip = "" if ! defined $skip;
# Most recent job the scheduler reported as 'job-blocked'; it is passed to
# get_next_job so that job is excluded from the next pick.
my $blocked_job;
while ( 1 ) {
# Group is done: it was cancelled, every job finished, or an error is
# recorded and the group is configured to stop on failure.
if ( $self->get_cancelled
|| $self->all_jobs_finished
|| ( $self->get_error_message &&
$self->get_stop_on_failure ) ) {
$self->execution_finished;
# With an exclusive scheduler, its run method is invoked after this
# group has finished.
if ( $self->get_scheduler &&
$self->get_scheduler->is_exclusive ) {
$self->get_scheduler->run;
}
return;
}
# While the scheduler is exclusive this group starts no jobs itself.
return if $self->get_scheduler &&
$self->get_scheduler->is_exclusive;
my $job = $self->get_next_job(blocked=>$blocked_job);
# NOTE(review): nothing in this loop changes what get_next_job returns
# before retrying, so if it yields the skipped job again this may spin
# forever - confirm callers only pass a 'skip' job that is not 'waiting'.
next if defined $job && "$job" eq "$skip";
# Nothing startable here: let running parallel sub-groups or the parent
# group try, then leave the loop.
if ( !$job ) {
$self->try_reschedule_jobs(skip => $skip);
last;
}
if ( $self->get_scheduler ) {
my $state = $self->get_scheduler->schedule_job($job);
# Scheduler cannot take anything right now; give up for this call.
return if $state eq 'sched-blocked';
# Only this job is blocked; remember it and look for another one.
# NOTE(review): only the latest blocked job is remembered, so two
# blocked candidates could alternate indefinitely - confirm against
# the scheduler's semantics.
if ( $state eq 'job-blocked' ) {
$blocked_job = $job;
next;
}
die "Illegal scheduler state '$state'"
unless $state eq 'ok';
}
$self->start_child_job($job);
# Non-parallel groups start a single job per call.
last if !$self->get_parallel;
}
1;
}
# Gives member groups that are running in parallel mode another chance to
# start jobs and, if no such group was found, hands control to the parent
# group.
#
# Named parameters:
#   skip - optional job to leave untouched; members are compared with it
#          by stringification.
#
# Returns 1.
sub try_reschedule_jobs {
    my $self = shift;
    my %par = @_;
    my ($skip) = $par{'skip'};

    # 'skip' is optional. Normalise it the same way execute() does so the
    # string comparison below never sees undef (which would emit an
    # "uninitialized value" warning for every member job).
    $skip = "" if ! defined $skip;

    my $executed = 0;

    foreach my $job ( @{$self->get_jobs} ) {
        next if "$job" eq "$skip";

        # Parallel execution groups which are running now
        # probably can execute more jobs, so give it a try.
        if ( $job->get_type eq 'group' &&
             $job->get_state eq 'running' &&
             $job->get_parallel ) {
            $job->execute;
            $executed = 1;
        }
    }

    # No member could be re-executed: let the parent group look for work,
    # telling it to skip this group.
    if ( !$executed && $self->get_group ) {
        $self->get_group->execute(skip => $self);
    }

    return 1;
}
# Marks this group as cancelled and cancels every member job whose state
# is currently 'running'. Returns 1.
sub cancel {
    my ($self) = @_;

    $self->set_cancelled(1);

    foreach my $job ( @{ $self->get_jobs } ) {
        next unless $job->get_state eq 'running';
        $job->cancel;
    }

    return 1;
}
# Calls pause on every member job whose state is currently 'running'.
# Returns 1.
sub pause_job {
    my ($self) = @_;

    foreach my $job ( @{ $self->get_jobs } ) {
        next unless $job->get_state eq 'running';
        $job->pause;
    }

    return 1;
}
sub reset {
my $self = shift;
lib/Event/ExecFlow/Job/Group.pm view on Meta::CPAN
$self->set_progress_cnt(0) unless defined $self->get_progress_cnt;
$self->get_frontend->report_job_progress($self);
$job->start;
1;
}
# Handles the end of a member job.
#
# Steps, in order:
#   1. run the member-finished callbacks, if any are set;
#   2. if the job carries an error message, was not cancelled, and this
#      group fails with its members: set this group's state to "error",
#      append the job's error to the group's error message and report the
#      error to the frontend;
#   3. tell the scheduler (if any) that the job finished;
#   4. call execute on this group again.
#
# Returns 1.
sub child_job_finished {
    my ($self, $job) = @_;

    print "Group->child_job_finished(".$job->get_info.")\n"
        if $Event::ExecFlow::DEBUG;

    if ( $self->get_member_finished_callbacks ) {
        $self->get_member_finished_callbacks->execute();
    }

    # A cancelled job does not count as a failure of the group.
    if (    $job->get_error_message
         && !$job->get_cancelled
         && $self->get_fail_with_members ) {
        $self->set_state("error");
        $self->add_job_error_message($job);
        $self->get_frontend->report_job_error($self);
    }

    $self->get_scheduler->job_finished($job)
        if $self->get_scheduler;

    $self->execute;

    return 1;
}
# Appends a report about $job's failure to this group's error message.
#
# The report consists of the job's info, its error message and an
# 80-character dash separator line; it is appended to whatever error
# message the group already has (or to the empty string). Returns 1.
sub add_job_error_message {
    my ($self, $job) = @_;

    my $collected = $self->get_error_message || "";
    my $job_info  = $job->get_info;
    my $job_error = $job->get_error_message;
    my $separator = "-" x 80;

    my $report = "Job '".$job_info."' ";
    $report .= "failed with error message:\n";
    $report .= $job_error."\n";
    $report .= $separator."\n";

    $self->set_error_message($collected.$report);

    return 1;
}
# Returns the first entry of this group's job list (undef if the list
# is empty).
sub get_first_job {
    my ($self) = @_;

    my $members = $self->get_jobs;

    return $members->[0];
}
# Picks the next job that can be started.
#
# Named parameters:
#   blocked - optional job to pass over; members are compared with it by
#             stringification.
#
# Returns the first member job that is in state 'waiting' and whose
# dependencies are satisfied (per dependencies_ok), or undef if there is
# none. Prints trace output when $Event::ExecFlow::DEBUG is true.
sub get_next_job {
    my ($self, %par) = @_;

    my $blocked = defined $par{'blocked'} ? $par{'blocked'} : "";

    my $candidate;

    JOB:
    for my $job ( @{ $self->get_jobs } ) {
        next JOB if defined $job && "$job" eq "$blocked";

        print "Group(".$self->get_info.")->get_next_job: check ".$job->get_info."=>".$job->get_state."\n"
            if $Event::ExecFlow::DEBUG;

        next JOB unless $job->get_state eq 'waiting';
        next JOB unless $self->dependencies_ok($job);

        $candidate = $job;
        last JOB;
    }

    print "Group(".$self->get_info.")->get_next_job=".
          ( $candidate ? $candidate->get_info : "NOJOB" )."\n"
        if $Event::ExecFlow::DEBUG;

    return $candidate;
}
# Checks whether every job that $job depends on has finished.
#
# Each name from $job->get_depends_on is resolved with get_job_by_name on
# this group. Returns 1 if all of them are in state 'finished', and
# nothing (bare return) as soon as one is not. Prints trace output when
# $Event::ExecFlow::DEBUG is true.
sub dependencies_ok {
    my ($self, $job) = @_;

    for my $required_name ( @{ $job->get_depends_on } ) {
        my $required = $self->get_job_by_name($required_name);

        print "Job(".$job->get_info.")->dependencies_ok: check ".$required->get_info." =>".$required->get_state."\n"
            if $Event::ExecFlow::DEBUG;

        next if $required->get_state eq 'finished';
        return;
    }

    return 1;
}
# Tells whether no member job is still outstanding.
#
# Returns 0 as soon as a job is found in state 'waiting', 'error' or
# 'running'; returns 1 otherwise (including for an empty job list).
sub all_jobs_finished {
    my ($self) = @_;

    for my $job ( @{ $self->get_jobs } ) {
        # States are checked in this order, stopping at the first match.
        for my $open_state ( 'waiting', 'error', 'running' ) {
            return 0 if $job->get_state eq $open_state;
        }
    }

    return 1;
}
# Threads a (currently consumed, maximum consumed) pair through every
# member job.
#
# Each job's get_max_diskspace_consumed is called with the pair as it
# stands after the previous job, and the first two values it returns
# become the new pair. Returns the final pair as a two-element list.
sub get_max_diskspace_consumed {
    my ($self, $current, $peak) = @_;

    for my $member ( @{ $self->get_jobs } ) {
        my @totals = $member->get_max_diskspace_consumed($current, $peak);
        ($current, $peak) = @totals;
    }

    return ($current, $peak);
}
sub backup_state {
my $self = shift;
my $data_href = $self->SUPER::backup_state();
delete $data_href->{jobs};
delete $data_href->{scheduler};
delete $data_href->{member_finished_callbacks};
( run in 0.654 second using v1.01-cache-2.11-cpan-39bf76dae61 )