Skip to content

Commit f12e3c1

Browse files
committed
fix: 适配调度系统#AI Commit#
1 parent 94f064a commit f12e3c1

10 files changed

Lines changed: 838 additions & 61 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.webank.wedatasphere.dss.appconn.schedulis.constant;
2+
3+
public class BranchSchedulisConstant {
4+
5+
private BranchSchedulisConstant() {
6+
}
7+
8+
public static final String BRANCH_NODE_TYPE = "workflow.branch";
9+
public static final String BRANCH_ROUTE_LINKIS_TYPE = "linkis.appconn.branch.route";
10+
11+
public static final String BRANCH_ROUTE_ENABLED = "wds.branch.route.enabled";
12+
public static final String BRANCH_ROUTE_NODE_ID = "wds.branch.route.node.id";
13+
public static final String BRANCH_ROUTE_NODE_NAME = "wds.branch.route.node.name";
14+
public static final String BRANCH_ROUTE_RULE_TEXT = "wds.branch.route.rule.text";
15+
public static final String BRANCH_ROUTE_TARGETS = "wds.branch.route.targets";
16+
17+
public static final String BRANCH_GUARD_RULES = "wds.branch.guard.rules";
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package com.webank.wedatasphere.dss.appconn.schedulis.conversion;
2+
3+
import com.webank.wedatasphere.dss.appconn.schedulis.constant.BranchSchedulisConstant;
4+
import com.webank.wedatasphere.dss.common.entity.node.DSSEdge;
5+
import com.webank.wedatasphere.dss.workflow.conversion.entity.ConvertedRel;
6+
import com.webank.wedatasphere.dss.workflow.conversion.entity.PreConversionRel;
7+
import com.webank.wedatasphere.dss.workflow.conversion.entity.ProjectPreConversionRel;
8+
import com.webank.wedatasphere.dss.workflow.conversion.operation.WorkflowToRelConverter;
9+
import com.webank.wedatasphere.dss.workflow.core.entity.Workflow;
10+
import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNode;
11+
import com.webank.wedatasphere.dss.workflow.core.entity.WorkflowNodeEdge;
12+
import org.apache.commons.collections.CollectionUtils;
13+
import org.apache.commons.lang.StringUtils;
14+
15+
import java.util.ArrayDeque;
16+
import java.util.ArrayList;
17+
import java.util.Collections;
18+
import java.util.Comparator;
19+
import java.util.Deque;
20+
import java.util.HashMap;
21+
import java.util.HashSet;
22+
import java.util.LinkedHashMap;
23+
import java.util.LinkedHashSet;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.stream.Collectors;
28+
29+
public class BranchWorkflowRewriteConverter implements WorkflowToRelConverter {
30+
31+
@Override
32+
public ConvertedRel convertToRel(PreConversionRel rel) {
33+
((ProjectPreConversionRel) rel).getWorkflows().forEach(this::rewriteWorkflow);
34+
return (ConvertedRel) rel;
35+
}
36+
37+
private void rewriteWorkflow(Workflow workflow) {
38+
if (workflow == null || CollectionUtils.isEmpty(workflow.getWorkflowNodes()) || CollectionUtils.isEmpty(workflow.getWorkflowNodeEdges())) {
39+
rewriteChildren(workflow);
40+
return;
41+
}
42+
Map<String, WorkflowNode> nodeById = workflow.getWorkflowNodes().stream()
43+
.collect(Collectors.toMap(WorkflowNode::getId, node -> node, (left, right) -> left, LinkedHashMap::new));
44+
Map<String, List<DSSEdge>> outgoingEdges = buildOutgoingEdges(workflow.getWorkflowNodeEdges());
45+
workflow.getWorkflowNodes().stream()
46+
.filter(this::isBranchNode)
47+
.forEach(branchNode -> rewriteBranchNode(branchNode, nodeById, outgoingEdges));
48+
rewriteChildren(workflow);
49+
}
50+
51+
private void rewriteChildren(Workflow workflow) {
52+
if (workflow != null && workflow.getChildren() != null) {
53+
workflow.getChildren().forEach(this::rewriteWorkflow);
54+
}
55+
}
56+
57+
private Map<String, List<DSSEdge>> buildOutgoingEdges(List<WorkflowNodeEdge> workflowNodeEdges) {
58+
Map<String, List<DSSEdge>> outgoingEdges = new LinkedHashMap<>();
59+
for (WorkflowNodeEdge workflowNodeEdge : workflowNodeEdges) {
60+
DSSEdge edge = workflowNodeEdge.getDSSEdge();
61+
if (edge == null || StringUtils.isBlank(edge.getSource()) || StringUtils.isBlank(edge.getTarget())) {
62+
continue;
63+
}
64+
outgoingEdges.computeIfAbsent(edge.getSource(), key -> new ArrayList<>()).add(edge);
65+
}
66+
outgoingEdges.values().forEach(edges -> edges.sort(Comparator.comparing(edge -> edge.getPriority() == null ? Integer.MAX_VALUE : edge.getPriority())));
67+
return outgoingEdges;
68+
}
69+
70+
private void rewriteBranchNode(WorkflowNode branchNode,
71+
Map<String, WorkflowNode> nodeById,
72+
Map<String, List<DSSEdge>> outgoingEdges) {
73+
List<DSSEdge> directEdges = outgoingEdges.getOrDefault(branchNode.getId(), Collections.emptyList());
74+
if (directEdges.isEmpty()) {
75+
return;
76+
}
77+
annotateRouteMetadata(branchNode, directEdges, nodeById);
78+
Map<String, Set<String>> reachableTargetMap = collectReachableTargets(branchNode, directEdges, outgoingEdges);
79+
reachableTargetMap.forEach((nodeId, allowedTargets) -> {
80+
WorkflowNode node = nodeById.get(nodeId);
81+
if (node != null && !branchNode.getId().equals(node.getId())) {
82+
annotateGuardMetadata(node, branchNode, allowedTargets);
83+
}
84+
});
85+
}
86+
87+
private void annotateRouteMetadata(WorkflowNode branchNode, List<DSSEdge> directEdges, Map<String, WorkflowNode> nodeById) {
88+
Map<String, Object> params = getOrCreateParams(branchNode);
89+
params.put(BranchSchedulisConstant.BRANCH_ROUTE_ENABLED, Boolean.TRUE);
90+
params.put(BranchSchedulisConstant.BRANCH_ROUTE_NODE_ID, branchNode.getId());
91+
params.put(BranchSchedulisConstant.BRANCH_ROUTE_NODE_NAME, branchNode.getName());
92+
params.put(BranchSchedulisConstant.BRANCH_ROUTE_RULE_TEXT, getBranchRuleText(branchNode));
93+
List<Map<String, Object>> targets = new ArrayList<>();
94+
for (DSSEdge edge : directEdges) {
95+
WorkflowNode targetNode = nodeById.get(edge.getTarget());
96+
if (targetNode == null) {
97+
continue;
98+
}
99+
Map<String, Object> target = new LinkedHashMap<>();
100+
target.put("targetId", targetNode.getId());
101+
target.put("targetName", targetNode.getName());
102+
target.put("priority", edge.getPriority());
103+
target.put("condition", edge.getCondition());
104+
target.put("branchKey", edge.getBranchKey());
105+
target.put("branchLabel", edge.getBranchLabel());
106+
target.put("default", edge.getDefault());
107+
targets.add(target);
108+
}
109+
params.put(BranchSchedulisConstant.BRANCH_ROUTE_TARGETS, targets);
110+
}
111+
112+
private Map<String, Set<String>> collectReachableTargets(WorkflowNode branchNode,
113+
List<DSSEdge> directEdges,
114+
Map<String, List<DSSEdge>> outgoingEdges) {
115+
Map<String, Set<String>> reachableTargets = new LinkedHashMap<>();
116+
for (DSSEdge directEdge : directEdges) {
117+
String targetId = directEdge.getTarget();
118+
if (StringUtils.isBlank(targetId)) {
119+
continue;
120+
}
121+
Deque<String> queue = new ArrayDeque<>();
122+
Set<String> visited = new HashSet<>();
123+
queue.add(targetId);
124+
while (!queue.isEmpty()) {
125+
String currentNodeId = queue.poll();
126+
if (!visited.add(currentNodeId) || branchNode.getId().equals(currentNodeId)) {
127+
continue;
128+
}
129+
reachableTargets.computeIfAbsent(currentNodeId, key -> new LinkedHashSet<>()).add(targetId);
130+
for (DSSEdge next : outgoingEdges.getOrDefault(currentNodeId, Collections.emptyList())) {
131+
if (StringUtils.isNotBlank(next.getTarget())) {
132+
queue.add(next.getTarget());
133+
}
134+
}
135+
}
136+
}
137+
return reachableTargets;
138+
}
139+
140+
private void annotateGuardMetadata(WorkflowNode workflowNode, WorkflowNode branchNode, Set<String> allowedTargets) {
141+
if (CollectionUtils.isEmpty(allowedTargets)) {
142+
return;
143+
}
144+
Map<String, Object> params = getOrCreateParams(workflowNode);
145+
List<Map<String, Object>> guardRules = getOrCreateGuardRules(params);
146+
guardRules.removeIf(rule -> branchNode.getId().equals(String.valueOf(rule.get("branchNodeId"))));
147+
Map<String, Object> guardRule = new LinkedHashMap<>();
148+
guardRule.put("branchNodeId", branchNode.getId());
149+
guardRule.put("branchNodeName", branchNode.getName());
150+
guardRule.put("allowedTargetIds", new ArrayList<>(allowedTargets));
151+
guardRules.add(guardRule);
152+
params.put(BranchSchedulisConstant.BRANCH_GUARD_RULES, guardRules);
153+
}
154+
155+
private List<Map<String, Object>> getOrCreateGuardRules(Map<String, Object> params) {
156+
Object existing = params.get(BranchSchedulisConstant.BRANCH_GUARD_RULES);
157+
if (existing instanceof List) {
158+
List<Map<String, Object>> copiedRules = new ArrayList<>();
159+
for (Object item : (List<?>) existing) {
160+
if (item instanceof Map) {
161+
copiedRules.add(new LinkedHashMap<>((Map<String, Object>) item));
162+
}
163+
}
164+
return copiedRules;
165+
}
166+
return new ArrayList<>();
167+
}
168+
169+
private Map<String, Object> getOrCreateParams(WorkflowNode workflowNode) {
170+
Map<String, Object> params = workflowNode.getDSSNode().getParams();
171+
if (params == null) {
172+
params = new HashMap<>();
173+
workflowNode.getDSSNode().setParams(params);
174+
}
175+
return params;
176+
}
177+
178+
private boolean isBranchNode(WorkflowNode workflowNode) {
179+
return workflowNode != null && BranchSchedulisConstant.BRANCH_NODE_TYPE.equalsIgnoreCase(workflowNode.getNodeType());
180+
}
181+
182+
private String getBranchRuleText(WorkflowNode workflowNode) {
183+
Map<String, Object> params = workflowNode.getDSSNode().getParams();
184+
if (params == null || params.isEmpty()) {
185+
return "";
186+
}
187+
Object topLevelRule = params.get("branch.rules");
188+
if (topLevelRule != null && StringUtils.isNotBlank(topLevelRule.toString())) {
189+
return topLevelRule.toString();
190+
}
191+
Object configurationObj = params.get("configuration");
192+
if (!(configurationObj instanceof Map)) {
193+
return "";
194+
}
195+
Object specialObj = ((Map<?, ?>) configurationObj).get("special");
196+
if (!(specialObj instanceof Map)) {
197+
return "";
198+
}
199+
Object specialRule = ((Map<?, ?>) specialObj).get("branch.rules");
200+
return specialRule == null ? "" : String.valueOf(specialRule);
201+
}
202+
203+
@Override
204+
public int getOrder() {
205+
return 50;
206+
}
207+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.webank.wedatasphere.dss.appconn.schedulis.linkisjob;
2+
3+
import com.webank.wedatasphere.dss.appconn.schedulis.constant.BranchSchedulisConstant;
4+
5+
public class BranchRouteJobTuning implements LinkisJobTuning {
6+
7+
@Override
8+
public LinkisJob tuningJob(LinkisJob job) {
9+
job.setType("linkis");
10+
job.setLinkistype(BranchSchedulisConstant.BRANCH_ROUTE_LINKIS_TYPE);
11+
return job;
12+
}
13+
14+
@Override
15+
public boolean ifJobCantuning(String nodeType) {
16+
return BranchSchedulisConstant.BRANCH_NODE_TYPE.equalsIgnoreCase(nodeType);
17+
}
18+
}

dss-appconn/appconns/dss-schedulis-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/schedulis/linkisjob/LinkisJobConverter.java

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.webank.wedatasphere.dss.appconn.scheduler.utils.SchedulerConf;
2020
import com.webank.wedatasphere.dss.appconn.schedulis.conf.AzkabanConf;
2121
import com.webank.wedatasphere.dss.appconn.schedulis.constant.AzkabanConstant;
22+
import com.webank.wedatasphere.dss.appconn.schedulis.constant.BranchSchedulisConstant;
2223
import com.webank.wedatasphere.dss.appconn.schedulis.conversion.NodeConverter;
2324
import com.webank.wedatasphere.dss.common.utils.DSSCommonUtils;
2425
import com.webank.wedatasphere.dss.workflow.core.constant.WorkflowConstant;
@@ -35,7 +36,7 @@ public class LinkisJobConverter implements NodeConverter {
3536
private LinkisJobTuning[] linkisJobTunings;
3637

3738
public LinkisJobConverter(){
38-
this.linkisJobTunings = new LinkisJobTuning[]{new AzkabanSubFlowJobTuning()};
39+
this.linkisJobTunings = new LinkisJobTuning[]{new AzkabanSubFlowJobTuning(), new BranchRouteJobTuning()};
3940
}
4041

4142
@Override
@@ -52,6 +53,7 @@ private String baseConversion(WorkflowNode workflowNode){
5253
convertDependencies(workflowNode,job);
5354
convertProxyUser(workflowNode,job);
5455
convertConfiguration(workflowNode,job);
56+
convertBranchControl(workflowNode, job);
5557
convertJobCommand(workflowNode,job);
5658
Arrays.stream(linkisJobTunings).forEach(t ->{
5759
if(t.ifJobCantuning(workflowNode.getNodeType())) {
@@ -112,30 +114,58 @@ private void convertConfiguration(WorkflowNode workflowNode, LinkisJob job){
112114
if (params != null && !params.isEmpty()) {
113115
Map<String, Map<String,Object>> configuration = (Map<String, Map<String, Object>>) params.get("configuration");
114116
String confprefix = "node.conf.";
115-
configuration.forEach((k,v)-> {
116-
if(null!=v) {
117-
v.forEach((k2, v2) -> {
118-
if(v2!=null) {
119-
String vStr;
120-
if (v2 instanceof Number) {
121-
Number numValue = (Number) v2;
122-
vStr = numValue.longValue() == numValue.doubleValue() ?
123-
String.valueOf(numValue.longValue()) :
124-
numValue.toString();
125-
} else {
126-
vStr = v2.toString();
117+
if (configuration != null) {
118+
configuration.forEach((k,v)-> {
119+
if(null!=v) {
120+
v.forEach((k2, v2) -> {
121+
if(v2!=null) {
122+
String vStr = stringifyConfValue(v2);
123+
if (AzkabanConstant.AUTO_DISABLED.equals(k2) ) {
124+
job.setAutoDisabled(vStr);
125+
} else {
126+
job.getConf().put(confprefix + k + "." + k2, vStr);
127+
}
127128
}
128-
if (AzkabanConstant.AUTO_DISABLED.equals(k2) ) {
129-
job.setAutoDisabled(vStr);
130-
} else {
131-
job.getConf().put(confprefix + k + "." + k2, vStr);
132-
}
133-
}
134-
});
135-
}
136-
});
129+
});
130+
}
131+
});
132+
}
133+
}
134+
135+
}
136+
137+
private void convertBranchControl(WorkflowNode workflowNode, LinkisJob job) {
138+
Map<String, Object> params = workflowNode.getDSSNode().getParams();
139+
if (params == null || params.isEmpty()) {
140+
return;
137141
}
142+
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_ENABLED);
143+
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_NODE_ID);
144+
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_NODE_NAME);
145+
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_RULE_TEXT);
146+
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_ROUTE_TARGETS);
147+
putBranchConf(job, params, BranchSchedulisConstant.BRANCH_GUARD_RULES);
148+
}
149+
150+
private void putBranchConf(LinkisJob job, Map<String, Object> params, String key) {
151+
Object value = params.get(key);
152+
if (value != null) {
153+
job.getConf().put(key, stringifyConfValue(value));
154+
}
155+
}
138156

157+
private String stringifyConfValue(Object value) {
158+
if (value == null) {
159+
return null;
160+
}
161+
if (value instanceof Number) {
162+
Number numValue = (Number) value;
163+
return numValue.longValue() == numValue.doubleValue() ? String.valueOf(numValue.longValue()) : numValue.toString();
164+
}
165+
if (value instanceof CharSequence || value instanceof Boolean) {
166+
return value.toString();
167+
}
168+
return DSSCommonUtils.COMMON_GSON.toJson(value);
139169
}
140170

141171
private void convertJobCommand(WorkflowNode workflowNode, LinkisJob job){

0 commit comments

Comments
 (0)