BPM-Engine
view release on metacpan or search on metacpan
lib/BPM/Engine/ProcessRunner.pm view on Meta::CPAN
$instance->apply_transition('finish');
$instance->fire_join if $activity->is_join;
$instance->update({ completed => DateTime->now() });
if ($activity->is_end_activity()) {
unless ($self->process_instance->activity_instances_rs->active->count) {
$self->complete_process();
return;
}
}
else {
$self->_execute_transitions($activity, $instance);
}
$self->_run if $run;
}
sub complete_process {
my $self = shift;
my $pi = $self->process_instance;
return unless $self->cb_complete_process($self->process, $pi);
$pi->apply_transition('finish');
$pi->update({ completed => DateTime->now() });
$self->_queue_clear();
$self->_defer_clear();
if ($pi->parent_ai_id) {
my $pai = $pi->parent_activity_instance;
$self->_complete_parent_activity($pai->activity, $pai);
}
}
sub _complete_parent_activity {
my ($self, $activity, $instance) = @_;
$self->error('runner: subflows not implemented');
throw_abstract error => 'Subflows not implemented';
}
sub _execute_transitions {
my ($self, $activity, $instance) = @_;
my $pref = { prefetch => ['from_activity', 'to_activity'] };
my @transitions =
$activity->is_split
? $activity->transitions_by_ref({}, $pref)->all
: $activity->transitions({}, $pref)->all;
unless (@transitions) {
my $act_id =
$activity->activity_name
|| $activity->activity_uid
|| $activity->id;
throw_model error =>
"Model error: no outgoing transitions for activity '$act_id'";
}
my (@instances) = ();
my (@blocked) = ();
my ($stop_following, $fired_count) = (0, 0);
my ($otherwise, $exception) = ();
# evaluate efferent transitions
foreach my $transition (@transitions) {
if ( $transition->condition_type eq 'NONE'
|| $transition->condition_type eq 'CONDITION') {
my $t_instance;
unless ($stop_following) {
$t_instance =
$self->_execute_transition($transition, $instance, 0);
}
if ($t_instance) {
push(@instances, [$transition, $t_instance]);
$fired_count++;
# only one transition in an XOR split can fire.
$stop_following++ if $activity->is_xor_split;
}
elsif ($activity->is_split) {
my $split = $instance->split
or die("No split for " . $activity->activity_uid);
$split->set_transition($transition->id, 'blocked');
push(@blocked, [$transition, $instance]);
}
}
elsif ($transition->condition_type eq 'OTHERWISE') {
$otherwise = $transition;
}
elsif ($transition->condition_type eq 'DEFAULTEXCEPTION'
|| $transition->condition_type eq 'EXCEPTION') {
$exception = $transition;
}
}
if ($fired_count == 0) {
unless ($otherwise) {
throw_model(
error => "Deadlock: OTHERWISE transition missing on activity '"
. $activity->activity_uid
. "'");
}
my $t_instance = $self->_execute_transition($otherwise, $instance, 0);
if ($t_instance) {
push(@instances, [$otherwise, $t_instance]);
}
else {
throw_runner error =>
"Execution of transition with 'Otherwise' condition failed";
}
}
elsif ($otherwise && $activity->is_split) {
my $split = $instance->split
or die("No join found for split " . $activity->activity_uid);
$split->set_transition($otherwise->id, 'blocked');
}
# activate successor activities
my $followed_back = 0;
foreach my $inst (@instances) {
$followed_back++ if $inst->[0]->is_back_edge;
my $r_instance = $inst->[1];
my $r_activity = $r_instance->activity;
$self->_enqueue_ai($r_activity, $r_instance);
}
# blocked paths may trigger downstream deferred activities which must now be
# resolved; signal deferred activity instances on other branches in the
# wf-net when paths were blocked and any transition downstream was followed
if (scalar(@blocked) && $followed_back != scalar @instances) {
$self->_signal_upstream_orjoins_if_in_split_branch(@blocked);
}
return;
}
sub _execute_transition {
my ($self, $transition, $from_instance, $run) = @_;
#XXX mitigate expensive debugging
my $tid = $transition->transition_uid || $transition->id || 'noid';
$self->debug("runner: executing transition $tid from "
. $transition->from_activity->activity_uid . ' to '
. $transition->to_activity->activity_uid);
$self->cb_execute_transition($transition, $from_instance);
my $to_instance = eval { $transition->apply($from_instance); };
my $err = $@;
if ($err) {
$self->debug("runner: transition '"
. $transition->transition_uid
. "' did not result in a new activity_instance : $err");
if (is_Exception($err)) {
# condition false
return if $err->isa('BPM::Engine::Exception::Condition');
#warn $err->trace->as_string;
$err->rethrow;
}
else {
$err =~ s/\n//;
$self->error("Error applying transition: $err");
throw_model error => $err;
}
}
elsif (!$to_instance) {
$self->error("Applying transition did not result in an instance");
throw_runner error => "Applying transition did not return an instance";
}
$self->_run if $run;
return $to_instance;
}
sub _enqueue_ai {
my ($self, $activity, $instance, $deferred) = @_;
$self->debug("runner: _enqueue activity " . $activity->activity_uid);
my $should_fire = $activity->is_join ? $instance->is_enabled() : 1;
if ($should_fire) {
if ($instance->is_deferred) {
#$instance->update({ deferred => \'NULL' });
$instance->update({ deferred => undef })->discard_changes;
}
#$instance->fire_join if $activity->is_join;
if ($activity->is_auto_start) {
$self->debug("runner: _enqueue Pushing instance "
. $activity->activity_uid
. " to active queue");
$self->start_activity($activity, $instance, 0);
}
}
else {
$instance->update({ deferred => DateTime->now });
$self->debug("runner: _enqueue Pushing instance "
. $activity->activity_uid
. " to deferred queue");
$self->_defer_push([$activity, $instance]) unless $deferred;
}
}
sub _signal_upstream_orjoins_if_in_split_branch {
my ($self, @blocked) = @_;
my @deferred = $self->process_instance->activity_instances->deferred->all;
foreach my $instance (@deferred) {
$self->debug("runner: _run Pushing db instance "
. $instance->activity->activity_uid
. " to deferred queue");
my $graph = $self->graph;
foreach my $block (@blocked) {
my $tr = $block->[0];
my $ai = $block->[1];
my $a_to = $tr->to_activity;
if ($graph->is_reachable($a_to->id, $instance->activity->id)) {
$self->_defer_push([$instance->activity, $instance]);
}
}
}
}
__PACKAGE__->meta->make_immutable;
1;
__END__
=pod
=encoding utf-8
=head1 NAME
BPM::Engine::ProcessRunner - Runs workflow processes
=head1 VERSION
0.01
=head1 SYNOPSIS
use BPM::Engine::ProcessRunner;
my $callback = sub {
my($runner, $entity, $action, $node, $instance) = @_;
...
return 1;
};
my $runner = BPM::Engine::ProcessRunner->new(
process_instance => $instance,
callback => $callback,
);
$runner->start_process();
# somewhere else, after completing a task,
# from an asynchronous task handler...
$runner->complete_activity($activity, $instance, 1);
=head1 DESCRIPTION
Implements the workflow enactment logic.
=head1 CALLBACKS
The methods in this package emit callback events to a callback handler that may
be passed to the constructor. If no callback handler is specified, the default
return values are applied for these event calls.
( run in 0.431 second using v1.01-cache-2.11-cpan-e1769b4cff6 )