diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java index 1dea3f36eaca4..b4939ce1213fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java @@ -450,8 +450,8 @@ private void waitIfAllChangelogsCompletelyRead() { tasksAndActionsCondition.await(); } } catch (final InterruptedException ignored) { - // we never interrupt the thread, but only signal the condition - // and hence this exception should never be thrown + Thread.currentThread().interrupt(); + log.warn("State updater thread was interrupted while waiting for changelogs"); } finally { tasksAndActionsLock.unlock(); isIdle.set(false); @@ -878,6 +878,8 @@ public void shutdown(final Duration timeout) { } stateUpdaterThread = null; } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for state updater thread to shut down"); } } } @@ -955,7 +957,9 @@ public Set drainRestoredActiveTasks(final Duration timeout) { now = time.milliseconds(); } return result; - } catch (final InterruptedException ignored) { + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("StateUpdaterThread was interrupted while waiting", e); } return result; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index fe04f2c4613f2..33f48a6494f39 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -230,7 +230,9 @@ public void maybeWaitForNonEmptyTopology(final Supplier thre log.debug("Detected that the topology is currently empty, waiting for something to process"); version.topologyCV.await(); } catch (final InterruptedException e) { - log.error("StreamThread was interrupted while waiting on empty topology", e); + Thread.currentThread().interrupt(); + log.warn("StreamThread was interrupted while waiting on empty topology", e); + break; } } } finally { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java index 048eacda5b67e..17c2fddc0ba12 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java @@ -54,6 +54,7 @@ public StreamsException exceptionNow() { return new StreamsException(e.getCause()); } } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); return null; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 943f00dc2e57b..23657850e4172 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -306,7 +306,7 @@ private void resetOffsets(final Set partitionsToReset) throws St log.info("Successfully completed resetting offsets."); break; } catch (final InterruptedException ex) { - ex.printStackTrace(); + Thread.currentThread().interrupt(); log.error("Offset reset failed.", ex); throw new StreamsException(ex); } catch (final ExecutionException ex) { @@ -331,7 +331,9 @@ private void resetOffsets(final Set partitionsToReset) throws St try { Thread.sleep(100); } catch (final InterruptedException ex) { - ex.printStackTrace(); + Thread.currentThread().interrupt(); + log.warn("Interrupted during offset reset retry backoff", ex); + break; } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java index fc7550952be45..dd9ef6baf7f44 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java @@ -117,8 +117,10 @@ private void runOnce(final long nowMs) { if (currentTask == null) { try { taskManager.awaitProcessableTasks(shutdownRequested::get); - } catch (final InterruptedException ignored) { - // Can be ignored, the cause of the interrupted will be handled in the event loop + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + // The event loop will check shutdownRequested on the next iteration + log.warn("TaskExecutorThread was interrupted while waiting", e); } } else { boolean progressed = false; @@ -267,6 +269,8 @@ public void awaitShutdown(final Duration timeout) { } taskExecutorThread = null; } catch (final InterruptedException ignored) { + Thread.currentThread().interrupt(); + log.warn("Interrupted while waiting for task executor thread to shut down"); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java index 53950c1008817..1c5deb7acd180 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java @@ -147,8 +147,7 @@ public void awaitProcessableTasks(final Supplier isShuttingDown) throws log.debug("Not awaiting since shutdown was requested"); } } catch (final InterruptedException ignored) { - // we interrupt the thread for shut down and pause. - // we can ignore this exception. + Thread.currentThread().interrupt(); log.debug("Await unblocked: Interrupted while waiting for processable tasks"); return true; }