Skip to content

support withRetry with split for shuffle exchange exec base#13975

Merged
zpuller merged 19 commits intoNVIDIA:mainfrom
zpuller:kudo_split
Dec 22, 2025
Merged

support withRetry with split for shuffle exchange exec base#13975
zpuller merged 19 commits intoNVIDIA:mainfrom
zpuller:kudo_split

Conversation

@zpuller
Copy link
Copy Markdown
Collaborator

@zpuller zpuller commented Dec 8, 2025

Fixes #13951 (along with #14010)

Description

We convert the internal state of prepareBatchShuffleDependency in rddWithPartitionIds into a spliterator to support split retires. This happens a level above where we call into the partitioner which does the gpu kudo serialization.

Performance testing so far shows no significant change for low memory scenarios in NDS. I'll continue with perf testing while the PR is under review.

Checklists

  • This PR has added documentation for new or modified features or behaviors.
  • This PR has added new tests or modified existing tests to cover new code paths.
    (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
  • Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description.

Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 8, 2025

build

1 similar comment
@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 8, 2025

build

Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>

Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 10, 2025

build

Signed-off-by: Zach Puller <zpuller@nvidia.com>
@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 10, 2025

build

1 similar comment
@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 10, 2025

build

Signed-off-by: Zach Puller <zpuller@nvidia.com>
@sameerz
Copy link
Copy Markdown
Collaborator

sameerz commented Dec 11, 2025

build

@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 11, 2025

build

@zpuller zpuller changed the title [DRAFT] support withRetry with split for shuffle exchange exec base support withRetry with split for shuffle exchange exec base Dec 15, 2025
@zpuller zpuller marked this pull request as ready for review December 15, 2025 23:08
Signed-off-by: Zach Puller <zpuller@nvidia.com>
@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 15, 2025

build

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Dec 15, 2025

Greptile Summary

  • Refactors GPU Kudo shuffle operations to support split retries at the shuffle exchange level for better memory management in low-memory GPU environments
  • Removes withRetryNoSplit retry mechanisms from individual partitioners (GpuRangePartitioner, GpuHashPartitioningBase, GpuRoundRobinPartitioning) and centralizes retry logic in GpuShuffleExchangeExecBase
  • Updates shuffle exchange iterator state management to handle multiple partition result arrays when batch splitting occurs, enabling OOM recovery through half-batch splitting

Important Files Changed

Filename Overview
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala Refactored iterator state management to support split retries with partitionedIter for handling multiple result arrays from split operations
tests/src/test/scala/com/nvidia/spark/rapids/GpuKudoWritePartitioningSuite.scala New comprehensive test suite to validate split retry functionality with OOM injection and data integrity verification

Confidence score: 4/5

  • This PR is mostly safe to merge with some complexity concerns around state management changes
  • Score reflects thorough testing and solid architectural improvements, with minor deduction for complex iterator logic changes that require careful review
  • Pay close attention to the iterator state management changes in GpuShuffleExchangeExecBase.scala and the new test suite validation logic

Sequence Diagram

sequenceDiagram
    participant User
    participant GpuShuffleExchangeExecBase as "GpuShuffleExchangeExecBase"
    participant prepareBatchShuffleDependency as "prepareBatchShuffleDependency"
    participant RddWithPartitionIds as "rddWithPartitionIds"
    participant withRetry as "withRetry"
    participant GpuPartitioning as "GpuPartitioning"
    participant RmmSpark as "RmmSpark"

    User->>GpuShuffleExchangeExecBase: "execute shuffle"
    GpuShuffleExchangeExecBase->>prepareBatchShuffleDependency: "create shuffle dependency"
    prepareBatchShuffleDependency->>RddWithPartitionIds: "create partitioned RDD"
    
    RddWithPartitionIds->>RddWithPartitionIds: "iterate batches"
    RddWithPartitionIds->>withRetry: "withRetry(spillableBatch, splitSpillableInHalfByRows)"
    
    withRetry->>GpuPartitioning: "columnarEvalAny(batch)"
    
    alt Success Case
        GpuPartitioning-->>withRetry: "partitioned data"
        withRetry-->>RddWithPartitionIds: "Array[(ColumnarBatch, Int)]"
    else OOM Exception
        GpuPartitioning->>RmmSpark: "throw GpuSplitAndRetryOOM"
        RmmSpark-->>withRetry: "OOM exception"
        withRetry->>withRetry: "splitSpillableInHalfByRows(spillableBatch)"
        withRetry->>GpuPartitioning: "columnarEvalAny(splitBatch1)"
        GpuPartitioning-->>withRetry: "partitioned data 1"
        withRetry->>GpuPartitioning: "columnarEvalAny(splitBatch2)" 
        GpuPartitioning-->>withRetry: "partitioned data 2"
        withRetry-->>RddWithPartitionIds: "Array[(ColumnarBatch, Int)] (split results)"
    end
    
    RddWithPartitionIds-->>prepareBatchShuffleDependency: "Product2[Int, ColumnarBatch]"
    prepareBatchShuffleDependency-->>GpuShuffleExchangeExecBase: "ShuffleDependency"
    GpuShuffleExchangeExecBase-->>User: "shuffled data"
Loading

Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@zpuller zpuller requested review from a team and abellina December 15, 2025 23:22
revans2
revans2 previously approved these changes Dec 17, 2025
Copy link
Copy Markdown
Collaborator

@abellina abellina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like us to add a bit the test, unless I've missed it.

s"Expected at least one split retry, but saw $retryCount retries")

