From e30a3d381edbd9f0791ab00e22b035dadb5c2338 Mon Sep 17 00:00:00 2001 From: Sanjana Date: Wed, 25 Mar 2026 00:56:03 +0530 Subject: [PATCH] [Improvement-18039][Metrics] Add missing metrics for workflow and task state transitions This PR adds missing metrics for task and workflow execution into DolphinScheduler metrics. It also adopts an event-driven mechanism to track Task metrics cleanly upon Task event bus fires. --- .../engine/WorkflowEventBusFireWorker.java | 34 ++++ .../statemachine/AbstractTaskStateAction.java | 1 - .../WorkflowExecutionRunnableFactory.java | 6 +- .../AbstractWorkflowStateAction.java | 3 + .../server/master/metrics/TaskMetrics.java | 27 --- .../master/metrics/TaskMetricsTest.java | 84 +++++++++ .../metrics/WorkflowInstanceMetricsTest.java | 175 ++++++++++++++++++ .../worker/metrics/WorkerServerMetrics.java | 76 ++++---- 8 files changed, 336 insertions(+), 70 deletions(-) create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetricsTest.java create mode 100644 dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetricsTest.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java index 2e1c807edf80..ced681c930e8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java @@ -156,6 +156,40 @@ private void doFireSingleEvent(final IWorkflowExecutionRunnable workflowExecutio throw new RuntimeException("No EventHandler found for event: " + event.getEventType()); } lifecycleEventHandler.handle(workflowExecutionRunnable, event); + + recordTaskInstanceMetrics(event); + } + + private void recordTaskInstanceMetrics(AbstractLifecycleEvent event) { + if (!(event + .getEventType() instanceof org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType)) { + return; + } + + switch ((org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType) event + .getEventType()) { + case DISPATCHED: + org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("dispatch"); + break; + case SUCCEEDED: + org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("success"); + break; + case FAILED: + case FATAL: + org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("fail"); + break; + case KILLED: + org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("kill"); + break; + case RETRY: + org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("retry"); + break; + case TIMEOUT: + org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByState("timeout"); + break; + default: + break; + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java index 88281145c67c..4f679754d8a2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java @@ -196,7 +196,6 @@ private void persistentTaskInstanceKilledEventToDB(final ITaskExecutionRunnable taskInstance.setState(TaskExecutionStatus.KILL); taskInstance.setEndTime(taskKilledEvent.getEndTime()); taskInstanceDao.updateById(taskInstance); - } @Override diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java index 2f5b5841283a..b5bc3aeab45d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java @@ -23,6 +23,7 @@ import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao; import org.apache.dolphinscheduler.server.master.engine.command.ICommandHandler; import org.apache.dolphinscheduler.server.master.engine.exceptions.CommandDuplicateHandleException; +import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics; import java.util.List; @@ -52,8 +53,11 @@ public class WorkflowExecutionRunnableFactory { */ @Transactional public IWorkflowExecutionRunnable createWorkflowExecuteRunnable(Command command) { + long startTime = System.currentTimeMillis(); deleteCommandOrThrow(command); - return doCreateWorkflowExecutionRunnable(command); + IWorkflowExecutionRunnable workflowExecutionRunnable = doCreateWorkflowExecutionRunnable(command); + WorkflowInstanceMetrics.recordWorkflowInstanceGenerateTime(System.currentTimeMillis() - startTime); + return workflowExecutionRunnable; } /** diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java index 59e770c4fb1e..483fce4dd418 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java @@ -176,6 +176,9 @@ protected void transformWorkflowInstanceState(final IWorkflowExecutionRunnable w workflowInstanceDao.updateById(workflowInstance); log.info("Success set WorkflowExecuteRunnable: {} state from: {} to {}", workflowInstance.getName(), originState.name(), targetState.name()); + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + targetState, + String.valueOf(workflowInstance.getWorkflowDefinitionCode())); } catch (Exception ex) { workflowInstance.setState(originState); throw ex; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java index 8ea9f13c4dae..05b53aa7a3eb 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java @@ -50,39 +50,12 @@ public class TaskMetrics { } - private final Counter taskDispatchCounter = - Counter.builder("ds.task.dispatch.count") - .description("Task dispatch count") - .register(Metrics.globalRegistry); - - private final Counter taskDispatchFailCounter = - Counter.builder("ds.task.dispatch.failure.count") - .description("Task dispatch failures count, retried ones included") - .register(Metrics.globalRegistry); - - private final Counter taskDispatchErrorCounter = - Counter.builder("ds.task.dispatch.error.count") - .description("Number of errors during task dispatch") - .register(Metrics.globalRegistry); - public synchronized void registerTaskPrepared(Supplier consumer) { Gauge.builder("ds.task.prepared", consumer) .description("Task prepared count") .register(Metrics.globalRegistry); } - public void incTaskDispatchFailed(int failedCount) { - taskDispatchFailCounter.increment(failedCount); - } - - public void incTaskDispatchError() { - taskDispatchErrorCounter.increment(); - } - - public void incTaskDispatch() { - taskDispatchCounter.increment(); - } - public void incTaskInstanceByState(final String state) { if (taskInstanceCounters.get(state) == null) { return; diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetricsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetricsTest.java new file mode 100644 index 000000000000..ea3d3bf3972d --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetricsTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.Test; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; + +class TaskMetricsTest { + + @Test + void testIncTaskInstanceByState_validStates() { + List validStates = Arrays.asList( + "submit", "timeout", "finish", "failover", "retry", "dispatch", "success", "kill", "fail", "stop"); + + for (String state : validStates) { + Counter counter = Metrics.globalRegistry.find("ds.task.instance.count") + .tag("state", state) + .counter(); + assertNotNull(counter, "Counter should exist for state: " + state); + double before = counter.count(); + TaskMetrics.incTaskInstanceByState(state); + assertEquals(before + 1, counter.count(), 0.001, + "Counter should be incremented for state: " + state); + } + } + + @Test + void testIncTaskInstanceByState_invalidState() { + TaskMetrics.incTaskInstanceByState("nonexistent_state"); + Counter counter = Metrics.globalRegistry.find("ds.task.instance.count") + .tag("state", "nonexistent_state") + .counter(); + assertTrue(counter == null || counter.count() == 0, + "Counter should not exist or be zero for invalid state"); + } + + @Test + void testIncTaskInstanceByState_multipleIncrements() { + Counter counter = Metrics.globalRegistry.find("ds.task.instance.count") + .tag("state", "submit") + .counter(); + assertNotNull(counter); + double before = counter.count(); + + TaskMetrics.incTaskInstanceByState("submit"); + TaskMetrics.incTaskInstanceByState("submit"); + TaskMetrics.incTaskInstanceByState("submit"); + + assertEquals(before + 3, counter.count(), 0.001, + "Counter should be incremented by 3 after three calls"); + } + + @Test + void testRegisterTaskPrepared() { + TaskMetrics.registerTaskPrepared(() -> 5); + assertNotNull(Metrics.globalRegistry.find("ds.task.prepared").gauge(), + "Task prepared gauge should be registered"); + } + +} diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetricsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetricsTest.java new file mode 100644 index 000000000000..a45f2715931b --- /dev/null +++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetricsTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.metrics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; + +import org.junit.jupiter.api.Test; + +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; + +class WorkflowInstanceMetricsTest { + + @Test + void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_submitState() { + String defCode = "test_submit_1"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.SUBMITTED_SUCCESS, defCode); + Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "submit") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counter, "Counter should be registered for submit state"); + assertEquals(1, counter.count(), 0.001); + } + + @Test + void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_failureState() { + String defCode = "test_failure_1"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.FAILURE, defCode); + Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "fail") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counter, "Counter should be registered for fail state"); + assertEquals(1, counter.count(), 0.001); + } + + @Test + void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_successState() { + String defCode = "test_success_1"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.SUCCESS, defCode); + Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "success") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counter, "Counter should be registered for success state"); + assertEquals(1, counter.count(), 0.001); + } + + @Test + void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_stopState() { + String defCode = "test_stop_1"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.STOP, defCode); + Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "stop") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counter, "Counter should be registered for stop state"); + assertEquals(1, counter.count(), 0.001); + } + + @Test + void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_pauseState() { + String defCode = "test_pause_1"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.PAUSE, defCode); + Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "pause") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counter, "Counter should be registered for pause state"); + assertEquals(1, counter.count(), 0.001); + } + + @Test + void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_failoverState() { + String defCode = "test_failover_1"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.FAILOVER, defCode); + Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "failover") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counter, "Counter should be registered for failover state"); + assertEquals(1, counter.count(), 0.001); + } + + @Test + void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_defaultMapping() { + String defCode = "test_running_1"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.RUNNING_EXECUTION, defCode); + Counter counter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "running_execution") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counter, "Counter should be registered for default-mapped state"); + assertEquals(1, counter.count(), 0.001); + } + + @Test + void testRecordCommandQueryTime() { + WorkflowInstanceMetrics.recordCommandQueryTime(100L); + Timer timer = Metrics.globalRegistry.find("ds.workflow.command.query.duration").timer(); + assertNotNull(timer, "Command query timer should be registered"); + assertEquals(1, timer.count(), "Timer should have recorded one event"); + } + + @Test + void testRecordWorkflowInstanceGenerateTime() { + WorkflowInstanceMetrics.recordWorkflowInstanceGenerateTime(200L); + Timer timer = Metrics.globalRegistry.find("ds.workflow.instance.generate.duration").timer(); + assertNotNull(timer, "Workflow instance generate timer should be registered"); + assertEquals(1, timer.count(), "Timer should have recorded one event"); + } + + @Test + void testRegisterWorkflowInstanceRunningGauge() { + WorkflowInstanceMetrics.registerWorkflowInstanceRunningGauge(() -> 10); + assertNotNull(Metrics.globalRegistry.find("ds.workflow.instance.running").gauge(), + "Running gauge should be registered"); + } + + @Test + void testRegisterWorkflowInstanceResubmitGauge() { + WorkflowInstanceMetrics.registerWorkflowInstanceResubmitGauge(() -> 3); + assertNotNull(Metrics.globalRegistry.find("ds.workflow.instance.resubmit").gauge(), + "Resubmit gauge should be registered"); + } + + @Test + void testCleanUpWorkflowInstanceCountMetricsByDefinitionCode() { + String defCode = "99999"; + WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode( + WorkflowExecutionStatus.SUCCESS, defCode); + Counter counterBefore = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "success") + .tag("workflow.definition.code", defCode) + .counter(); + assertNotNull(counterBefore, "Counter should exist before cleanup"); + + WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(99999L); + + Counter counterAfter = Metrics.globalRegistry.find("ds.workflow.instance.count") + .tag("state", "success") + .tag("workflow.definition.code", defCode) + .counter(); + assertNull(counterAfter, "Counter should be removed after cleanup"); + } + +} diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java index e17db2f34663..61f07a90d2c7 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java @@ -30,47 +30,41 @@ @UtilityClass public class WorkerServerMetrics { - private final Counter workerOverloadCounter = - Counter.builder("ds.worker.overload.count") - .description("overloaded workers count") - .register(Metrics.globalRegistry); - - private final Counter workerFullSubmitQueueCounter = - Counter.builder("ds.worker.full.submit.queue.count") - .description("full worker submit queues count") - .register(Metrics.globalRegistry); - - private final Counter workerResourceDownloadSuccessCounter = - Counter.builder("ds.worker.resource.download.count") - .tag("status", "success") - .description("worker resource download success count") - .register(Metrics.globalRegistry); - - private final Counter workerResourceDownloadFailCounter = - Counter.builder("ds.worker.resource.download.count") - .tag("status", "fail") - .description("worker resource download failure count") - .register(Metrics.globalRegistry); - - private final Counter workerHeartBeatCounter = - Counter.builder("ds.worker.heartbeat.count") - .description("worker heartbeat count") - .register(Metrics.globalRegistry); - - private final Timer workerResourceDownloadDurationTimer = - Timer.builder("ds.worker.resource.download.duration") - .publishPercentiles(0.5, 0.75, 0.95, 0.99) - .publishPercentileHistogram() - .description("time cost of resource download on workers") - .register(Metrics.globalRegistry); - - private final DistributionSummary workerResourceDownloadSizeDistribution = - DistributionSummary.builder("ds.worker.resource.download.size") - .baseUnit("bytes") - .publishPercentiles(0.5, 0.75, 0.95, 0.99) - .publishPercentileHistogram() - .description("size of downloaded resource files on worker") - .register(Metrics.globalRegistry); + private final Counter workerOverloadCounter = Counter.builder("ds.worker.overload.count") + .description("overloaded workers count") + .register(Metrics.globalRegistry); + + private final Counter workerFullSubmitQueueCounter = Counter.builder("ds.worker.full.submit.queue.count") + .description("full worker submit queues count") + .register(Metrics.globalRegistry); + + private final Counter workerResourceDownloadSuccessCounter = Counter.builder("ds.worker.resource.download.count") + .tag("status", "success") + .description("worker resource download success count") + .register(Metrics.globalRegistry); + + private final Counter workerResourceDownloadFailCounter = Counter.builder("ds.worker.resource.download.count") + .tag("status", "fail") + .description("worker resource download failure count") + .register(Metrics.globalRegistry); + + private final Counter workerHeartBeatCounter = Counter.builder("ds.worker.heartbeat.count") + .description("worker heartbeat count") + .register(Metrics.globalRegistry); + + private final Timer workerResourceDownloadDurationTimer = Timer.builder("ds.worker.resource.download.duration") + .publishPercentiles(0.5, 0.75, 0.95, 0.99) + .publishPercentileHistogram() + .description("time cost of resource download on workers") + .register(Metrics.globalRegistry); + + private final DistributionSummary workerResourceDownloadSizeDistribution = DistributionSummary + .builder("ds.worker.resource.download.size") + .baseUnit("bytes") + .publishPercentiles(0.5, 0.75, 0.95, 0.99) + .publishPercentileHistogram() + .description("size of downloaded resource files on worker") + .register(Metrics.globalRegistry); public void incWorkerOverloadCount() { workerOverloadCounter.increment();