Event-ExecFlow

 view release on metacpan or  search on metacpan

Changes  view on Meta::CPAN

      it was successful.
    - Group considered itself finished even when a child
      job didn't finish successfully (cancelled or error).
    
0.62 Sat Jun 17, 2006, joern
    Features:
    - Executed programs are now set into C locale, so parsing
      output is independent from the locale settings.
    
    Bugfix:
    - set job into error state if post callbacks return
      with error.

0.61 Sun Apr 2, 2006, joern
    Notes:
    - added some helper methods to the Group class
    - scheduler stuff is still work in progress and needs some cleanup
    - shell commands may be closures returning the shell code at runtime
    - fixed a bug with depends_on argument to constructor not
      processed correctly

lib/Event/ExecFlow/Callbacks.pm  view on Meta::CPAN


__END__

=head1 NAME

Event::ExecFlow::Callbacks - Callbacks attached to jobs

=head1 SYNOPSIS

  #-- Create a new Callbacks object
  my $callbacks = Event::ExecFlow::Callbacks->new (
    sub { print "sub called\n" },
    sub { print "another sub of this called\n" },
  );

  #-- Attach callbacks to a job
  $job->set_pre_callbacks($callbacks);
  
  #-- Add more subs
  $callbacks->add(sub { print "a sub added later\n" });
  $callbacks->prepend(sub { print "a sub prepended to the list of subs\n" });

  #-- the execute() method is called later by Event::ExecFlow
  $callbacks->execute($job);
  
=head1 DESCRIPTION

This class represents one or more closures which can be attached as
callbacks to an Event::ExecFlow::Job.

=head1 OBJECT HIERARCHY

  Event::ExecFlow

  Event::ExecFlow::Job
  +--- Event::ExecFlow::Job::Group
  +--- Event::ExecFlow::Job::Command
  +--- Event::ExecFlow::Job::Code

lib/Event/ExecFlow/Frontend/Term.pm  view on Meta::CPAN

#-- Reader: returns the value stored under the 'nl_needed' key.
sub get_nl_needed { my ($self) = @_; return $self->{nl_needed} }

#-- Writers: store the given value under the same-named key and return it.
sub set_quiet     { my ($self, $value) = @_; return $self->{quiet}     = $value }
sub set_nl_needed { my ($self, $value) = @_; return $self->{nl_needed} = $value }

# Starts $job through the parent class implementation and then waits on
# an AnyEvent condition variable. The variable is signalled by a closure
# appended to the job's post callbacks. Always returns 1.
sub start_job {
    my ($self, $job) = @_;

    my $finished = AnyEvent->condvar;

    #-- signal the condvar from the job's post callbacks
    my $post_callbacks = $job->get_post_callbacks;
    $post_callbacks->add( sub { $finished->broadcast } );

    $self->SUPER::start_job($job);
    $finished->wait;

    return 1;
}

