BPM-Engine

 view release on metacpan or  search on metacpan

lib/BPM/Engine/ProcessRunner.pm  view on Meta::CPAN

    $instance->apply_transition('finish');
    $instance->fire_join if $activity->is_join;    
    $instance->update({ completed => DateTime->now() });

    if ($activity->is_end_activity()) {
        unless ($self->process_instance->activity_instances_rs->active->count) {
            $self->complete_process();
            return;
            }
        }
    else {
        $self->_execute_transitions($activity, $instance);
        }

    $self->_run if $run;
    }

# Finish the process instance this runner is working on.
#
# The complete_process callback is consulted first (with the process and the
# process instance); when it returns a false value nothing else happens.
# Otherwise the process instance gets its 'finish' transition applied and its
# 'completed' field set to the current time, and the runner's queues are
# emptied through _queue_clear and _defer_clear. When the process instance
# has a parent activity instance (parent_ai_id is true), control is handed to
# _complete_parent_activity for that parent.
sub complete_process {
    my ($self) = @_;

    my $process_instance = $self->process_instance;

    # the callback acts as a veto: a false result leaves everything untouched
    my $proceed =
        $self->cb_complete_process($self->process, $process_instance);
    return unless $proceed;

    $process_instance->apply_transition('finish');
    my $finished_at = DateTime->now();
    $process_instance->update({ completed => $finished_at });

    # nothing may remain scheduled for a finished process
    $self->_queue_clear();
    $self->_defer_clear();

    if ($process_instance->parent_ai_id) {
        my $parent_ai = $process_instance->parent_activity_instance;
        $self->_complete_parent_activity($parent_ai->activity, $parent_ai);
        }
    }

# Placeholder for completing the parent activity of a finished sub-process.
#
# Called as $self->_complete_parent_activity($activity, $instance). Subflows
# are not supported: the condition is logged through $self->error and then
# raised through throw_abstract. The $activity and $instance arguments are
# accepted but not used.
sub _complete_parent_activity {
    my $self = shift;

    $self->error('runner: subflows not implemented');
    throw_abstract(error => 'Subflows not implemented');
    }

