Skip to content

Commit 5c244d7

Browse files
committed
shutdown correctness
1 parent cc0c143 commit 5c244d7

File tree

3 files changed

+72
-1
lines changed

3 files changed

+72
-1
lines changed

iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -788,7 +788,7 @@ private void occupy(ISession session) {
788788
occupied.put(session, session);
789789
}
790790

791-
/** close all connections in the pool */
791+
/** close all connections in the pool and unblocks any waiting threads*/
792792
@Override
793793
public synchronized void close() {
794794
for (ISession session : queue) {
@@ -819,6 +819,8 @@ public synchronized void close() {
819819
this.closed = true;
820820
queue.clear();
821821
occupied.clear();
822+
// Notify all waiting threads in getSession() so they wake up immediately
823+
this.notifyAll();
822824
}
823825

824826
@Override

iotdb-client/session/src/main/java/org/apache/iotdb/session/pool/TableSessionPool.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public ITableSession getSession() throws IoTDBConnectionException {
3636
return sessionPool.getPooledTableSession();
3737
}
3838

39+
//Closes the underlying session pool and unblocks any waiting threads.
3940
@Override
4041
public void close() {
4142
this.sessionPool.close();

iotdb-client/session/src/test/java/org/apache/iotdb/session/pool/SessionPoolTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import java.util.concurrent.ConcurrentLinkedDeque;
7777

7878
import static org.junit.Assert.assertEquals;
79+
import static org.junit.Assert.assertNotNull;
7980
import static org.junit.Assert.assertTrue;
8081
import static org.mockito.ArgumentMatchers.any;
8182
import static org.mockito.ArgumentMatchers.eq;
@@ -1623,4 +1624,71 @@ private List<ByteBuffer> FakedFirstFetchTsBlockResult() {
16231624

16241625
return Collections.singletonList(tsBlock);
16251626
}
1627+
1628+
// Regression test for graceful shutdown
1629+
@Test(timeout = 5000)
1630+
public void testCloseNotifiesWaitingThreads() throws Exception {
1631+
SessionPool pool =
1632+
new SessionPool.Builder()
1633+
.host("localhost")
1634+
.port(6667)
1635+
.user("root")
1636+
.password("root")
1637+
.maxSize(1)
1638+
.waitToGetSessionTimeoutInMs(10000)
1639+
// notifyAll()
1640+
.build();
1641+
1642+
try {
1643+
Session mockSession = Mockito.mock(Session.class);
1644+
ConcurrentLinkedDeque<ISession> queue =
1645+
(ConcurrentLinkedDeque<ISession>) Whitebox.getInternalState(pool, "queue");
1646+
queue.push(mockSession);
1647+
Whitebox.setInternalState(pool, "size", 1);
1648+
1649+
ISession occupiedSession = (ISession) Whitebox.invokeMethod(pool, "getSession");
1650+
assertEquals(mockSession, occupiedSession);
1651+
assertEquals(0, queue.size());
1652+
1653+
final Exception[] caughtException = {null};
1654+
1655+
Thread waiterThread =
1656+
new Thread(
1657+
() -> {
1658+
try {
1659+
Whitebox.invokeMethod(pool, "getSession");
1660+
} catch (Exception e) {
1661+
caughtException[0] = e;
1662+
}
1663+
});
1664+
waiterThread.start();
1665+
1666+
Thread.sleep(100);
1667+
1668+
long closeStartTime = System.currentTimeMillis();
1669+
pool.close();
1670+
long closeEndTime = System.currentTimeMillis();
1671+
1672+
waiterThread.join(1000);
1673+
1674+
assertNotNull("Waiter thread should have caught an exception", caughtException[0]);
1675+
assertTrue(
1676+
"Exception should be IoTDBConnectionException",
1677+
caughtException[0] instanceof IoTDBConnectionException);
1678+
assertTrue(
1679+
"Exception message should indicate pool is closed",
1680+
caughtException[0].getMessage().contains("closed"));
1681+
1682+
long closeDuration = closeEndTime - closeStartTime;
1683+
assertTrue(
1684+
"close() should complete quickly, but took " + closeDuration + "ms",
1685+
closeDuration < 1000);
1686+
1687+
} finally {
1688+
try {
1689+
pool.close();
1690+
} catch (Exception e) {
1691+
}
1692+
}
1693+
}
16261694
}

0 commit comments

Comments
 (0)