From 977f27b49838f623a8e4276fc7f86457352d1c63 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 4 Jun 2026 17:21:51 +0530 Subject: [PATCH 01/16] HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle multiple HiveServer2 instances submitting DAGs concurrently to available Tez External Sessions --- .../ql/exec/tez/TezExternalSessionState.java | 6 + .../ql/exec/tez/TezSessionPoolManager.java | 14 ++ ...okeeperExternalSessionsRegistryClient.java | 126 +++++++++++++++--- ...okeeperExternalSessionsRegistryClient.java | 119 +++++++++++++++++ 4 files changed, 247 insertions(+), 18 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java index b3103d3f5918..72ff43f0922b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -133,6 +134,11 @@ public void close(boolean keepDagFilesDir) throws Exception { // We never close external sessions that don't have errors. try { if (externalAppId != null) { + LOG.info("Returning external session with appID: {}", externalAppId); + SessionState sessionState = SessionState.get(); + if (sessionState != null) { + sessionState.setTezSession(null); + } registry.returnSession(externalAppId); } } catch (Exception e) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 605a92ebc8f5..b3b1c835e5b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -353,6 +353,20 @@ void returnSession(TezSession tezSessionState) { + " belongs to the pool. Put it back in"); defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState); } + + if (useExternalSessions) { + if (tezSessionState.getTezClient() != null + && tezSessionState.getTezClient().getAppMasterApplicationId() != null) { + try { + tezSessionState.close(false); + } catch (Exception ex) { + LOG.warn("Failed to return external Tez session {}", tezSessionState.getSessionId(), ex); + } + } else { + LOG.warn("Not returning session '{}' as tez client or app id is null", tezSessionState.getSessionId()); + } + } + // non default session nothing changes. The user can continue to use the existing // session in the SessionState } finally { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 550c77e573ab..c38b3bed5739 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import org.apache.curator.framework.CuratorFramework; @@ -31,9 +32,12 @@ import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +52,11 @@ public class ZookeeperExternalSessionsRegistryClient implements ExternalSessions private final Set taken = new HashSet<>(); private final Object lock = new Object(); private final int maxAttempts; - + private CuratorFramework client; private CuratorCache cache; + private CuratorCache claimsCache; + private InterProcessMutex globalQueue; + private String claimsPath; private volatile boolean isInitialized; @@ -66,15 +73,27 @@ private void init() { String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); String effectivePath = normalizeZkPath(zkNamespace); - CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); + String queuePath = effectivePath + "-queue"; + this.claimsPath = effectivePath + "-claims"; + this.client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); + synchronized (lock) { client.start(); + this.globalQueue = new InterProcessMutex(client, queuePath); this.cache = CuratorCache.build(client, effectivePath); CuratorCacheListener listener = CuratorCacheListener.builder() .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) .build(); cache.listenable().addListener(listener); cache.start(); + + this.claimsCache = CuratorCache.build(client, claimsPath); + CuratorCacheListener claimsListener = CuratorCacheListener.builder() + .forPathChildrenCache(claimsPath, client, new ClaimsPathListener()) + .build(); + claimsCache.listenable().addListener(claimsListener); + claimsCache.start(); + cache.stream() .filter(childData -> childData.getPath() != null && childData.getPath().startsWith(effectivePath + "/")) @@ -91,22 +110,47 @@ static String normalizeZkPath(String zkNamespace) { @Override public String getSession() throws Exception { - synchronized (lock) { - if (!isInitialized) { - init(); - } - long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); - while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) { - lock.wait(1000L); + if (!isInitialized) { + synchronized (lock) { + if (!isInitialized) { + init(); + } } - Iterator iter = available.iterator(); - if (!iter.hasNext()) { - throw new IOException("Cannot get a session after " + maxAttempts + " attempts"); + } + + long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); + long queueWaitTimeMs = Math.max(0, (endTimeNs - System.nanoTime()) / 1000000L); + if (!globalQueue.acquire(queueWaitTimeMs, TimeUnit.MILLISECONDS)) { + throw new IOException("Cannot get a session (timed out in queue) after " + maxAttempts + " seconds"); + } + + try { + synchronized (lock) { + while (System.nanoTime() < endTimeNs) { + Iterator iter = available.iterator(); + + while (iter.hasNext()) { + String appId = iter.next(); + try { + String claimNodePath = claimsPath + "/" + appId; + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(claimNodePath); + iter.remove(); + taken.add(appId); + return appId; + } catch (KeeperException.NodeExistsException e) { + iter.remove(); + } + } + long remainingTimeNs = endTimeNs - System.nanoTime(); + if (remainingTimeNs > 0) { + long waitTimeMs = Math.min(1000L, remainingTimeNs / 1_000_000L); + lock.wait(waitTimeMs); + } + } + throw new IOException("Cannot get a session after waiting for " + maxAttempts + " seconds (timeout exhausted)"); } - String appId = iter.next(); - iter.remove(); - taken.add(appId); - return appId; + } finally { + globalQueue.release(); } } @@ -117,8 +161,17 @@ public void returnSession(String appId) { throw new IllegalStateException("Not initialized"); } if (!taken.remove(appId)) { - return; // Session has been removed from ZK. + return; // Session has already been removed from ZK. } + + try { + client.delete().guaranteed().forPath(claimsPath + "/" + appId); + } catch (KeeperException.NoNodeException e) { + // If the claim Node has already been deleted, we can ignore it. + } catch (Exception e) { + LOG.warn("Failed to delete claim node for session {}", appId, e); + } + available.add(appId); lock.notifyAll(); } @@ -126,9 +179,15 @@ public void returnSession(String appId) { @Override public void close() { + if (claimsCache != null) { + claimsCache.close(); + } if (cache != null) { cache.close(); } + if (client != null) { + client.close(); + } } private final class ExternalSessionsPathListener implements PathChildrenCacheListener { @@ -148,9 +207,10 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve switch (event.getType()) { case CHILD_UPDATED, CHILD_ADDED: if (available.contains(applicationId) || taken.contains(applicationId)) { - return; // We do not expect updates to existing sessions; ignore them for now. + return; } available.add(applicationId); + lock.notifyAll(); break; case CHILD_REMOVED: if (taken.remove(applicationId)) { @@ -165,4 +225,34 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve } } } + + private final class ClaimsPathListener implements PathChildrenCacheListener { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { + ChildData childData = event.getData(); + if (childData == null) { + return; + } + + String applicationId = getApplicationId(childData); + synchronized (lock) { + switch (event.getType()) { + case CHILD_REMOVED: + if (!taken.contains(applicationId)) { + // if the claim node was released by this particular HS2 itself, + // it will be added back to the available list & locks are notified as part of returnSession() + available.add(applicationId); + lock.notifyAll(); + } + break; + case CHILD_ADDED: + // A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2 + available.remove(applicationId); + break; + default: + break; + } + } + } + } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java index 8274e87187b0..1c1d7d9fe1b0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -30,6 +31,11 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.Test; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * Tests for {@link ZookeeperExternalSessionsRegistryClient}. */ @@ -128,5 +134,118 @@ public void testReuseSameSession() throws Exception { } } } + + /** + * Tests that multiple registry clients (simulating multiple HS2 instances) + * respect the global distributed lock (claims) and do not claim the same session simultaneously. + */ + @Test + public void testSessionClaimsFromDifferentRegistryClients() throws Exception { + CuratorFramework client = null; + ZookeeperExternalSessionsRegistryClient registry1 = null; + ZookeeperExternalSessionsRegistryClient registry2 = null; + + try (TestingServer server = new TestingServer()) { + String connectString = server.getConnectString(); + + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_concurrent"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 5); + + String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + + client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_1"); + client.create().forPath(effectivePath + "/app_2"); + + registry1 = new ZookeeperExternalSessionsRegistryClient(conf); + registry2 = new ZookeeperExternalSessionsRegistryClient(conf); + + String sessionFromRegistry1 = registry1.getSession(); + String sessionFromRegistry2 = registry2.getSession(); + + assertNotNull("Registry 1 should have claimed a session", sessionFromRegistry1); + assertNotNull("Registry 2 should have claimed a session", sessionFromRegistry2); + + assertNotEquals("The two registries should claim different sessions!", + sessionFromRegistry1, sessionFromRegistry2); + + registry1.returnSession(sessionFromRegistry1); + + String session3FromRegistry2 = registry2.getSession(); + assertEquals("Registry 2 should be able to claim the newly released session", + sessionFromRegistry1, session3FromRegistry2); + + registry2.returnSession(sessionFromRegistry2); + registry2.returnSession(session3FromRegistry2); + } finally { + if (registry1 != null) { + registry1.close(); + } + if (registry2 != null) { + registry2.close(); + } + if (client != null) { + client.close(); + } + } + } + + /** + * Tests that the InterProcessMutex enforces strict Global FIFO ordering. + * Clients form a queue when no sessions are available, and are served in exact order. + */ + @Test + public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception { + try (TestingServer server = new TestingServer()) { + String connectString = server.getConnectString(); + + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo"); + conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 15); + + String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + + ExecutorService executor = Executors.newFixedThreadPool(3); + ZookeeperExternalSessionsRegistryClient registry1 = new ZookeeperExternalSessionsRegistryClient(conf); + ZookeeperExternalSessionsRegistryClient registry2 = new ZookeeperExternalSessionsRegistryClient(conf); + ZookeeperExternalSessionsRegistryClient registry3 = new ZookeeperExternalSessionsRegistryClient(conf); + try { + Future future1 = executor.submit(registry1::getSession); + Thread.sleep(500); + Future future2 = executor.submit(registry2::getSession); + Thread.sleep(500); + Future future3 = executor.submit(registry3::getSession); + + client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_first"); + assertEquals("Registry 1 should get the first AM", "app_first", future1.get(5, TimeUnit.SECONDS)); + + client.create().forPath(effectivePath + "/app_second"); + String session2 = future2.get(5, TimeUnit.SECONDS); + + assertEquals("Registry 2 should get the second AM", "app_second", session2); + registry2.returnSession(session2); + + assertEquals("Registry 3 should get the second AM", "app_second", future3.get(5, TimeUnit.SECONDS)); + } finally { + registry1.close(); + registry2.close(); + registry3.close(); + client.close(); + executor.shutdownNow(); + } + } + } } From cc5445370009484ff1808d5ee32493d623aa0755 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Mon, 8 Jun 2026 15:26:53 +0530 Subject: [PATCH 02/16] Fix formatting --- ...okeeperExternalSessionsRegistryClient.java | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index c38b3bed5739..b6113057f6a3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -205,22 +205,22 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve synchronized (lock) { switch (event.getType()) { - case CHILD_UPDATED, CHILD_ADDED: - if (available.contains(applicationId) || taken.contains(applicationId)) { - return; - } - available.add(applicationId); - lock.notifyAll(); - break; - case CHILD_REMOVED: - if (taken.remove(applicationId)) { - LOG.warn("The session in use has disappeared from the registry ({})", applicationId); - } else if (!available.remove(applicationId)) { - LOG.warn("An unknown session has been removed ({})", applicationId); - } - break; - default: - // Ignore all the other events; logged above. + case CHILD_UPDATED, CHILD_ADDED: + if (available.contains(applicationId) || taken.contains(applicationId)) { + return; // We do not expect updates to existing sessions; ignore them for now. + } + available.add(applicationId); + lock.notifyAll(); + break; + case CHILD_REMOVED: + if (taken.remove(applicationId)) { + LOG.warn("The session in use has disappeared from the registry ({})", applicationId); + } else if (!available.remove(applicationId)) { + LOG.warn("An unknown session has been removed ({})", applicationId); + } + break; + default: + // Ignore all the other events; logged above. } } } @@ -237,20 +237,20 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { String applicationId = getApplicationId(childData); synchronized (lock) { switch (event.getType()) { - case CHILD_REMOVED: - if (!taken.contains(applicationId)) { - // if the claim node was released by this particular HS2 itself, - // it will be added back to the available list & locks are notified as part of returnSession() - available.add(applicationId); - lock.notifyAll(); - } - break; - case CHILD_ADDED: - // A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2 - available.remove(applicationId); - break; - default: - break; + case CHILD_REMOVED: + if (!taken.contains(applicationId)) { + // if the claim node was released by this particular HS2 itself, + // it will be added back to the available list & locks are notified as part of returnSession() + available.add(applicationId); + lock.notifyAll(); + } + break; + case CHILD_ADDED: + // A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2 + available.remove(applicationId); + break; + default: + break; } } } From fad541d4f34c6b9e4ff89b312cbe484359d6f04f Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Mon, 8 Jun 2026 15:30:08 +0530 Subject: [PATCH 03/16] Logging changes to address review comments --- .../apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java | 2 +- .../ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java index 72ff43f0922b..e53b1bf7f082 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -134,7 +134,7 @@ public void close(boolean keepDagFilesDir) throws Exception { // We never close external sessions that don't have errors. try { if (externalAppId != null) { - LOG.info("Returning external session with appID: {}", externalAppId); + LOG.debug("Returning external session with appID: {}", externalAppId); SessionState sessionState = SessionState.get(); if (sessionState != null) { sessionState.setTezSession(null); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index b6113057f6a3..1629a53e8379 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -168,6 +168,7 @@ public void returnSession(String appId) { client.delete().guaranteed().forPath(claimsPath + "/" + appId); } catch (KeeperException.NoNodeException e) { // If the claim Node has already been deleted, we can ignore it. + LOG.debug("Claim Node has already been deleted for the session {}", appId, e); } catch (Exception e) { LOG.warn("Failed to delete claim node for session {}", appId, e); } From 11eef83f3f357c1cffdf39731125931c074ec86c Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Mon, 8 Jun 2026 16:22:59 +0530 Subject: [PATCH 04/16] Update timeout calculation in getSession to prevent overflow --- .../tez/ZookeeperExternalSessionsRegistryClient.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 1629a53e8379..1f2939d39663 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -118,15 +118,16 @@ public String getSession() throws Exception { } } - long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); - long queueWaitTimeMs = Math.max(0, (endTimeNs - System.nanoTime()) / 1000000L); + long startTimeNs = System.nanoTime(); + long timeoutNs = TimeUnit.SECONDS.toNanos(maxAttempts); + long queueWaitTimeMs = Math.max(0, (timeoutNs - (System.nanoTime() - startTimeNs)) / 1000000L); if (!globalQueue.acquire(queueWaitTimeMs, TimeUnit.MILLISECONDS)) { throw new IOException("Cannot get a session (timed out in queue) after " + maxAttempts + " seconds"); } try { synchronized (lock) { - while (System.nanoTime() < endTimeNs) { + while (System.nanoTime() - startTimeNs < timeoutNs) { Iterator iter = available.iterator(); while (iter.hasNext()) { @@ -141,7 +142,7 @@ public String getSession() throws Exception { iter.remove(); } } - long remainingTimeNs = endTimeNs - System.nanoTime(); + long remainingTimeNs = timeoutNs - (System.nanoTime() - startTimeNs); if (remainingTimeNs > 0) { long waitTimeMs = Math.min(1000L, remainingTimeNs / 1_000_000L); lock.wait(waitTimeMs); From 41b5134326e005dcd95fd94d8d7304bcd8223397 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Mon, 8 Jun 2026 16:47:19 +0530 Subject: [PATCH 05/16] Refactor ClaimsPathListener to use in-place methods instead of a class definition --- ...okeeperExternalSessionsRegistryClient.java | 55 ++++++++----------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 1f2939d39663..fa1c6d85bd4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -88,9 +88,28 @@ private void init() { cache.start(); this.claimsCache = CuratorCache.build(client, claimsPath); - CuratorCacheListener claimsListener = CuratorCacheListener.builder() - .forPathChildrenCache(claimsPath, client, new ClaimsPathListener()) - .build(); + CuratorCacheListener claimsListener = CuratorCacheListener.builder().forCreates( + childData -> { + if (childData == null) { + return; + } + String applicationId = getApplicationId(childData); + synchronized (lock) { + available.remove(applicationId); + } + }).forDeletes( + childData -> { + if (childData == null) { + return; + } + String applicationId = getApplicationId(childData); + synchronized (lock) { + if (!taken.contains(applicationId)) { + available.add(applicationId); + lock.notifyAll(); + } + } + }).build(); claimsCache.listenable().addListener(claimsListener); claimsCache.start(); @@ -227,34 +246,4 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve } } } - - private final class ClaimsPathListener implements PathChildrenCacheListener { - @Override - public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) { - ChildData childData = event.getData(); - if (childData == null) { - return; - } - - String applicationId = getApplicationId(childData); - synchronized (lock) { - switch (event.getType()) { - case CHILD_REMOVED: - if (!taken.contains(applicationId)) { - // if the claim node was released by this particular HS2 itself, - // it will be added back to the available list & locks are notified as part of returnSession() - available.add(applicationId); - lock.notifyAll(); - } - break; - case CHILD_ADDED: - // A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2 - available.remove(applicationId); - break; - default: - break; - } - } - } - } } From 4777ab48f2f8d98a8d09d7db667f496d9841367d Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Mon, 8 Jun 2026 18:20:43 +0530 Subject: [PATCH 06/16] Fix the FIFO test as per the review comment & remove leftover config change --- ...okeeperExternalSessionsRegistryClient.java | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java index 1c1d7d9fe1b0..95d279af49b9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.junit.Test; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -151,7 +152,6 @@ public void testSessionClaimsFromDifferentRegistryClients() throws Exception { HiveConf conf = new HiveConf(); conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_concurrent"); - conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 5); String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); @@ -208,7 +208,6 @@ public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception { HiveConf conf = new HiveConf(); conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo"); - conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 15); String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); @@ -222,10 +221,34 @@ public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception { ZookeeperExternalSessionsRegistryClient registry2 = new ZookeeperExternalSessionsRegistryClient(conf); ZookeeperExternalSessionsRegistryClient registry3 = new ZookeeperExternalSessionsRegistryClient(conf); try { - Future future1 = executor.submit(registry1::getSession); - Thread.sleep(500); - Future future2 = executor.submit(registry2::getSession); - Thread.sleep(500); + // Each registry does a lazy initialization on its first getSession() call, which is time-consuming and + // completion time of the same can vary amongst the registries. CountDownLatch only confirms the worker thread + // has started, not that session request has reached the globalQueue, so the warmup initializes all three + // registry clients upfront and lets the latches reliably enforce queue ordering avoiding flakiness. + client.create().creatingParentsIfNeeded().forPath(effectivePath + "/warmup"); + for (ZookeeperExternalSessionsRegistryClient registry : + new ZookeeperExternalSessionsRegistryClient[] {registry1, registry2, registry3}) { + String session = registry.getSession(); + System.out.println("warmup session: " + session); + registry.returnSession(session); + } + client.delete().forPath(effectivePath + "/warmup"); + + CountDownLatch r1Started = new CountDownLatch(1); + CountDownLatch r2Started = new CountDownLatch(1); + + Future future1 = executor.submit(() -> { + r1Started.countDown(); + return registry1.getSession(); + }); + r1Started.await(); + + Future future2 = executor.submit(() -> { + r2Started.countDown(); + return registry2.getSession(); + }); + r2Started.await(); + Future future3 = executor.submit(registry3::getSession); client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_first"); From 260adfc5eafbfadb3d41c512fa7993ae2f3fa74a Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Mon, 8 Jun 2026 23:28:47 +0530 Subject: [PATCH 07/16] Address the cases of HS2-ZK connection getting lost / connection getting reconnected after being suspended --- ...okeeperExternalSessionsRegistryClient.java | 72 +++++++++++++++---- .../ql/exec/tez/monitoring/TezJobMonitor.java | 14 ++++ 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index fa1c6d85bd4a..2796c040d6bc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -33,9 +33,12 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -56,6 +59,7 @@ public class ZookeeperExternalSessionsRegistryClient implements ExternalSessions private CuratorCache cache; private CuratorCache claimsCache; private InterProcessMutex globalQueue; + private String sessionsPath; private String claimsPath; private volatile boolean isInitialized; @@ -71,18 +75,51 @@ private static String getApplicationId(final ChildData childData) { private void init() { String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + int sessionTimeoutMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, + TimeUnit.MILLISECONDS); + int connectionTimeoutMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, + TimeUnit.MILLISECONDS); + int baseSleepTimeMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, + TimeUnit.MILLISECONDS); + int maxRetries = HiveConf.getIntVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); - String effectivePath = normalizeZkPath(zkNamespace); - String queuePath = effectivePath + "-queue"; - this.claimsPath = effectivePath + "-claims"; - this.client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); - + this.sessionsPath = normalizeZkPath(zkNamespace); + this.claimsPath = this.sessionsPath + "-claims"; + // After connection state changes to SUSPENDED, the client has already consumed ~2/3 of the negotiated session + // timeout. Use 33% of the remaining window so LOST aligns with when the ZK server expires the session and drops + // ephemeral claim nodes. For Ref: Curator TN14 + this.client = CuratorFrameworkFactory.builder() + .connectString(zkServer) + .sessionTimeoutMs(sessionTimeoutMs) + .connectionTimeoutMs(connectionTimeoutMs) + .simulatedSessionExpirationPercent(33) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)) + .build(); + synchronized (lock) { client.start(); - this.globalQueue = new InterProcessMutex(client, queuePath); - this.cache = CuratorCache.build(client, effectivePath); + + client.getConnectionStateListenable().addListener(new ConnectionStateListener() { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) { + if (newState == ConnectionState.LOST) { + Set sessionsToKill; + synchronized (lock) { + LOG.error("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}", taken); + sessionsToKill = new HashSet<>(taken); + taken.clear(); + } + for (String appId : sessionsToKill) { + TezJobMonitor.killRunningDAGsForApplication(appId); + } + } + } + }); + + this.globalQueue = new InterProcessMutex(client, sessionsPath + "-queue"); + this.cache = CuratorCache.build(client, sessionsPath); CuratorCacheListener listener = CuratorCacheListener.builder() - .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) + .forPathChildrenCache(sessionsPath, client, new ExternalSessionsPathListener()) .build(); cache.listenable().addListener(listener); cache.start(); @@ -105,8 +142,12 @@ private void init() { String applicationId = getApplicationId(childData); synchronized (lock) { if (!taken.contains(applicationId)) { - available.add(applicationId); - lock.notifyAll(); + if (cache.get(sessionsPath + "/" + applicationId).isPresent()) { + available.add(applicationId); + lock.notifyAll(); + } else { + LOG.info("Ignoring AM claim removal for {} because the base AM node no longer exists.", applicationId); + } } } }).build(); @@ -115,7 +156,7 @@ private void init() { cache.stream() .filter(childData -> childData.getPath() != null - && childData.getPath().startsWith(effectivePath + "/")) + && childData.getPath().startsWith(sessionsPath + "/")) .forEach(childData -> available.add(getApplicationId(childData))); LOG.info("Initial external sessions: {}", available); isInitialized = true; @@ -143,7 +184,6 @@ public String getSession() throws Exception { if (!globalQueue.acquire(queueWaitTimeMs, TimeUnit.MILLISECONDS)) { throw new IOException("Cannot get a session (timed out in queue) after " + maxAttempts + " seconds"); } - try { synchronized (lock) { while (System.nanoTime() - startTimeNs < timeoutNs) { @@ -181,14 +221,14 @@ public void returnSession(String appId) { throw new IllegalStateException("Not initialized"); } if (!taken.remove(appId)) { - return; // Session has already been removed from ZK. + return; // Session has been removed from ZK. } try { client.delete().guaranteed().forPath(claimsPath + "/" + appId); } catch (KeeperException.NoNodeException e) { // If the claim Node has already been deleted, we can ignore it. - LOG.debug("Claim Node has already been deleted for the session {}", appId, e); + LOG.warn("Claim Node has already been deleted for the session {}", appId, e); } catch (Exception e) { LOG.warn("Failed to delete claim node for session {}", appId, e); } @@ -230,6 +270,10 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve if (available.contains(applicationId) || taken.contains(applicationId)) { return; // We do not expect updates to existing sessions; ignore them for now. } + if (claimsCache.get(claimsPath + "/" + applicationId).isPresent()) { + LOG.info("Ignoring newly added AM {} because it is already claimed by another session.", applicationId); + return; + } available.add(applicationId); lock.notifyAll(); break; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 92844f4d5716..433f84397e70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -499,6 +499,20 @@ public static void killRunningJobs() { } } + public static void killRunningDAGsForApplication(String applicationId) { + synchronized (shutdownList) { + for (DAGClient c : shutdownList) { + try { + if (applicationId.equals(c.getSessionIdentifierString())) { + c.tryKillDAG(); + } + } catch (Exception e) { + LOG.error("Error while trying to kill running DAG on tez session {}", applicationId); + } + } + } + } + static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern, String counterName) { TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName); From b3cd065ca7eaf814e88b24b1643775fab5c6c77b Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Wed, 10 Jun 2026 16:00:02 +0530 Subject: [PATCH 08/16] Change the log line added to debug --- .../ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 2796c040d6bc..3cdec1e51769 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -228,7 +228,7 @@ public void returnSession(String appId) { client.delete().guaranteed().forPath(claimsPath + "/" + appId); } catch (KeeperException.NoNodeException e) { // If the claim Node has already been deleted, we can ignore it. - LOG.warn("Claim Node has already been deleted for the session {}", appId, e); + LOG.debug("Claim Node has already been deleted for the session {}", appId, e); } catch (Exception e) { LOG.warn("Failed to delete claim node for session {}", appId, e); } From 35f39903e94277b215347e729870f54aa8508dc2 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 11 Jun 2026 00:31:59 +0530 Subject: [PATCH 09/16] Fix formatting issues --- ...okeeperExternalSessionsRegistryClient.java | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 3cdec1e51769..963489cae095 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -127,30 +127,30 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) { this.claimsCache = CuratorCache.build(client, claimsPath); CuratorCacheListener claimsListener = CuratorCacheListener.builder().forCreates( childData -> { - if (childData == null) { - return; - } - String applicationId = getApplicationId(childData); - synchronized (lock) { - available.remove(applicationId); - } - }).forDeletes( + if (childData == null) { + return; + } + String applicationId = getApplicationId(childData); + synchronized (lock) { + available.remove(applicationId); + } + }).forDeletes( childData -> { - if (childData == null) { - return; - } - String applicationId = getApplicationId(childData); - synchronized (lock) { - if (!taken.contains(applicationId)) { - if (cache.get(sessionsPath + "/" + applicationId).isPresent()) { - available.add(applicationId); - lock.notifyAll(); - } else { - LOG.info("Ignoring AM claim removal for {} because the base AM node no longer exists.", applicationId); + if (childData == null) { + return; } - } - } - }).build(); + String applicationId = getApplicationId(childData); + synchronized (lock) { + if (!taken.contains(applicationId)) { + if (cache.get(sessionsPath + "/" + applicationId).isPresent()) { + available.add(applicationId); + lock.notifyAll(); + } else { + LOG.info("Ignoring AM claim removal for {} because the base AM node no longer exists.", applicationId); + } + } + } + }).build(); claimsCache.listenable().addListener(claimsListener); claimsCache.start(); From 9e8e77df008b341be62e2a2c16e42cfb9cdd30f5 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 11 Jun 2026 15:06:08 +0530 Subject: [PATCH 10/16] Address SonarQube issues - 1 --- ...okeeperExternalSessionsRegistryClient.java | 56 +++++++++---------- 1 file changed, 27 insertions(+), 29 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 963489cae095..62656b567c03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -34,7 +34,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.framework.state.ConnectionState; -import org.apache.curator.framework.state.ConnectionStateListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -49,6 +48,7 @@ // TODO: tez should provide this registry public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + private static final String PATH_SEPARATOR = "/"; private final HiveConf initConf; private final Set available = new HashSet<>(); @@ -59,7 +59,6 @@ public class ZookeeperExternalSessionsRegistryClient implements ExternalSessions private CuratorCache cache; private CuratorCache claimsCache; private InterProcessMutex globalQueue; - private String sessionsPath; private String claimsPath; private volatile boolean isInitialized; @@ -70,7 +69,7 @@ public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) { } private static String getApplicationId(final ChildData childData) { - return childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + return childData.getPath().substring(childData.getPath().lastIndexOf(PATH_SEPARATOR) + 1); } private void init() { @@ -83,8 +82,8 @@ private void init() { TimeUnit.MILLISECONDS); int maxRetries = HiveConf.getIntVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); - this.sessionsPath = normalizeZkPath(zkNamespace); - this.claimsPath = this.sessionsPath + "-claims"; + String effectivePath = normalizeZkPath(zkNamespace); + this.claimsPath = effectivePath + "-claims"; // After connection state changes to SUSPENDED, the client has already consumed ~2/3 of the negotiated session // timeout. Use 33% of the remaining window so LOST aligns with when the ZK server expires the session and drops // ephemeral claim nodes. For Ref: Curator TN14 @@ -99,27 +98,25 @@ private void init() { synchronized (lock) { client.start(); - client.getConnectionStateListenable().addListener(new ConnectionStateListener() { - @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - if (newState == ConnectionState.LOST) { - Set sessionsToKill; - synchronized (lock) { - LOG.error("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}", taken); - sessionsToKill = new HashSet<>(taken); - taken.clear(); - } - for (String appId : sessionsToKill) { - TezJobMonitor.killRunningDAGsForApplication(appId); - } - } + client.getConnectionStateListenable().addListener((client, newState) -> { + if (newState != ConnectionState.LOST) { + return; + } + Set sessionsToKill; + synchronized (lock) { + LOG.error("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}", taken); + sessionsToKill = new HashSet<>(taken); + taken.clear(); + } + for (String appId : sessionsToKill) { + TezJobMonitor.killRunningDAGsForApplication(appId); } }); - this.globalQueue = new InterProcessMutex(client, sessionsPath + "-queue"); - this.cache = CuratorCache.build(client, sessionsPath); + this.globalQueue = new InterProcessMutex(client, effectivePath + "-queue"); + this.cache = CuratorCache.build(client, effectivePath); CuratorCacheListener listener = CuratorCacheListener.builder() - .forPathChildrenCache(sessionsPath, client, new ExternalSessionsPathListener()) + .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) .build(); cache.listenable().addListener(listener); cache.start(); @@ -142,11 +139,12 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) { String applicationId = getApplicationId(childData); synchronized (lock) { if (!taken.contains(applicationId)) { - if (cache.get(sessionsPath + "/" + applicationId).isPresent()) { + if (cache.get(effectivePath + PATH_SEPARATOR + applicationId).isPresent()) { available.add(applicationId); lock.notifyAll(); } else { - LOG.info("Ignoring AM claim removal for {} because the base AM node no longer exists.", applicationId); + LOG.info("Ignoring AM claim removal for {} because the base AM node no longer exists.", + applicationId); } } } @@ -156,7 +154,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) { cache.stream() .filter(childData -> childData.getPath() != null - && childData.getPath().startsWith(sessionsPath + "/")) + && childData.getPath().startsWith(effectivePath + PATH_SEPARATOR)) .forEach(childData -> available.add(getApplicationId(childData))); LOG.info("Initial external sessions: {}", available); isInitialized = true; @@ -165,7 +163,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) { @VisibleForTesting static String normalizeZkPath(String zkNamespace) { - return (zkNamespace.startsWith("/") ? zkNamespace : "/" + zkNamespace); + return (zkNamespace.startsWith(PATH_SEPARATOR) ? zkNamespace : PATH_SEPARATOR + zkNamespace); } @Override @@ -192,7 +190,7 @@ public String getSession() throws Exception { while (iter.hasNext()) { String appId = iter.next(); try { - String claimNodePath = claimsPath + "/" + appId; + String claimNodePath = claimsPath + PATH_SEPARATOR + appId; client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(claimNodePath); iter.remove(); taken.add(appId); @@ -225,7 +223,7 @@ public void returnSession(String appId) { } try { - client.delete().guaranteed().forPath(claimsPath + "/" + appId); + client.delete().guaranteed().forPath(claimsPath + PATH_SEPARATOR + appId); } catch (KeeperException.NoNodeException e) { // If the claim Node has already been deleted, we can ignore it. LOG.debug("Claim Node has already been deleted for the session {}", appId, e); @@ -270,7 +268,7 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve if (available.contains(applicationId) || taken.contains(applicationId)) { return; // We do not expect updates to existing sessions; ignore them for now. } - if (claimsCache.get(claimsPath + "/" + applicationId).isPresent()) { + if (claimsCache.get(claimsPath + PATH_SEPARATOR + applicationId).isPresent()) { LOG.info("Ignoring newly added AM {} because it is already claimed by another session.", applicationId); return; } From ab4ad79016dd6af75a4ff43779f55d006e59b0e9 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 11 Jun 2026 20:05:48 +0530 Subject: [PATCH 11/16] Logic to kill orphan DAGs left behind by crashed HS2 --- .../ql/exec/tez/TezExternalSessionState.java | 35 +++++++++++++++++++ .../hadoop/hive/ql/exec/tez/TezSession.java | 3 ++ .../ql/exec/tez/TezSessionPoolSession.java | 7 ++++ .../hive/ql/exec/tez/TezSessionState.java | 7 ++++ .../hadoop/hive/ql/exec/tez/TezTask.java | 2 +- 5 files changed, 53 insertions(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java index e53b1bf7f082..60a0bf8e4688 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -29,8 +29,12 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; /** @@ -187,4 +191,35 @@ public boolean killQuery(String reason) throws HiveException { killQuery.killQuery(queryId, reason, conf, false); return true; } + + @Override + public DAGClient submitDAG(DAG dag) throws TezException, IOException { + try { + return getTezClient().submitDAG(dag); + } catch (TezException e) { + if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) { + throw e; + } + tryKillRunningDAGs(getTezClient()); + return getTezClient().submitDAG(dag); + } + } + + private void tryKillRunningDAGs(TezClient session) throws TezException { + LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId); + DAGClientAMProtocolBlockingPB proxy = session.sendAMHeartbeat(null); + if (proxy == null) { + throw new TezException("Error while trying to connect to AM for app ID " + externalAppId); + } + try { + DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse = + proxy.getAllDAGs(null, DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build()); + for (String dagId : allDAGSResponse.getDagIdList()) { + LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId); + proxy.tryKillDAG(null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build()); + } + } catch (Exception e) { + throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e); + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java index 68844bd81728..2f64ec41a58f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; /** @@ -86,6 +88,7 @@ public String toString() { HiveConf getConf(); TezClient getTezClient(); + DAGClient submitDAG(DAG dag) throws TezException, IOException; boolean isOpen(); boolean isOpening(); boolean getDoAsEnabled(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 415072f221da..a473b32e8879 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -337,6 +339,11 @@ public TezClient getTezClient() { return baseSession.getTezClient(); } + @Override + public DAGClient submitDAG(DAG dag) throws TezException, IOException { + return baseSession.submitDAG(dag); + } + @Override public boolean isOpening() { return baseSession.isOpening(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 2924416ad480..b50c15cf9b63 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -76,11 +76,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; @@ -820,6 +822,11 @@ public TezClient getTezClient() { return session; } + @Override + public DAGClient submitDAG(DAG dag) throws TezException, IOException { + return getTezClient().submitDAG(dag); + } + @Override public LocalResource getAppJarLr() { return appJarLr; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 6bd801dd64c3..b1645a197cfd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -692,7 +692,7 @@ DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception { private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException { runtimeContext.init(sessionState); - return sessionState.getTezClient().submitDAG(dag); + return sessionState.submitDAG(dag); } private void sessionDestroyOrReturnToPool(Ref sessionStateRef, From 8a0dd36dac9738151a4fe10f35cbdf100c26c848 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Thu, 11 Jun 2026 21:41:58 +0530 Subject: [PATCH 12/16] Address Sonarqube - 2 --- .../ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 62656b567c03..399a4d6d9882 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -98,7 +98,7 @@ private void init() { synchronized (lock) { client.start(); - client.getConnectionStateListenable().addListener((client, newState) -> { + client.getConnectionStateListenable().addListener((curatorClient, newState) -> { if (newState != ConnectionState.LOST) { return; } From 1061ff4f4c18d5f047b8ff99c1e05a4d4b577d93 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Fri, 12 Jun 2026 10:16:52 +0530 Subject: [PATCH 13/16] Update Mocks in TestTezTask as per the new code logic & Remove leftover from TestZookeeperExternalSessionsRegistryClient --- .../apache/hadoop/hive/ql/exec/tez/TestTezTask.java | 12 ++++++------ .../TestZookeeperExternalSessionsRegistryClient.java | 1 - 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index fa71845ece27..34aa466084a1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -180,7 +180,7 @@ public Edge answer(InvocationOnMock invocation) throws Throwable { sessionState = mock(TezSessionState.class); when(sessionState.getTezClient()).thenReturn(session); when(sessionState.reopen()).thenReturn(sessionState); - when(session.submitDAG(any(DAG.class))) + when(sessionState.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) .thenReturn(mock(DAGClient.class)); } @@ -229,7 +229,7 @@ public void testSubmit() throws Exception { task.submit(dag, Ref.from(sessionState)); // validate close/reopen verify(sessionState, times(1)).reopen(); - verify(session, times(2)).submitDAG(any(DAG.class)); + verify(sessionState, times(2)).submitDAG(any(DAG.class)); } @Test @@ -241,14 +241,14 @@ public void testSubmitOnNonPoolSession() throws Exception { TezClient tezClient = mock(TezClient.class); when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be submitted")); when(tezSessionState.getTezClient()).thenReturn(tezClient); - when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); + when(tezSessionState.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); doNothing().when(tezSessionState).destroy(); boolean isException = false; try { task.submit(dag, Ref.from(tezSessionState)); } catch (Exception e) { isException = true; - verify(tezClient, times(1)).submitDAG(any(DAG.class)); + verify(tezSessionState, times(1)).submitDAG(any(DAG.class)); verify(tezSessionState, times(2)).reopen(); verify(tezSessionState, times(1)).destroy(); verify(tezSessionState, times(0)).returnToSessionManager(); @@ -266,13 +266,13 @@ public void testSubmitOnPoolSession() throws Exception { doNothing().when(tezSessionPoolSession).returnToSessionManager(); when(tezSessionPoolSession.getTezClient()).thenReturn(tezClient); when(tezSessionPoolSession.isDefault()).thenReturn(true); - when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); + when(tezSessionPoolSession.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); boolean isException = false; try { task.submit(dag, Ref.from(tezSessionPoolSession)); } catch (Exception e) { isException = true; - verify(tezClient, times(1)).submitDAG(any(DAG.class)); + verify(tezSessionPoolSession, times(1)).submitDAG(any(DAG.class)); verify(tezSessionPoolSession, times(2)).reopen(); verify(tezSessionPoolSession, times(0)).destroy(); verify(tezSessionPoolSession, times(1)).returnToSessionManager(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java index 95d279af49b9..3a02fe472152 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java @@ -229,7 +229,6 @@ public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception { for (ZookeeperExternalSessionsRegistryClient registry : new ZookeeperExternalSessionsRegistryClient[] {registry1, registry2, registry3}) { String session = registry.getSession(); - System.out.println("warmup session: " + session); registry.returnSession(session); } client.delete().forPath(effectivePath + "/warmup"); From 203e1365ab9f99d6aee9dcfc9aee93ec34ab994d Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Fri, 12 Jun 2026 12:24:25 +0530 Subject: [PATCH 14/16] Fix flakiness of newly added UT testFIFOSessionClaimsFromDifferentRegistries --- ...okeeperExternalSessionsRegistryClient.java | 59 ++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java index 3a02fe472152..789e70f869a1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -29,9 +30,11 @@ import org.apache.curator.test.TestingServer; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.zookeeper.KeeperException; import org.junit.Test; -import java.util.concurrent.CountDownLatch; +import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -221,34 +224,17 @@ public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception { ZookeeperExternalSessionsRegistryClient registry2 = new ZookeeperExternalSessionsRegistryClient(conf); ZookeeperExternalSessionsRegistryClient registry3 = new ZookeeperExternalSessionsRegistryClient(conf); try { - // Each registry does a lazy initialization on its first getSession() call, which is time-consuming and - // completion time of the same can vary amongst the registries. CountDownLatch only confirms the worker thread - // has started, not that session request has reached the globalQueue, so the warmup initializes all three - // registry clients upfront and lets the latches reliably enforce queue ordering avoiding flakiness. - client.create().creatingParentsIfNeeded().forPath(effectivePath + "/warmup"); - for (ZookeeperExternalSessionsRegistryClient registry : - new ZookeeperExternalSessionsRegistryClient[] {registry1, registry2, registry3}) { - String session = registry.getSession(); - registry.returnSession(session); - } - client.delete().forPath(effectivePath + "/warmup"); - - CountDownLatch r1Started = new CountDownLatch(1); - CountDownLatch r2Started = new CountDownLatch(1); - - Future future1 = executor.submit(() -> { - r1Started.countDown(); - return registry1.getSession(); - }); - r1Started.await(); - - Future future2 = executor.submit(() -> { - r2Started.countDown(); - return registry2.getSession(); - }); - r2Started.await(); + // Submit getSession() for one registry at a time and wait for each to reach globalQueue which is visible + // as a sequential lock znode, before starting the next, so FIFO order matches registry1→2→3. + String queuePath = effectivePath + "-queue"; + Future future1 = executor.submit(registry1::getSession); + awaitMutexQueueSize(client, queuePath, 1); + + Future future2 = executor.submit(registry2::getSession); + awaitMutexQueueSize(client, queuePath, 2); Future future3 = executor.submit(registry3::getSession); + awaitMutexQueueSize(client, queuePath, 3); client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_first"); assertEquals("Registry 1 should get the first AM", "app_first", future1.get(5, TimeUnit.SECONDS)); @@ -269,5 +255,24 @@ public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception { } } } + + private static void awaitMutexQueueSize(CuratorFramework client, String queuePath, int expectedSize) + throws Exception { + long startTimeNs = System.nanoTime(); + long timeoutNs = TimeUnit.SECONDS.toNanos(30); + while (System.nanoTime() - startTimeNs < timeoutNs) { + List childQueueNodes; + try { + childQueueNodes = client.getChildren().forPath(queuePath); + } catch (KeeperException.NoNodeException e) { + childQueueNodes = Collections.emptyList(); + } + if (childQueueNodes.size() >= expectedSize) { + return; + } + Thread.sleep(100); + } + fail("Timed out waiting for " + expectedSize + " mutex queue participants under " + queuePath); + } } From 1eddf6a39ad0a18e7b0dc073458894a2e11218e7 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Tue, 16 Jun 2026 00:23:20 +0530 Subject: [PATCH 15/16] Address kill dag Race condition in TezExternalSessionState --- .../ql/exec/tez/TezExternalSessionState.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java index 60a0bf8e4688..ae7e8860df71 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -19,9 +19,11 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.session.SessionState; @@ -35,6 +37,7 @@ import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC; +import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; /** @@ -211,15 +214,47 @@ private void tryKillRunningDAGs(TezClient session) throws TezException { if (proxy == null) { throw new TezException("Error while trying to connect to AM for app ID " + externalAppId); } + long killTimeoutMs = TimeUnit.SECONDS.toMillis( + HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS)); try { DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse = proxy.getAllDAGs(null, DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build()); for (String dagId : allDAGSResponse.getDagIdList()) { LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId); proxy.tryKillDAG(null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build()); + waitForDagTerminal(proxy, dagId, killTimeoutMs); } } catch (Exception e) { throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e); } } + + private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs) + throws Exception { + long startTimeMs = System.currentTimeMillis(); + long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + while (System.currentTimeMillis() - startTimeMs < timeoutMs) { + long remainingMs = timeoutMs - (System.currentTimeMillis() - startTimeMs); + DAGClientAMProtocolRPC.GetDAGStatusResponseProto response = proxy.getDAGStatus(null, + DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder() + .setDagId(dagId) + .setTimeout(Math.min(pollIntervalMs, remainingMs)) + .build()); + if (response.hasDagStatus() && response.getDagStatus().hasState() + && isTerminalDagState(response.getDagStatus().getState())) { + LOG.info("External session: dagId {} on app ID {} reached terminal state {}", dagId, externalAppId, + response.getDagStatus().getState()); + return; + } + } + throw new TezException("Timed out after " + timeoutMs + " ms waiting for orphan DAG " + dagId + + " on app ID " + externalAppId + " to reach terminal state after kill"); + } + + private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) { + return switch (state) { + case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true; + default -> false; + }; + } } From 50434501709ee460e3496132bf1bc91c4f1c9bb5 Mon Sep 17 00:00:00 2001 From: tanishq-chugh Date: Wed, 17 Jun 2026 09:31:19 +0530 Subject: [PATCH 16/16] Address SonarQube - 2 --- .../hadoop/hive/ql/exec/tez/TezExternalSessionState.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java index ae7e8860df71..f55659638a6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -230,7 +231,7 @@ private void tryKillRunningDAGs(TezClient session) throws TezException { } private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs) - throws Exception { + throws TezException, ServiceException { long startTimeMs = System.currentTimeMillis(); long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS); while (System.currentTimeMillis() - startTimeMs < timeoutMs) { @@ -253,8 +254,8 @@ && isTerminalDagState(response.getDagStatus().getState())) { private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) { return switch (state) { - case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true; - default -> false; + case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true; + default -> false; }; } }