Add in basic GPU/CPU bridge operation [databricks] #14003
Conversation
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
Greptile Summary

This PR implements basic GPU/CPU bridge functionality that enables CPU expression evaluation within GPU execution plans. The feature is disabled by default.

Confidence Score: 4/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant Plan as SparkPlan
    participant Bridge as GpuCpuBridgeExpression
    participant GPU as GPU Memory
    participant Host as Host Memory
    participant CPU as CPU Expression
    participant Builder as RapidsHostColumnBuilder
    Plan->>Bridge: columnarEval(batch)
    Note over Bridge: Start wait time tracking
    Bridge->>Bridge: Evaluate GPU input expressions
    Bridge->>GPU: Get GPU column data
    GPU-->>Bridge: GPU columns
    Bridge->>Bridge: Create ColumnarBatch with GPU columns
    Note over Bridge: Start processing time tracking
    Bridge->>Host: ColumnarToRowIterator (GPU→Host)
    Note over Host: Data copied to host memory
    Host->>CPU: Iterate rows through projection
    loop For each row
        CPU->>CPU: Evaluate CPU expression
        CPU->>Builder: Append result to builder
    end
    Builder->>Host: Build host column
    Host->>GPU: buildAndPutOnDevice() (Host→GPU)
    Note over GPU: Result copied back to GPU
    GPU-->>Bridge: GPU result column
    Note over Bridge: End processing time
    Bridge-->>Plan: GpuColumnVector
    Note over Bridge: End wait time
```
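The round trip in the diagram can be sketched in miniature, with plain arrays standing in for device and host columns and a lambda standing in for the CPU expression. All names here are illustrative stand-ins, not the plugin's actual API:

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

class BridgeSketch {
    // Illustrative stand-in for the GPU->Host->CPU->GPU round trip:
    // "device" data is copied to a host array, each row is run through
    // the CPU expression, and the result is "copied back" to the device.
    static int[] evalOnCpu(int[] deviceColumn, IntUnaryOperator cpuExpr) {
        // GPU -> Host: copy the column data into host memory
        int[] hostColumn = deviceColumn.clone();

        // Host: iterate rows through the CPU expression, appending to a builder
        int[] hostResult = new int[hostColumn.length];
        for (int row = 0; row < hostColumn.length; row++) {
            hostResult[row] = cpuExpr.applyAsInt(hostColumn[row]);
        }

        // Host -> GPU: build the host column and put it back on the device
        return hostResult.clone();
    }

    public static void main(String[] args) {
        int[] result = evalOnCpu(new int[]{1, 2, 3}, x -> x * 10);
        System.out.println(Arrays.toString(result)); // [10, 20, 30]
    }
}
```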
Additional Comments (1)
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCpuBridgeExpression.scala, lines 112-113 (style): Consider cleanup for thread-local state. The `ThreadLocal` for projections is never explicitly removed. In long-running applications or thread pools, consider adding cleanup logic, though Spark's task model may naturally handle this through task completion.
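The suggested cleanup pattern can be sketched as follows; the completion-hook mechanism here is a generic stand-in for Spark's task-completion listeners, not the actual TaskContext API:

```java
import java.util.ArrayList;
import java.util.List;

class ProjectionCache {
    // Thread-local cache of per-thread state; never removed by default,
    // so long-lived pool threads would retain it indefinitely.
    private static final ThreadLocal<StringBuilder> PROJECTIONS =
        ThreadLocal.withInitial(StringBuilder::new);

    // Hooks standing in for Spark's task-completion listeners (illustrative)
    private static final List<Runnable> completionHooks = new ArrayList<>();

    static StringBuilder get() {
        return PROJECTIONS.get();
    }

    // Register cleanup so the thread-local entry is dropped when the task ends
    static void registerCleanup() {
        completionHooks.add(PROJECTIONS::remove);
    }

    static void onTaskCompletion() {
        completionHooks.forEach(Runnable::run);
        completionHooks.clear();
    }

    public static void main(String[] args) {
        get().append("cached projection");
        registerCleanup();
        onTaskCompletion(); // thread-local entry removed here
        System.out.println(get().length()); // fresh instance after cleanup
    }
}
```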
11 files reviewed, 1 comment
Pull request overview
This PR adds basic GPU/CPU bridge operation functionality that enables CPU expressions to run while keeping the overall execution plan on the GPU. The bridge is disabled by default since performance optimizations (thread pool, optimizer) are not yet implemented. This represents step 3 in splitting a larger feature into smaller, reviewable pieces.
Key changes include:
- New `GpuCpuBridgeExpression` that wraps CPU expressions and manages data transfer between GPU and CPU
- Code generation support via `BridgeGenerateUnsafeProjection` for efficient columnar-to-row conversion
- Configuration options to enable/disable the bridge and maintain a disallow list
- Comprehensive test coverage for various data types and nested structures
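For illustration only, enabling such a feature from spark-shell might look like the following. The config key names below are hypothetical, since the PR text here does not show them; the real names are defined in RapidsConf.scala:

```shell
# Hypothetical config keys for illustration only; check RapidsConf.scala
# in this PR for the actual names.
spark-shell \
  --conf spark.rapids.sql.cpuBridge.enabled=true \
  --conf spark.rapids.sql.cpuBridge.disallowList="org.example.SomeExpr"
```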
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| BridgeUnsafeProjectionSuite.scala | Comprehensive test suite covering all data types, nested structures, and edge cases for the bridge projection functionality |
| GpuCpuBridgeSuite.scala | Unit tests for bridge expression properties, configuration, and compatibility checking |
| BridgeGenerateUnsafeProjection.scala | Code generation implementation for efficient bridge projections with fallback to interpreted mode |
| GpuCpuBridgeExpression.scala | Main bridge expression implementation handling GPU-to-CPU-to-GPU data flow with metrics |
| RapidsMeta.scala | Metadata support for bridge expressions including compatibility checks and conversion logic |
| RapidsConf.scala | Configuration options for enabling bridge and maintaining disallow list |
| TypeConverter.java | Extracted public interface for type conversion (visibility change from Scala to Java) |
| GpuRowToColumnarExec.scala | Visibility changes to expose TypeConverter and related methods |
| GpuTypeShims.scala (both versions) | Import reorganization to use the new public TypeConverter interface |
| GpuMetrics.scala | New metrics for tracking CPU bridge processing and wait times |
Outdated review comments (now resolved):
- sql-plugin/src/main/scala/org/apache/spark/sql/rapids/BridgeGenerateUnsafeProjection.scala
- sql-plugin/src/main/java/com/nvidia/spark/rapids/TypeConverter.java
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
build
Additional Comments (1)
- sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCpuBridgeExpression.scala, line 54 (syntax): Comment says "Only GPU inputs are children" but the code includes `cpuExpression` in children. This is inconsistent with the comment on line 42, which says `cpuExpression` is "not included as children".
11 files reviewed, 1 comment
CI failed because of #14009, but our CI currently has no way to turn off the databricks tests when you touch something even remotely related to databricks.
build
I upmerged to make sure that I had the latest fixes for what caused CI to fail.
11 files reviewed, no comments
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
build
12 files reviewed, no comments
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
12 files reviewed, no comments
build
```java
/**
 * Append row value to the column builder and return the number of data bytes written
 */
public abstract double append(SpecializedGetters row, int column, RapidsHostColumnBuilder builder);
```
It would be nice to add a comment about the return type here (double). Why can't the type be a whole number?
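One plausible reason for the fractional return type, offered here as an assumption rather than something confirmed by the PR: per-row size accounting can include validity buffers, which cost one bit per row, so a single row's contribution is not a whole number of bytes:

```java
class RowSizeSketch {
    // Assumed accounting: a nullable INT32 column costs 4 data bytes per row
    // plus 1 validity bit (1/8 byte) per row, so per-row size is fractional.
    static double perRowSize(int dataBytes, boolean hasValidity) {
        return dataBytes + (hasValidity ? 1.0 / 8.0 : 0.0);
    }

    public static void main(String[] args) {
        System.out.println(perRowSize(4, true));  // 4.125
        System.out.println(perRowSize(4, false)); // 4.0
    }
}
```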
```
@@ -0,0 +1,43 @@
/*
 * Copyright (c) 2025, NVIDIA CORPORATION.
```
Copyrights need updating.
```java
 * ahead of time. Also because structs push nulls down to the children this size should
 * assume a validity even if the schema says it cannot be null.
 */
public abstract double getNullSize();
```
Same question: why double?
```scala
val gpuInputColumns = gpuInputs.safeMap(_.columnarEval(batch))

// Time the CPU processing (columnar->row->CPU expr->columnar)
val processingStartTime = System.nanoTime()
```
Nit: we could use `GpuMetric.ns(metric) { }`, but it doesn't take an optional metric.
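A sketch of what an `ns`-style helper that tolerates an optional metric could look like. This is a hypothetical helper, not the plugin's actual `GpuMetric.ns`, and `AtomicLong` stands in for a metric:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

class TimingSketch {
    // Times the body and adds the elapsed nanoseconds to the metric, if present
    static <T> T ns(Optional<AtomicLong> metric, Supplier<T> body) {
        long start = System.nanoTime();
        try {
            return body.get();
        } finally {
            long elapsed = System.nanoTime() - start;
            metric.ifPresent(m -> m.addAndGet(elapsed));
        }
    }

    public static void main(String[] args) {
        AtomicLong processingTime = new AtomicLong();
        int result = ns(Optional.of(processingTime), () -> 21 * 2);
        System.out.println(result);                    // 42
        System.out.println(processingTime.get() >= 0); // true
    }
}
```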
```scala
// Configuration Tests
// ============================================================================

test("Bridge config controls feature enablement") {
```
This test is probably not necessary. What I think we need is to test the fallback, either in a suite or an integration test.
```scala
(Float.MaxValue, false),
(0.0f, true), // null
(3.14159f, false),
(Float.NaN, false),
```
Should we also test negative floats and NaN ranges? I know we tested this in the past; SparkQueryCompareTestSuite has float positive/negative NaN lower/upper values.
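The "NaN ranges" concern refers to the fact that many distinct IEEE-754 bit patterns are NaN, positive and negative. The specific bit patterns below are chosen for illustration, not taken from the plugin's test suites:

```java
class NanSketch {
    public static void main(String[] args) {
        // IEEE-754 float NaNs: exponent bits all ones, non-zero mantissa.
        float canonicalNan = Float.NaN;                            // 0x7FC00000
        float positiveNanLower = Float.intBitsToFloat(0x7F800001); // smallest positive NaN payload
        float negativeNanUpper = Float.intBitsToFloat(0xFF800001); // a negative NaN
        float negativeNanLower = Float.intBitsToFloat(0xFFFFFFFF); // largest negative NaN payload

        System.out.println(Float.isNaN(canonicalNan));
        System.out.println(Float.isNaN(positiveNanLower));
        System.out.println(Float.isNaN(negativeNanUpper));
        System.out.println(Float.isNaN(negativeNanLower));
    }
}
```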
```scala
(Double.MaxValue, false),
(0.0, true), // null
(3.141592653589793, false),
(Double.NaN, false),
```
Same question about NaN ranges and sign.
```scala
(0L, false), // 1970-01-01 00:00:00 UTC
(-1000000L, false), // Before epoch
(0L, true), // null
(1672531200000000L, false) // 2023-01-01 00:00:00 UTC
```
How about timestamps with non-zero hours/minutes/seconds and milliseconds?
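The reviewer is asking for test values whose time-of-day components are non-zero. Computing such a microsecond timestamp, the unit the existing test values above appear to use, is straightforward:

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

class TimestampSketch {
    public static void main(String[] args) {
        // 2023-01-01 12:34:56.789 UTC as microseconds since the Unix epoch
        Instant ts = OffsetDateTime
            .of(2023, 1, 1, 12, 34, 56, 789_000_000, ZoneOffset.UTC)
            .toInstant();
        long micros = ChronoUnit.MICROS.between(Instant.EPOCH, ts);
        System.out.println(micros); // 1672576496789000
    }
}
```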
```scala
  timezoneCheck()
}
// Update our expressions to allow them to run with the bridge if possible
```
Nit: extra space.
```scala
  )
}

test("multiple expressions") {
```
Should we add DayTimeIntervalType and YearMonthIntervalType, and possibly an unsupported type?
```scala
val numVarLenFields = exprSchemas.count {
  case Schema(dt, _) => !UnsafeRow.isFixedLength(dt)
  // TODO: consider large decimal and interval type
```
Is there a follow-on issue for this TODO?
```scala
}

val writeFieldsCode = if (isTopLevel && (row == null || ctx.currentVars != null)) {
  // TODO: support whole stage codegen
```
Is there a follow-on for this TODO?
This is step 3 in splitting #13368 into smaller pieces
Description
This adds in basic GPU/CPU bridge functionality, but it is off by default because the performance would not be good without the thread pool and optimizer.
Checklists
(Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.)
The performance is expected to be bad, so the feature is off by default and not performance tested. I did add some basic tests to verify that the code works.