Skip to content

Commit fe4603f

Browse files
committed
Merge branch 'dev-1.21.0' of https://github.com/WeDataSphere/DataSphereStudio into dev-1.21.0
2 parents d966319 + c2745da commit fe4603f

File tree

5 files changed

+98
-11
lines changed

5 files changed

+98
-11
lines changed

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/constant/BranchSchedulisConstant.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ private BranchSchedulisConstant() {
77

88
public static final String BRANCH_NODE_TYPE = "workflow.branch";
99
public static final String BRANCH_ROUTE_LINKIS_TYPE = "linkis.appconn.branch.route";
10+
public static final String DECISION_JOB_TYPE = "decision";
1011

1112
public static final String BRANCH_ROUTE_ENABLED = "wds.branch.route.enabled";
1213
public static final String BRANCH_ROUTE_NODE_ID = "wds.branch.route.node.id";
@@ -15,4 +16,8 @@ private BranchSchedulisConstant() {
1516
public static final String BRANCH_ROUTE_TARGETS = "wds.branch.route.targets";
1617

1718
public static final String BRANCH_GUARD_RULES = "wds.branch.guard.rules";
19+
20+
public static final String DECISION_CONDITION_PREFIX = "condition.";
21+
public static final String DECISION_ON_SUCCESS_PREFIX = "on.success.";
22+
public static final String DECISION_ON_FAILURE_PREFIX = "on.failure.";
1823
}

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/linkisjob/BranchRouteJobTuning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ public class BranchRouteJobTuning implements LinkisJobTuning {
66

77
@Override
88
public LinkisJob tuningJob(LinkisJob job) {
9-
job.setType("linkis");
9+
job.setType(BranchSchedulisConstant.DECISION_JOB_TYPE);
1010
job.setLinkistype(BranchSchedulisConstant.BRANCH_ROUTE_LINKIS_TYPE);
1111
return job;
1212
}

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/linkisjob/LinkisJobConverter.java

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNode;
2727
import org.apache.commons.lang.StringUtils;
2828

29+
import java.util.ArrayList;
2930
import java.util.Arrays;
3031
import java.util.HashMap;
3132
import java.util.List;
@@ -64,13 +65,16 @@ private String baseConversion(WorkflowNode workflowNode){
6465
}
6566

6667
private String convertJobToString(LinkisJob job){
67-
HashMap<String, String> map = new HashMap<>(8);
68-
map.put(AzkabanConstant.LINKIS_VERSION, AzkabanConf.LINKIS_VERSION.getValue());
68+
HashMap<String, String> map = new HashMap<>(16);
69+
boolean decisionJob = BranchSchedulisConstant.DECISION_JOB_TYPE.equalsIgnoreCase(job.getType());
70+
if (!decisionJob) {
71+
map.put(AzkabanConstant.LINKIS_VERSION, AzkabanConf.LINKIS_VERSION.getValue());
72+
map.put(AzkabanConstant.LINKIS_TYPE,job.getLinkistype());
73+
map.put(AzkabanConstant.JOB_COMMAND,job.getCommand());
74+
}
6975
map.put(AzkabanConstant.JOB_TYPE,job.getType());
70-
map.put(AzkabanConstant.LINKIS_TYPE,job.getLinkistype());
7176
map.put(AzkabanConstant.ZAKABAN_DEPENDENCIES_KEY,job.getDependencies());
7277
map.put(WorkflowConstant.PROXY_USER,job.getProxyUser());
73-
map.put(AzkabanConstant.JOB_COMMAND,job.getCommand());
7478
map.put(AzkabanConstant.JOB_COMMENT,job.getComment());
7579
map.put(AzkabanConstant.AUTO_DISABLED,job.getAutoDisabled());
7680
Map<String, Object> labels = new HashMap<>(1);
@@ -139,14 +143,54 @@ private void convertBranchControl(WorkflowNode workflowNode, LinkisJob job) {
139143
if (params == null || params.isEmpty()) {
140144
return;
141145
}
142-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_ENABLED);
143-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_NODE_ID);
144-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_NODE_NAME);
145-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_RULE_TEXT);
146-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_TARGETS);
146+
if (isBranchNode(workflowNode)) {
147+
putDecisionRules(job, stringifyConfValue(params.get(BranchSchedulisConstant.BRANCH_ROUTE_RULE_TEXT)));
148+
return;
149+
}
147150
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_GUARD_RULES);
148151
}
149152