# Evaluate the outgoing transitions of $activity for the given activity
# $instance and activate the successor activity instances that result.
#
# Steps, in order:
#   1. Load the outgoing transitions (transitions_by_ref for split
#      activities, transitions otherwise); none at all is a model error.
#   2. Try every NONE/CONDITION transition through _execute_transition.
#      Transitions that produce an instance are collected; on a split,
#      transitions that do not are marked 'blocked' on the instance's split.
#      After the first success on an XOR split no further transition is tried.
#      OTHERWISE and (DEFAULT)EXCEPTION transitions are only remembered here.
#   3. If nothing fired, the OTHERWISE transition must exist and must yield
#      an instance; if something fired on a split, OTHERWISE is marked
#      'blocked' instead.
#   4. Every collected instance is passed to _enqueue_ai.
#   5. If paths were blocked and not every followed transition was a back
#      edge, _signal_upstream_orjoins_if_in_split_branch is called with the
#      blocked [transition, instance] pairs.
#
# Returns nothing.
sub _execute_transitions {
    my ($self, $activity, $instance) = @_;

    my $attrs = { prefetch => ['from_activity', 'to_activity'] };
    my @transitions;
    if ($activity->is_split) {
        @transitions = $activity->transitions_by_ref({}, $attrs)->all;
        }
    else {
        @transitions = $activity->transitions({}, $attrs)->all;
        }

    if (!@transitions) {
        my $act_id =
               $activity->activity_name
            || $activity->activity_uid
            || $activity->id;
        throw_model error =>
            "Model error: no outgoing transitions for activity '$act_id'";
        }

    my @followed;          # [transition, resulting activity instance] pairs
    my @blocked_paths;     # [transition, source activity instance] pairs
    my $fired_count = 0;
    my $xor_done    = 0;   # true once an XOR split has taken its one path
    my $otherwise;
    my $exception;         # remembered only; not acted upon in this sub

    # evaluate efferent transitions
    foreach my $transition (@transitions) {
        my $type = $transition->condition_type;

        if ($type eq 'OTHERWISE') {
            $otherwise = $transition;
            next;
            }
        if ($type eq 'DEFAULTEXCEPTION' || $type eq 'EXCEPTION') {
            $exception = $transition;
            next;
            }
        next unless $type eq 'NONE' || $type eq 'CONDITION';

        my $t_instance;
        if (!$xor_done) {
            $t_instance =
                $self->_execute_transition($transition, $instance, 0);
            }

        if ($t_instance) {
            push @followed, [$transition, $t_instance];
            $fired_count++;
            # only one transition in an XOR split can fire.
            $xor_done = 1 if $activity->is_xor_split;
            }
        elsif ($activity->is_split) {
            my $split = $instance->split;
            die("No split for " . $activity->activity_uid) unless $split;
            $split->set_transition($transition->id, 'blocked');
            push @blocked_paths, [$transition, $instance];
            }
        }

    if (!$fired_count) {
        # nothing fired: the OTHERWISE transition is the only way forward
        if (!$otherwise) {
            throw_model(
                error => "Deadlock: OTHERWISE transition missing on activity '"
                    . $activity->activity_uid
                    . "'");
            }
        my $fallback = $self->_execute_transition($otherwise, $instance, 0);
        if (!$fallback) {
            throw_runner error =>
                "Execution of transition with 'Otherwise' condition failed";
            }
        else {
            push @followed, [$otherwise, $fallback];
            }
        }
    elsif ($otherwise && $activity->is_split) {
        # something else fired, so the OTHERWISE path of the split is closed
        my $split = $instance->split;
        die("No join found for split " . $activity->activity_uid)
            unless $split;
        $split->set_transition($otherwise->id, 'blocked');
        }

    # activate successor activities
    my $back_edges = 0;
    for my $pair (@followed) {
        my ($taken, $next_instance) = @{$pair};
        $back_edges++ if $taken->is_back_edge;
        my $next_activity = $next_instance->activity;
        $self->_enqueue_ai($next_activity, $next_instance);
        }

    # Blocked paths can affect deferred activity instances further down the
    # net. When at least one path was blocked and not every followed
    # transition was a back edge, have the deferred instances re-examined.
    if (@blocked_paths && $back_edges != @followed) {
        $self->_signal_upstream_orjoins_if_in_split_branch(@blocked_paths);
        }

    return;
    }

# Apply a single transition leaving $from_instance and return the activity
# instance it produces.
#
# Arguments:
#   $transition    - the transition to follow
#   $from_instance - the activity instance the transition is applied to
#   $run           - when true, $self->_run is called after a successful apply
#
# Returns the value of $transition->apply($from_instance). Returns nothing
# when apply raised a BPM::Engine::Exception::Condition, which is treated as
# "condition evaluated to false".
#
# Errors:
#   - any other exception object (as recognised by is_Exception) is rethrown
#   - any other error is logged and raised again through throw_model
#   - a false result without an error is raised through throw_runner
sub _execute_transition {
    my ($self, $transition, $from_instance, $run) = @_;

    #XXX mitigate expensive debugging
    # transition_uid may be empty; fall back to the id so that every log line
    # carries some identifier.
    my $tid = $transition->transition_uid || $transition->id || 'noid';
    $self->debug("runner: executing transition $tid from "
            . $transition->from_activity->activity_uid . ' to '
            . $transition->to_activity->activity_uid);

    $self->cb_execute_transition($transition, $from_instance);

    my $to_instance = eval { $transition->apply($from_instance); };

    # copy $@ immediately, before any further call can overwrite it
    my $err = $@;
    if ($err) {
        # Report with $tid instead of the raw transition_uid: the uid can be
        # undefined, which would trigger an "uninitialized value" warning and
        # leave the message without an identifier.
        $self->debug("runner: transition '"
                . $tid
                . "' did not result in a new activity_instance : $err");
        if (is_Exception($err)) {
            # condition false
            return if $err->isa('BPM::Engine::Exception::Condition');
            #warn $err->trace->as_string;
            $err->rethrow;
            }
        else {
            # not an exception object: remove the first newline from the
            # message, log it and raise it again as a model error
            $err =~ s/\n//;
            $self->error("Error applying transition: $err");
            throw_model error => $err;
            }
        }
    elsif (!$to_instance) {
        $self->error("Applying transition did not result in an instance");
        throw_runner error => "Applying transition did not return an instance";
        }

    $self->_run if $run;

    return $to_instance;
    }

