Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,24 @@ public HttpResponse startWorkflowInstance(User loginUser,
String scheduleTime,
FailureStrategy failureStrategy,
WarningType warningType) {
return startWorkflowInstance(loginUser, projectCode, workflowDefinitionCode, scheduleTime, failureStrategy,
warningType, null);
}

public HttpResponse startWorkflowInstance(User loginUser,
long projectCode,
long workflowDefinitionCode,
String scheduleTime,
FailureStrategy failureStrategy,
WarningType warningType,
String startParams) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);
params.put("workflowDefinitionCode", workflowDefinitionCode);
params.put("scheduleTime", scheduleTime);
params.put("failureStrategy", failureStrategy);
params.put("warningType", warningType);
params.put("startParams", startParams);
Map<String, String> headers = new HashMap<>();
headers.put(Constants.SESSION_ID_KEY, sessionId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,49 @@ public HttpResponse queryWorkflowDefinitionList(User loginUser, long projectCode
return requestClient.get(url, headers, params);
}

public HttpResponse updateWorkflowDefinition(User loginUser,
long projectCode,
long workflowDefinitionCode,
String jsonContent,
String workflowDefinitionName) {
return updateWorkflowDefinition(loginUser, projectCode, workflowDefinitionCode, jsonContent,
workflowDefinitionName, null);
}

public HttpResponse updateWorkflowDefinition(User loginUser,
long projectCode,
long workflowDefinitionCode,
String jsonContent,
String workflowDefinitionName,
ReleaseState releaseState) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);

Map<String, Object> fileContentMap = JSONUtils.parseObject(jsonContent, new TypeReference<>() {
});
if (fileContentMap == null) {
throw new RuntimeException("file content parse error");
}
fileContentMap.replaceAll((key, value) -> {
if (value instanceof List) {
return JSONUtils.toJsonString(value);
}
return value;
});
params.putAll(fileContentMap);
params.put("name", workflowDefinitionName);
params.put("code", workflowDefinitionCode);
if (releaseState != null) {
params.put("releaseState", releaseState);
}

Map<String, String> headers = new HashMap<>();
headers.put(Constants.SESSION_ID_KEY, sessionId);
RequestClient requestClient = new RequestClient();
String url = String.format("/projects/%s/workflow-definition/%s", projectCode, workflowDefinitionCode);
return requestClient.put(url, headers, params);
}

public HttpResponse releaseWorkflowDefinition(User loginUser, long projectCode, long code,
ReleaseState releaseState) {
Map<String, Object> params = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,31 @@ public HttpResponse queryTaskListByWorkflowInstanceId(User loginUser, long proje
return requestClient.get(url, headers, params);
}

public HttpResponse queryTaskInstanceList(User loginUser, long projectCode, long workflowInstanceId) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);
params.put("workflowInstanceId", workflowInstanceId);
params.put("pageNo", 1);
params.put("pageSize", 10);
Map<String, String> headers = new HashMap<>();
headers.put(Constants.SESSION_ID_KEY, sessionId);
RequestClient requestClient = new RequestClient();
String url = String.format("/projects/%s/task-instances", projectCode);
return requestClient.get(url, headers, params);
}

public HttpResponse queryTaskLog(User loginUser, int taskInstanceId, int skipLineNum, int limit) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);
params.put("taskInstanceId", taskInstanceId);
params.put("skipLineNum", skipLineNum);
params.put("limit", limit);
Map<String, String> headers = new HashMap<>();
headers.put(Constants.SESSION_ID_KEY, sessionId);
RequestClient requestClient = new RequestClient();
return requestClient.get("/log/detail", headers, params);
}

public HttpResponse queryWorkflowInstanceById(User loginUser, long projectCode, long workflowInstanceId) {
Map<String, Object> params = new HashMap<>();
params.put("loginUser", loginUser);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.dolphinscheduler.api.service.WorkflowTaskRelationService;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.utils.SensitivePropertyUtils;
import org.apache.dolphinscheduler.api.vo.TaskDefinitionVO;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
Expand Down Expand Up @@ -138,7 +139,7 @@ public TaskDefinition queryTaskDefinitionByName(User loginUser, long projectCode
log.error("Task definition does not exist, taskName:{}.", taskName);
throw new ServiceException(Status.TASK_DEFINE_NOT_EXIST, taskName);
}
return taskDefinition;
return SensitivePropertyUtils.decryptAndMaskTaskDefinition(taskDefinition);
}

public void updateDag(User loginUser, long workflowDefinitionCode,
Expand Down Expand Up @@ -191,7 +192,7 @@ public TaskDefinition getTaskDefinition(User loginUser,
}
Project project = projectDao.queryByCode(taskDefinition.getProjectCode());
projectService.checkProjectAndAuthThrowException(loginUser, project, TASK_DEFINITION);
return taskDefinition;
return SensitivePropertyUtils.decryptAndMaskTaskDefinition(taskDefinition);
}

