
feat(PartitionedOutput): Add outputChannels support #1972

Open
xin-zhang2 wants to merge 1 commit into IBM:optimized_partitionedoutput from xin-zhang2:PartitionedOutput-output

xin-zhang2 commented Apr 28, 2026

Add outputChannels support in OptimizedPartitionedOutput.

xin-zhang2 requested a review from yingsu00 on April 28, 2026 18:34
xin-zhang2 force-pushed the PartitionedOutput-output branch 3 times, most recently from a884aa0 to 960998f, on April 28, 2026 22:09
xin-zhang2 force-pushed the PartitionedOutput-output branch 2 times, most recently from e1eb062 to e335c23, on May 7, 2026 15:20
});
}

RowVectorPtr OptimizedPartitionedOutput::prepareOutput(
Collaborator

This is preparing input, not output. Rename it to prepareInput.

Member Author

Renamed to prepareSerializerInput

return input;
}

std::vector<VectorPtr> outputColumns;
Collaborator

outputColumns -> reorderedInputColumns

Member Author

Renamed to serializerInputColumns, since it is passed to the serializer's append() and contains only the unique columns from the output.

Comment thread velox/vector/PartitionedVector.cpp Outdated
PartitionBuildContext& ctx) {
auto* rowVector = vector_->as<RowVector>();
partitionedChildren_.reserve(rowVector->childrenSize());
std::unordered_map<const BaseVector*, PartitionedVectorPtr>
Collaborator

Actually, I think the input-to-output mapping should be done in PrestoIterativePartitioningSerializer::flushRowChildren(), not at the PartitionedVector level. PartitionedVector is NOT supposed to handle or know about the remapping, which should happen in the upper levels. Also, the change made here is hard to understand.

});
}

RowVectorPtr OptimizedPartitionedOutput::prepareOutput(
Collaborator

Actually, let's not do the mapping at addInput time, but at flush time. It should happen in PrestoIterativePartitioningSerializer::flushRowChildren().

xin-zhang2 force-pushed the PartitionedOutput-output branch from e335c23 to f66413c on May 15, 2026 14:18
@xin-zhang2
Member Author

@yingsu00 I've moved the mapping to flush time. The serializer now has a member outputToInputChannels_ for this mapping, and the input passed to append() is prepared in OptimizedPartitionedOutput so that it includes only the unique columns from the output.
Could you please take a look? Thanks.
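
For readers following the thread, here is a minimal, self-contained sketch of the deduplicate-then-expand idea described above. It does not use Velox types: `ChannelIndex`, `SerializerLayout`, `buildSerializerLayout`, and `expandAtFlush` are hypothetical names chosen for illustration, and serialized columns are modeled as plain strings. Only the member name `outputToInputChannels_` comes from the PR; the actual implementation may differ.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical alias for a column position; not the Velox type.
using ChannelIndex = std::uint32_t;

// Hypothetical layout computed once from the output channels.
struct SerializerLayout {
  // Distinct input channels in first-seen order: the columns that would be
  // passed to append().
  std::vector<ChannelIndex> serializerInputChannels;
  // For each output column, its position in serializerInputChannels.
  std::vector<ChannelIndex> outputToInputChannels;
};

// Deduplicates outputChannels and records, for every output column, which
// deduplicated input column it should be read from.
SerializerLayout buildSerializerLayout(
    const std::vector<ChannelIndex>& outputChannels) {
  SerializerLayout layout;
  std::unordered_map<ChannelIndex, ChannelIndex> positionOf;
  layout.outputToInputChannels.reserve(outputChannels.size());
  for (const auto channel : outputChannels) {
    const auto nextPosition =
        static_cast<ChannelIndex>(layout.serializerInputChannels.size());
    const auto [it, inserted] = positionOf.emplace(channel, nextPosition);
    if (inserted) {
      layout.serializerInputChannels.push_back(channel);
    }
    layout.outputToInputChannels.push_back(it->second);
  }
  return layout;
}

// Flush-time expansion: each output column is emitted from its mapped input
// column, so a duplicated column is stored once and written out twice.
std::vector<std::string> expandAtFlush(
    const std::vector<std::string>& serializedColumns,
    const std::vector<ChannelIndex>& outputToInputChannels) {
  std::vector<std::string> output;
  output.reserve(outputToInputChannels.size());
  for (const auto mapped : outputToInputChannels) {
    assert(mapped < serializedColumns.size());
    output.push_back(serializedColumns[mapped]);
  }
  return output;
}
```

With output channels {2, 0, 2, 1}, this sketch yields serializer input channels {2, 0, 1} and the mapping {0, 1, 0, 2}, so input column 2 is buffered once and emitted for both the first and third output columns.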

xin-zhang2 requested a review from yingsu00 on May 15, 2026 14:32
Comment thread velox/exec/OptimizedPartitionedOutput.cpp
input->size(),
partitions.size(),
"partitions.size() must equal input->size()");

Collaborator

It's safer to additionally check the mapped types in append() when outputToInputChannels_ is non-empty. We can validate each mapped channel against input->childrenSize() and validate input->childAt(mapped)->type() against type_->childAt(outputColumn).
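
A rough sketch of what such a check could look like, written without Velox dependencies. Types are modeled as strings and failures as standard exceptions, whereas the real code would presumably compare TypePtr values and use the project's own check macros. `validateMappedChannels` and its parameters are hypothetical names, not part of the PR.

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the check suggested above.
// inputTypes models the types of the children of the input RowVector;
// outputTypes models the children of the serializer's output row type.
void validateMappedChannels(
    const std::vector<std::size_t>& outputToInputChannels,
    const std::vector<std::string>& inputTypes,
    const std::vector<std::string>& outputTypes) {
  // An empty mapping is treated as "no remapping": nothing to validate.
  if (outputToInputChannels.empty()) {
    return;
  }
  if (outputToInputChannels.size() != outputTypes.size()) {
    throw std::invalid_argument(
        "mapping must have one entry per output column");
  }
  for (std::size_t outputColumn = 0;
       outputColumn < outputToInputChannels.size();
       ++outputColumn) {
    const std::size_t mapped = outputToInputChannels[outputColumn];
    // Bounds check: the mapped channel must exist in the input.
    if (mapped >= inputTypes.size()) {
      throw std::out_of_range(
          "mapped channel " + std::to_string(mapped) +
          " exceeds input children size");
    }
    // Type check: the mapped input column must match the output column type.
    if (inputTypes[mapped] != outputTypes[outputColumn]) {
      throw std::invalid_argument(
          "type mismatch at output column " + std::to_string(outputColumn));
    }
  }
}
```

Running the bounds check before the type lookup keeps an out-of-range channel from being dereferenced, so a bad mapping is reported as a bad mapping rather than as a confusing type error.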

/// Builds the RowVector consumed by the serializer. When the output layout
/// has duplicated columns, this projects only the distinct columns and
/// leaves duplication to flush time.
RowVectorPtr prepareSerializerInput(const RowVectorPtr& input) const;
Collaborator

Move this before flush(). For functions with the same access level, order them in the order they are called.


/// Row type passed to serializer_->append(). It only includes distinct
/// columns from the output layout.
RowTypePtr serializerInputType_;
Collaborator

Move the three new members to right after the serializer_ member.


namespace facebook::velox::exec {

void OptimizedPartitionedOutput::initializeSerializerLayout() {
Collaborator

Order the function definitions in the same order as in the .h file.


2 participants