KAFKA-20403 : streams - Fix stream threads interruptions #21970
muralibasani wants to merge 6 commits into apache:trunk
| version.topologyCV.await(); | ||
| } catch (final InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| log.error("StreamThread was interrupted while waiting on empty topology", e); |
| } | ||
| return result; | ||
| } catch (final InterruptedException ignored) { | ||
| Thread.currentThread().interrupt(); |
| Thread.currentThread().interrupt(); | ||
| // we interrupt the thread for shut down and pause. | ||
| // we can ignore this exception. | ||
| log.debug("Await unblocked: Interrupted while waiting for processable tasks"); |
Here we could keep debug, since an interrupt is expected on shutdown or pause?
Digging into the code, I am actually not sure when we would interrupt a thread, so I am a little unsure about the existing comment: "we interrupt the thread for shut down and pause".
That looks like a pre-existing comment. I have removed it.
As for the await unblock: since it is not an error, isn't log.debug better than log.warn?
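For reference, the pattern discussed in this thread (restore the interrupt flag after a condition wait is interrupted) can be sketched as below. This is a minimal, self-contained illustration and not the Kafka Streams code: the class `InterruptAwareWaiter` and its members are hypothetical names chosen for the example.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class; not part of Kafka Streams.
class InterruptAwareWaiter {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition workAvailable = lock.newCondition();
    private boolean hasWork = false;

    /** Returns true once work is available, false if the wait was interrupted. */
    boolean awaitWork() {
        lock.lock();
        try {
            while (!hasWork) {
                workAvailable.await();
            }
            return true;
        } catch (final InterruptedException e) {
            // await() clears the interrupt status when it throws, so set it
            // again to let code further up the stack observe the interrupt.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    /** Marks work as available and wakes up all waiting threads. */
    void signalWork() {
        lock.lock();
        try {
            hasWork = true;
            workAvailable.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Whether the catch block logs at debug, warn, or error is a separate choice; the sketch leaves logging out to stay dependency-free.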
...ms/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutor.java
@mjsax thank you for the review. Pushed changes.
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
| @@ -147,8 +147,8 @@ public void awaitProcessableTasks(final Supplier<Boolean> isShuttingDown) throws | |||
| log.debug("Not awaiting since shutdown was requested"); | |||
| } | |||
| } catch (final InterruptedException ignored) { | |||
Consider handling InterruptedException in waitIfAllChangelogsCompletelyRead the same way as the other sites: Thread.currentThread().interrupt() (+ optional warn).
That path (waitIfAllChangelogsCompletelyRead) is never triggered, as mentioned in the description. Since it is a private method, adding the interrupt would create a tight loop, so we would also need to add a break, which is unnecessary IMO. I would prefer to ignore that.
Not sure if I follow the argument?
In the end, KS code does not call interrupt() by itself. Following this argument, InterruptedException would not need to be handled anywhere, because it should never happen. It's still good to add a defensive guard as proposed in this PR.
I was digging around a little bit, and found other existing code doing:
} catch (final InterruptedException fatalException) {
// this should not happen; if it ever happens it indicate a bug
Thread.currentThread().interrupt();
log.error(INTERRUPTED_ERROR_MESSAGE, fatalException);
throw new IllegalStateException(INTERRUPTED_ERROR_MESSAGE, fatalException);
}
So maybe we should follow this pattern everywhere (maybe except in shutdown code path)?
Or, considering the comment below:
// we interrupt the thread for shut down and pause.
We might want to always handle InterruptedException gracefully and remove existing code that treats it as fatal?
Agree we should still handle interrupts defensively here.
For waitIfAllChangelogsCompletelyRead, IMHO graceful handling (restore interrupt + leave the await() loop) is preferable over the fatal IllegalStateException style. Shutdown already uses isRunning + signalAll(), and this is an idle condition wait, not a “must complete this step” path.
Agree with being consistent. Updated to restore the interrupt here.
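The tight-loop concern and the graceful handling agreed on in this thread can be illustrated with a small sketch. It is not the actual waitIfAllChangelogsCompletelyRead implementation; `IdleConditionWait`, `waitWhileRunning`, and `shutdown` are hypothetical names, and the await counter exists only to make the behavior observable.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical example class; not part of Kafka Streams.
class IdleConditionWait {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition wakeUp = lock.newCondition();
    private boolean running = true;

    /** Waits while running; returns how many times await() was called. */
    int waitWhileRunning() {
        int awaitCalls = 0;
        lock.lock();
        try {
            while (running) {
                try {
                    awaitCalls++;
                    wakeUp.await();
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // With the flag set again, every further await() would
                    // throw immediately, so leave the loop instead of spinning.
                    break;
                }
            }
        } finally {
            lock.unlock();
        }
        return awaitCalls;
    }

    /** Normal shutdown path: flip the flag and wake up all waiters. */
    void shutdown() {
        lock.lock();
        try {
            running = false;
            wakeUp.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Leaving the loop is reasonable here only because the wait is an idle wait; a step that must complete before the caller proceeds needs a different treatment.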
| Thread.currentThread().interrupt(); | ||
| log.warn("Interrupted while waiting for tasks {} to be locked", | ||
| ids.stream().map(TaskId::toString).collect(Collectors.joining(","))); | ||
| break; |
One concern on TaskManager.maybeLockTasks: exiting the loop with a break after InterruptedException means the method returns without the lock having been acquired, but every caller still continues as if it had (handleCorruption, handleAssignment, handleRevocation, closeRunningTasksDirty, closeAndCleanUpTasks, commit, etc.). When schedulingTaskManager != null, that can allow the stream thread to do commit/suspend/close work while task executors might still be running the same tasks.
Could we call Thread.currentThread().interrupt() as you have, but then fail the operation instead of falling through (e.g. throw new StreamsException(..., e) or another unchecked error that fits this module) rather than break? That keeps the “don’t swallow interrupt” fix without proceeding into the locked critical section without a lock.
Good point. I have reverted the interrupt I introduced here.
Throwing instead is not a good option either, since a few other callers do not handle the exception.
I think it is better to ignore the interrupt and go back to the previous behavior.
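For comparison, the alternative suggested by the reviewer (restore the interrupt, then fail instead of falling through) might look like the sketch below. It is an illustration under assumptions and not what this PR does: `LockOrFail` and `awaitLocked` are hypothetical names, a plain `java.util.concurrent.Future` stands in for the lock result, and `IllegalStateException` stands in for whichever unchecked exception fits the module.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical example class; not part of Kafka Streams.
class LockOrFail {
    /**
     * Blocks until the lock future completes. If the wait is interrupted,
     * the interrupt flag is restored and the call fails, so the caller
     * never continues as if the lock had been acquired.
     */
    static void awaitLocked(final Future<Void> lockFuture) {
        try {
            lockFuture.get();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted before the task locks were acquired", e);
        } catch (final ExecutionException e) {
            throw new IllegalStateException("Failed to acquire the task locks", e.getCause());
        }
    }
}
```

The trade-off raised in the thread still applies: every caller must be prepared for the unchecked exception, which is the reason given for not taking this route.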
https://issues.apache.org/jira/browse/KAFKA-20403
As per the ticket, this adds a thread interrupt together with log.warn.