// Verify batch contents match expected data (even after split retry)
verifyBatchContents(allPartitionedBatches, totalRowsSeen, serializer,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should verify that the number of partitioned arrays seen is double given the split, than without the split.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me trying to verify this behavior in an integration test is that is not overloading the memory system is not going to work. We have to have a unit test of some kind and we can use the RMM injection code to verify that we are doing the right thing.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment here at the site where we could be doing verification, without looking at the matrics: https://github.com/NVIDIA/spark-rapids/pull/13975/files#r2631405357

Signed-off-by: Zach Puller <zpuller@nvidia.com>
Copy link
Copy Markdown
Collaborator

@abellina abellina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really think we need a new metric: NUM_PARTITIONED_ARRAYS

We already have NUM_OUTPUT_BATCHES https://github.com/NVIDIA/spark-rapids/pull/13975/files#diff-2519047533f3504238e111c11b9d2a55903af1dfd89ea01be31233f15315dc01R445.

In this case we'll output more batches, which would be a way to check in the tests.

Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional Comments (1)

  1. tests/src/test/scala/com/nvidia/spark/rapids/GpuKudoWritePartitioningSuite.scala, line 314 (link)

    style: Redundant cast - intVal is already of type Integer

    Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

6 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Signed-off-by: Zach Puller <zpuller@nvidia.com>
Copy link
Copy Markdown
Contributor

@greptile-apps greptile-apps Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional Comments (1)

  1. tests/src/test/scala/com/nvidia/spark/rapids/GpuKudoWritePartitioningSuite.scala, line 467-470 (link)

    style: Verify that the test actually triggers the split retry path as intended. The test injects OOM on the first next() call and validates numNextCalls == 3 (one batch splits into 2, plus 1 unsplit batch). Check that the retry count assertion on line 476 consistently passes across different environments.

6 files reviewed, 1 comment

Edit Code Review Agent Settings | Greptile

Comment thread tests/src/test/scala/com/nvidia/spark/rapids/GpuKudoWritePartitioningSuite.scala Outdated
Comment thread tests/src/test/scala/com/nvidia/spark/rapids/GpuKudoWritePartitioningSuite.scala Outdated
Copy link
Copy Markdown
Collaborator

@abellina abellina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like us to improve the suite further

Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
Signed-off-by: Zach Puller <zpuller@nvidia.com>
@zpuller zpuller requested a review from abellina December 19, 2025 19:28
Copy link
Copy Markdown
Collaborator

@abellina abellina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @zpuller

@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 19, 2025

build

2 similar comments
@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 21, 2025

build

@zpuller
Copy link
Copy Markdown
Collaborator Author

zpuller commented Dec 22, 2025

build

@zpuller zpuller merged commit 1ab486e into NVIDIA:main Dec 22, 2025
44 checks passed
@sameerz sameerz added the reliability Features to improve reliability or bugs that severly impact the reliability of the plugin label Jan 3, 2026
zpuller added a commit that referenced this pull request Jan 4, 2026
Fixes #13951 (along with
#13975)

### Description

Use `AutoCloseableTargetSize` to create a split policy which would halve
the target size of an incoming batch when doing a split retry.

Performance testing so far shows no significant change for low memory
scenarios in NDS. I'll continue with perf testing while the PR is under
review.

### Checklists

- [ ] This PR has added documentation for new or modified features or
behaviors.
- [x] This PR has added new tests or modified existing tests to cover
new code paths.
(Please explain in the PR description how the new code paths are tested,
such as names of the new/existing tests that cover them.)
- [x] Performance testing has been performed and its results are added
in the PR description. Or, an issue has been filed with a link in the PR
description.

---------

Signed-off-by: Zach Puller <zpuller@nvidia.com>
@zpuller zpuller deleted the kudo_split branch January 5, 2026 17:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

reliability Features to improve reliability or bugs that severly impact the reliability of the plugin

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[FEA] Gpu Kudo read and write should be splittable

5 participants