
[SPARK-56235][CORE] Add reverse index in TaskSetManager to avoid O(N) scans in executorLost#55030

Open
DenineLu wants to merge 1 commit into apache:master from DenineLu:optimize-executor-lost

Conversation

@DenineLu
Contributor

@DenineLu DenineLu commented Mar 26, 2026

What changes were proposed in this pull request?

This PR adds a reverse index executorIdToTaskIds: HashMap[String, OpenHashSet[Long]] in TaskSetManager to efficiently look up tasks by executor ID, replacing the O(N) full scans over taskInfos in executorLost() with O(K) direct lookups, where K is the number of tasks on the lost executor.

Changes:

  • Added executorIdToTaskIds field in TaskSetManager, populated at task launch in prepareLaunchingTask()
  • Rewrote the two loops in executorLost() to iterate only over tasks on the lost executor via the reverse index

Why are the changes needed?

In a production Spark job (Spark 3.5.1, dynamic allocation enabled, shuffle tracking disabled) with a single stage containing 5 million tasks, we observed that near the end of the stage the Spark UI showed the last few tasks stuck in the RUNNING state for 1-2 hours.
However, executor thread dumps confirmed that no task threads were actually running: the tasks had already completed on the executor side, but the Driver had not yet processed their completion messages.

CPU profiling of the Driver JVM (5-minute snapshot) revealed that TaskSetManager.executorLost() was consuming 99.5% of all CPU samples, due to O(N) full scans over the taskInfos HashMap (N = 5,000,000 entries).

The executorLost() method scans the entire taskInfos map to find tasks on the lost executor:

```scala
// Before: O(N) — scans ALL task attempts to find those on the lost executor
for ((tid, info) <- taskInfos if info.executorId == execId) { ... }
```
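With the reverse index, the loop only needs to visit the K entries recorded for execId. The sketch below contrasts the two lookups on simplified state (plain mutable collections instead of Spark's OpenHashSet and TaskInfo; both methods return the same task ids, only the amount of work differs):

```scala
import scala.collection.mutable

object ExecutorLostSketch {
  val taskInfos = mutable.HashMap.empty[Long, String] // tid -> executorId
  val executorIdToTaskIds = mutable.HashMap.empty[String, mutable.HashSet[Long]]

  def launch(tid: Long, execId: String): Unit = {
    taskInfos(tid) = execId
    executorIdToTaskIds.getOrElseUpdate(execId, mutable.HashSet.empty) += tid
  }

  // Before: O(N) — filter every task attempt in taskInfos.
  def lostTidsScan(execId: String): Set[Long] =
    taskInfos.collect { case (tid, e) if e == execId => tid }.toSet

  // After: O(K) — read only this executor's entry in the reverse index.
  def lostTidsIndexed(execId: String): Set[Long] =
    executorIdToTaskIds.get(execId).map(_.toSet).getOrElse(Set.empty)
}
```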

The stall is amplified when the following conditions coincide:

  1. Long-tail tasks at stage end — a few remaining tasks take longer than spark.dynamicAllocation.executorIdleTimeout (default 60s) to complete. Most executors have finished their work and sit idle, while these slow tasks are still running.
  2. Batch executor removal — once the idle timeout fires, a burst of RemoveExecutor messages is sent to the DriverEndpoint RPC queue.
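Back-of-the-envelope arithmetic on the figures above shows why the combination is pathological (this assumes the 5M task attempts are spread evenly over the 10K removed executors; it is an estimate, not a measurement):

```scala
object CostSketch {
  val removedExecutors = 10000L   // batch of RemoveExecutor messages at stage tail
  val taskAttempts     = 5000000L // N: entries in taskInfos

  // Before: each executorLost() call scans all N entries.
  val visitsBefore = removedExecutors * taskAttempts

  // After: each call touches only its own K = N / removedExecutors tasks,
  // so the whole batch visits each entry once overall.
  val visitsAfter = removedExecutors * (taskAttempts / removedExecutors)
}
```

Under these assumptions the batch goes from ~5×10^10 map-entry visits to ~5×10^6, a 10,000× reduction in Driver-side work.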

After this PR, the same workload (5M tasks, 10K executors, dynamic allocation enabled) no longer exhibits the stall: execution time dropped from 117 minutes to 45 minutes, and the executorLost hotspot at the stage tail is gone.

(Before/after comparison screenshots attached: Job Timeline; Driver CPU Top Threads at the stage tail.)

Memory overhead (measured with jmap -histo:live):

| Metric | Value |
| --- | --- |
| Total added memory | ~81 MB |
| vs taskInfos overhead (829 MB) | ~10% |
| vs Driver heap old gen used (9.3 GB) | < 1% |

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added two tests in TaskSetManagerSuite.

Was this patch authored or co-authored using generative AI tooling?

No.

Member

@sunchao sunchao left a comment


LGTM, thanks!
