Skip to content

Commit c2745da

Browse files
committed
fix: 适配调度系统#AI Commit#
1 parent 2037b4f commit c2745da

File tree

3 files changed

+69
-10
lines changed

3 files changed

+69
-10
lines changed

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/constant/BranchSchedulisConstant.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ private BranchSchedulisConstant() {
77

88
public static final String BRANCH_NODE_TYPE = "workflow.branch";
99
public static final String BRANCH_ROUTE_LINKIS_TYPE = "linkis.appconn.branch.route";
10+
public static final String DECISION_JOB_TYPE = "decision";
1011

1112
public static final String BRANCH_ROUTE_ENABLED = "wds.branch.route.enabled";
1213
public static final String BRANCH_ROUTE_NODE_ID = "wds.branch.route.node.id";
@@ -15,4 +16,8 @@ private BranchSchedulisConstant() {
1516
public static final String BRANCH_ROUTE_TARGETS = "wds.branch.route.targets";
1617

1718
public static final String BRANCH_GUARD_RULES = "wds.branch.guard.rules";
19+
20+
public static final String DECISION_CONDITION_PREFIX = "condition.";
21+
public static final String DECISION_ON_SUCCESS_PREFIX = "on.success.";
22+
public static final String DECISION_ON_FAILURE_PREFIX = "on.failure.";
1823
}

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/linkisjob/BranchRouteJobTuning.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ public class BranchRouteJobTuning implements LinkisJobTuning {
66

77
@Override
88
public LinkisJob tuningJob(LinkisJob job) {
9-
job.setType("linkis");
9+
job.setType(BranchSchedulisConstant.DECISION_JOB_TYPE);
1010
job.setLinkistype(BranchSchedulisConstant.BRANCH_ROUTE_LINKIS_TYPE);
1111
return job;
1212
}

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/linkisjob/LinkisJobConverter.java

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNode;
2727
import org.apache.commons.lang.StringUtils;
2828

29+
import java.util.ArrayList;
2930
import java.util.Arrays;
3031
import java.util.HashMap;
3132
import java.util.List;
@@ -64,13 +65,16 @@ private String baseConversion(WorkflowNode workflowNode){
6465
}
6566

6667
private String convertJobToString(LinkisJob job){
67-
HashMap<String, String> map = new HashMap<>(8);
68-
map.put(AzkabanConstant.LINKIS_VERSION, AzkabanConf.LINKIS_VERSION.getValue());
68+
HashMap<String, String> map = new HashMap<>(16);
69+
boolean decisionJob = BranchSchedulisConstant.DECISION_JOB_TYPE.equalsIgnoreCase(job.getType());
70+
if (!decisionJob) {
71+
map.put(AzkabanConstant.LINKIS_VERSION, AzkabanConf.LINKIS_VERSION.getValue());
72+
map.put(AzkabanConstant.LINKIS_TYPE,job.getLinkistype());
73+
map.put(AzkabanConstant.JOB_COMMAND,job.getCommand());
74+
}
6975
map.put(AzkabanConstant.JOB_TYPE,job.getType());
70-
map.put(AzkabanConstant.LINKIS_TYPE,job.getLinkistype());
7176
map.put(AzkabanConstant.ZAKABAN_DEPENDENCIES_KEY,job.getDependencies());
7277
map.put(WorkflowConstant.PROXY_USER,job.getProxyUser());
73-
map.put(AzkabanConstant.JOB_COMMAND,job.getCommand());
7478
map.put(AzkabanConstant.JOB_COMMENT,job.getComment());
7579
map.put(AzkabanConstant.AUTO_DISABLED,job.getAutoDisabled());
7680
Map<String, Object> labels = new HashMap<>(1);
@@ -139,14 +143,54 @@ private void convertBranchControl(WorkflowNode workflowNode, LinkisJob job) {
139143
if (params == null || params.isEmpty()) {
140144
return;
141145
}
142-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_ENABLED);
143-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_NODE_ID);
144-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_NODE_NAME);
145-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_RULE_TEXT);
146-
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_TARGETS);
146+
if (isBranchNode(workflowNode)) {
147+
putDecisionRules(job, stringifyConfValue(params.get(BranchSchedulisConstant.BRANCH_ROUTE_RULE_TEXT)));
148+
return;
149+
}
147150
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_GUARD_RULES);
148151
}
149152

153+
private boolean isBranchNode(WorkflowNode workflowNode) {
154+
return workflowNode != null && BranchSchedulisConstant.BRANCH_NODE_TYPE.equalsIgnoreCase(workflowNode.getNodeType());
155+
}
156+
157+
private void putDecisionRules(LinkisJob job, String branchRuleText) {
158+
if (StringUtils.isBlank(branchRuleText)) {
159+
return;
160+
}
161+
List<DecisionRule> decisionRules = parseDecisionRules(branchRuleText);
162+
for (int i = 0; i < decisionRules.size(); i++) {
163+
DecisionRule rule = decisionRules.get(i);
164+
int index = i + 1;
165+
job.getConf().put(BranchSchedulisConstant.DECISION_CONDITION_PREFIX + index, rule.condition);
166+
job.getConf().put(BranchSchedulisConstant.DECISION_ON_SUCCESS_PREFIX + index, rule.targetJobName);
167+
job.getConf().put(BranchSchedulisConstant.DECISION_ON_FAILURE_PREFIX + index, "");
168+
}
169+
}
170+
171+
private List<DecisionRule> parseDecisionRules(String branchRuleText) {
172+
List<DecisionRule> rules = new ArrayList<>();
173+
for (String ruleText : branchRuleText.split(";")) {
174+
if (StringUtils.isBlank(ruleText)) {
175+
continue;
176+
}
177+
int separatorIndex = ruleText.lastIndexOf('=');
178+
if (separatorIndex <= 0 || separatorIndex >= ruleText.length() - 1) {
179+
continue;
180+
}
181+
String condition = ruleText.substring(0, separatorIndex).trim();
182+
String targetJobName = ruleText.substring(separatorIndex + 1).trim();
183+
if (StringUtils.isBlank(condition) || StringUtils.isBlank(targetJobName)) {
184+
continue;
185+
}
186+
if ("default".equalsIgnoreCase(condition)) {
187+
condition = "true";
188+
}
189+
rules.add(new DecisionRule(condition, targetJobName));
190+
}
191+
return rules;
192+
}
193+
150194
private void putBranchConf(LinkisJob job, Map<String, Object> params, String key) {
151195
Object value = params.get(key);
152196
if (value != null) {
@@ -175,4 +219,14 @@ private void convertJobCommand(WorkflowNode workflowNode, LinkisJob job){
175219
job.setCommand(DSSCommonUtils.COMMON_GSON.toJson(jobContent));
176220
}
177221
}
222+
223+
private static class DecisionRule {
224+
private final String condition;
225+
private final String targetJobName;
226+
227+
private DecisionRule(String condition, String targetJobName) {
228+
this.condition = condition;
229+
this.targetJobName = targetJobName;
230+
}
231+
}
178232
}

0 commit comments

Comments
 (0)