From 8d02d1db00d19aaa5893caabec4a9e0ce6c813e8 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Tue, 23 Jun 2026 23:35:14 +0800 Subject: [PATCH] HBASE-30251 Async prefetching causes TestClientScannerTimeouts flaky --- .../client/TestClientScannerTimeouts.java | 100 ++++++++++-------- 1 file changed, 55 insertions(+), 45 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java index fafaf6b4cef9..09e48a1c9f09 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java @@ -19,11 +19,14 @@ import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.io.IOException; +import java.time.Duration; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtil; @@ -81,7 +84,7 @@ public class TestClientScannerTimeouts { public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); // Don't report so often so easier to see other rpcs - conf.setInt("hbase.regionserver.msginterval", 3 * 10000); + // conf.setInt("hbase.regionserver.msginterval", 3 * 10000); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); 
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER); @@ -241,55 +244,52 @@ private void expectRetryOutOfOrderScannerNext(Supplier scannerSup LOG.info( "Opening scanner, expecting no errors from first next() call from openScanner response"); - ResultScanner scanner = scannerSupplier.get(); - Result result = scanner.next(); - expectRow(ROW0, result); - expectNumTries(0); + try (ResultScanner scanner = scannerSupplier.get()) { + Result result = scanner.next(); + expectRow(ROW0, result); + expectNumTries(0); - LOG.info("Making first next() RPC, expecting no errors for seqNo 0"); - result = scanner.next(); - expectRow(ROW1, result); - expectNumTries(0); + LOG.info("Making first next() RPC, expecting no errors for seqNo 0"); + result = scanner.next(); + expectRow(ROW1, result); + expectNumTries(0); - LOG.info( - "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry"); - result = scanner.next(); - expectRow(ROW2, result); - expectNumTries(1); + LOG.info( + "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry"); + result = scanner.next(); + expectRow(ROW2, result); + expectNumTries(1); - // reset so no errors. since last call restarted the scan and following - // call would otherwise fail - RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1; + // reset so no errors. since last call restarted the scan and following + // call would otherwise fail + RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1; - LOG.info("Finishing scan, expecting no errors"); - result = scanner.next(); - expectRow(ROW3, result); - scanner.close(); + LOG.info("Finishing scan, expecting no errors"); + result = scanner.next(); + expectRow(ROW3, result); + } + + // the close operation may be asynchronous, wait until closed to avoid messing up later + // assertions. 
+ await().atMost(Duration.ofSeconds(5)).until(() -> RSRpcServicesWithScanTimeout.closed); LOG.info("Testing always throw exception"); byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 }; - int i = 0; - // test the case that RPC always throws - scanner = scannerSupplier.get(); + RSRpcServicesWithScanTimeout.reset(); RSRpcServicesWithScanTimeout.throwAlways = true; - while (true) { - LOG.info("Calling scanner.next()"); - result = scanner.next(); - if (result == null) { - break; - } else { - byte[] expectedResult = expectedResults[i++]; - expectRow(expectedResult, result); + // test the case that RPC always throws + try (ResultScanner scanner = scannerSupplier.get()) { + for (int i = 0; i < expectedResults.length; i++) { + LOG.info("Calling scanner.next()"); + Result result = scanner.next(); + assertNotNull(result, + "missing row index=" + i + ", row=" + Bytes.toStringBinary(expectedResults[i])); + expectRow(expectedResults[i], result); } } - // ensure we verified all rows. this along with the expectRow check above - // proves that we didn't miss any rows. 
- assertEquals(expectedResults.length, i, "Expected to exhaust expectedResults array length=" - + expectedResults.length + ", actual index=" + i); - // expect all but the first row (which came from initial openScanner) to have thrown an error expectNumTries(expectedResults.length - 1); @@ -345,6 +345,8 @@ private void expectTimeout(long start, int timeout) { private ResultScanner getScanner() { Scan scan = new Scan(); scan.setCaching(1); + // make sure we will not fetch rows in background + scan.setMaxResultSize(1); try { return table.getScanner(scan); } catch (IOException e) { @@ -355,6 +357,8 @@ private ResultScanner getScanner() { private ResultScanner getAsyncScanner() { Scan scan = new Scan(); scan.setCaching(1); + // make sure we will not fetch rows in background + scan.setMaxResultSize(1); return asyncTable.getScanner(scan); } @@ -379,16 +383,17 @@ protected RSRpcServices createRpcServices() throws IOException { private static class RSRpcServicesWithScanTimeout extends RSRpcServices { private long tableScannerId; - private static long seqNoToThrowOn = -1; - private static boolean throwAlways = false; - private static boolean threw; + private static volatile long seqNoToThrowOn = -1; + private static volatile boolean throwAlways = false; + private static volatile boolean threw; - private static long seqNoToSleepOn = -1; - private static boolean sleepOnOpen = false; + private static volatile long seqNoToSleepOn = -1; + private static volatile boolean sleepOnOpen = false; private static volatile boolean slept; - private static int tryNumber = 0; + private static volatile int tryNumber = 0; + private static volatile boolean closed = false; - private static int sleepTime = rpcTimeout + 500; + private static volatile int sleepTime = rpcTimeout + 500; public static void setSleepForTimeout(int timeout) { sleepTime = timeout + 500; @@ -404,6 +409,7 @@ public static void reset() { sleepOnOpen = false; slept = false; tryNumber = 0; + closed = false; } public 
RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException { @@ -415,7 +421,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque throws ServiceException { if (request.hasScannerId()) { ScanResponse scanResponse = super.scan(controller, request); - if (tableScannerId != request.getScannerId() || request.getCloseScanner()) { + if (tableScannerId != request.getScannerId()) { + return scanResponse; + } + if (request.getCloseScanner()) { + closed = true; return scanResponse; }