Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/velox/shuffle/VeloxHashShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ arrow::Status VeloxHashShuffleWriter::splitRowVector(const facebook::velox::RowV
}

arrow::Status VeloxHashShuffleWriter::splitFixedWidthValueBuffer(const facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitFixedWidth]);
for (auto col = 0; col < fixedWidthColumnCount_; ++col) {
auto colIdx = simpleColumnIndices_[col];
auto& column = rv.childAt(colIdx);
Expand Down Expand Up @@ -604,6 +605,7 @@ void VeloxHashShuffleWriter::splitBoolType(const uint8_t* srcAddr, const std::ve
}

arrow::Status VeloxHashShuffleWriter::splitValidityBuffer(const facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitValidity]);
for (size_t col = 0; col < simpleColumnIndices_.size(); ++col) {
auto colIdx = simpleColumnIndices_[col];
auto& column = rv.childAt(colIdx);
Expand Down Expand Up @@ -695,6 +697,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryType(
}

arrow::Status VeloxHashShuffleWriter::splitBinaryArray(const facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitBinary]);
for (auto col = fixedWidthColumnCount_; col < simpleColumnIndices_.size(); ++col) {
auto binaryIdx = col - fixedWidthColumnCount_;
auto& dstAddrs = partitionBinaryAddrs_[binaryIdx];
Expand All @@ -706,6 +709,7 @@ arrow::Status VeloxHashShuffleWriter::splitBinaryArray(const facebook::velox::Ro
}

arrow::Status VeloxHashShuffleWriter::splitComplexType(const facebook::velox::RowVector& rv) {
SCOPED_TIMER(cpuWallTimingList_[CpuWallTimingSplitComplex]);
if (complexColumnIndices_.size() == 0) {
return arrow::Status::OK();
}
Expand Down
94 changes: 60 additions & 34 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,40 +113,6 @@ class VeloxShuffleWriter : public ShuffleWriter {
return partitionBufferPool_->max_memory() + veloxPool_->peakBytes();
}

protected:
// Protected constructor: builds the shared shuffle-writer state from the
// partition count, the partition writer, the writer options and the memory
// manager.
//
// - `numPartitions` and `options->partitioning` are forwarded to the
//   `ShuffleWriter` base.
// - `partitionWriter` is stored in `partitionWriter_`.
// - `memoryManager` supplies both the Arrow pool used for partition buffers
//   and the Velox leaf pool.
VeloxShuffleWriter(
uint32_t numPartitions,
const std::shared_ptr<PartitionWriter>& partitionWriter,
const std::shared_ptr<ShuffleWriterOptions>& options,
MemoryManager* memoryManager)
: ShuffleWriter(numPartitions, options->partitioning),
// Arrow pool obtained (or created) under a fixed, writer-specific name.
partitionBufferPool_(memoryManager->getOrCreateArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
// NOTE(review): the dynamic_cast result is dereferenced without a null
// check; if `memoryManager` is not a VeloxMemoryManager the cast yields
// nullptr and this is undefined behavior. Confirm every caller passes a
// VeloxMemoryManager.
veloxPool_(dynamic_cast<VeloxMemoryManager*>(memoryManager)->getLeafMemoryPool()),
partitionWriter_(partitionWriter) {
// The partitioner is created from the partitioning option, the partition
// count and the configured start partition id.
partitioner_ = Partitioner::make(options->partitioning, numPartitions_, options->startPartitionId);
serdeOptions_.useLosslessTimestamp = true;
}

virtual ~VeloxShuffleWriter() = default;

// Memory Pool used to track memory usage of partition buffers.
// The actual allocation is delegated to options_.memoryPool.
std::shared_ptr<arrow::MemoryPool> partitionBufferPool_;

std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;

// PartitionWriter must destruct before partitionBufferPool_, as it may hold buffers allocated by
// partitionBufferPool_.
std::shared_ptr<PartitionWriter> partitionWriter_;

std::shared_ptr<Partitioner> partitioner_;

facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_;

int32_t maxBatchSize_{0};

enum EvictState { kEvictable, kUnevictable };

// stat
enum CpuWallTimingType {
CpuWallTimingBegin = 0,
Expand All @@ -160,7 +126,18 @@ class VeloxShuffleWriter : public ShuffleWriter {
CpuWallTimingMakeRB,
CpuWallTimingCacheRB,
CpuWallTimingFlattenRV,
// Outer SplitRV bucket wraps `splitRowVector()` end-to-end. The four
// entries below subdivide its body so reviewers / profilers can tell
// which inner split path dominates when SplitRV does. Inner buckets
// overlap the outer in time (the outer continues to run while each
// inner is active); the outer minus the sum-of-inners is therefore
// the time spent in `splitRowVector()` outside the four sub-paths
// (mostly the post-split `partitionBufferBase_` update loop).
CpuWallTimingSplitRV,
CpuWallTimingSplitFixedWidth,
CpuWallTimingSplitValidity,
CpuWallTimingSplitBinary,
CpuWallTimingSplitComplex,
CpuWallTimingIteratePartitions,
CpuWallTimingStop,
CpuWallTimingEnd,
Expand Down Expand Up @@ -191,6 +168,14 @@ class VeloxShuffleWriter : public ShuffleWriter {
return "CpuWallTimingFlattenRV";
case CpuWallTimingSplitRV:
return "CpuWallTimingSplitRV";
case CpuWallTimingSplitFixedWidth:
return "CpuWallTimingSplitFixedWidth";
case CpuWallTimingSplitValidity:
return "CpuWallTimingSplitValidity";
case CpuWallTimingSplitBinary:
return "CpuWallTimingSplitBinary";
case CpuWallTimingSplitComplex:
return "CpuWallTimingSplitComplex";
case CpuWallTimingIteratePartitions:
return "CpuWallTimingIteratePartitions";
case CpuWallTimingStop:
Expand All @@ -200,6 +185,47 @@ class VeloxShuffleWriter : public ShuffleWriter {
}
}

// Returns a const reference to the timer recorded for the stage `type`.
// Intended for tests and for any consumer that wants to read the
// `cpuWallTimingList_` data through a channel other than the
// `VELOX_SHUFFLE_WRITER_LOG_FLAG` compile-time log.
//
// `type` indexes `cpuWallTimingList_` directly; no bounds check is performed.
const facebook::velox::CpuWallTiming& cpuWallTiming(CpuWallTimingType type) const {
  const auto& stageTiming = cpuWallTimingList_[type];
  return stageTiming;
}

protected:
// Protected constructor: builds the shared shuffle-writer state from the
// partition count, the partition writer, the writer options and the memory
// manager.
//
// - `numPartitions` and `options->partitioning` are forwarded to the
//   `ShuffleWriter` base.
// - `partitionWriter` is stored in `partitionWriter_`.
// - `memoryManager` supplies both the Arrow pool used for partition buffers
//   and the Velox leaf pool.
VeloxShuffleWriter(
uint32_t numPartitions,
const std::shared_ptr<PartitionWriter>& partitionWriter,
const std::shared_ptr<ShuffleWriterOptions>& options,
MemoryManager* memoryManager)
: ShuffleWriter(numPartitions, options->partitioning),
// Arrow pool obtained (or created) under a fixed, writer-specific name.
partitionBufferPool_(memoryManager->getOrCreateArrowMemoryPool("VeloxShuffleWriter.partitionBufferPool")),
// NOTE(review): the dynamic_cast result is dereferenced without a null
// check; if `memoryManager` is not a VeloxMemoryManager the cast yields
// nullptr and this is undefined behavior. Confirm every caller passes a
// VeloxMemoryManager.
veloxPool_(dynamic_cast<VeloxMemoryManager*>(memoryManager)->getLeafMemoryPool()),
partitionWriter_(partitionWriter) {
// The partitioner is created from the partitioning option, the partition
// count and the configured start partition id.
partitioner_ = Partitioner::make(options->partitioning, numPartitions_, options->startPartitionId);
serdeOptions_.useLosslessTimestamp = true;
}

virtual ~VeloxShuffleWriter() = default;

// Memory Pool used to track memory usage of partition buffers.
// The actual allocation is delegated to options_.memoryPool.
std::shared_ptr<arrow::MemoryPool> partitionBufferPool_;

std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;

// PartitionWriter must destruct before partitionBufferPool_, as it may hold buffers allocated by
// partitionBufferPool_.
std::shared_ptr<PartitionWriter> partitionWriter_;

std::shared_ptr<Partitioner> partitioner_;

facebook::velox::serializer::presto::PrestoVectorSerde::PrestoOptions serdeOptions_;

int32_t maxBatchSize_{0};

enum EvictState { kEvictable, kUnevictable };

facebook::velox::CpuWallTiming cpuWallTimingList_[CpuWallTimingNum];

EvictState evictState_{kEvictable};
Expand Down
3 changes: 3 additions & 0 deletions cpp/velox/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ add_velox_test(velox_shuffle_writer_test SOURCES VeloxShuffleWriterTest.cc)
add_velox_test(velox_hash_shuffle_writer_input_encoding_test SOURCES
VeloxHashShuffleWriterInputEncodingTest.cc)

# Registers the test target built from
# VeloxHashShuffleWriterSplitRvSubStagesTest.cc, which checks the per-sub-stage
# split timers of the hash shuffle writer.
add_velox_test(velox_hash_shuffle_writer_split_rv_substages_test SOURCES
VeloxHashShuffleWriterSplitRvSubStagesTest.cc)

add_velox_test(velox_shuffle_writer_spill_test SOURCES
VeloxShuffleWriterSpillTest.cc)

Expand Down
165 changes: 165 additions & 0 deletions cpp/velox/tests/VeloxHashShuffleWriterSplitRvSubStagesTest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <gtest/gtest.h>

#include "shuffle/VeloxHashShuffleWriter.h"

#include "VeloxShuffleWriterTestBase.h"
#include "utils/Macros.h"
#include "utils/TestUtils.h"

namespace gluten {

namespace {

std::shared_ptr<PartitionWriter> makeLocalPartitionWriter(
uint32_t numPartitions,
const std::string& dataFile,
const std::vector<std::string>& localDirs) {
GLUTEN_ASSIGN_OR_THROW(auto codec, arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME));
auto options = std::make_shared<LocalPartitionWriterOptions>();
return std::make_shared<LocalPartitionWriter>(
numPartitions, std::move(codec), getDefaultMemoryManager(), options, dataFile, localDirs);
}

} // namespace

// Fixture for the split sub-stage timer tests. The Velox backend is set up
// once per suite and torn down afterwards; per-test data comes from
// `VeloxShuffleWriterTestBase::setUpTestData()`.
class HashShuffleWriterSplitRvSubStagesTest : public ::testing::Test, public VeloxShuffleWriterTestBase {
 protected:
  static void SetUpTestSuite() {
    setUpVeloxBackend();
  }

  static void TearDownTestSuite() {
    tearDownVeloxBackend();
  }

  void SetUp() override {
    VeloxShuffleWriterTestBase::setUpTestData();
  }

  // Creates a hash shuffle writer with `numPartitions` partitions, hash
  // partitioning and `splitBufferSize` set to 4096, backed by a local
  // partition writer on the fixture's `dataFile_` / `localDirs_`.
  // Throws (via GLUTEN_ASSIGN_OR_THROW) if writer creation fails.
  std::shared_ptr<VeloxShuffleWriter> createWriter(uint32_t numPartitions) {
    auto writerOptions = std::make_shared<HashShuffleWriterOptions>();
    writerOptions->partitioning = Partitioning::kHash;
    writerOptions->splitBufferSize = 4096;

    auto localWriter = makeLocalPartitionWriter(numPartitions, dataFile_, localDirs_);

    GLUTEN_ASSIGN_OR_THROW(
        auto shuffleWriter,
        VeloxShuffleWriter::create(
            ShuffleWriterType::kHashShuffle, numPartitions, localWriter, writerOptions, getDefaultMemoryManager()));
    return shuffleWriter;
  }

  // Wraps `rv` in a `VeloxColumnarBatch` and writes it to `writer` with
  // `ShuffleWriter::kMinMemLimit`; returns the status of that write.
  arrow::Status writeBatch(VeloxShuffleWriter& writer, facebook::velox::RowVectorPtr rv) {
    std::shared_ptr<ColumnarBatch> batch = std::make_shared<VeloxColumnarBatch>(rv);
    return writer.write(batch, ShuffleWriter::kMinMemLimit);
  }
};

// The display names of the four new sub-stages must equal their enum
// identifiers verbatim. These strings appear in the
// `VELOX_SHUFFLE_WRITER_LOG_FLAG` log line, so they are part of the stable
// observable surface.
TEST_F(HashShuffleWriterSplitRvSubStagesTest, enumNames) {
  using Writer = VeloxShuffleWriter;

  EXPECT_EQ(Writer::CpuWallTimingName(Writer::CpuWallTimingSplitFixedWidth), "CpuWallTimingSplitFixedWidth");
  EXPECT_EQ(Writer::CpuWallTimingName(Writer::CpuWallTimingSplitValidity), "CpuWallTimingSplitValidity");
  EXPECT_EQ(Writer::CpuWallTimingName(Writer::CpuWallTimingSplitBinary), "CpuWallTimingSplitBinary");
  EXPECT_EQ(Writer::CpuWallTimingName(Writer::CpuWallTimingSplitComplex), "CpuWallTimingSplitComplex");
}

// A batch with one fixed-width data column should tick the fixed-width
// sub-stage and also pass through (but not necessarily do meaningful work
// in) the other three. Exactly one batch is written, so each sub-stage timer
// is expected to have been entered exactly once: we assert `count == 1`.
// The count is checked rather than `>0 ns wall` because the SCOPED_TIMER
// timing resolution can round empty paths to 0 ns; the count is the reliable
// signal that the timer was reached.
TEST_F(HashShuffleWriterSplitRvSubStagesTest, fixedWidthBumpsItsBucket) {
  using Writer = VeloxShuffleWriter;

  auto writer = createWriter(2);
  auto batch = makeRowVector({
      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key
      makeFlatVector<int64_t>({100, 200, 300, 400}),
  });
  ASSERT_NOT_OK(writeBatch(*writer, batch));

  EXPECT_EQ(writer->cpuWallTiming(Writer::CpuWallTimingSplitFixedWidth).count, 1);
  EXPECT_EQ(writer->cpuWallTiming(Writer::CpuWallTimingSplitValidity).count, 1);
  EXPECT_EQ(writer->cpuWallTiming(Writer::CpuWallTimingSplitBinary).count, 1);
  EXPECT_EQ(writer->cpuWallTiming(Writer::CpuWallTimingSplitComplex).count, 1);
}

// Writing one batch that carries a VARCHAR column must enter the binary
// sub-stage exactly once and leave a nonzero wall time on it (string data is
// copied per partition, so there is real work to measure).
TEST_F(HashShuffleWriterSplitRvSubStagesTest, binaryBumpsItsBucket) {
  auto writer = createWriter(2);
  auto batch = makeRowVector({
      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key
      makeFlatVector<facebook::velox::StringView>({"alpha", "beta", "gamma", "delta"}),
  });
  ASSERT_NOT_OK(writeBatch(*writer, batch));

  const auto& binaryTiming = writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitBinary);
  EXPECT_EQ(binaryTiming.count, 1);
  EXPECT_GT(binaryTiming.wallNanos, 0);
}

// Writing one batch that carries a complex (ARRAY) column must enter the
// complex sub-stage exactly once and leave a nonzero wall time on it (Presto
// serializer round-trip per partition).
TEST_F(HashShuffleWriterSplitRvSubStagesTest, complexBumpsItsBucket) {
  auto writer = createWriter(2);
  auto batch = makeRowVector({
      makeFlatVector<int32_t>({0, 1, 0, 1}), // partition key
      makeArrayVector<int64_t>({
          {1, 2, 3},
          {4, 5},
          {6},
          {7, 8, 9, 10},
      }),
  });
  ASSERT_NOT_OK(writeBatch(*writer, batch));

  const auto& complexTiming = writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitComplex);
  EXPECT_EQ(complexTiming.count, 1);
  EXPECT_GT(complexTiming.wallNanos, 0);
}

// Sanity check: the outer SplitRV bucket still ticks once per written batch.
// The sub-stage timers refine it; they do not replace it. Two writes of the
// same batch must therefore yield a count of 2.
TEST_F(HashShuffleWriterSplitRvSubStagesTest, outerSplitRvStillCounted) {
  auto writer = createWriter(2);
  auto batch = makeRowVector({
      makeFlatVector<int32_t>({0, 1, 0, 1}),
      makeFlatVector<int64_t>({100, 200, 300, 400}),
  });
  ASSERT_NOT_OK(writeBatch(*writer, batch));
  ASSERT_NOT_OK(writeBatch(*writer, batch));

  const auto& outerTiming = writer->cpuWallTiming(VeloxShuffleWriter::CpuWallTimingSplitRV);
  EXPECT_EQ(outerTiming.count, 2);
}

} // namespace gluten

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading