From afc79a6865804903e6c7c055d6b26888f826ad3f Mon Sep 17 00:00:00 2001 From: Ryu Kobayashi Date: Wed, 10 Dec 2025 12:53:37 +0900 Subject: [PATCH 1/4] YARN-11907: Fix queue starvation in FairSharePolicy when demand=0 but pending apps exist --- .../fair/policies/FairSharePolicy.java | 43 ++++- .../fair/TestFairSharePolicyPendingApps.java | 176 ++++++++++++++++++ 2 files changed, 211 insertions(+), 8 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index ea50a4ca116ed..5fea2b321cce4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -116,18 +116,45 @@ public int compare(Schedulable s1, Schedulable s2) { return res; } + /** + * Compare queues based on demand, distinguishing between truly empty queues + * and queues with pending applications waiting for AM allocation. + * + * A queue is considered "truly empty" only when it has no runnable apps. + * Queues with pending apps (getNumRunnableApps() > 0) should not be + * deprioritized, as they need resources for AM allocation. 
+ * + * This prevents queue starvation where queues with pending apps but + * demand=0 (no running containers yet) get perpetually deprioritized. + */ private int compareDemand(Schedulable s1, Schedulable s2) { - int res = 0; - long demand1 = s1.getDemand().getMemorySize(); - long demand2 = s2.getDemand().getMemorySize(); + // Check if s1 is truly empty (no runnable apps at all) + boolean s1EmptyQueue = false; + if (s1 instanceof FSQueue) { + s1EmptyQueue = (((FSQueue)s1).getNumRunnableApps() == 0); + } else { + s1EmptyQueue = (s1.getDemand().getMemorySize() == 0); + } - if ((demand1 == 0) && (demand2 > 0)) { - res = 1; - } else if ((demand2 == 0) && (demand1 > 0)) { - res = -1; + // Check if s2 is truly empty (no runnable apps at all) + boolean s2EmptyQueue = false; + if (s2 instanceof FSQueue) { + s2EmptyQueue = (((FSQueue)s2).getNumRunnableApps() == 0); + } else { + s2EmptyQueue = (s2.getDemand().getMemorySize() == 0); } - return res; + // Both empty or both non-empty: continue to next comparison stage + if (s1EmptyQueue == s2EmptyQueue) { + return 0; + } + + // One is empty, the other is not: prioritize the non-empty queue + if (s1EmptyQueue) { + return 1; // s1 is empty, lower priority + } else { + return -1; // s2 is empty, lower priority + } } private int compareMinShareUsage(Schedulable s1, Schedulable s2, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java new file mode 100644 index 0000000000000..bee860f789cbf --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Test; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +/** + * Test to verify that FairSharePolicy correctly handles queues with + * pending applications waiting for AM allocation. + * + * This test ensures that queues with demand=0 but getNumRunnableApps() > 0 + * are not treated as empty and can progress to minShare comparison. + */ +public class TestFairSharePolicyPendingApps { + + /** + * Test that a queue with pending apps (demand=0, getNumRunnableApps() > 0) + * is not treated as empty and can progress to Stage 2 (minShare comparison). 
+ */ + @Test + public void testQueueWithPendingAppsNotTreatedAsEmpty() { + // Queue with pending apps but no running containers + FSLeafQueue queueWithPendingApps = mock(FSLeafQueue.class); + when(queueWithPendingApps.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queueWithPendingApps.getNumRunnableApps()).thenReturn(6); // Has pending apps + when(queueWithPendingApps.getWeight()).thenReturn(1.0f); + when(queueWithPendingApps.getName()).thenReturn("queueWithPending"); + when(queueWithPendingApps.getStartTime()).thenReturn(1000L); + + // Active queue with demand > 0 + FSLeafQueue activeQueue = mock(FSLeafQueue.class); + when(activeQueue.getDemand()).thenReturn(Resources.createResource(10240, 10)); + when(activeQueue.getResourceUsage()).thenReturn(Resources.createResource(10240, 10)); + when(activeQueue.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(activeQueue.getNumRunnableApps()).thenReturn(5); + when(activeQueue.getWeight()).thenReturn(1.0f); + when(activeQueue.getName()).thenReturn("activeQueue"); + when(activeQueue.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queueWithPendingApps, activeQueue); + + // Queue with pending apps should NOT be assigned lowest priority + // compareDemand() should return 0, allowing progression to Stage 2 + // Result depends on Stage 2 (minShare comparison) or later stages + assertNotEquals("Queue with pending apps should not get lowest priority from compareDemand()", + 1, result); + } + + /** + * Test that a truly empty queue (getNumRunnableApps() == 0) gets lower + * priority compared to a queue with pending apps. 
+ */ + @Test + public void testTrulyEmptyQueueGetsLowerPriority() { + // Queue with pending apps + FSLeafQueue queueWithPendingApps = mock(FSLeafQueue.class); + when(queueWithPendingApps.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queueWithPendingApps.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queueWithPendingApps.getNumRunnableApps()).thenReturn(6); // Has pending apps + when(queueWithPendingApps.getWeight()).thenReturn(1.0f); + when(queueWithPendingApps.getName()).thenReturn("queueWithPending"); + when(queueWithPendingApps.getStartTime()).thenReturn(1000L); + + // Truly empty queue (no apps at all) + FSLeafQueue emptyQueue = mock(FSLeafQueue.class); + when(emptyQueue.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(emptyQueue.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(emptyQueue.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(emptyQueue.getNumRunnableApps()).thenReturn(0); // Truly empty + when(emptyQueue.getWeight()).thenReturn(1.0f); + when(emptyQueue.getName()).thenReturn("emptyQueue"); + when(emptyQueue.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queueWithPendingApps, emptyQueue); + + // Queue with pending apps should get higher priority than truly empty queue + assertTrue("Queue with pending apps should have higher priority than truly empty queue", + result < 0); + } + + /** + * Test that minShare comparison works correctly for queues with pending apps. 
+ */ + @Test + public void testMinShareComparisonForQueueWithPendingApps() { + // Queue with pending apps, below minShare + FSLeafQueue queueBelowMinShare = mock(FSLeafQueue.class); + when(queueBelowMinShare.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queueBelowMinShare.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queueBelowMinShare.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queueBelowMinShare.getNumRunnableApps()).thenReturn(6); + when(queueBelowMinShare.getWeight()).thenReturn(1.0f); + when(queueBelowMinShare.getName()).thenReturn("queueBelowMinShare"); + when(queueBelowMinShare.getStartTime()).thenReturn(1000L); + + // Queue above minShare + FSLeafQueue queueAboveMinShare = mock(FSLeafQueue.class); + when(queueAboveMinShare.getDemand()).thenReturn(Resources.createResource(200 * 1024, 10)); + when(queueAboveMinShare.getResourceUsage()).thenReturn(Resources.createResource(200 * 1024, 10)); + when(queueAboveMinShare.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(queueAboveMinShare.getNumRunnableApps()).thenReturn(5); + when(queueAboveMinShare.getWeight()).thenReturn(1.0f); + when(queueAboveMinShare.getName()).thenReturn("queueAboveMinShare"); + when(queueAboveMinShare.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queueBelowMinShare, queueAboveMinShare); + + // Queue below minShare should get higher priority (negative result) + assertTrue("Queue below minShare with pending apps should get higher priority", + result < 0); + } + + /** + * Test that two queues with pending apps are compared by later stages + * (not blocked at compareDemand). 
+ */ + @Test + public void testTwoQueuesWithPendingAppsComparedByLaterStages() { + // First queue with pending apps + FSLeafQueue queue1 = mock(FSLeafQueue.class); + when(queue1.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queue1.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queue1.getMinShare()).thenReturn(Resources.createResource(256 * 1024, 1)); + when(queue1.getNumRunnableApps()).thenReturn(6); + when(queue1.getWeight()).thenReturn(1.0f); + when(queue1.getName()).thenReturn("queue1"); + when(queue1.getStartTime()).thenReturn(1000L); + + // Second queue with pending apps + FSLeafQueue queue2 = mock(FSLeafQueue.class); + when(queue2.getDemand()).thenReturn(Resources.createResource(0, 0)); + when(queue2.getResourceUsage()).thenReturn(Resources.createResource(0, 0)); + when(queue2.getMinShare()).thenReturn(Resources.createResource(128 * 1024, 1)); + when(queue2.getNumRunnableApps()).thenReturn(3); + when(queue2.getWeight()).thenReturn(1.0f); + when(queue2.getName()).thenReturn("queue2"); + when(queue2.getStartTime()).thenReturn(2000L); + + FairSharePolicy policy = new FairSharePolicy(); + int result = policy.getComparator().compare(queue1, queue2); + + // Both queues have pending apps, so compareDemand() should return 0 + // The comparison should proceed to Stage 2 (minShare) or later + // queue1 has higher minShare (256GB > 128GB), both at 0% usage + // So queue1 should get higher priority (negative result) + assertTrue("Queues with pending apps should be compared by minShare, not blocked at compareDemand", + result < 0); + } +} From cb5499cc0e909c7caef03d2aedac8ee12818f7e8 Mon Sep 17 00:00:00 2001 From: Ryu Kobayashi Date: Wed, 10 Dec 2025 18:06:31 +0900 Subject: [PATCH 2/4] YARN-11907: Switch TestFairSharePolicyPendingApps from JUnit 4 to JUnit 5 imports --- .../scheduler/fair/TestFairSharePolicyPendingApps.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java index bee860f789cbf..cc84ee9c75651 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java @@ -20,9 +20,9 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Test; +import org.junit.jupiter.api.Test; -import static org.junit.Assert.*; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.*; /** From f800926f2333292a71bd723a3d1f08731ca3784e Mon Sep 17 00:00:00 2001 From: Ryu Kobayashi Date: Wed, 10 Dec 2025 18:08:14 +0900 Subject: [PATCH 3/4] YARN-11907: Replace wildcard static imports with explicit imports in TestFairSharePolicyPendingApps --- .../scheduler/fair/TestFairSharePolicyPendingApps.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java index cc84ee9c75651..01d55f3bc58ce 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java @@ -22,8 +22,10 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; -import static org.mockito.Mockito.*; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Test to verify that FairSharePolicy correctly handles queues with From 68046eeb3a4fd9e672231d0d872d2fe89f01f9ff Mon Sep 17 00:00:00 2001 From: Ryu Kobayashi Date: Thu, 11 Dec 2025 15:30:20 +0900 Subject: [PATCH 4/4] YARN-11907: Pass assertion messages as the last argument (JUnit 5 order) in TestFairSharePolicyPendingApps --- .../fair/TestFairSharePolicyPendingApps.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java index 01d55f3bc58ce..d2051e8e4da31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSharePolicyPendingApps.java @@ -68,8 +68,8 @@ public void testQueueWithPendingAppsNotTreatedAsEmpty() { // Queue with pending apps should NOT be assigned lowest priority // compareDemand() should return 0, allowing progression to Stage 2 // Result depends on Stage 2 (minShare comparison) or later stages - assertNotEquals("Queue with pending apps should not get lowest priority from compareDemand()", - 1, result); + assertNotEquals(1, result, + "Queue with pending apps should not get lowest priority from compareDemand()"); } /** @@ -102,8 +102,8 @@ public void testTrulyEmptyQueueGetsLowerPriority() { int result = policy.getComparator().compare(queueWithPendingApps, emptyQueue); // Queue with pending apps should get higher priority than truly empty queue - assertTrue("Queue with pending apps should have higher priority than truly empty queue", - result < 0); + assertTrue(result < 0, + "Queue with pending apps should have higher priority than truly empty queue"); } /** @@ -135,8 +135,8 @@ public void testMinShareComparisonForQueueWithPendingApps() { int result = policy.getComparator().compare(queueBelowMinShare, queueAboveMinShare); // Queue below minShare should get higher priority (negative result) - assertTrue("Queue below minShare with pending apps should get higher priority", - result < 0); + assertTrue(result < 0, + "Queue below minShare with pending apps should get higher priority"); } /** @@ -172,7 +172,7 @@ public void testTwoQueuesWithPendingAppsComparedByLaterStages() { // The comparison should proceed to Stage 2 (minShare) or later // queue1 has higher minShare (256GB > 128GB), both at 0% usage // So queue1 should get higher priority (negative result) - assertTrue("Queues with pending apps should be compared by minShare, not blocked at compareDemand", - result < 0); + assertTrue(result < 
0, + "Queues with pending apps should be compared by minShare, not blocked at compareDemand"); } }