sub report_job_start {
    my $self = shift;
    my ($job) = @_;

lib/Event/ExecFlow/Job.pm  view on Meta::CPAN

#-- Read accessors: each one returns the value stored in the job object
#-- under the hash key named after the attribute.
sub get_error_message       { my ($self) = @_; return $self->{error_message}       }
sub get_warning_message     { my ($self) = @_; return $self->{warning_message}     }
sub get_progress_max        { my ($self) = @_; return $self->{progress_max}        }
sub get_progress_cnt        { my ($self) = @_; return $self->{progress_cnt}        }
sub get_progress_start_time { my ($self) = @_; return $self->{progress_start_time} }
sub get_progress_end_time   { my ($self) = @_; return $self->{progress_end_time}   }
sub get_progress_ips        { my ($self) = @_; return $self->{progress_ips}        }
sub get_no_progress         { my ($self) = @_; return $self->{no_progress}         }
sub get_last_progress       { my ($self) = @_; return $self->{last_progress}       }
sub get_last_percent_logged { my ($self) = @_; return $self->{last_percent_logged} }
sub get_pre_callbacks       { my ($self) = @_; return $self->{pre_callbacks}       }
sub get_post_callbacks      { my ($self) = @_; return $self->{post_callbacks}      }
sub get_error_callbacks     { my ($self) = @_; return $self->{error_callbacks}     }
sub get_warning_callbacks   { my ($self) = @_; return $self->{warning_callbacks}   }
sub get_frontend            { my ($self) = @_; return $self->{frontend}            }
sub get_group               { my ($self) = @_; return $self->{group}               }
sub get_diskspace_consumed  { my ($self) = @_; return $self->{diskspace_consumed}  }
sub get_diskspace_freed     { my ($self) = @_; return $self->{diskspace_freed}     }
sub get_stash               { my ($self) = @_; return $self->{stash}               }
sub get_paused              { my ($self) = @_; return $self->{paused}              }
sub get_paused_seconds      { my ($self) = @_; return $self->{paused_seconds}      }
sub get_paused_start_time   { my ($self) = @_; return $self->{paused_start_time}   }
sub get_skipped             { my ($self) = @_; return $self->{skipped}             }

lib/Event/ExecFlow/Job.pm  view on Meta::CPAN

#-- Write accessors: each one stores the given value in the job object
#-- under the hash key named after the attribute and returns that value.
sub set_error_message       { my ($self, $value) = @_; return $self->{error_message}       = $value }
sub set_warning_message     { my ($self, $value) = @_; return $self->{warning_message}     = $value }
sub set_progress_max        { my ($self, $value) = @_; return $self->{progress_max}        = $value }
sub set_progress_cnt        { my ($self, $value) = @_; return $self->{progress_cnt}        = $value }
sub set_progress_start_time { my ($self, $value) = @_; return $self->{progress_start_time} = $value }
sub set_progress_end_time   { my ($self, $value) = @_; return $self->{progress_end_time}   = $value }
sub set_progress_ips        { my ($self, $value) = @_; return $self->{progress_ips}        = $value }
sub set_no_progress         { my ($self, $value) = @_; return $self->{no_progress}         = $value }
sub set_last_progress       { my ($self, $value) = @_; return $self->{last_progress}       = $value }
sub set_last_percent_logged { my ($self, $value) = @_; return $self->{last_percent_logged} = $value }
sub set_pre_callbacks       { my ($self, $value) = @_; return $self->{pre_callbacks}       = $value }
sub set_post_callbacks      { my ($self, $value) = @_; return $self->{post_callbacks}      = $value }
sub set_error_callbacks     { my ($self, $value) = @_; return $self->{error_callbacks}     = $value }
sub set_warning_callbacks   { my ($self, $value) = @_; return $self->{warning_callbacks}   = $value }
sub set_frontend            { my ($self, $value) = @_; return $self->{frontend}            = $value }
sub set_group               { my ($self, $value) = @_; return $self->{group}               = $value }
sub set_diskspace_consumed  { my ($self, $value) = @_; return $self->{diskspace_consumed}  = $value }
sub set_diskspace_freed     { my ($self, $value) = @_; return $self->{diskspace_freed}     = $value }
sub set_stash               { my ($self, $value) = @_; return $self->{stash}               = $value }
sub set_paused              { my ($self, $value) = @_; return $self->{paused}              = $value }
sub set_paused_seconds      { my ($self, $value) = @_; return $self->{paused_seconds}      = $value }
sub set_paused_start_time   { my ($self, $value) = @_; return $self->{paused_start_time}   = $value }
sub set_skipped             { my ($self, $value) = @_; return $self->{skipped}             = $value }

lib/Event/ExecFlow/Job.pm  view on Meta::CPAN

    my $self = shift;
    return !$self->get_cancelled &&
           !$self->get_error_message;
}

# Seed for job ids: the number of seconds elapsed since the fixed offset
# 1140691085, scaled by one million. new() pre-increments this counter
# for every job it creates.
# NOTE(review): presumably the time-based seed keeps ids distinct between
# program runs started in different seconds -- confirm.
my $JOB_ID = (time - 1140691085) * 1_000_000;

