view release on metacpan or search on metacpan
it was successful.
- Group considered itself finished even when a child
job didn't finish successfully (cancelled or error).
0.62 Sat Jun 17, 2006, joern
Features:
- Executed programs are now set into C locale, so parsing
output is independent from the locale settings.
Bugfix:
- set job into error state if post callbacks return
with error.
0.61 Sun Apr 2, 2006, joern
Notes:
- added some helper methods to the Group class
- scheduler stuff is still work in progress and needs some cleanup
- shell commands maybe closures returning the shell code at runtime
- fixed a bug with depends_on argument to constructor not
processed correctly
lib/Event/ExecFlow/Callbacks.pm view on Meta::CPAN
__END__
=head1 NAME
Event::ExecFlow::Callbacks - Callbacks attached to jobs
=head1 SYNOPSIS
#-- Create a new Callbacks object
my $callbacks = Event::ExecFlow::Callbacks->new (
sub { print "sub called\n" },
sub { print "another sub of this called\n" },
);
#-- Attach callbacks to a job
$job->set_pre_callbacks($callbacks);
#-- Add more subs
$callbacks->add(sub { print "a sub added later\n" });
$callbacks->prepend(sub { print "a sub prepended to the list of subs } );
#-- the execute() methods is executed later by Event::ExecFlow
$callbacks->execute($job);
=head1 DESCRIPTION
This class represents one or more closures which can be attached as
callbacks to an Event::ExecFlow::Job.
=head1 OBJECT HIERARCHY
Event::ExecFlow
Event::ExecFlow::Job
+--- Event::ExecFlow::Job::Group
+--- Event::ExecFlow::Job::Command
+--- Event::ExecFlow::Job::Code
lib/Event/ExecFlow/Frontend/Term.pm view on Meta::CPAN
sub get_nl_needed { shift->{nl_needed} }
sub set_quiet { shift->{quiet} = $_[1] }
sub set_nl_needed { shift->{nl_needed} = $_[1] }
sub start_job {
my $self = shift;
my ($job) = @_;
my $w = AnyEvent->condvar;
$job->get_post_callbacks->add(sub { $w->broadcast });
$self->SUPER::start_job($job);
$w->wait;
1;
}
sub report_job_start {
my $self = shift;
my ($job) = @_;
lib/Event/ExecFlow/Job.pm view on Meta::CPAN
sub get_error_message { shift->{error_message} }
sub get_warning_message { shift->{warning_message} }
sub get_progress_max { shift->{progress_max} }
sub get_progress_cnt { shift->{progress_cnt} }
sub get_progress_start_time { shift->{progress_start_time} }
sub get_progress_end_time { shift->{progress_end_time} }
sub get_progress_ips { shift->{progress_ips} }
sub get_no_progress { shift->{no_progress} }
sub get_last_progress { shift->{last_progress} }
sub get_last_percent_logged { shift->{last_percent_logged} }
sub get_pre_callbacks { shift->{pre_callbacks} }
sub get_post_callbacks { shift->{post_callbacks} }
sub get_error_callbacks { shift->{error_callbacks} }
sub get_warning_callbacks { shift->{warning_callbacks} }
sub get_frontend { shift->{frontend} }
sub get_group { shift->{group} }
sub get_diskspace_consumed { shift->{diskspace_consumed} }
sub get_diskspace_freed { shift->{diskspace_freed} }
sub get_stash { shift->{stash} }
sub get_paused { shift->{paused} }
sub get_paused_seconds { shift->{paused_seconds} }
sub get_paused_start_time { shift->{paused_start_time} }
sub get_skipped { shift->{skipped} }
lib/Event/ExecFlow/Job.pm view on Meta::CPAN
sub set_error_message { shift->{error_message} = $_[1] }
sub set_warning_message { shift->{warning_message} = $_[1] }
sub set_progress_max { shift->{progress_max} = $_[1] }
sub set_progress_cnt { shift->{progress_cnt} = $_[1] }
sub set_progress_start_time { shift->{progress_start_time} = $_[1] }
sub set_progress_end_time { shift->{progress_end_time} = $_[1] }
sub set_progress_ips { shift->{progress_ips} = $_[1] }
sub set_no_progress { shift->{no_progress} = $_[1] }
sub set_last_progress { shift->{last_progress} = $_[1] }
sub set_last_percent_logged { shift->{last_percent_logged} = $_[1] }
sub set_pre_callbacks { shift->{pre_callbacks} = $_[1] }
sub set_post_callbacks { shift->{post_callbacks} = $_[1] }
sub set_error_callbacks { shift->{error_callbacks} = $_[1] }
sub set_warning_callbacks { shift->{warning_callbacks} = $_[1] }
sub set_frontend { shift->{frontend} = $_[1] }
sub set_group { shift->{group} = $_[1] }
sub set_diskspace_consumed { shift->{diskspace_consumed} = $_[1] }
sub set_diskspace_freed { shift->{diskspace_freed} = $_[1] }
sub set_stash { shift->{stash} = $_[1] }
sub set_paused { shift->{paused} = $_[1] }
sub set_paused_seconds { shift->{paused_seconds} = $_[1] }
sub set_paused_start_time { shift->{paused_start_time} = $_[1] }
sub set_skipped { shift->{skipped} = $_[1] }
lib/Event/ExecFlow/Job.pm view on Meta::CPAN
my $self = shift;
return !$self->get_cancelled &&
!$self->get_error_message;
}
my $JOB_ID = (time - 1140691085) * 1_000_000;
sub new {
my $class = shift;
my %par = @_;
my ($title, $name, $depends_on, $pre_callbacks) =
@par{'title','name','depends_on','pre_callbacks'};
my ($post_callbacks, $error_callbacks, $warning_callbacks) =
@par{'post_callbacks','error_callbacks','warning_callbacks'};
my ($progress_cnt, $progress_max, $progress_ips, $no_progress) =
@par{'progress_cnt','progress_max','progress_ips','no_progress'};
my ($diskspace_consumed, $diskspace_freed, $stash, $frontend) =
@par{'diskspace_consumed','diskspace_freed','stash','frontend'};
my $id = ++$JOB_ID;
$depends_on ||= [];
$stash ||= {};
$name ||= '~'.$id;
croak "Job '$name' depends on itself"
if grep { $_ eq $name } @{$depends_on};
for my $cb ( $pre_callbacks, $post_callbacks,
$error_callbacks, $warning_callbacks ) {
$cb ||= Event::ExecFlow::Callbacks->new;
$cb = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE';
}
my $self = bless {
id => $id,
title => $title,
name => $name,
depends_on => $depends_on,
state => 'waiting',
diskspace_consumed => $diskspace_consumed,
diskspace_freed => $diskspace_freed,
progress_cnt => $progress_cnt,
progress_max => $progress_max,
progress_ips => $progress_ips,
no_progress => $no_progress,
pre_callbacks => $pre_callbacks,
post_callbacks => $post_callbacks,
error_callbacks => $error_callbacks,
warning_callbacks => $warning_callbacks,
stash => $stash,
frontend => $frontend,
paused_seconds => 0,
last_percent_logged => 0,
group => undef,
}, $class;
$self->set_depends_on($depends_on);
return $self;
lib/Event/ExecFlow/Job.pm view on Meta::CPAN
if ( !$self->get_frontend ) {
require Event::ExecFlow::Frontend;
$self->set_frontend(Event::ExecFlow::Frontend->new);
}
$self->init;
$self->set_state("running");
$self->get_frontend->report_job_start($self);
$self->get_pre_callbacks->execute($self);
if ( $self->get_error_message ) {
$self->execution_finished;
return 0;
}
if ( $self->get_warning_message ) {
$self->get_warning_callbacks->execute($self);
$self->get_frontend->report_job_warning($self);
}
if ( $self->get_skipped ) { # may be set by pre_callbacks
$self->execution_finished;
return 0;
}
$self->execute;
1;
}
sub reset {
lib/Event/ExecFlow/Job.pm view on Meta::CPAN
if ( !$self->get_cancelled ) {
if ( $self->get_error_message ) {
$self->set_state("error");
}
else {
$self->set_state("finished");
}
}
$self->get_post_callbacks->execute($self);
$self->set_state("error") if $self->get_error_message;
$self->get_frontend->report_job_finished($self);
if ( !$self->get_cancelled ) {
if ( $self->get_error_message ) {
$self->get_error_callbacks->execute($self);
$self->get_frontend->report_job_error($self);
}
if ( $self->get_warning_message ) {
$self->get_warning_callbacks->execute($self);
$self->get_frontend->report_job_warning($self);
}
}
if ( $self->get_type ne 'group' and $self->get_state eq 'finished' ) {
my $parent = $self;
while ( $parent = $parent->get_group ) {
$parent->set_progress_cnt($parent->get_progress_cnt+1);
$self->get_frontend->report_job_progress($parent);
}
lib/Event/ExecFlow/Job.pm view on Meta::CPAN
return ($currently_consumed, $max_consumed);
}
sub backup_state {
my $self = shift;
my %data = %{$self};
delete @data{
qw(
pre_callbacks
post_callbacks
error_callbacks
warning_callbacks
frontend
group
_post_callbacks_added
)
};
$data{type} = $self->get_type;
return \%data;
}
sub restore_state {
my $self = shift;
lib/Event/ExecFlow/Job.pm view on Meta::CPAN
=head1 SYNOPSIS
Event::ExecFlow::Job->new (
title => Descriptive title,
name => Internal short name,
depends_on => Names of jobs, this job depends on,
progress_max => Maximum expected progress value,
progress_ips => String to show as "items per second",
no_progress => Job has no progress state at all,
pre_callbacks => Callbacks executed before job starts,
post_callbacks => Callbacks executed after job finished,
error_callbacks => Callbacks executed if job had errors,
warning_callbacks => Callbacks executed if job had warnings,
stash => A custom data hash stored with the job,
);
=head1 DESCRIPTION
This is an abstract base class and usually not used directly from the
application. For daily programming the attributes defined in this
class are most important, since they are common to all Jobs of the
Event::ExecFlow framework.
lib/Event/ExecFlow/Job/Group.pm view on Meta::CPAN
use strict;
use Scalar::Util qw(weaken);
sub get_type { "group" }
sub get_jobs { shift->{jobs} }
sub get_fail_with_members { shift->{fail_with_members} }
sub get_stop_on_failure { shift->{stop_on_failure} }
sub get_parallel { shift->{parallel} }
sub get_scheduler { shift->{scheduler} }
sub get_member_finished_callbacks { shift->{member_finished_callbacks} }
sub set_jobs { shift->{jobs} = $_[1] }
sub set_fail_with_members { shift->{fail_with_members} = $_[1] }
sub set_stop_on_failure { shift->{stop_on_failure} = $_[1] }
sub set_parallel { shift->{parallel} = $_[1] }
sub set_member_finished_callbacks { shift->{member_finished_callbacks} = $_[1] }
sub new {
my $class = shift;
my %par = @_;
my ($jobs, $fail_with_members, $stop_on_failure) =
@par{'jobs','fail_with_members','stop_on_failure'};
my ($parallel, $scheduler, $member_finished_callbacks) =
@par{'parallel','scheduler','member_finished_callbacks'};
$jobs = [] unless defined $jobs;
$fail_with_members = 1 unless defined $fail_with_members;
$stop_on_failure = 1 unless defined $stop_on_failure;
my $self = $class->SUPER::new(@_);
for my $cb ( $member_finished_callbacks ) {
$cb ||= Event::ExecFlow::Callbacks->new;
$cb = Event::ExecFlow::Callbacks->new($cb) if ref $cb eq 'CODE';
}
$self->set_jobs($jobs);
$self->set_fail_with_members($fail_with_members);
$self->set_stop_on_failure($stop_on_failure);
$self->set_parallel($parallel);
$self->set_scheduler($scheduler);
$self->set_member_finished_callbacks($member_finished_callbacks);
return $self;
}
sub set_frontend {
my $self = shift;
my ($frontend) = @_;
$self->SUPER::set_frontend($frontend);
lib/Event/ExecFlow/Job/Group.pm view on Meta::CPAN
return $self->SUPER::reset() if $self->get_progress_cnt == 0;
0;
}
sub add_child_post_callback {
my $self = shift;
my ($job) = @_;
if ( $job->{_post_callbacks_added} ) {
return;
require Carp;
Carp::confess($job->get_info.": callbacks added twice!");
}
$job->{_post_callbacks_added} = 1;
$job->get_post_callbacks->add( sub {
my ($job) = @_;
$self->child_job_finished($job);
1;
});
1;
}
sub start_child_job {
my $self = shift;
lib/Event/ExecFlow/Job/Group.pm view on Meta::CPAN
1;
}
sub child_job_finished {
my $self = shift;
my ($job) = @_;
$Event::ExecFlow::DEBUG && print "Group->child_job_finished(".$job->get_info.")\n";
$self->get_member_finished_callbacks->execute()
if $self->get_member_finished_callbacks;
if ( $job->get_error_message && !$job->get_cancelled ) {
if ( $self->get_fail_with_members ) {
$self->set_state("error");
$self->add_job_error_message($job);
$self->get_frontend->report_job_error($self);
}
}
if ( $self->get_scheduler ) {
lib/Event/ExecFlow/Job/Group.pm view on Meta::CPAN
return ($currently_consumed, $max_consumed);
}
sub backup_state {
my $self = shift;
my $data_href = $self->SUPER::backup_state();
delete $data_href->{jobs};
delete $data_href->{scheduler};
delete $data_href->{member_finished_callbacks};
my $jobs = $self->get_jobs;
foreach my $job ( @{$jobs} ) {
push @{$data_href->{jobs}},
$job->backup_state;
}
return $data_href;
}
t/02.parallel.t view on Meta::CPAN
my $max = 5;
my $dur = 2;
for my $i ( 1..$max ) {
push @jobs, Event::ExecFlow::Job::Command->new (
name => "sleep_${nr}_$i",
title => "Take a sleep ($i/$max)",
command => "perl -e'\$|=1;for(1..$dur){print qq(\$_\\n);sleep 1}'",
progress_max => $dur,
progress_parser => qr/(\d+)/,
post_callbacks => sub {
my ($job) = @_;
ok($job->get_state eq 'finished',"Job $i executed Ok");
},
);
}
return Event::ExecFlow::Job::Group->new (
name => "sleeps_$nr",
title => "A bunch of sleeps",
jobs => \@jobs,