BPM-Engine
view release on metacpan or search on metacpan
lib/BPM/Engine/Store/ResultRole/ActivityInstanceJoin.pm view on Meta::CPAN
#warn("Not an active instance") unless $self->is_active;
my $activity = $self->activity;
if(!$activity->is_join) {
die("Not a join " . $activity->activity_uid);
}
elsif($activity->is_and_join) {
return $self->_and_join_should_fire;
}
elsif($activity->is_or_join) {
return $self->_or_join_should_fire;
}
elsif($activity->is_xor_join) {
return 1;
}
elsif($activity->is_complex_join) {
throw_abstract error => "Complex joins not implemented";
}
else {
die("Not a valid join type " . $self->activity->join_type);
}
}
## use critic (ProhibitCascadingIfElse)
sub _and_join_should_fire {
my $self = shift;
my $activity = $self->activity;
my $deferred_states = $self->result_source->schema
->resultset('ActivityInstance')->deferred({
process_instance_id => $self->process_instance_id,
activity_id => $activity->id,
tokenset => $self->tokenset,
});
my %deferred_trans = map { $_->transition_id => 1 } $deferred_states->all;
$deferred_trans{$self->transition_id} = 1;
my @transa = $activity->transitions_in->all;
my @transd = grep { $deferred_trans{$_->id} } @transa;
return scalar @transa == @transd ? 1 : 0;
}
sub _or_join_should_fire {
my $self = shift;
my $deferred_states = $self->result_source->schema
->resultset('ActivityInstance')->deferred({
activity_id => $self->activity->id,
process_instance_id => $self->process_instance_id,
tokenset => $self->tokenset,
});
my %deferred_trans = map { $_->transition_id => 1 } $deferred_states->all;
$deferred_trans{$self->transition_id} = 1;
# Each transition corresponds to either waiting for upstream,
# executed+deferred, blocked, the start of a new cycle or this ai's
# transition itself. Join should fire if there's no upstream activity left.
foreach my $transition($self->activity->transitions_in->all) {
next if($deferred_trans{$transition->id});
next if($transition->is_back_edge);
return 0 unless $self->_upstream_blocked($transition);
}
return 1;
}
# Search the transition's upstream subnet for active or blocked activity
# instances. Transition has not been applied yet, so either
# - still activity further upstream (last ai in process thread=active), or
# - split.path blocked for last completed ai in process thread
sub _upstream_blocked {
my ($self, $transition) = @_;
my $rs = $self->process_instance->activity_instances_rs({
tokenset => $self->tokenset,
})->active_or_completed;
my $split_blocked = sub {
my ($ai, $trans) = @_;
my $split = $ai->split || die("Inclusive split has no join attached");
$split->discard_changes;
if( $split->states->{$trans->id}
&& $split->states->{$trans->id} eq 'blocked') {
# no blocking if followed a backedge upstream (cyclic wf)
my @tids =
map { $_->id }
$ai->activity->transitions({ is_back_edge => 1 })->all;
if(scalar @tids) {
return 0 if $ai->next({ transition_id => [@tids] })->count;
}
return 1;
}
else {
return 0;
}
};
my $seen = 0;
my $block = 0;
my(@act) = ([$transition->from_activity, $transition]);
while(my $next = shift(@act)) {
my ($upstream_act, $down_trans) = ($next->[0], $next->[1]);
my @ai = $rs->search({'activity_id' => $upstream_act->id})->all;
# no activity instances, traverse further upstream
if(!scalar @ai) {
foreach my $trans($upstream_act->transitions_in) {
next if $trans->is_back_edge;
my $src = $trans->from_activity;
unless($src->id == $self->activity->id) {
push(@act, [$src, $trans]);
}
}
}
# active or completed+blocked instances
else {
$seen++;
my %status = ();
foreach(@ai) {
$status{
$_->is_deferred ? 'deferred' :
($_->is_completed ? 'completed' : 'active')
}++;
}
die("Invalid db state for instances " . $upstream_act->activity_uid)
if($status{deferred} && ($status{active} || $status{completed}));
die("Invalid db state for instances " . $upstream_act->activity_uid)
if($status{active} && $status{active} > 1);
# active ai, may have come from split upstream
if($status{active}) {
return 0;
}
# completed, is_split, blocked transition path
elsif($status{completed} && scalar(keys %status) == 1) {
# OR-split should be blocked, XOR split missed this transition by definition
if($upstream_act->is_or_split) {
my $blocked = 0;
foreach my $ai(@ai) {
$blocked++ if &$split_blocked($ai, $down_trans);
}
die("OR split " . $upstream_act->activity_uid . " completed but not blocked")
unless $blocked;
}
elsif(!$upstream_act->is_xor_split) {
die("Not an OR/XOR split " . $upstream_act->activity_uid);
}
$block++;
}
else {
die("Wrong status");
}
}
}
die("Invalid transition " . $transition->transition_uid) unless $block == $seen;
return 1;
}
sub fire_join {
my $self = shift;
die("Not a join") unless $self->activity->is_join;
die("Not active") unless $self->is_active;
$self->_mark_upstream_joined();
# make all deferreds completed
$self->result_source->schema
->resultset('ActivityInstance')->deferred({
activity_id => $self->activity->id,
process_instance_id => $self->process_instance_id,
tokenset => $self->tokenset,
})->update({ deferred => \'NULL', completed => DateTime->now() });
}
# follow upstream up to root, stopping only after all 'takens' have been set to
# 'joineds' OR stop when first 'joined' found (meaning previously traversed
# upwards when this was previously set from 'taken')
sub _mark_upstream_joined {
my ($self, $open_reach) = @_;
$open_reach ||= {};
# verify paths against local joins
my $upstream_ai = undef;
my $transition = $self->transition or die("Transition not found");
my $is_parent = 1;
# traverse upstream
while($upstream_ai = $self->prev) {
delete $open_reach->{$upstream_ai->activity->id};
if($upstream_ai->activity->is_or_split) {
if($upstream_ai->activity->id != $transition->from_activity->id) {
die("ShouldFire: Illegal transition for JoinActivity '" .
$upstream_ai->activity->activity_uid .
"' doesn't match transition " . $transition->transition_uid .
" activity '" . $transition->from_activity->activity_uid . "'");
}
my $split = $upstream_ai->split
|| die("Inclusive split has no join attached");
$split->discard_changes;
( run in 2.512 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )