Skip to content

Commit 46f0f3c

Browse files
authored
[Improvement-18095][API-Test] Add dependent task api test case (#18096)
1 parent ec6e930 commit 46f0f3c

5 files changed

Lines changed: 492 additions & 2 deletions

File tree

.github/workflows/api-test.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ jobs:
119119
class: org.apache.dolphinscheduler.api.test.cases.GrpcTaskAPITest
120120
- name: OidcLoginAPITest
121121
class: org.apache.dolphinscheduler.api.test.cases.OidcLoginAPITest
122+
- name: DependentTaskAPITest
123+
class: org.apache.dolphinscheduler.api.test.cases.tasks.DependentTaskAPITest
122124
env:
123125
RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
124126
steps:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.api.test.cases.tasks;
19+
20+
import org.apache.dolphinscheduler.api.test.core.DolphinScheduler;
21+
import org.apache.dolphinscheduler.api.test.entity.HttpResponse;
22+
import org.apache.dolphinscheduler.api.test.entity.LoginResponseData;
23+
import org.apache.dolphinscheduler.api.test.pages.LoginPage;
24+
import org.apache.dolphinscheduler.api.test.pages.project.ProjectPage;
25+
import org.apache.dolphinscheduler.api.test.pages.workflow.ExecutorPage;
26+
import org.apache.dolphinscheduler.api.test.pages.workflow.WorkflowDefinitionPage;
27+
import org.apache.dolphinscheduler.api.test.pages.workflow.WorkflowInstancePage;
28+
import org.apache.dolphinscheduler.api.test.utils.JSONUtils;
29+
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
30+
import org.apache.dolphinscheduler.common.enums.ReleaseState;
31+
import org.apache.dolphinscheduler.common.enums.UserType;
32+
import org.apache.dolphinscheduler.common.enums.WarningType;
33+
import org.apache.dolphinscheduler.dao.entity.User;
34+
35+
import java.io.File;
36+
import java.nio.charset.StandardCharsets;
37+
import java.nio.file.Files;
38+
import java.text.SimpleDateFormat;
39+
import java.util.Date;
40+
import java.util.LinkedHashMap;
41+
import java.util.List;
42+
43+
import lombok.extern.slf4j.Slf4j;
44+
45+
import org.junit.jupiter.api.AfterAll;
46+
import org.junit.jupiter.api.Assertions;
47+
import org.junit.jupiter.api.BeforeAll;
48+
import org.junit.jupiter.api.Order;
49+
import org.junit.jupiter.api.Test;
50+
import org.junitpioneer.jupiter.DisableIfTestFails;
51+
52+
@DolphinScheduler(composeFiles = "docker/basic/docker-compose.yaml")
53+
@Slf4j
54+
@DisableIfTestFails
55+
public class DependentTaskAPITest {
56+
57+
private static final String username = "admin";
58+
59+
private static final String password = "dolphinscheduler123";
60+
61+
private static final String projectName = "dependent-test-project-" + System.currentTimeMillis();
62+
63+
private static String sessionId;
64+
65+
private static User loginUser;
66+
67+
private static ExecutorPage executorPage;
68+
69+
private static WorkflowDefinitionPage workflowDefinitionPage;
70+
71+
private static WorkflowInstancePage workflowInstancePage;
72+
73+
private static ProjectPage projectPage;
74+
75+
private static long projectCode;
76+
77+
private static long upstreamWorkflowDefinitionCode;
78+
79+
private static long dependentWorkflowDefinitionCode;
80+
81+
private static long failedDependentWorkflowDefinitionCode;
82+
83+
@BeforeAll
84+
public static void setup() {
85+
LoginPage loginPage = new LoginPage();
86+
HttpResponse loginHttpResponse = loginPage.login(username, password);
87+
sessionId =
88+
JSONUtils.convertValue(loginHttpResponse.getBody().getData(), LoginResponseData.class).getSessionId();
89+
executorPage = new ExecutorPage(sessionId);
90+
workflowDefinitionPage = new WorkflowDefinitionPage(sessionId);
91+
workflowInstancePage = new WorkflowInstancePage(sessionId);
92+
projectPage = new ProjectPage(sessionId);
93+
loginUser = new User();
94+
loginUser.setUserName("admin");
95+
loginUser.setId(1);
96+
loginUser.setUserType(UserType.ADMIN_USER);
97+
}
98+
99+
@AfterAll
100+
public static void cleanup() {
101+
log.info("success cleanup");
102+
}
103+
104+
@Test
105+
@Order(1)
106+
public void testCreateUpstreamWorkflow() throws Exception {
107+
// create test project
108+
HttpResponse createProjectResponse = projectPage.createProject(loginUser, projectName);
109+
Assertions.assertTrue(createProjectResponse.getBody().getSuccess());
110+
111+
HttpResponse queryAllProjectListResponse = projectPage.queryAllProjectList(loginUser);
112+
Assertions.assertTrue(queryAllProjectListResponse.getBody().getSuccess());
113+
114+
// find the project by name
115+
List<LinkedHashMap> projectList = (List<LinkedHashMap>) queryAllProjectListResponse.getBody().getData();
116+
for (LinkedHashMap<String, Object> project : projectList) {
117+
if (projectName.equals(project.get("name"))) {
118+
projectCode = ((Number) project.get("code")).longValue();
119+
break;
120+
}
121+
}
122+
Assertions.assertNotEquals(0, projectCode, "project should be found by name");
123+
log.info("project code: {}", projectCode);
124+
125+
// create upstream workflow definition (shell task: echo hello)
126+
ClassLoader classLoader = getClass().getClassLoader();
127+
File file = new File(classLoader.getResource("workflow-json/test.json").getFile());
128+
String upstreamWorkflowName = "upstream_shell_workflow_" + System.currentTimeMillis();
129+
HttpResponse createWorkflowResponse = workflowDefinitionPage
130+
.createWorkflowDefinition(loginUser, projectCode, file, upstreamWorkflowName);
131+
Assertions.assertTrue(createWorkflowResponse.getBody().getSuccess());
132+
133+
// get upstream workflow definition code
134+
HttpResponse queryAllWorkflowResponse =
135+
workflowDefinitionPage.queryAllWorkflowDefinitionByProjectCode(loginUser, projectCode);
136+
Assertions.assertTrue(queryAllWorkflowResponse.getBody().getSuccess());
137+
upstreamWorkflowDefinitionCode =
138+
(long) ((LinkedHashMap<String, Object>) ((LinkedHashMap<String, Object>) ((List<LinkedHashMap>) queryAllWorkflowResponse
139+
.getBody().getData()).get(0)).get("workflowDefinition")).get("code");
140+
log.info("upstream workflow definition code: {}", upstreamWorkflowDefinitionCode);
141+
142+
// release upstream workflow
143+
HttpResponse releaseResponse = workflowDefinitionPage.releaseWorkflowDefinition(loginUser,
144+
projectCode, upstreamWorkflowDefinitionCode, ReleaseState.ONLINE);
145+
Assertions.assertTrue(releaseResponse.getBody().getSuccess());
146+
147+
// trigger upstream workflow instance
148+
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
149+
Date date = new Date();
150+
String scheduleTime = String.format("%s,%s", formatter.format(date), formatter.format(date));
151+
log.info("use current time {} as scheduleTime", scheduleTime);
152+
HttpResponse startWorkflowResponse = executorPage.startWorkflowInstance(loginUser, projectCode,
153+
upstreamWorkflowDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
154+
Assertions.assertTrue(startWorkflowResponse.getBody().getSuccess());
155+
156+
List<Integer> workflowInstanceIds = (List<Integer>) startWorkflowResponse.getBody().getData();
157+
Assertions.assertFalse(workflowInstanceIds.isEmpty());
158+
log.info("upstream workflow instance ids: {}", workflowInstanceIds);
159+
160+
// wait for upstream workflow to complete
161+
int workflowInstanceId = workflowInstanceIds.get(0);
162+
boolean completed = false;
163+
for (int i = 0; i < 60; i++) {
164+
Thread.sleep(2000);
165+
HttpResponse queryResponse = workflowInstancePage.queryWorkflowInstanceById(loginUser,
166+
projectCode, workflowInstanceId);
167+
if (queryResponse.getBody().getSuccess() && queryResponse.getBody().getData() != null) {
168+
LinkedHashMap<String, Object> instanceData =
169+
(LinkedHashMap<String, Object>) queryResponse.getBody().getData();
170+
String state = (String) instanceData.get("state");
171+
log.info("upstream workflow instance state: {}", state);
172+
if ("SUCCESS".equals(state)) {
173+
completed = true;
174+
break;
175+
} else if ("FAILURE".equals(state) || "STOP".equals(state)) {
176+
Assertions.fail("upstream workflow instance failed with state: " + state);
177+
}
178+
}
179+
}
180+
Assertions.assertTrue(completed, "upstream workflow instance should complete within 120 seconds");
181+
}
182+
183+
@Test
184+
@Order(10)
185+
public void testDependentSuccessWorkflowInstance() throws Exception {
186+
// read dependent success workflow json template and replace placeholders
187+
ClassLoader classLoader = getClass().getClassLoader();
188+
File file = new File(
189+
classLoader.getResource("workflow-json/task-dependent/dependentSuccessWorkflow.json").getFile());
190+
String jsonContent = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
191+
jsonContent = jsonContent
192+
.replace("${projectCode}", String.valueOf(projectCode))
193+
.replace("${definitionCode}", String.valueOf(upstreamWorkflowDefinitionCode));
194+
195+
String dependentWorkflowName = "dependent_success_" + System.currentTimeMillis();
196+
HttpResponse createResponse = workflowDefinitionPage
197+
.createWorkflowDefinition(loginUser, projectCode, jsonContent, dependentWorkflowName);
198+
Assertions.assertTrue(createResponse.getBody().getSuccess());
199+
200+
// get dependent workflow definition code from create response
201+
LinkedHashMap<String, Object> createData =
202+
(LinkedHashMap<String, Object>) createResponse.getBody().getData();
203+
dependentWorkflowDefinitionCode = ((Number) createData.get("code")).longValue();
204+
log.info("dependent workflow definition code: {}", dependentWorkflowDefinitionCode);
205+
206+
// release dependent workflow
207+
HttpResponse releaseResponse = workflowDefinitionPage.releaseWorkflowDefinition(loginUser,
208+
projectCode, dependentWorkflowDefinitionCode, ReleaseState.ONLINE);
209+
Assertions.assertTrue(releaseResponse.getBody().getSuccess());
210+
211+
// trigger dependent workflow instance
212+
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
213+
Date date = new Date();
214+
String scheduleTime = String.format("%s,%s", formatter.format(date), formatter.format(date));
215+
log.info("use current time {} as scheduleTime", scheduleTime);
216+
HttpResponse startWorkflowResponse = executorPage.startWorkflowInstance(loginUser, projectCode,
217+
dependentWorkflowDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
218+
Assertions.assertTrue(startWorkflowResponse.getBody().getSuccess());
219+
220+
List<Integer> workflowInstanceIds = (List<Integer>) startWorkflowResponse.getBody().getData();
221+
Assertions.assertFalse(workflowInstanceIds.isEmpty());
222+
log.info("dependent workflow instance ids: {}", workflowInstanceIds);
223+
224+
// wait for dependent workflow to complete
225+
int workflowInstanceId = workflowInstanceIds.get(0);
226+
boolean completed = false;
227+
for (int i = 0; i < 60; i++) {
228+
Thread.sleep(2000);
229+
HttpResponse queryResponse = workflowInstancePage.queryWorkflowInstanceById(loginUser,
230+
projectCode, workflowInstanceId);
231+
if (queryResponse.getBody().getSuccess() && queryResponse.getBody().getData() != null) {
232+
LinkedHashMap<String, Object> instanceData =
233+
(LinkedHashMap<String, Object>) queryResponse.getBody().getData();
234+
String state = (String) instanceData.get("state");
235+
log.info("dependent success workflow instance state: {}", state);
236+
if ("SUCCESS".equals(state)) {
237+
completed = true;
238+
break;
239+
} else if ("FAILURE".equals(state) || "STOP".equals(state)) {
240+
Assertions.fail("dependent workflow instance failed with state: " + state);
241+
}
242+
}
243+
}
244+
Assertions.assertTrue(completed, "dependent workflow instance should complete within 120 seconds");
245+
246+
// query task instances and verify DEPENDENT task type
247+
HttpResponse queryTaskListResponse = workflowInstancePage.queryTaskListByWorkflowInstanceId(loginUser,
248+
projectCode, workflowInstanceId);
249+
Assertions.assertTrue(queryTaskListResponse.getBody().getSuccess());
250+
251+
List<LinkedHashMap<String, Object>> taskList;
252+
Object taskData = queryTaskListResponse.getBody().getData();
253+
if (taskData instanceof List) {
254+
taskList = (List<LinkedHashMap<String, Object>>) taskData;
255+
} else {
256+
LinkedHashMap<String, Object> pageData = (LinkedHashMap<String, Object>) taskData;
257+
taskList = (List<LinkedHashMap<String, Object>>) pageData.get("taskList");
258+
}
259+
Assertions.assertNotNull(taskList);
260+
Assertions.assertFalse(taskList.isEmpty());
261+
262+
LinkedHashMap<String, Object> dependentTask = taskList.get(0);
263+
Assertions.assertEquals("DEPENDENT", dependentTask.get("taskType"));
264+
Assertions.assertEquals("SUCCESS", dependentTask.get("state"));
265+
log.info("dependent task instance verified: taskType={}, state={}",
266+
dependentTask.get("taskType"), dependentTask.get("state"));
267+
}
268+
269+
@Test
270+
@Order(20)
271+
public void testDependentFailedWorkflowInstance() throws Exception {
272+
// read dependent failed workflow json template and replace placeholders
273+
// this workflow depends on a non-existent definition code, so the dependent task should fail
274+
ClassLoader classLoader = getClass().getClassLoader();
275+
File file = new File(
276+
classLoader.getResource("workflow-json/task-dependent/dependentFailedWorkflow.json").getFile());
277+
String jsonContent = new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
278+
jsonContent = jsonContent.replace("${projectCode}", String.valueOf(projectCode));
279+
280+
String dependentWorkflowName = "dependent_failed_" + System.currentTimeMillis();
281+
HttpResponse createResponse = workflowDefinitionPage
282+
.createWorkflowDefinition(loginUser, projectCode, jsonContent, dependentWorkflowName);
283+
Assertions.assertTrue(createResponse.getBody().getSuccess());
284+
285+
// get failed dependent workflow definition code from create response
286+
LinkedHashMap<String, Object> createData =
287+
(LinkedHashMap<String, Object>) createResponse.getBody().getData();
288+
failedDependentWorkflowDefinitionCode = ((Number) createData.get("code")).longValue();
289+
log.info("failed dependent workflow definition code: {}", failedDependentWorkflowDefinitionCode);
290+
291+
// release failed dependent workflow
292+
HttpResponse releaseResponse = workflowDefinitionPage.releaseWorkflowDefinition(loginUser,
293+
projectCode, failedDependentWorkflowDefinitionCode, ReleaseState.ONLINE);
294+
Assertions.assertTrue(releaseResponse.getBody().getSuccess());
295+
296+
// trigger failed dependent workflow instance
297+
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
298+
Date date = new Date();
299+
String scheduleTime = String.format("%s,%s", formatter.format(date), formatter.format(date));
300+
log.info("use current time {} as scheduleTime", scheduleTime);
301+
HttpResponse startWorkflowResponse = executorPage.startWorkflowInstance(loginUser, projectCode,
302+
failedDependentWorkflowDefinitionCode, scheduleTime, FailureStrategy.END, WarningType.NONE);
303+
Assertions.assertTrue(startWorkflowResponse.getBody().getSuccess());
304+
305+
List<Integer> workflowInstanceIds = (List<Integer>) startWorkflowResponse.getBody().getData();
306+
Assertions.assertFalse(workflowInstanceIds.isEmpty());
307+
log.info("failed dependent workflow instance ids: {}", workflowInstanceIds);
308+
309+
// wait for dependent workflow to fail
310+
int workflowInstanceId = workflowInstanceIds.get(0);
311+
boolean failed = false;
312+
for (int i = 0; i < 60; i++) {
313+
Thread.sleep(2000);
314+
HttpResponse queryResponse = workflowInstancePage.queryWorkflowInstanceById(loginUser,
315+
projectCode, workflowInstanceId);
316+
if (queryResponse.getBody().getSuccess() && queryResponse.getBody().getData() != null) {
317+
LinkedHashMap<String, Object> instanceData =
318+
(LinkedHashMap<String, Object>) queryResponse.getBody().getData();
319+
String state = (String) instanceData.get("state");
320+
log.info("failed dependent workflow instance state: {}", state);
321+
if ("FAILURE".equals(state)) {
322+
failed = true;
323+
break;
324+
} else if ("SUCCESS".equals(state)) {
325+
Assertions.fail("dependent workflow instance should not succeed");
326+
}
327+
}
328+
}
329+
Assertions.assertTrue(failed, "dependent workflow referencing non-existent definition should fail");
330+
331+
// query task instances and verify DEPENDENT task type with FAILURE state
332+
HttpResponse queryTaskListResponse = workflowInstancePage.queryTaskListByWorkflowInstanceId(loginUser,
333+
projectCode, workflowInstanceId);
334+
Assertions.assertTrue(queryTaskListResponse.getBody().getSuccess());
335+
336+
List<LinkedHashMap<String, Object>> failedTaskList;
337+
Object failedTaskData = queryTaskListResponse.getBody().getData();
338+
if (failedTaskData instanceof List) {
339+
failedTaskList = (List<LinkedHashMap<String, Object>>) failedTaskData;
340+
} else {
341+
LinkedHashMap<String, Object> pageData = (LinkedHashMap<String, Object>) failedTaskData;
342+
failedTaskList = (List<LinkedHashMap<String, Object>>) pageData.get("taskList");
343+
}
344+
Assertions.assertNotNull(failedTaskList);
345+
Assertions.assertFalse(failedTaskList.isEmpty());
346+
347+
LinkedHashMap<String, Object> dependentTask = failedTaskList.get(0);
348+
Assertions.assertEquals("DEPENDENT", dependentTask.get("taskType"));
349+
Assertions.assertEquals("FAILURE", dependentTask.get("state"));
350+
log.info("failed dependent task instance verified: taskType={}, state={}",
351+
dependentTask.get("taskType"), dependentTask.get("state"));
352+
}
353+
}

0 commit comments

Comments
 (0)