Event-ExecFlow
view release on metacpan or search on metacpan
lib/Event/ExecFlow/Job/Group.pm view on Meta::CPAN
package Event::ExecFlow::Job::Group;
use base qw( Event::ExecFlow::Job );
use strict;
use Scalar::Util qw(weaken);
# --- read accessors -------------------------------------------------------

# Job type identifier of this class; always the string "group".
sub get_type {
    return "group";
}

# Array reference holding the member jobs of this group.
sub get_jobs {
    my ($self) = @_;
    return $self->{jobs};
}

# Value of the 'fail_with_members' attribute (defaults to 1 in new()).
sub get_fail_with_members {
    my ($self) = @_;
    return $self->{fail_with_members};
}

# Value of the 'stop_on_failure' attribute (defaults to 1 in new()).
sub get_stop_on_failure {
    my ($self) = @_;
    return $self->{stop_on_failure};
}

# Value of the 'parallel' attribute.
sub get_parallel {
    my ($self) = @_;
    return $self->{parallel};
}

# Scheduler object attached to this group, if any.
sub get_scheduler {
    my ($self) = @_;
    return $self->{scheduler};
}

# Callbacks stored in the 'member_finished_callbacks' attribute.
sub get_member_finished_callbacks {
    my ($self) = @_;
    return $self->{member_finished_callbacks};
}
# --- write accessors ------------------------------------------------------
# Each setter stores the given value in the object hash and returns it.

# Replace the array reference of member jobs.
sub set_jobs {
    my ($self, $value) = @_;
    return $self->{jobs} = $value;
}

# Set the 'fail_with_members' attribute.
sub set_fail_with_members {
    my ($self, $value) = @_;
    return $self->{fail_with_members} = $value;
}

# Set the 'stop_on_failure' attribute.
sub set_stop_on_failure {
    my ($self, $value) = @_;
    return $self->{stop_on_failure} = $value;
}

# Set the 'parallel' attribute.
sub set_parallel {
    my ($self, $value) = @_;
    return $self->{parallel} = $value;
}

