Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ public class WorkflowInstance {

private Date restartTime;

private String logPath;

/**
* set the process name with process define version and timestamp
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ CREATE TABLE t_ds_workflow_instance
var_pool longtext,
dry_run int NULL DEFAULT 0,
restart_time datetime DEFAULT NULL,
log_path longtext DEFAULT NULL,
PRIMARY KEY (id)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ CREATE TABLE `t_ds_workflow_instance` (
`dry_run` tinyint(4) DEFAULT '0' COMMENT 'dry run flag:0 normal, 1 dry run',
`next_workflow_instance_id` int(11) DEFAULT '0' COMMENT 'serial queue next workflowInstanceId',
`restart_time` datetime DEFAULT NULL COMMENT 'workflow instance restart time',
`log_path` longtext DEFAULT NULL COMMENT 'workflow instance log path',
PRIMARY KEY (`id`),
KEY `workflow_instance_index` (`workflow_definition_code`,`id`) USING BTREE,
KEY `start_time_index` (`start_time`,`end_time`) USING BTREE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ CREATE TABLE t_ds_workflow_instance (
dry_run int DEFAULT '0' ,
next_workflow_instance_id int DEFAULT '0',
restart_time timestamp DEFAULT NULL ,
log_path text DEFAULT NULL ,
PRIMARY KEY (id)
) ;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE `t_ds_workflow_instance`
ADD COLUMN `log_path` longtext NULL COMMENT 'workflow instance log path';
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

ALTER TABLE t_ds_workflow_instance ADD COLUMN IF NOT EXISTS "log_path" text DEFAULT NULL;
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.runner.IWorkflowExecuteContext;
import org.apache.dolphinscheduler.server.master.utils.ExceptionUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
Expand Down Expand Up @@ -86,11 +87,14 @@ public void fireAllRegisteredEvent() {
final String workflowInstanceName = workflowExecutionRunnable.getName();
try {
LogUtils.setWorkflowInstanceIdMDC(workflowInstanceId);
WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getLogPath());
doFireSingleWorkflowEventBus(workflowExecutionRunnable);
} catch (Exception ex) {
log.error("Fire event failed for WorkflowExecuteRunnable: {}", workflowInstanceName, ex);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public WorkflowExecutionRunnable handleCommand(final Command command) {
assembleWorkflowInstanceLifecycleListeners(workflowExecuteContextBuilder);
assembleWorkflowEventBus(workflowExecuteContextBuilder);
assembleWorkflowExecutionGraph(workflowExecuteContextBuilder);
assembleLogPath(workflowExecuteContextBuilder);

final WorkflowExecutionRunnableBuilder workflowExecutionRunnableBuilder = WorkflowExecutionRunnableBuilder
.builder()
Expand Down Expand Up @@ -159,4 +160,8 @@ protected void assembleProject(
workflowExecuteContextBuilder.setProject(project);
}

protected void assembleLogPath(final WorkflowExecuteContextBuilder workflowExecuteContextBuilder) {
workflowExecuteContextBuilder.setLogPath(workflowExecuteContextBuilder.getWorkflowInstance().getLogPath());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -86,6 +87,12 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work
workflowInstance.setTaskDependType(command.getTaskDependType());
}
workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath(
workflowInstance.getRestartTime(),
workflowInstance.getWorkflowDefinitionCode(),
workflowInstance.getWorkflowDefinitionVersion(),
workflowInstance.getId()));

workflowInstanceDao.updateById(workflowInstance);
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -74,6 +75,12 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work
workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstance.setEndTime(null);
workflowInstance.setRunTimes(workflowInstance.getRunTimes() + 1);
workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath(
workflowInstance.getRestartTime(),
workflowInstance.getWorkflowDefinitionCode(),
workflowInstance.getWorkflowDefinitionVersion(),
workflowInstance.getId()));

workflowInstanceDao.updateById(workflowInstance);

workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskInstanceFactories;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -95,6 +96,12 @@ protected void assembleWorkflowInstance(
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name());
workflowInstance.setCommandType(command.getCommandType());
workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath(
workflowInstance.getStartTime(),
workflowInstance.getWorkflowDefinitionCode(),
workflowInstance.getWorkflowDefinitionVersion(),
workflowInstance.getId()));

workflowInstanceDao.updateById(workflowInstance);

workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand All @@ -45,6 +46,12 @@ protected void assembleWorkflowInstance(WorkflowExecuteContext.WorkflowExecuteCo
.orElseThrow(() -> new IllegalArgumentException("Cannot find WorkflowInstance:" + workflowInstanceId));
workflowInstance.setStateWithDesc(WorkflowExecutionStatus.RUNNING_EXECUTION, command.getCommandType().name());
workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath(
workflowInstance.getRestartTime(),
workflowInstance.getWorkflowDefinitionCode(),
workflowInstance.getWorkflowDefinitionVersion(),
workflowInstance.getId()));

workflowInstanceDao.updateById(workflowInstance);
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import org.apache.commons.collections4.CollectionUtils;

Expand Down Expand Up @@ -77,6 +78,12 @@ protected void assembleWorkflowInstance(final WorkflowExecuteContextBuilder work
workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstance.setCommandParam(command.getCommandParam());
workflowInstance.setGlobalParams(mergeCommandParamsWithWorkflowParams(command, workflowDefinition));
workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath(
workflowInstance.getStartTime(),
workflowInstance.getWorkflowDefinitionCode(),
workflowInstance.getWorkflowDefinitionVersion(),
workflowInstance.getId()));

workflowInstanceDao.upsertWorkflowInstance(workflowInstance);
workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.TaskExecutionRunnableBuilder;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteContext.WorkflowExecuteContextBuilder;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import java.util.Date;
import java.util.Map;
Expand Down Expand Up @@ -89,6 +90,12 @@ protected void assembleWorkflowInstance(
workflowInstance.setRestartTime(new Date());
workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus());
workflowInstance.setHost(masterConfig.getMasterAddress());
workflowInstance.setLogPath(WorkflowLogUtils.getWorkflowInstanceLogFullPath(
workflowInstance.getRestartTime(),
workflowInstance.getWorkflowDefinitionCode(),
workflowInstance.getWorkflowDefinitionVersion(),
workflowInstance.getId()));

workflowInstanceDao.updateById(workflowInstance);

workflowExecuteContextBuilder.setWorkflowInstance(workflowInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.event.TaskDispatchableEvent;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;
import org.apache.dolphinscheduler.task.executor.log.TaskExecutorMDCUtils;

import java.util.Date;
Expand Down Expand Up @@ -93,9 +94,12 @@ public void run() {
TaskExecutorMDCUtils.MDCAutoClosable ignore =
TaskExecutorMDCUtils.logWithMDC(taskExecutionRunnable.getId())) {
LogUtils.setWorkflowInstanceIdMDC(taskExecutionRunnable.getTaskInstance().getWorkflowInstanceId());
WorkflowLogUtils
.setWorkflowInstanceLogFullPathMDC(taskExecutionRunnable.getWorkflowInstance().getLogPath());
doDispatchTask(taskExecutionRunnable);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable;
import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
import org.apache.dolphinscheduler.server.master.utils.WorkflowInstanceUtils;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;
import org.apache.dolphinscheduler.service.alert.WorkflowAlertManager;

import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -108,12 +109,15 @@ protected void triggerTasks(final IWorkflowExecutionRunnable workflowExecutionRu
protected void pauseActiveTask(final IWorkflowExecutionRunnable workflowExecutionRunnable) {
try {
LogUtils.setWorkflowInstanceIdMDC(workflowExecutionRunnable.getId());
WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance().getLogPath());
workflowExecutionRunnable
.getWorkflowExecutionGraph()
.getActiveTaskExecutionRunnable()
.forEach(ITaskExecutionRunnable::pause);
} finally {
LogUtils.removeWorkflowInstanceIdMDC();
WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailoverLifecycleEvent;
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import org.springframework.stereotype.Component;

Expand All @@ -28,8 +29,10 @@ public class TaskFailover {

public void failoverTask(final ITaskExecutionRunnable taskExecutionRunnable) {
LogUtils.setWorkflowInstanceIdMDC(taskExecutionRunnable.getWorkflowInstance().getId());
WorkflowLogUtils.setWorkflowInstanceLogFullPathMDC(taskExecutionRunnable.getWorkflowInstance().getLogPath());
taskExecutionRunnable.getWorkflowEventBus().publish(TaskFailoverLifecycleEvent.of(taskExecutionRunnable));
LogUtils.removeWorkflowInstanceIdMDC();
WorkflowLogUtils.removeWorkflowInstanceLogFullPathMDC();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.master.log;

import org.apache.dolphinscheduler.server.master.utils.WorkflowLogUtils;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import org.slf4j.MDC;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.sift.AbstractDiscriminator;

/**
* Workflow Log Discriminator
*/
@Slf4j
@Getter
@Setter
public class WorkflowLogDiscriminator extends AbstractDiscriminator<ILoggingEvent> {

private String key;

private String logBase;

@Override
public String getDiscriminatingValue(ILoggingEvent event) {
String workflowInstanceLogPath = MDC.get(WorkflowLogUtils.WORKFLOW_INSTANCE_LOG_FULL_PATH_MDC_KEY);
if (workflowInstanceLogPath == null) {
log.error("The workflow instance log path is null, please check the logback configuration, log: {}", event);
}
return workflowInstanceLogPath;
}

}
Loading
Loading