Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ public ExitCodeException(int exitCode, String message) {
this.exitCode = exitCode;
}

public int getExitCode() {
return exitCode;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.common.shell;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class AbstractShellTest {

@Test
void testExitCodeExceptionShouldExposeExitCode() {
AbstractShell.ExitCodeException exception =
new AbstractShell.ExitCodeException(127, "command not found");

Assertions.assertEquals(127, exception.getExitCode());
Assertions.assertEquals("command not found", exception.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;

import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.shell.AbstractShell.ExitCodeException;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
Expand All @@ -47,6 +48,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.ServiceLoader;
Expand Down Expand Up @@ -103,6 +105,18 @@ private ProcessUtils() {
private static final String SIGTERM = "15";
private static final String SIGKILL = "9";

private enum ProcessStatus {
ALIVE,
NOT_ALIVE,
NO_PERMISSION
}

private static final class AlivePidResult {

private final List<Integer> alivePidList = new ArrayList<>();
private final List<Integer> noPermissionPidList = new ArrayList<>();
}

/**
* Terminate the task process, support multi-level signal processing and fallback strategy
* @param request Task execution context
Expand Down Expand Up @@ -163,8 +177,14 @@ private static boolean sendKillSignal(String signal, List<Integer> pidList, Stri
return true;
}

List<Integer> alivePidList = getAlivePidList(pidList, tenantCode);
AlivePidResult alivePidResult = getAlivePidResult(pidList, tenantCode);
List<Integer> alivePidList = alivePidResult.alivePidList;
if (alivePidList.isEmpty()) {
if (!alivePidResult.noPermissionPidList.isEmpty()) {
log.warn("No killable process found, but some processes still exist without permission: {}",
alivePidResult.noPermissionPidList);
return false;
}
log.info("All processes already terminated.");
return true;
}
Expand All @@ -187,7 +207,7 @@ private static boolean sendKillSignal(String signal, List<Integer> pidList, Stri
long startTime = System.currentTimeMillis();
while (!alivePidList.isEmpty() && (System.currentTimeMillis() - startTime < timeoutMillis)) {
// Remove if process is no longer alive
alivePidList.removeIf(pid -> !isProcessAlive(pid, tenantCode));
alivePidList.removeIf(pid -> checkProcessStatus(pid, tenantCode) == ProcessStatus.NOT_ALIVE);
if (!alivePidList.isEmpty()) {
// Wait for a short interval before checking process statuses again, to avoid excessive CPU usage
// from tight-loop polling.
Expand All @@ -214,37 +234,57 @@ private static boolean sendKillSignal(String signal, List<Integer> pidList, Stri
}

/**
* Returns a list of process IDs that are still running.
* Returns process IDs grouped by their current process status.
* This method filters the provided list of PIDs by checking whether each process is still active
*
* @param pidList the list of process IDs to check
* @param tenantCode the tenant identifier used for permission control or logging context
* @return a new list containing only the PIDs of processes that are still running;
* returns an empty list if none are alive
* @return process IDs grouped by active and no-permission status
*/
private static List<Integer> getAlivePidList(List<Integer> pidList, String tenantCode) {
return pidList.stream()
.filter(pid -> isProcessAlive(pid, tenantCode))
.collect(Collectors.toList());
private static AlivePidResult getAlivePidResult(List<Integer> pidList, String tenantCode) {
AlivePidResult alivePidResult = new AlivePidResult();
for (Integer pid : pidList) {
ProcessStatus processStatus = checkProcessStatus(pid, tenantCode);
if (processStatus == ProcessStatus.ALIVE) {
alivePidResult.alivePidList.add(pid);
} else if (processStatus == ProcessStatus.NO_PERMISSION) {
alivePidResult.noPermissionPidList.add(pid);
}
}
return alivePidResult;
}

/**
* Check if a process with the specified PID is alive.
*
* @param pid the process ID to check
* @return true if the process exists and is running, false otherwise
* @return the process status
*/
private static boolean isProcessAlive(int pid, String tenantCode) {
private static ProcessStatus checkProcessStatus(int pid, String tenantCode) {
try {
// Use kill -0 to check if the process exists; it does not actually send a signal
String checkCmd = String.format("kill -0 %d", pid);
checkCmd = OSUtils.getSudoCmd(tenantCode, checkCmd);
OSUtils.exeCmd(checkCmd);
// If the command executes successfully, the process exists
return true;
return ProcessStatus.ALIVE;
} catch (ExitCodeException e) {
if (e.getExitCode() == 0) {
return ProcessStatus.ALIVE;
}
String errorMessage = StringUtils.defaultString(e.getMessage()).toLowerCase(Locale.ROOT);
if (errorMessage.contains("operation not permitted")) {
log.warn("No permission to check process status, pid: {}", pid, e);
return ProcessStatus.NO_PERMISSION;
}
if (errorMessage.contains("no such process")) {
return ProcessStatus.NOT_ALIVE;
}
log.warn("Failed to check process status, pid: {}", pid, e);
return ProcessStatus.NOT_ALIVE;
} catch (Exception e) {
// If the command fails, the process does not exist
return false;
return ProcessStatus.NOT_ALIVE;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.dolphinscheduler.plugin.task.api.utils;

import org.apache.dolphinscheduler.common.shell.AbstractShell.ExitCodeException;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
Expand Down Expand Up @@ -49,6 +50,7 @@ void tearDown() {
mockedOSUtils.close();
}
}

@Test
public void testGetPidList() throws Exception {
// first
Expand Down Expand Up @@ -199,6 +201,44 @@ void testKillProcessSuccessWithSigInt() {
mockedOSUtils.verify(() -> OSUtils.exeCmd("kill -9 12345 1234"), Mockito.never());
}

@Test
void testKillProcessShouldKillPermittedChildrenWhenSudoParentCheckIsDenied() throws Exception {
TaskExecutionContext taskRequest = Mockito.mock(TaskExecutionContext.class);
Mockito.when(taskRequest.getProcessId()).thenReturn(12345);
Mockito.when(taskRequest.getTenantCode()).thenReturn("testTenant");

String pstreeCmd;
String pstreeOutput;
if (SystemUtils.IS_OS_MAC) {
pstreeCmd = "pstree -sp 12345";
pstreeOutput = "-+= 12345 sudo -+- 1234 86.sh --- 5678 python3";
} else {
pstreeCmd = "pstree -p 12345";
pstreeOutput = "sudo(12345)---86.sh(1234)---python3(5678)";
}
mockedOSUtils.when(() -> OSUtils.exeCmd(pstreeCmd)).thenReturn(pstreeOutput);

mockedOSUtils.when(() -> OSUtils.getSudoCmd(Mockito.eq("testTenant"), Mockito.anyString()))
.thenAnswer(invocation -> invocation.getArgument(1));
mockedOSUtils.when(() -> OSUtils.exeCmd("kill -0 12345"))
.thenThrow(new ExitCodeException(1, "Operation not permitted"));
mockedOSUtils.when(() -> OSUtils.exeCmd("kill -0 1234"))
.thenThrow(new ExitCodeException(0, "HISTSIZE: readonly variable"))
.thenThrow(new ExitCodeException(1, "No such process"));
mockedOSUtils.when(() -> OSUtils.exeCmd("kill -0 5678"))
.thenThrow(new ExitCodeException(0, "HISTSIZE: readonly variable"))
.thenThrow(new ExitCodeException(1, "No such process"));
mockedOSUtils.when(() -> OSUtils.exeCmd("kill -2 1234 5678")).thenReturn("");

boolean result = ProcessUtils.kill(taskRequest);

Assertions.assertTrue(result);
mockedOSUtils.verify(() -> OSUtils.exeCmd("kill -2 1234 5678"), Mockito.times(1));
mockedOSUtils.verify(() -> OSUtils.exeCmd("kill -2 12345 1234 5678"), Mockito.never());
mockedOSUtils.verify(() -> OSUtils.exeCmd("kill -15 1234 5678"), Mockito.never());
mockedOSUtils.verify(() -> OSUtils.exeCmd("kill -9 1234 5678"), Mockito.never());
}

@Test
void testKillProcessFail() {
// Arrange
Expand Down
Loading