# Set the 'member_finished_callbacks' attribute.
sub set_member_finished_callbacks {
    my ($self, $value) = @_;
    return $self->{member_finished_callbacks} = $value;
}
# Constructor.
#
# Accepts named parameters; the ones handled here are:
#   jobs                       array ref of member jobs   (default: [])
#   fail_with_members          flag                       (default: 1)
#   stop_on_failure            flag                       (default: 1)
#   parallel                   flag
#   scheduler                  scheduler object
#   member_finished_callbacks  Event::ExecFlow::Callbacks object or a
#                              plain code reference
#
# The complete argument list is also handed to the parent constructor.
sub new {
    my ($class, @args) = @_;
    my %par = @args;

    my $jobs = defined $par{jobs} ? $par{jobs} : [];
    my $fail_with_members =
        defined $par{fail_with_members} ? $par{fail_with_members} : 1;
    my $stop_on_failure =
        defined $par{stop_on_failure} ? $par{stop_on_failure} : 1;
    my $parallel  = $par{parallel};
    my $scheduler = $par{scheduler};
    my $callbacks = $par{member_finished_callbacks};

    my $self = $class->SUPER::new(@args);

    # Normalize the callbacks: a missing value becomes an empty callbacks
    # object, a bare code reference gets wrapped into one.
    $callbacks = Event::ExecFlow::Callbacks->new
        unless $callbacks;
    $callbacks = Event::ExecFlow::Callbacks->new($callbacks)
        if ref($callbacks) eq 'CODE';

    # set_jobs() has to run before set_scheduler(), because the latter
    # walks the member list to propagate the scheduler to sub groups.
    $self->set_jobs($jobs);
    $self->set_fail_with_members($fail_with_members);
    $self->set_stop_on_failure($stop_on_failure);
    $self->set_parallel($parallel);
    $self->set_scheduler($scheduler);
    $self->set_member_finished_callbacks($callbacks);

    return $self;
}
# Attach a frontend to this group (via the parent class) and to every
# member job. Returns the frontend.
sub set_frontend {
    my ($self, $frontend) = @_;

    $self->SUPER::set_frontend($frontend);

    foreach my $member ( @{ $self->get_jobs } ) {
        $member->set_frontend($frontend);
    }

    return $frontend;
}
# Store the scheduler on this group and pass it down to all members
# which are groups themselves. Returns the scheduler.
sub set_scheduler {
    my ($self, $scheduler) = @_;

    $self->{scheduler} = $scheduler;

    for my $member ( @{ $self->get_jobs } ) {
        # Only members of type 'group' receive the scheduler.
        next unless $member->get_type eq 'group';
        $member->set_scheduler($scheduler);
    }

    return $scheduler;
}
# Execution type of this group: the exec type of the job returned by
# get_next_job(), or "sync" if there is no such job.
sub get_exec_type {
    my ($self) = @_;

    my $next_job = $self->get_next_job;

    return $next_job ? $next_job->get_exec_type : "sync";
}
# Disk space consumed: the parent class' value for this job plus the
# values reported by all member jobs.
sub get_diskspace_consumed {
    my ($self) = @_;

    my $total = $self->SUPER::get_diskspace_consumed;

    foreach my $member ( @{ $self->get_jobs } ) {
        $total += $member->get_diskspace_consumed;
    }

    return $total;
}
# Disk space freed: the parent class' value for this job plus the
# values reported by all member jobs.
sub get_diskspace_freed {
    my ($self) = @_;

    my $total = $self->SUPER::get_diskspace_freed;

    foreach my $member ( @{ $self->get_jobs } ) {
        $total += $member->get_diskspace_freed;
    }

    return $total;
}
# Initialize the group after construction: run the parent class' init,
# link every member back to this group, register the child post
# callback for it and set the progress maximum to the job count.
# Always returns 1.
sub init {
    my ($self) = @_;

    $self->SUPER::init();

    for my $member ( @{ $self->get_jobs } ) {
        $member->set_group($self);

        # The back reference is weakened so the group <-> member link
        # is not a strong reference cycle.
        weaken( $member->{group} );

        $self->add_child_post_callback($member);
    }

    $self->set_progress_max( $self->get_job_cnt );

    return 1;
}
# Put this group and all of its members which are not in state
# 'finished' back to state 'waiting', clearing the cancelled flag and
# the error message and reporting the change through this group's
# frontend. Members of type 'group' are additionally processed
# recursively. Always returns 1.
sub reset_non_finished_jobs {
    my ($self) = @_;

    # Resets a single job unless it is already finished. Progress is
    # always reported through the frontend of *this* group.
    my $reset_if_unfinished = sub {
        my ($job) = @_;
        return if $job->get_state eq 'finished';
        $job->set_state("waiting");
        $job->set_cancelled(0);
        $job->set_error_message();
        $self->get_frontend->report_job_progress($job);
        return;
    };

    $reset_if_unfinished->($self);

    for my $member ( @{ $self->get_jobs } ) {
        $reset_if_unfinished->($member);
        $member->reset_non_finished_jobs
            if $member->get_type eq 'group';
    }

    return 1;
}
# Number of jobs in this group: the sum of the job counts reported by
# all members (0 for a group without members).
sub get_job_cnt {
    my ($self) = @_;

    my $total = 0;
    $total += $_->get_job_cnt for @{ $self->get_jobs };

    return $total;
}
# Recompute the progress counters of this group from the state of its
# members. Member groups are initialized recursively and contribute
# their own progress count; any other member counts as one step once
# it is in state 'finished' or 'error'. The group is switched to state
# 'finished' when the counter reaches the maximum. Always returns 1.
sub init_progress_state {
    my ($self) = @_;

    my $done = 0;

    for my $member ( @{ $self->get_jobs } ) {
        if ( $member->get_type eq 'group' ) {
            $member->init_progress_state;
            $done += $member->get_progress_cnt;
            next;
        }

        my $is_settled = $member->get_state eq 'finished'
                      || $member->get_state eq 'error';
        $done++ if $is_settled;
    }

    $self->set_progress_cnt($done);
    $self->set_progress_max( $self->get_job_cnt );

    if ( $self->get_progress_cnt == $self->get_progress_max ) {
        $self->set_state("finished");
    }

    return 1;
}
# Re-establish the (weak) back reference to the owning group in every
# member, descending recursively into members which are groups.
# Always returns 1.
sub set_group_in_all_childs {
    my ($self) = @_;

    for my $member ( @{ $self->get_jobs } ) {
        my $is_group = $member->get_type eq 'group';

        $member->set_group($self);

        # Keep the parent link weak to avoid a reference cycle.
        weaken( $member->{group} );

        $member->set_group_in_all_childs if $is_group;
    }

    return 1;
}
# Add $add to the progress maximum of this group and of every group
# above it (following get_group until there is none). Always returns 1.
sub increase_progress_max {
    my ($self, $add) = @_;

    for ( my $node = $self; $node; $node = $node->get_group ) {
        $node->set_progress_max( $node->get_progress_max + $add );
    }

    return 1;
}
# Subtract $del from the progress maximum of this group and of every
# group above it (following get_group until there is none).
# Always returns 1.
sub decrease_progress_max {
    my ($self, $del) = @_;

    for ( my $node = $self; $node; $node = $node->get_group ) {
        $node->set_progress_max( $node->get_progress_max - $del );
    }

    return 1;
}
# Add $add to the progress counter of this group and of every group
# above it (following get_group until there is none). Always returns 1.
sub increase_progress_cnt {
    my ($self, $add) = @_;

    for ( my $node = $self; $node; $node = $node->get_group ) {
        $node->set_progress_cnt( $node->get_progress_cnt + $add );
    }

    return 1;
}
# Subtract $del from the progress counter of this group and of every
# group above it (following get_group until there is none).
# Always returns 1.
sub decrease_progress_cnt {
    my ($self, $del) = @_;

    for ( my $node = $self; $node; $node = $node->get_group ) {
        $node->set_progress_cnt( $node->get_progress_cnt - $del );
    }

    return 1;
}
# Append a job to this group.
#
# The job receives this group's frontend and a weak back reference to
# the group; the progress maximum of this group and its ancestors grows
# by the job's job count; a group in state 'finished' or 'error' goes
# back to 'waiting'; the child post callback is registered and the
# frontend is told about the new job. Always returns 1.
sub add_job {
    my ($self, $job) = @_;

    push @{ $self->get_jobs }, $job;

    $job->set_frontend( $self->get_frontend );
    $job->set_group($self);
    weaken( $job->{group} );    # no strong cycle between group and member

    my $added_cnt = $job->get_job_cnt;
    $self->increase_progress_max($added_cnt) if $added_cnt != 0;

    # A group that was already done has work to do again.
    $self->set_state("waiting")
        if $self->get_state eq 'finished' || $self->get_state eq 'error';

    $self->add_child_post_callback($job);
    $self->get_frontend->report_job_added($job);

    return 1;
}
# Remove a job from this group.
#
# Dies if the job is not a member. Otherwise the job is taken out of
# the member list, the progress maximum of this group and its
# ancestors shrinks by the job's job count and the frontend is told
# about the removal. Always returns 1.
sub remove_job {
    my ($self, $job) = @_;

    my $jobs = $self->get_jobs;

    # Locate the first list position holding this very job.
    my $index;
    for my $candidate ( 0 .. $#{$jobs} ) {
        next unless $jobs->[$candidate] eq $job;
        $index = $candidate;
        last;
    }

    die "Job with ID ".$job->get_id." no member of this group"
        unless defined $index;

    splice @{$jobs}, $index, 1;

    my $removed_cnt = $job->get_job_cnt;
    $self->decrease_progress_max($removed_cnt) if $removed_cnt != 0;

    $self->get_frontend->report_job_removed($job);

    return 1;
}
# Return the first member job whose name equals $job_name.
# Dies if this group has no member with that name.
sub get_job_by_name {
    my ($self, $job_name) = @_;

    for my $member ( @{ $self->get_jobs } ) {
        next if $member->get_name ne $job_name;
        return $member;
    }

    die "Job '$job_name' not member of group '".$self->get_name."'";
}
sub execute {
my $self = shift;
my %par = @_;
my ($skip) = $par{'skip'};
$skip = "" if ! defined $skip;
my $blocked_job;
while ( 1 ) {
( run in 1.828 second using v1.01-cache-2.11-cpan-39bf76dae61 )