# Decide what to do with a freshly activated (or re-examined) activity
# instance.
#
# Arguments:
#   $activity - the activity the instance belongs to
#   $instance - the activity instance
#   $deferred - when true, the instance is not pushed onto the defer queue
#               again if it still cannot fire
#
# An instance may fire when its activity is not a join, or when the join
# instance reports is_enabled. An instance that may not fire gets its
# 'deferred' field set to the current time and (unless $deferred) is pushed
# onto the defer queue. An instance that may fire has a set 'deferred' field
# cleared and, for auto-start activities, is started through start_activity.
# Firing of the join itself is not done here.
sub _enqueue_ai {
    my ($self, $activity, $instance, $deferred) = @_;

    $self->debug("runner: _enqueue activity " . $activity->activity_uid);

    # non-join activities are always ready; joins only once enabled
    my $ready = !$activity->is_join || $instance->is_enabled();

    if (!$ready) {
        $instance->update({ deferred => DateTime->now });

        $self->debug("runner: _enqueue Pushing instance "
                . $activity->activity_uid
                . " to deferred queue");

        $self->_defer_push([$activity, $instance]) unless $deferred;
        }
    else {
        # reset the deferred marker and reload the row afterwards
        $instance->update({ deferred => undef })->discard_changes
            if $instance->is_deferred;

        if ($activity->is_auto_start) {
            $self->debug("runner: _enqueue Pushing instance "
                    . $activity->activity_uid
                    . " to active queue");
            $self->start_activity($activity, $instance, 0);
            }
        }
    }

# Push deferred activity instances that sit downstream of blocked
# transitions onto the defer queue.
#
# @blocked holds array references whose first element is a blocked
# transition (the second element is not used here). For every deferred
# activity instance of the process instance, and for every blocked
# transition, the instance is pushed with _defer_push when the graph reports
# the instance's activity as reachable from the transition's target activity.
#
# NOTE(review): an instance reachable from several blocked transitions is
# pushed once per such transition - confirm that this is intended.
sub _signal_upstream_orjoins_if_in_split_branch {
    my ($self, @blocked) = @_;

    my $deferred_rs =
        $self->process_instance->activity_instances->deferred;

    for my $deferred_ai ($deferred_rs->all) {

        $self->debug("runner: _run Pushing db instance "
                . $deferred_ai->activity->activity_uid
                . " to deferred queue");

        my $graph = $self->graph;

        for my $entry (@blocked) {
            my ($transition) = @{$entry};
            my $target = $transition->to_activity;

            next
                unless $graph->is_reachable($target->id,
                $deferred_ai->activity->id);

            $self->_defer_push([$deferred_ai->activity, $deferred_ai]);
            }

        }
    }

# Make the class's metaclass immutable (Moose-style finalisation of the
# package); this is the last executable statement before the true value below.
__PACKAGE__->meta->make_immutable;

# A Perl module file has to end by returning a true value.
1;
__END__

=pod

=encoding utf-8

=head1 NAME

BPM::Engine::ProcessRunner - Runs workflow processes

=head1 VERSION

0.01

=head1 SYNOPSIS

    use BPM::Engine::ProcessRunner;

    my $callback = sub {
        my($runner, $entity, $action, $node, $instance) = @_;
        ...        
        return 1;
        };

    my $runner = BPM::Engine::ProcessRunner->new(
        process_instance => $instance,
        callback         => $callback,
        );
  
    $runner->start_process();

    # somewhere else, after completing a task, 
    # from an asynchronous task handler...
  
    $runner->complete_activity($activity, $instance, 1);

=head1 DESCRIPTION

Implements the workflow enactment logic.

=head1 CALLBACKS

The methods in this package emit callback events to a callback handler that may 
be passed to the constructor. If no callback handler is specified, the default 
return values are applied for these event calls.



( run in 0.431 second using v1.01-cache-2.11-cpan-e1769b4cff6 )