/**
Expand Down Expand Up @@ -223,17 +224,19 @@ private TaskDefinitionLog updateTask(User loginUser, long projectCode, long task
}
TaskDefinitionLog taskDefinitionToUpdate =
JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinitionToUpdate == null) {
log.warn("Parameter taskDefinitionJson is invalid.");
throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
}
taskDefinitionToUpdate.setTaskParams(SensitivePropertyUtils.mergeAndEncryptLocalParamsInTaskParams(
taskDefinitionToUpdate.getTaskParams(), taskDefinition.getTaskParams()));
if (TimeoutFlag.CLOSE == taskDefinition.getTimeoutFlag()) {
taskDefinition.setTimeoutNotifyStrategy(null);
}
if (taskDefinition.equals(taskDefinitionToUpdate)) {
log.warn("Task definition does not need update because no change, taskDefinitionCode:{}.", taskCode);
return null;
}
if (taskDefinitionToUpdate == null) {
log.warn("Parameter taskDefinitionJson is invalid.");
throw new ServiceException(Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
}
if (!checkTaskParameters(taskDefinitionToUpdate.getTaskType(), taskDefinitionToUpdate.getTaskParams())) {
throw new ServiceException(Status.WORKFLOW_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
}
Expand Down Expand Up @@ -613,7 +616,9 @@ public TaskDefinitionVO queryTaskDefinitionDetail(User loginUser, long projectCo
taskRelationList = taskRelationList.stream()
.filter(v -> v.getPreTaskCode() != 0).collect(Collectors.toList());
}
TaskDefinitionVO taskDefinitionVo = TaskDefinitionVO.fromTaskDefinition(taskDefinition);
TaskDefinitionVO taskDefinitionVo =
TaskDefinitionVO
.fromTaskDefinition(SensitivePropertyUtils.decryptAndMaskTaskDefinition(taskDefinition));
taskDefinitionVo.setWorkflowTaskRelationList(taskRelationList);
return taskDefinitionVo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.dolphinscheduler.api.utils.CheckUtils;
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.utils.SensitivePropertyUtils;
import org.apache.dolphinscheduler.api.validator.GlobalParamsValidator;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
Expand Down Expand Up @@ -257,6 +258,8 @@ public WorkflowDefinition createWorkflowDefinition(User loginUser,

List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);
List<WorkflowTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);
globalParams = SensitivePropertyUtils.encryptGlobalParams(globalParams);
encryptSensitiveLocalParams(taskDefinitionLogs);

long workflowDefinitionCode = CodeGenerateUtils.genCode();
WorkflowDefinition workflowDefinition =
Expand Down Expand Up @@ -445,7 +448,10 @@ public List<DagData> queryWorkflowDefinitionList(User loginUser, long projectCod
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);

List<WorkflowDefinition> resourceList = workflowDefinitionDao.queryAllDefinitionList(projectCode);
return resourceList.stream().map(processService::genDagData).collect(Collectors.toList());
return resourceList.stream()
.map(processService::genDagData)
.map(SensitivePropertyUtils::decryptAndMaskDagData)
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -521,6 +527,7 @@ public PageInfo<WorkflowDefinition> queryWorkflowDefinitionListPaging(@NonNull U
Schedule schedule = scheduleMap.get(pd.getCode());
pd.setScheduleReleaseState(schedule == null ? null : schedule.getReleaseState());
pd.setSchedule(schedule);
SensitivePropertyUtils.decryptAndMaskWorkflowDefinition(pd);
}

PageInfo<WorkflowDefinition> pageInfo = new PageInfo<>(pageNo, pageSize);
Expand Down Expand Up @@ -549,7 +556,7 @@ public DagData queryWorkflowDefinitionByCode(User loginUser, long projectCode, l
log.error("workflow definition does not exist, workflowDefinitionCode:{}.", code);
throw new ServiceException(Status.WORKFLOW_DEFINITION_NOT_EXIST, String.valueOf(code));
}
return processService.genDagData(workflowDefinition);
return SensitivePropertyUtils.decryptAndMaskDagData(processService.genDagData(workflowDefinition));
}

@Override
Expand Down Expand Up @@ -583,7 +590,7 @@ public DagData queryWorkflowDefinitionByName(User loginUser, long projectCode, S
log.error("workflow definition does not exist, projectCode:{}.", projectCode);
throw new ServiceException(Status.WORKFLOW_DEFINITION_NOT_EXIST, name);
}
return processService.genDagData(workflowDefinition);
return SensitivePropertyUtils.decryptAndMaskDagData(processService.genDagData(workflowDefinition));
}

/**
Expand Down Expand Up @@ -649,6 +656,9 @@ public WorkflowDefinition updateWorkflowDefinition(User loginUser,
throw new ServiceException(Status.WORKFLOW_DEFINITION_NAME_EXIST, name);
}
}
globalParams =
SensitivePropertyUtils.mergeAndEncryptGlobalParams(globalParams, workflowDefinition.getGlobalParams());
mergeAndEncryptSensitiveLocalParams(taskDefinitionLogs);
WorkflowDefinition workflowDefinitionDeepCopy =
JSONUtils.parseObject(JSONUtils.toJsonString(workflowDefinition), WorkflowDefinition.class);
workflowDefinition.set(projectCode, name, description, globalParams, locations, timeout);
Expand All @@ -657,6 +667,35 @@ public WorkflowDefinition updateWorkflowDefinition(User loginUser,
taskDefinitionLogs);
}

private void encryptSensitiveLocalParams(List<TaskDefinitionLog> taskDefinitionLogs) {
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
return;
}
taskDefinitionLogs.forEach(taskDefinitionLog -> taskDefinitionLog.setTaskParams(
SensitivePropertyUtils.encryptLocalParamsInTaskParams(taskDefinitionLog.getTaskParams())));
}

private void mergeAndEncryptSensitiveLocalParams(List<TaskDefinitionLog> taskDefinitionLogs) {
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
return;
}
Set<Long> taskDefinitionCodes = taskDefinitionLogs.stream()
.map(TaskDefinitionLog::getCode)
.filter(taskCode -> taskCode > 0)
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(taskDefinitionCodes)) {
encryptSensitiveLocalParams(taskDefinitionLogs);
return;
}
Map<Long, String> existingTaskParamsMap = taskDefinitionDao.queryByCodes(taskDefinitionCodes)
.stream()
.collect(Collectors.toMap(TaskDefinition::getCode, TaskDefinition::getTaskParams));
taskDefinitionLogs.forEach(taskDefinitionLog -> taskDefinitionLog.setTaskParams(
SensitivePropertyUtils.mergeAndEncryptLocalParamsInTaskParams(
taskDefinitionLog.getTaskParams(),
existingTaskParamsMap.get(taskDefinitionLog.getCode()))));
}

/**
* Task want to delete whether used in other task, should throw exception when have be used.
* <p>
Expand Down Expand Up @@ -982,7 +1021,7 @@ public List<TaskDefinition> getTaskNodeListByDefinitionCode(User loginUser, long
log.error("workflow definition does not exist, workflowDefinitionCode:{}.", code);
throw new ServiceException(Status.WORKFLOW_DEFINITION_NOT_EXIST, String.valueOf(code));
}
DagData dagData = processService.genDagData(workflowDefinition);
DagData dagData = SensitivePropertyUtils.decryptAndMaskDagData(processService.genDagData(workflowDefinition));
return dagData.getTaskDefinitionList();
}

Expand Down Expand Up @@ -1021,7 +1060,8 @@ public Map<Long, List<TaskDefinition>> getNodeListMapByDefinitionCodes(User logi
}
Map<Long, List<TaskDefinition>> taskNodeMap = new HashMap<>();
for (WorkflowDefinition workflowDefinition : workflowDefinitionListInProject) {
DagData dagData = processService.genDagData(workflowDefinition);
DagData dagData =
SensitivePropertyUtils.decryptAndMaskDagData(processService.genDagData(workflowDefinition));
taskNodeMap.put(workflowDefinition.getCode(), dagData.getTaskDefinitionList());
}
return taskNodeMap;
Expand Down Expand Up @@ -1798,7 +1838,8 @@ public WorkflowDefinitionVariablesDTO viewVariables(User loginUser, long project
}

// global params
List<Property> globalParams = workflowDefinition.getGlobalParamList();
List<Property> globalParams =
SensitivePropertyUtils.decryptAndMaskSensitiveValues(workflowDefinition.getGlobalParamList());

Map<String, Map<String, Object>> localUserDefParams = getLocalParams(workflowDefinition);

Expand Down Expand Up @@ -1828,7 +1869,9 @@ private Map<String, Map<String, Object>> getLocalParams(WorkflowDefinition workf
Map<String, Object> localParamsMap = new HashMap<>();
String localParams = JSONUtils.getNodeString(taskDefinition.getTaskParams(), LOCAL_PARAMS);
if (!StringUtils.isEmpty(localParams)) {
List<Property> localParamsList = JSONUtils.toList(localParams, Property.class);
List<Property> localParamsList =
SensitivePropertyUtils.decryptAndMaskSensitiveValues(
JSONUtils.toList(localParams, Property.class));
localParamsMap.put(TASK_TYPE, taskDefinition.getTaskType());
localParamsMap.put(LOCAL_PARAMS_LIST, localParamsList);
if (CollectionUtils.isNotEmpty(localParamsList)) {
Expand Down
Loading