Skip to content

Commit 8a0d7dd

Browse files
Mrhs121ruanwenjun
authored andcommitted
[Fix-17767] add ut
1 parent 703c40f commit 8a0d7dd

1 file changed

Lines changed: 181 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.server.master.engine.command.handler;
19+
20+
import static org.junit.jupiter.api.Assertions.assertEquals;
21+
import static org.junit.jupiter.api.Assertions.assertNotNull;
22+
import static org.junit.jupiter.api.Assertions.assertSame;
23+
import static org.junit.jupiter.api.Assertions.assertThrows;
24+
import static org.mockito.Mockito.verify;
25+
import static org.mockito.Mockito.when;
26+
27+
import org.apache.dolphinscheduler.common.constants.CommandKeyConstants;
28+
import org.apache.dolphinscheduler.common.enums.CommandType;
29+
import org.apache.dolphinscheduler.common.enums.TaskDependType;
30+
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
31+
import org.apache.dolphinscheduler.common.utils.JSONUtils;
32+
import org.apache.dolphinscheduler.dao.entity.Command;
33+
import org.apache.dolphinscheduler.dao.entity.Project;
34+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
35+
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
36+
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
37+
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
38+
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
39+
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
40+
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
41+
import org.apache.dolphinscheduler.server.master.engine.graph.IWorkflowGraph;
42+
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext;
43+
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
44+
45+
import java.util.Collections;
46+
import java.util.HashMap;
47+
import java.util.Map;
48+
import java.util.Optional;
49+
50+
import org.junit.jupiter.api.BeforeEach;
51+
import org.junit.jupiter.api.Test;
52+
import org.junit.jupiter.api.extension.ExtendWith;
53+
import org.mockito.Mock;
54+
import org.mockito.junit.jupiter.MockitoExtension;
55+
import org.springframework.context.ApplicationContext;
56+
import org.springframework.test.util.ReflectionTestUtils;
57+
58+
@ExtendWith(MockitoExtension.class)
59+
class ExecuteTaskCommandHandlerTest {
60+
61+
private ExecuteTaskCommandHandler executeTaskCommandHandler;
62+
63+
@Mock
64+
private WorkflowInstanceDao workflowInstanceDao;
65+
66+
@Mock
67+
private TaskInstanceDao taskInstanceDao;
68+
69+
@Mock
70+
private ApplicationContext applicationContext;
71+
72+
@Mock
73+
private IWorkflowGraph workflowGraph;
74+
75+
private MasterConfig masterConfig;
76+
77+
@BeforeEach
78+
void setUp() {
79+
executeTaskCommandHandler = new ExecuteTaskCommandHandler();
80+
masterConfig = new MasterConfig();
81+
masterConfig.setMasterAddress("127.0.0.1:5678");
82+
ReflectionTestUtils.setField(executeTaskCommandHandler, "workflowInstanceDao", workflowInstanceDao);
83+
ReflectionTestUtils.setField(executeTaskCommandHandler, "taskInstanceDao", taskInstanceDao);
84+
ReflectionTestUtils.setField(executeTaskCommandHandler, AbstractCommandHandler.class, "taskInstanceDao",
85+
taskInstanceDao, TaskInstanceDao.class);
86+
ReflectionTestUtils.setField(executeTaskCommandHandler, "applicationContext", applicationContext);
87+
ReflectionTestUtils.setField(executeTaskCommandHandler, "masterConfig", masterConfig);
88+
}
89+
90+
@Test
91+
void testExecuteTaskCommandType() {
92+
assertEquals(CommandType.EXECUTE_TASK, executeTaskCommandHandler.commandType());
93+
}
94+
95+
@Test
96+
void testAssembleWorkflowInstance() {
97+
Command command = new Command();
98+
command.setWorkflowInstanceId(1);
99+
command.setCommandType(CommandType.EXECUTE_TASK);
100+
command.setTaskDependType(TaskDependType.TASK_POST);
101+
WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command);
102+
103+
WorkflowInstance workflowInstance = new WorkflowInstance();
104+
workflowInstance.setId(1);
105+
workflowInstance.setTaskDependType(TaskDependType.TASK_ONLY);
106+
when(workflowInstanceDao.queryOptionalById(1)).thenReturn(Optional.of(workflowInstance));
107+
108+
executeTaskCommandHandler.assembleWorkflowInstance(contextBuilder);
109+
110+
assertSame(workflowInstance, contextBuilder.getWorkflowInstance());
111+
assertEquals(WorkflowExecutionStatus.RUNNING_EXECUTION, workflowInstance.getState());
112+
assertEquals(CommandType.EXECUTE_TASK, workflowInstance.getCommandType());
113+
assertEquals(TaskDependType.TASK_POST, workflowInstance.getTaskDependType());
114+
assertEquals("127.0.0.1:5678", workflowInstance.getHost());
115+
verify(workflowInstanceDao).updateById(workflowInstance);
116+
}
117+
118+
@Test
119+
void testThrowExceptionWhenWorkflowInstanceNotExists() {
120+
Command command = new Command();
121+
command.setWorkflowInstanceId(100);
122+
WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command);
123+
when(workflowInstanceDao.queryOptionalById(100)).thenReturn(Optional.empty());
124+
125+
IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
126+
() -> executeTaskCommandHandler.assembleWorkflowInstance(contextBuilder));
127+
128+
assertEquals("Cannot find WorkflowInstance:100", ex.getMessage());
129+
}
130+
131+
@Test
132+
void testAssembleWorkflowExecutionGraph() {
133+
Command command = new Command();
134+
command.setWorkflowInstanceId(1);
135+
command.setCommandType(CommandType.EXECUTE_TASK);
136+
Map<String, Object> commandParam = new HashMap<>();
137+
commandParam.put(CommandKeyConstants.CMD_PARAM_START_NODES, "101");
138+
command.setCommandParam(JSONUtils.toJsonString(commandParam));
139+
WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command);
140+
141+
WorkflowInstance workflowInstance = new WorkflowInstance();
142+
workflowInstance.setId(1);
143+
workflowInstance.setTaskDependType(TaskDependType.TASK_POST);
144+
contextBuilder.setWorkflowInstance(workflowInstance);
145+
contextBuilder.setWorkflowDefinition(new WorkflowDefinition());
146+
contextBuilder.setProject(new Project());
147+
contextBuilder.setWorkflowEventBus(new WorkflowEventBus());
148+
contextBuilder.setWorkflowGraph(workflowGraph);
149+
150+
TaskDefinition taskDefinition = new TaskDefinition();
151+
taskDefinition.setCode(101L);
152+
taskDefinition.setName("shell1");
153+
when(workflowGraph.getTaskNodeByCode(101L)).thenReturn(taskDefinition);
154+
when(workflowGraph.getTaskNodeByName("shell1")).thenReturn(taskDefinition);
155+
when(workflowGraph.getAllTaskNodes()).thenReturn(Collections.singletonList(taskDefinition));
156+
when(workflowGraph.getPredecessors("shell1")).thenReturn(Collections.emptySet());
157+
when(workflowGraph.getSuccessors("shell1")).thenReturn(Collections.<String>emptySet());
158+
when(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(1)).thenReturn(Collections.emptyList());
159+
160+
executeTaskCommandHandler.assembleWorkflowExecutionGraph(contextBuilder);
161+
162+
assertNotNull(contextBuilder.getWorkflowExecutionGraph());
163+
assertEquals(1, contextBuilder.getWorkflowExecutionGraph().getAllTaskExecutionRunnable().size());
164+
}
165+
166+
@Test
167+
void testThrowExceptionWhenStartNodesMissing() {
168+
Command command = new Command();
169+
command.setCommandType(CommandType.EXECUTE_TASK);
170+
command.setCommandParam("{}");
171+
WorkflowExecuteContextBuilder contextBuilder = WorkflowExecuteContext.builder().withCommand(command);
172+
contextBuilder.setWorkflowGraph(workflowGraph);
173+
174+
WorkflowInstance workflowInstance = new WorkflowInstance();
175+
workflowInstance.setTaskDependType(TaskDependType.TASK_POST);
176+
contextBuilder.setWorkflowInstance(workflowInstance);
177+
178+
assertThrows(IllegalArgumentException.class,
179+
() -> executeTaskCommandHandler.assembleWorkflowExecutionGraph(contextBuilder));
180+
}
181+
}

0 commit comments

Comments
 (0)