Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private LinkedBlockingDeque<CallRunner> queue;

// so we can calculate actual threshold to switch to LIFO under load
private int maxCapacity;
private volatile int currentQueueLimit;

// metrics (shared across all queues)
private LongAdder numGeneralCallsDropped;
Expand All @@ -71,27 +71,30 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicBoolean isOverloaded = new AtomicBoolean(false);

/**
 * Construct an adaptive-LIFO CoDel call queue.
 * @param capacity hard capacity of the backing deque (upper bound on queued calls)
 * @param targetDelay CoDel target delay
 * @param interval CoDel interval
 * @param lifoThreshold fill fraction of {@code currentQueueLimit} above which the queue
 *          switches from FIFO to LIFO draining
 * @param numGeneralCallsDropped shared counter of dropped calls (shared across all queues)
 * @param numLifoModeSwitches shared counter of switches into LIFO mode
 * @param currentQueueLimit soft queue limit used as the denominator for the LIFO
 *          threshold check; may be updated later via {@code updateTunables}
 */
public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
  double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches,
  int currentQueueLimit) {
  // Deque is sized with the hard capacity; the soft limit only affects LIFO switching.
  this.queue = new LinkedBlockingDeque<>(capacity);
  this.codelTargetDelay = targetDelay;
  this.codelInterval = interval;
  this.lifoThreshold = lifoThreshold;
  this.numGeneralCallsDropped = numGeneralCallsDropped;
  this.numLifoModeSwitches = numLifoModeSwitches;
  this.currentQueueLimit = currentQueueLimit;
}

/**
* Update tunables.
* @param newCodelTargetDelay new CoDel target delay
* @param newCodelInterval new CoDel interval
* @param newLifoThreshold new Adaptive Lifo threshold
* @param currentQueueLimit new limit of queue
*/
public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
double newLifoThreshold) {
public void updateTunables(int newCodelTargetDelay, int newCodelInterval, double newLifoThreshold,
int currentQueueLimit) {
this.codelTargetDelay = newCodelTargetDelay;
this.codelInterval = newCodelInterval;
this.lifoThreshold = newLifoThreshold;
this.currentQueueLimit = currentQueueLimit;
}

/**
Expand All @@ -104,7 +107,7 @@ public void updateTunables(int newCodelTargetDelay, int newCodelInterval,
public CallRunner take() throws InterruptedException {
CallRunner cr;
while (true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
if (((double) queue.size() / this.currentQueueLimit) > lifoThreshold) {
numLifoModeSwitches.increment();
cr = queue.takeLast();
} else {
Expand All @@ -124,7 +127,7 @@ public CallRunner poll() {
CallRunner cr;
boolean switched = false;
while (true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
if (((double) queue.size() / this.currentQueueLimit) > lifoThreshold) {
// Only count once per switch.
if (!switched) {
switched = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public abstract class RpcExecutor {
private final Class<? extends BlockingQueue> queueClass;
private final Object[] queueInitArgs;

// this is soft limit of the queue, while initializing we will use hard limit as the size of queue
protected volatile int currentQueueLimit;

private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
Expand Down Expand Up @@ -161,11 +162,13 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
int handlerCountPerQueue = this.handlerCount / this.numCallQueues;
maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER;
}
currentQueueLimit = maxQueueLength;
int queueHardLimit = Math.max(maxQueueLength, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);

if (isDeadlineQueueType(callQueueType)) {
this.name += ".Deadline";
this.queueInitArgs =
new Object[] { maxQueueLength, new CallPriorityComparator(conf, priority) };
new Object[] { queueHardLimit, new CallPriorityComparator(conf, priority) };
this.queueClass = BoundedPriorityBlockingQueue.class;
} else if (isCodelQueueType(callQueueType)) {
this.name += ".Codel";
Expand All @@ -174,8 +177,8 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
double codelLifoThreshold =
conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
this.queueInitArgs = new Object[] { queueHardLimit, codelTargetDelay, codelInterval,
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches, currentQueueLimit };
this.queueClass = AdaptiveLifoCoDelCallQueue.class;
} else if (isPluggableQueueType(callQueueType)) {
Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass =
Expand All @@ -185,12 +188,12 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
throw new PluggableRpcQueueNotFound(
"Pluggable call queue failed to load and selected call" + " queue type required");
} else {
this.queueInitArgs = new Object[] { maxQueueLength, priority, conf };
this.queueInitArgs = new Object[] { queueHardLimit, priority, conf };
this.queueClass = pluggableQueueClass.get();
}
} else {
this.name += ".Fifo";
this.queueInitArgs = new Object[] { maxQueueLength };
this.queueInitArgs = new Object[] { queueHardLimit };
this.queueClass = LinkedBlockingQueue.class;
}

Expand Down Expand Up @@ -231,20 +234,7 @@ public Map<String, Long> getCallQueueSizeSummary() {
.collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond)));
}

// This method can only be called ONCE per executor instance.
// Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity)
// After calling: queueInitArgs[0] is set to hard limit and currentQueueLimit stores the original
// soft limit.
// Multiple calls would incorrectly use the hard limit as the soft limit.
// As all the queues has same initArgs and queueClass, there should be no need to call this again.
protected void initializeQueues(final int numQueues) {
if (!queues.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this guard?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no we don't need it now.

throw new RuntimeException("Queues are already initialized");
}
if (queueInitArgs.length > 0) {
currentQueueLimit = (int) queueInitArgs[0];
queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
}
for (int i = 0; i < numQueues; ++i) {
queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs));
}
Expand Down Expand Up @@ -479,8 +469,10 @@ public void onConfigurationChange(Configuration conf) {

for (BlockingQueue<CallRunner> queue : queues) {
if (queue instanceof AdaptiveLifoCoDelCallQueue) {
// current queue Limit for executor is already updated as part of resizeQueues, we need to
// let codel queue also make aware of it
((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, codelInterval,
codelLifoThreshold);
codelLifoThreshold, currentQueueLimit);
} else if (queue instanceof ConfigurationObserver) {
((ConfigurationObserver) queue).onConfigurationChange(conf);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;

import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand All @@ -34,13 +35,15 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseClassTestRule;
Expand Down Expand Up @@ -814,4 +817,166 @@ public void drop() {
}
};
}

/**
* Test LIFO switching behavior through actual RPC calls. This test verifies that when the queue
* fills beyond the LIFO threshold, newer calls are processed before older calls (LIFO mode).
*/
@Test
public void testCoDelLifoWithRpcCalls() throws Exception {
Configuration testConf = HBaseConfiguration.create();
testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
int maxCallQueueLength = 50;
double codelLifoThreshold = 0.8;
testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, maxCallQueueLength);
testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, codelLifoThreshold);
testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_TARGET_DELAY, 100);
testConf.setInt(RpcExecutor.CALL_QUEUE_CODEL_INTERVAL, 100);
testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1); // Single handler to control
// processing

PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
SimpleRpcScheduler scheduler =
new SimpleRpcScheduler(testConf, 1, 0, 0, priority, HConstants.QOS_THRESHOLD);

try {
scheduler.init(CONTEXT);
scheduler.start();

// Track completion order
final List<Integer> completedCalls = Collections.synchronizedList(new ArrayList<>());

// Dispatch many slow calls rapidly to fill the queue beyond 80% threshold
// With queue limit of 50, we need > 40 calls to cross 80%
int numCalls = 48;
for (int i = 0; i < numCalls; i++) {
final int callId = i;
CallRunner call = createMockTask(HConstants.NORMAL_QOS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and in two more cases createMockTask will mock a ServerCall without setting getReceiveTime(). It will default to 0. Doesn't this mess up the call delay calculation that coDel will do? It will be something like callDelay = System.currentTimeMillis() - 0 . I guess your tests are passing but CoDel will immediately detect overload. The LIFO switch counter will still increment but other aspects are probably wrong.
You should look at this.

call.setStatus(new MonitoredRPCHandlerImpl("test"));
doAnswer(invocation -> {
completedCalls.add(callId);
Thread.sleep(100); // Slow processing to allow queue to build up
return null;
}).when(call).run();
scheduler.dispatch(call);
// No delay between dispatches - rapidly fill the queue
}

// Wait for some calls to complete
await().atMost(1, TimeUnit.SECONDS).until(() -> completedCalls.size() >= 2);

// Check that we had LIFO switches
long lifoSwitches = scheduler.getNumLifoModeSwitches();
assertTrue("Should have switched to LIFO mode at least once, but got: " + lifoSwitches,
lifoSwitches > 0);

// Verify LIFO behavior: Among first completed calls, we should see higher call IDs
// (indicating later dispatched calls completed first)
int maxCallIdCompleted = -1;
for (int i = 0; i < 5 && i < completedCalls.size(); i++) {
maxCallIdCompleted = Math.max(maxCallIdCompleted, completedCalls.get(i));
}
// At least one of the early completed calls should have a high ID (>20)
// indicating LIFO processing
assertTrue(
"Expected LIFO behavior: early completed calls should include call arrived after threshold "
+ "maxCallIdCompleted: " + maxCallIdCompleted,
maxCallIdCompleted > maxCallQueueLength * codelLifoThreshold);

} finally {
scheduler.stop();
}
}