sub new {
    my $class = shift;
    my %par = @_;
    my  ($title, $name, $depends_on, $pre_callbacks) =
    @par{'title','name','depends_on','pre_callbacks'};
    my  ($post_callbacks, $error_callbacks, $warning_callbacks) =
    @par{'post_callbacks','error_callbacks','warning_callbacks'};
    my  ($progress_cnt, $progress_max, $progress_ips, $no_progress) =
    @par{'progress_cnt','progress_max','progress_ips','no_progress'};
    my  ($diskspace_consumed, $diskspace_freed, $stash, $frontend) =
    @par{'diskspace_consumed','diskspace_freed','stash','frontend'};

    my $id = ++$JOB_ID;

    $depends_on ||= [];
    $stash      ||= {};
    $name       ||= '~'.$id;

    croak "Job '$name' depends on itself"
        if grep { $_ eq $name } @{$depends_on};

    for my $cb ( $pre_callbacks,   $post_callbacks,
                 $error_callbacks, $warning_callbacks ) {
        $cb ||= Event::ExecFlow::Callbacks->new;
        $cb   = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE';
    }

    my $self = bless {
        id                      => $id,
        title                   => $title,
        name                    => $name,
        depends_on              => $depends_on,
        state                   => 'waiting',
        diskspace_consumed      => $diskspace_consumed,
        diskspace_freed         => $diskspace_freed,
        progress_cnt            => $progress_cnt,
        progress_max            => $progress_max,
        progress_ips            => $progress_ips,
        no_progress             => $no_progress,
        pre_callbacks           => $pre_callbacks,
        post_callbacks          => $post_callbacks,
        error_callbacks         => $error_callbacks,
        warning_callbacks       => $warning_callbacks,
        stash                   => $stash,
        frontend                => $frontend,
        paused_seconds          => 0,
        last_percent_logged     => 0,
        group                   => undef,
    }, $class;
    
    $self->set_depends_on($depends_on);
    
    return $self;

lib/Event/ExecFlow/Job.pm  view on Meta::CPAN

    if ( !$self->get_frontend ) {
        require Event::ExecFlow::Frontend;
        $self->set_frontend(Event::ExecFlow::Frontend->new);
    }
    
    $self->init;
    $self->set_state("running");

    $self->get_frontend->report_job_start($self);
    
    $self->get_pre_callbacks->execute($self);
    
    if ( $self->get_error_message ) {
        $self->execution_finished;
        return 0;
    }
    
    if ( $self->get_warning_message ) {
        $self->get_warning_callbacks->execute($self);
        $self->get_frontend->report_job_warning($self);
    }

    if ( $self->get_skipped ) { # may be set by pre_callbacks
        $self->execution_finished;
        return 0;
    }

    $self->execute;
    
    1;
}

