@@ -19,6 +19,7 @@ package com.webank.wedatasphere.dss.flow.execution.entrance.resolver
1919import java .util
2020
2121import com .webank .wedatasphere .dss .flow .execution .entrance .FlowContext
22+ import com .webank .wedatasphere .dss .flow .execution .entrance .enums .ExecuteStrategyEnum
2223import com .webank .wedatasphere .dss .flow .execution .entrance .job .FlowEntranceJob
2324import com .webank .wedatasphere .dss .flow .execution .entrance .utils .BranchExpressionUtils
2425import com .webank .wedatasphere .dss .workflow .core .entity .WorkflowNode
@@ -38,6 +39,10 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
3839 val nodes = flowContext.getPendingNodes.toMap.values.map(_.getNode)
3940 val workflowNodesById = flow.getWorkflowNodes.map(node => node.getId -> node).toMap
4041 val workflowEdges = flow.getWorkflowNodeEdges.map(_.getDSSEdge)
42+ val executeStrategy = Option (flowJob.getParams)
43+ .map(_.get(" executeStrategy" ))
44+ .map(_.toString)
45+ .orNull
4146
4247 def incomingEdges (node : WorkflowNode ) = workflowEdges.filter(_.getTarget == node.getId)
4348
@@ -59,26 +64,31 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
5964 workflowNodesById.get(edge.getSource).exists { sourceNode =>
6065 BranchExpressionUtils .isBranchNode(sourceNode) &&
6166 flowContext.isNodeCompleted(sourceNode.getName) &&
62- flowJob.hasBranchSelection(sourceNode.getId) &&
63- ! flowJob.isBranchTargetSelected(sourceNode.getId, node.getId)
67+ (flowContext.isNodeSkipped(sourceNode.getName) ||
68+ ! flowJob.hasBranchSelection(sourceNode.getId) ||
69+ ! flowJob.isBranchTargetSelected(sourceNode.getId, node.getId))
6470 }
6571 }
6672 }
6773
6874 def shouldSkip (node : WorkflowNode ): Boolean = {
69- shouldSkipByBranch(node) || areAllParentsSkipped(node)
75+ shouldSkipByBranch(node) ||
76+ (! ExecuteStrategyEnum .IS_SELECTED_EXECUTE .getValue.equalsIgnoreCase(executeStrategy) && areAllParentsSkipped(node))
7077 }
7178
7279 def isBranchRouteMatched (node : WorkflowNode ): Boolean = {
7380 incomingEdges(node).forall { edge =>
7481 workflowNodesById.get(edge.getSource) match {
7582 case Some (sourceNode) if BranchExpressionUtils .isBranchNode(sourceNode) =>
76- flowJob.hasBranchSelection(sourceNode.getId) && flowJob.isBranchTargetSelected(sourceNode.getId, node.getId)
83+ flowContext.isNodeSucceed(sourceNode.getName) &&
84+ flowJob.hasBranchSelection(sourceNode.getId) &&
85+ flowJob.isBranchTargetSelected(sourceNode.getId, node.getId)
7786 case _ => true
7887 }
7988 }
8089 }
8190
91+
8292 nodes.foreach { node =>
8393 val nodeName = node.getName
8494 def isCanExecutable : Boolean = {
@@ -104,4 +114,3 @@ class FlowDependencyResolverImpl extends FlowDependencyResolver with Logging {
104114 info(s " ${flowJob.getId} Finished to get executable node( ${flowContext.getScheduledNodes.size()}) " )
105115 }
106116}
107-
0 commit comments