Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,26 @@
package org.apache.hadoop.hive.ql.exec.tez;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

import com.google.protobuf.ServiceException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.tez.monitoring.TezJobMonitor;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;

/**
Expand Down Expand Up @@ -133,6 +142,11 @@ public void close(boolean keepDagFilesDir) throws Exception {
// We never close external sessions that don't have errors.
try {
if (externalAppId != null) {
LOG.debug("Returning external session with appID: {}", externalAppId);
SessionState sessionState = SessionState.get();
if (sessionState != null) {
sessionState.setTezSession(null);
}
registry.returnSession(externalAppId);
}
} catch (Exception e) {
Expand Down Expand Up @@ -181,4 +195,67 @@ public boolean killQuery(String reason) throws HiveException {
killQuery.killQuery(queryId, reason, conf, false);
return true;
}

@Override
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
try {
return getTezClient().submitDAG(dag);
} catch (TezException e) {
if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) {
throw e;
}
tryKillRunningDAGs(getTezClient());
Comment on lines +203 to +207
return getTezClient().submitDAG(dag);
}
}

private void tryKillRunningDAGs(TezClient session) throws TezException {
LOG.info("External session has an AM which is already running a DAG on app ID {}", externalAppId);
DAGClientAMProtocolBlockingPB proxy = session.sendAMHeartbeat(null);
if (proxy == null) {
throw new TezException("Error while trying to connect to AM for app ID " + externalAppId);
}
long killTimeoutMs = TimeUnit.SECONDS.toMillis(
HiveConf.getIntVar(conf, ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS));
try {
DAGClientAMProtocolRPC.GetAllDAGsResponseProto allDAGSResponse =
proxy.getAllDAGs(null, DAGClientAMProtocolRPC.GetAllDAGsRequestProto.newBuilder().build());
for (String dagId : allDAGSResponse.getDagIdList()) {
LOG.info("External session: attempting to kill dagId {} on app ID {}", dagId, externalAppId);
proxy.tryKillDAG(null, DAGClientAMProtocolRPC.TryKillDAGRequestProto.newBuilder().setDagId(dagId).build());
waitForDagTerminal(proxy, dagId, killTimeoutMs);
}
} catch (Exception e) {
throw new TezException("Error while trying to kill existing DAG running on app ID " + externalAppId, e);
}
}

private void waitForDagTerminal(DAGClientAMProtocolBlockingPB proxy, String dagId, long timeoutMs)
throws TezException, ServiceException {
long startTimeMs = System.currentTimeMillis();
long pollIntervalMs = conf.getTimeVar(ConfVars.TEZ_DAG_STATUS_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
while (System.currentTimeMillis() - startTimeMs < timeoutMs) {
long remainingMs = timeoutMs - (System.currentTimeMillis() - startTimeMs);
DAGClientAMProtocolRPC.GetDAGStatusResponseProto response = proxy.getDAGStatus(null,
DAGClientAMProtocolRPC.GetDAGStatusRequestProto.newBuilder()
.setDagId(dagId)
.setTimeout(Math.min(pollIntervalMs, remainingMs))
.build());
if (response.hasDagStatus() && response.getDagStatus().hasState()
&& isTerminalDagState(response.getDagStatus().getState())) {
LOG.info("External session: dagId {} on app ID {} reached terminal state {}", dagId, externalAppId,
response.getDagStatus().getState());
return;
}
}
throw new TezException("Timed out after " + timeoutMs + " ms waiting for orphan DAG " + dagId
+ " on app ID " + externalAppId + " to reach terminal state after kill");
}

private static boolean isTerminalDagState(DAGProtos.DAGStatusStateProto state) {
return switch (state) {
case DAG_SUCCEEDED, DAG_KILLED, DAG_FAILED, DAG_ERROR -> true;
default -> false;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.apache.hadoop.hive.ql.wm.WmContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;

/**
Expand Down Expand Up @@ -86,6 +88,7 @@ public String toString() {

HiveConf getConf();
TezClient getTezClient();
DAGClient submitDAG(DAG dag) throws TezException, IOException;
boolean isOpen();
boolean isOpening();
boolean getDoAsEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,20 @@
+ " belongs to the pool. Put it back in");
defaultSessionPool.returnSession((TezSessionPoolSession)tezSessionState);
}

if (useExternalSessions) {
if (tezSessionState.getTezClient() != null
&& tezSessionState.getTezClient().getAppMasterApplicationId() != null) {

Check failure on line 359 in ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this usage of "getAppMasterApplicationId", it is annotated with @VisibleForTesting and should not be accessed from production code.

See more on https://sonarcloud.io/project/issues?id=apache_hive&issues=AZ6mIPkPX6RwbyiJ1MtB&open=AZ6mIPkPX6RwbyiJ1MtB&pullRequest=6528
try {
tezSessionState.close(false);
Comment on lines +357 to +361
} catch (Exception ex) {
LOG.warn("Failed to return external Tez session {}", tezSessionState.getSessionId(), ex);
}
} else {
LOG.warn("Not returning session '{}' as tez client or app id is null", tezSessionState.getSessionId());
}
}

// non default session nothing changes. The user can continue to use the existing
// session in the SessionState
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@
import org.apache.hadoop.hive.registry.impl.TezAmInstance;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -337,6 +339,11 @@ public TezClient getTezClient() {
return baseSession.getTezClient();
}

@Override
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
return baseSession.submitDAG(dag);
}

@Override
public boolean isOpening() {
return baseSession.isOpening();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,13 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.Progress;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
Expand Down Expand Up @@ -820,6 +822,11 @@ public TezClient getTezClient() {
return session;
}

@Override
public DAGClient submitDAG(DAG dag) throws TezException, IOException {
return getTezClient().submitDAG(dag);
}

@Override
public LocalResource getAppJarLr() {
return appJarLr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ DAGClient submit(DAG dag, Ref<TezSession> sessionStateRef) throws Exception {

private DAGClient submitInternal(DAG dag, TezSession sessionState) throws TezException, IOException {
runtimeContext.init(sessionState);
return sessionState.getTezClient().submitDAG(dag);
return sessionState.submitDAG(dag);
}

private void sessionDestroyOrReturnToPool(Ref<TezSession> sessionStateRef,
Expand Down
Loading
Loading