HIVE-29651: Update ZookeeperExternalSessionsRegistryClient to handle … #6528

Open
tanishq-chugh wants to merge 16 commits into apache:master from tanishq-chugh:zk-conc-issue

@tanishq-chugh

Contributor

…multiple HiveServer2 instances submitting DAGs concurrently to available Tez External Sessions

What changes were proposed in this pull request?

This PR introduces a distributed locking mechanism to synchronize Tez session assignments across multiple HiveServer2 instances.

Why are the changes needed?

To prevent execution errors when multiple HS2 instances submit DAGs concurrently to the same Tez AMs.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Manual testing, plus added unit tests.

@tanishq-chugh

Contributor Author

Hi @ayushtkn, could you please help with a review of this patch?
Thanks!

@ayushtkn ayushtkn left a comment

Member

Thanks @tanishq-chugh, I have dropped some comments.

try {
client.delete().guaranteed().forPath(claimsPath + "/" + appId);
} catch (KeeperException.NoNodeException e) {
// If the claim Node has already been deleted, we can ignore it.

Member

Can you add a debug log here?

Contributor Author

Addressed this in commit 7f6b08f.


try {
synchronized (lock) {
while (System.nanoTime() < endTimeNs) {

Member

This is wrong; it is a known anti-pattern. The comparison will break if endTimeNs goes negative due to overflow.

Contributor Author

Right, changed the code to fix this in commit fdead45.
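
For readers following this thread, the overflow concern can be illustrated with a small standalone sketch. It is not the code from the PR or from commit fdead45; the class and method names (`DeadlineCheck`, `hasTimeLeft`, `hasTimeLeftNaive`) are hypothetical. It only contrasts a direct `<` comparison of `System.nanoTime()` values with the difference-based comparison that the `System.nanoTime()` Javadoc recommends.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not taken from the Hive code base.
final class DeadlineCheck {
  private DeadlineCheck() {
  }

  // Pattern flagged in review: a direct comparison gives the wrong answer
  // once endTimeNs has wrapped around to a negative value.
  static boolean hasTimeLeftNaive(long nowNs, long endTimeNs) {
    return nowNs < endTimeNs;
  }

  // Overflow-safe form: subtract first, then compare the difference with zero.
  static boolean hasTimeLeft(long nowNs, long endTimeNs) {
    return endTimeNs - nowNs > 0;
  }

  public static void main(String[] args) {
    long now = Long.MAX_VALUE - 10;                // a clock value close to wrap-around
    long end = now + TimeUnit.SECONDS.toNanos(1);  // overflows to a negative number

    System.out.println(hasTimeLeftNaive(now, end)); // prints false: loop would exit immediately
    System.out.println(hasTimeLeft(now, end));      // prints true: one second is still left
  }
}
```

In a wait loop, the safe form would read `while (endTimeNs - System.nanoTime() > 0)`, assuming `endTimeNs` was computed as `System.nanoTime() + timeoutNs`.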

// We never close external sessions that don't have errors.
try {
if (externalAppId != null) {
LOG.info("Returning external session with appID: {}", externalAppId);

Member

Too much information :-). Please change it to debug.

Contributor Author

Made the change in commit: 422c01f

HiveConf conf = new HiveConf();
conf.setVar(ConfVars.HIVE_ZOOKEEPER_QUORUM, connectString);
conf.setVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_NAMESPACE, "/tez_ns_fifo");
conf.setIntVar(ConfVars.HIVE_SERVER2_TEZ_EXTERNAL_SESSIONS_WAIT_MAX_ATTEMPTS, 15);

Member

How did you arrive at the magic number 15 here and 5 above? The default is 60; why do we need to change it?

Contributor Author

Leftover, removed in commit: 6bab8dc

ZookeeperExternalSessionsRegistryClient registry3 = new ZookeeperExternalSessionsRegistryClient(conf);
try {
Future<String> future1 = executor.submit(registry1::getSession);
Thread.sleep(500);

Member

This will lead to flaky behaviour and does not guarantee FIFO either. Ideally, it should use latches, something like:

CountDownLatch r1Started = new CountDownLatch(1);
CountDownLatch r2Started = new CountDownLatch(1);

Future<String> future1 = executor.submit(() -> {
  r1Started.countDown();
  return registry1.getSession();
});

r1Started.await();

Future<String> future2 = executor.submit(() -> {
  r2Started.countDown();
  return registry2.getSession();
});

r2Started.await();

Contributor Author

Right, changed the test case in commit: 6bab8dc

case CHILD_ADDED:
// A Tez AM was claimed by another HS2, so remove the AM from the available list of this particular HS2
available.remove(applicationId);
break;

Member

Curious about the connection events: if we lose the connection and then CONNECTION_RECONNECTED is sent, are we sure the cache will replay all the missed events and our state will be correct?

I am even more curious about the connection LOST case: the network was down longer than the session timeout, and ZooKeeper deleted all of your ephemeral claim nodes. If you don't handle LOST, your local taken set will think it still owns the Tez AMs, but other HiveServer2 instances will see the nodes disappear and claim them right out from under you.

Contributor Author

Nice catch @ayushtkn!
In the case of CONNECTION_RECONNECTED, the cache does replay all the missed events, but while testing I encountered a race condition between two listeners. I have addressed this in commit 26ef308.

Regarding the connection LOST case, I have added logic to kill running DAGs and clear the taken state of the particular HS2 at the same time that ZK deletes its ephemeral claim nodes, in the same commit: 26ef308.
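
The LOST handling discussed in this thread can be pictured with a small in-memory sketch. This is not the code from commit 26ef308 and it does not use Curator; `LocalClaimState`, `ConnState`, and every method name below are hypothetical. The enum only mirrors a subset of Curator's `ConnectionState` values. The assumption is that, on LOST, the local `taken` set is cleared and the dropped application ids are handed back to the caller, which would then kill any DAGs still running on them.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory stand-in for one HS2's local bookkeeping.
final class LocalClaimState {
  // Subset of connection states, named after Curator's ConnectionState values.
  enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

  private final Object lock = new Object();
  private final Set<String> available = new HashSet<>();
  private final Set<String> taken = new HashSet<>();

  void addAvailable(String appId) {
    synchronized (lock) {
      available.add(appId);
    }
  }

  // Moves an application id from available to taken, if it is still available.
  boolean claim(String appId) {
    synchronized (lock) {
      if (!available.remove(appId)) {
        return false;
      }
      taken.add(appId);
      return true;
    }
  }

  boolean owns(String appId) {
    synchronized (lock) {
      return taken.contains(appId);
    }
  }

  // On LOST, the ZooKeeper session has expired, so every ephemeral claim is gone.
  // Drop all local claims and return them so the caller can clean up their DAGs.
  Set<String> onStateChanged(ConnState state) {
    synchronized (lock) {
      if (state != ConnState.LOST) {
        return new HashSet<>();
      }
      Set<String> dropped = new HashSet<>(taken);
      taken.clear();
      return dropped;
    }
  }
}
```

Other states such as SUSPENDED leave the local claims untouched in this sketch, since the ephemeral nodes survive until the session actually expires.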

}
}

private final class ClaimsPathListener implements PathChildrenCacheListener {

Member

I am wondering whether we need this. Would something like this be possible?

CuratorCacheListener claimsListener = CuratorCacheListener.builder()
    .forCreates(childData -> {
      if (childData == null) {
        return;
      }
      String applicationId = getApplicationId(childData);
      synchronized (lock) {
        available.remove(applicationId);
      }
    })
    .forDeletes(childData -> {
      if (childData == null) {
        return;
      }
      String applicationId = getApplicationId(childData);
      synchronized (lock) {
        if (!taken.contains(applicationId)) {
          available.add(applicationId);
          lock.notifyAll();
        }
      }
    })
    .build();

Contributor Author

Yes, changed this in commit: 484b2e4

Copilot AI left a comment


Pull request overview

This PR aims to prevent multiple HiveServer2 instances from concurrently submitting DAGs to the same Tez external session by introducing ZooKeeper-based distributed coordination (a global queue + per-session claim znodes), and by routing DAG submission through a TezSession.submitDAG abstraction (with special handling for external sessions).

Changes:

  • Add a distributed locking/claim mechanism in ZookeeperExternalSessionsRegistryClient to ensure each external Tez AM is claimed by at most one HS2 at a time.
  • Add submitDAG(DAG) to TezSession and route Tez DAG submission through the session abstraction (enabling external-session-specific retry/cleanup behavior).
  • Add/adjust unit tests to cover multi-client session claiming and FIFO behavior.
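
The "claimed by at most one HS2 at a time" property in the first bullet can be sketched without ZooKeeper. The example below is an in-memory analogy only: `ClaimTable`, `tryClaim`, and `release` are hypothetical names, and a `ConcurrentHashMap` stands in for the per-session claim znodes. The assumption is that creating a claim is an atomic create-if-absent operation, so only one claimant can win.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory analogy for per-session claims; not the PR's implementation.
final class ClaimTable {
  // Key: Tez application id. Value: id of the HS2 instance holding the claim.
  private final ConcurrentMap<String, String> claims = new ConcurrentHashMap<>();

  // Atomic create-if-absent: succeeds for exactly one caller per application id.
  boolean tryClaim(String appId, String hs2Id) {
    return claims.putIfAbsent(appId, hs2Id) == null;
  }

  // Only the current owner can release its claim.
  boolean release(String appId, String hs2Id) {
    return claims.remove(appId, hs2Id);
  }
}
```

A second HS2 calling `tryClaim` for the same application id gets `false` until the owner calls `release`, which is the behaviour the claim znodes are described as enforcing across processes.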

Reviewed changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ZookeeperExternalSessionsRegistryClient.java | Introduces global mutex queue + ephemeral claim znodes; adds connection-lost handling and claim cache. |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSession.java | Extends session interface with submitDAG(DAG) to centralize submission behavior. |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java | Implements submitDAG by delegating to the underlying TezClient. |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java | Implements submitDAG by delegating through the wrapped base session. |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java | Switches DAG submission to sessionState.submitDAG(dag). |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezExternalSessionState.java | Adds external-session submit behavior (retry after killing running/orphan DAGs). |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java | Adds external-session handling on return-to-pool path (noted concurrency concern). |
| ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/TezJobMonitor.java | Adds helper to kill running DAGs for a specific application/session id. |
| ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestZookeeperExternalSessionsRegistryClient.java | Adds tests for multi-client claims and mutex FIFO queue behavior. |
| ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java | Updates mocks/verifications to match the new submitDAG call path. |

Comment on lines +509 to +511
} catch (Exception e) {
LOG.error("Error while trying to kill running DAG on tez session {}", applicationId);
}

Comment on lines +357 to +361
if (useExternalSessions) {
if (tezSessionState.getTezClient() != null
&& tezSessionState.getTezClient().getAppMasterApplicationId() != null) {
try {
tezSessionState.close(false);

Comment on lines +203 to +207
} catch (TezException e) {
if (e.getMessage() == null || !e.getMessage().contains("App master already running a DAG")) {
throw e;
}
tryKillRunningDAGs(getTezClient());