/**
 * Verifies that a CoDel queue which entered LIFO mode under load falls back to FIFO ordering
 * once the backlog has drained below the LIFO threshold.
 */
@Test
public void testCoDelQueueDrainAndFifoReturn() throws Exception {
  Configuration testConf = HBaseConfiguration.create();
  testConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
    RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
  testConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 50);
  testConf.setDouble(RpcExecutor.CALL_QUEUE_CODEL_LIFO_THRESHOLD, 0.8);
  testConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);

  PriorityFunction priority = mock(PriorityFunction.class);
  when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.NORMAL_QOS);
  SimpleRpcScheduler scheduler =
    new SimpleRpcScheduler(testConf, 2, 0, 0, priority, HConstants.QOS_THRESHOLD);

  try {
    scheduler.init(CONTEXT);
    scheduler.start();

    // Completion order of the mocked calls, shared with handler threads.
    final List<Integer> completionOrder = Collections.synchronizedList(new ArrayList<>());

    // Phase 1: flood the queue with 48 slow calls; 80% of a 50-entry limit is 40,
    // so this should push the queue into LIFO mode.
    for (int id = 0; id < 48; id++) {
      final int trackedId = id;
      CallRunner slowCall = createMockTask(HConstants.NORMAL_QOS);
      slowCall.setStatus(new MonitoredRPCHandlerImpl("test"));
      doAnswer(invocation -> {
        completionOrder.add(trackedId);
        Thread.sleep(80);
        return null;
      }).when(slowCall).run();
      scheduler.dispatch(slowCall);
    }

    // Let a couple of calls finish, then confirm LIFO mode was entered.
    await().atMost(1, TimeUnit.SECONDS).until(() -> completionOrder.size() >= 2);
    long lifoSwitchesPhase1 = scheduler.getNumLifoModeSwitches();
    assertTrue("Should have entered LIFO mode", lifoSwitchesPhase1 > 0);

    // Phase 2: wait until the handlers have emptied the backlog.
    await().atMost(2, TimeUnit.SECONDS).until(() -> scheduler.getGeneralQueueLength() <= 0);
    int remaining = scheduler.getGeneralQueueLength();
    assertTrue("Queue should have drained significantly, but got: " + remaining,
      remaining < 25);

    // Phase 3: with the queue nearly empty, new calls must be served FIFO again.
    completionOrder.clear();
    for (int id = 100; id < 105; id++) {
      final int trackedId = id;
      CallRunner fifoCall = createMockTask(HConstants.NORMAL_QOS);
      fifoCall.setStatus(new MonitoredRPCHandlerImpl("test"));
      doAnswer(invocation -> {
        completionOrder.add(trackedId);
        Thread.sleep(80);
        return null;
      }).when(fifoCall).run();
      scheduler.dispatch(fifoCall);
    }

    // Give the second batch time to make progress.
    await().atMost(2, TimeUnit.SECONDS).until(() -> completionOrder.size() >= 2);

    assertTrue(
      "Should have completed test calls, but count of completed calls: " + completionOrder.size(),
      completionOrder.size() >= 2);

    // Count order inversions. Two concurrent handlers may interleave adjacent calls,
    // so we only require an approximately increasing completion sequence.
    int outOfOrder = 0;
    for (int idx = 1; idx < completionOrder.size(); idx++) {
      if (completionOrder.get(idx) < completionOrder.get(idx - 1)) {
        outOfOrder++;
      }
    }
    // Allow at most 1 violation due to concurrent execution by 2 handlers
    assertTrue("Calls should complete in approximate FIFO order, violations: " + outOfOrder
      + ", order: " + completionOrder, outOfOrder <= 1);

  } finally {
    scheduler.stop();
  }
}

}