Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,8 @@ private void waitIfAllChangelogsCompletelyRead() {
tasksAndActionsCondition.await();
}
} catch (final InterruptedException ignored) {
// we never interrupt the thread, but only signal the condition
// and hence this exception should never be thrown
Thread.currentThread().interrupt();
log.warn("State updater thread was interrupted while waiting for changelogs");
} finally {
tasksAndActionsLock.unlock();
isIdle.set(false);
Expand Down Expand Up @@ -878,6 +878,8 @@ public void shutdown(final Duration timeout) {
}
stateUpdaterThread = null;
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for state updater thread to shut down");
}
}
}
Expand Down Expand Up @@ -955,7 +957,9 @@ public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
now = time.milliseconds();
}
return result;
} catch (final InterruptedException ignored) {
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a warn-log?

log.warn("StateUpdaterThread was interrupted while waiting", e);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ public void maybeWaitForNonEmptyTopology(final Supplier<StreamThread.State> thre
log.debug("Detected that the topology is currently empty, waiting for something to process");
version.topologyCV.await();
} catch (final InterruptedException e) {
log.error("StreamThread was interrupted while waiting on empty topology", e);
Thread.currentThread().interrupt();
log.warn("StreamThread was interrupted while waiting on empty topology", e);
break;
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public StreamsException exceptionNow() {
return new StreamsException(e.getCause());
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private void resetOffsets(final Set<TopicPartition> partitionsToReset) throws St
log.info("Successfully completed resetting offsets.");
break;
} catch (final InterruptedException ex) {
ex.printStackTrace();
Thread.currentThread().interrupt();
log.error("Offset reset failed.", ex);
throw new StreamsException(ex);
} catch (final ExecutionException ex) {
Expand All @@ -331,7 +331,9 @@ private void resetOffsets(final Set<TopicPartition> partitionsToReset) throws St
try {
Thread.sleep(100);
} catch (final InterruptedException ex) {
ex.printStackTrace();
Thread.currentThread().interrupt();
log.warn("Interrupted during offset reset retry backoff", ex);
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,10 @@ private void runOnce(final long nowMs) {
if (currentTask == null) {
try {
taskManager.awaitProcessableTasks(shutdownRequested::get);
} catch (final InterruptedException ignored) {
// Can be ignored, the cause of the interrupted will be handled in the event loop
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
// The event loop will check shutdownRequested on the next iteration
log.warn("TaskExecutorThread was interrupted while waiting", e);
}
} else {
boolean progressed = false;
Expand Down Expand Up @@ -267,6 +269,8 @@ public void awaitShutdown(final Duration timeout) {
}
taskExecutorThread = null;
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
log.warn("Interrupted while waiting for task executor thread to shut down");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,7 @@ public void awaitProcessableTasks(final Supplier<Boolean> isShuttingDown) throws
log.debug("Not awaiting since shutdown was requested");
}
} catch (final InterruptedException ignored) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider handling InterruptedException in waitIfAllChangelogsCompletelyRead the same way as the other sites: Thread.currentThread().interrupt() (+ optional warn).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That path (waitIfAllChangelogsCompletelyRead) is never triggered, which was mentioned in the description. That being a private method, adding interrupt would become a tight loop and need to add break which is unnecessary imo. Prefer to ignore that.

Copy link
Copy Markdown
Member

@mjsax mjsax Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I follow the argument?

In the end, KS code does not call interrupt() by itself. Following this argument, InterruptedException should not needed to be handled anywhere, because it should never happen. It's still good to add a defensive guard as proposed on this PR for it.

I was digging around a little bit, and found other existing code doing:

} catch (final InterruptedException fatalException) {
    // this should not happen; if it ever happens it indicate a bug
    Thread.currentThread().interrupt();
    log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
    throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
}

So maybe we should follow this pattern everywhere (maybe except in shutdown code path)?

Or, considering the comment below:

// we interrupt the thread for shut down and pause.

We might want to always handle InterruptedException gracefully and remove existing code that treats it as fatal?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree we should still handle interrupts defensively here.

For waitIfAllChangelogsCompletelyRead, IMHO graceful handling (restore interrupt + leave the await() loop) is preferable over the fatal IllegalStateException style. Shutdown already uses isRunning + signalAll(), and this is an idle condition wait, not a “must complete this step” path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with being consistent. Updated with interrupt here

// we interrupt the thread for shut down and pause.
// we can ignore this exception.
Thread.currentThread().interrupt();
log.debug("Await unblocked: Interrupted while waiting for processable tasks");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be a warn-log?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we could keep debug as it is expected to shutdown or pause ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Digging into the code, I am actually not sure when we would interrupt a thread, so I am a little bit unsure about the existing comment... we interrupt the thread for shut down and pause

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks like a pre-existing comment. I have removed it.

And for the await unblock, as it is not an error, log.debug is better than log.warn ?

return true;
}
Expand Down
Loading