diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index dee517c2e0af..0be9ef80b61c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -44,7 +44,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { private LinkedBlockingDeque queue; // so we can calculate actual threshold to switch to LIFO under load - private int maxCapacity; + private volatile int currentQueueLimit; // metrics (shared across all queues) private LongAdder numGeneralCallsDropped; @@ -71,14 +71,15 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { private AtomicBoolean isOverloaded = new AtomicBoolean(false); public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, - double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) { - this.maxCapacity = capacity; + double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches, + int currentQueueLimit) { this.queue = new LinkedBlockingDeque<>(capacity); this.codelTargetDelay = targetDelay; this.codelInterval = interval; this.lifoThreshold = lifoThreshold; this.numGeneralCallsDropped = numGeneralCallsDropped; this.numLifoModeSwitches = numLifoModeSwitches; + this.currentQueueLimit = currentQueueLimit; } /** @@ -86,12 +87,14 @@ public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, * @param newCodelTargetDelay new CoDel target delay * @param newCodelInterval new CoDel interval * @param newLifoThreshold new Adaptive Lifo threshold + * @param currentQueueLimit new limit of queue */ - public void updateTunables(int newCodelTargetDelay, int newCodelInterval, - double newLifoThreshold) { + public void updateTunables(int newCodelTargetDelay, int 
newCodelInterval, double newLifoThreshold, + int currentQueueLimit) { this.codelTargetDelay = newCodelTargetDelay; this.codelInterval = newCodelInterval; this.lifoThreshold = newLifoThreshold; + this.currentQueueLimit = currentQueueLimit; } /** @@ -104,7 +107,7 @@ public void updateTunables(int newCodelTargetDelay, int newCodelInterval, public CallRunner take() throws InterruptedException { CallRunner cr; while (true) { - if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { + if (((double) queue.size() / this.currentQueueLimit) > lifoThreshold) { numLifoModeSwitches.increment(); cr = queue.takeLast(); } else { @@ -124,7 +127,7 @@ public CallRunner poll() { CallRunner cr; boolean switched = false; while (true) { - if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { + if (((double) queue.size() / this.currentQueueLimit) > lifoThreshold) { // Only count once per switch. if (!switched) { switched = true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index eb1eda1f9f49..aaccd2be042b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -107,6 +107,7 @@ public abstract class RpcExecutor { private final Class queueClass; private final Object[] queueInitArgs; + // This is the soft limit of the queue; while initializing we use the hard limit as the queue size protected volatile int currentQueueLimit; private final AtomicInteger activeHandlerCount = new AtomicInteger(0); @@ -161,11 +162,13 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ int handlerCountPerQueue = this.handlerCount / this.numCallQueues; maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; } + currentQueueLimit = maxQueueLength; + int queueHardLimit = Math.max(maxQueueLength, 
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); if (isDeadlineQueueType(callQueueType)) { this.name += ".Deadline"; this.queueInitArgs = - new Object[] { maxQueueLength, new CallPriorityComparator(conf, priority) }; + new Object[] { queueHardLimit, new CallPriorityComparator(conf, priority) }; this.queueClass = BoundedPriorityBlockingQueue.class; } else if (isCodelQueueType(callQueueType)) { this.name += ".Codel"; @@ -174,8 +177,8 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL); double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD); - this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval, - codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches }; + this.queueInitArgs = new Object[] { queueHardLimit, codelTargetDelay, codelInterval, + codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches, currentQueueLimit }; this.queueClass = AdaptiveLifoCoDelCallQueue.class; } else if (isPluggableQueueType(callQueueType)) { Optional>> pluggableQueueClass = @@ -185,12 +188,12 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ throw new PluggableRpcQueueNotFound( "Pluggable call queue failed to load and selected call" + " queue type required"); } else { - this.queueInitArgs = new Object[] { maxQueueLength, priority, conf }; + this.queueInitArgs = new Object[] { queueHardLimit, priority, conf }; this.queueClass = pluggableQueueClass.get(); } } else { this.name += ".Fifo"; - this.queueInitArgs = new Object[] { maxQueueLength }; + this.queueInitArgs = new Object[] { queueHardLimit }; this.queueClass = LinkedBlockingQueue.class; } @@ -231,20 +234,7 @@ public Map getCallQueueSizeSummary() { .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond))); } - // This method can only be called ONCE per 
executor instance. - // Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity) - // After calling: queueInitArgs[0] is set to hard limit and currentQueueLimit stores the original - // soft limit. - // Multiple calls would incorrectly use the hard limit as the soft limit. - // As all the queues has same initArgs and queueClass, there should be no need to call this again. protected void initializeQueues(final int numQueues) { - if (!queues.isEmpty()) { - throw new RuntimeException("Queues are already initialized"); - } - if (queueInitArgs.length > 0) { - currentQueueLimit = (int) queueInitArgs[0]; - queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); - } for (int i = 0; i < numQueues; ++i) { queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs)); } @@ -479,8 +469,10 @@ public void onConfigurationChange(Configuration conf) { for (BlockingQueue queue : queues) { if (queue instanceof AdaptiveLifoCoDelCallQueue) { + // The currentQueueLimit for this executor is already updated as part of resizeQueues; we + // need to make the codel queue aware of it as well ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval, - codelLifoThreshold); + codelLifoThreshold, currentQueueLimit); } else if (queue instanceof ConfigurationObserver) { ((ConfigurationObserver) queue).onConfigurationChange(conf); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index eed7d98d7358..baf70c982c9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.ipc; +import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; 
import static org.junit.Assert.assertNotEquals; @@ -34,6 +35,7 @@ import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -41,6 +43,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -814,4 +817,166 @@ public void drop() { } }; } + + /** + * Test LIFO switching behavior through actual RPC calls. This test verifies that when the queue + * fills beyond the LIFO threshold, newer calls are processed before older calls (LIFO mode). + */ + @Test + public void testCoDelLifoWithRpcCalls() throws Exception { + Configuration testConf = HBaseConfiguration.create(); + testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, + RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); + int maxCallQueueLength = 50; + double codelLifoThreshold = 0.8; + testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, maxCallQueueLength); + testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, codelLifoThreshold); + testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 100); + testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_INTERVAL, 100); + testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); // Single handler to control + // processing + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); + SimpleRpcScheduler scheduler = + new SimpleRpcScheduler(testConf, 1, 0, 0, priority, HConstants.QOS_THRESHOLD); + + try { + scheduler.init(CONTEXT); + scheduler.start(); + + // Track completion order + final List completedCalls = Collections.synchronizedList(new ArrayList<>()); + + // Dispatch 
many slow calls rapidly to fill the queue beyond 80% threshold + // With queue limit of 50, we need > 40 calls to cross 80% + int numCalls = 48; + for (int i = 0; i < numCalls; i++) { + final int callId = i; + CallRunner call = createMockTask(HConstants.NORMAL_QOS); + call.setStatus(new MonitoredRPCHandlerImpl("test")); + doAnswer(invocation -> { + completedCalls.add(callId); + Thread.sleep(100); // Slow processing to allow queue to build up + return null; + }).when(call).run(); + scheduler.dispatch(call); + // No delay between dispatches - rapidly fill the queue + } + + // Wait for some calls to complete + await().atMost(1, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 2); + + // Check that we had LIFO switches + long lifoSwitches = scheduler.getNumLifoModeSwitches(); + assertTrue("Should have switched to LIFO mode at least once, but got: " + lifoSwitches, + lifoSwitches > 0); + + // Verify LIFO behavior: Among first completed calls, we should see higher call IDs + // (indicating later dispatched calls completed first) + int maxCallIdCompleted = -1; + for (int i = 0; i < 5 && i < completedCalls.size(); i++) { + maxCallIdCompleted = Math.max(maxCallIdCompleted, completedCalls.get(i)); + } + // At least one of the early completed calls should have a high ID (>20) + // indicating LIFO processing + assertTrue( + "Expected LIFO behavior: early completed calls should include call arrived after threshold " + + "maxCallIdCompleted: " + maxCallIdCompleted, + maxCallIdCompleted > maxCallQueueLength * codelLifoThreshold); + + } finally { + scheduler.stop(); + } + } + + /** + * Test that CoDel queue returns to FIFO mode after draining below threshold. 
+ */ + @Test + public void testCoDelQueueDrainAndFifoReturn() throws Exception { + Configuration testConf = HBaseConfiguration.create(); + testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, + RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); + testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 50); + testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, 0.8); + testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2); + + PriorityFunction priority = mock(PriorityFunction.class); + when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS); + SimpleRpcScheduler scheduler = + new SimpleRpcScheduler(testConf, 2, 0, 0, priority, HConstants.QOS_THRESHOLD); + + try { + scheduler.init(CONTEXT); + scheduler.start(); + + final List completedCalls = Collections.synchronizedList(new ArrayList<>()); + + // Phase 1: Fill queue rapidly to trigger LIFO (>40 calls for 80% of 50) + for (int i = 0; i < 48; i++) { + final int callId = i; + CallRunner call = createMockTask(HConstants.NORMAL_QOS); + call.setStatus(new MonitoredRPCHandlerImpl("test")); + doAnswer(invocation -> { + completedCalls.add(callId); + Thread.sleep(80); + return null; + }).when(call).run(); + scheduler.dispatch(call); + } + + // Wait for calls to complete + await().atMost(1, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 2); + long lifoSwitchesPhase1 = scheduler.getNumLifoModeSwitches(); + assertTrue("Should have entered LIFO mode", lifoSwitchesPhase1 > 0); + + // Phase 2: Let queue drain + await().atMost(2, TimeUnit.SECONDS).until(() -> scheduler.getGeneralQueueLength() <= 0); + int queueLength = scheduler.getGeneralQueueLength(); + assertTrue("Queue should have drained significantly, but got: " + queueLength, + queueLength < 25); + + // Phase 3: Send new calls - should process in FIFO order + completedCalls.clear(); + for (int i = 100; i < 105; i++) { + final int callId = i; + CallRunner call = createMockTask(HConstants.NORMAL_QOS); + call.setStatus(new 
MonitoredRPCHandlerImpl("test")); + doAnswer(invocation -> { + completedCalls.add(callId); + Thread.sleep(80); + return null; + }).when(call).run(); + scheduler.dispatch(call); + } + + // Wait for these calls to complete + await().atMost(2, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 2); + + // Verify FIFO behavior: calls should complete in order (100, 101, 102, 103, 104) + assertTrue( + "Should have completed test calls, but count of completed calls: " + completedCalls.size(), + completedCalls.size() >= 2); + // Check that calls completed roughly in order (allowing some variance due to threading with 2 + // handlers) + // With 2 handlers, we expect general FIFO order but some interleaving is possible + // Just verify the general trend is increasing + int violations = 0; + for (int i = 0; i < completedCalls.size() - 1; i++) { + int current = completedCalls.get(i); + int next = completedCalls.get(i + 1); + if (next < current) { + violations++; + } + } + // Allow at most 1 violation due to concurrent execution by 2 handlers + assertTrue("Calls should complete in approximate FIFO order, violations: " + violations + + ", order: " + completedCalls, violations <= 1); + + } finally { + scheduler.stop(); + } + } + }