Event-ExecFlow

 view release on metacpan or  search on metacpan

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN

package Event::ExecFlow::Job::Group;

use base qw( Event::ExecFlow::Job );

use strict;
use Scalar::Util qw(weaken);

sub get_type                    { "group" }

sub get_jobs                    { shift->{jobs}                         }
sub get_fail_with_members       { shift->{fail_with_members}            }
sub get_stop_on_failure         { shift->{stop_on_failure}              }
sub get_parallel                { shift->{parallel}                     }
sub get_scheduler               { shift->{scheduler}                    }
sub get_member_finished_callbacks { shift->{member_finished_callbacks}  }

sub set_jobs                    { shift->{jobs}                 = $_[1] }
sub set_fail_with_members       { shift->{fail_with_members}    = $_[1] }
sub set_stop_on_failure         { shift->{stop_on_failure}      = $_[1] }
sub set_parallel                { shift->{parallel}             = $_[1] }
sub set_member_finished_callbacks { shift->{member_finished_callbacks} = $_[1] }

sub new {
    my $class = shift;
    my %par = @_;
    my  ($jobs, $fail_with_members, $stop_on_failure) =
    @par{'jobs','fail_with_members','stop_on_failure'};
    my  ($parallel, $scheduler, $member_finished_callbacks) =
    @par{'parallel','scheduler','member_finished_callbacks'};

    $jobs              = [] unless defined $jobs;
    $fail_with_members = 1  unless defined $fail_with_members;
    $stop_on_failure   = 1  unless defined $stop_on_failure;

    my $self = $class->SUPER::new(@_);

    for my $cb ( $member_finished_callbacks ) {
        $cb ||= Event::ExecFlow::Callbacks->new;
        $cb   = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE';
    }

    $self->set_jobs($jobs);
    $self->set_fail_with_members($fail_with_members);
    $self->set_stop_on_failure($stop_on_failure);
    $self->set_parallel($parallel);
    $self->set_scheduler($scheduler);
    $self->set_member_finished_callbacks($member_finished_callbacks);

    return $self;
}

sub set_frontend {
    my $self = shift;
    my ($frontend) = @_;
    
    $self->SUPER::set_frontend($frontend);

    $_->set_frontend($frontend) for @{$self->get_jobs};
   
    return $frontend;
}

sub set_scheduler {
    my $self = shift;
    my ($scheduler) = @_;
    
    $self->{scheduler} = $scheduler;
    
    foreach my $job ( @{$self->get_jobs} ) {
        $job->set_scheduler($scheduler)
            if $job->get_type eq 'group';
    }
    
    return $scheduler;
}

sub get_exec_type {
    my $self = shift;
    my $job = $self->get_next_job;
    return "sync" if not $job;
    return $job->get_exec_type;
}

sub get_diskspace_consumed {
    my $self = shift;
    
    my $sum = $self->SUPER::get_diskspace_consumed;
    
    $sum += $_->get_diskspace_consumed for @{$self->get_jobs};
    
    return $sum;
}

sub get_diskspace_freed {
    my $self = shift;
    
    my $sum = $self->SUPER::get_diskspace_freed;
    
    $sum += $_->get_diskspace_freed for @{$self->get_jobs};
    
    return $sum;
}

sub init {
    my $self = shift;

    $self->SUPER::init();

    foreach my $job ( @{$self->get_jobs} ) {
        $job->set_group($self);
        weaken($job->{group});
        $self->add_child_post_callback($job);
    }

    $self->set_progress_max($self->get_job_cnt);

    1;
}

sub reset_non_finished_jobs {
    my $self = shift;
    
    if ( $self->get_state ne 'finished' ) {
        $self->set_state("waiting");
        $self->set_cancelled(0);
        $self->set_error_message();
        $self->get_frontend->report_job_progress($self);
    }
    
    foreach my $job ( @{$self->get_jobs} ) {
        if ( $job->get_state ne 'finished' ) {
            $job->set_state("waiting");
            $job->set_cancelled(0);
            $job->set_error_message();
            $self->get_frontend->report_job_progress($job);
        }
        $job->reset_non_finished_jobs if $job->get_type eq 'group';
    }

    1;
}

