Skip to content

feat(PartitionedOutput): Add BufferState in PrestoIterativePartitioningSerialize#1916

Open
xin-zhang2 wants to merge 1 commit into
IBM:optimized_partitionedoutputfrom
xin-zhang2:PartitionedOutput4.0-bytesBuffered
Open

feat(PartitionedOutput): Add BufferState in PrestoIterativePartitioningSerialize#1916
xin-zhang2 wants to merge 1 commit into
IBM:optimized_partitionedoutputfrom
xin-zhang2:PartitionedOutput4.0-bytesBuffered

Conversation

@xin-zhang2
Copy link
Copy Markdown
Member

@xin-zhang2 xin-zhang2 commented Apr 13, 2026

Add BufferState in PrestoIterativePartitioningSerializer to track the buffered state.

Copy link
Copy Markdown
Collaborator

@yingsu00 yingsu00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retainedSize() is also used as a initial stage convenience in OptimizedPartitionedOutput:

void OptimizedPartitionedOutput::addInput(RowVectorPtr input) {
  VELOX_USER_CHECK(
      !replicateNullsAndAny_,
      "replicateNullsAndAny is not yet supported by OptimizedPartitionedOutput");

  if (serializer_->bytesBuffered() + input->retainedSize() >=
      maxOutputBufferBytes_) {
    flush();
  }

This requires the bufferedBYtes_ can be easily added without really committing the change, while the current implementation in this PR just update the serializer's buffered sizes. It doesn't satisfy the requirements.

I'm thinking that maybe we can introduce a struct to track the accumulated sizes/numNulls instead of just a vector or vector of vectors. This struct should share the same tree structure of the input type, something like

class BufferState {
public:
    BufferState(VectorPtr vector); // construct on the first incoming vector
    add(VectorPtr newVec);
    BufferState tryAdd(VectorPtr newVec); // returns a new one
    clear();
protected:
    TypePtr type;
    int64_t numValuesBuffered;
    int64_t numNullsBuffered;
    int64_t bytesBuffered;
    ... // add or remove fields if you think necessary
}
class RowBufferState : public BufferState {
    std::vector<std::unique_ptr<BufferState>> childrenStates;
}
class ArrayBufferState : public BufferState {
    std::unique_ptr<BufferState> childState;
}
class MapBufferState : public BufferState {
    std::unique_ptr<BufferState> keyState;
    std::unique_ptr<BufferState> valuesState;
}

THe constructor would merge what we have done in computeColumnFlushSizes(). flushSizes_ would be replaced by the bufferState_ member. At the end of PrestoIterativePartitioningSerializer::append():

    bufferState_->add(input);

The operator can say

    if (serializer_->bufferState()->tryAdd(input)->bytesBuffered() >=
      maxOutputBufferBytes_) {
    flush();
  }

What do you think?

@xin-zhang2
Copy link
Copy Markdown
Member Author

@yingsu00
Yes, the current implementation doesn't work with input->retainedSize(), and it makes sense to add a method like tryAdd. We need to estimate the output size of the current input before adding it to partitionedRowVectors_, so that the total flush size doesn't exceed the limit.

Introducing BufferState seems like a good design direction. It would make it easier to support complex types, and allow us to define specialized subclasses like FixedWidthBufferState, and VariableWidthBufferState to better handle the size calculation.

I will try this approach, and one thing to note is that we might still need to keep BuffereState members like numValuesBuffered as a vector so as to track sizes per partition.

@xin-zhang2 xin-zhang2 force-pushed the optimized_partitionedoutput branch from 57c7447 to d375a33 Compare April 14, 2026 16:49
@xin-zhang2 xin-zhang2 force-pushed the optimized_partitionedoutput branch from d375a33 to 70ad998 Compare April 15, 2026 10:02
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from ab088f7 to c7bbf52 Compare April 15, 2026 17:03
@xin-zhang2 xin-zhang2 changed the title feat(PartitionedOutput): update the calculation of bytesBuffered feat(PartitionedOutput): Add BufferState in PrestoIterativePartitioningSerialize Apr 15, 2026
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from c7bbf52 to 6ea273f Compare April 15, 2026 19:59
@xin-zhang2 xin-zhang2 removed the request for review from majetideepak April 16, 2026 22:10
@xin-zhang2 xin-zhang2 force-pushed the optimized_partitionedoutput branch from 70ad998 to 211901c Compare April 17, 2026 12:12
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from 6ea273f to 7addaaf Compare April 21, 2026 11:00
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.h Outdated
}