153+
private boolean isBranchNode(WorkflowNode workflowNode) {
154+
return workflowNode != null && BranchSchedulisConstant.BRANCH_NODE_TYPE.equalsIgnoreCase(workflowNode.getNodeType());
155+
}
156+
157+
private void putDecisionRules(LinkisJob job, String branchRuleText) {
158+
if (StringUtils.isBlank(branchRuleText)) {
159+
return;
160+
}
161+
List<DecisionRule> decisionRules = parseDecisionRules(branchRuleText);
162+
for (int i = 0; i < decisionRules.size(); i++) {
163+
DecisionRule rule = decisionRules.get(i);
164+
int index = i + 1;
165+
job.getConf().put(BranchSchedulisConstant.DECISION_CONDITION_PREFIX + index, rule.condition);
166+
job.getConf().put(BranchSchedulisConstant.DECISION_ON_SUCCESS_PREFIX + index, rule.targetJobName);
167+
job.getConf().put(BranchSchedulisConstant.DECISION_ON_FAILURE_PREFIX + index, "");
168+
}
169+
}
170+
171+
private List<DecisionRule> parseDecisionRules(String branchRuleText) {
172+
List<DecisionRule> rules = new ArrayList<>();
173+
for (String ruleText : branchRuleText.split(";")) {
174+
if (StringUtils.isBlank(ruleText)) {
175+
continue;
176+
}
177+
int separatorIndex = ruleText.lastIndexOf('=');
178+
if (separatorIndex <= 0 || separatorIndex >= ruleText.length() - 1) {
179+
continue;
180+
}
181+
String condition = ruleText.substring(0, separatorIndex).trim();
182+
String targetJobName = ruleText.substring(separatorIndex + 1).trim();
183+
if (StringUtils.isBlank(condition) || StringUtils.isBlank(targetJobName)) {
184+
continue;
185+
}
186+
if ("default".equalsIgnoreCase(condition)) {
187+
condition = "true";
188+
}
189+
rules.add(new DecisionRule(condition, targetJobName));
190+
}
191+
return rules;
192+
}
193+
150194
private void putBranchConf(LinkisJob job, Map<String, Object> params, String key) {
151195
Object value = params.get(key);
152196
if (value != null) {
@@ -175,4 +219,14 @@ private void convertJobCommand(WorkflowNode workflowNode, LinkisJob job){
175219
job.setCommand(DSSCommonUtils.COMMON_GSON.toJson(jobContent));
176220
}
177221
}
222+
223+
private static class DecisionRule {
224+
private final String condition;
225+
private final String targetJobName;
226+
227+
private DecisionRule(String condition, String targetJobName) {
228+
this.condition = condition;
229+
this.targetJobName = targetJobName;
230+
}
231+
}
178232
}

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/AzkabanDssJobType.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class AzkabanDssJobType extends AbstractJob {
6161

6262
private Job job;
6363

64+
private volatile Props generatedProperties = new Props();
65+
6466
private boolean isCanceled = false;
6567

6668
public AzkabanDssJobType(String jobId, Props sysProps, Props jobProps, Logger log) {
@@ -151,6 +153,11 @@ public boolean isCanceled() {
151153
return isCanceled;
152154
}
153155

156+
@Override
157+
public Props getJobGeneratedProperties() {
158+
return this.generatedProperties == null ? new Props() : this.generatedProperties;
159+
}
160+
154161
@Override
155162
public double getProgress() throws Exception {
156163
return LinkisNodeExecutionImpl.getLinkisNodeExecution().getProgress(this.job);
@@ -165,6 +172,9 @@ private void collectBranchVariables() {
165172
Map<String, String> resultVariables = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultVariables(this.job, 128);
166173
Map<String, String> resolvedVariables = resolveBranchOutputVariables(resultVariables);
167174
if (!resolvedVariables.isEmpty()) {
175+
Props props = new Props();
176+
props.putAll(resolvedVariables);
177+
this.generatedProperties = props;
168178
BranchRuntimeStore.mergeFlowVariables(flowExecId, resolvedVariables);
169179
info("Collected branch flow variables: " + resolvedVariables);
170180
}
@@ -297,3 +307,4 @@ private String getRunTodayh(boolean stdFormat) {
297307
return null;
298308
}
299309
}
310+

plugins/azkaban/linkis-jobtype/src/main/java/com/webank/wedatasphere/dss/plugins/azkaban/linkis/jobtype/job/BranchRouteExecutor.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,34 @@ public void execute() throws Exception {
5757

5858
private Map<String, String> buildEvaluationContext(String branchNodeId, String branchNodeName) {
5959
Map<String, String> context = new LinkedHashMap<>();
60+
for (Map.Entry<String, String> entry : jobProps.entrySet()) {
61+
if (!isBlank(entry.getKey()) && entry.getValue() != null) {
62+
context.put(entry.getKey(), entry.getValue());
63+
}
64+
}
6065
for (Map.Entry<String, String> entry : jobProps.entrySet()) {
6166
if (entry.getKey().startsWith(LinkisJobTypeConf.FLOW_VARIABLE_PREFIX) && entry.getValue() != null) {
6267
context.put(entry.getKey().substring(LinkisJobTypeConf.FLOW_VARIABLE_PREFIX.length()), entry.getValue());
6368
}
6469
}
65-
context.putAll(BranchRuntimeStore.snapshotFlowVariables(jobProps.get(LinkisJobTypeConf.FLOW_EXEC_ID)));
70+
mergeMissingVariables(context, BranchRuntimeStore.snapshotFlowVariables(jobProps.get(LinkisJobTypeConf.FLOW_EXEC_ID)));
6671
context.put("node.id", branchNodeId);
6772
context.put("node.name", branchNodeName == null ? "" : branchNodeName);
6873
context.put("node.type", LinkisJobTypeConf.BRANCH_ROUTE_LINKIS_TYPE);
6974
return context;
7075
}
7176

77+
private void mergeMissingVariables(Map<String, String> context, Map<String, String> fallbackVariables) {
78+
if (fallbackVariables == null || fallbackVariables.isEmpty()) {
79+
return;
80+
}
81+
for (Map.Entry<String, String> entry : fallbackVariables.entrySet()) {
82+
if (!isBlank(entry.getKey()) && entry.getValue() != null && !context.containsKey(entry.getKey())) {
83+
context.put(entry.getKey(), entry.getValue());
84+
}
85+
}
86+
}
87+
7288
private String selectTarget(String branchRuleText, Map<String, String> targetNameToId, Map<String, String> context) {
7389
List<BranchExpressionUtils.BranchRule> rules = BranchExpressionUtils.parseBranchRules(branchRuleText);
7490
for (BranchExpressionUtils.BranchRule rule : rules) {
@@ -114,3 +130,4 @@ private boolean isBlank(String value) {
114130
return value == null || value.trim().isEmpty();
115131
}
116132
}
133+

0 commit comments

Comments
 (0)