From 0e9963f4df73b3b01ce6fa2a0677864f56e135b1 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 6 Feb 2026 14:27:41 +0100 Subject: [PATCH 1/3] NIFI-15519 - Skip flow save during shutdown to preserve processor auto-resume states --- .../nifi/controller/StandardFlowService.java | 10 ++ .../ProcessorAutoResumeAfterRestartIT.java | 127 ++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 7755030322d5..712aeb3270e2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -1026,6 +1026,16 @@ public synchronized void run() { } writeLock.lock(); try { + // Skip saving during shutdown to preserve RUNNING processor states in flow.json.gz. + // During graceful shutdown, processor desired states are set to STOPPED before this + // save executes. Saving at that point would persist ENABLED states instead of RUNNING, + // which prevents auto-resume of processors on the next startup. + if (!running.get()) { + StandardFlowService.this.saveHolder.set(null); + logger.info("Skipping flow controller save because service is no longer running"); + return; + } + dao.save(controller, holder.shouldArchive); // Nulling it out if it is still set to our current SaveHolder. Otherwise leave it alone because it means // another save is already pending. diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java new file mode 100644 index 000000000000..ad5f482661d9 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.restart; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.nifi.tests.system.NiFiInstance; +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.zip.GZIPInputStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +public class ProcessorAutoResumeAfterRestartIT extends NiFiSystemIT { + + @Test + public void testRunningProcessorsResumeAfterRestart() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor(GENERATE_FLOWFILE); + final ProcessorEntity terminate = getClientUtil().createProcessor(TERMINATE_FLOWFILE); + getClientUtil().createConnection(generate, terminate, SUCCESS); + + getClientUtil().updateProcessorSchedulingPeriod(generate, "10 sec"); + getClientUtil().startProcessor(generate); + getClientUtil().startProcessor(terminate); + getClientUtil().waitForRunningProcessor(generate.getId()); + getClientUtil().waitForRunningProcessor(terminate.getId()); + + // Wait for the flow to be saved at least once with RUNNING states. + // The SaveReportingTask runs every 500ms and the default write delay is 500ms, + // so 2 seconds is sufficient for at least one successful save. + Thread.sleep(2000); + + // Trigger a new save request by making a flow modification, then immediately stop NiFi. + // Each REST API modification calls saveFlowChanges() which sets a saveHolder with a 500ms delay. + // By stopping NiFi immediately after the modification, the pending save has not yet been processed. + // During NiFi's shutdown sequence, the pending save would execute after all processors have been + // stopped. + getClientUtil().createProcessor(TERMINATE_FLOWFILE); + + final NiFiInstance nifiInstance = getNiFiInstance(); + nifiInstance.stop(); + + // After shutdown, verify that flow.json.gz still has RUNNING states for the processors. + // Without the fix in SaveReportingTask, the shutdown race condition would cause processors + // to be saved with ENABLED states instead of RUNNING. + final File confDir = new File(nifiInstance.getInstanceDirectory(), "conf"); + final File flowJsonGz = new File(confDir, "flow.json.gz"); + + final List processorStates = getProcessorScheduledStates(flowJsonGz); + assertFalse(processorStates.isEmpty(), "Expected processors in flow.json.gz"); + for (final String state : processorStates) { + assertEquals("RUNNING", state, + "Processor should have RUNNING state in flow.json.gz after graceful shutdown, but found %s".formatted(state)); + } + + nifiInstance.start(true); + getClientUtil().waitForRunningProcessor(generate.getId()); + getClientUtil().waitForRunningProcessor(terminate.getId()); + } + + private List getProcessorScheduledStates(final File flowJsonGz) throws IOException { + final byte[] decompressed = decompress(flowJsonGz); + final ObjectMapper mapper = new ObjectMapper(); + final JsonNode root = mapper.readTree(decompressed); + final List states = new ArrayList<>(); + final JsonNode rootGroup = root.path("rootGroup"); + collectProcessorStates(rootGroup, states); + return states; + } + + private void collectProcessorStates(final JsonNode group, final List states) { + final JsonNode processors = group.path("processors"); + if (processors.isArray()) { + for (final JsonNode processor : processors) { + final JsonNode scheduledState = processor.path("scheduledState"); + if (!scheduledState.isMissingNode()) { + states.add(scheduledState.asText()); + } + } + } + final JsonNode childGroups = group.path("processGroups"); + if (childGroups.isArray()) { + for (final JsonNode childGroup : childGroups) { + collectProcessorStates(childGroup, states); + } + } + } + + private byte[] decompress(final File gzipFile) throws IOException { + try (final InputStream fis = Files.newInputStream(gzipFile.toPath()); + final GZIPInputStream gzis = new GZIPInputStream(fis); + final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + final byte[] buffer = new byte[8192]; + int len; + while ((len = gzis.read(buffer)) != -1) { + baos.write(buffer, 0, len); + } + return baos.toByteArray(); + } + } +} From 04bf7159b1d23443a2c33cf780d11a7ba7df3be2 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 6 Feb 2026 17:26:43 +0100 Subject: [PATCH 2/3] Make sure we still write the pending flow changes --- .../nifi/controller/StandardFlowService.java | 22 +++++++- .../ProcessorAutoResumeAfterRestartIT.java | 55 +++++++++++++------ 2 files changed, 58 insertions(+), 19 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 712aeb3270e2..3e15e05dc69f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -311,6 +311,19 @@ public void stop(final boolean force) { return; } + // Flush any pending save while processors are still running, preserving their + // RUNNING states in flow.json.gz. This must happen before controller.shutdown() + // which sets all processor desired states to STOPPED. + final SaveHolder pendingSave = saveHolder.getAndSet(null); + if (pendingSave != null) { + try { + dao.save(controller, pendingSave.shouldArchive); + logger.info("Flushed pending flow save before shutdown"); + } catch (final Exception e) { + logger.error("Failed to flush pending flow save before shutdown", e); + } + } + running.set(false); // Stop Cluster Coordinator before Node Protocol Sender @@ -334,6 +347,10 @@ public void stop(final boolean force) { if (!controller.isTerminated()) { controller.shutdown(force); } + + // Clear any save requests triggered during shutdown (e.g. by stopping processors). + // These would contain incorrect processor states (STOPPED mapped to ENABLED). + saveHolder.set(null); } finally { writeLock.unlock(); } @@ -358,7 +375,10 @@ public void stop(final boolean force) { } } - // Ensure that our background save reporting task has a chance to run, because we've now shut down the executor, which could cause the save reporting task to get canceled. + // Run the save reporting task one final time. At this point, saveHolder should be null + // (cleared above), so this will be a no-op. However, if a save was somehow requested + // between the writeLock release and here, the running check will prevent saving + // incorrect post-shutdown processor states. saveReportingTask.run(); } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java index ad5f482661d9..709f6a53cb98 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/ProcessorAutoResumeAfterRestartIT.java @@ -30,12 +30,15 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; -import java.util.ArrayList; -import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import java.util.zip.GZIPInputStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class ProcessorAutoResumeAfterRestartIT extends NiFiSystemIT { @@ -51,56 +54,72 @@ public void testRunningProcessorsResumeAfterRestart() throws NiFiClientException getClientUtil().waitForRunningProcessor(generate.getId()); getClientUtil().waitForRunningProcessor(terminate.getId()); + final Set runningProcessorIds = Set.of(generate.getId(), terminate.getId()); + // Wait for the flow to be saved at least once with RUNNING states. // The SaveReportingTask runs every 500ms and the default write delay is 500ms, - // so 2 seconds is sufficient for at least one successful save. - Thread.sleep(2000); + // so 3 seconds is sufficient for at least one successful save. + Thread.sleep(3000); // Trigger a new save request by making a flow modification, then immediately stop NiFi. // Each REST API modification calls saveFlowChanges() which sets a saveHolder with a 500ms delay. // By stopping NiFi immediately after the modification, the pending save has not yet been processed. // During NiFi's shutdown sequence, the pending save would execute after all processors have been - // stopped. - getClientUtil().createProcessor(TERMINATE_FLOWFILE); + // stopped, persisting ENABLED states instead of RUNNING. The fix in StandardFlowService.stop() + // flushes any pending save before stopping the controller, preserving RUNNING states. + final ProcessorEntity addedBeforeShutdown = getClientUtil().createProcessor(TERMINATE_FLOWFILE); final NiFiInstance nifiInstance = getNiFiInstance(); nifiInstance.stop(); - // After shutdown, verify that flow.json.gz still has RUNNING states for the processors. - // Without the fix in SaveReportingTask, the shutdown race condition would cause processors - // to be saved with ENABLED states instead of RUNNING. + // After shutdown, verify that flow.json.gz still has RUNNING states for the started processors. + // Without the fix, the shutdown race condition would cause processors to be saved with ENABLED + // states instead of RUNNING, preventing auto-resume on the next startup. final File confDir = new File(nifiInstance.getInstanceDirectory(), "conf"); final File flowJsonGz = new File(confDir, "flow.json.gz"); - final List processorStates = getProcessorScheduledStates(flowJsonGz); + final Map processorStates = getProcessorScheduledStates(flowJsonGz); assertFalse(processorStates.isEmpty(), "Expected processors in flow.json.gz"); - for (final String state : processorStates) { - assertEquals("RUNNING", state, - "Processor should have RUNNING state in flow.json.gz after graceful shutdown, but found %s".formatted(state)); + for (final String processorId : runningProcessorIds) { + final String state = processorStates.get(processorId); + assertEquals("RUNNING", state); } + assertTrue(processorStates.containsKey(addedBeforeShutdown.getId())); + nifiInstance.start(true); getClientUtil().waitForRunningProcessor(generate.getId()); getClientUtil().waitForRunningProcessor(terminate.getId()); + + final ProcessorEntity generateAfterRestart = getNifiClient().getProcessorClient().getProcessor(generate.getId()); + assertEquals("RUNNING", generateAfterRestart.getComponent().getState()); + + final ProcessorEntity terminateAfterRestart = getNifiClient().getProcessorClient().getProcessor(terminate.getId()); + assertEquals("RUNNING", terminateAfterRestart.getComponent().getState()); + + final ProcessorEntity addedAfterRestart = getNifiClient().getProcessorClient().getProcessor(addedBeforeShutdown.getId()); + assertNotNull(addedAfterRestart); + assertEquals("STOPPED", addedAfterRestart.getComponent().getState()); } - private List getProcessorScheduledStates(final File flowJsonGz) throws IOException { + private Map getProcessorScheduledStates(final File flowJsonGz) throws IOException { final byte[] decompressed = decompress(flowJsonGz); final ObjectMapper mapper = new ObjectMapper(); final JsonNode root = mapper.readTree(decompressed); - final List states = new ArrayList<>(); + final Map states = new HashMap<>(); final JsonNode rootGroup = root.path("rootGroup"); collectProcessorStates(rootGroup, states); return states; } - private void collectProcessorStates(final JsonNode group, final List states) { + private void collectProcessorStates(final JsonNode group, final Map states) { final JsonNode processors = group.path("processors"); if (processors.isArray()) { for (final JsonNode processor : processors) { + final JsonNode instanceId = processor.path("instanceIdentifier"); final JsonNode scheduledState = processor.path("scheduledState"); - if (!scheduledState.isMissingNode()) { - states.add(scheduledState.asText()); + if (!instanceId.isMissingNode() && !scheduledState.isMissingNode()) { + states.put(instanceId.asText(), scheduledState.asText()); } } } From efeb15da8a15205324a7fc0cf8ac08dfaf588770 Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Fri, 6 Feb 2026 19:52:30 +0100 Subject: [PATCH 3/3] removed non required no-op call --- .../org/apache/nifi/controller/StandardFlowService.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java index 3e15e05dc69f..5c6f7864ae23 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java @@ -374,12 +374,6 @@ public void stop(final boolean force) { logger.warn("Scheduling service did not gracefully shutdown within configured {} second window", gracefulShutdownSeconds); } } - - // Run the save reporting task one final time. At this point, saveHolder should be null - // (cleared above), so this will be a no-op. However, if a save was somehow requested - // between the writeLock release and here, the running check will prevent saving - // incorrect post-shutdown processor states. - saveReportingTask.run(); } @Override