diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index ea50a4ca116ed..5fea2b321cce4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -116,18 +116,45 @@ public int compare(Schedulable s1, Schedulable s2) { return res; } + /** + * Compare queues based on demand, distinguishing between truly empty queues + * and queues with pending applications waiting for AM allocation. + * + * A queue is considered "truly empty" only when it has no runnable apps. + * Queues with pending apps (getNumRunnableApps() > 0) should not be + * deprioritized, as they need resources for AM allocation. + * + * This prevents queue starvation where queues with pending apps but + * demand=0 (no running containers yet) get perpetually deprioritized. + */ private int compareDemand(Schedulable s1, Schedulable s2) { - int res = 0; - long demand1 = s1.getDemand().getMemorySize(); - long demand2 = s2.getDemand().getMemorySize(); + // Check if s1 is truly empty (no runnable apps at all) + boolean s1EmptyQueue = false; + if (s1 instanceof FSQueue) { + s1EmptyQueue = (((FSQueue)s1).getNumRunnableApps() == 0); + } else { + s1EmptyQueue = (s1.getDemand().getMemorySize() == 0); + } - if ((demand1 == 0) && (demand2 > 0)) { - res = 1; - } else if ((demand2 == 0) && (demand1 > 0)) { - res = -1; + // Check if s2 is truly empty (no runnable apps at all) + boolean s2EmptyQueue = false; + if (s2 instanceof FSQueue) { + s2EmptyQueue = (((FSQueue)s2).getNumRunnableApps() == 0); + } else { + s2EmptyQueue = (s2.getDemand().getMemorySize() == 0); } - return res; + // Both empty or both non-empty: continue to next comparison stage + if (s1EmptyQueue == s2EmptyQueue) { + return 0; + } + + // One is empty, the other is not: prioritize the non-empty queue + if (s1EmptyQueue) { + return 1; // s1 is empty, lower priority + } else { + return -1; // s2 is empty, lower priority + } } private int compareMinShareUsage(Schedulable s1, Schedulable s2, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java new file mode 100644 index 0000000000000..d2051e8e4da31 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test to verify that FairSharePolicy correctly handles queues with + * pending applications waiting for AM allocation. + * + * This test ensures that queues with demand=0 but getNumRunnableApps() > 0 + * are not treated as empty and can progress to minShare comparison. + */ +public class TestFairSharePolicyPendingApps { + + /** + * Test that a queue with pending apps (demand=0, getNumRunnableApps() > 0) + * is not treated as empty and can progress to Stage 2 (minShare comparison). + */ + @Test + public void testQueueWithPendingAppsNotTreatedAsEmpty() { + // Queue with pending apps but no running containers + FSLeafQueue queueWithPendingApps = mock(FSLeafQueue.class); + when(queueWithPendingApps.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queueWithPendingApps.getNumRunnableApps()).thenReturn(6); // Has pending apps + when(queueWithPendingApps.getWeight()).thenReturn(1.0f); + when(queueWithPendingApps.getName()).thenReturn("queueWithPending"); + when(queueWithPendingApps.getStartTime()).thenReturn(1000L); + + // Active queue with demand > 0 + FSLeafQueue activeQueue = mock(FSLeafQueue.class); + when(activeQueue.getDemand()).thenReturn(Resources.createResource(10240, 10)); + when(activeQueue.getResourceUsage()).thenReturn(Resources.createResource(10240, 10)); + when(activeQueue.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(activeQueue.getNumRunnableApps()).thenReturn(5); + when(activeQueue.getWeight()).thenReturn(1.0f); + when(activeQueue.getName()).thenReturn("activeQueue"); + when(activeQueue.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queueWithPendingApps, activeQueue); + + // Queue with pending apps should NOT be assigned lowest priority + // compareDemand() should return 0, allowing progression to Stage 2 + // Result depends on Stage 2 (minShare comparison) or later stages + assertNotEquals(1, result, + "Queue with pending apps should not get lowest priority from compareDemand()"); + } + + /** + * Test that a truly empty queue (getNumRunnableApps() == 0) gets lower + * priority compared to a queue with pending apps. + */ + @Test + public void testTrulyEmptyQueueGetsLowerPriority() { + // Queue with pending apps + FSLeafQueue queueWithPendingApps = mock(FSLeafQueue.class); + when(queueWithPendingApps.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queueWithPendingApps.getNumRunnableApps()).thenReturn(6); // Has pending apps + when(queueWithPendingApps.getWeight()).thenReturn(1.0f); + when(queueWithPendingApps.getName()).thenReturn("queueWithPending"); + when(queueWithPendingApps.getStartTime()).thenReturn(1000L); + + // Truly empty queue (no apps at all) + FSLeafQueue emptyQueue = mock(FSLeafQueue.class); + when(emptyQueue.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(emptyQueue.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(emptyQueue.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(emptyQueue.getNumRunnableApps()).thenReturn(0); // Truly empty + when(emptyQueue.getWeight()).thenReturn(1.0f); + when(emptyQueue.getName()).thenReturn("emptyQueue"); + when(emptyQueue.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queueWithPendingApps, emptyQueue); + + // Queue with pending apps should get higher priority than truly empty queue + assertTrue(result < 0, + "Queue with pending apps should have higher priority than truly empty queue"); + } + + /** + * Test that minShare comparison works correctly for queues with pending apps. + */ + @Test + public void testMinShareComparisonForQueueWithPendingApps() { + // Queue with pending apps, below minShare + FSLeafQueue queueBelowMinShare = mock(FSLeafQueue.class); + when(queueBelowMinShare.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queueBelowMinShare.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queueBelowMinShare.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queueBelowMinShare.getNumRunnableApps()).thenReturn(6); + when(queueBelowMinShare.getWeight()).thenReturn(1.0f); + when(queueBelowMinShare.getName()).thenReturn("queueBelowMinShare"); + when(queueBelowMinShare.getStartTime()).thenReturn(1000L); + + // Queue above minShare + FSLeafQueue queueAboveMinShare = mock(FSLeafQueue.class); + when(queueAboveMinShare.getDemand()).thenReturn(Resources.createResource(200 * 1024, 10)); + when(queueAboveMinShare.getResourceUsage()).thenReturn(Resources.createResource(200 * 1024, 10)); + when(queueAboveMinShare.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(queueAboveMinShare.getNumRunnableApps()).thenReturn(5); + when(queueAboveMinShare.getWeight()).thenReturn(1.0f); + when(queueAboveMinShare.getName()).thenReturn("queueAboveMinShare"); + when(queueAboveMinShare.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queueBelowMinShare, queueAboveMinShare); + + // Queue below minShare should get higher priority (negative result) + assertTrue(result < 0, + "Queue below minShare with pending apps should get higher priority"); + } + + /** + * Test that two queues with pending apps are compared by later stages + * (not blocked at compareDemand). + */ + @Test + public void testTwoQueuesWithPendingAppsComparedByLaterStages() { + // First queue with pending apps + FSLeafQueue queue1 = mock(FSLeafQueue.class); + when(queue1.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queue1.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queue1.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queue1.getNumRunnableApps()).thenReturn(6); + when(queue1.getWeight()).thenReturn(1.0f); + when(queue1.getName()).thenReturn("queue1"); + when(queue1.getStartTime()).thenReturn(1000L); + + // Second queue with pending apps + FSLeafQueue queue2 = mock(FSLeafQueue.class); + when(queue2.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queue2.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queue2.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(queue2.getNumRunnableApps()).thenReturn(3); + when(queue2.getWeight()).thenReturn(1.0f); + when(queue2.getName()).thenReturn("queue2"); + when(queue2.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queue1, queue2); + + // Both queues have pending apps, so compareDemand() should return 0 + // The comparison should proceed to Stage 2 (minShare) or later + // queue1 has higher minShare (256GB > 128GB), both at 0% usage + // So queue1 should get higher priority (negative result) + assertTrue(result < 0, + "Queues with pending apps should be compared by minShare, not blocked at compareDemand"); + } +}