From d5ce5c38a8c9210d26cee4c726adfe29de296fa3 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Sat, 4 Apr 2026 21:56:30 +0200 Subject: [PATCH 1/6] Fix stream threads interruptions --- .../streams/processor/internals/DefaultStateUpdater.java | 3 +++ .../kafka/streams/processor/internals/TaskManager.java | 2 ++ .../kafka/streams/processor/internals/TopologyMetadata.java | 1 + .../internals/namedtopology/AddNamedTopologyResult.java | 1 + .../namedtopology/KafkaStreamsNamedTopologyWrapper.java | 6 ++++-- .../processor/internals/tasks/DefaultTaskExecutor.java | 5 ++++- .../processor/internals/tasks/DefaultTaskManager.java | 2 +- 7 files changed, 16 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 1dea3f36eaca4..a7d252b0e3e1c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -878,6 +878,8 @@ public void shutdown(final Duration timeout) { } stateUpdaterThread = null; } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for state updater thread to shut down"); } } } @@ -956,6 +958,7 @@ public Set drainRestoredActiveTasks(final Duration timeout) { } return result; } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); } return result; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 1657b3a414b92..be42e56c01157 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1786,8 +1786,10 @@ private void maybeLockTasks(final Set ids) { schedulingTaskManager.lockTasks(ids).get(); locked = true; } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); log.warn("Interrupted while waiting for tasks {} to be locked", ids.stream().map(TaskId::toString).collect(Collectors.joining(","))); + break; } catch (final ExecutionException e) { log.info("Failed to lock tasks"); throw new RuntimeException(e); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index fe04f2c4613f2..dcccb67f80f3b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -230,6 +230,7 @@ public void maybeWaitForNonEmptyTopology(final Supplier thre log.debug("Detected that the topology is currently empty, waiting for something to process"); version.topologyCV.await(); } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); log.error("StreamThread was interrupted while waiting on empty topology", e); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java index 048eacda5b67e..17c2fddc0ba12 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java @@ -54,6 +54,7 @@ public StreamsException exceptionNow() { return new StreamsException(e.getCause()); } } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); return null; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 943f00dc2e57b..23657850e4172 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -306,7 +306,7 @@ private void resetOffsets(final Set partitionsToReset) throws St log.info("Successfully completed resetting offsets."); break; } catch (final InterruptedException ex) { - ex.printStackTrace(); + Thread.currentThread().interrupt(); log.error("Offset reset failed.", ex); throw new StreamsException(ex); } catch (final ExecutionException ex) { @@ -331,7 +331,9 @@ private void resetOffsets(final Set partitionsToReset) throws St try { Thread.sleep(100); } catch (final InterruptedException ex) { - ex.printStackTrace(); + Thread.currentThread().interrupt(); + log.warn("Interrupted during offset reset retry backoff", ex); + break; } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index fc7550952be45..8869d9923dd66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -118,7 +118,8 @@ private void runOnce(final long nowMs) { try { taskManager.awaitProcessableTasks(shutdownRequested::get); } catch (final InterruptedException ignored) { - // Can be ignored, the cause of the interrupted will be handled in the event loop + Thread.currentThread().interrupt(); + // The event loop will check shutdownRequested on the next iteration } } else { boolean progressed = false; @@ -267,6 +268,8 @@ public void awaitShutdown(final Duration timeout) { } taskExecutorThread = null; } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for task executor thread to shut down"); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java index 53950c1008817..cde42f9ccd4c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -147,8 +147,8 @@ public void awaitProcessableTasks(final Supplier isShuttingDown) throws log.debug("Not awaiting since shutdown was requested"); } } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); // we interrupt the thread for shut down and pause. - // we can ignore this exception. log.debug("Await unblocked: Interrupted while waiting for processable tasks"); return true; } From db8d98c8fa886ad3145f663337cb27898b04d45f Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Sun, 5 Apr 2026 10:51:25 +0200 Subject: [PATCH 2/6] Adding log messages --- .../kafka/streams/processor/internals/DefaultStateUpdater.java | 3 ++- .../kafka/streams/processor/internals/TopologyMetadata.java | 2 +- .../streams/processor/internals/tasks/DefaultTaskExecutor.java | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index a7d252b0e3e1c..527a2283d1e15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -957,8 +957,9 @@ public Set drainRestoredActiveTasks(final Duration timeout) { now = time.milliseconds(); } return result; - } catch (final InterruptedException ignored) { + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); + log.warn("StateUpdaterThread was interrupted while waiting", e); } return result; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index dcccb67f80f3b..a05e5a084ee17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -231,7 +231,7 @@ public void maybeWaitForNonEmptyTopology(final Supplier thre version.topologyCV.await(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); - log.error("StreamThread was interrupted while waiting on empty topology", e); + log.warn("StreamThread was interrupted while waiting on empty topology", e); } } } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index 8869d9923dd66..dd9ef6baf7f44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -117,9 +117,10 @@ private void runOnce(final long nowMs) { if (currentTask == null) { try { taskManager.awaitProcessableTasks(shutdownRequested::get); - } catch (final InterruptedException ignored) { + } catch (final InterruptedException e) { Thread.currentThread().interrupt(); // The event loop will check shutdownRequested on the next iteration + log.warn("TaskExecutorThread was interrupted while waiting", e); } } else { boolean progressed = false; From ec7d231d6922db00022b3e61df1a2b4b06d5f34f Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Mon, 6 Apr 2026 10:58:37 +0200 Subject: [PATCH 3/6] Adding break statement --- .../kafka/streams/processor/internals/TopologyMetadata.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index a05e5a084ee17..33f48a6494f39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -232,6 +232,7 @@ public void maybeWaitForNonEmptyTopology(final Supplier thre } catch (final InterruptedException e) { Thread.currentThread().interrupt(); log.warn("StreamThread was interrupted while waiting on empty topology", e); + break; } } } finally { From aeb99604aed11e2157dadc4739e1b01c194ef5bb Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Mon, 6 Apr 2026 18:10:35 +0200 Subject: [PATCH 4/6] From review comments --- .../apache/kafka/streams/processor/internals/TaskManager.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index be42e56c01157..1657b3a414b92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1786,10 +1786,8 @@ private void maybeLockTasks(final Set ids) { schedulingTaskManager.lockTasks(ids).get(); locked = true; } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); log.warn("Interrupted while waiting for tasks {} to be locked", ids.stream().map(TaskId::toString).collect(Collectors.joining(","))); - break; } catch (final ExecutionException e) { log.info("Failed to lock tasks"); throw new RuntimeException(e); From cf989bffbbc9699f643bf557484e13bd576b84a7 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Tue, 7 Apr 2026 20:10:40 +0200 Subject: [PATCH 5/6] Remove comment --- .../streams/processor/internals/tasks/DefaultTaskManager.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java index cde42f9ccd4c0..1c5deb7acd180 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -148,7 +148,6 @@ public void awaitProcessableTasks(final Supplier isShuttingDown) throws } } catch (final InterruptedException ignored) { Thread.currentThread().interrupt(); - // we interrupt the thread for shut down and pause. log.debug("Await unblocked: Interrupted while waiting for processable tasks"); return true; } From 1d7df45f836a1fe7902bac71b8a58d72b52de334 Mon Sep 17 00:00:00 2001 From: Muralidhar Basani Date: Tue, 7 Apr 2026 20:16:23 +0200 Subject: [PATCH 6/6] Update other interrupt block from review --- .../streams/processor/internals/DefaultStateUpdater.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 527a2283d1e15..b4939ce1213fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -450,8 +450,8 @@ private void waitIfAllChangelogsCompletelyRead() { tasksAndActionsCondition.await(); } } catch (final InterruptedException ignored) { - // we never interrupt the thread, but only signal the condition - // and hence this exception should never be thrown + Thread.currentThread().interrupt(); + log.warn("State updater thread was interrupted while waiting for changelogs"); } finally { tasksAndActionsLock.unlock(); isIdle.set(false);