Conversation

@firestarman
Collaborator

@firestarman firestarman commented Dec 3, 2025

Contributes to #13412

Rapids UDAF is designed to support executing a UDAF (User Defined Aggregate Function) in a columnar way so that it can be accelerated by the GPU.

Complete support for RapidsUDAF covers too many things, and a single PR (#13450) would be too large to review, so it is better to add it in piece by piece.

This PR is the second piece (the first one is #13870); it adds basic support to let ScalaUDAF run on the GPU. It covers two main changes.

  1. Introducing three new classes:
    • UDAFAggregate, to connect RapidsUDAFGroupByAggregation with the GPU aggregate framework,
    • GpuUserDefinedAggregateFunction, to connect RapidsUDAF with the GPU aggregate framework,
    • GpuScalaUDAF, the GPU version of ScalaUDAF (a minimal CPU-side example of the kind of UDAF this targets is sketched after this list).
  2. Updating GpuAggregateExec to support UDAFs throughout the whole aggregation process.
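
For readers unfamiliar with ScalaUDAF: it is Spark's wrapper around a user-provided UserDefinedAggregateFunction. As a rough illustration only (this is not the IntAverageUDAF from ScalaUDAFSuite.scala, and the GPU path additionally needs the columnar RapidsUDAF hooks this work introduces), a CPU-side integer-average UDAF looks like the following:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Illustrative only: averages an integer column using a (sum, count) buffer.
class IntAverage extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
  override def bufferSchema: StructType =
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  override def dataType: DataType = DoubleType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L  // running sum
    buffer(1) = 0L  // running count
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getInt(0)
      buffer(1) = buffer.getLong(1) + 1L
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  override def evaluate(buffer: Row): Any =
    if (buffer.getLong(1) == 0L) null
    else buffer.getLong(0).toDouble / buffer.getLong(1)
}

// spark.udf.register("intAverage", new IntAverage)

Once registered, intAverage(click_cnt) in the queries below is planned as a ScalaUDAF on the CPU; with this PR the plugin can replace it with GpuScalaUDAF and keep the aggregation on the GPU.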

Perf Regression

No obvious perf regression was found in the NDS runs below (times in seconds):

Runs    With UDAF    No UDAF
1       1499.306     1544.629
2       1567.290     1528.895
avg.    1533.298     1536.762

Perf Improvement

The following performance numbers are from a local test (times in seconds).

Runs    CPU avg UDAF    GPU avg UDAF    GPU avg function
1       109.504         19.452          18.267
2       110.575         18.100          18.219
avg.    110.040         18.78           18.243
  • The UDAF implementation is the one in "ScalaUDAFSuite.scala"
  • Hardware: Titan V GPU and 10 CPU cores

Test dataset schema:

scala> sql("select * from udaf_perf_test").printSchema
root
 |-- vid: long (nullable = true)
 |-- click_cnt: integer (nullable = true)

Test dataset size:

user:/bigdata/test$ du -d1 -h udaf_perf/
  23G	udaf_perf/
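
The generator for this dataset is not part of the PR; a minimal sketch that produces data with the same shape (the row count, key cardinality, and paths here are assumptions for illustration) could be:

import org.apache.spark.sql.functions._

// Synthesize (vid, click_cnt) pairs; scale numRows until the parquet output
// reaches the desired size (about 23 GB in the runs above).
val numRows = 4L * 1000 * 1000 * 1000
spark.range(numRows)
  .select(
    (rand() * 1000000).cast("long").as("vid"),    // grouping key
    (rand() * 100).cast("int").as("click_cnt"))   // value to aggregate
  .write.mode("overwrite").parquet("/bigdata/test/udaf_perf")

spark.read.parquet("/bigdata/test/udaf_perf").createOrReplaceTempView("udaf_perf_test")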

More details:

scala> spark.conf.set("spark.rapids.sql.enabled", "false")

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/cpu"))
Time taken: 109504 ms                                                           

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/cpu"))
Time taken: 110575 ms                                                           

scala> spark.conf.set("spark.rapids.sql.enabled", "true")

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_udaf"))
Time taken: 19452 ms

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_udaf"))
Time taken: 18100 ms                                                            

scala> spark.time(sql("select vid,        avg(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_func"))
Time taken: 18267 ms                                                            

scala> spark.time(sql("select vid,        avg(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_func"))
Time taken: 18219 ms        

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman firestarman requested a review from a team December 3, 2025 06:25
@greptile-apps
Contributor

greptile-apps bot commented Dec 3, 2025

Greptile Summary

  • Implements GPU acceleration support for ScalaUDAF (User Defined Aggregate Functions) in the RAPIDS accelerator, showing roughly a 6x speedup for the tested custom aggregation
  • Introduces three new classes (UDAFAggregate, GpuUserDefinedAggregateFunction, GpuScalaUDAF) and modifies GpuAggregateExec to support dual processing pipeline for built-in and UDAF aggregates
  • Adds comprehensive test coverage and documentation for the new UDAF functionality with proper resource management and variable-length output support

Important Files Changed

  • sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala: Major refactor to support a dual aggregate processing pipeline for UDAFs alongside built-in cuDF aggregates
  • sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/udaf.scala: New file implementing the core UDAF GPU framework with three classes and type inference utilities
  • tests/src/test/scala/com/nvidia/spark/rapids/ScalaUDAFSuite.scala: New comprehensive test suite demonstrating a UDAF implementation and testing group-by, reduction, and empty dataset scenarios

Confidence score: 4/5

  • This PR is generally safe to merge with careful attention to the aggregate execution changes
  • Score reflects the complexity of the aggregation pipeline modifications and the introduction of new GPU memory management patterns for variable-length UDAF outputs
  • Pay close attention to GpuAggregateExec.scala for potential resource leaks and column ordering correctness

Contributor

@greptile-apps greptile-apps bot left a comment


6 files reviewed, no comments


@firestarman firestarman changed the title from "Support ScalaUDAF run on GPU" to "Support ScalaUDAF run on GPU[databricks]" Dec 3, 2025
@firestarman firestarman requested a review from abellina December 3, 2025 06:37
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Contributor

@greptile-apps greptile-apps bot left a comment


8 files reviewed, 1 comment


@firestarman
Collaborator Author

build

1 similar comment
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Contributor

@greptile-apps greptile-apps bot left a comment


8 files reviewed, no comments


@firestarman
Collaborator Author

build

Collaborator

@revans2 revans2 left a comment


I did a pass through some of the code, but I am concerned about all of the special case handling that we are adding in for this. RapidsSimpleGroupByAggregation is very close to a CudfAggregate and that was on purpose. There would be so much less code if we could wrap the pre and post steps in expressions and if we could make RapidsSimpleGroupByAggregation look and act a lot like a CudfAggregate. I am mostly curious if there was a reason we didn't do it that way?


withResource(aggTbl) { _ =>
GpuColumnVector.from(aggTbl, postStepDataTypes.toArray)
// The output types of UDAF aggs can not be predicated, so need to infer
Collaborator


How does Spark do this for a Scala UDAF? Why are we not doing the same?

Collaborator Author

@firestarman firestarman Dec 4, 2025


In our design, we have a postStep which allows the reduce/aggregate to produce data with an arbitrary schema. Spark has no postStep, so the output of its aggregate must follow the aggregate buffer schema, just like the output of our postStep does.

// UDAF arguments
override final lazy val inputProjection: Seq[Expression] = children

override final lazy val initialValues: Seq[Expression] = {
Collaborator


Why are we not trying to wrap defaultValues as an expression? It looks like we are putting in a lot of code to special case GpuAggregateFunction when we could reuse a lot of existing code if we could just wrap the user supplied code in expressions.

Collaborator Author

@firestarman firestarman Dec 4, 2025


I did, but greptile found a potential use-after-close issue.
Here is the old version.

  override final lazy val initialValues: Seq[Expression] = {
    closeOnExcept(function.getDefaultValue) { udafDefValues =>
      require(udafDefValues.length == aggBufferTypes.length,
        s"The default values number (${udafDefValues.length}) is NOT equal to " +
          s"the aggregation buffers number(${aggBufferTypes.length})")
      udafDefValues.zip(aggBufferTypes).map { case (scalar, dt) =>
        GpuLiteral(scalar, dt)
      }
    }
  }

Comments from greptile
"""
greptile-apps bot
18 hours ago
logic: Storing cuDF Scalar objects directly in GpuLiteral may cause a use-after-free issue. When generateEmptyReductionBatch calls GpuScalar(litVal, dt) where litVal is a cuDF Scalar, it wraps without incrementing the reference count, then closes it via withResource. The next access to initialValues would use an already-closed Scalar.
"""

@firestarman
Collaborator Author

firestarman commented Dec 4, 2025

I am mostly curious if there was a reason we didn't do it that way?

Thanks for the review, and this is a good question.

The main reason is related to the number of arguments. All the APIs defined in RapidsSimpleGroupByAggregation accept multiple columns as input and output, but the current reduceAggregate/groupByAggregate in CudfAggregate accepts only one column as the input/output.
We could adapt to that by leveraging a struct column, which I already tried. But when it comes to the aggregate API, the input is an array of ints (column indices), not columns, so the struct column approach cannot be used: once the UDAF inputs are wrapped into a single struct column, there is only one index in that array, and we cannot access its children to specify different GroupByAggregationOnColumns (see the illustrative sketch at the end of this comment).

The current aggregate framework in GpuAggregateExec is designed around a fixed number of inputs and outputs for each aggregate stage, and this is OK for all the built-in aggregates, since we know all of their details in advance.
To follow the current expression-based approach, we could also add more APIs to RapidsSimpleGroupByAggregation to ask users for all the necessary info (e.g. the output types of preStep and reduce/aggregate), but this would make implementing the API more complicated. So I chose to move the complexity into our framework, to keep the user-facing APIs as simple as possible.

One more option would be to refactor the current aggregate framework to allow multiple inputs/outputs for each CudfAggregate, but that would be a big change, I think. So instead I chose the solution in this PR, "appending" UDAF support to the existing system.

Anyway, I am happy to hear about any better solutions that can reduce the changes in GpuAggregateExec.
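
For context on the index-based API mentioned above, this is roughly how the cuDF Java binding is driven from the plugin (a sketch, not code from this PR):

import ai.rapids.cudf.{GroupByAggregation, Table}

// Group by column 0 and run two different aggregations, each addressed by a
// flat column index on the input table.
def sumAndCount(tbl: Table): Table =
  tbl.groupBy(0).aggregate(
    GroupByAggregation.sum().onColumn(1),
    GroupByAggregation.count().onColumn(2))

// If columns 1 and 2 were packed into one struct column, only a single index
// would be available to pass to onColumn(...), so the two child columns could
// not be given different GroupByAggregationOnColumn specs.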

@sameerz sameerz added the feature request New feature or request label Dec 5, 2025
@firestarman firestarman requested a review from revans2 December 8, 2025 02:12
@abellina
Collaborator

abellina commented Dec 8, 2025

@firestarman I'd like to take a look at this. I would love to find ways of reducing changes to the aggregate, unless we absolutely need them.

@greptile-apps
Contributor

greptile-apps bot commented Dec 23, 2025

Greptile's behavior is changing!

From now on, if a review finishes with no comments, we will not post an additional "statistics" comment to confirm that our review found nothing to comment on. However, you can confirm that we reviewed your changes in the status check section.

This feature can be toggled off in your Code Review Settings by deselecting "Create a status check for each PR".

@firestarman
Collaborator Author

I would love to find ways of reducing changes to the aggregate, unless we absolutely need them.

@abellina can you take a look at this? I tried my best to reduce the changes in the GPU aggregate; I'd love to hear any suggestions for reducing them further.

@firestarman
Collaborator Author

build

@firestarman
Collaborator Author

build

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman
Collaborator Author

build

Contributor

@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

This PR adds GPU acceleration support for Scala UDAFs (User-Defined Aggregate Functions) by introducing three new classes and integrating them into the existing aggregation pipeline.

Key Changes

  • New UDAF framework classes in udaf.scala: UDAFAggregate wraps RapidsUDAFGroupByAggregation for GPU interaction, GpuUserDefinedAggregateFunction provides common GPU UDAF implementation, and GpuScalaUDAF is the GPU version of ScalaUDAF
  • GpuAggregateExec refactoring: Splits aggregation processing into built-in and UDAF paths throughout the entire pipeline (preProcess, reduce/aggregate, postProcess), handles variable-length outputs from UDAF operations, and reorders columns to match Spark's expectations
  • Test coverage: Added comprehensive test suite with IntAverageUDAF example covering groupby, reduction, and empty dataset scenarios
  • Documentation: Updated supported operations and configuration documentation

Performance Impact

No regression detected in NDS benchmarks. Performance improvement for UDAF operations shows ~6x speedup (110s CPU vs 18.8s GPU) compared to CPU execution.

Confidence Score: 4/5

  • This PR is safe to merge with some minor considerations
  • The implementation is well-structured with proper resource management (closeOnExcept patterns), comprehensive test coverage, and no performance regression. The code handles variable-length UDAF outputs correctly and maintains column ordering. Score reduced by one point due to the complex nature of the changes affecting critical aggregation paths and the relatively new UDAF framework integration that would benefit from additional testing in production-like scenarios.
  • Pay close attention to GpuAggregateExec.scala - the UDAF integration touches critical aggregation logic with complex column reordering

Important Files Changed

File Analysis

  • sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/udaf.scala (score 4/5): New file introducing the UDAFAggregate, GpuUserDefinedAggregateFunction, and GpuScalaUDAF classes for GPU UDAF support, plus type inference utilities
  • sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala (score 4/5): Major refactoring to integrate UDAF support throughout the aggregation pipeline (preProcess, reduce, aggregate, postProcess); handles variable-length outputs
  • tests/src/test/scala/com/nvidia/spark/rapids/ScalaUDAFSuite.scala (score 5/5): New test suite with an IntAverageUDAF implementation covering group-by, reduction, and empty dataset scenarios

Sequence Diagram

sequenceDiagram
    participant Client as Spark Query
    participant GpuAggExec as GpuAggregateExec
    participant AggHelper as AggHelper
    participant UDAFAgg as UDAFAggregate
    participant UDAF as GpuScalaUDAF/RapidsUDAF
    participant cuDF as cuDF Native

    Note over Client,cuDF: Aggregation Pipeline with UDAF Support

    Client->>GpuAggExec: Execute aggregate query
    GpuAggExec->>AggHelper: preProcess(inputBatch)
    
    alt Has UDAF aggregates
        AggHelper->>AggHelper: preStepBound.project (built-in aggs)
        AggHelper->>UDAFAgg: preStepAndClose(args)
        UDAFAgg->>UDAF: preStep(numRows, args)
        UDAF->>cuDF: Transform columns (e.g., cast INT to LONG)
        cuDF-->>UDAF: Transformed columns
        UDAF-->>UDAFAgg: Array[ColumnVector]
        UDAFAgg-->>AggHelper: Array[GpuColumnVector]
        Note over AggHelper: Cache udafAggArgLens for next step
        AggHelper-->>GpuAggExec: Combined batch (built-in + UDAF cols)
    else No UDAF aggregates
        AggHelper->>AggHelper: preStepBound.project only
        AggHelper-->>GpuAggExec: Preprocessed batch
    end

    GpuAggExec->>AggHelper: aggregate/reduce(preprocessed)
    
    alt Group-by aggregation
        AggHelper->>cuDF: groupBy().aggregate(built-in aggs)
        loop For each UDAF
            AggHelper->>UDAFAgg: aggregate(inputIndices)
            UDAFAgg->>UDAF: aggregate(inputIndices)
            UDAF-->>UDAFAgg: GroupByAggregationOnColumn[]
        end
        AggHelper->>cuDF: groupBy().aggregate(all aggs)
        cuDF-->>AggHelper: Aggregated table
        Note over AggHelper: Cache udafPostStepArgLens
    else Reduction
        AggHelper->>cuDF: reductionAggregate(built-in aggs)
        loop For each UDAF
            AggHelper->>UDAFAgg: reduce(numRows, args)
            UDAFAgg->>UDAF: reduce(numRows, args)
            UDAF->>cuDF: sum(), count(), etc.
            cuDF-->>UDAF: Scalars
            UDAF-->>UDAFAgg: Array[Scalar]
            UDAFAgg-->>AggHelper: Array[GpuScalar]
        end
        Note over AggHelper: Cache udafPostStepArgLens
        AggHelper-->>GpuAggExec: Reduced batch
    end

    GpuAggExec->>AggHelper: postProcess(aggregated)
    
    alt Has UDAF aggregates
        AggHelper->>AggHelper: postStepBound.project (built-in)
        loop For each UDAF
            AggHelper->>UDAFAgg: postStepAndClose(args)
            UDAFAgg->>UDAF: postStep(numRows, args)
            UDAF-->>UDAFAgg: Array[ColumnVector]
            UDAFAgg-->>AggHelper: Array[GpuColumnVector]
        end
        AggHelper->>AggHelper: mergeWithOriginalOrderAndClose
        Note over AggHelper: Reorder columns to match Spark expectation
        AggHelper-->>GpuAggExec: Post-processed batch
    else No UDAF aggregates
        AggHelper->>AggHelper: postStepBound.project only
        AggHelper-->>GpuAggExec: Post-processed batch
    end

    alt Final/Complete mode
        GpuAggExec->>UDAF: resultEvalAndClose(args)
        UDAF->>cuDF: getResult (e.g., sum/count)
        cuDF-->>UDAF: Final result column
        UDAF-->>GpuAggExec: Result column
    end

    GpuAggExec-->>Client: Final result