-
Notifications
You must be signed in to change notification settings - Fork 1.1k
[SessionPool] Fix graceful shutdown by notifying waiting threads in close() #17381
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -74,8 +74,11 @@ | |||||||
| import java.util.List; | ||||||||
| import java.util.Map; | ||||||||
| import java.util.concurrent.ConcurrentLinkedDeque; | ||||||||
| import java.util.concurrent.CountDownLatch; | ||||||||
| import java.util.concurrent.TimeUnit; | ||||||||
|
|
||||||||
| import static org.junit.Assert.assertEquals; | ||||||||
| import static org.junit.Assert.assertNotNull; | ||||||||
| import static org.junit.Assert.assertTrue; | ||||||||
| import static org.mockito.ArgumentMatchers.any; | ||||||||
| import static org.mockito.ArgumentMatchers.eq; | ||||||||
|
|
@@ -1623,4 +1626,69 @@ private List<ByteBuffer> FakedFirstFetchTsBlockResult() { | |||||||
|
|
||||||||
| return Collections.singletonList(tsBlock); | ||||||||
| } | ||||||||
|
|
||||||||
| // Regression test for graceful shutdown | ||||||||
| @Test(timeout = 5000) | ||||||||
| public void testCloseNotifiesWaitingThreads() throws Exception { | ||||||||
| SessionPool pool = | ||||||||
| new SessionPool.Builder() | ||||||||
| .host("localhost") | ||||||||
| .port(6667) | ||||||||
| .user("root") | ||||||||
| .password("root") | ||||||||
| .maxSize(1) | ||||||||
| .waitToGetSessionTimeoutInMs(10000) | ||||||||
| .build(); | ||||||||
|
|
||||||||
| try { | ||||||||
| Session mockSession = Mockito.mock(Session.class); | ||||||||
| ConcurrentLinkedDeque<ISession> queue = | ||||||||
| (ConcurrentLinkedDeque<ISession>) Whitebox.getInternalState(pool, "queue"); | ||||||||
| queue.push(mockSession); | ||||||||
| Whitebox.setInternalState(pool, "size", 1); | ||||||||
|
|
||||||||
| ISession occupiedSession = (ISession) Whitebox.invokeMethod(pool, "getSession"); | ||||||||
| assertEquals(mockSession, occupiedSession); | ||||||||
| assertEquals(0, queue.size()); | ||||||||
|
|
||||||||
| final Exception[] caughtException = {null}; | ||||||||
| CountDownLatch latch = new CountDownLatch(1); | ||||||||
|
|
||||||||
| Thread waiterThread = | ||||||||
| new Thread( | ||||||||
| () -> { | ||||||||
| try { | ||||||||
| latch.countDown(); | ||||||||
| Whitebox.invokeMethod(pool, "getSession"); | ||||||||
| } catch (Exception e) { | ||||||||
| caughtException[0] = e; | ||||||||
| } | ||||||||
| }); | ||||||||
| waiterThread.start(); | ||||||||
|
|
||||||||
| assertTrue("Waiter thread should have started", latch.await(10, TimeUnit.SECONDS)); | ||||||||
| // Give it a moment to enter the wait(1000) block in getSession() | ||||||||
| Thread.sleep(200); | ||||||||
|
|
||||||||
| pool.close(); | ||||||||
|
|
||||||||
| waiterThread.join(500); | ||||||||
| assertTrue("Waiter thread should be unblocked quickly", !waiterThread.isAlive()); | ||||||||
|
|
||||||||
|
Comment on lines
+1657
to
+1677
|
||||||||
| assertNotNull("Waiter thread should have caught an exception", caughtException[0]); | ||||||||
| assertTrue( | ||||||||
| "Exception should be IoTDBConnectionException", | ||||||||
| caughtException[0] instanceof IoTDBConnectionException); | ||||||||
| assertTrue( | ||||||||
| "Exception message should indicate pool is closed", | ||||||||
| caughtException[0].getMessage().contains("closed")); | ||||||||
|
|
||||||||
| } finally { | ||||||||
| try { | ||||||||
| pool.close(); | ||||||||
| } catch (Exception e) { | ||||||||
|
||||||||
| } catch (Exception e) { | |
| } catch (Exception e) { | |
| // ignore: best-effort cleanup in test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR description says waiting threads could remain blocked for “up to the full timeout period”, but
getSession()currently useswait(1000), so the pre-fix worst-case delay appears to be ~1s rather thanwaitToGetSessionTimeoutInMs. Consider updating the description (or the implementation, if full-timeout blocking is still expected) so the rationale matches the current code.