serializer_->append(input, partitions_);
auto prepared = serializer_->prepareAppend(input, partitions_);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This breaks the work into two-phase flow and makes the API more complex than necessary.

Although asking the estimated size is a real need in OptimizedPartitionedOutput, the current solution pushes too much work into the serializer and makes the flow too complex. IMHO, a simpler design would be

  • Keep the serializer stateful and append-only.
  • Add one cheap “estimate/predict” API for a candidate batch.
  • Keep actual ownership of appended data entirely inside append().
    We want to keep the lines like this:
if (serializer_->bytesBuffered() +
        serializer_->incrementalBytesFor(input, partitions_) >
    maxOutputBufferBytes_) {
  flush();
}
serializer_->append(input, partitions_);

};
};

PrestoIterativePartitioningSerializer::AppendState::~AppendState() = default;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this AppendState, and replace it with incrementalBytesFor(...). When appending, we don't need the exact size 100%, so we don't have to partition the vector before append. This will make the flow much easier. incrementalBytesFor(vector) would get the estimated flat size of vector, and maybe adjust the size of nulls by checking the number of nulls for each column in existing BufferState. This operation is very light and the code will be a lot simpler

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from 7addaaf to 7344d82 Compare April 28, 2026 16:54
@xin-zhang2
Copy link
Copy Markdown
Member Author

