Event-ExecFlow

 view release on metacpan or  search on metacpan

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN

package Event::ExecFlow::Job::Group;

use base qw( Event::ExecFlow::Job );

use strict;
use Scalar::Util qw(weaken);

# Type tag of this job class; always the literal string "group".
sub get_type { return "group" }

# Read accessors. Each one returns the value stored in the object hash
# under the key of the same name (without the "get_" prefix).
sub get_jobs                      { $_[0]->{jobs}                      }
sub get_fail_with_members         { $_[0]->{fail_with_members}         }
sub get_stop_on_failure           { $_[0]->{stop_on_failure}           }
sub get_parallel                  { $_[0]->{parallel}                  }
sub get_scheduler                 { $_[0]->{scheduler}                 }
sub get_member_finished_callbacks { $_[0]->{member_finished_callbacks} }

# Write accessors. Each one stores its argument in the object hash under
# the key of the same name (without the "set_" prefix) and returns the
# stored value.
#
# The object and the value are addressed as $_[0] and $_[1] directly, so
# the result does not depend on whether perl evaluates the right-hand side
# of the assignment before or after a shift of @_.
sub set_jobs                      { $_[0]->{jobs}                      = $_[1] }
sub set_fail_with_members         { $_[0]->{fail_with_members}         = $_[1] }
sub set_stop_on_failure           { $_[0]->{stop_on_failure}           = $_[1] }
sub set_parallel                  { $_[0]->{parallel}                  = $_[1] }
sub set_member_finished_callbacks { $_[0]->{member_finished_callbacks} = $_[1] }

# Constructor.
#
# Named arguments read here (the complete argument list is also passed on
# unchanged to the superclass constructor):
#   jobs                      - array ref of member jobs; defaults to []
#   fail_with_members         - defaults to 1 when not defined
#   stop_on_failure           - defaults to 1 when not defined
#   parallel                  - stored as given
#   scheduler                 - stored via set_scheduler
#   member_finished_callbacks - a false value is replaced by a fresh
#                               Event::ExecFlow::Callbacks object, a plain
#                               CODE ref is wrapped into one
#
# Returns the new object.
sub new {
    my $class = shift;
    my %args  = @_;

    my $jobs              = defined $args{jobs}
                          ? $args{jobs}              : [];
    my $fail_with_members = defined $args{fail_with_members}
                          ? $args{fail_with_members} : 1;
    my $stop_on_failure   = defined $args{stop_on_failure}
                          ? $args{stop_on_failure}   : 1;
    my $parallel          = $args{parallel};
    my $scheduler         = $args{scheduler};

    my $self = $class->SUPER::new(@_);

    # Normalise the callbacks argument into a Callbacks object.
    my $callbacks = $args{member_finished_callbacks};
    $callbacks ||= Event::ExecFlow::Callbacks->new;
    if ( ref $callbacks eq 'CODE' ) {
        $callbacks = Event::ExecFlow::Callbacks->new($callbacks);
    }

    # Jobs must be in place before set_scheduler, which walks them.
    $self->set_jobs($jobs);
    $self->set_fail_with_members($fail_with_members);
    $self->set_stop_on_failure($stop_on_failure);
    $self->set_parallel($parallel);
    $self->set_scheduler($scheduler);
    $self->set_member_finished_callbacks($callbacks);

    return $self;
}

# Sets the frontend on this group (via the superclass setter) and then on
# every member job. Returns the frontend that was passed in.
sub set_frontend {
    my ($self, $frontend) = @_;

    $self->SUPER::set_frontend($frontend);

    foreach my $member ( @{$self->get_jobs} ) {
        $member->set_frontend($frontend);
    }

    return $frontend;
}

# Stores the scheduler on this group and hands it to every member job
# whose type is 'group'; members of any other type are left alone.
# Returns the scheduler that was passed in.
sub set_scheduler {
    my ($self, $scheduler) = @_;

    $self->{scheduler} = $scheduler;

    for my $member ( @{$self->get_jobs} ) {
        next unless $member->get_type eq 'group';
        $member->set_scheduler($scheduler);
    }

    return $scheduler;
}

