BPM-Engine

 view release on metacpan or  search on metacpan

lib/BPM/Engine/Store/ResultRole/ActivityInstanceJoin.pm  view on Meta::CPAN


    #warn("Not an active instance") unless $self->is_active;    
    
    my $activity = $self->activity;
    if(!$activity->is_join) {
        die("Not a join " . $activity->activity_uid);
        }
    elsif($activity->is_and_join) {
        return $self->_and_join_should_fire;
        }
    elsif($activity->is_or_join) {
        return $self->_or_join_should_fire;
        }
    elsif($activity->is_xor_join) {
        return 1;
        }
    elsif($activity->is_complex_join) {
        throw_abstract error => "Complex joins not implemented";
        }
    else {
        die("Not a valid join type " . $self->activity->join_type);
        }
    }

## use critic (ProhibitCascadingIfElse)

sub _and_join_should_fire {
    my $self = shift;

    my $activity = $self->activity;
    my $deferred_states = $self->result_source->schema
        ->resultset('ActivityInstance')->deferred({
            process_instance_id => $self->process_instance_id,
            activity_id         => $activity->id,
            tokenset            => $self->tokenset,
        });

    my %deferred_trans = map { $_->transition_id => 1 } $deferred_states->all;
    $deferred_trans{$self->transition_id} = 1;

    my @transa = $activity->transitions_in->all;
    my @transd = grep { $deferred_trans{$_->id} } @transa;

    return scalar @transa == @transd ? 1 : 0;
    }

sub _or_join_should_fire {
    my $self = shift;

    my $deferred_states = $self->result_source->schema
        ->resultset('ActivityInstance')->deferred({
          activity_id         => $self->activity->id,
          process_instance_id => $self->process_instance_id,
          tokenset            => $self->tokenset,
        });

    my %deferred_trans = map { $_->transition_id => 1 } $deferred_states->all;
    $deferred_trans{$self->transition_id} = 1;
    
    # Each transition corresponds to either waiting for upstream,
    # executed+deferred, blocked, the start of a new cycle or this ai's
    # transition itself. Join should fire if there's no upstream activity left.
    foreach my $transition($self->activity->transitions_in->all) {
        next if($deferred_trans{$transition->id});
        next if($transition->is_back_edge);
        return 0 unless $self->_upstream_blocked($transition);
        }
    
    return 1;
    }

# Search the transition's upstream subnet for active or blocked activity 
# instances. Transition has not been applied yet, so either
# - still activity further upstream (last ai in process thread=active), or
# - split.path blocked for last completed ai in process thread
sub _upstream_blocked {
    my ($self, $transition) = @_;
    
    my $rs = $self->process_instance->activity_instances_rs({
        tokenset => $self->tokenset,
        })->active_or_completed;
    
    my $split_blocked = sub {    
        my ($ai, $trans) = @_;
        my $split = $ai->split || die("Inclusive split has no join attached");
        $split->discard_changes;
        if(   $split->states->{$trans->id} 
           && $split->states->{$trans->id} eq 'blocked') {
            # no blocking if followed a backedge upstream (cyclic wf)
            my @tids = 
                map { $_->id } 
                $ai->activity->transitions({ is_back_edge => 1 })->all;
            if(scalar @tids) {
                return 0 if $ai->next({ transition_id => [@tids] })->count;
                }
            return 1;
            }
        else {
            return 0;
            }
        };
    
    my $seen  = 0;
    my $block = 0;
    
    my(@act) = ([$transition->from_activity, $transition]);
    while(my $next = shift(@act)) {
        my ($upstream_act, $down_trans) = ($next->[0], $next->[1]);
        my @ai = $rs->search({'activity_id' => $upstream_act->id})->all;
        
        # no activity instances, traverse further upstream
        if(!scalar @ai) {
            foreach my $trans($upstream_act->transitions_in) {
                next if $trans->is_back_edge;
                my $src = $trans->from_activity;
                unless($src->id == $self->activity->id) {
                    push(@act, [$src, $trans]);
                    }
                }
            }
        # active or completed+blocked instances
        else {
            $seen++;
            my %status = ();
            foreach(@ai) { 
                $status{
                    $_->is_deferred ?  'deferred' : 
                    ($_->is_completed ? 'completed' : 'active') 
                    }++; 
                }

            die("Invalid db state for instances " . $upstream_act->activity_uid)
                if($status{deferred} && ($status{active} || $status{completed}));
            die("Invalid db state for instances " . $upstream_act->activity_uid)
                if($status{active} && $status{active} > 1);

            # active ai, may have come from split upstream
            if($status{active}) {
                return 0;
                }
            # completed, is_split, blocked transition path
            elsif($status{completed} && scalar(keys %status) == 1) {
                # OR-split should be blocked, XOR split missed this transition by definition
                if($upstream_act->is_or_split) {
                    my $blocked = 0;                    
                    foreach my $ai(@ai) {
                        $blocked++ if &$split_blocked($ai, $down_trans);
                        }
                    die("OR split " . $upstream_act->activity_uid . " completed but not blocked") 
                        unless $blocked;
                    }
                elsif(!$upstream_act->is_xor_split) {
                    die("Not an OR/XOR split " . $upstream_act->activity_uid);
                    }
                $block++;
                }
            else {
                die("Wrong status");
                }
            }
        }
    die("Invalid transition " . $transition->transition_uid) unless $block == $seen;
    return 1;
    }

sub fire_join {
    my $self = shift;
    
    die("Not a join") unless $self->activity->is_join;
    die("Not active") unless $self->is_active;

    $self->_mark_upstream_joined();

    # make all deferreds completed
    $self->result_source->schema
        ->resultset('ActivityInstance')->deferred({
          activity_id         => $self->activity->id,
          process_instance_id => $self->process_instance_id,
          tokenset            => $self->tokenset,
        })->update({ deferred => \'NULL', completed => DateTime->now() });
    }


# follow upstream up to root, stopping only after all 'takens' have been set to
# 'joineds' OR stop when first 'joined' found  (meaning previously traversed
# upwards when this was previously set from 'taken')
sub _mark_upstream_joined {
    my ($self, $open_reach) = @_;
    $open_reach ||= {};

    # verify paths against local joins
    my $upstream_ai = undef;
    my $transition  = $self->transition or die("Transition not found");
    my $is_parent   = 1;

    # traverse upstream
    while($upstream_ai = $self->prev) {
        delete $open_reach->{$upstream_ai->activity->id};
        
        if($upstream_ai->activity->is_or_split) {
            if($upstream_ai->activity->id != $transition->from_activity->id) {
                die("ShouldFire: Illegal transition for JoinActivity '" .
                    $upstream_ai->activity->activity_uid .
                    "' doesn't match transition " . $transition->transition_uid .
                    " activity '" . $transition->from_activity->activity_uid . "'");
                }

            my $split = $upstream_ai->split 
                || die("Inclusive split has no join attached");
            $split->discard_changes;



( run in 2.512 seconds using v1.01-cache-2.11-cpan-39bf76dae61 )