diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java index b3103d3f5918..f55659638a6d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java @@ -19,17 +19,26 @@ package org.apache.hadoop.hive.ql.exec.tez; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB; +import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC; +import org.apache.tez.dag.api.records.DAGProtos; import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor; /** @@ -133,6 +142,11 @@ public void close(boolean keepDagFilesDir) throws Exception { // We never close external sessions that don't have errors. try { if (externalAppId != null) { + LOG.debug("Returning external session with appID: {}", externalAppId); + SessionState sessionState = SessionState.get(); + if (sessionState != null) { + sessionState.setTezSession(null); + } registry.returnSession(externalAppId); } } catch (Exception e) { @@ -181,4 +195,67 @@ public boolean killQuery(String reason) throws HiveException { killQuery.killQuery(queryId, reason, conf, false); return true; } + + @Override + public DAGClient submitDAG(DAG dag) throws TezException, IOException { + try { + return getTezClient().submitDAG(dag); + } catch (TezException e) { + if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) { + throw e; + } + tryKillRunningDAGs(getTezClient()); + return getTezClient().submitDAG(dag); + } + } + + private void tryKillRunningDAGs(TezClient session) throws TezException { + LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId); + DAGClientAMProtocolBlockingPB proxy = session.sendAMHeartbeat(null); + if (proxy == null) { + throw new TezException("Error while trying to connect to AM for app ID " + externalAppId); + } + long killTimeoutMs = TimeUnit.SECONDS.toMillis( + HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS)); + try { + DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse = + proxy.getAllDAGs(null, DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build()); + for (String dagId : allDAGSResponse.getDagIdList()) { + LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId); + proxy.tryKillDAG(null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build()); + waitForDagTerminal(proxy, dagId, killTimeoutMs); + } + } catch (Exception e) { + throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e); + } + } + + private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs) + throws TezException, ServiceException { + long startTimeMs = System.currentTimeMillis(); + long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS); + while (System.currentTimeMillis() - startTimeMs < timeoutMs) { + long remainingMs = timeoutMs - (System.currentTimeMillis() - startTimeMs); + DAGClientAMProtocolRPC.GetDAGStatusResponseProto response = proxy.getDAGStatus(null, + DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder() + .setDagId(dagId) + .setTimeout(Math.min(pollIntervalMs, remainingMs)) + .build()); + if (response.hasDagStatus() && response.getDagStatus().hasState() + && isTerminalDagState(response.getDagStatus().getState())) { + LOG.info("External session: dagId {} on app ID {} reached terminal state {}", dagId, externalAppId, + response.getDagStatus().getState()); + return; + } + } + throw new TezException("Timed out after " + timeoutMs + " ms waiting for orphan DAG " + dagId + + " on app ID " + externalAppId + " to reach terminal state after kill"); + } + + private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) { + return switch (state) { + case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true; + default -> false; + }; + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java index 68844bd81728..2f64ec41a58f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hive.ql.wm.WmContext; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; /** @@ -86,6 +88,7 @@ public String toString() { HiveConf getConf(); TezClient getTezClient(); + DAGClient submitDAG(DAG dag) throws TezException, IOException; boolean isOpen(); boolean isOpening(); boolean getDoAsEnabled(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 605a92ebc8f5..b3b1c835e5b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -353,6 +353,20 @@ void returnSession(TezSession tezSessionState) { + " belongs to the pool. Put it back in"); defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState); } + + if (useExternalSessions) { + if (tezSessionState.getTezClient() != null + && tezSessionState.getTezClient().getAppMasterApplicationId() != null) { + try { + tezSessionState.close(false); + } catch (Exception ex) { + LOG.warn("Failed to return external Tez session {}", tezSessionState.getSessionId(), ex); + } + } else { + LOG.warn("Not returning session '{}' as tez client or app id is null", tezSessionState.getSessionId()); + } + } + // non default session nothing changes. The user can continue to use the existing // session in the SessionState } finally { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 415072f221da..a473b32e8879 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -34,7 +34,9 @@ import org.apache.hadoop.hive.registry.impl.TezAmInstance; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.tez.client.TezClient; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -337,6 +339,11 @@ public TezClient getTezClient() { return baseSession.getTezClient(); } + @Override + public DAGClient submitDAG(DAG dag) throws TezException, IOException { + return baseSession.submitDAG(dag); + } + @Override public boolean isOpening() { return baseSession.isOpening(); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index 2924416ad480..b50c15cf9b63 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -76,11 +76,13 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.tez.client.TezClient; import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.PreWarmVertex; import org.apache.tez.dag.api.SessionNotRunning; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; import org.apache.tez.mapreduce.hadoop.DeprecatedKeys; @@ -820,6 +822,11 @@ public TezClient getTezClient() { return session; } + @Override + public DAGClient submitDAG(DAG dag) throws TezException, IOException { + return getTezClient().submitDAG(dag); + } + @Override public LocalResource getAppJarLr() { return appJarLr; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 6bd801dd64c3..b1645a197cfd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -692,7 +692,7 @@ DAGClient submit(DAG dag, Ref sessionStateRef) throws Exception { private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException { runtimeContext.init(sessionState); - return sessionState.getTezClient().submitDAG(dag); + return sessionState.submitDAG(dag); } private void sessionDestroyOrReturnToPool(Ref sessionStateRef, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java index 550c77e573ab..399a4d6d9882 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.TimeUnit; import com.google.common.annotations.VisibleForTesting; import org.apache.curator.framework.CuratorFramework; @@ -31,9 +32,14 @@ import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.locks.InterProcessMutex; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,14 +48,18 @@ // TODO: tez should provide this registry public class ZookeeperExternalSessionsRegistryClient implements ExternalSessionsRegistry { private static final Logger LOG = LoggerFactory.getLogger(ZookeeperExternalSessionsRegistryClient.class); + private static final String PATH_SEPARATOR = "/"; private final HiveConf initConf; private final Set available = new HashSet<>(); private final Set taken = new HashSet<>(); private final Object lock = new Object(); private final int maxAttempts; - + private CuratorFramework client; private CuratorCache cache; + private CuratorCache claimsCache; + private InterProcessMutex globalQueue; + private String claimsPath; private volatile boolean isInitialized; @@ -59,25 +69,92 @@ public ZookeeperExternalSessionsRegistryClient(final HiveConf initConf) { } private static String getApplicationId(final ChildData childData) { - return childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1); + return childData.getPath().substring(childData.getPath().lastIndexOf(PATH_SEPARATOR) + 1); } private void init() { String zkServer = HiveConf.getVar(initConf, ConfVars.HIVE_ZOOKEEPER_QUORUM); + int sessionTimeoutMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, + TimeUnit.MILLISECONDS); + int connectionTimeoutMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_TIMEOUT, + TimeUnit.MILLISECONDS); + int baseSleepTimeMs = (int) HiveConf.getTimeVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, + TimeUnit.MILLISECONDS); + int maxRetries = HiveConf.getIntVar(initConf, ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); String zkNamespace = HiveConf.getVar(initConf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); String effectivePath = normalizeZkPath(zkNamespace); - CuratorFramework client = CuratorFrameworkFactory.newClient(zkServer, new ExponentialBackoffRetry(1000, 3)); + this.claimsPath = effectivePath + "-claims"; + // After connection state changes to SUSPENDED, the client has already consumed ~2/3 of the negotiated session + // timeout. Use 33% of the remaining window so LOST aligns with when the ZK server expires the session and drops + // ephemeral claim nodes. For Ref: Curator TN14 + this.client = CuratorFrameworkFactory.builder() + .connectString(zkServer) + .sessionTimeoutMs(sessionTimeoutMs) + .connectionTimeoutMs(connectionTimeoutMs) + .simulatedSessionExpirationPercent(33) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)) + .build(); + synchronized (lock) { client.start(); + + client.getConnectionStateListenable().addListener((curatorClient, newState) -> { + if (newState != ConnectionState.LOST) { + return; + } + Set sessionsToKill; + synchronized (lock) { + LOG.error("ZK connection state has changed to lost; killing running DAGs on claimed AMs: {}", taken); + sessionsToKill = new HashSet<>(taken); + taken.clear(); + } + for (String appId : sessionsToKill) { + TezJobMonitor.killRunningDAGsForApplication(appId); + } + }); + + this.globalQueue = new InterProcessMutex(client, effectivePath + "-queue"); this.cache = CuratorCache.build(client, effectivePath); CuratorCacheListener listener = CuratorCacheListener.builder() .forPathChildrenCache(effectivePath, client, new ExternalSessionsPathListener()) .build(); cache.listenable().addListener(listener); cache.start(); + + this.claimsCache = CuratorCache.build(client, claimsPath); + CuratorCacheListener claimsListener = CuratorCacheListener.builder().forCreates( + childData -> { + if (childData == null) { + return; + } + String applicationId = getApplicationId(childData); + synchronized (lock) { + available.remove(applicationId); + } + }).forDeletes( + childData -> { + if (childData == null) { + return; + } + String applicationId = getApplicationId(childData); + synchronized (lock) { + if (!taken.contains(applicationId)) { + if (cache.get(effectivePath + PATH_SEPARATOR + applicationId).isPresent()) { + available.add(applicationId); + lock.notifyAll(); + } else { + LOG.info("Ignoring AM claim removal for {} because the base AM node no longer exists.", + applicationId); + } + } + } + }).build(); + claimsCache.listenable().addListener(claimsListener); + claimsCache.start(); + cache.stream() .filter(childData -> childData.getPath() != null - && childData.getPath().startsWith(effectivePath + "/")) + && childData.getPath().startsWith(effectivePath + PATH_SEPARATOR)) .forEach(childData -> available.add(getApplicationId(childData))); LOG.info("Initial external sessions: {}", available); isInitialized = true; @@ -86,27 +163,52 @@ private void init() { @VisibleForTesting static String normalizeZkPath(String zkNamespace) { - return (zkNamespace.startsWith("/") ? zkNamespace : "/" + zkNamespace); + return (zkNamespace.startsWith(PATH_SEPARATOR) ? zkNamespace : PATH_SEPARATOR + zkNamespace); } @Override public String getSession() throws Exception { - synchronized (lock) { - if (!isInitialized) { - init(); - } - long endTimeNs = System.nanoTime() + (1000000000L * maxAttempts); - while (available.isEmpty() && ((endTimeNs - System.nanoTime()) > 0)) { - lock.wait(1000L); + if (!isInitialized) { + synchronized (lock) { + if (!isInitialized) { + init(); + } } - Iterator iter = available.iterator(); - if (!iter.hasNext()) { - throw new IOException("Cannot get a session after " + maxAttempts + " attempts"); + } + + long startTimeNs = System.nanoTime(); + long timeoutNs = TimeUnit.SECONDS.toNanos(maxAttempts); + long queueWaitTimeMs = Math.max(0, (timeoutNs - (System.nanoTime() - startTimeNs)) / 1000000L); + if (!globalQueue.acquire(queueWaitTimeMs, TimeUnit.MILLISECONDS)) { + throw new IOException("Cannot get a session (timed out in queue) after " + maxAttempts + " seconds"); + } + try { + synchronized (lock) { + while (System.nanoTime() - startTimeNs < timeoutNs) { + Iterator iter = available.iterator(); + + while (iter.hasNext()) { + String appId = iter.next(); + try { + String claimNodePath = claimsPath + PATH_SEPARATOR + appId; + client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(claimNodePath); + iter.remove(); + taken.add(appId); + return appId; + } catch (KeeperException.NodeExistsException e) { + iter.remove(); + } + } + long remainingTimeNs = timeoutNs - (System.nanoTime() - startTimeNs); + if (remainingTimeNs > 0) { + long waitTimeMs = Math.min(1000L, remainingTimeNs / 1_000_000L); + lock.wait(waitTimeMs); + } + } + throw new IOException("Cannot get a session after waiting for " + maxAttempts + " seconds (timeout exhausted)"); } - String appId = iter.next(); - iter.remove(); - taken.add(appId); - return appId; + } finally { + globalQueue.release(); } } @@ -119,6 +221,16 @@ public void returnSession(String appId) { if (!taken.remove(appId)) { return; // Session has been removed from ZK. } + + try { + client.delete().guaranteed().forPath(claimsPath + PATH_SEPARATOR + appId); + } catch (KeeperException.NoNodeException e) { + // If the claim Node has already been deleted, we can ignore it. + LOG.debug("Claim Node has already been deleted for the session {}", appId, e); + } catch (Exception e) { + LOG.warn("Failed to delete claim node for session {}", appId, e); + } + available.add(appId); lock.notifyAll(); } @@ -126,9 +238,15 @@ public void returnSession(String appId) { @Override public void close() { + if (claimsCache != null) { + claimsCache.close(); + } if (cache != null) { cache.close(); } + if (client != null) { + client.close(); + } } private final class ExternalSessionsPathListener implements PathChildrenCacheListener { @@ -146,21 +264,26 @@ public void childEvent(final CuratorFramework client, final PathChildrenCacheEve synchronized (lock) { switch (event.getType()) { - case CHILD_UPDATED, CHILD_ADDED: - if (available.contains(applicationId) || taken.contains(applicationId)) { - return; // We do not expect updates to existing sessions; ignore them for now. - } - available.add(applicationId); - break; - case CHILD_REMOVED: - if (taken.remove(applicationId)) { - LOG.warn("The session in use has disappeared from the registry ({})", applicationId); - } else if (!available.remove(applicationId)) { - LOG.warn("An unknown session has been removed ({})", applicationId); - } - break; - default: - // Ignore all the other events; logged above. + case CHILD_UPDATED, CHILD_ADDED: + if (available.contains(applicationId) || taken.contains(applicationId)) { + return; // We do not expect updates to existing sessions; ignore them for now. + } + if (claimsCache.get(claimsPath + PATH_SEPARATOR + applicationId).isPresent()) { + LOG.info("Ignoring newly added AM {} because it is already claimed by another session.", applicationId); + return; + } + available.add(applicationId); + lock.notifyAll(); + break; + case CHILD_REMOVED: + if (taken.remove(applicationId)) { + LOG.warn("The session in use has disappeared from the registry ({})", applicationId); + } else if (!available.remove(applicationId)) { + LOG.warn("An unknown session has been removed ({})", applicationId); + } + break; + default: + // Ignore all the other events; logged above. } } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java index 92844f4d5716..433f84397e70 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java @@ -499,6 +499,20 @@ public static void killRunningJobs() { } } + public static void killRunningDAGsForApplication(String applicationId) { + synchronized (shutdownList) { + for (DAGClient c : shutdownList) { + try { + if (applicationId.equals(c.getSessionIdentifierString())) { + c.tryKillDAG(); + } + } catch (Exception e) { + LOG.error("Error while trying to kill running DAG on tez session {}", applicationId); + } + } + } + } + static long getCounterValueByGroupName(TezCounters vertexCounters, String groupNamePattern, String counterName) { TezCounter tezCounter = vertexCounters.getGroup(groupNamePattern).findCounter(counterName); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index fa71845ece27..34aa466084a1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -180,7 +180,7 @@ public Edge answer(InvocationOnMock invocation) throws Throwable { sessionState = mock(TezSessionState.class); when(sessionState.getTezClient()).thenReturn(session); when(sessionState.reopen()).thenReturn(sessionState); - when(session.submitDAG(any(DAG.class))) + when(sessionState.submitDAG(any(DAG.class))) .thenThrow(new SessionNotRunning("")) .thenReturn(mock(DAGClient.class)); } @@ -229,7 +229,7 @@ public void testSubmit() throws Exception { task.submit(dag, Ref.from(sessionState)); // validate close/reopen verify(sessionState, times(1)).reopen(); - verify(session, times(2)).submitDAG(any(DAG.class)); + verify(sessionState, times(2)).submitDAG(any(DAG.class)); } @Test @@ -241,14 +241,14 @@ public void testSubmitOnNonPoolSession() throws Exception { TezClient tezClient = mock(TezClient.class); when(tezSessionState.reopen()).thenThrow(new HiveException("Dag cannot be submitted")); when(tezSessionState.getTezClient()).thenReturn(tezClient); - when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); + when(tezSessionState.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); doNothing().when(tezSessionState).destroy(); boolean isException = false; try { task.submit(dag, Ref.from(tezSessionState)); } catch (Exception e) { isException = true; - verify(tezClient, times(1)).submitDAG(any(DAG.class)); + verify(tezSessionState, times(1)).submitDAG(any(DAG.class)); verify(tezSessionState, times(2)).reopen(); verify(tezSessionState, times(1)).destroy(); verify(tezSessionState, times(0)).returnToSessionManager(); @@ -266,13 +266,13 @@ public void testSubmitOnPoolSession() throws Exception { doNothing().when(tezSessionPoolSession).returnToSessionManager(); when(tezSessionPoolSession.getTezClient()).thenReturn(tezClient); when(tezSessionPoolSession.isDefault()).thenReturn(true); - when(tezClient.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); + when(tezSessionPoolSession.submitDAG(any(DAG.class))).thenThrow(new SessionNotRunning("")); boolean isException = false; try { task.submit(dag, Ref.from(tezSessionPoolSession)); } catch (Exception e) { isException = true; - verify(tezClient, times(1)).submitDAG(any(DAG.class)); + verify(tezSessionPoolSession, times(1)).submitDAG(any(DAG.class)); verify(tezSessionPoolSession, times(2)).reopen(); verify(tezSessionPoolSession, times(0)).destroy(); verify(tezSessionPoolSession, times(1)).returnToSessionManager(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java index 8274e87187b0..789e70f869a1 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hive.ql.exec.tez; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -28,8 +30,16 @@ import org.apache.curator.test.TestingServer; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.zookeeper.KeeperException; import org.junit.Test; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + /** * Tests for {@link ZookeeperExternalSessionsRegistryClient}. */ @@ -128,5 +138,141 @@ public void testReuseSameSession() throws Exception { } } } + + /** + * Tests that multiple registry clients (simulating multiple HS2 instances) + * respect the global distributed lock (claims) and do not claim the same session simultaneously. + */ + @Test + public void testSessionClaimsFromDifferentRegistryClients() throws Exception { + CuratorFramework client = null; + ZookeeperExternalSessionsRegistryClient registry1 = null; + ZookeeperExternalSessionsRegistryClient registry2 = null; + + try (TestingServer server = new TestingServer()) { + String connectString = server.getConnectString(); + + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_concurrent"); + + String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + + client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_1"); + client.create().forPath(effectivePath + "/app_2"); + + registry1 = new ZookeeperExternalSessionsRegistryClient(conf); + registry2 = new ZookeeperExternalSessionsRegistryClient(conf); + + String sessionFromRegistry1 = registry1.getSession(); + String sessionFromRegistry2 = registry2.getSession(); + + assertNotNull("Registry 1 should have claimed a session", sessionFromRegistry1); + assertNotNull("Registry 2 should have claimed a session", sessionFromRegistry2); + + assertNotEquals("The two registries should claim different sessions!", + sessionFromRegistry1, sessionFromRegistry2); + + registry1.returnSession(sessionFromRegistry1); + + String session3FromRegistry2 = registry2.getSession(); + assertEquals("Registry 2 should be able to claim the newly released session", + sessionFromRegistry1, session3FromRegistry2); + + registry2.returnSession(sessionFromRegistry2); + registry2.returnSession(session3FromRegistry2); + } finally { + if (registry1 != null) { + registry1.close(); + } + if (registry2 != null) { + registry2.close(); + } + if (client != null) { + client.close(); + } + } + } + + /** + * Tests that the InterProcessMutex enforces strict Global FIFO ordering. + * Clients form a queue when no sessions are available, and are served in exact order. + */ + @Test + public void testFIFOSessionClaimsFromDifferentRegistries() throws Exception { + try (TestingServer server = new TestingServer()) { + String connectString = server.getConnectString(); + + HiveConf conf = new HiveConf(); + conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString); + conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo"); + + String namespace = HiveConf.getVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE); + String effectivePath = ZookeeperExternalSessionsRegistryClient.normalizeZkPath(namespace); + + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); + CuratorFramework client = builder.connectString(connectString).retryPolicy(new RetryOneTime(1)).build(); + client.start(); + + ExecutorService executor = Executors.newFixedThreadPool(3); + ZookeeperExternalSessionsRegistryClient registry1 = new ZookeeperExternalSessionsRegistryClient(conf); + ZookeeperExternalSessionsRegistryClient registry2 = new ZookeeperExternalSessionsRegistryClient(conf); + ZookeeperExternalSessionsRegistryClient registry3 = new ZookeeperExternalSessionsRegistryClient(conf); + try { + // Submit getSession() for one registry at a time and wait for each to reach globalQueue which is visible + // as a sequential lock znode, before starting the next, so FIFO order matches registry1→2→3. + String queuePath = effectivePath + "-queue"; + Future future1 = executor.submit(registry1::getSession); + awaitMutexQueueSize(client, queuePath, 1); + + Future future2 = executor.submit(registry2::getSession); + awaitMutexQueueSize(client, queuePath, 2); + + Future future3 = executor.submit(registry3::getSession); + awaitMutexQueueSize(client, queuePath, 3); + + client.create().creatingParentsIfNeeded().forPath(effectivePath + "/app_first"); + assertEquals("Registry 1 should get the first AM", "app_first", future1.get(5, TimeUnit.SECONDS)); + + client.create().forPath(effectivePath + "/app_second"); + String session2 = future2.get(5, TimeUnit.SECONDS); + + assertEquals("Registry 2 should get the second AM", "app_second", session2); + registry2.returnSession(session2); + + assertEquals("Registry 3 should get the second AM", "app_second", future3.get(5, TimeUnit.SECONDS)); + } finally { + registry1.close(); + registry2.close(); + registry3.close(); + client.close(); + executor.shutdownNow(); + } + } + } + + private static void awaitMutexQueueSize(CuratorFramework client, String queuePath, int expectedSize) + throws Exception { + long startTimeNs = System.nanoTime(); + long timeoutNs = TimeUnit.SECONDS.toNanos(30); + while (System.nanoTime() - startTimeNs < timeoutNs) { + List childQueueNodes; + try { + childQueueNodes = client.getChildren().forPath(queuePath); + } catch (KeeperException.NoNodeException e) { + childQueueNodes = Collections.emptyList(); + } + if (childQueueNodes.size() >= expectedSize) { + return; + } + Thread.sleep(100); + } + fail("Timed out waiting for " + expectedSize + " mutex queue participants under " + queuePath); + } }