# Returns the exec type of the job get_next_job would pick, or the string
# "sync" when get_next_job returns nothing.
sub get_exec_type {
    my ($self) = @_;

    my $next = $self->get_next_job;

    return $next ? $next->get_exec_type : "sync";
}

# Returns the superclass' diskspace_consumed value for this group plus the
# diskspace_consumed value of every member job.
sub get_diskspace_consumed {
    my ($self) = @_;

    my $total = $self->SUPER::get_diskspace_consumed;

    foreach my $member ( @{$self->get_jobs} ) {
        $total += $member->get_diskspace_consumed;
    }

    return $total;
}

# Returns the superclass' diskspace_freed value for this group plus the
# diskspace_freed value of every member job.
sub get_diskspace_freed {
    my ($self) = @_;

    my $total = $self->SUPER::get_diskspace_freed;

    foreach my $member ( @{$self->get_jobs} ) {
        $total += $member->get_diskspace_freed;
    }

    return $total;
}

sub init {
    my $self = shift;

    $self->SUPER::init();

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN

    my ($skip) = $par{'skip'};

    my $executed = 0;
    foreach my $job ( @{$self->get_jobs} ) {
        next if "$job" eq "$skip";

        # Parallel execution groups which are running now
        # probably can execute more job, so give it a try.
        if ( $job->get_type  eq 'group'   &&
             $job->get_state eq 'running' &&
             $job->get_parallel ) {
            $job->execute;
            $executed = 1;
        }
    }
    
    if ( !$executed && $self->get_group ) {
        $self->get_group->execute(skip => $self);
    }
    
    1;
}

# Marks the group as cancelled and cancels every member job whose state is
# 'running'. Members in any other state are not touched. Returns 1.
sub cancel {
    my ($self) = @_;

    $self->set_cancelled(1);

    foreach my $member ( @{$self->get_jobs} ) {
        $member->cancel if $member->get_state eq 'running';
    }

    return 1;
}

# Calls pause on every member job whose state is 'running'. Returns 1.
sub pause_job {
    my ($self) = @_;

    foreach my $member ( @{$self->get_jobs} ) {
        $member->pause if $member->get_state eq 'running';
    }

    return 1;
}

# Resets all member jobs. For each member whose reset returns true, the
# group's progress counter is decreased by that member's job count. The
# frontend is then asked to report the group's progress.
#
# If the progress counter is 0 afterwards, the group itself is reset via
# the superclass and that result is returned; otherwise returns 0.
sub reset {
    my ($self) = @_;

    for my $member ( @{$self->get_jobs} ) {
        next unless $member->reset;
        $self->decrease_progress_cnt($member->get_job_cnt);
    }

    $self->get_frontend->report_job_progress($self);

    return 0 unless $self->get_progress_cnt == 0;

    return $self->SUPER::reset();
}

# Registers a post callback on $job that notifies this group through
# child_job_finished when the callback is executed.
#
# The registration is recorded in $job->{_post_callbacks_added}. Calling
# this method again for a job that already carries that flag is a silent
# no-op and returns nothing (undef / empty list). The code that used to
# confess in that case sat behind an unconditional return and could never
# run, so it has been removed.
#
# Returns 1 when the callback was added.
#
# NOTE(review): the closure holds a strong reference to the group, and the
# job keeps the closure in its post callbacks. If the group also holds the
# job this forms a reference cycle - confirm whether something else breaks
# it before changing object lifetimes here.
sub add_child_post_callback {
    my $self = shift;
    my ($job) = @_;

    return if $job->{_post_callbacks_added};
    $job->{_post_callbacks_added} = 1;

    $job->get_post_callbacks->add( sub {
        my ($finished_job) = @_;
        $self->child_job_finished($finished_job);
        1;
    });

    1;
}

