From 68a5e89586d1dbd5bb715dd53e5d24ede5867be8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=8F=E4=B9=89=E8=B6=85?= Date: Thu, 2 Apr 2026 14:08:18 +0800 Subject: [PATCH] Add separate workflow instance logs --- .../dao/entity/WorkflowInstance.java | 2 + .../resources/sql/dolphinscheduler_h2.sql | 1 + .../resources/sql/dolphinscheduler_mysql.sql | 1 + .../sql/dolphinscheduler_postgresql.sql | 1 + .../mysql/dolphinscheduler_ddl.sql | 19 ++++ .../mysql/dolphinscheduler_dml.sql | 16 ++++ .../postgresql/dolphinscheduler_ddl.sql | 18 ++++ .../postgresql/dolphinscheduler_dml.sql | 16 ++++ .../engine/WorkflowEventBusFireWorker.java | 4 + .../handler/AbstractCommandHandler.java | 5 + .../handler/ExecuteTaskCommandHandler.java | 7 ++ .../handler/ReRunWorkflowCommandHandler.java | 7 ++ .../RecoverFailureTaskCommandHandler.java | 7 ++ .../RecoverSerialWaitCommandHandler.java | 7 ++ .../handler/RunWorkflowCommandHandler.java | 7 ++ .../WorkflowFailoverCommandHandler.java | 7 ++ .../dispatcher/WorkerGroupDispatcher.java | 4 + .../AbstractWorkflowStateAction.java | 4 + .../server/master/failover/TaskFailover.java | 3 + .../master/log/WorkflowLogDiscriminator.java | 52 +++++++++++ .../server/master/log/WorkflowLogFilter.java | 59 ++++++++++++ .../server/master/log/WorkflowLogMarkers.java | 43 +++++++++ .../rpc/TaskExecutorEventListenerImpl.java | 59 ++++++++---- .../rpc/TaskInstanceControllerImpl.java | 6 ++ .../master/runner/WorkflowExecuteContext.java | 7 +- .../server/master/utils/WorkflowLogUtils.java | 92 +++++++++++++++++++ .../src/main/resources/logback-spring.xml | 22 +++++ 27 files changed, 456 insertions(+), 20 deletions(-) create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_dml.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_ddl.sql create mode 100644 dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_dml.sql create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogDiscriminator.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogFilter.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogMarkers.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowLogUtils.java diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java index 8abf6a46666d..31caeb7adf1d 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/WorkflowInstance.java @@ -158,6 +158,8 @@ public class WorkflowInstance { private Date restartTime; + private String logPath; + /** * set the process name with process define version and timestamp * diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 996fce0732b9..d4896e74a68b 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -636,6 +636,7 @@ CREATE TABLE t_ds_workflow_instance var_pool longtext, dry_run int NULL DEFAULT 0, restart_time datetime DEFAULT NULL, + log_path longtext DEFAULT NULL, PRIMARY KEY (id) ); diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 7067cd7429aa..d3fb8a6fea77 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -644,6 +644,7 @@ CREATE TABLE `t_ds_workflow_instance` ( `dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run', `next_workflow_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next workflowInstanceId', `restart_time` datetime DEFAULT NULL COMMENT 'workflow instance restart time', + `log_path` longtext DEFAULT NULL COMMENT 'workflow instance log path', PRIMARY KEY (`id`), KEY `workflow_instance_index` (`workflow_definition_code`,`id`) USING BTREE, KEY `start_time_index` (`start_time`,`end_time`) USING BTREE diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 36261d7a8c2d..f8d2f0b22519 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -584,6 +584,7 @@ CREATE TABLE t_ds_workflow_instance ( dry_run int DEFAULT '0' , next_workflow_instance_id int DEFAULT '0', restart_time timestamp DEFAULT NULL , + log_path text DEFAULT NULL , PRIMARY KEY (id) ) ; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..d186ab70e3f7 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +ALTER TABLE `t_ds_workflow_instance` +ADD COLUMN `log_path` longtext NULL COMMENT 'workflow instance log path'; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 000000000000..4a14f326b985 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..3d439848b44b --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,18 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +ALTER TABLE t_ds_workflow_instance ADD COLUMN IF NOT EXISTS "log_path" text DEFAULT NULL; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 000000000000..4a14f326b985 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,16 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java index 2e1c807edf80..70145fdf28a8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java @@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext; import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; @@ -86,11 +87,14 @@ public void fireAllRegisteredEvent() { final String workflowInstanceName = workflowExecutionRunnable.getName(); try { LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC( + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getLogPath()); doFireSingleWorkflowEventBus(workflowExecutionRunnable); } catch (Exception ex) { log.error("Fire event failed for WorkflowExecuteRunnable: {}", workflowInstanceName, ex); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java index 2c9df21a870c..1bfc215a695e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/AbstractCommandHandler.java @@ -81,6 +81,7 @@ public WorkflowExecutionRunnable handleCommand(final Command command) { assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder); assembleWorkflowEventBus(workflowExecuteContextBuilder); assembleWorkflowExecutionGraph(workflowExecuteContextBuilder); + assembleLogPath(workflowExecuteContextBuilder); final WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder = WorkflowExecutionRunnableBuilder .builder() @@ -159,4 +160,8 @@ protected void assembleProject( workflowExecuteContextBuilder.setProject(project); } + protected void assembleLogPath(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) { + workflowExecuteContextBuilder.setLogPath(workflowExecuteContextBuilder.getWorkflowInstance().getLogPath()); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandler.java index 43afdb09e733..f2393867fe09 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ExecuteTaskCommandHandler.java @@ -36,6 +36,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -86,6 +87,12 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work workflowInstance.setTaskDependType(command.getTaskDependType()); } workflowInstance.setHost(masterConfig.getMasterAddress()); + workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath( + workflowInstance.getRestartTime(), + workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion(), + workflowInstance.getId())); + workflowInstanceDao.updateById(workflowInstance); workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java index 524c7a225a5c..dfbeb49db9fd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/ReRunWorkflowCommandHandler.java @@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import java.util.Date; import java.util.List; @@ -74,6 +75,12 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work workflowInstance.setHost(masterConfig.getMasterAddress()); workflowInstance.setEndTime(null); workflowInstance.setRunTimes(workflowInstance.getRunTimes() + 1); + workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath( + workflowInstance.getRestartTime(), + workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion(), + workflowInstance.getId())); + workflowInstanceDao.updateById(workflowInstance); workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java index 92f5fb4ed0c7..14dc1060432f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverFailureTaskCommandHandler.java @@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import java.util.ArrayList; import java.util.HashSet; @@ -95,6 +96,12 @@ protected void assembleWorkflowInstance( workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name()); workflowInstance.setCommandType(command.getCommandType()); workflowInstance.setHost(masterConfig.getMasterAddress()); + workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath( + workflowInstance.getStartTime(), + workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion(), + workflowInstance.getId())); + workflowInstanceDao.updateById(workflowInstance); workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java index 09486db8baac..e91332e29cbb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RecoverSerialWaitCommandHandler.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -45,6 +46,12 @@ protected void assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteCo .orElseThrow(() -> new IllegalArgumentException("Cannot find WorkflowInstance:" + workflowInstanceId)); workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name()); workflowInstance.setHost(masterConfig.getMasterAddress()); + workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath( + workflowInstance.getRestartTime(), + workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion(), + workflowInstance.getId())); + workflowInstanceDao.updateById(workflowInstance); workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java index c56006c89ad8..54a2ba949f26 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/RunWorkflowCommandHandler.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.commons.collections4.CollectionUtils; @@ -77,6 +78,12 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work workflowInstance.setHost(masterConfig.getMasterAddress()); workflowInstance.setCommandParam(command.getCommandParam()); workflowInstance.setGlobalParams(mergeCommandParamsWithWorkflowParams(command, workflowDefinition)); + workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath( + workflowInstance.getStartTime(), + workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion(), + workflowInstance.getId())); + workflowInstanceDao.upsertWorkflowInstance(workflowInstance); workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java index e18d02b400f8..d64be2e2e9e9 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import java.util.Date; import java.util.Map; @@ -89,6 +90,12 @@ protected void assembleWorkflowInstance( workflowInstance.setRestartTime(new Date()); workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus()); workflowInstance.setHost(masterConfig.getMasterAddress()); + workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath( + workflowInstance.getRestartTime(), + workflowInstance.getWorkflowDefinitionCode(), + workflowInstance.getWorkflowDefinitionVersion(), + workflowInstance.getId())); + workflowInstanceDao.updateById(workflowInstance); workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java index 51ccd2f9cf11..b37e2d30423b 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java @@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils; import java.util.Date; @@ -93,9 +94,12 @@ public void run() { TaskExecutorMDCUtils.MDCAutoClosable ignore = TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) { LogUtils.setWorkflowInstanceIdMDC(taskExecutionRunnable.getTaskInstance().getWorkflowInstanceId()); + WorkflowLogUtils + .setWorkflowInstanceLogFullPathMDC(taskExecutionRunnable.getWorkflowInstance().getLogPath()); doDispatchTask(taskExecutionRunnable); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java index 59e770c4fb1e..7ae98653bd31 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java @@ -38,6 +38,7 @@ import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager; import org.apache.commons.collections4.CollectionUtils; @@ -108,12 +109,15 @@ protected void triggerTasks(final IWorkflowExecutionRunnable workflowExecutionRu protected void pauseActiveTask(final IWorkflowExecutionRunnable workflowExecutionRunnable) { try { LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId()); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC( + workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getLogPath()); workflowExecutionRunnable .getWorkflowExecutionGraph() .getActiveTaskExecutionRunnable() .forEach(ITaskExecutionRunnable::pause); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java index c68b4bb4e6f4..343d44c5bde6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/failover/TaskFailover.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import org.springframework.stereotype.Component; @@ -28,8 +29,10 @@ public class TaskFailover { public void failoverTask(final ITaskExecutionRunnable taskExecutionRunnable) { LogUtils.setWorkflowInstanceIdMDC(taskExecutionRunnable.getWorkflowInstance().getId()); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(taskExecutionRunnable.getWorkflowInstance().getLogPath()); taskExecutionRunnable.getWorkflowEventBus().publish(TaskFailoverLifecycleEvent.of(taskExecutionRunnable)); LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogDiscriminator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogDiscriminator.java new file mode 100644 index 000000000000..d8baaea1577f --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogDiscriminator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.log; + +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import org.slf4j.MDC; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.sift.AbstractDiscriminator; + +/** + * Workflow Log Discriminator + */ +@Slf4j +@Getter +@Setter +public class WorkflowLogDiscriminator extends AbstractDiscriminator { + + private String key; + + private String logBase; + + @Override + public String getDiscriminatingValue(ILoggingEvent event) { + String workflowInstanceLogPath = MDC.get(WorkflowLogUtils.WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + if (workflowInstanceLogPath == null) { + log.error("The workflow instance log path is null, please check the logback configuration, log: {}", event); + } + return workflowInstanceLogPath; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogFilter.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogFilter.java new file mode 100644 index 000000000000..c9c42fa2d9a3 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogFilter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.log; + +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; + +import org.apache.commons.lang3.StringUtils; + +import lombok.extern.slf4j.Slf4j; + +import org.slf4j.MDC; +import org.slf4j.Marker; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.filter.Filter; +import ch.qos.logback.core.spi.FilterReply; + +/** + * This class is used to filter the log of the workflow instance. + */ +@Slf4j +public class WorkflowLogFilter extends Filter { + + @Override + public FilterReply decide(ILoggingEvent event) { + String workflowInstanceLogPath = MDC.get(WorkflowLogUtils.WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + // If the workflowInstanceLogPath is empty, it means that the log is not related to a workflow instance. + if (StringUtils.isEmpty(workflowInstanceLogPath)) { + return FilterReply.DENY; + } + + final Marker marker = event.getMarker(); + if (marker == null) { + return FilterReply.ACCEPT; + } + if (marker.contains(WorkflowLogMarkers.includeInWorkflowLog())) { + return FilterReply.ACCEPT; + } + if (marker.contains(WorkflowLogMarkers.excludeInWorkflowLog())) { + return FilterReply.DENY; + } + return FilterReply.ACCEPT; + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogMarkers.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogMarkers.java new file mode 100644 index 000000000000..0653e555d2ed --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/log/WorkflowLogMarkers.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.log; + +import org.slf4j.Marker; +import org.slf4j.MarkerFactory; + +public class WorkflowLogMarkers { + + private static final Marker WORKFLOW_LOGGER_EXCLUDE_MARKER = MarkerFactory.getMarker("WORKFLOW_LOGGER_EXCLUDE"); + + private static final Marker WORKFLOW_LOGGER_INCLUDE_MARKER = MarkerFactory.getMarker("WORKFLOW_LOGGER_INCLUDE"); + + /** + * The marker used to exclude logs from the workflow instance log file. + */ + public static Marker excludeInWorkflowLog() { + return WORKFLOW_LOGGER_EXCLUDE_MARKER; + } + + /** + * The marker used to include logs from the workflow instance log file. + */ + public static Marker includeInWorkflowLog() { + return WORKFLOW_LOGGER_INCLUDE_MARKER; + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java index 94df43ede4d0..fb71709b7bbe 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.rpc; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.extract.master.ITaskExecutorEventListener; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository; @@ -29,14 +30,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskSuccessLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; -import org.apache.dolphinscheduler.task.executor.events.IReportableTaskExecutorLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorDispatchedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorFailedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorKilledLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorPausedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorStartedLifecycleEvent; -import org.apache.dolphinscheduler.task.executor.events.TaskExecutorSuccessLifecycleEvent; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import java.util.Date; @@ -54,7 +48,10 @@ public class TaskExecutorEventListenerImpl implements ITaskExecutorEventListener @Override public void onTaskExecutorDispatched(final TaskExecutorDispatchedLifecycleEvent taskExecutorDispatchedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent); @@ -66,12 +63,16 @@ public void onTaskExecutorDispatched(final TaskExecutorDispatchedLifecycleEvent taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskExecutorStartedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorStartedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorStartedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent); @@ -84,31 +85,39 @@ public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskEx taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override - public void onTaskExecutorRuntimeContextChanged(final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorRuntimeContextChangedLifecycleEventr) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorRuntimeContextChangedLifecycleEventr.getWorkflowInstanceId()); + public void onTaskExecutorRuntimeContextChanged(final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorRuntimeContextChangedLifecycleEvent) { + int workflowInstanceId = taskExecutorRuntimeContextChangedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecutionRunnable taskExecutionRunnable = - getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr); + getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEvent); final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent = TaskRuntimeContextChangedEvent.builder() .taskExecutionRunnable(taskExecutionRunnable) - .runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds()) + .runtimeContext(taskExecutorRuntimeContextChangedLifecycleEvent.getAppIds()) .build(); taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent taskExecutorSuccessLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent); @@ -120,12 +129,16 @@ public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent taskEx taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent taskExecutorFailedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorFailedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorFailedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent); @@ -136,12 +149,16 @@ public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent taskExec taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent taskExecutorKilledLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorKilledLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorKilledLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent); @@ -152,12 +169,16 @@ public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent taskExec taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @Override public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent taskExecutorPausedLifecycleEvent) { - LogUtils.setWorkflowInstanceIdMDC(taskExecutorPausedLifecycleEvent.getWorkflowInstanceId()); + int workflowInstanceId = taskExecutorPausedLifecycleEvent.getWorkflowInstanceId(); + LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId); + WorkflowInstance workflowInstance = workflowRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); try { final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent); @@ -165,6 +186,7 @@ public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent taskExec taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent); } finally { LogUtils.removeWorkflowInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } @@ -184,5 +206,4 @@ private ITaskExecutionRunnable getTaskExecutionRunnable(final IReportableTaskExe } return taskExecutionRunnable; } - } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java index 16b7910b82d8..e28abf153241 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskInstanceControllerImpl.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.rpc; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.extract.master.ITaskInstanceController; import org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcquireSuccessNotifyRequest; import org.apache.dolphinscheduler.extract.master.transportor.TaskGroupSlotAcquireSuccessNotifyResponse; @@ -25,6 +26,7 @@ import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchLifecycleEvent; import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; +import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils; import lombok.extern.slf4j.Slf4j; @@ -46,6 +48,9 @@ public TaskGroupSlotAcquireSuccessNotifyResponse notifyTaskGroupSlotAcquireSucce final int workflowInstanceId = taskGroupSlotAcquireSuccessNotifyRequest.getWorkflowInstanceId(); final int taskInstanceId = taskGroupSlotAcquireSuccessNotifyRequest.getTaskInstanceId(); LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId); + WorkflowInstance workflowInstance = + workflowExecutionRunnableMemoryRepository.get(workflowInstanceId).getWorkflowInstance(); + WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(workflowInstance.getLogPath()); final IWorkflowExecutionRunnable workflowExecutionRunnable = workflowExecutionRunnableMemoryRepository.get(workflowInstanceId); if (workflowExecutionRunnable == null) { @@ -68,6 +73,7 @@ public TaskGroupSlotAcquireSuccessNotifyResponse notifyTaskGroupSlotAcquireSucce return TaskGroupSlotAcquireSuccessNotifyResponse.success(); } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC(); } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java index 0727ff5e9841..8796649c12bd 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteContext.java @@ -55,6 +55,8 @@ public class WorkflowExecuteContext implements IWorkflowExecuteContext { private final List workflowInstanceLifecycleListeners; + private String logPath; + public static WorkflowExecuteContextBuilder builder() { return new WorkflowExecuteContextBuilder(); } @@ -79,6 +81,8 @@ public static class WorkflowExecuteContextBuilder { private Project project; + private String logPath; + public WorkflowExecuteContextBuilder withCommand(Command command) { this.command = command; return this; @@ -93,7 +97,8 @@ public WorkflowExecuteContext build() { workflowGraph, workflowExecutionGraph, workflowEventBus, - Optional.ofNullable(workflowInstanceLifecycleListeners).orElse(Collections.emptyList())); + Optional.ofNullable(workflowInstanceLifecycleListeners).orElse(Collections.emptyList()), + logPath); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowLogUtils.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowLogUtils.java new file mode 100644 index 000000000000..db1ea5256669 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/utils/WorkflowLogUtils.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.utils; + +import org.apache.dolphinscheduler.common.constants.DateConstants; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.server.master.log.WorkflowLogDiscriminator; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Date; +import java.util.Optional; + +import lombok.experimental.UtilityClass; +import lombok.extern.slf4j.Slf4j; + +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import ch.qos.logback.classic.sift.SiftingAppender; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.AppenderAttachable; + +@Slf4j +@UtilityClass +public class WorkflowLogUtils { + + private static final Path WORKFLOW_INSTANCE_LOG_BASE_PATH = getWorkflowInstanceLogBasePath(); + public static final String WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY = "workflowInstanceLogFullPath"; + + public static String getWorkflowInstanceLogFullPath(Date workflowStartTime, + Long workflowDefinitionCode, + int workflowDefinitionVersion, + int workflowInstanceId) { + if (WORKFLOW_INSTANCE_LOG_BASE_PATH == null) { + throw new IllegalArgumentException( + "Cannot find the workflow instance log base path, please check your logback.xml file"); + } + final String workflowLogFileName = Paths.get( + String.valueOf(workflowDefinitionCode), + String.valueOf(workflowDefinitionVersion), + String.format("%s.log", workflowInstanceId)).toString(); + return WORKFLOW_INSTANCE_LOG_BASE_PATH + .resolve(DateUtils.format(workflowStartTime, DateConstants.YYYYMMDD, null)) + .resolve(workflowLogFileName) + .toString(); + } + + /** + * Get workflow instance log base absolute path, this is defined in logback.xml + */ + public static Path getWorkflowInstanceLogBasePath() { + return Optional.of(LoggerFactory.getILoggerFactory()) + .map(e -> (AppenderAttachable) (e.getLogger("ROOT"))) + .map(e -> (SiftingAppender) (e.getAppender("WORKFLOWLOGFILE"))) + .map(e -> ((WorkflowLogDiscriminator) (e.getDiscriminator()))) + .map(WorkflowLogDiscriminator::getLogBase) + .map(e -> Paths.get(e).toAbsolutePath()) + .orElse(null); + } + + public static String getWorkflowInstanceLogFullPathMDC() { + return MDC.get(WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + } + + public static void setWorkflowInstanceLogFullPathMDC(String workflowInstanceLogFullPath) { + if (workflowInstanceLogFullPath == null) { + log.warn("workflowInstanceLogFullPath is null"); + return; + } + MDC.put(WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY, workflowInstanceLogFullPath); + } + + public static void removeWorkflowInstanceLogFullPathMDC() { + MDC.remove(WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY); + } +} diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index a4c3a4f22dfe..b16c201335d4 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -51,6 +51,27 @@ + + + + + workflowInstanceLogFullPath + ${log.base}/workflows + + + + ${workflowInstanceLogFullPath} + + + %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%thread] %logger{10}:[%line] - %message%n + + UTF-8 + + true + + + + ${log.base}/dolphinscheduler-master.log @@ -75,6 +96,7 @@ +