sub reset {

lib/Event/ExecFlow/Job.pm  view on Meta::CPAN


    if ( !$self->get_cancelled ) {
        if ( $self->get_error_message ) {
            $self->set_state("error");
        }
        else {
            $self->set_state("finished");
        }
    }

    $self->get_post_callbacks->execute($self);

    $self->set_state("error") if $self->get_error_message;

    $self->get_frontend->report_job_finished($self);

    if ( !$self->get_cancelled ) {
        if ( $self->get_error_message ) {
            $self->get_error_callbacks->execute($self);
            $self->get_frontend->report_job_error($self);
        }        

        if ( $self->get_warning_message ) {
            $self->get_warning_callbacks->execute($self);
            $self->get_frontend->report_job_warning($self);
        }
    }

    if ( $self->get_type ne 'group' and $self->get_state eq 'finished' ) {
        my $parent = $self;
        while ( $parent = $parent->get_group ) {
            $parent->set_progress_cnt($parent->get_progress_cnt+1);
            $self->get_frontend->report_job_progress($parent);
        }

lib/Event/ExecFlow/Job.pm  view on Meta::CPAN

    return ($currently_consumed, $max_consumed);
}

# Returns a hash reference holding a shallow copy of the job's
# attributes. The callback lists, the frontend, the group and the
# internal _post_callbacks_added flag are left out, and the job type
# is added under the 'type' key.
sub backup_state {
    my ($self) = @_;

    #-- attributes which must not end up in the snapshot
    my %excluded = map { $_ => 1 } qw(
        pre_callbacks
        post_callbacks
        error_callbacks
        warning_callbacks
        frontend
        group
        _post_callbacks_added
    );

    my %snapshot =
        map  { $_ => $self->{$_} }
        grep { !$excluded{$_} }
        keys %{$self};

    $snapshot{type} = $self->get_type;

    return \%snapshot;
}

sub restore_state {
    my $self = shift;

lib/Event/ExecFlow/Job.pm  view on Meta::CPAN


=head1 SYNOPSIS

  Event::ExecFlow::Job->new (
    title                => Descriptive title,
    name                 => Internal short name,
    depends_on           => Names of jobs this job depends on,
    progress_max         => Maximum expected progress value,
    progress_ips         => String to show as "items per second",
    no_progress          => Job has no progress state at all,
    pre_callbacks        => Callbacks executed before job starts,
    post_callbacks       => Callbacks executed after job finished,
    error_callbacks      => Callbacks executed if job had errors,
    warning_callbacks    => Callbacks executed if job had warnings,
    stash                => A custom data hash stored with the job,
  );

=head1 DESCRIPTION

This is an abstract base class and usually not used directly from the
application. For daily programming the attributes defined in this
class are most important, since they are common to all Jobs of the
Event::ExecFlow framework.

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN

use strict;
use Scalar::Util qw(weaken);

#-- Type identifier of this job class.
sub get_type { return "group" }

#-- Read accessors: each one returns the value stored in the group object
#-- under the hash key named after the attribute.
sub get_jobs                      { my ($self) = @_; return $self->{jobs}                      }
sub get_fail_with_members         { my ($self) = @_; return $self->{fail_with_members}         }
sub get_stop_on_failure           { my ($self) = @_; return $self->{stop_on_failure}           }
sub get_parallel                  { my ($self) = @_; return $self->{parallel}                  }
sub get_scheduler                 { my ($self) = @_; return $self->{scheduler}                 }
sub get_member_finished_callbacks { my ($self) = @_; return $self->{member_finished_callbacks} }

#-- Write accessors: each one stores the given value in the group object
#-- under the hash key named after the attribute and returns that value.
sub set_jobs                      { my ($self, $value) = @_; return $self->{jobs}                      = $value }
sub set_fail_with_members         { my ($self, $value) = @_; return $self->{fail_with_members}         = $value }
sub set_stop_on_failure           { my ($self, $value) = @_; return $self->{stop_on_failure}           = $value }
sub set_parallel                  { my ($self, $value) = @_; return $self->{parallel}                  = $value }
sub set_member_finished_callbacks { my ($self, $value) = @_; return $self->{member_finished_callbacks} = $value }

# Constructor. All named parameters are handed to the parent class
# constructor unchanged; the following ones are evaluated here as well:
#
#   jobs                      - array ref of member jobs (default: [])
#   fail_with_members         - default: 1
#   stop_on_failure           - default: 1
#   parallel                  - stored as given
#   scheduler                 - passed to set_scheduler() as given
#   member_finished_callbacks - an Event::ExecFlow::Callbacks object or
#                               a single code ref, which is wrapped into
#                               such an object; defaults to an empty
#                               Callbacks object
#
# Returns the new group object.
sub new {
    my ($class, @args) = @_;
    my %par = @args;

    my $jobs              = defined $par{jobs}              ? $par{jobs}              : [];
    my $fail_with_members = defined $par{fail_with_members} ? $par{fail_with_members} : 1;
    my $stop_on_failure   = defined $par{stop_on_failure}   ? $par{stop_on_failure}   : 1;

    my $self = $class->SUPER::new(@args);

    #-- normalize the callbacks argument to a Callbacks object
    my $member_finished_callbacks = $par{member_finished_callbacks};
    $member_finished_callbacks ||= Event::ExecFlow::Callbacks->new;
    $member_finished_callbacks
        = Event::ExecFlow::Callbacks->new($member_finished_callbacks)
            if ref $member_finished_callbacks eq 'CODE';

    $self->set_jobs($jobs);
    $self->set_fail_with_members($fail_with_members);
    $self->set_stop_on_failure($stop_on_failure);
    $self->set_parallel($par{parallel});
    $self->set_scheduler($par{scheduler});
    $self->set_member_finished_callbacks($member_finished_callbacks);

    return $self;
}

sub set_frontend {
    my $self = shift;
    my ($frontend) = @_;
    
    $self->SUPER::set_frontend($frontend);

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN


    return $self->SUPER::reset() if $self->get_progress_cnt == 0;

    0;
}

# Registers a post callback on $job which reports the finished child job
# to this group by calling child_job_finished().
#
# Parameters:
#   $job - child job; must provide get_post_callbacks(), whose result
#          must provide add().
#
# Returns 1 after the callback was registered. If the job is already
# flagged with _post_callbacks_added nothing is registered again and an
# empty list (undef in scalar context) is returned.
sub add_child_post_callback {
    my $self = shift;
    my ($job) = @_;

    # A job must notify its group only once. A repeated registration is
    # silently ignored; no exception is raised.
    return if $job->{_post_callbacks_added};
    $job->{_post_callbacks_added} = 1;

    $job->get_post_callbacks->add( sub {
        my ($finished_job) = @_;
        $self->child_job_finished($finished_job);
        1;
    });

    return 1;
}

sub start_child_job {
    my $self = shift;

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN


    1;
}

sub child_job_finished {
    my $self = shift;
    my ($job) = @_;
    
    $Event::ExecFlow::DEBUG && print "Group->child_job_finished(".$job->get_info.")\n";

    $self->get_member_finished_callbacks->execute()
        if $self->get_member_finished_callbacks;

    if ( $job->get_error_message && !$job->get_cancelled ) {
        if  ( $self->get_fail_with_members ) {
            $self->set_state("error");
            $self->add_job_error_message($job);
            $self->get_frontend->report_job_error($self);
        }
    }

    if ( $self->get_scheduler ) {

lib/Event/ExecFlow/Job/Group.pm  view on Meta::CPAN

    return ($currently_consumed, $max_consumed);
}

# Returns the state snapshot produced by the parent class, with the
# 'scheduler' and 'member_finished_callbacks' entries removed and the
# 'jobs' entry replaced by the list of snapshots returned by each member
# job's backup_state(). The 'jobs' entry is absent when the group has no
# member jobs.
sub backup_state {
    my ($self) = @_;

    my $state = $self->SUPER::backup_state();

    #-- these entries are not kept in the snapshot as they are
    delete @{$state}{qw( jobs scheduler member_finished_callbacks )};

    my $members = $self->get_jobs;
    for my $member ( @{$members} ) {
        push @{ $state->{jobs} }, $member->backup_state;
    }

    return $state;
}

t/02.parallel.t  view on Meta::CPAN

    my $max = 5;
    my $dur = 2;

    for my $i ( 1..$max ) {
        push @jobs, Event::ExecFlow::Job::Command->new (
            name            => "sleep_${nr}_$i",
            title           => "Take a sleep ($i/$max)",
            command         => "perl -e'\$|=1;for(1..$dur){print qq(\$_\\n);sleep 1}'",
            progress_max    => $dur,
            progress_parser => qr/(\d+)/,
            post_callbacks  => sub {
                my ($job) = @_;
                ok($job->get_state eq 'finished',"Job $i executed Ok");
            },
        );
    }
    
    return Event::ExecFlow::Job::Group->new (
        name        => "sleeps_$nr",
        title       => "A bunch of sleeps",
        jobs        => \@jobs,



( run in 3.173 seconds using v1.01-cache-2.11-cpan-9b1e4054eb1 )