feat: cuDF window functionality #1765
Open
patdevinwilson wants to merge 1 commit into IBM:ibm-research-preview-2026-03-03 from
Conversation
Force-pushed from c9b60e7 to 279de0c
@patdevinwilson is this ready to review?
Author
The first unit test is failing :/ working on it.
Force-pushed from 279de0c to d7ea608
…WindowTest)
Signed-off-by: patdevinwilson <pwilson@nvidia.com>
Made-with: Cursor
Force-pushed from d7ea608 to 9252516
Author
Ready for review @devavret :) Thank you for your patience.
devavret suggested changes on Mar 23, 2026
Thanks for taking the time to add this operator. I have a couple of suggestions:
- This PR doesn't seem to depend on functionality added to this fork that is yet to be upstreamed. Can you please re-open it on https://github.com/facebookincubator/velox?
- Please update the PR description. It currently describes only one change and fails to mention that the window function operator was added in this PR.
- The PR proposes a collect-all-and-output-once approach. This would be fine if most TPC-DS queries flowed only a small amount of data through the window plan node, but it would fail at scale. I suggest a more robust approach that consumes incoming batches immediately (in addInput) and reduces the amount of data this operator has to hold.
Author
I see the concern: addInput just stores CudfVector smart pointers (GPU batches) in a vector, and getOutput concatenates everything at once. We should concatenate incrementally in addInput so we hold one accumulated GPU table rather than N separate batch pointers, reducing peak memory. Upstreaming in a new PR.
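The batch-folding idea can be sketched in plain C++, with std::vector<int> standing in for the GPU batches and the struct name invented for illustration (the real operator works on CudfVector pointers and cuDF tables):

```cpp
#include <utility>
#include <vector>

// Hedged sketch of the proposed restructure: each batch is folded into one
// accumulated buffer in addInput(), so the operator never holds N separate
// batch pointers that getOutput() must merge at the end.
struct IncrementalWindowBuffer {
  std::vector<int> accumulated;

  void addInput(const std::vector<int>& batch) {
    // Concatenate immediately instead of storing the batch for later.
    accumulated.insert(accumulated.end(), batch.begin(), batch.end());
  }

  std::vector<int> getOutput() {
    return std::move(accumulated); // already one table; nothing left to merge
  }
};
```

In the real operator the insert step would be a GPU-side concatenate of the accumulated table with the incoming batch, freeing the batch right after.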
Summary
Fixes CudfToVelox conversion for window operator results and a row_number() correctness bug so that Presto window queries (LAG, LEAD, ROW_NUMBER, FIRST_VALUE, LAST_VALUE, and aggregate windows SUM/MIN/MAX/COUNT/AVG) run correctly on the GPU path. Also improves the error message when cuDF operator replacement fails with CPU fallback disabled.
Problem
- CudfToVelox::getOutput() converted the cuDF table to a Velox RowVector via toVeloxColumn(..., ""), which produced columns named "0", "1", "2". The plan's output type had real names (e.g. field, field_0, row_number). Calling output->setType(outputType_) then failed because Velox does not allow changing a RowVector's type to one with different field names.
- cuDF produces INTEGER for ROW_NUMBER() and COUNT(*), while the plan expects BIGINT. After fixing the names, setType(outputType_) still failed with "underlying physical types must match."
- With cudf.allow_cpu_fallback=false, the failure message was only "Replacement with cuDF operator failed", with no indication of which operator or plan node failed.
- row_number() off-by-one: CudfWindow used cudf::window_bounds::get(1) as the following parameter for grouped_rolling_window, giving a frame of UNBOUNDED PRECEDING to 1 FOLLOWING instead of UNBOUNDED PRECEDING to CURRENT ROW. This made row_number() return values one too high for every non-last row in each partition (e.g. 2, 3, 3 instead of 1, 2, 3).
Solution
CudfConversion.cpp
- Pass outputType_->names() into toVeloxColumn() so the RowVector is built with the correct schema from the start (passthrough and concatenation paths).
- Add castColumnToPlanType() to cast integral columns (INTEGER/SMALLINT/TINYINT) to BIGINT when the plan expects BIGINT. After conversion, if !output->type()->kindEquals(outputType_), build a new RowVector with the same nulls/size and children produced by castColumnToPlanType(output->childAt(i), outputType_->childAt(i), pool()); only call setType(outputType_) when the converted type already matches.
- Add includes for DecodedVector, FlatVector, SelectivityVector, and Type.h for the cast helper.
CudfWindow.cpp
- Fix the row_number() window bounds: change cudf::window_bounds::get(1) to cudf::window_bounds::get(0) for the following parameter so the rolling COUNT uses the correct frame (UNBOUNDED PRECEDING to CURRENT ROW).
ToCudf.cpp
- When !allowCpuFallback and an operator is not replaced, build the message with fmt::format() to include the failing operator and plan node (e.g. "Replacement with cuDF operator failed. Operator: LocalMerge[675] 0. PlanNode: …") so users can see exactly which operator blocked full-GPU execution.
- Add #include <fmt/format.h>.
Testing
Unit test:
velox/velox/experimental/cudf/tests/WindowTest.cpp – CudfWindowTest exercises the cuDF window path with hardcoded expected results (avoiding DuckDB, which is incompatible with 64KB-page ARM kernels such as GH200):
- rowNumberPartitionOrder: ROW_NUMBER() OVER (PARTITION BY id ORDER BY val); validates output type, column names, and correct 1-based numbering.
- lagLead: LAG(val, 1) and LEAD(val, 1) OVER (PARTITION BY id ORDER BY val); validates null handling at partition boundaries.
Run:
ctest -R velox_cudf_window_test (or ./velox_cudf_window_test) from the build dir; the test has label cuda_driver and requires a GPU.
Presto: Window test SQL was run against Presto with a native GPU worker: LAG, LEAD, ROW_NUMBER, FIRST_VALUE, LAST_VALUE, and aggregate windows (SUM, MIN, MAX, COUNT, AVG) with PARTITION BY and ORDER BY all complete successfully.
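The off-by-one that the row-numbering test guards against can be reproduced with a small model of the rolling COUNT; this is plain C++, not cuDF, and the function name is invented for illustration:

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Illustrative model of the COUNT aggregation grouped_rolling_window performs
// for ROW_NUMBER(): each row counts the rows in the frame
// [UNBOUNDED PRECEDING, current row + `following`] within one partition.
std::vector<int> rollingCount(std::size_t partitionSize, std::size_t following) {
  std::vector<int> out;
  for (std::size_t i = 0; i < partitionSize; ++i) {
    std::size_t last = std::min(partitionSize - 1, i + following);
    out.push_back(static_cast<int>(last + 1)); // frame spans rows [0, last]
  }
  return out;
}
```

With following = 1 a three-row partition yields 2, 3, 3 (the bug described above); with following = 0 it yields the correct 1, 2, 3.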
With cudf.debug_enabled=true, worker logs show the window pipeline after adaptation: LocalExchange → CudfWindow → CudfOrderBy → CudfToVelox → CallbackSink (compute on GPU); only LocalMerge and PartitionedOutput remain on CPU (no GPU adapter).
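As a footnote, the integral-widening decision that castColumnToPlanType() makes (per the Solution section) can be modeled in dependency-free C++; the enum and helper names below are stand-ins, not the Velox API:

```cpp
#include <cstdint>
#include <vector>

// Stand-in for the relevant Velox type kinds.
enum class TypeKind { TINYINT, SMALLINT, INTEGER, BIGINT, OTHER };

// True when the produced column is a narrower integral type than the BIGINT
// the plan expects, i.e. a lossless widening cast resolves the mismatch that
// otherwise makes setType() fail with "underlying physical types must match".
bool needsWideningToBigint(TypeKind produced, TypeKind expected) {
  return expected == TypeKind::BIGINT &&
      (produced == TypeKind::INTEGER || produced == TypeKind::SMALLINT ||
       produced == TypeKind::TINYINT);
}

// Widening INTEGER values to BIGINT never loses information.
std::vector<int64_t> widenToBigint(const std::vector<int32_t>& column) {
  return std::vector<int64_t>(column.begin(), column.end());
}
```

The real helper rebuilds the Velox child vector with the widened type; only when every child already matches the plan type is setType() called directly.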