-
Notifications
You must be signed in to change notification settings - Fork 268
Allow for AST join build side selection and add some heuristics [databricks] #14035
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
|
build |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces AST join build side selection and heuristics to optimize GPU join operations. The key innovation is the ability to dynamically select the build side for joins at runtime based on row counts, rather than being locked into a fixed build side determined by the query plan.
Key changes:
- New
LazyCompiledConditionclass that compiles AST expressions for either build side on-demand - Build side selection strategies (AUTO, FIXED, SMALLEST) configurable via
spark.rapids.sql.join.buildSide - Heuristics that automatically switch to INNER_HASH_WITH_POST strategy for better performance with conditional joins and when the smaller side isn't the natural build side
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| LazyCompiledCondition.scala | New class providing lazy, cached AST compilation for both build sides by transforming bound reference ordinals |
| GpuHashJoin.scala | Adds JoinBuildSideSelection enum, selectStrategy heuristics, refactors join methods to support dynamic build side selection, updates iterators to use LazyCompiledCondition |
| RapidsConf.scala | Adds JOIN_BUILD_SIDE configuration option and updates getJoinOptions to include build side selection |
| GpuShuffledSizedHashJoinExec.scala | Updates BoundJoinExprs to bind conditions to leftOutput++rightOutput, adds createLazyCompiledCondition method, updates iterators |
| join_test.py | Adds comprehensive tests for build side selection with unconditional, conditional, and symmetric joins |
| GpuDeleteFilter.scala | Removes numFirstConditionColumns parameter now handled by LazyCompiledCondition, updates to pass GpuBuildRight explicitly |
Comments suppressed due to low confidence (2)
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala:1610
- There is a mismatch between the build side selected by innerHashJoin and the build side used to get the compiled condition. The innerHashJoin method selects a build side based on buildSideSelection and row counts (which may differ from the buildSide parameter when buildSideSelection is AUTO or SMALLEST). However, the code then uses the original buildSide parameter to get the compiled condition and for AST filtering. This could result in using the wrong compiled AST condition. The selected build side should be computed once before both calls and used consistently, or innerHashJoin should return which build side it selected.
val innerMaps = JoinImpl.innerHashJoin(leftKeys, rightKeys,
nullEquality == NullEquality.EQUAL, joinOptions.buildSideSelection, buildSide)
// Get the appropriate compiled condition based on the build side
val compiledCondition = lazyCompiledCondition.getForBuildSide(buildSide)
// Filter by AST condition
val filteredMaps = withResource(innerMaps) { _ =>
JoinImpl.filterInnerJoinWithASTSwapByJoinTypeAndBuildSide(innerMaps, leftTable, rightTable,
compiledCondition, joinType, buildSide)
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala:1640
- Similar to the issue in computeInnerHashWithPost, there is a mismatch between the build side selected by innerSortJoin and the build side used to get the compiled condition. The selected build side should be computed once and used consistently for both the join operation and getting the compiled condition.
val innerMaps = JoinImpl.innerSortJoin(leftKeys, rightKeys,
nullEquality == NullEquality.EQUAL, joinOptions.buildSideSelection, buildSide)
// Get the appropriate compiled condition based on the build side
val compiledCondition = lazyCompiledCondition.getForBuildSide(buildSide)
// Filter by AST condition
val filteredMaps = withResource(innerMaps) { _ =>
JoinImpl.filterInnerJoinWithASTSwapByJoinTypeAndBuildSide(innerMaps, leftTable, rightTable,
compiledCondition, joinType, buildSide)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Heuristic 2: For unconditional LEFT_OUTER/RIGHT_OUTER when build side selection is AUTO, | ||
| // if the smaller side is not the fixed build side, use INNER_HASH_WITH_POST to enable | ||
| // dynamic build side selection |
Copilot
AI
Dec 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment says "when build side selection is AUTO" but the condition checks for both AUTO and SMALLEST. The comment should be updated to reflect that this heuristic applies to both AUTO and SMALLEST build side selection strategies.
| // Heuristic 2: For unconditional LEFT_OUTER/RIGHT_OUTER when build side selection is AUTO, | |
| // if the smaller side is not the fixed build side, use INNER_HASH_WITH_POST to enable | |
| // dynamic build side selection | |
| // Heuristic 2: For unconditional LEFT_OUTER/RIGHT_OUTER when build side selection is | |
| // AUTO or SMALLEST, if the smaller side is not the fixed build side, use | |
| // INNER_HASH_WITH_POST to enable dynamic build side selection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree with copilot, we should make the comment match
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an impressive catch.
Also, are we using Copilot or Greptile? Both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like to use both. They tend to catch different things
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like we are alternating, there might be an A/B test :)
| val innerMaps = JoinImpl.innerHashJoin(leftKeys, rightKeys, compareNullsEqual, | ||
| joinOptions.buildSideSelection, cudfBuildSide) | ||
|
|
||
| // Get the appropriate compiled condition based on the cudfBuildSide | ||
| val compiledCondition = lazyCondition.getForBuildSide(cudfBuildSide) |
Copilot
AI
Dec 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar build side mismatch issue: innerHashJoin is called with cudfBuildSide but may select a different build side when buildSideSelection is AUTO or SMALLEST. The code then uses cudfBuildSide to get the compiled condition, which could be incorrect. The selected build side should be determined before both the join and getting the compiled condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually in cases when we do an INNER join with post processing to turn it into another join this is not a problem. The AST filtering is totally separated from the join build side. The only time they have to match is when they are done together as a single cudf call. In response to the obvious confusion about this I have updated the comments and the names of several variables to make it clear the distinction between the build side used for data movement and the build side used at the physical layer for when the join actually happens. The two are related to each other and are likely to have a lot of interplay, especially when we start to do build object caching. But until then the code is actually correct.
| // Get the appropriate compiled condition based on the cudfBuildSide | ||
| val compiledCondition = lazyCondition.getForBuildSide(cudfBuildSide) |
Copilot
AI
Dec 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar build side mismatch issue: innerSortJoin is called with cudfBuildSide but may select a different build side when buildSideSelection is AUTO or SMALLEST. The code then uses cudfBuildSide to get the compiled condition, which could be incorrect.
| // Get the appropriate compiled condition based on the cudfBuildSide | |
| val compiledCondition = lazyCondition.getForBuildSide(cudfBuildSide) | |
| // Get the appropriate compiled condition based on the actual build side used | |
| val compiledCondition = lazyCondition.getForBuildSide(innerMaps.buildSide) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be nice to add a clarifying comment here. It's not clear to me either why buildSideSelection is used in line 1984 and cudfBuildSide is used after to materialize a non-lazy compiled condition.
| * Options to control join behavior. | ||
| * @param strategy the join strategy to use (AUTO, INNER_HASH_WITH_POST, INNER_SORT_WITH_POST, | ||
| * or HASH_ONLY) | ||
| * @param buildSideSelection the build side selection strategy (AUTO, DEFAULT, or SMALLEST) |
Copilot
AI
Dec 17, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The documentation comment refers to "DEFAULT" as a build side selection strategy, but the enumeration defines "FIXED" instead. The comment should say "FIXED" to match the actual enumeration value.
| * @param buildSideSelection the build side selection strategy (AUTO, DEFAULT, or SMALLEST) | |
| * @param buildSideSelection the build side selection strategy (AUTO, FIXED, or SMALLEST) |
| // Heuristic 2: For unconditional LEFT_OUTER/RIGHT_OUTER when build side selection is AUTO, | ||
| // if the smaller side is not the fixed build side, use INNER_HASH_WITH_POST to enable | ||
| // dynamic build side selection |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree with copilot, we should make the comment match
| rightRowCount: Long): JoinStrategy = { | ||
| if (configuredStrategy != AUTO) { | ||
| // If not AUTO, use the configured strategy as-is | ||
| return configuredStrategy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, should we rework this without return? I know that's discouraged in scala.
| // Get the appropriate compiled condition based on the cudfBuildSide | ||
| val compiledCondition = lazyCondition.getForBuildSide(cudfBuildSide) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be nice to add a clarifying comment here. It's not clear to me either why buildSideSelection is used in line 1984 and cudfBuildSide is used after to materialize a non-lazy compiled condition.
|
build |
Greptile Summary
Important Files Changed
Confidence score: 4/5
Sequence DiagramsequenceDiagram
participant User
participant GpuHashJoin as "GpuHashJoin"
participant JoinStrategy as "JoinStrategy"
participant LazyCompiledCondition as "LazyCompiledCondition"
participant JoinImpl as "JoinImpl"
participant BaseHashJoinIterator as "BaseHashJoinIterator"
User->>GpuHashJoin: "Execute join query"
GpuHashJoin->>GpuHashJoin: "Parse join configuration"
GpuHashJoin->>JoinStrategy: "selectStrategy(configuredStrategy, joinType, hasCondition, buildSideSelection, leftRowCount, rightRowCount)"
alt Strategy is AUTO
JoinStrategy->>JoinStrategy: "Apply heuristics for conditional joins"
JoinStrategy->>JoinStrategy: "Apply heuristics for build side selection"
JoinStrategy-->>GpuHashJoin: "Return INNER_HASH_WITH_POST or HASH_ONLY"
else Strategy is not AUTO
JoinStrategy-->>GpuHashJoin: "Return configured strategy"
end
alt Has AST condition
GpuHashJoin->>LazyCompiledCondition: "new LazyCompiledCondition(condition, numLeftColumns, numRightColumns)"
LazyCompiledCondition-->>GpuHashJoin: "LazyCompiledCondition instance"
end
GpuHashJoin->>BaseHashJoinIterator: "Create join iterator with strategy and options"
BaseHashJoinIterator->>BaseHashJoinIterator: "Begin processing stream batches"
loop For each stream batch
BaseHashJoinIterator->>JoinStrategy: "Apply effective strategy"
alt Strategy is INNER_HASH_WITH_POST
BaseHashJoinIterator->>JoinImpl: "innerHashJoin(leftKeys, rightKeys, buildSideSelection)"
JoinImpl->>JoinImpl: "selectPhysicalBuildSide(buildSideSelection, leftRowCount, rightRowCount)"
JoinImpl->>JoinImpl: "Perform inner hash join with selected build side"
JoinImpl-->>BaseHashJoinIterator: "Return inner join gather maps"
alt Has AST condition
BaseHashJoinIterator->>LazyCompiledCondition: "getForBuildSide(selectedBuildSide)"
LazyCompiledCondition->>LazyCompiledCondition: "Compile AST for selected build side"
LazyCompiledCondition-->>BaseHashJoinIterator: "Return compiled condition"
BaseHashJoinIterator->>JoinImpl: "filterInnerJoinWithAST(innerMaps, condition)"
JoinImpl-->>BaseHashJoinIterator: "Return filtered gather maps"
end
BaseHashJoinIterator->>BaseHashJoinIterator: "convertInnerJoinMapsToTargetType(innerMaps, targetJoinType)"
else Strategy is HASH_ONLY
BaseHashJoinIterator->>JoinImpl: "Perform traditional hash join for target type"
JoinImpl-->>BaseHashJoinIterator: "Return gather maps"
end
BaseHashJoinIterator->>BaseHashJoinIterator: "Create join gatherer with results"
BaseHashJoinIterator-->>User: "Return columnar batch result"
end
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Additional Comments (3)
-
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala, line 888-890 (link)logic: Logic assumes left < right means left should be build side, but for LEFT OUTER the comment correctly notes data movement build side must be right (GpuBuildRight). Verify this heuristic doesn't conflict with join semantics - LEFT OUTER needs right side preserved. Have you verified this heuristic works correctly for all LEFT OUTER join scenarios, particularly when the left side is much smaller but semantically must preserve all left rows?
-
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala, line 1697-1707 (link)logic: Dynamic build side selection for InnerLike conditional joins uses different LazyCompiledCondition methods (getForBuildLeft vs getForBuildRight) based on selected build side. The AST compilation depends on table order - verify the condition compilation matches the selected physical build side. How is it ensured that the LazyCompiledCondition provides the correctly compiled AST for the dynamically selected build side?
-
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuHashJoin.scala, line 2052-2064 (link)logic: This inner sub-join selection reuses the same dynamic build side logic but with cudfBuildSide as the plan build side. The cudfBuildSide is derived from subJoinType, not the original build side - ensure this mapping is correct for all join type transformations.
6 files reviewed, 3 comments
|
|
||
| override def close(): Unit = { | ||
| synchronized { | ||
| if (compiledBuildRight != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we could safeClose these.
|
I ran the benchmarks and it is showing a slight regression on NDS (about 2.4 seconds). I am going to put this back in draft until I can figure out a slightly better heuristic. There are clearly cases when this wins, and cases when it loses. The losses outweigh the wins by a little bit, so I need to spend some more time on it. |
|
|
||
| trait GpuHashJoin extends GpuJoinExec { | ||
| /** | ||
| * The build side determines which side of the join is materialized/buffered/broadcast. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comment is helpful, thank you
abellina
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good to me, pending your analysis of the performance.
This is a step towards more join optimizations. It makes it so that the AST we compile is not tied to a specific build side. This gives us the flexibility to switch the build side so that the smaller wins, at least where the APIs allow it. It does not give a huge performance boost, because most of the places where we could take advantage of that were already in place. But it does allow us to get a more apples to apples comparison for some heuristics that are being added in.
Generally for inner, left outer, and right outer joins with AST post processing doing an inner + post filtering is much faster. So this adds a new heuristic to allow that. Also when no AST is involved the smaller build side can be a big win so we switch to an inner + post on left outer or right outer joins so that we can get the smaller build side.
This second heuristic is not as clean of a difference, and I need to do some more benchmarking to hopefully make it better. But for now it provides some decent uplift.
Description
I ran two sets of benchmarks locally. In once case I did a join between 50m rows on each side, with quite a bit of skew in the data.
In the second set I set it up to almost be distinct, but because the data is generated randomly it is not 100% distinct.
In the first case the heuristic with AUTO for JoinStrategy and AUTO for BuildSide selection ended up being about about 10% faster than the HASH_ONLY (Mostly FIXED) config that we have as the default today. Note that the SORT with post was way faster than the hash based joins, but I need to finish rapidsai/cudf#20826 before I can add int he heuristic for that.
In the other case it was about a 2% improvement. It wasn't huge, but it also was not a loss in performance on average. In that case the sort merge join was way slower than the hash based joins. Hence the need for the heuristic.
I do want to find some time to run NDS, but I think the results are compelling enough to get the PR up before I do that.
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)