rapids shuffle manager V2 phase 1: writer use as much memory as allowed and pipelined write#13724

Merged
binmahone merged 90 commits into NVIDIA:main from
binmahone:250923_optimize_nds_more
Jan 16, 2026

Conversation

@binmahone
Collaborator

@binmahone binmahone commented Nov 6, 2025

This PR is actually a superset of #13479
Based on #13479 we further added SpillablePartialFileHandle, which is:

SpillablePartialFileHandle

A new spillable storage mechanism for partial shuffle files that optimizes I/O by preferring memory over disk.

Key Features

  • Memory-first storage: Stores data in host memory buffer when memory usage is below threshold
  • Automatic spill: Spills to disk when memory becomes scarce or buffer capacity is exceeded
  • Dynamic buffer expansion: Buffer grows dynamically (doubling) up to configured max limit

Storage Modes

  • FILE_ONLY: Always write to disk (used when memory is tight)
  • MEMORY_WITH_SPILL: Use memory buffer with automatic spill capability

Benefits

  • Reduces disk I/O for shuffle operations when sufficient host memory is available
  • Graceful fallback to disk storage under memory pressure
  • Unified read/write interface regardless of storage location
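The memory-first-with-spill idea can be sketched in plain Java. This is an illustrative toy, not the actual SpillablePartialFileHandle: the class name, fields, and the single `maxBufferSize` threshold are all assumptions made for the example; the real handle also supports dynamic doubling and external spill triggers.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch only: a partial-file handle that buffers writes in
// host memory and falls back to a temp file once the buffer would exceed
// maxBufferSize, while keeping one write/size interface for both modes.
class PartialFileSketch {
    private final long maxBufferSize;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream file;   // non-null once spilled to disk
    private Path spillPath;

    PartialFileSketch(long maxBufferSize) {
        this.maxBufferSize = maxBufferSize;
    }

    boolean isSpilled() {
        return file != null;
    }

    void write(byte[] data) {
        try {
            if (file == null && memory.size() + data.length > maxBufferSize) {
                spill();  // buffer would overflow: fall back to disk
            }
            if (file != null) {
                file.write(data);
            } else {
                memory.write(data);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void spill() throws IOException {
        spillPath = Files.createTempFile("partial-shuffle", ".bin");
        file = Files.newOutputStream(spillPath);
        memory.writeTo(file);  // preserve everything buffered so far
        memory = null;
    }

    long size() {
        try {
            if (file != null) {
                file.flush();
                return Files.size(spillPath);
            }
            return memory.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the sketch is the unified interface: callers keep calling `write` and `size` regardless of whether the data currently lives in memory or on disk.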

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
…Next

@binmahone
Collaborator Author

build

Contributor

@greptile-apps greptile-apps Bot left a comment


6 files reviewed, 6 comments


@binmahone
Collaborator Author

Mostly nits. However there are still a lot of greptile comments that need to be looked at and addressed in some way.

Yes, @revans2, greptile keeps popping up new comments, which after double-checking seem to be over-engineering or just defensive programming. Your comments are now all resolved, please double check.

@binmahone binmahone changed the title rapids shuffle manager V2 writer use as much memory as allowed rapids shuffle manager V2 phase 1: writer use as much memory as allowed and pipelined write Jan 13, 2026
revans2
revans2 previously approved these changes Jan 13, 2026
@abellina abellina self-requested a review January 13, 2026 15:05
@binmahone
Collaborator Author

Just merged origin/main, so that 250923_optimize_nds_more is on top of latest main.

@binmahone
Collaborator Author

build

Contributor

@greptile-apps greptile-apps Bot left a comment


6 files reviewed, 6 comments


- Improved mapOutputWriter comment to clarify memory buffering via SpillablePartialFileHandle
- Added comments explaining wait(1) defensive measure against missed notifications
- Added synchronization strategy documentation for maxPartitionIdQueued and mergerCondition
- Added comments for batch completion signaling and merger notifications
- Added comment explaining previousMaxPartition reset to -1 for new batch
- Changed incompatible shuffle plugin config from warning to IllegalArgumentException
- Added buffer lifecycle documentation for doCleanUp parameter
- Added comment explaining limiter quota release for heap pressure control
- Added data flow documentation from ColumnarBatch to final output

Follow-up issues created:
- NVIDIA#14145: Buffer list optimization for SpillablePartialFileHandle
- NVIDIA#14146: BatchState API refactoring
@binmahone
Collaborator Author

build

Contributor

@greptile-apps greptile-apps Bot left a comment


20 files reviewed, 3 comments


Comment thread sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Collaborator

@abellina abellina left a comment


last comment, thank you for the updates so far. This is close.

// to queue more compression tasks. Use short timeout (1ms) as defensive
// measure to avoid potential deadlock if a notify is missed.
mergerCondition.synchronized {
mergerCondition.wait(1)
Collaborator


Thank you for adding the comments. It is clearer to me what is going on here.

There are two things; the one that has to be addressed in this PR is that this wait just means we are going to busy loop. The classic wait/notify pattern is the following:

synchronized {
  while (!condition) {
    wait()  // optionally with a timeout
  }
}
synchronized {
  condition = true
  notify()  // or notifyAll() if multiple waiters
}

Because we don't have a condition we are checking, this is just going to wait 1ms, sometimes, and just keep going, busy looping and consuming CPU.

I would like us to check a variable in the waiter and loop until the condition says "we should loop around and do work" in this PR.

As a subsequent improvement, we can look at doing this with a Future. As the get function in the Future should handle this waiter pattern.

Collaborator Author


You're right that wait(1) with a 1ms timeout causes frequent spurious wakeups, which is inefficient. I've considered the alternatives:

Option 1: Classic condition flag pattern

var hasWork = false
mergerCondition.synchronized {
  while (!hasWork) {
    mergerCondition.wait()  // must hold the monitor to call wait()
  }
  hasWork = false
}

This requires additional synchronization to keep the hasWork flag consistent with the actual data state (partitionFutures, maxPartitionIdQueued, etc.), which adds complexity and potential for bugs.

Option 2: Increase timeout to wait(100)

Simple change - just increase the timeout from 1ms to 100ms.

I believe Option 2 is the most pragmatic choice:

  • Minimal code change, lowest risk
  • 100ms timeout eliminates busy-loop concerns
  • Does NOT affect responsiveness: if notifyAll() is called, the thread wakes up immediately (not after 100ms)
  • The timeout is purely defensive - in normal operation, the thread is always woken by notifyAll()

The current code already has the correct notifyAll() signals in place (after queuing tasks, after batch completion). The 100ms timeout just serves as a safety net in case a signal is missed, rather than spinning at 1ms intervals.

Would this approach address your concern?

Collaborator


Why do we want to poll? why don't we use the classic flag pattern? The flag tells us right away what it is that we are waiting on, it's a clear signal on what we need to proceed, instead of being notified somewhere else in the code and we need to go read why that notification happened. Imagine trying to debug a race here. If we had a clear flag that flipped, we can log it or detect it. I would really really like us to go Option 1 here.

Collaborator Author


updated , please check again @abellina

…rison

Changed from <= to < when detecting new upstream GPU batches. Consecutive
identical partition IDs should NOT trigger new shuffle batches because they
can occur in two valid scenarios:
1. Reslicing: when a partition's data exceeds maxCpuBatchSize
2. Data skew: multiple GPU batches each containing only the same partition

Merging them into a single shuffle batch is correct and more efficient
(fewer partial files, less merge overhead).

Updated test case and comments to reflect this change.
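One plausible reading of this commit can be sketched in Java: partition IDs within one upstream GPU batch arrive in non-decreasing order, so an ID strictly lower than the running maximum signals a new batch, while a repeat of the same ID (reslicing, data skew) does not. The class and method names below are illustrative, not the plugin's actual API.

```java
// Illustrative: decide whether an incoming partition id starts a new
// shuffle batch. Ids within one upstream batch are non-decreasing, so a
// strict drop (<) signals a new batch; an equal id (reslicing or skew)
// is merged into the current batch, as the commit message describes.
class BatchBoundary {
    private int previousMaxPartition = -1;  // reset to -1 for each new batch

    boolean isNewBatch(int partitionId) {
        if (partitionId < previousMaxPartition) {
            previousMaxPartition = partitionId;  // start tracking the new batch
            return true;
        }
        previousMaxPartition = Math.max(previousMaxPartition, partitionId);
        return false;
    }
}
```

With the old `<=` comparison, the repeated ID in a sequence like 0, 1, 1, 2 would have incorrectly opened a new batch at the second 1.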
Contributor

@greptile-apps greptile-apps Bot left a comment


20 files reviewed, 2 comments


Comment thread sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Increase the defensive timeout from 1ms to 100ms to avoid frequent spurious
wakeups. The timeout is purely defensive - in normal operation, the thread
is always woken by notifyAll(). If notifyAll() is called, the thread wakes
up immediately (not after 100ms).
Contributor

@greptile-apps greptile-apps Bot left a comment


20 files reviewed, 3 comments


Comment thread sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@binmahone
Collaborator Author

build

…ad synchronization

- Add hasNewWork AtomicBoolean to BatchState for explicit signaling
- Replace wait(100) polling with while(!hasNewWork) wait() pattern
- Set hasNewWork flag before notifyAll() calls
- This provides clearer debugging signal and avoids busy-loop polling
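The final pattern this commit describes (an explicit flag checked in a loop under the monitor) can be sketched in Java. `hasNewWork` and `mergerCondition` follow the commit's naming; everything else is illustrative, not the plugin's actual code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of the flag-based wait/notify pattern: the waiter
// loops on an explicit condition (guarding against spurious wakeups and
// missed notifications) instead of polling with a short timeout.
class MergerSignal {
    private final Object mergerCondition = new Object();
    private final AtomicBoolean hasNewWork = new AtomicBoolean(false);

    // Producer side: set the flag first, then wake any waiter.
    void signalNewWork() {
        synchronized (mergerCondition) {
            hasNewWork.set(true);
            mergerCondition.notifyAll();
        }
    }

    // Consumer side: loop until the flag is set, then reset it before
    // doing the work. Returns false only if interrupted while waiting.
    boolean awaitNewWork() {
        synchronized (mergerCondition) {
            while (!hasNewWork.get()) {
                try {
                    mergerCondition.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            hasNewWork.set(false);
            return true;
        }
    }
}
```

Because the flag is set before `notifyAll()` under the same monitor, a signal sent before the consumer starts waiting is never lost: the consumer sees the flag and skips the wait entirely.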
@binmahone
Collaborator Author

build

Contributor

@greptile-apps greptile-apps Bot left a comment


20 files reviewed, 2 comments


Comment on lines +134 to +137
var newCapacity = oldCapacity
while (newCapacity < requiredCapacity && newCapacity < maxBufferSize) {
  newCapacity = math.min(newCapacity * 2, maxBufferSize)
}
Contributor


logic: infinite loop when oldCapacity is 0

if oldCapacity is 0, then newCapacity = 0 and 0 * 2 = 0 forever, causing the while loop to never terminate

while the default is 128MB, users can configure spark.rapids.memory.host.partialFileBufferInitialSize to 0, which causes this infinite loop

Suggested change

Before:
  var newCapacity = oldCapacity
  while (newCapacity < requiredCapacity && newCapacity < maxBufferSize) {
    newCapacity = math.min(newCapacity * 2, maxBufferSize)
  }

After:
  // Calculate new capacity: keep doubling until >= requiredCapacity
  var newCapacity = if (oldCapacity == 0) 1 else oldCapacity
  while (newCapacity < requiredCapacity && newCapacity < maxBufferSize) {
    newCapacity = math.min(newCapacity * 2, maxBufferSize)
  }
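The guarded doubling logic can be verified in isolation. Below is a Java port of the suggested Scala fix, for illustration only; the method name and standalone-function shape are assumptions made for the example.

```java
// Java port (illustrative) of the suggested capacity-doubling fix:
// guard against oldCapacity == 0, which would otherwise loop forever
// because 0 * 2 stays 0 and never reaches requiredCapacity.
class CapacityGrow {
    static long grow(long oldCapacity, long requiredCapacity, long maxBufferSize) {
        long newCapacity = (oldCapacity == 0) ? 1 : oldCapacity;
        while (newCapacity < requiredCapacity && newCapacity < maxBufferSize) {
            newCapacity = Math.min(newCapacity * 2, maxBufferSize);
        }
        return newCapacity;
    }
}
```

For example, growing from 0 toward a required 100 bytes (max 1024) now terminates at 128 (1 doubled seven times) instead of spinning forever.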

Comment on lines +564 to +575
val PARTIAL_FILE_BUFFER_INITIAL_SIZE =
  conf("spark.rapids.memory.host.partialFileBufferInitialSize")
    .doc("The initial size in bytes for a host memory buffer used by " +
      "SpillablePartialFileHandle during shuffle write. This buffer allows shuffle " +
      "data to be kept in memory instead of writing to disk immediately, reducing " +
      "I/O overhead. The buffer can expand dynamically up to partialFileBufferMaxSize. " +
      "A smaller initial size reduces upfront memory allocation but may require more " +
      "expansions.")
    .startupOnly()
    .internal()
    .bytesConf(ByteUnit.BYTE)
    .createWithDefault(128L * 1024 * 1024) // 128MB default
Contributor


logic: missing validation allows 0 which causes infinite loop in SpillablePartialFileHandle.expandBuffer()

add .checkValue(v => v > 0, "must be > 0") before createWithDefault to prevent the infinite loop at SpillablePartialFileHandle.scala:134

Suggested change (identical to the current definition, plus one checkValue line):

val PARTIAL_FILE_BUFFER_INITIAL_SIZE =
  conf("spark.rapids.memory.host.partialFileBufferInitialSize")
    .doc("The initial size in bytes for a host memory buffer used by " +
      "SpillablePartialFileHandle during shuffle write. This buffer allows shuffle " +
      "data to be kept in memory instead of writing to disk immediately, reducing " +
      "I/O overhead. The buffer can expand dynamically up to partialFileBufferMaxSize. " +
      "A smaller initial size reduces upfront memory allocation but may require more " +
      "expansions.")
    .startupOnly()
    .internal()
    .bytesConf(ByteUnit.BYTE)
    .checkValue(v => v > 0, "must be > 0") // added
    .createWithDefault(128L * 1024 * 1024) // 128MB default

@binmahone binmahone merged commit e465ec3 into NVIDIA:main Jan 16, 2026
44 checks passed
revans2 pushed a commit that referenced this pull request Jan 28, 2026
…ossible (#14090)

- phase 1 design:
https://github.com/NVIDIA/spark-rapids/pull/14090/changes#diff-986110ce9e01fa823d715caefb69f3cbebbd64c9915740bb5369d733ee5b1edf
- phase 2 design:
https://github.com/NVIDIA/spark-rapids/pull/14090/changes#diff-8dcf252d92655b955712e184b4c31a59611ca6ef763368bbec0f7d95cc71828e

Actually, we'll check in phase 1 work with
#13724 first.
After #13724 is checked in,
this PR will have far fewer changes than it does now.

# Improvement

phase 1 vs. main: NDS overall **8.7%** improvement on dataproc (details
in
https://github.com/binmahone/mahone-dataproc-1224-formal-ab-4w8e-mtread20-8ssd-results/blob/main/REPORT.md)

phase 1 + phase 2 vs. main: about **18%** NDS overall improvement based
on early results, detailed report on the way.
 
<img width="1800" height="1500" alt="image"
src="https://github.com/user-attachments/assets/048f6f7d-dc80-442d-ac21-0270c6828d90"
/>

---------

Signed-off-by: Hongbin Ma (Mahone) <mahongbin@apache.org>
Signed-off-by: Hongbin Ma <mahongbin@apache.org>
revans2 pushed a commit to revans2/spark-rapids that referenced this pull request Jan 28, 2026

Labels

performance A performance related task/issue
