Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@

import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.time.Duration;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
Expand Down Expand Up @@ -81,7 +84,7 @@ public class TestClientScannerTimeouts {
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
// Don't report so often so easier to see other rpcs
conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
// conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
Expand Down Expand Up @@ -241,55 +244,52 @@ private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSup

LOG.info(
"Opening scanner, expecting no errors from first next() call from openScanner response");
ResultScanner scanner = scannerSupplier.get();
Result result = scanner.next();
expectRow(ROW0, result);
expectNumTries(0);
try (ResultScanner scanner = scannerSupplier.get()) {
Result result = scanner.next();
expectRow(ROW0, result);
expectNumTries(0);

LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
result = scanner.next();
expectRow(ROW1, result);
expectNumTries(0);
LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
result = scanner.next();
expectRow(ROW1, result);
expectNumTries(0);

LOG.info(
"Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
result = scanner.next();
expectRow(ROW2, result);
expectNumTries(1);
LOG.info(
"Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
result = scanner.next();
expectRow(ROW2, result);
expectNumTries(1);

// reset so no errors. since last call restarted the scan and following
// call would otherwise fail
RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;
// reset so no errors. since last call restarted the scan and following
// call would otherwise fail
RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;

LOG.info("Finishing scan, expecting no errors");
result = scanner.next();
expectRow(ROW3, result);
scanner.close();
LOG.info("Finishing scan, expecting no errors");
result = scanner.next();
expectRow(ROW3, result);
}

// the close operation maybe asynchronous, wait until closed to avoid messing up later
// assertions.
await().atMost(Duration.ofSeconds(5)).until(() -> RSRpcServicesWithScanTimeout.closed);

LOG.info("Testing always throw exception");
byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
int i = 0;

// test the case that RPC always throws
scanner = scannerSupplier.get();
RSRpcServicesWithScanTimeout.reset();
RSRpcServicesWithScanTimeout.throwAlways = true;

while (true) {
LOG.info("Calling scanner.next()");
result = scanner.next();
if (result == null) {
break;
} else {
byte[] expectedResult = expectedResults[i++];
expectRow(expectedResult, result);
// test the case that RPC always throws
try (ResultScanner scanner = scannerSupplier.get()) {
for (int i = 0; i < expectedResults.length; i++) {
LOG.info("Calling scanner.next()");
Result result = scanner.next();
assertNotNull(result,
"missing row index=" + i + ", row=" + Bytes.toStringBinary(expectedResults[i]));
expectRow(expectedResults[i], result);
}
}

// ensure we verified all rows. this along with the expectRow check above
// proves that we didn't miss any rows.
assertEquals(expectedResults.length, i, "Expected to exhaust expectedResults array length="
+ expectedResults.length + ", actual index=" + i);

// expect all but the first row (which came from initial openScanner) to have thrown an error
expectNumTries(expectedResults.length - 1);

Expand Down Expand Up @@ -345,6 +345,8 @@ private void expectTimeout(long start, int timeout) {
private ResultScanner getScanner() {
Scan scan = new Scan();
scan.setCaching(1);
// make sure we will not fetch rows in background
scan.setMaxResultSize(1);
try {
return table.getScanner(scan);
} catch (IOException e) {
Expand All @@ -355,6 +357,8 @@ private ResultScanner getScanner() {
private ResultScanner getAsyncScanner() {
Scan scan = new Scan();
scan.setCaching(1);
// make sure we will not fetch rows in background
scan.setMaxResultSize(1);
return asyncTable.getScanner(scan);
}

Expand All @@ -379,16 +383,17 @@ protected RSRpcServices createRpcServices() throws IOException {
private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
private long tableScannerId;

private static long seqNoToThrowOn = -1;
private static boolean throwAlways = false;
private static boolean threw;
private static volatile long seqNoToThrowOn = -1;
private static volatile boolean throwAlways = false;
private static volatile boolean threw;

private static long seqNoToSleepOn = -1;
private static boolean sleepOnOpen = false;
private static volatile long seqNoToSleepOn = -1;
private static volatile boolean sleepOnOpen = false;
private static volatile boolean slept;
private static int tryNumber = 0;
private static volatile int tryNumber = 0;
private static volatile boolean closed = false;

private static int sleepTime = rpcTimeout + 500;
private static volatile int sleepTime = rpcTimeout + 500;

public static void setSleepForTimeout(int timeout) {
sleepTime = timeout + 500;
Expand All @@ -404,6 +409,7 @@ public static void reset() {
sleepOnOpen = false;
slept = false;
tryNumber = 0;
closed = false;
}

public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
Expand All @@ -415,7 +421,11 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
throws ServiceException {
if (request.hasScannerId()) {
ScanResponse scanResponse = super.scan(controller, request);
if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
if (tableScannerId != request.getScannerId()) {
return scanResponse;
}
if (request.getCloseScanner()) {
closed = true;
return scanResponse;
}

Expand Down
Loading