diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc index 89f1dbcda62..c014d73d9bf 100644 --- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc @@ -494,6 +494,7 @@ arrow::Status VeloxHashShuffleWriter::splitRowVector(const facebook::velox::RowV } arrow::Status VeloxHashShuffleWriter::splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv) { + SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitFixedWidth]); for (auto col = 0; col < fixedWidthColumnCount_; ++col) { auto colIdx = simpleColumnIndices_[col]; auto& column = rv.childAt(colIdx); @@ -636,6 +637,7 @@ void VeloxHashShuffleWriter::splitBoolType(const uint8_t* srcAddr, const std::ve } arrow::Status VeloxHashShuffleWriter::splitValidityBuffer(const facebook::velox::RowVector& rv) { + SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitValidity]); for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) { auto colIdx = simpleColumnIndices_[col]; auto& column = rv.childAt(colIdx); @@ -727,6 +729,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryType( } arrow::Status VeloxHashShuffleWriter::splitBinaryArray(const facebook::velox::RowVector& rv) { + SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitBinary]); for (auto col = fixedWidthColumnCount_; col < simpleColumnIndices_.size(); ++col) { auto binaryIdx = col - fixedWidthColumnCount_; auto& dstAddrs = partitionBinaryAddrs_[binaryIdx]; @@ -738,6 +741,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryArray(const facebook::velox::Ro } arrow::Status VeloxHashShuffleWriter::splitComplexType(const facebook::velox::RowVector& rv) { + SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitComplex]); if (complexColumnIndices_.size() == 0) { return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxShuffleWriter.h b/cpp/velox/shuffle/VeloxShuffleWriter.h index 45276e3cd41..f93629eeaeb 100644 --- a/cpp/velox/shuffle/VeloxShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxShuffleWriter.h @@ -113,40 +113,6 @@ class VeloxShuffleWriter : public ShuffleWriter { return partitionBufferPool_->max_memory() + veloxPool_->peakBytes(); } - protected: - VeloxShuffleWriter( - uint32_t numPartitions, - const std::shared_ptr& partitionWriter, - const std::shared_ptr& options, - MemoryManager* memoryManager) - : ShuffleWriter(numPartitions, options->partitioning), - partitionBufferPool_(memoryManager->getOrCreateArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")), - veloxPool_(dynamic_cast(memoryManager)->getLeafMemoryPool()), - partitionWriter_(partitionWriter) { - partitioner_ = Partitioner::make(options->partitioning, numPartitions_, options->startPartitionId); - serdeOptions_.useLosslessTimestamp = true; - } - - virtual ~VeloxShuffleWriter() = default; - - // Memory Pool used to track memory usage of partition buffers. - // The actual allocation is delegated to options_.memoryPool. - std::shared_ptr partitionBufferPool_; - - std::shared_ptr veloxPool_; - - // PartitionWriter must destruct before partitionBufferPool_, as it may hold buffers allocated by - // partitionBufferPool_. - std::shared_ptr partitionWriter_; - - std::shared_ptr partitioner_; - - facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; - - int32_t maxBatchSize_{0}; - - enum EvictState { kEvictable, kUnevictable }; - // stat enum CpuWallTimingType { CpuWallTimingBegin = 0, @@ -160,7 +126,18 @@ class VeloxShuffleWriter : public ShuffleWriter { CpuWallTimingMakeRB, CpuWallTimingCacheRB, CpuWallTimingFlattenRV, + // Outer SplitRV bucket wraps `splitRowVector()` end-to-end. The four + // entries below subdivide its body so reviewers / profilers can tell + // which inner split path dominates when SplitRV does. Inner buckets + // overlap the outer in time (the outer continues to run while each + // inner is active); the outer minus the sum-of-inners is therefore + // the time spent in `splitRowVector()` outside the four sub-paths + // (mostly the post-split `partitionBufferBase_` update loop). CpuWallTimingSplitRV, + CpuWallTimingSplitFixedWidth, + CpuWallTimingSplitValidity, + CpuWallTimingSplitBinary, + CpuWallTimingSplitComplex, CpuWallTimingIteratePartitions, CpuWallTimingStop, CpuWallTimingEnd, @@ -191,6 +168,14 @@ class VeloxShuffleWriter : public ShuffleWriter { return "CpuWallTimingFlattenRV"; case CpuWallTimingSplitRV: return "CpuWallTimingSplitRV"; + case CpuWallTimingSplitFixedWidth: + return "CpuWallTimingSplitFixedWidth"; + case CpuWallTimingSplitValidity: + return "CpuWallTimingSplitValidity"; + case CpuWallTimingSplitBinary: + return "CpuWallTimingSplitBinary"; + case CpuWallTimingSplitComplex: + return "CpuWallTimingSplitComplex"; case CpuWallTimingIteratePartitions: return "CpuWallTimingIteratePartitions"; case CpuWallTimingStop: @@ -200,6 +185,47 @@ class VeloxShuffleWriter : public ShuffleWriter { } } + // Read-only access to a single per-stage timer. Useful for tests and for + // anyone wanting to drive the existing `cpuWallTimingList_` data through a + // channel other than the `VELOX_SHUFFLE_WRITER_LOG_FLAG` compile-time log. + const facebook::velox::CpuWallTiming& cpuWallTiming(CpuWallTimingType type) const { + return cpuWallTimingList_[type]; + } + + protected: + VeloxShuffleWriter( + uint32_t numPartitions, + const std::shared_ptr& partitionWriter, + const std::shared_ptr& options, + MemoryManager* memoryManager) + : ShuffleWriter(numPartitions, options->partitioning), + partitionBufferPool_(memoryManager->getOrCreateArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")), + veloxPool_(dynamic_cast(memoryManager)->getLeafMemoryPool()), + partitionWriter_(partitionWriter) { + partitioner_ = Partitioner::make(options->partitioning, numPartitions_, options->startPartitionId); + serdeOptions_.useLosslessTimestamp = true; + } + + virtual ~VeloxShuffleWriter() = default; + + // Memory Pool used to track memory usage of partition buffers. + // The actual allocation is delegated to options_.memoryPool. + std::shared_ptr partitionBufferPool_; + + std::shared_ptr veloxPool_; + + // PartitionWriter must destruct before partitionBufferPool_, as it may hold buffers allocated by + // partitionBufferPool_. + std::shared_ptr partitionWriter_; + + std::shared_ptr partitioner_; + + facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_; + + int32_t maxBatchSize_{0}; + + enum EvictState { kEvictable, kUnevictable }; + facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum]; EvictState evictState_{kEvictable}; diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt index 00c0c2df69c..086c897df33 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -103,6 +103,9 @@ add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc) add_velox_test(velox_hash_shuffle_writer_input_encoding_test SOURCES VeloxHashShuffleWriterInputEncodingTest.cc) +add_velox_test(velox_hash_shuffle_writer_split_rv_substages_test SOURCES + VeloxHashShuffleWriterSplitRvSubStagesTest.cc) + add_velox_test(velox_shuffle_writer_spill_test SOURCES VeloxShuffleWriterSpillTest.cc) diff --git a/cpp/velox/tests/VeloxHashShuffleWriterSplitRvSubStagesTest.cc b/cpp/velox/tests/VeloxHashShuffleWriterSplitRvSubStagesTest.cc new file mode 100644 index 00000000000..95e12b53a69 --- /dev/null +++ b/cpp/velox/tests/VeloxHashShuffleWriterSplitRvSubStagesTest.cc @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "shuffle/VeloxHashShuffleWriter.h" + +#include "VeloxShuffleWriterTestBase.h" +#include "utils/Macros.h" +#include "utils/TestUtils.h" + +namespace gluten { + +namespace { + +std::shared_ptr makeLocalPartitionWriter( + uint32_t numPartitions, + const std::string& dataFile, + const std::vector& localDirs) { + GLUTEN_ASSIGN_OR_THROW(auto codec, arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME)); + auto options = std::make_shared(); + return std::make_shared( + numPartitions, std::move(codec), getDefaultMemoryManager(), options, dataFile, localDirs); +} + +} // namespace + +class HashShuffleWriterSplitRvSubStagesTest : public ::testing::Test, public VeloxShuffleWriterTestBase { + protected: + static void SetUpTestSuite() { + setUpVeloxBackend(); + } + + static void TearDownTestSuite() { + tearDownVeloxBackend(); + } + + void SetUp() override { + VeloxShuffleWriterTestBase::setUpTestData(); + } + + std::shared_ptr createWriter(uint32_t numPartitions) { + auto options = std::make_shared(); + options->partitioning = Partitioning::kHash; + options->splitBufferSize = 4096; + + auto partitionWriter = makeLocalPartitionWriter(numPartitions, dataFile_, localDirs_); + + GLUTEN_ASSIGN_OR_THROW( + auto writer, + VeloxShuffleWriter::create( + ShuffleWriterType::kHashShuffle, numPartitions, partitionWriter, options, getDefaultMemoryManager())); + return writer; + } + + arrow::Status writeBatch(VeloxShuffleWriter& writer, facebook::velox::RowVectorPtr rv) { + std::shared_ptr cb = std::make_shared(rv); + return writer.write(cb, ShuffleWriter::kMinMemLimit); + } +}; + +// Verifies that the human-readable names for the 4 new sub-stages match the +// enum identifiers exactly (this is what shows up in the +// `VELOX_SHUFFLE_WRITER_LOG_FLAG` log line, so the strings are part of the +// stable observable surface). +TEST_F(HashShuffleWriterSplitRvSubStagesTest, enumNames) { + EXPECT_EQ( + VeloxShuffleWriter::CpuWallTimingName(VeloxShuffleWriter::CpuWallTimingSplitFixedWidth), + "CpuWallTimingSplitFixedWidth"); + EXPECT_EQ( + VeloxShuffleWriter::CpuWallTimingName(VeloxShuffleWriter::CpuWallTimingSplitValidity), + "CpuWallTimingSplitValidity"); + EXPECT_EQ( + VeloxShuffleWriter::CpuWallTimingName(VeloxShuffleWriter::CpuWallTimingSplitBinary), "CpuWallTimingSplitBinary"); + EXPECT_EQ( + VeloxShuffleWriter::CpuWallTimingName(VeloxShuffleWriter::CpuWallTimingSplitComplex), + "CpuWallTimingSplitComplex"); +} + +// A batch with one fixed-width data column should tick the fixed-width +// sub-stage and visit (but not necessarily do meaningful work in) the other +// three. We assert `count >= 1` rather than `>0 ns wall` because the +// SCOPED_TIMER timing resolution can round empty paths to 0 ns; the count +// is the reliable signal that the timer was reached. +TEST_F(HashShuffleWriterSplitRvSubStagesTest, fixedWidthBumpsItsBucket) { + auto writer = createWriter(2); + auto rv = makeRowVector({ + makeFlatVector({0, 1, 0, 1}), // partition key + makeFlatVector({100, 200, 300, 400}), + }); + ASSERT_NOT_OK(writeBatch(*writer, rv)); + + EXPECT_EQ(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitFixedWidth).count, 1); + EXPECT_EQ(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitValidity).count, 1); + EXPECT_EQ(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitBinary).count, 1); + EXPECT_EQ(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitComplex).count, 1); +} + +// A batch with a VARCHAR column should tick the binary sub-stage with +// nonzero wall time (there's real work copying string data per partition). +TEST_F(HashShuffleWriterSplitRvSubStagesTest, binaryBumpsItsBucket) { + auto writer = createWriter(2); + auto rv = makeRowVector({ + makeFlatVector({0, 1, 0, 1}), // partition key + makeFlatVector({"alpha", "beta", "gamma", "delta"}), + }); + ASSERT_NOT_OK(writeBatch(*writer, rv)); + + EXPECT_EQ(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitBinary).count, 1); + EXPECT_GT(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitBinary).wallNanos, 0); +} + +// A batch with a complex (ARRAY) column should tick the complex sub-stage +// with nonzero wall time (Presto serializer round-trip per partition). +TEST_F(HashShuffleWriterSplitRvSubStagesTest, complexBumpsItsBucket) { + auto writer = createWriter(2); + auto rv = makeRowVector({ + makeFlatVector({0, 1, 0, 1}), // partition key + makeArrayVector({ + {1, 2, 3}, + {4, 5}, + {6}, + {7, 8, 9, 10}, + }), + }); + ASSERT_NOT_OK(writeBatch(*writer, rv)); + + EXPECT_EQ(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitComplex).count, 1); + EXPECT_GT(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitComplex).wallNanos, 0); +} + +// Sanity: the outer SplitRV bucket continues to tick once per batch — the +// new sub-stage timers do not replace it, they refine it. +TEST_F(HashShuffleWriterSplitRvSubStagesTest, outerSplitRvStillCounted) { + auto writer = createWriter(2); + auto rv = makeRowVector({ + makeFlatVector({0, 1, 0, 1}), + makeFlatVector({100, 200, 300, 400}), + }); + ASSERT_NOT_OK(writeBatch(*writer, rv)); + ASSERT_NOT_OK(writeBatch(*writer, rv)); + + EXPECT_EQ(writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitRV).count, 2); +} + +} // namespace gluten + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}