sub get_job_cnt {
    my $self = shift;

    my $cnt = 0;
    foreach my $job ( @{$self->get_jobs} ) {
        $cnt += $job->get_job_cnt;
    }
    
    return $cnt;
}

sub init_progress_state {
    my $self = shift;
    
    my $progress_cnt = 0;
    foreach my $job ( @{$self->get_jobs} ) {
        if ( $job->get_type eq 'group' ) {
            $job->init_progress_state;
            $progress_cnt += $job->get_progress_cnt;
        }
        else {
            ++$progress_cnt if $job->get_state eq 'finished' ||
                               $job->get_state eq 'error';
        }
    }

    $self->set_progress_cnt($progress_cnt);
    $self->set_progress_max($self->get_job_cnt);

    $self->set_state("finished")
        if $self->get_progress_cnt == $self->get_progress_max;

    1;
}

sub set_group_in_all_childs {
    my $self = shift;

    foreach my $job ( @{$self->get_jobs} ) {
        if ( $job->get_type eq 'group' ) {
            $job->set_group($self);
            weaken($job->{group});
            $job->set_group_in_all_childs;
        }
        else {
            $job->set_group($self);
            weaken($job->{group});
        }
    }

    1;
}

sub increase_progress_max {
    my $self = shift;
    my ($add) = @_;

    my $job = $self;
    while ( $job ) {
        $job->set_progress_max($job->get_progress_max + $add);
        $job = $job->get_group;
    }

    1;
}

sub decrease_progress_max {
    my $self = shift;
    my ($del) = @_;

    my $job = $self;
    while ( $job ) {
        $job->set_progress_max($job->get_progress_max - $del);
        $job = $job->get_group;
    }
    
    1;
}

sub increase_progress_cnt {
    my $self = shift;
    my ($add) = @_;

    my $job = $self;
    while ( $job ) {
        $job->set_progress_cnt($job->get_progress_cnt + $add);
        $job = $job->get_group;
    }

    1;
}

sub decrease_progress_cnt {
    my $self = shift;
    my ($del) = @_;

    my $job = $self;
    while ( $job ) {
        $job->set_progress_cnt($job->get_progress_cnt - $del);
        $job = $job->get_group;
    }
    
    1;
}

sub add_job {
    my $self = shift;
    my ($job) = @_;
    
    push @{$self->get_jobs}, $job;
    
    $job->set_frontend($self->get_frontend);
    $job->set_group($self);
    weaken($job->{group});

    my $job_cnt = $job->get_job_cnt;
    $self->increase_progress_max($job_cnt) if $job_cnt != 0;

    if ( $self->get_state eq 'finished' ||
         $self->get_state eq 'error' ) {
        $self->set_state("waiting");
    }

    $self->add_child_post_callback($job);

    $self->get_frontend->report_job_added($job);

    1;
}

sub remove_job {
    my $self = shift;
    my ($job) = @_;
    
    my $jobs = $self->get_jobs;
    
    my $i;
    for ( $i=0; $i < @{$jobs}; ++$i ) {
        last if $jobs->[$i] eq $job;
    }
    
    die "Job with ID ".$job->get_id." no member of this group"
        if $i == @{$jobs};

    splice @{$jobs}, $i, 1;

    my $job_cnt = $job->get_job_cnt;
    $self->decrease_progress_max($job_cnt) if $job_cnt != 0;

    $self->get_frontend->report_job_removed($job);

    1;
}

sub get_job_by_name {
    my $self = shift;
    my ($job_name) = @_;
    
    foreach my $job ( @{$self->get_jobs} ) {
        return $job if $job->get_name eq $job_name;
    }
    
    die "Job '$job_name' not member of group '".$self->get_name."'";
}

sub execute {
    my $self = shift;
    my %par = @_;
    my ($skip) = $par{'skip'};
    
    $skip = "" if ! defined $skip;

    my $blocked_job;
    while ( 1 ) {



( run in 1.828 second using v1.01-cache-2.11-cpan-39bf76dae61 )