@yingsu00 I've removed the two-phase append and use the incrementalBytesFor approach now. The estimated bytes is still exact for now. Can you please take a look? Thanks!

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from 7344d82 to 9055037 Compare April 30, 2026 16:41
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
///
/// For each partition, this node tracks page-level row counts and aggregates
/// the serialized-byte contributions of all top-level column states.
class PageBufferState : public BufferState {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PageBufferState -> RowBufferState

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might need to keep PageBufferState here, as RowBufferState is reserved for the ROW encoding columns, and PageBufferState and RowBufferState are different because their different wire format.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate what is different betweenPageBufferState and RowBufferState? In Velox, the top level rows are expressed as a RowVector, the only difference is all top level rows are not null. This is different from Presto. So it is actually BufferState for RowVectors.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right that both top-level rows and row-type columns are expressed as RowVector. The distinction here is at the serialization level. They need separate buffer states because their wire format are different.

PageBufferState is for top-level rows. It includes all child (top column) buffer states, and accounts for page-level bytes such as the serialized page header.

Image

While RowBufferState is for row-type columns. It includes the column header (length varies across encodings) and all its child (field of the row) buffer states. Additionally, it needs to account for all row-specfic details such as offset and null bitmaps, and it also need to take row nulls into account when computing the child state.

Image Image

Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
/// state.
int64_t incrementalBytesFor(
const BaseVector& input,
const std::vector<uint32_t>& partitions) const {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is only used in OptimizedPartitionedOutput operator before calling serializer_append(). As I said on Slack, you don' t need to calculate the per partition stats. A rough total estimation is good. At that time the vector is not partitioned yet. The accurate BufferState will be calculated after partitionedVector is created. Change this function to

int64_t bytesEstimationAfter(
      const BaseVector& input)

When implementing it, we will need to check the numNulls per column/field though

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch 2 times, most recently from e72bc10 to dc15f8c Compare May 5, 2026 15:04
@xin-zhang2 xin-zhang2 requested a review from yingsu00 May 5, 2026 15:05
VELOX_CHECK_NOT_NULL(pool_);

numColumns_ = type_->size();
bufferState_ = createPageState();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't create the createPageState() function in the Serializer class. Move the logic into BufferState classes. Then in constructor initializer list, do bufferState_{std::make_unique(Type, ...))
or create static create() function in
and
bufferState_{RowBufferState::create(Type, ...))

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved them out of Serializer.

Comment thread velox/exec/OptimizedPartitionedOutput.cpp
///
/// For each partition, this node tracks page-level row counts and aggregates
/// the serialized-byte contributions of all top-level column states.
class PageBufferState : public BufferState {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please elaborate what is different betweenPageBufferState and RowBufferState? In Velox, the top level rows are expressed as a RowVector, the only difference is all top level rows are not null. This is different from Presto. So it is actually BufferState for RowVectors.

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from dc15f8c to 06dee0d Compare May 8, 2026 14:19
@xin-zhang2 xin-zhang2 requested a review from yingsu00 May 8, 2026 16:33
@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from 06dee0d to 5e8ce3f Compare May 8, 2026 20:57
Copy link
Copy Markdown
Collaborator

@yingsu00 yingsu00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More to come

} // namespace

switch (colType->kind()) {
static std::unique_ptr<BufferState> makeBufferState(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this BufferState class and make it a static create function

class BufferState {
public:
static std::unique_ptr<BufferState> create(...) {...}
...
 protected:
  BufferState(TypePtr type, uint32_t numPartitions)
      : type_(std::move(type)),
        numPartitions_(numPartitions),
        rowsPerPartition_(numPartitions, 0),
        nullsPerPartition_(numPartitions, 0),
        bytesPerPartition_(numPartitions, 0) {}


// In single-partition case, if the second addInput() is estimated to stay
// below the partitioned-output limit, it doesn't flush before appending.
TEST_F(OptimizedPartitionedOutputTest, noPreFlushWhenEstimateBelowLimit) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add tests for ConstantVectors with nulls and non-nulls

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added two cases in PrestoIterativePartitioningSerializerTest to verify bytesEstimationAfter works correctly with constant vectors.

The tests here in OptimizedPartitionedOutputTest are just to ensure the flush in addInput works as expected based on the comparison between bytesBuffered and maxPartitionedOutputBufferSize.

/// Recurses into nested ROW columns.
/// Returns the null counts if it can be derived without row-by-row checks,
/// otherwise returns std::nullopt.
std::optional<int64_t> countNulls(const BaseVector& vector) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is not needed. BaseVector::countNulls() can do the job correctly.

  auto constant = makeConstant<int64_t>(std::nullopt, 3);
  printf("%d", BaseVector::countNulls(constant->nulls(), constant->size()));

This prints 3

  auto constant2 = makeConstant<int64_t>(1L, 3);
  printf("%d", BaseVector::countNulls(constant2->nulls(), constant2->size()));

This prints 0

Copy link
Copy Markdown
Member Author

@xin-zhang2 xin-zhang2 May 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The size of nulls() is always 1 for constant vectors, so it doesn't work for vectors with size > 64.

  auto constant = makeConstant<int64_t>(std::nullopt, 80);
  printf("%d", BaseVector::countNulls(constant->nulls(), constant->size()));

doesn't print 80. It prints 74 under debug build.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's surprising. Mine prints 80. Does it always print 74? Did you change anything in BaseVector?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yingsu00 Are you using a release build?
In ConstantVector constructor, the nulls buffer is allocated by makeNullsBuffer().

BaseVector::nulls_ =
        AlignedBuffer::allocate<uint64_t>(1, BaseVector::pool());
    BaseVector::nulls_->setSize(1);
    BaseVector::rawNulls_ = BaseVector::nulls_->as<uint64_t>();
    *BaseVector::nulls_->asMutable<uint64_t>() = bits::kNull64;

It's size is always 1 and only the first 64 bits is set to kNulls. Although it may have a larger capacity, the other bytes are either uninitialized in release mode or initialized to 0xa1 (10100001) in debug mode in fillNewMemory().

#ifndef NDEBUG
      // Initialize with unlikely constant value in debug mode to make
      // uninitialized status visible in debugger.
      memset(asMutable<char>() + oldBytes, 0xa1, capacity() - oldBytes);
#endif

So in debug mode, only the first 64 bits are set to 0, leaving the last 16 bits to contain six 1s. Therefore the result is 74.

The countNulls function I added here is to make it work for both flat and constant vectors. For flat vectors, BaseVector::countNulls() is used while for constant vector, vector.size() is returned.

Copy link
Copy Markdown
Collaborator

@yingsu00 yingsu00 May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also got 74 in debug build. This is not ideal but it's pre-existing so yes we will have to count nulls here.

But we need to add special handling of dict vecctor here:

case VectorEncoding::Simple::DICTIONARY: {
        int64_t count = 0;
        for (vector_size_t i = 0; i < numRows; ++i) {
          count += vector.isNullAt(i);
        }
        return count;
      }

Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
@xin-zhang2
Copy link
Copy Markdown
Member Author

@yingsu00
I've addressed the comments. Could you please take another look? Thanks!

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from 5e8ce3f to 9c26c1b Compare May 13, 2026 11:06
@xin-zhang2 xin-zhang2 requested a review from yingsu00 May 13, 2026 11:06
Copy link
Copy Markdown
Collaborator

@yingsu00 yingsu00 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xin-zhang2 The structure looks good now, but the size calculation may need some work.

  1. All calculations were on TypeKind, but the original intention was to make different vector encodings like CONSTANT, DICT vectors size correct. I think we should switch on encoding instead of TypeKind in all size related calculations
  2. I think we are overly complicating bytesEstimationAfter().
  • The header sizes estimation took many lines but they are not significant size even if there're 1000 nodes in the cluster. Can we simplify it by removing the #partitions estimation and just assume the new vector will go to all partitions?
  • For normal flat vectors, we can just use BaseVector::estimateFlatSize()

std::unique_ptr<ColumnBufferState> ColumnBufferState::create(
const TypePtr& type,
uint32_t numPartitions) {
switch (type->kind()) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just check type->isFixedWidth(). No need to use the huge switch block

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only have primitive types supported now, so using type->isFixedWidth() in a if statement is fine.
In the future we might need to add BufferState derived classes for complex types such as ROW and MAP, so it would be better to use a switch statement here.

@xin-zhang2
Copy link
Copy Markdown
Member Author

@yingsu00

All calculations were on TypeKind, but the original intention was to make different vector encodings like CONSTANT, DICT vectors size correct. I think we should switch on encoding instead of TypeKind in all size related calculations

The calculations estimates buffered serialized bytes, so it should align with Presto serialization wire-format encodings such as XXX_ARRAY, VARIABLE_WIDTH, ROW, MAP etc., because they have different serialization layouts. Those encodings are different from the Velox encodings like FLAT, CONSTANT, and DICTIONARY, which describe the in-memory presentation of the input vector.
Since those wire-format encodings are effectively derived from the typeKind, I think it would be better to keep using typeKind.


The header sizes estimation took many lines but they are not significant size even if there're 1000 nodes in the cluster. Can we simplify it by removing the #partitions estimation and just assume the new vector will go to all partitions?

In practice, header sizes are not negligible. Suppose we have 1000 partitions, the input contains 50 columns, and the average header size per-column is approximately 15 bytes. If we simply assume that an input goes to all partitions, the additional header overhead would be

(25 + 15 * 50) * 1000 =775,000 bytes (~775KB), where 25 bytes is the page header size

If we simplify the logic, there are two obvious choices:

  1. Include this header overhead on every append
    This is simple, but it would significantly overestimate and trigger unnecessary flushes.
  2. Include this header overhead only on the first append after a flush (for example, when bytesBuffered == 0)
    This avoids repeated overestimation, but it can underestimate subsequent appends. The reason is that bytesBuffered tracks only the actual buffered size, while later batches may still cause additional partitions to become non-empty and therefore introduce new header overhead.

So I think the current logic would be a reasonable trade-off. It adds a bit more calculation, but it keeps the estimate conservative without assuming the worst case on every append.


  • For normal flat vectors, we can just use BaseVector::estimateFlatSize()

BaseVector::estimateFlatSize() might not be a good fit here, even for FlatVectors. It estimates the retained size of a flat in-memory representation, while here we are trying to estimate the buffered size in Presto serialization wire format. So this would be a semantic change, not just a simplification.
For example, it doesn't inlcudes page/column headers, wire format overhead such as offsets for StringView, and it can diverge from the wire layout for some types such as bool. (bool is bit-packed in memory, but serialized as bytes).

@yingsu00
Copy link
Copy Markdown
Collaborator

The calculations estimates buffered serialized bytes, so it should align with Presto serialization wire-format encodings such as XXX_ARRAY, VARIABLE_WIDTH, ROW, MAP etc., because they have different serialization layouts. Those encodings are different from the Velox encodings like FLAT, CONSTANT, and DICTIONARY, which describe the in-memory presentation of the input vector. Since those wire-format encodings are effectively derived from the typeKind, I think it would be better to keep using typeKind.

It is true that the Presto serialization format is on the TypeKinds most of the times, but they are flattened based on the encodings like CONSTANT and DICT in IterativeSerializer. In the iterative mode, multiple incoming vectors are buffered, and CONSTANT and DICTIONERY vectors get flatten out. A CONSTANT vector of 100 1L's will be expanded to actual 800 bytes on the wire, and the encoding information will get lost. The receiver won't know if a piece of data is from a constant vector or a flat vector.

The other mode is the "batch" mode where BatchSerializer is used. Encoding is preserved here, and Presto actually support sending a RleBlock(Constant Vector) in the original encoding, so 100 1's will be just a few bytes. But we're working on iterative serializer so you have to peel off the encodings first. ANd this was the original reason why we wanted to introduce BufferState because you found that sending a constant vector would explode the data size and make the estimation far off. Do you not remember that?

So my suggestion was to make top level switch on ENCODING then the second level on TypeKind. We can't leave the encoding treatment undone, right?

Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
Comment thread velox/serializers/PrestoIterativePartitioningSerializer.cpp Outdated
/// partition first, then one byte is added for every 8 remaining rows.
int64_t maxBitmapBytes(int64_t totalRows, int64_t numPartitionsWithNulls) {
VELOX_DCHECK_LE(numPartitionsWithNulls, totalRows);
return numPartitionsWithNulls + (totalRows - numPartitionsWithNulls) / 8;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add this
if (numPartitionsWithNulls == 0) {
return 0;
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Thanks!

bufferState_->clear();
}

int64_t PrestoIterativePartitioningSerializer::bytesEstimationAfter(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Xin, I took the liberty and shorten this function to:

int64_t PrestoIterativePartitioningSerializer::bytesEstimationAfter(
    const RowVectorPtr& input) const {
  VELOX_CHECK_NOT_NULL(input);

  const auto numRows = input->size();
  if (numRows == 0) {
    return bytesBuffered();
  }

  // Worst case: each input row lands in a distinct empty partition, capped by
  // the number of empty partitions.
  const auto numNewPartitions = std::min<uint32_t>(
      numRows, numPartitions_ - bufferState_->numNonEmptyPartitions());
  const auto numNonEmptyPartitionsAfter =
      bufferState_->numNonEmptyPartitions() + numNewPartitions;
  // One page header per newly non-empty partition.
  auto estimatedBytes =
      bufferState_->bytesBuffered() + numNewPartitions * (kHeaderSize + 4);

  for (auto column = 0; column < numColumns_; ++column) {
    const auto& columnType = type_->childAt(column);
    if (columnType->isFixedWidth()) {
      const auto* columnState = bufferState_->children()[column].get();
      const auto inputNulls = countNulls(*input->childAt(column));
      const auto partitionsWithNulls = std::min(
          numNonEmptyPartitionsAfter,
          columnState->numPartitionsWithNulls() + inputNulls.value_or(numRows));
      const auto nullBitmapBytes = maxBitmapBytes(
          bufferState_->rowsBuffered() + numRows, partitionsWithNulls);
      VELOX_DCHECK_GE(nullBitmapBytes, columnState->nullBitmapBytesBuffered());
      estimatedBytes += numNewPartitions * simpleColumnBytes(columnType, 0, 0) +
          nullBitmapBytes - columnState->nullBitmapBytesBuffered() +
          (numRows - inputNulls.value_or(0)) *
              fixedTypeWidth(columnType->kind());
    } else {
      switch (columnType->kind()) {
        case TypeKind::VARCHAR:
        case TypeKind::VARBINARY:
        case TypeKind::ROW:
        case TypeKind::ARRAY:
        case TypeKind::MAP:
          VELOX_NYI(
              "Unsupported type kind for "
              "PrestoIterativePartitioningSerializer::bytesEstimationAfter: {}",
              columnType->kind());
        default:
          VELOX_UNSUPPORTED(
              "Invalid type kind for "
              "PrestoIterativePartitioningSerializer::bytesEstimationAfter: {}",
              columnType->kind());
      }
    }
  }

  return estimatedBytes;
}

Can you please check if this looks right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for help simplify this function.

  1. The code logic is correct, and inputNulls.value_or(numRows) provides a better estimate for partitionsWithNulls than my original code.
  2. If we use isFixedWidth(), we might need to specially handle TypeKind::UNKNOWN and TypeKind::TIMESTAMP, which are also fixed-width. UNKNOWN is not supported yet, and for TIMESTAMP we can provide a more accurate estimate based on serialization option useLosslessTimestamp.
  3. If nullBitmapBytesBuffered_ is removed and nullBitmapBytesBuffered() has O(numPartitions) complexity, it would be better to store the result in a local variable to avoid calling it twice.

VELOX_UNSUPPORTED(
"computeColumnFlushSizes: unsupported type kind {}",
TypeKindName::toName(colType->kind()));
for (auto column = 0; column < children_.size(); ++column) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this block up so the two for (auto p = 0; p < numPartitions_; ++p) blocks can be merged

children_[column]->append(rowVector->childAt(column));
}

for (auto p = 0; p < numPartitions_; ++p) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Merge the two partition loops into

for (auto p = 0; p < numPartitions_; ++p) {
      const auto numRows = partitionedVector->numRowsAt(p);
      if (numRows == 0) {
        continue;
      }
      if (rowsPerPartition_[p] == 0) {
        ++numNonEmptyPartitions_;
      }
      rowsPerPartition_[p] += numRows;

      int64_t partitionBytes = kHeaderSize + 4;
      for (const auto& child : children_) {
        partitionBytes += child->bytesPerPartition()[p];
      }
      bytesBuffered_ += partitionBytes - bytesPerPartition_[p];
      bytesPerPartition_[p] = partitionBytes;
    }

std::fill(bytesPerPartition_.begin(), bytesPerPartition_.end(), 0);
numNonEmptyPartitions_ = 0;
numPartitionsWithNulls_ = 0;
nullBitmapBytesBuffered_ = 0;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found having this incrementally maintained adds extra complexity in the code. For a function called once per batch, an O(numPartitions) recompute is cheap (a few hundred adds). Besides you had to do an bits::nbytes(rows) option in append() anyways. I suggest we drop the field entirely and directly calcualte it this way:

  // In ColumnBufferState (or FixedWidthBufferState):
  int64_t nullBitmapBytesBuffered() const {
    int64_t total = 0;
    for (auto p = 0; p < numPartitions_; ++p) {
      if (nullsPerPartition_[p] > 0) {
        total += bits::nbytes(rowsPerPartition_[p]);
      }
    }
    return total;
  }

/// Recurses into nested ROW columns.
/// Returns the null counts if it can be derived without row-by-row checks,
/// otherwise returns std::nullopt.
std::optional<int64_t> countNulls(const BaseVector& vector) {
Copy link
Copy Markdown
Collaborator

@yingsu00 yingsu00 May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also got 74 in debug build. This is not ideal but it's pre-existing so yes we will have to count nulls here.

But we need to add special handling of dict vecctor here:

case VectorEncoding::Simple::DICTIONARY: {
        int64_t count = 0;
        for (vector_size_t i = 0; i < numRows; ++i) {
          count += vector.isNullAt(i);
        }
        return count;
      }

FixedWidthBufferState(TypePtr type, uint32_t numPartitions)
: ColumnBufferState(std::move(type), numPartitions) {}

void append(const PartitionedVectorPtr& partitionedVector) override {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove nullBitmapBytesBuffered_ and FixedWidthBufferState::append can be condensed to:

  void append(const PartitionedVectorPtr& partitionedVector) override {
    for (auto p = 0; p < numPartitions_; ++p) {
      const auto numRows = partitionedVector->numRowsAt(p);
      if (numRows == 0) {
        continue;
      }
      const auto numNulls = partitionedVector->numNullsAt(p);
      numNonEmptyPartitions_ += (rowsPerPartition_[p] == 0);
      numPartitionsWithNulls_ += (nullsPerPartition_[p] == 0 && numNulls > 0);
      rowsPerPartition_[p] += numRows;
      nullsPerPartition_[p] += numNulls;
      bytesPerPartition_[p] =
          simpleColumnBytes(type_, rowsPerPartition_[p], nullsPerPartition_[p]);
    }
  }

@xin-zhang2
Copy link
Copy Markdown
Member Author

xin-zhang2 commented May 21, 2026

ANd this was the original reason why we wanted to introduce BufferState because you found that sending a constant vector would explode the data size and make the estimation far off. Do you not remember that?

Yes, but the original issue is not about encoding itself. The real proplem is that we were using retainedSize, which describes the in-memory size of the vector, to estimate the serialized size. ConstantVector simply exposes the issue more cleary through the size explosion. Even for FlatVector, retainedSize is not a suitable estimate, as I mentioned in #1916 (comment).

So my suggestion was to make top level switch on ENCODING then the second level on TypeKind. We can't leave the encoding treatment undone, right?

Right, we do have to handle encodings properly, but how to handle them may depend on the TypeKind.
For example, for fixed-width types, we can get the numRows from vector.size() regardlessly of encoding. What the encoding only matters is numNulls, which is used to calculate the size of null bitmap. That't why countNulls is enough in this PR. While for StringViews, we also need to estimate the value bytes based on the encoding, like the variableWidthDataBytes function I implemented in the StringView PR.

So IMO, it may make more sense to use TypeKind as the top-level switch, and then handle encodings differently within each type-specific branch as needed. What do you think?

@xin-zhang2 xin-zhang2 force-pushed the PartitionedOutput4.0-bytesBuffered branch from 9c26c1b to dc9c5dd Compare May 22, 2026 09:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants