
feat: cuDF window functionality#1765

Open
patdevinwilson wants to merge 1 commit into IBM:ibm-research-preview-2026-03-03 from patdevinwilson:ibm-research-preview-2026-03-03

Conversation


@patdevinwilson patdevinwilson commented Mar 4, 2026

Summary

Fixes CudfToVelox conversion for window operator results and a row_number() correctness bug so that Presto window queries (LAG, LEAD, ROW_NUMBER, FIRST_VALUE, LAST_VALUE, and aggregate windows SUM/MIN/MAX/COUNT/AVG) run correctly on the GPU path. Also improves the error message when cuDF operator replacement fails with CPU fallback disabled.

Problem

  1. Row type / column names: CudfToVelox::getOutput() converted the cuDF table to a Velox RowVector via toVeloxColumn(..., ""), which produced columns named "0", "1", "2". The plan's output type had real names (e.g. field, field_0, row_number). Calling output->setType(outputType_) then failed because Velox does not allow changing a RowVector's type to one with different field names.
  2. Physical type mismatch: cuDF/Arrow produced INTEGER for ROW_NUMBER() and COUNT(*), while the plan expects BIGINT. After fixing names, setType(outputType_) still failed with "underlying physical types must match."
  3. Debugging with fallback disabled: With cudf.allow_cpu_fallback=false, the failure message was only "Replacement with cuDF operator failed" with no indication of which operator or plan node failed.
  4. row_number() off-by-one: CudfWindow used cudf::window_bounds::get(1) as the following parameter for grouped_rolling_window, giving a frame of UNBOUNDED PRECEDING to 1 FOLLOWING instead of UNBOUNDED PRECEDING to CURRENT ROW. This made row_number() return values +1 too high for every non-last row in each partition (e.g. 2,3,3 instead of 1,2,3).

Solution

CudfConversion.cpp

  • Use plan output names in conversion: Pass outputType_->names() into toVeloxColumn() so the RowVector is built with the correct schema from the start (passthrough and concatenation paths).
  • Cast when types differ: Add castColumnToPlanType() to cast integral columns (INTEGER/SMALLINT/TINYINT) to BIGINT when the plan expects BIGINT. After conversion, if !output->type()->kindEquals(outputType_), build a new RowVector with the same nulls/size and children produced by castColumnToPlanType(output->childAt(i), outputType_->childAt(i), pool()); only call setType(outputType_) when the converted type already matches.
  • Includes: Add DecodedVector, FlatVector, SelectivityVector, and Type.h for the cast helper.
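The cast only needs to widen narrower integral values to 64 bits. The real helper operates on Velox `FlatVector`s via `DecodedVector`; the sketch below is a dependency-free model of just the widening step, using `std::vector` in place of a column (all names here are illustrative, not the PR's actual code).

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Simplified model of castColumnToPlanType() for the INTEGER -> BIGINT case:
// widen each 32-bit value to 64 bits. This is lossless, so no overflow or
// range checks are needed in this direction.
std::vector<int64_t> castToBigint(const std::vector<int32_t>& column) {
  std::vector<int64_t> out;
  out.reserve(column.size());
  for (int32_t v : column) {
    out.push_back(static_cast<int64_t>(v)); // widening conversion, never lossy
  }
  return out;
}
```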

CudfWindow.cpp

  • Fix row_number() window bounds: Change cudf::window_bounds::get(1) to cudf::window_bounds::get(0) for the following parameter so the rolling COUNT uses the correct frame (UNBOUNDED PRECEDING to CURRENT ROW).
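The off-by-one can be reproduced without cuDF. The sketch below models a grouped rolling COUNT over a frame of UNBOUNDED PRECEDING to `following` rows after the current one (it is not the cuDF API, just the frame arithmetic): `following = 1` yields the reported 2,3,3 per partition, while `following = 0` yields the correct 1,2,3.

```cpp
#include <cassert>
#include <vector>

// Standalone model of a grouped rolling COUNT with a frame of
// UNBOUNDED PRECEDING to `following` rows past the current row.
// row_number() is this count with following == 0 (CURRENT ROW).
// Each entry in groupSizes is one partition's row count.
std::vector<int> groupedRollingCount(
    const std::vector<int>& groupSizes,
    int following) {
  std::vector<int> out;
  for (int size : groupSizes) {
    for (int i = 0; i < size; ++i) {
      // Rows in frame: indices 0 .. min(i + following, size - 1).
      int end = i + following;
      if (end > size - 1) {
        end = size - 1; // frame is clamped at the partition boundary
      }
      out.push_back(end + 1);
    }
  }
  return out;
}
```

Note the clamping explains why only the last row of each partition was correct before the fix: its frame cannot extend past the partition boundary.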

ToCudf.cpp

  • Clearer failure message: When !allowCpuFallback and an operator is not replaced, build the message with fmt::format() to include the failing operator and plan node (e.g. "Replacement with cuDF operator failed. Operator: LocalMerge[675] 0. PlanNode: …") so users can see exactly which operator blocked full-GPU execution.
  • Include: Add #include <fmt/format.h>.
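The shape of the improved message can be sketched as below. The PR builds it with `fmt::format()`; plain string concatenation is used here to keep the sketch dependency-free, and the function name and arguments are illustrative placeholders, not the PR's actual code.

```cpp
#include <cassert>
#include <string>

// Illustrative model of the improved failure message: it names the operator
// and plan node that blocked full-GPU execution instead of failing opaquely.
std::string replacementFailureMessage(
    const std::string& operatorName,
    const std::string& planNode) {
  return "Replacement with cuDF operator failed. Operator: " + operatorName +
      ". PlanNode: " + planNode;
}
```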

Testing

  • Unit test: velox/velox/experimental/cudf/tests/WindowTest.cpp (CudfWindowTest) exercises the cuDF window path with hardcoded expected results (avoiding DuckDB, which is incompatible with 64KB-page ARM systems such as GH200):

    • rowNumberPartitionOrder: ROW_NUMBER() OVER (PARTITION BY id ORDER BY val), validates output type, column names, and correct 1-based numbering.
    • lagLead: LAG(val, 1) and LEAD(val, 1) OVER (PARTITION BY id ORDER BY val), validates null handling at partition boundaries.

    Run: ctest -R velox_cudf_window_test (or ./velox_cudf_window_test) from the build dir; test has label cuda_driver and requires a GPU.

  • Presto: Window test SQL run against Presto with native GPU worker: LAG, LEAD, ROW_NUMBER, FIRST_VALUE, LAST_VALUE, and aggregate windows (SUM, MIN, MAX, COUNT, AVG) with PARTITION BY and ORDER BY all complete successfully.

  • With cudf.debug_enabled=true, worker logs show the window pipeline after adaptation: LocalExchange → CudfWindow → CudfOrderBy → CudfToVelox → CallbackSink (compute on GPU); only LocalMerge and PartitionedOutput remain on CPU (no GPU adapter).

@patdevinwilson force-pushed the ibm-research-preview-2026-03-03 branch from c9b60e7 to 279de0c on March 4, 2026 at 14:59

devavret commented Mar 9, 2026

@patdevinwilson is this ready to review?

@patdevinwilson (Author) replied to @devavret:

the first unit test is failing :/ working on it.

@patdevinwilson force-pushed the ibm-research-preview-2026-03-03 branch from 279de0c to d7ea608 on March 19, 2026 at 21:54
…WindowTest)

Signed-off-by: patdevinwilson <pwilson@nvidia.com>
Made-with: Cursor
Signed-off-by: patdevinwilson <pwilson@nvidia.com>
@patdevinwilson force-pushed the ibm-research-preview-2026-03-03 branch from d7ea608 to 9252516 on March 19, 2026 at 22:35
@patdevinwilson marked this pull request as ready for review on March 19, 2026 at 22:36
@patdevinwilson (Author)

Ready for review @devavret :) Thank you for your patience.

@devavret left a comment:

Thanks for taking the time to add this operator. I have a couple of suggestions:

  • This PR doesn't seem to depend on functionality added to this fork that is yet to be upstreamed. Can you please re-open it on https://github.com/facebookincubator/velox?
  • Please update the PR description. It currently describes only one change and fails to mention that the window function operator itself was added in this PR.
  • The PR proposes a collect-all-and-output-once approach. This would be fine if most TPC-DS queries flowed a small amount of data through the window plan node, but it would fail at scale. I suggest a more robust approach that consumes incoming batches immediately (on addInput) and reduces the amount of data this operator has to hold.

@patdevinwilson (Author)

I see the concern: addInput currently just stores CudfVector smart pointers (GPU batches) in a vector, and getOutput concatenates everything at once. We should concatenate incrementally in addInput so we hold one accumulated GPU table rather than N separate batch pointers, reducing peak memory.
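The suggested restructuring can be modeled without cuDF. In the sketch below, `std::vector<int>` stands in for a GPU table and the struct name is hypothetical; the point is the shape of the change: fold each batch into a single accumulated table as it arrives, so peak memory is the accumulated table plus one batch, not the sum of all batches held separately.

```cpp
#include <cassert>
#include <vector>

// Toy model of incremental accumulation in addInput(): instead of keeping
// every input batch until getOutput() and concatenating once, concatenate
// each batch into `accumulated` immediately and drop the batch.
// std::vector<int> stands in for a GPU table here.
struct IncrementalWindowBuffer {
  std::vector<int> accumulated; // one growing table instead of N batch pointers

  void addInput(const std::vector<int>& batch) {
    accumulated.insert(accumulated.end(), batch.begin(), batch.end());
    // `batch` can be released by the caller as soon as this returns.
  }
};
```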

upstreaming in a new PR

@patdevinwilson (Author)

facebookincubator#16892