# Starts the given member job. Beforehand the group's progress counter is
# initialised to 0 if it is still undefined, and the frontend is asked to
# report the group's progress. Prints a trace line when
# $Event::ExecFlow::DEBUG is true. Returns 1.
sub start_child_job {
    my ($self, $job) = @_;

    print "Group->start_child_job(".$job->get_info.")\n"
        if $Event::ExecFlow::DEBUG;

    if ( !defined $self->get_progress_cnt ) {
        $self->set_progress_cnt(0);
    }
    $self->get_frontend->report_job_progress($self);

    $job->start;

    return 1;
}

# Called when a member job has finished. In order:
#   1. executes the member_finished callbacks, if any are set;
#   2. if the job has an error message, was not cancelled, and the group
#      is configured to fail with its members: sets the group state to
#      "error", appends the job's error to the group's error message and
#      reports the error to the frontend;
#   3. tells the scheduler (if one is set) that the job finished;
#   4. calls execute on the group.
# Prints a trace line when $Event::ExecFlow::DEBUG is true. Returns 1.
sub child_job_finished {
    my ($self, $job) = @_;

    print "Group->child_job_finished(".$job->get_info.")\n"
        if $Event::ExecFlow::DEBUG;

    if ( $self->get_member_finished_callbacks ) {
        $self->get_member_finished_callbacks->execute();
    }

    my $job_failed = $job->get_error_message && !$job->get_cancelled;
    if ( $job_failed && $self->get_fail_with_members ) {
        $self->set_state("error");
        $self->add_job_error_message($job);
        $self->get_frontend->report_job_error($self);
    }

    my $scheduler = $self->get_scheduler;
    $scheduler->job_finished($job) if $scheduler;

    $self->execute;

    return 1;
}

# Appends a paragraph describing $job's failure to this group's error
# message. The paragraph names the job (get_info), quotes its error
# message and ends with a line of 80 dashes. Returns 1.
sub add_job_error_message {
    my ($self, $job) = @_;

    # Start from the text collected so far, or from an empty string.
    my $collected = $self->get_error_message || "";

    my $entry = join "",
        "Job '", $job->get_info, "' ",
        "failed with error message:\n",
        $job->get_error_message, "\n",
        "-" x 80, "\n";

    $self->set_error_message($collected . $entry);

    return 1;
}

# Returns the first entry of the job list (undef for an empty list).
sub get_first_job {
    my ($self) = @_;

    my $jobs = $self->get_jobs;

    return $jobs->[0];
}

# Returns the first member job, in list order, that is in state 'waiting'
# and for which dependencies_ok is true; returns undef if there is none.
#
# Named argument:
#   blocked - a job to pass over; members are compared to it by their
#             stringified value
#
# Prints trace lines when $Event::ExecFlow::DEBUG is true.
sub get_next_job {
    my ($self, %par) = @_;

    my $blocked = defined $par{'blocked'} ? $par{'blocked'} : "";

    my $next_job;
    JOB:
    for my $candidate ( @{$self->get_jobs} ) {
        next JOB if defined $candidate && "$candidate" eq "$blocked";

        print "Group(".$self->get_info.")->get_next_job: check ".$candidate->get_info."=>".$candidate->get_state."\n"
            if $Event::ExecFlow::DEBUG;

        next JOB unless $candidate->get_state eq 'waiting';
        next JOB unless $self->dependencies_ok($candidate);

        $next_job = $candidate;
        last JOB;
    }

    print "Group(".$self->get_info.")->get_next_job=".
        ($next_job ? $next_job->get_info : "NOJOB")."\n"
        if $Event::ExecFlow::DEBUG;

    return $next_job;
}

# Checks whether every job named in $job's depends_on list is in state
# 'finished'. Returns 1 if so (also for an empty list); otherwise returns
# nothing (undef / empty list) at the first dependency in another state.
#
# Prints a trace line per dependency when $Event::ExecFlow::DEBUG is true.
#
# NOTE(review): the result of get_job_by_name is used without a check -
# confirm how that method behaves for an unknown job name.
sub dependencies_ok {
    my ($self, $job) = @_;

    for my $name ( @{$job->get_depends_on} ) {
        my $dependency = $self->get_job_by_name($name);

        print "Job(".$job->get_info.")->dependencies_ok: check ".$dependency->get_info." =>".$dependency->get_state."\n"
            if $Event::ExecFlow::DEBUG;

        return unless $dependency->get_state eq 'finished';
    }

    return 1;
}

# Returns 0 as soon as a member job is found in state 'waiting', 'error'
# or 'running'; returns 1 if no member is in one of those states (which
# includes the case of an empty job list).
sub all_jobs_finished {
    my ($self) = @_;

    my %is_unfinished = map { $_ => 1 } qw( waiting error running );

    for my $member ( @{$self->get_jobs} ) {
        return 0 if $is_unfinished{ $member->get_state };
    }

    return 1;
}

# Threads the pair ($currently_consumed, $max_consumed) through every
# member job in list order: each member's get_max_diskspace_consumed is
# called with the current pair and its result becomes the new pair.
# Returns the final pair as a two-element list.
sub get_max_diskspace_consumed {
    my ($self, $currently_consumed, $max_consumed) = @_;

    for my $member ( @{$self->get_jobs} ) {
        ($currently_consumed, $max_consumed) =
            $member->get_max_diskspace_consumed(
                $currently_consumed, $max_consumed,
            );
    }

    return ($currently_consumed, $max_consumed);
}

# Returns a hash ref with the state of this group, based on the
# superclass' backup_state. The jobs, scheduler and
# member_finished_callbacks entries are removed from it; jobs is then
# rebuilt from the backup_state of each member job, in list order. For a
# group without members the result has no jobs entry at all.
sub backup_state {
    my ($self) = @_;

    my $data_href = $self->SUPER::backup_state();

    delete @{$data_href}{qw( jobs scheduler member_finished_callbacks )};

    for my $member ( @{$self->get_jobs} ) {
        push @{$data_href->{jobs}}, $member->backup_state;
    }

    return $data_href;
}

# Restores this group from a hash ref such as the one backup_state
# returns.
#
# The current job list is remembered first. After the superclass has
# restored its part, whatever is then stored under the object's jobs key
# is taken out and treated as the list of per-job states: the n-th entry
# is passed to restore_state of the n-th remembered job. Finally the
# remembered job list is put back with set_jobs. Returns 1.
sub restore_state {
    my ($self, $data_href) = @_;

    my $jobs = $self->get_jobs;

    $self->SUPER::restore_state($data_href);

    my $job_states = delete $self->{jobs};

    for my $idx ( 0 .. $#{$jobs} ) {
        $jobs->[$idx]->restore_state($job_states->[$idx]);
    }

    $self->set_jobs($jobs);

    return 1;
}

# Adds $add_stash to this group's stash (add_stash) and to every member:
# members of type 'group' get it through their own add_stash_to_all_jobs,
# all other members through add_stash. No explicit return value.
sub add_stash_to_all_jobs {
    my ($self, $add_stash) = @_;

    $self->add_stash($add_stash);

    for my $member ( @{$self->get_jobs} ) {
        my $method = $member->get_type eq 'group'
                   ? 'add_stash_to_all_jobs'
                   : 'add_stash';
        $member->$method($add_stash);
    }
}

# Calls $code with each member job, in list order. A member of type
# 'group' is passed to $code first and is then asked to traverse its own
# jobs with the same code ref. The group itself is not passed to $code.
# Returns 1.
sub traverse_all_jobs {
    my ($self, $code) = @_;

    for my $member ( @{$self->get_jobs} ) {
        $code->($member);
        $member->traverse_all_jobs($code) if $member->get_type eq 'group';
    }

    return 1;
}



( run in 0.818 second using v1.01-cache-2.11-cpan-39bf76dae61 )