From 144cc8d472aeb5650a457416f7d53f8b17e479d8 Mon Sep 17 00:00:00 2001 From: zhli1142015 Date: Sat, 16 May 2026 14:48:47 +0800 Subject: [PATCH 1/3] [VL] Enable Velox batch resizer copyRanges fast path Add a default-enabled VeloxBatchResizer fast path that collects small dense batches, allocates the output RowVector once, and bulk-copies child vector ranges with copyRanges. The config remains available as an opt-out switch. Wire the flag through Scala, Java, and JNI, add C++ coverage for fast-path and fallback behavior, add config default coverage, and add dense-vector benchmark scenarios comparing the append opt-out baseline, default copyRanges path, direct child copyRanges, reader-side raw payload bulk-copy model, and pre-merged flush model. Benchmark results from velox_batch_resizer_benchmark (CPU time; ASLR enabled, so numbers may have noise): - Mixed_64x64: append opt-out baseline 95.1us, default copyRanges 19.7us, direct child copyRanges 17.4us, raw bulk-copy model 33.3us. - Mixed_16x256: append opt-out baseline 33.7us, default copyRanges 6.4us, direct child copyRanges 5.0us, raw bulk-copy model 10.5us. - Mixed_256x16: append opt-out baseline 217.7us, default copyRanges 50.4us, direct child copyRanges 28.6us, raw bulk-copy model 112.6us. - Fixed2_64x64: append opt-out baseline 26.6us, default copyRanges 5.5us, direct child copyRanges 2.0us, raw bulk-copy model 13.7us. - Fixed16_64x64: append opt-out baseline 121.6us, default copyRanges 27.0us, direct child copyRanges 17.4us, raw bulk-copy model 92.9us. - LongString_64x64: append opt-out baseline 31.7us, default copyRanges 7.1us, direct child copyRanges 4.5us, raw bulk-copy model 15.3us. - BoolHeavy_64x64: append opt-out baseline 68.7us, default copyRanges 10.9us, direct child copyRanges 5.4us, raw bulk-copy model 37.7us. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../gluten/utils/VeloxBatchResizer.java | 2 + .../utils/VeloxBatchResizerJniWrapper.java | 1 + .../apache/gluten/config/VeloxConfig.scala | 21 + .../execution/VeloxResizeBatchesExec.scala | 8 +- .../gluten/config/AllVeloxConfiguration.scala | 5 + cpp/velox/benchmarks/CMakeLists.txt | 2 + .../benchmarks/VeloxBatchResizerBenchmark.cc | 568 ++++++++++++++++++ cpp/velox/jni/VeloxJniWrapper.cc | 8 +- cpp/velox/tests/VeloxBatchResizerTest.cc | 319 ++++++++++ cpp/velox/utils/VeloxBatchResizer.cc | 141 ++++- cpp/velox/utils/VeloxBatchResizer.h | 21 +- docs/velox-configuration.md | 2 +- 12 files changed, 1090 insertions(+), 8 deletions(-) create mode 100644 cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java index fec8e059789..5617c4fc475 100644 --- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java +++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizer.java @@ -31,6 +31,7 @@ public static ColumnarBatchOutIterator create( int minOutputBatchSize, int maxOutputBatchSize, long preferredBatchBytes, + boolean enableCopyRanges, Iterator in) { final Runtime runtime = Runtimes.contextInstance(BackendsApiManager.getBackendName(), "VeloxBatchResizer"); @@ -40,6 +41,7 @@ public static ColumnarBatchOutIterator create( minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, + enableCopyRanges, new ColumnarBatchInIterator(BackendsApiManager.getBackendName(), in)); return new ColumnarBatchOutIterator(runtime, outHandle); } diff --git a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java index e5b558e97d3..908b6a2445b 100644 --- a/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java +++ b/backends-velox/src/main/java/org/apache/gluten/utils/VeloxBatchResizerJniWrapper.java @@ -40,5 +40,6 @@ public native long create( int minOutputBatchSize, int maxOutputBatchSize, long preferredBatchBytes, + boolean enableCopyRanges, ColumnarBatchInIterator itr); } diff --git a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala index 9ea203d42be..52c964dfe25 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala @@ -40,6 +40,9 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) { def enableHashShuffleReaderStreamMerge: Boolean = getConf(COLUMNAR_VELOX_HASH_SHUFFLE_READER_STREAM_MERGE_ENABLED) + def enableVeloxResizeBatchesCopyRanges: Boolean = + getConf(COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED) + case class ResizeRange(min: Int, max: Int) { assert(max >= min) assert(min > 0, "Min batch size should be larger than 0") @@ -339,6 +342,24 @@ object VeloxConfig extends ConfigRegistry { .booleanConf .createWithDefault(false) + val COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED = + buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.copyRanges.enabled") + .doc( + "Enables a VeloxResizeBatchesExec fast path that combines eligible batches using " + + "Velox vector copyRanges instead of generic RowVector append. When possible, it " + + "collects the small input batches for one VeloxResizeBatchesExec output, allocates " + + "the output RowVector once, and bulk-copies child vector ranges. This is most useful " + + "for shuffle-read outputs where plain hash shuffle payloads are materialized as " + + "dense flat vectors. Complex vectors can also use copyRanges, but ARRAY and MAP " + + "still rebuild nested offsets and sizes while bulk-copying child ranges. Unsupported " + + "encodings such as dictionary and constant vectors fall back to the generic copy " + + "path. This option is enabled by default and complements the reader-side raw " + + "payload merge fast path: that path avoids materializing small plain payload " + + "batches, while this option optimizes VeloxResizeBatchesExec when that operator " + + "is enabled.") + .booleanConf + .createWithDefault(true) + val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_MIN_SIZE = buildConf("spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize") .doc( diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala index 3b2c9490e75..0ca76bd97a0 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala @@ -17,6 +17,7 @@ package org.apache.gluten.execution import org.apache.gluten.backendsapi.velox.VeloxBatchType +import org.apache.gluten.config.VeloxConfig import org.apache.gluten.extension.columnar.transition.Convention import org.apache.gluten.iterator.ClosableIterator import org.apache.gluten.utils.VeloxBatchResizer @@ -41,7 +42,12 @@ case class VeloxResizeBatchesExec( override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = { VeloxBatchResizer - .create(minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, in.asJava) + .create( + minOutputBatchSize, + maxOutputBatchSize, + preferredBatchBytes, + VeloxConfig.get.enableVeloxResizeBatchesCopyRanges, + in.asJava) .asScala } diff --git a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala index 65059972b97..057a3124e7a 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/config/AllVeloxConfiguration.scala @@ -81,4 +81,9 @@ class AllVeloxConfiguration extends AnyFunSuite { builder.toMarkdown, "dev/gen-all-config-docs.sh") } + + test("Velox resize batches copyRanges is enabled by default") { + assert( + VeloxConfig.COLUMNAR_VELOX_RESIZE_BATCHES_COPY_RANGES_ENABLED.defaultValue.contains(true)) + } } diff --git a/cpp/velox/benchmarks/CMakeLists.txt b/cpp/velox/benchmarks/CMakeLists.txt index e56627466c9..fdfd7bda25f 100644 --- a/cpp/velox/benchmarks/CMakeLists.txt +++ b/cpp/velox/benchmarks/CMakeLists.txt @@ -35,3 +35,5 @@ add_velox_benchmark(generic_benchmark GenericBenchmark.cc) add_velox_benchmark(parquet_write_benchmark ParquetWriteBenchmark.cc) add_velox_benchmark(plan_validator_util PlanValidatorUtil.cc) + +add_velox_benchmark(velox_batch_resizer_benchmark VeloxBatchResizerBenchmark.cc) diff --git a/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc new file mode 100644 index 00000000000..4584dbe1c84 --- /dev/null +++ b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc @@ -0,0 +1,568 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "memory/ColumnarBatchIterator.h" +#include "memory/VeloxColumnarBatch.h" +#include "shuffle/Payload.h" +#include "utils/Exception.h" +#include "utils/VeloxBatchResizer.h" +#include "velox/common/memory/Memory.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" + +using namespace facebook::velox; + +namespace gluten { +namespace { + +constexpr int32_t kInputBatches = 64; +constexpr int32_t kRowsPerBatch = 64; +constexpr int32_t kTotalRows = kInputBatches * kRowsPerBatch; +constexpr int64_t kPreferredBatchBytes = std::numeric_limits::max(); + +enum class DenseVectorKind { + kMixed, + kFixedWidth, + kStringOnly, + kBoolHeavy, +}; + +struct DenseBenchmarkScenario { + int32_t inputBatches; + int32_t rowsPerBatch; + DenseVectorKind kind; + int32_t fixedWidthColumns; + int32_t stringBytes; + int32_t boolColumns; + bool nullable; +}; + +constexpr DenseBenchmarkScenario kMixed64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kMixed, 0, 16, 1, true}; +constexpr DenseBenchmarkScenario kMixed16x256{16, 256, DenseVectorKind::kMixed, 0, 16, 1, true}; +constexpr DenseBenchmarkScenario kMixed256x16{256, 16, DenseVectorKind::kMixed, 0, 16, 1, true}; +constexpr DenseBenchmarkScenario + kFixed2_64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kFixedWidth, 2, 0, 0, false}; +constexpr DenseBenchmarkScenario + kFixed16_64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kFixedWidth, 16, 0, 0, false}; +constexpr DenseBenchmarkScenario + kLongString64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kStringOnly, 0, 64, 0, false}; +constexpr DenseBenchmarkScenario + kBoolHeavy64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kBoolHeavy, 0, 0, 8, false}; + +class ColumnarBatchArray : public ColumnarBatchIterator { + public: + explicit ColumnarBatchArray(std::vector> batches) : batches_(std::move(batches)) {} + + std::shared_ptr next() override { + if (cursor_ >= batches_.size()) { + return nullptr; + } + return batches_[cursor_++]; + } + + private: + std::vector> batches_; + size_t cursor_{0}; +}; + +std::string makeStringValue(int32_t value, int32_t bytes) { + auto stringValue = std::to_string(value); + if (stringValue.size() < bytes) { + stringValue.append(bytes - stringValue.size(), 'x'); + } + return stringValue; +} + +RowVectorPtr makeMixedVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + auto i32 = BaseVector::create>(INTEGER(), rows, pool); + auto i64 = BaseVector::create>(BIGINT(), rows, pool); + auto flag = BaseVector::create>(BOOLEAN(), rows, pool); + auto str = BaseVector::create>(VARCHAR(), rows, pool); + + for (auto row = 0; row < rows; ++row) { + const auto value = start + row; + i32->set(row, value); + if (scenario.nullable && row % 7 == 0) { + i64->setNull(row, true); + } else { + i64->set(row, value); + } + flag->set(row, row % 2 == 0); + const auto stringValue = makeStringValue(value, scenario.stringBytes); + str->set(row, StringView(stringValue)); + } + + return std::make_shared( + pool, + ROW({INTEGER(), BIGINT(), BOOLEAN(), VARCHAR()}), + nullptr, + rows, + std::vector{i32, i64, flag, str}); +} + +RowVectorPtr makeFixedWidthVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + std::vector children; + std::vector types; + children.reserve(scenario.fixedWidthColumns); + types.reserve(scenario.fixedWidthColumns); + for (auto channel = 0; channel < scenario.fixedWidthColumns; ++channel) { + auto vector = BaseVector::create>(BIGINT(), rows, pool); + for (auto row = 0; row < rows; ++row) { + vector->set(row, static_cast(start + row + channel)); + } + children.push_back(std::move(vector)); + types.push_back(BIGINT()); + } + + return std::make_shared(pool, ROW(std::move(types)), nullptr, rows, std::move(children)); +} + +RowVectorPtr makeStringVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + auto str = BaseVector::create>(VARCHAR(), rows, pool); + for (auto row = 0; row < rows; ++row) { + const auto value = start + row; + const auto stringValue = makeStringValue(value, scenario.stringBytes); + str->set(row, StringView(stringValue)); + } + + return std::make_shared(pool, ROW({VARCHAR()}), nullptr, rows, std::vector{str}); +} + +RowVectorPtr makeBoolHeavyVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + std::vector children; + std::vector types; + children.reserve(scenario.boolColumns); + types.reserve(scenario.boolColumns); + for (auto channel = 0; channel < scenario.boolColumns; ++channel) { + auto vector = BaseVector::create>(BOOLEAN(), rows, pool); + for (auto row = 0; row < rows; ++row) { + vector->set(row, (start + row + channel) % 2 == 0); + } + children.push_back(std::move(vector)); + types.push_back(BOOLEAN()); + } + + return std::make_shared(pool, ROW(std::move(types)), nullptr, rows, std::move(children)); +} + +RowVectorPtr makeDenseVector(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario, int32_t start) { + switch (scenario.kind) { + case DenseVectorKind::kMixed: + return makeMixedVector(pool, scenario, start); + case DenseVectorKind::kFixedWidth: + return makeFixedWidthVector(pool, scenario, start); + case DenseVectorKind::kStringOnly: + return makeStringVector(pool, scenario, start); + case DenseVectorKind::kBoolHeavy: + return makeBoolHeavyVector(pool, scenario, start); + } + VELOX_UNREACHABLE(); +} + +std::vector makeSmallVectors(memory::MemoryPool* pool, const DenseBenchmarkScenario& scenario) { + std::vector vectors; + vectors.reserve(scenario.inputBatches); + for (auto batch = 0; batch < scenario.inputBatches; ++batch) { + vectors.push_back(makeDenseVector(pool, scenario, batch * scenario.rowsPerBatch)); + } + return vectors; +} + +std::unique_ptr makeIterator(const std::vector& vectors) { + std::vector> batches; + batches.reserve(vectors.size()); + for (const auto& vector : vectors) { + batches.push_back(std::make_shared(vector)); + } + return std::make_unique(std::move(batches)); +} + +int64_t totalRows(const DenseBenchmarkScenario& scenario) { + return static_cast(scenario.inputBatches) * scenario.rowsPerBatch; +} + +VeloxBatchResizer makeResizeBenchmarkResizer( + memory::MemoryPool* pool, + int64_t outputBatchSize, + std::unique_ptr iterator, + std::optional enableCopyRanges) { + if (enableCopyRanges.has_value()) { + return VeloxBatchResizer( + pool, + outputBatchSize, + std::numeric_limits::max(), + kPreferredBatchBytes, + std::move(iterator), + enableCopyRanges.value()); + } + return VeloxBatchResizer( + pool, outputBatchSize, std::numeric_limits::max(), kPreferredBatchBytes, std::move(iterator)); +} + +void runResizeBenchmark( + benchmark::State& state, + const DenseBenchmarkScenario& scenario, + std::optional enableCopyRanges) { + auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmark"); + const auto vectors = makeSmallVectors(pool.get(), scenario); + int64_t rows = 0; + + for (auto _ : state) { + auto resizer = makeResizeBenchmarkResizer(pool.get(), totalRows(scenario), makeIterator(vectors), enableCopyRanges); + while (auto out = resizer.next()) { + rows += out->numRows(); + } + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +void runDirectChildCopyRangesBenchmark(benchmark::State& state, const DenseBenchmarkScenario& scenario) { + auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmarkDirectCopy"); + const auto vectors = makeSmallVectors(pool.get(), scenario); + int64_t rows = 0; + + for (auto _ : state) { + auto output = RowVector::createEmpty(vectors[0]->type(), pool.get()); + output->resize(totalRows(scenario)); + vector_size_t offset = 0; + for (const auto& input : vectors) { + const BaseVector::CopyRange range{0, offset, input->size()}; + for (auto channel = 0; channel < input->children().size(); ++channel) { + output->childAt(channel)->copyRanges(input->childAt(channel)->loadedVector(), folly::Range(&range, 1)); + } + offset += input->size(); + } + rows += output->size(); + benchmark::DoNotOptimize(output); + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +std::shared_ptr allocatePayloadBuffer(arrow::MemoryPool* pool, int64_t size) { + std::shared_ptr buffer; + GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool)); + memset(buffer->mutable_data(), 0x5A, size); + return buffer; +} + +std::shared_ptr allocateEmptyPayloadBuffer(arrow::MemoryPool* pool, int64_t size) { + std::shared_ptr buffer; + GLUTEN_ASSIGN_OR_THROW(buffer, arrow::AllocateResizableBuffer(size, pool)); + return buffer; +} + +void addFixedWidthRawBuffers( + arrow::MemoryPool* pool, + int32_t rows, + int32_t columns, + int32_t valueBytes, + std::vector& validityBuffers, + std::vector>& buffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + buffers.push_back(nullptr); + validityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * valueBytes)); + } +} + +void addFixedWidthRawLayout(int32_t columns, std::vector& validityBuffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + validityBuffers.push_back(false); + } +} + +void addStringRawBuffers( + arrow::MemoryPool* pool, + int32_t rows, + int32_t stringBytes, + bool nullable, + std::vector& validityBuffers, + std::vector>& buffers) { + validityBuffers.push_back(true); + buffers.push_back(nullable ? allocatePayloadBuffer(pool, arrow::bit_util::BytesForBits(rows)) : nullptr); + validityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * sizeof(int32_t))); + validityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * stringBytes)); +} + +void addStringRawLayout(std::vector& validityBuffers) { + validityBuffers.push_back(true); + validityBuffers.push_back(false); + validityBuffers.push_back(false); +} + +void addBoolRawBuffers( + arrow::MemoryPool* pool, + int32_t rows, + int32_t columns, + std::vector& validityBuffers, + std::vector>& buffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + buffers.push_back(nullptr); + validityBuffers.push_back(true); + buffers.push_back(allocatePayloadBuffer(pool, arrow::bit_util::BytesForBits(rows))); + } +} + +void addBoolRawLayout(int32_t columns, std::vector& validityBuffers) { + for (auto channel = 0; channel < columns; ++channel) { + validityBuffers.push_back(true); + validityBuffers.push_back(true); + } +} + +std::vector makeRawPayloadValidityBuffers(const DenseBenchmarkScenario& scenario) { + std::vector validityBuffers; + switch (scenario.kind) { + case DenseVectorKind::kMixed: + addFixedWidthRawLayout(1, validityBuffers); + validityBuffers.push_back(true); + validityBuffers.push_back(false); + addBoolRawLayout(scenario.boolColumns, validityBuffers); + addStringRawLayout(validityBuffers); + break; + case DenseVectorKind::kFixedWidth: + addFixedWidthRawLayout(scenario.fixedWidthColumns, validityBuffers); + break; + case DenseVectorKind::kStringOnly: + addStringRawLayout(validityBuffers); + break; + case DenseVectorKind::kBoolHeavy: + addBoolRawLayout(scenario.boolColumns, validityBuffers); + break; + } + return validityBuffers; +} + +std::unique_ptr makeRawPayload( + arrow::MemoryPool* pool, + const DenseBenchmarkScenario& scenario, + const std::vector& validityBuffers) { + const auto rows = scenario.rowsPerBatch; + std::vector> buffers; + buffers.reserve(validityBuffers.size()); + std::vector generatedValidityBuffers; + switch (scenario.kind) { + case DenseVectorKind::kMixed: + addFixedWidthRawBuffers(pool, rows, 1, sizeof(int32_t), generatedValidityBuffers, buffers); + generatedValidityBuffers.push_back(true); + buffers.push_back(scenario.nullable ? allocatePayloadBuffer(pool, arrow::bit_util::BytesForBits(rows)) : nullptr); + generatedValidityBuffers.push_back(false); + buffers.push_back(allocatePayloadBuffer(pool, rows * sizeof(int64_t))); + addBoolRawBuffers(pool, rows, scenario.boolColumns, generatedValidityBuffers, buffers); + addStringRawBuffers(pool, rows, scenario.stringBytes, false, generatedValidityBuffers, buffers); + break; + case DenseVectorKind::kFixedWidth: + addFixedWidthRawBuffers( + pool, rows, scenario.fixedWidthColumns, sizeof(int64_t), generatedValidityBuffers, buffers); + break; + case DenseVectorKind::kStringOnly: + addStringRawBuffers(pool, rows, scenario.stringBytes, scenario.nullable, generatedValidityBuffers, buffers); + break; + case DenseVectorKind::kBoolHeavy: + addBoolRawBuffers(pool, rows, scenario.boolColumns, generatedValidityBuffers, buffers); + break; + } + GLUTEN_CHECK(generatedValidityBuffers == validityBuffers, "Invalid raw payload buffer layout"); + return std::make_unique(rows, &validityBuffers, nullptr, std::move(buffers)); +} + +std::vector> makeRawPayloads( + arrow::MemoryPool* pool, + const DenseBenchmarkScenario& scenario, + const std::vector& validityBuffers) { + std::vector> payloads; + payloads.reserve(scenario.inputBatches); + for (auto batch = 0; batch < scenario.inputBatches; ++batch) { + payloads.push_back(makeRawPayload(pool, scenario, validityBuffers)); + } + return payloads; +} + +std::unique_ptr mergeRawPayloadsBulkCopy( + std::vector> payloads, + const std::vector& validityBuffers, + arrow::MemoryPool* pool) { + GLUTEN_CHECK(!payloads.empty(), "Cannot merge empty payloads"); + + const auto numBuffers = payloads[0]->numBuffers(); + std::vector payloadRows; + payloadRows.reserve(payloads.size()); + uint32_t totalRows = 0; + std::vector>> inputBuffers(payloads.size()); + std::vector outputSizes(numBuffers, 0); + std::vector hasBuffer(numBuffers, false); + + for (auto payloadIdx = 0; payloadIdx < payloads.size(); ++payloadIdx) { + const auto rows = payloads[payloadIdx]->numRows(); + payloadRows.push_back(rows); + totalRows += rows; + inputBuffers[payloadIdx].reserve(numBuffers); + for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) { + GLUTEN_ASSIGN_OR_THROW(auto buffer, payloads[payloadIdx]->readBufferAt(bufferIdx)); + if (buffer != nullptr) { + hasBuffer[bufferIdx] = true; + if (validityBuffers[bufferIdx]) { + outputSizes[bufferIdx] = arrow::bit_util::BytesForBits(totalRows); + } else { + outputSizes[bufferIdx] += buffer->size(); + } + } + inputBuffers[payloadIdx].push_back(std::move(buffer)); + } + } + + std::vector> outputBuffers(numBuffers); + for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) { + if (hasBuffer[bufferIdx]) { + outputBuffers[bufferIdx] = allocateEmptyPayloadBuffer(pool, outputSizes[bufferIdx]); + } + } + + std::vector byteOffsets(numBuffers, 0); + uint32_t rowOffset = 0; + for (auto payloadIdx = 0; payloadIdx < inputBuffers.size(); ++payloadIdx) { + const auto rows = payloadRows[payloadIdx]; + for (auto bufferIdx = 0; bufferIdx < numBuffers; ++bufferIdx) { + auto& output = outputBuffers[bufferIdx]; + if (output == nullptr) { + continue; + } + + const auto& input = inputBuffers[payloadIdx][bufferIdx]; + if (validityBuffers[bufferIdx]) { + if (input == nullptr) { + arrow::bit_util::SetBitsTo(output->mutable_data(), rowOffset, rows, true); + } else { + arrow::internal::CopyBitmap(input->data(), 0, rows, output->mutable_data(), rowOffset); + } + } else if (input != nullptr) { + memcpy(output->mutable_data() + byteOffsets[bufferIdx], input->data(), input->size()); + byteOffsets[bufferIdx] += input->size(); + } + } + rowOffset += rows; + } + + return std::make_unique(totalRows, &validityBuffers, nullptr, std::move(outputBuffers)); +} + +void BM_VeloxBatchResizerAppendOptOutBaseline(benchmark::State& state, DenseBenchmarkScenario scenario) { + runResizeBenchmark(state, scenario, false); +} + +void BM_VeloxBatchResizerDefaultCopyRanges(benchmark::State& state, DenseBenchmarkScenario scenario) { + runResizeBenchmark(state, scenario, std::nullopt); +} + +void BM_DirectChildCopyRanges(benchmark::State& state, DenseBenchmarkScenario scenario) { + runDirectChildCopyRangesBenchmark(state, scenario); +} + +void BM_ReaderSideRawPayloadBulkCopyModel(benchmark::State& state, DenseBenchmarkScenario scenario) { + auto* pool = arrow::default_memory_pool(); + const auto validityBuffers = makeRawPayloadValidityBuffers(scenario); + int64_t rows = 0; + + for (auto _ : state) { + state.PauseTiming(); + auto payloads = makeRawPayloads(pool, scenario, validityBuffers); + state.ResumeTiming(); + + auto merged = mergeRawPayloadsBulkCopy(std::move(payloads), validityBuffers, pool); + rows += merged->numRows(); + benchmark::DoNotOptimize(merged); + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +void BM_ReaderSidePreMergedBatchModel(benchmark::State& state, DenseBenchmarkScenario scenario) { + auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmarkRawMergeModel"); + auto mergedScenario = scenario; + mergedScenario.inputBatches = 1; + mergedScenario.rowsPerBatch = totalRows(scenario); + const std::vector mergedVector{makeDenseVector(pool.get(), mergedScenario, 0)}; + int64_t rows = 0; + + for (auto _ : state) { + VeloxBatchResizer resizer( + pool.get(), + totalRows(scenario), + std::numeric_limits::max(), + kPreferredBatchBytes, + makeIterator(mergedVector), + false); + while (auto out = resizer.next()) { + rows += out->numRows(); + } + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + +#define REGISTER_DENSE_SCENARIO_BENCHMARKS(name, scenario) \ + BENCHMARK_CAPTURE(BM_VeloxBatchResizerAppendOptOutBaseline, name, scenario); \ + BENCHMARK_CAPTURE(BM_VeloxBatchResizerDefaultCopyRanges, name, scenario); \ + BENCHMARK_CAPTURE(BM_DirectChildCopyRanges, name, scenario); \ + BENCHMARK_CAPTURE(BM_ReaderSideRawPayloadBulkCopyModel, name, scenario); \ + BENCHMARK_CAPTURE(BM_ReaderSidePreMergedBatchModel, name, scenario) + +REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_64x64, kMixed64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_16x256, kMixed16x256); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_256x16, kMixed256x16); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed2_64x64, kFixed2_64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed16_64x64, kFixed16_64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(LongString_64x64, kLongString64x64); +REGISTER_DENSE_SCENARIO_BENCHMARKS(BoolHeavy_64x64, kBoolHeavy64x64); + +#undef REGISTER_DENSE_SCENARIO_BENCHMARKS + +} // namespace +} // namespace gluten + +int main(int argc, char** argv) { + facebook::velox::memory::MemoryManager::initialize(facebook::velox::memory::MemoryManager::Options{}); + ::benchmark::Initialize(&argc, argv); + ::benchmark::RunSpecifiedBenchmarks(); + ::benchmark::Shutdown(); + return 0; +} diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index 72640b21af7..aa4d9599435 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -464,13 +464,19 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_VeloxBatchResizerJniWrapper jint minOutputBatchSize, jint maxOutputBatchSize, jlong preferredBatchBytes, + jboolean enableCopyRanges, jobject jIter) { JNI_METHOD_START auto ctx = getRuntime(env, wrapper); auto pool = dynamic_cast(ctx->memoryManager())->getLeafMemoryPool(); auto iter = makeJniColumnarBatchIterator(env, jIter, ctx); auto appender = std::make_shared(std::make_unique( - pool.get(), minOutputBatchSize, maxOutputBatchSize, preferredBatchBytes, std::move(iter))); + pool.get(), + minOutputBatchSize, + maxOutputBatchSize, + preferredBatchBytes, + std::move(iter), + enableCopyRanges == JNI_TRUE)); return ctx->saveObject(appender); JNI_METHOD_END(kInvalidObjectHandle) } diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc b/cpp/velox/tests/VeloxBatchResizerTest.cc index b23606b1c86..715857052b2 100644 --- a/cpp/velox/tests/VeloxBatchResizerTest.cc +++ b/cpp/velox/tests/VeloxBatchResizerTest.cc @@ -16,6 +16,9 @@ */ #include +#include +#include +#include #include "utils/VeloxBatchResizer.h" #include "velox/vector/tests/utils/VectorTestBase.h" @@ -53,6 +56,105 @@ class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBas return out; } + RowVectorPtr newDenseFlatVector(size_t numRows, int32_t start = 0) { + std::vector> nullableValues; + nullableValues.reserve(numRows); + std::vector strings; + strings.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + nullableValues.emplace_back(i % 3 == 0 ? std::nullopt : std::optional(start + i)); + strings.emplace_back("long-string-value-" + std::to_string(start + i)); + } + return makeRowVector( + {"i32", "i64", "flag", "str"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeNullableFlatVector(nullableValues), + makeFlatVector(numRows, [](auto row) { return row % 2 == 0; }), + makeFlatVector(numRows, [&strings](auto row) { return StringView(strings[row]); })}); + } + + RowVectorPtr newFlatIntVector(size_t numRows, int32_t start = 0) { + return makeRowVector({"i32"}, {makeFlatVector(numRows, [start](auto row) { return start + row; })}); + } + + RowVectorPtr newComplexVector(size_t numRows, int32_t start = 0) { + std::vector> arrays; + arrays.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + arrays.push_back({start + static_cast(i), start + static_cast(i) + 1}); + } + return makeRowVector( + {"i32", "arr"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeArrayVector(arrays)}); + } + + RowVectorPtr newNullableComplexVector(size_t numRows, int32_t start = 0) { + std::vector>>> arrays; + arrays.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + if (i % 5 == 0) { + arrays.emplace_back(std::nullopt); + } else { + arrays.emplace_back(std::vector>{ + start + static_cast(i), std::nullopt, start + static_cast(i) + 1}); + } + } + return makeRowVector( + {"i32", "arr"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeNullableArrayVector(arrays)}); + } + + RowVectorPtr newMapVector(size_t numRows, int32_t start = 0) { + std::vector>>> maps; + maps.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + const auto value = start + static_cast(i); + maps.push_back({{value, value + 1}, {value + 2, value + 3}}); + } + return makeRowVector( + {"i32", "map"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeMapVector(maps)}); + } + + RowVectorPtr newNullableMapVector(size_t numRows, int32_t start = 0) { + std::vector>>>> maps; + maps.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + if (i % 5 == 0) { + maps.emplace_back(std::nullopt); + } else { + const auto value = start + static_cast(i); + maps.emplace_back( + std::vector>>{{value, value + 1}, {value + 2, std::nullopt}}); + } + } + return makeRowVector( + {"i32", "map"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeNullableMapVector(maps)}); + } + + RowVectorPtr newDictionaryVector(size_t numRows, int32_t start = 0) { + auto base = makeFlatVector(numRows, [start](auto row) { return start + row; }); + auto indices = makeIndices(numRows, [](auto row) { return row; }); + return makeRowVector({"dict"}, {wrapInDictionary(indices, numRows, base)}); + } + + RowVectorPtr newTopLevelNullVector(size_t numRows, int32_t start = 0) { + auto nulls = allocateNulls(numRows, pool()); + bits::setNull(nulls->asMutable(), 0, true); + return std::make_shared( + pool(), + ROW({"i32"}, {INTEGER()}), + nulls, + numRows, + std::vector{makeFlatVector(numRows, [start](auto row) { return start + row; })}, + 1); + } + void checkResize( int32_t min, int32_t max, @@ -76,6 +178,42 @@ class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBas } ASSERT_EQ(actualOutSizes, outSizes); } + + RowVectorPtr + resizeOnce(const std::vector& vectors, bool enableDenseFlatCopy, VeloxBatchResizeStats* stats) { + auto out = resizeAll(vectors, 100, std::numeric_limits::max(), (10L << 20), enableDenseFlatCopy, stats); + EXPECT_EQ(out.size(), 1); + return out[0]; + } + + std::vector resizeAll( + const std::vector& vectors, + int32_t minOutputBatchSize, + int32_t maxOutputBatchSize, + int64_t preferredBatchBytes, + bool enableDenseFlatCopy, + VeloxBatchResizeStats* stats) { + std::vector> inBatches; + inBatches.reserve(vectors.size()); + for (const auto& vector : vectors) { + inBatches.push_back(std::make_shared(vector)); + } + VeloxBatchResizer resizer( + pool(), + minOutputBatchSize, + maxOutputBatchSize, + preferredBatchBytes, + std::make_unique(std::move(inBatches)), + enableDenseFlatCopy, + stats); + std::vector out; + while (auto next = resizer.next()) { + auto veloxBatch = std::dynamic_pointer_cast(next); + EXPECT_NE(veloxBatch, nullptr); + out.push_back(veloxBatch->getRowVector()); + } + return out; + } }; TEST_F(VeloxBatchResizerTest, sanity) { @@ -100,4 +238,185 @@ TEST_F(VeloxBatchResizerTest, preferredBatchBytesTest) { ASSERT_ANY_THROW(checkResize(0, 0, 0, {}, {})); } +TEST_F(VeloxBatchResizerTest, denseFlatCopyDisabledUsesAppendPath) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newDenseFlatVector(30, 0), newDenseFlatVector(40, 100)}; + auto actual = resizeOnce(vectors, false, &stats); + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); +} + +TEST_F(VeloxBatchResizerTest, denseFlatCopyEnabledUsesCopyRangesForFixedWidthAndString) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newDenseFlatVector(30, 0), newDenseFlatVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsComplexType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newComplexVector(30, 0), newComplexVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsNullableComplexType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newNullableComplexVector(30, 0), newNullableComplexVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsMapType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newMapVector(30, 0), newMapVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsNullableMapType) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newNullableMapVector(30, 0), newNullableMapVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForConstantEncoding) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newVector(30), newVector(40)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); + EXPECT_EQ(stats.copyRangesFallbackBatches, 2); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForTopLevelNulls) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newTopLevelNullVector(30, 0), newTopLevelNullVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.copyRangesOutputBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); + EXPECT_EQ(stats.copyRangesFallbackBatches, 2); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledCanMixSmallDenseSparseAndDenseBatches) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newFlatIntVector(30, 0), newVector(40), newFlatIntVector(20, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + ASSERT_EQ(actual->size(), expected->size()); + EXPECT_EQ(actual->size(), 90); + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 1); + EXPECT_EQ(stats.copyRangesFallbackBatches, 1); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFlushesCollectedInputsBeforeSplit) { + VeloxBatchResizeStats stats; + auto vectors = + std::vector{newFlatIntVector(40, 0), newFlatIntVector(40, 100), newFlatIntVector(40, 200)}; + + auto actual = resizeAll(vectors, 100, 50, (10L << 20), true, &stats); + + ASSERT_EQ(actual.size(), 3); + test::assertEqualVectors(vectors[0], actual[0]); + test::assertEqualVectors(vectors[1], actual[1]); + test::assertEqualVectors(vectors[2], actual[2]); + EXPECT_EQ(stats.copyRangesBatches, 2); + EXPECT_EQ(stats.copyRangesOutputBatches, 2); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFlushesCollectedInputsAtEndOfInput) { + VeloxBatchResizeStats stats; + auto vectors = + std::vector{newFlatIntVector(30, 0), newFlatIntVector(40, 100), newFlatIntVector(20, 200)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeAll(vectors, 100, std::numeric_limits::max(), (10L << 20), false, &appendStats); + + auto actual = resizeAll(vectors, 100, std::numeric_limits::max(), (10L << 20), true, &stats); + + ASSERT_EQ(actual.size(), 1); + ASSERT_EQ(expected.size(), 1); + test::assertEqualVectors(expected[0], actual[0]); + EXPECT_EQ(actual[0]->size(), 90); + EXPECT_EQ(stats.copyRangesBatches, 3); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledFallsBackForDictionaryEncoding) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{newDictionaryVector(30, 0), newDictionaryVector(40, 100)}; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 0); + EXPECT_EQ(stats.appendCopyBatches, 2); + EXPECT_EQ(stats.copyRangesFallbackBatches, 2); +} + } // namespace gluten diff --git a/cpp/velox/utils/VeloxBatchResizer.cc b/cpp/velox/utils/VeloxBatchResizer.cc index a9196a46719..c67b1c75111 100644 --- a/cpp/velox/utils/VeloxBatchResizer.cc +++ b/cpp/velox/utils/VeloxBatchResizer.cc @@ -20,6 +20,52 @@ namespace gluten { namespace { +bool supportsCopyRanges(const facebook::velox::VectorPtr& vector) { + if (vector == nullptr) { + return false; + } + + if (vector->isFlatEncoding()) { + return true; + } + + if (vector->encoding() != facebook::velox::VectorEncoding::Simple::ROW && + vector->encoding() != facebook::velox::VectorEncoding::Simple::ARRAY && + vector->encoding() != facebook::velox::VectorEncoding::Simple::MAP) { + return false; + } + + switch (vector->typeKind()) { + case facebook::velox::TypeKind::ROW: { + const auto* row = vector->as(); + for (const auto& child : row->children()) { + if (!supportsCopyRanges(child)) { + return false; + } + } + return true; + } + case facebook::velox::TypeKind::ARRAY: { + const auto* array = vector->as(); + return supportsCopyRanges(array->elements()); + } + case facebook::velox::TypeKind::MAP: { + const auto* map = vector->as(); + return supportsCopyRanges(map->mapKeys()) && supportsCopyRanges(map->mapValues()); + } + default: + return false; + } +} + +bool supportsCopyRanges(const facebook::velox::RowVectorPtr& rowVector) { + if (rowVector == nullptr || rowVector->encoding() != facebook::velox::VectorEncoding::Simple::ROW || + rowVector->mayHaveNulls()) { + return false; + } + return supportsCopyRanges(std::static_pointer_cast(rowVector)); +} + class SliceRowVector : public ColumnarBatchIterator { public: SliceRowVector(int32_t maxOutputBatchSize, facebook::velox::RowVectorPtr in) @@ -50,17 +96,100 @@ gluten::VeloxBatchResizer::VeloxBatchResizer( int32_t minOutputBatchSize, int32_t maxOutputBatchSize, int64_t preferredBatchBytes, - std::unique_ptr in) + std::unique_ptr in, + bool enableCopyRanges, + VeloxBatchResizeStats* stats) : pool_(pool), minOutputBatchSize_(minOutputBatchSize), maxOutputBatchSize_(maxOutputBatchSize), preferredBatchBytes_(static_cast(preferredBatchBytes)), - in_(std::move(in)) { + enableCopyRanges_(enableCopyRanges), + in_(std::move(in)), + stats_(stats) { GLUTEN_CHECK( minOutputBatchSize_ > 0 && maxOutputBatchSize_ > 0, "Either minOutputBatchSize or maxOutputBatchSize should be larger than 0"); } +void VeloxBatchResizer::appendToBuffer( + facebook::velox::RowVectorPtr& buffer, + const facebook::velox::RowVectorPtr& input) { + buffer->append(input.get()); + if (stats_ != nullptr) { + ++stats_->appendCopyBatches; + } +} + +facebook::velox::RowVectorPtr VeloxBatchResizer::copyBufferedInputs( + const std::vector& inputs) { + GLUTEN_CHECK(!inputs.empty(), "Cannot copy empty inputs"); + + facebook::velox::vector_size_t totalRows = 0; + for (const auto& input : inputs) { + totalRows += input->size(); + } + + auto buffer = facebook::velox::RowVector::createEmpty(inputs[0]->type(), pool_); + buffer->resize(totalRows); + + bool usedCopyRanges = false; + facebook::velox::vector_size_t offset = 0; + for (const auto& input : inputs) { + if (supportsCopyRanges(input)) { + const facebook::velox::BaseVector::CopyRange range{0, offset, input->size()}; + for (auto channel = 0; channel < input->children().size(); ++channel) { + buffer->childAt(channel)->copyRanges(input->childAt(channel)->loadedVector(), folly::Range(&range, 1)); + } + usedCopyRanges = true; + if (stats_ != nullptr) { + ++stats_->copyRangesBatches; + } + } else { + buffer->copy(input.get(), offset, 0, input->size()); + if (stats_ != nullptr) { + ++stats_->copyRangesFallbackBatches; + ++stats_->appendCopyBatches; + } + } + offset += input->size(); + } + + if (usedCopyRanges && stats_ != nullptr) { + ++stats_->copyRangesOutputBatches; + } + return buffer; +} + +std::shared_ptr VeloxBatchResizer::collectAndCopy( + facebook::velox::RowVectorPtr firstInput, + uint64_t numBytes) { + std::vector inputs; + inputs.push_back(std::move(firstInput)); + facebook::velox::vector_size_t bufferedRows = inputs.back()->size(); + + std::shared_ptr cb; + for (cb = in_->next(); cb != nullptr; cb = in_->next()) { + auto vb = VeloxColumnarBatch::from(pool_, cb); + auto rv = vb->getRowVector(); + uint64_t addedBytes = cb->numBytes(); + if (bufferedRows + rv->size() > maxOutputBatchSize_ || + numBytes + addedBytes > static_cast(preferredBatchBytes_)) { + GLUTEN_CHECK(next_ == nullptr, "Invalid state"); + next_ = std::make_unique(maxOutputBatchSize_, rv); + return std::make_shared(copyBufferedInputs(inputs)); + } + + numBytes += addedBytes; + bufferedRows += rv->size(); + inputs.push_back(std::move(rv)); + if (bufferedRows >= minOutputBatchSize_) { + break; + } + } + + return std::make_shared(copyBufferedInputs(inputs)); +} + std::shared_ptr VeloxBatchResizer::next() { if (next_) { auto next = next_->next(); @@ -81,8 +210,12 @@ std::shared_ptr VeloxBatchResizer::next() { if (cb->numRows() < minOutputBatchSize_ && numBytes <= preferredBatchBytes_) { auto vb = VeloxColumnarBatch::from(pool_, cb); auto rv = vb->getRowVector(); + if (enableCopyRanges_) { + return collectAndCopy(std::move(rv), numBytes); + } + auto buffer = facebook::velox::RowVector::createEmpty(rv->type(), pool_); - buffer->append(rv.get()); + appendToBuffer(buffer, rv); for (cb = in_->next(); cb != nullptr; cb = in_->next()) { vb = VeloxColumnarBatch::from(pool_, cb); @@ -95,7 +228,7 @@ std::shared_ptr VeloxBatchResizer::next() { return std::make_shared(buffer); } numBytes += addedBytes; - buffer->append(rv.get()); + appendToBuffer(buffer, rv); if (buffer->size() >= minOutputBatchSize_) { // Buffer is full. break; diff --git a/cpp/velox/utils/VeloxBatchResizer.h b/cpp/velox/utils/VeloxBatchResizer.h index 8afd191dca4..5a6b71e931b 100644 --- a/cpp/velox/utils/VeloxBatchResizer.h +++ b/cpp/velox/utils/VeloxBatchResizer.h @@ -23,6 +23,15 @@ namespace gluten { +struct VeloxBatchResizeStats { + int64_t copyRangesBatches{0}; + int64_t copyRangesOutputBatches{0}; + // Counts generic copies: RowVector::append when copyRanges is disabled and + // RowVector::copy fallbacks when copyRanges is enabled. + int64_t appendCopyBatches{0}; + int64_t copyRangesFallbackBatches{0}; +}; + class VeloxBatchResizer : public ColumnarBatchIterator { public: VeloxBatchResizer( @@ -30,7 +39,9 @@ class VeloxBatchResizer : public ColumnarBatchIterator { int32_t minOutputBatchSize, int32_t maxOutputBatchSize, int64_t preferredBatchBytes, - std::unique_ptr in); + std::unique_ptr in, + bool enableCopyRanges = true, + VeloxBatchResizeStats* stats = nullptr); std::shared_ptr next() override; @@ -41,9 +52,17 @@ class VeloxBatchResizer : public ColumnarBatchIterator { const int32_t minOutputBatchSize_; const int32_t maxOutputBatchSize_; const uint64_t preferredBatchBytes_; + const bool enableCopyRanges_; std::unique_ptr in_; + VeloxBatchResizeStats* stats_; std::unique_ptr next_ = nullptr; + + void appendToBuffer(facebook::velox::RowVectorPtr& buffer, const facebook::velox::RowVectorPtr& input); + + facebook::velox::RowVectorPtr copyBufferedInputs(const std::vector& inputs); + + std::shared_ptr collectAndCopy(facebook::velox::RowVectorPtr firstInput, uint64_t numBytes); }; } // namespace gluten diff --git a/docs/velox-configuration.md b/docs/velox-configuration.md index a0c0691e2fb..816ea11c399 100644 --- a/docs/velox-configuration.md +++ b/docs/velox-configuration.md @@ -60,6 +60,7 @@ nav_order: 16 | spark.gluten.sql.columnar.backend.velox.prefetchRowGroups | 1 | Set the prefetch row groups for velox file scan | | spark.gluten.sql.columnar.backend.velox.queryTraceEnabled | false | Enable query tracing flag. | | spark.gluten.sql.columnar.backend.velox.reclaimMaxWaitMs | 3600000ms | The max time in ms to wait for memory reclaim. | +| spark.gluten.sql.columnar.backend.velox.resizeBatches.copyRanges.enabled | true | Enables a VeloxResizeBatchesExec fast path that combines eligible batches using Velox vector copyRanges instead of generic RowVector append. When possible, it collects the small input batches for one VeloxResizeBatchesExec output, allocates the output RowVector once, and bulk-copies child vector ranges. This is most useful for shuffle-read outputs where plain hash shuffle payloads are materialized as dense flat vectors. Complex vectors can also use copyRanges, but ARRAY and MAP still rebuild nested offsets and sizes while bulk-copying child ranges. Unsupported encodings such as dictionary and constant vectors fall back to the generic copy path. This option is enabled by default and complements the reader-side raw payload merge fast path: that path avoids materializing small plain payload batches, while this option optimizes VeloxResizeBatchesExec when that operator is enabled. | | spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput | true | If true, combine small columnar batches together before sending to shuffle. The default minimum output batch size is equal to 0.25 * spark.gluten.sql.columnar.maxBatchSize | | spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput.minSize | <undefined> | The minimum batch size for shuffle. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput is set to true. Default value: 0.25 * | | spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInputOuptut.minSize | <undefined> | The minimum batch size for shuffle input and output. If size of an input batch is smaller than the value, it will be combined with other batches before sending to shuffle. The same applies for batches output by shuffle read. Only functions when spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleInput or spark.gluten.sql.columnar.backend.velox.resizeBatches.shuffleOutput is set to true. Default value: 0.25 * | @@ -91,4 +92,3 @@ nav_order: 16 | spark.gluten.velox.joinBuildVectorHasherMaxNumDistinct | 1000000 | Experimental: maximum number of distinct values to keep when merging vector hashers in join HashBuild. | | spark.gluten.velox.minTableRowsForParallelJoinBuild | 1000 | Experimental: the minimum number of table rows that can trigger the parallel hash join table build. | | spark.gluten.velox.offHeapBroadcastBuildRelation.enabled | false | Experimental: If enabled, broadcast build relation will use offheap memory. Otherwise, broadcast build relation will use onheap memory. | - From 8a62075b6f425a0ad6d7cc4fc525af137b8e70ee Mon Sep 17 00:00:00 2001 From: zhli1142015 Date: Tue, 19 May 2026 17:29:16 +0800 Subject: [PATCH 2/3] add test --- cpp/core/shuffle/Payload.h | 2 + cpp/velox/tests/VeloxBatchResizerTest.cc | 102 +++++++++++++++++++++++ 2 files changed, 104 insertions(+) diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h index eb130c63758..bb7e2ea1d51 100644 --- a/cpp/core/shuffle/Payload.h +++ b/cpp/core/shuffle/Payload.h @@ -55,6 +55,8 @@ class Payload { return numRows_; } + // Marks buffers merged with bit-level row offsets, including null validity + // bitmaps and CPU boolean value bitmaps. const std::vector* isValidityBuffer() const { return isValidityBuffer_; } diff --git a/cpp/velox/tests/VeloxBatchResizerTest.cc b/cpp/velox/tests/VeloxBatchResizerTest.cc index 715857052b2..e8c9cebdda5 100644 --- a/cpp/velox/tests/VeloxBatchResizerTest.cc +++ b/cpp/velox/tests/VeloxBatchResizerTest.cc @@ -73,6 +73,39 @@ class VeloxBatchResizerTest : public ::testing::Test, public test::VectorTestBas makeFlatVector(numRows, [&strings](auto row) { return StringView(strings[row]); })}); } + RowVectorPtr newNullableBoolVector(size_t numRows, int32_t start = 0) { + std::vector> flags; + flags.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + if ((start + i) % 5 == 0) { + flags.emplace_back(std::nullopt); + } else { + flags.emplace_back((start + i) % 3 == 0); + } + } + return makeRowVector({"flag"}, {makeNullableFlatVector(flags)}); + } + + RowVectorPtr newDenseFlatVectorWithNullableBool(size_t numRows, int32_t start = 0) { + std::vector> nullableFlags; + nullableFlags.reserve(numRows); + std::vector> nullableValues; + nullableValues.reserve(numRows); + for (auto i = 0; i < numRows; ++i) { + if ((start + i) % 7 == 0) { + nullableFlags.emplace_back(std::nullopt); + } else { + nullableFlags.emplace_back((start + i) % 2 == 0); + } + nullableValues.emplace_back((start + i) % 4 == 0 ? std::nullopt : std::optional(start + i)); + } + return makeRowVector( + {"i32", "flag", "i64"}, + {makeFlatVector(numRows, [start](auto row) { return start + row; }), + makeNullableFlatVector(nullableFlags), + makeNullableFlatVector(nullableValues)}); + } + RowVectorPtr newFlatIntVector(size_t numRows, int32_t start = 0) { return makeRowVector({"i32"}, {makeFlatVector(numRows, [start](auto row) { return start + row; })}); } @@ -263,6 +296,75 @@ TEST_F(VeloxBatchResizerTest, denseFlatCopyEnabledUsesCopyRangesForFixedWidthAnd EXPECT_EQ(stats.appendCopyBatches, 0); } +TEST_F(VeloxBatchResizerTest, copyRangesEnabledHandlesNullableBoolBitmaps) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{ + newNullableBoolVector(3, 0), + newNullableBoolVector(5, 100), + newNullableBoolVector(9, 200), + newNullableBoolVector(17, 300), + }; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 4); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, copyRangesEnabledHandlesMixedNullableBitmapsAtUnalignedOffsets) { + VeloxBatchResizeStats stats; + auto vectors = std::vector{ + newDenseFlatVectorWithNullableBool(1, 0), + newDenseFlatVectorWithNullableBool(6, 100), + newDenseFlatVectorWithNullableBool(10, 200), + newDenseFlatVectorWithNullableBool(15, 300), + }; + auto appendStats = VeloxBatchResizeStats{}; + auto expected = resizeOnce(vectors, false, &appendStats); + + auto actual = resizeOnce(vectors, true, &stats); + + test::assertEqualVectors(expected, actual); + EXPECT_EQ(stats.copyRangesBatches, 4); + EXPECT_EQ(stats.copyRangesOutputBatches, 1); + EXPECT_EQ(stats.appendCopyBatches, 0); + EXPECT_EQ(stats.copyRangesFallbackBatches, 0); +} + +TEST_F(VeloxBatchResizerTest, veloxCopyRangesHandlesNullableBoolBitmapsAtUnalignedOffsets) { + auto source = newNullableBoolVector(48, 100)->childAt(0)->loadedVector(); + auto actual = newNullableBoolVector(64, 500)->childAt(0); + auto expected = newNullableBoolVector(64, 500)->childAt(0); + auto expectedFlat = expected->asFlatVector(); + auto sourceFlat = source->asFlatVector(); + const std::vector ranges{ + {.sourceIndex = 1, .targetIndex = 2, .count = 5}, + {.sourceIndex = 7, .targetIndex = 11, .count = 9}, + {.sourceIndex = 18, .targetIndex = 24, .count = 13}, + {.sourceIndex = 33, .targetIndex = 43, .count = 4}, + }; + for (const auto& range : ranges) { + for (auto i = 0; i < range.count; ++i) { + const auto sourceIndex = range.sourceIndex + i; + const auto targetIndex = range.targetIndex + i; + if (source->isNullAt(sourceIndex)) { + expectedFlat->setNull(targetIndex, true); + } else { + expectedFlat->set(targetIndex, sourceFlat->valueAt(sourceIndex)); + } + } + } + + actual->copyRanges(source.get(), ranges); + + test::assertEqualVectors(expected, actual); +} + TEST_F(VeloxBatchResizerTest, copyRangesEnabledSupportsComplexType) { VeloxBatchResizeStats stats; auto vectors = std::vector{newComplexVector(30, 0), newComplexVector(40, 100)}; From 6080daeca6a650212ea41cfcea4f479e617b704c Mon Sep 17 00:00:00 2001 From: zhli1142015 Date: Tue, 19 May 2026 17:52:24 +0800 Subject: [PATCH 3/3] [VL] Add batch resizer fallback benchmarks Add dictionary-heavy and constant-heavy VeloxBatchResizer benchmark scenarios so the default copyRanges-enabled path is compared against the append opt-out baseline when inputs fall back to RowVector::copy. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../benchmarks/VeloxBatchResizerBenchmark.cc | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc index 4584dbe1c84..7d1c9b8f4de 100644 --- a/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc +++ b/cpp/velox/benchmarks/VeloxBatchResizerBenchmark.cc @@ -15,6 +15,7 @@ * limitations under the License. */ +#include #include #include #include @@ -31,6 +32,7 @@ #include "utils/Exception.h" #include "utils/VeloxBatchResizer.h" #include "velox/common/memory/Memory.h" +#include "velox/vector/BaseVector.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/FlatVector.h" @@ -73,6 +75,31 @@ constexpr DenseBenchmarkScenario constexpr DenseBenchmarkScenario kBoolHeavy64x64{kInputBatches, kRowsPerBatch, DenseVectorKind::kBoolHeavy, 0, 0, 8, false}; +enum class EncodedVectorKind { + kDictionary, + kConstant, +}; + +struct EncodedBenchmarkScenario { + int32_t inputBatches; + int32_t rowsPerBatch; + EncodedVectorKind kind; + int32_t columns; +}; + +constexpr EncodedBenchmarkScenario kDictionaryHeavy64x64{ + kInputBatches, + kRowsPerBatch, + EncodedVectorKind::kDictionary, + 8, +}; +constexpr EncodedBenchmarkScenario kConstantHeavy64x64{ + kInputBatches, + kRowsPerBatch, + EncodedVectorKind::kConstant, + 8, +}; + class ColumnarBatchArray : public ColumnarBatchIterator { public: explicit ColumnarBatchArray(std::vector> batches) : batches_(std::move(batches)) {} @@ -196,6 +223,66 @@ std::vector makeSmallVectors(memory::MemoryPool* pool, const Dense return vectors; } +RowVectorPtr +makeDictionaryHeavyVector(memory::MemoryPool* pool, const EncodedBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + const auto dictionarySize = std::max(1, rows / 4); + std::vector children; + std::vector types; + children.reserve(scenario.columns); + types.reserve(scenario.columns); + for (auto channel = 0; channel < scenario.columns; ++channel) { + auto base = BaseVector::create>(BIGINT(), dictionarySize, pool); + for (auto row = 0; row < dictionarySize; ++row) { + base->set(row, static_cast(start + row + channel)); + } + + auto indices = allocateIndices(rows, pool); + auto* rawIndices = indices->asMutable(); + for (auto row = 0; row < rows; ++row) { + rawIndices[row] = (start + row + channel) % dictionarySize; + } + children.push_back(BaseVector::wrapInDictionary(nullptr, std::move(indices), rows, std::move(base))); + types.push_back(BIGINT()); + } + + return std::make_shared(pool, ROW(std::move(types)), nullptr, rows, std::move(children)); +} + +RowVectorPtr +makeConstantHeavyVector(memory::MemoryPool* pool, const EncodedBenchmarkScenario& scenario, int32_t start) { + const auto rows = scenario.rowsPerBatch; + std::vector children; + std::vector types; + children.reserve(scenario.columns); + types.reserve(scenario.columns); + for (auto channel = 0; channel < scenario.columns; ++channel) { + children.push_back(BaseVector::createConstant(BIGINT(), static_cast(start + channel), rows, pool)); + types.push_back(BIGINT()); + } + + return std::make_shared(pool, ROW(std::move(types)), nullptr, rows, std::move(children)); +} + +RowVectorPtr makeEncodedVector(memory::MemoryPool* pool, const EncodedBenchmarkScenario& scenario, int32_t start) { + switch (scenario.kind) { + case EncodedVectorKind::kDictionary: + return makeDictionaryHeavyVector(pool, scenario, start); + case EncodedVectorKind::kConstant: + return makeConstantHeavyVector(pool, scenario, start); + } + VELOX_UNREACHABLE(); +} + +std::vector makeSmallVectors(memory::MemoryPool* pool, const EncodedBenchmarkScenario& scenario) { + std::vector vectors; + vectors.reserve(scenario.inputBatches); + for (auto batch = 0; batch < scenario.inputBatches; ++batch) { + vectors.push_back(makeEncodedVector(pool, scenario, batch * scenario.rowsPerBatch)); + } + return vectors; +} + std::unique_ptr makeIterator(const std::vector& vectors) { std::vector> batches; batches.reserve(vectors.size()); @@ -209,6 +296,10 @@ int64_t totalRows(const DenseBenchmarkScenario& scenario) { return static_cast(scenario.inputBatches) * scenario.rowsPerBatch; } +int64_t totalRows(const EncodedBenchmarkScenario& scenario) { + return static_cast(scenario.inputBatches) * scenario.rowsPerBatch; +} + VeloxBatchResizer makeResizeBenchmarkResizer( memory::MemoryPool* pool, int64_t outputBatchSize, @@ -246,6 +337,25 @@ void runResizeBenchmark( state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); } +void runFallbackResizeBenchmark( + benchmark::State& state, + const EncodedBenchmarkScenario& scenario, + std::optional enableCopyRanges) { + auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerFallbackBenchmark"); + const auto vectors = makeSmallVectors(pool.get(), scenario); + int64_t rows = 0; + + for (auto _ : state) { + auto resizer = makeResizeBenchmarkResizer(pool.get(), totalRows(scenario), makeIterator(vectors), enableCopyRanges); + while (auto out = resizer.next()) { + rows += out->numRows(); + } + } + + benchmark::DoNotOptimize(rows); + state.SetItemsProcessed(static_cast(state.iterations()) * totalRows(scenario)); +} + void runDirectChildCopyRangesBenchmark(benchmark::State& state, const DenseBenchmarkScenario& scenario) { auto pool = memory::memoryManager()->addLeafPool("VeloxBatchResizerBenchmarkDirectCopy"); const auto vectors = makeSmallVectors(pool.get(), scenario); @@ -491,6 +601,14 @@ void BM_VeloxBatchResizerDefaultCopyRanges(benchmark::State& state, DenseBenchma runResizeBenchmark(state, scenario, std::nullopt); } +void BM_VeloxBatchResizerFallbackAppendOptOutBaseline(benchmark::State& state, EncodedBenchmarkScenario scenario) { + runFallbackResizeBenchmark(state, scenario, false); +} + +void BM_VeloxBatchResizerDefaultCopyRangesFallback(benchmark::State& state, EncodedBenchmarkScenario scenario) { + runFallbackResizeBenchmark(state, scenario, std::nullopt); +} + void BM_DirectChildCopyRanges(benchmark::State& state, DenseBenchmarkScenario scenario) { runDirectChildCopyRangesBenchmark(state, scenario); } @@ -546,6 +664,10 @@ void BM_ReaderSidePreMergedBatchModel(benchmark::State& state, DenseBenchmarkSce BENCHMARK_CAPTURE(BM_ReaderSideRawPayloadBulkCopyModel, name, scenario); \ BENCHMARK_CAPTURE(BM_ReaderSidePreMergedBatchModel, name, scenario) +#define REGISTER_FALLBACK_SCENARIO_BENCHMARKS(name, scenario) \ + BENCHMARK_CAPTURE(BM_VeloxBatchResizerFallbackAppendOptOutBaseline, name, scenario); \ + BENCHMARK_CAPTURE(BM_VeloxBatchResizerDefaultCopyRangesFallback, name, scenario) + REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_64x64, kMixed64x64); REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_16x256, kMixed16x256); REGISTER_DENSE_SCENARIO_BENCHMARKS(Mixed_256x16, kMixed256x16); @@ -553,8 +675,11 @@ REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed2_64x64, kFixed2_64x64); REGISTER_DENSE_SCENARIO_BENCHMARKS(Fixed16_64x64, kFixed16_64x64); REGISTER_DENSE_SCENARIO_BENCHMARKS(LongString_64x64, kLongString64x64); REGISTER_DENSE_SCENARIO_BENCHMARKS(BoolHeavy_64x64, kBoolHeavy64x64); +REGISTER_FALLBACK_SCENARIO_BENCHMARKS(DictionaryHeavy_64x64, kDictionaryHeavy64x64); +REGISTER_FALLBACK_SCENARIO_BENCHMARKS(ConstantHeavy_64x64, kConstantHeavy64x64); #undef REGISTER_DENSE_SCENARIO_BENCHMARKS +#undef REGISTER_FALLBACK_SCENARIO_BENCHMARKS } // namespace } // namespace gluten