From b4528058282acb1df61aaa892bc3547924619d8b Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Sat, 9 May 2026 12:09:22 -0700 Subject: [PATCH 1/2] perf: Add AVX512 support --- scripts/setup-helper-functions.sh | 23 +++++++++++++++++++---- velox/common/process/ProcessBase.cpp | 11 +++++++++++ velox/common/process/ProcessBase.h | 4 ++++ velox/flag_definitions/flags.cpp | 2 ++ 4 files changed, 36 insertions(+), 4 deletions(-) diff --git a/scripts/setup-helper-functions.sh b/scripts/setup-helper-functions.sh index a50fb02ae0e..06ffbb5b967 100755 --- a/scripts/setup-helper-functions.sh +++ b/scripts/setup-helper-functions.sh @@ -81,7 +81,8 @@ function github_checkout { # The values that CPU_ARCH can take are as follows: # arm64 : Target Apple silicon. # aarch64: Target general 64 bit arm cpus. -# avx: Target Intel CPUs with AVX. +# avx512: Target Intel CPUs with AVX-512. +# avx: Target Intel CPUs with AVX2. # sse: Target Intel CPUs with sse. # Echo's the appropriate compiler flags which can be captured as so # CXX_FLAGS=$(get_cxx_flags) or @@ -102,7 +103,12 @@ function get_cxx_flags { else # x86_64 local CPU_CAPABILITIES CPU_CAPABILITIES=$(sysctl -a | grep machdep.cpu.features | awk '{print tolower($0)}') - if [[ $CPU_CAPABILITIES =~ "avx" ]]; then + if [[ $CPU_CAPABILITIES =~ "avx512f" ]] && + [[ $CPU_CAPABILITIES =~ "avx512dq" ]] && + [[ $CPU_CAPABILITIES =~ "avx512bw" ]] && + [[ $CPU_CAPABILITIES =~ "avx512vl" ]]; then + CPU_ARCH="avx512" + elif [[ $CPU_CAPABILITIES =~ "avx" ]]; then CPU_ARCH="avx" else CPU_ARCH="sse" @@ -114,7 +120,12 @@ function get_cxx_flags { else # x86_64 local CPU_CAPABILITIES CPU_CAPABILITIES=$(cat /proc/cpuinfo | grep flags | head -n 1 | awk '{print tolower($0)}') - if [[ $CPU_CAPABILITIES =~ "avx" ]]; then + if [[ $CPU_CAPABILITIES =~ "avx512f" ]] && + [[ $CPU_CAPABILITIES =~ "avx512dq" ]] && + [[ $CPU_CAPABILITIES =~ "avx512bw" ]] && + [[ $CPU_CAPABILITIES =~ "avx512vl" ]]; then + CPU_ARCH="avx512" + elif [[ $CPU_CAPABILITIES =~ "avx" ]]; then CPU_ARCH="avx" elif [[ $CPU_CAPABILITIES =~ "sse" ]]; then CPU_ARCH="sse" @@ -131,8 +142,12 @@ function get_cxx_flags { echo -n "-mcpu=apple-m1+crc" ;; + "avx512") + echo -n "-mavx512f -mavx512dq -mavx512bw -mavx512vl -mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2" + ;; + "avx") - echo -n "-mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2" + echo -n "-mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2" ;; "sse") diff --git a/velox/common/process/ProcessBase.cpp b/velox/common/process/ProcessBase.cpp index 0b9a4df2c64..3cbb7fa6a42 100644 --- a/velox/common/process/ProcessBase.cpp +++ b/velox/common/process/ProcessBase.cpp @@ -32,6 +32,8 @@ DECLARE_bool(avx2); // Enables use of AVX2 when available NOLINT DECLARE_bool(bmi2); // Enables use of BMI2 when available NOLINT +DECLARE_bool(avx512f); + namespace facebook { namespace velox { namespace process { @@ -106,6 +108,7 @@ uint64_t threadCpuNanos() { namespace { bool bmi2CpuFlag = folly::CpuId().bmi2(); bool avx2CpuFlag = folly::CpuId().avx2(); +bool avx512fCpuFlag = folly::CpuId().avx512f(); } // namespace bool hasAvx2() { @@ -124,6 +127,14 @@ bool hasBmi2() { #endif } +bool hasAvx512f() { +#ifdef __AVX512F__ + return avx512fCpuFlag && FLAGS_avx512f; +#else + return false; +#endif +} + } // namespace process } // namespace velox } // namespace facebook diff --git a/velox/common/process/ProcessBase.h b/velox/common/process/ProcessBase.h index 34edd6d1467..7ca400b4efa 100644 --- a/velox/common/process/ProcessBase.h +++ b/velox/common/process/ProcessBase.h @@ -46,6 +46,10 @@ uint64_t threadCpuNanos(); /// by flag. bool hasAvx2(); +/// True if the machine has Intel AVX512F instructions and these are not +/// disabled by flag. +bool hasAvx512f(); + /// True if the machine has Intel BMI2 instructions and these are not disabled /// by flag. bool hasBmi2(); diff --git a/velox/flag_definitions/flags.cpp b/velox/flag_definitions/flags.cpp index 8648e80a68e..4adc6a5a22c 100644 --- a/velox/flag_definitions/flags.cpp +++ b/velox/flag_definitions/flags.cpp @@ -55,6 +55,8 @@ DEFINE_int32( DEFINE_bool(avx2, true, "Enables use of AVX2 when available"); +DEFINE_bool(avx512f, true, "Enables use of AVX512F when available"); + DEFINE_bool(bmi2, true, "Enables use of BMI2 when available"); // Used in exec/Expr.cpp From 24bd9b807251d0f708ea4f6d53c5d43d604f46c2 Mon Sep 17 00:00:00 2001 From: yingsu00 Date: Sat, 9 May 2026 14:38:05 -0700 Subject: [PATCH 2/2] perf: Introduce OptimizedHashPartitionFunction Introduce OptimizedHashPartitionFunction as a faster drop-in replacement for HashPartitionFunction, gated behind a new query config flag optimized_hash_partition_function_enabled (default false). partition() is improved from 50% to over 200x. Add HashPartitionFunctionBase as a common base exposing numPartitions(), and createHashPartitionFunction() factories that select the implementation based on the flag. Thread QueryConfig* through PartitionFunctionSpec::create() and update callsites (LocalPartition, PartitionedOutput, MarkDistinct, RowNumber, Window, SubPartitionedSortWindowBuild, HiveConnector) to construct partition functions via the factory. Register CMake targets for the new test and benchmark binaries. --- velox/connectors/hive/HiveConnector.cpp | 3 +- velox/connectors/hive/HiveConnector.h | 3 +- velox/core/PlanNode.h | 9 +- velox/core/QueryConfig.cpp | 1 + velox/core/QueryConfig.h | 10 + velox/exec/CMakeLists.txt | 1 + velox/exec/HashPartitionFunction.cpp | 47 +- velox/exec/HashPartitionFunction.h | 38 +- velox/exec/LocalPartition.cpp | 11 +- velox/exec/MarkDistinct.cpp | 10 +- velox/exec/MarkDistinct.h | 2 +- velox/exec/OptimizedHashPartitionFunction.cpp | 270 ++++++++++ velox/exec/OptimizedHashPartitionFunction.h | 72 +++ velox/exec/OptimizedPartitionedOutput.cpp | 6 +- velox/exec/OptimizedVectorHasher.cpp | 28 ++ velox/exec/OptimizedVectorHasher.h | 6 + velox/exec/PartitionedOutput.cpp | 11 +- velox/exec/RoundRobinPartitionFunction.h | 3 +- velox/exec/RowNumber.cpp | 10 +- velox/exec/RowNumber.h | 2 +- velox/exec/ScaleWriterLocalPartition.cpp | 5 +- velox/exec/SubPartitionedSortWindowBuild.cpp | 17 +- velox/exec/SubPartitionedSortWindowBuild.h | 3 +- velox/exec/Window.cpp | 1 + velox/exec/benchmarks/CMakeLists.txt | 12 + ...ptimizedHashPartitionFunctionBenchmark.cpp | 465 ++++++++++++++++++ velox/exec/tests/CMakeLists.txt | 1 + .../OptimizedHashPartitionFunctionTest.cpp | 186 +++++++ velox/exec/tests/utils/PlanBuilder.cpp | 3 +- 29 files changed, 1200 insertions(+), 36 deletions(-) create mode 100644 velox/exec/OptimizedHashPartitionFunction.cpp create mode 100644 velox/exec/OptimizedHashPartitionFunction.h create mode 100644 velox/exec/benchmarks/OptimizedHashPartitionFunctionBenchmark.cpp create mode 100644 velox/exec/tests/OptimizedHashPartitionFunctionTest.cpp diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 062a507fc64..f52aeb7dd37 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -132,7 +132,8 @@ void HiveConnector::registerSerDe() { std::unique_ptr HivePartitionFunctionSpec::create( int numPartitions, - bool localExchange) const { + bool localExchange, + bool /*useOptimizedPartitionFunction*/) const { std::vector bucketToPartitions; if (bucketToPartition_.empty()) { // NOTE: if hive partition function spec doesn't specify bucket to partition diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index 95c175c4f69..e3508cb4729 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -141,7 +141,8 @@ class HivePartitionFunctionSpec : public core::PartitionFunctionSpec { std::unique_ptr create( int numPartitions, - bool localExchange) const override; + bool localExchange, + bool useOptimizedPartitionFunction = false) const override; std::string toString() const override; diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index 4a1ba1579cd..109caf0f45d 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -2500,9 +2500,13 @@ class PartitionFunctionSpec : public ISerializable { public: /// If 'localExchange' is true, the partition function is used for local /// exchange within a velox task. + /// TODO: useOptimizedPartitionFunction = true is only supported in + /// HashPartitionFunction now. Will extend the optimization to other + /// PartitionFunctions soon. virtual std::unique_ptr create( int numPartitions, - bool localExchange = false) const = 0; + bool localExchange = false, + bool useOptimizedPartitionFunction = false) const = 0; virtual ~PartitionFunctionSpec() = default; @@ -2515,7 +2519,8 @@ class GatherPartitionFunctionSpec : public PartitionFunctionSpec { public: std::unique_ptr create( int /*numPartitions*/, - bool /*localExchange*/) const override { + bool /*localExchange*/, + bool /*useOptimizedPartitionFunction*/ = false) const override { VELOX_UNREACHABLE(); } diff --git a/velox/core/QueryConfig.cpp b/velox/core/QueryConfig.cpp index 4a31862590a..8493d6546c7 100644 --- a/velox/core/QueryConfig.cpp +++ b/velox/core/QueryConfig.cpp @@ -90,6 +90,7 @@ const std::vector& QueryConfig::registeredProperties() { // Partitioned output. VELOX_REGISTER_QUERY_CONFIG(kPartitionedOutputEagerFlush); + VELOX_REGISTER_QUERY_CONFIG(kOptimizedHashPartitionFunctionEnabled); VELOX_REGISTER_QUERY_CONFIG(kMaxPartitionedOutputBufferSize); VELOX_REGISTER_QUERY_CONFIG(kMaxOutputBufferSize); diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 0571284aedc..b30fb47bd1a 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -454,6 +454,16 @@ class QueryConfig { false, "Flush PartitionedOutput rows eagerly without buffering.") + /// If true, use OptimizedHashPartitionFunction in place of + /// HashPartitionFunction. + VELOX_QUERY_CONFIG( + kOptimizedHashPartitionFunctionEnabled, + optimizedHashPartitionFunctionEnabled, + "optimized_hash_partition_function_enabled", + bool, + false, + "Use OptimizedHashPartitionFunction instead of HashPartitionFunction.") + /// The maximum number of bytes to buffer in PartitionedOutput operator to /// avoid creating tiny SerializedPages. VELOX_QUERY_CONFIG( diff --git a/velox/exec/CMakeLists.txt b/velox/exec/CMakeLists.txt index d77f0305bfd..626c7c06570 100644 --- a/velox/exec/CMakeLists.txt +++ b/velox/exec/CMakeLists.txt @@ -71,6 +71,7 @@ velox_add_library( OperatorTraceScan.cpp OperatorTraceWriter.cpp OperatorUtils.cpp + OptimizedHashPartitionFunction.cpp OptimizedPartitionedOutput.cpp OptimizedVectorHasher.cpp OrderBy.cpp diff --git a/velox/exec/HashPartitionFunction.cpp b/velox/exec/HashPartitionFunction.cpp index 896facc4efa..44f012e5e00 100644 --- a/velox/exec/HashPartitionFunction.cpp +++ b/velox/exec/HashPartitionFunction.cpp @@ -13,8 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include -#include +#include "velox/exec/HashPartitionFunction.h" + +#include "velox/exec/OptimizedHashPartitionFunction.h" +#include "velox/exec/VectorHasher.h" #define XXH_INLINE_ALL #include // @manual=third-party//xxHash:xxhash @@ -123,9 +125,15 @@ std::optional HashPartitionFunction::partition( std::unique_ptr HashPartitionFunctionSpec::create( int numPartitions, - bool localExchange) const { - return std::make_unique( - localExchange, numPartitions, inputType_, keyChannels_, constValues_); + bool localExchange, + bool useOptimizedPartitionFunction) const { + return createHashPartitionFunction( + localExchange, + numPartitions, + inputType_, + keyChannels_, + constValues_, + useOptimizedPartitionFunction); } std::string HashPartitionFunctionSpec::toString() const { @@ -180,4 +188,33 @@ core::PartitionFunctionSpecPtr HashPartitionFunctionSpec::deserialize( return std::make_shared( ISerializable::deserialize(obj["inputType"]), keys, constValues); } + +std::unique_ptr createHashPartitionFunction( + bool localExchange, + int numPartitions, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues, + bool useOptimizedPartitionFunction) { + if (useOptimizedPartitionFunction) { + return std::make_unique( + localExchange, numPartitions, inputType, keyChannels, constValues); + } + return std::make_unique( + localExchange, numPartitions, inputType, keyChannels, constValues); +} + +std::unique_ptr createHashPartitionFunction( + const HashBitRange& hashBitRange, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues, + bool useOptimizedPartitionFunction) { + if (useOptimizedPartitionFunction) { + return std::make_unique( + hashBitRange, inputType, keyChannels, constValues); + } + return std::make_unique( + hashBitRange, inputType, keyChannels, constValues); +} } // namespace facebook::velox::exec diff --git a/velox/exec/HashPartitionFunction.h b/velox/exec/HashPartitionFunction.h index 7aa6a032d6b..ec9a33b82a6 100644 --- a/velox/exec/HashPartitionFunction.h +++ b/velox/exec/HashPartitionFunction.h @@ -15,9 +15,9 @@ */ #pragma once -#include -#include #include "velox/core/PlanNode.h" +#include "velox/exec/HashBitRange.h" +#include "velox/exec/VectorHasher.h" namespace facebook::velox::exec { @@ -27,7 +27,16 @@ namespace facebook::velox::exec { /// numPartitions allows the keyChannels argument to be empty. If keyChannels is /// empty, then the resulting partition number of partition() will always be /// zero. -class HashPartitionFunction : public core::PartitionFunction { +/// Extends PartitionFunction with access to the configured number of +/// partitions. +class HashPartitionFunctionBase : public core::PartitionFunction { + public: + ~HashPartitionFunctionBase() override = default; + + virtual int numPartitions() const = 0; +}; + +class HashPartitionFunction : public HashPartitionFunctionBase { public: HashPartitionFunction( bool localExchange, @@ -48,7 +57,7 @@ class HashPartitionFunction : public core::PartitionFunction { const RowVector& input, std::vector& partitions) override; - int numPartitions() const { + int numPartitions() const override { return numPartitions_; } @@ -85,7 +94,8 @@ class HashPartitionFunctionSpec : public core::PartitionFunctionSpec { std::unique_ptr create( int numPartitions, - bool localExchange) const override; + bool localExchange, + bool useOptimizedPartitionFunction = false) const override; std::string toString() const override; @@ -100,4 +110,22 @@ class HashPartitionFunctionSpec : public core::PartitionFunctionSpec { const std::vector keyChannels_; const std::vector constValues_; }; + +/// Creates either HashPartitionFunction or OptimizedHashPartitionFunction +/// based on 'useOptimizedPartitionFunction'. +std::unique_ptr createHashPartitionFunction( + bool localExchange, + int numPartitions, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues = {}, + bool useOptimizedPartitionFunction = false); + +std::unique_ptr createHashPartitionFunction( + const HashBitRange& hashBitRange, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues = {}, + bool useOptimizedPartitionFunction = false); + } // namespace facebook::velox::exec diff --git a/velox/exec/LocalPartition.cpp b/velox/exec/LocalPartition.cpp index eb6eb81add3..231b873d7fa 100644 --- a/velox/exec/LocalPartition.cpp +++ b/velox/exec/LocalPartition.cpp @@ -339,10 +339,13 @@ LocalPartition::LocalPartition( ctx->task->getLocalExchangeQueues(ctx->splitGroupId, planNode->id())}, numPartitions_{queues_.size()}, partitionFunction_( - numPartitions_ == 1 ? nullptr - : planNode->partitionFunctionSpec().create( - numPartitions_, - /*localExchange=*/true)), + numPartitions_ == 1 + ? nullptr + : planNode->partitionFunctionSpec().create( + numPartitions_, + /*localExchange=*/true, + ctx->queryConfig() + .optimizedHashPartitionFunctionEnabled())), singlePartitionBufferSize_{ (numPartitions_ < ctx->queryConfig() diff --git a/velox/exec/MarkDistinct.cpp b/velox/exec/MarkDistinct.cpp index 2b562c714af..83ae15a2cad 100644 --- a/velox/exec/MarkDistinct.cpp +++ b/velox/exec/MarkDistinct.cpp @@ -356,8 +356,14 @@ void MarkDistinct::setupInputSpiller( &spillConfig_.value(), spillStats_.get()); - spillHashFunction_ = std::make_unique( - inputSpiller_->hashBits(), inputType_, distinctKeyChannels_); + spillHashFunction_ = createHashPartitionFunction( + inputSpiller_->hashBits(), + inputType_, + distinctKeyChannels_, + {}, + operatorCtx_->driverCtx() + ->queryConfig() + .optimizedHashPartitionFunctionEnabled()); } void MarkDistinct::spill() { diff --git a/velox/exec/MarkDistinct.h b/velox/exec/MarkDistinct.h index c8c582b5ea8..f386ff77bd9 100644 --- a/velox/exec/MarkDistinct.h +++ b/velox/exec/MarkDistinct.h @@ -106,7 +106,7 @@ class MarkDistinct : public Operator { SpillPartitionSet spillInputPartitionSet_; - std::unique_ptr spillHashFunction_; + std::unique_ptr spillHashFunction_; SpillPartitionSet spillHashTablePartitionSet_; diff --git a/velox/exec/OptimizedHashPartitionFunction.cpp b/velox/exec/OptimizedHashPartitionFunction.cpp new file mode 100644 index 00000000000..64bf813fad8 --- /dev/null +++ b/velox/exec/OptimizedHashPartitionFunction.cpp @@ -0,0 +1,270 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "velox/exec/OptimizedHashPartitionFunction.h" + +#include + +#include + +#include "velox/common/process/ProcessBase.h" + +#if defined(__AVX2__) || defined(__AVX512F__) +#include +#endif + +#define XXH_INLINE_ALL +#include // @manual=third-party//xxHash:xxhash + +namespace facebook::velox::exec { +namespace { +// Gets the hash value for local exchange with given 'rawHash'. 'rawHash' +// is the value computed by this hash function which is used for remote +// shuffle across stages like for Prestissimo. +static inline uint32_t localExchangeHash(uint32_t rawHash) { + // Mix the bits so we don't use the same hash used to distribute between + // stages. + bits::reverseBits(reinterpret_cast(&rawHash), sizeof(rawHash)); + return XXH32(&rawHash, sizeof(rawHash), 0); +} + +FOLLY_ALWAYS_INLINE uint32_t mixedHash(uint64_t hash) { + return static_cast(hash) ^ static_cast(hash >> 32); +} + +FOLLY_ALWAYS_INLINE uint32_t +reduceRange(uint64_t hash, uint32_t numPartitions) { + return (static_cast(mixedHash(hash)) * numPartitions) >> 32; +} + +void rangeReductionPowerOfTwo( + const uint64_t* hashes, + uint32_t* partitions, + int size, + uint32_t numPartitions) { + VELOX_DCHECK(bits::isPowerOfTwo(numPartitions)); + + if (numPartitions == 1) { + std::fill(partitions, partitions + size, 0); + return; + } + + const auto shift = 32 - __builtin_ctz(numPartitions); + for (int index = 0; index < size; ++index) { + partitions[index] = mixedHash(hashes[index]) >> shift; + } +} + +#if defined(__AVX512F__) +void rangeReductionAvx512( + const uint64_t* hashes, + uint32_t* partitions, + int size, + uint32_t numPartitions) { + const __m512i numPartitionsVec = _mm512_set1_epi64(numPartitions); + + int index = 0; + for (; index <= size - 8; index += 8) { + const auto hashesVec = + _mm512_loadu_si512(reinterpret_cast(hashes + index)); + + const auto mixedHashesVec = + _mm512_xor_si512(hashesVec, _mm512_srli_epi64(hashesVec, 32)); + const auto productVec = _mm512_mul_epu32(mixedHashesVec, numPartitionsVec); + const auto shiftedVec = _mm512_srli_epi64(productVec, 32); + const auto packedResults = _mm512_cvtepi64_epi32(shiftedVec); + _mm256_storeu_si256( + reinterpret_cast<__m256i*>(partitions + index), packedResults); + } + + for (; index < size; ++index) { + partitions[index] = reduceRange(hashes[index], numPartitions); + } +} +#endif + +#if defined(__AVX2__) +void rangeReductionAvx2( + const uint64_t* hashes, + uint32_t* partitions, + int size, + uint32_t numPartitions) { + const auto packIndexes = _mm256_setr_epi32(0, 2, 4, 6, 0, 0, 0, 0); + const auto numPartitionsVec = _mm256_set1_epi64x(numPartitions); + + int index = 0; + for (; index <= size - 4; index += 4) { + const auto hashesVec = + _mm256_loadu_si256(reinterpret_cast(hashes + index)); + const auto mixedHashesVec = + _mm256_xor_si256(hashesVec, _mm256_srli_epi64(hashesVec, 32)); + const auto productVec = _mm256_mul_epu32(mixedHashesVec, numPartitionsVec); + const auto shiftedVec = _mm256_srli_epi64(productVec, 32); + const auto packedResults = + _mm256_permutevar8x32_epi32(shiftedVec, packIndexes); + _mm_storeu_si128( + reinterpret_cast<__m128i*>(partitions + index), + _mm256_castsi256_si128(packedResults)); + } + + for (; index < size; ++index) { + partitions[index] = reduceRange(hashes[index], numPartitions); + } +} +#endif + +void rangeReductionImpl( + const uint64_t* hashes, + uint32_t* partitions, + int size, + uint32_t numPartitions) { + if (bits::isPowerOfTwo(numPartitions)) { + rangeReductionPowerOfTwo(hashes, partitions, size, numPartitions); + return; + } + +#if defined(__AVX512F__) + if (process::hasAvx512f()) { + rangeReductionAvx512(hashes, partitions, size, numPartitions); + return; + } +#endif + +#if defined(__AVX2__) + if (process::hasAvx2()) { + rangeReductionAvx2(hashes, partitions, size, numPartitions); + return; + } +#endif + + for (int index = 0; index < size; ++index) { + partitions[index] = reduceRange(hashes[index], numPartitions); + } +} + +void applyLocalExchangeHash(raw_vector& hashes) { + for (auto& hash : hashes) { + hash = localExchangeHash(hash); + } +} + +void applyHashBitRange( + const HashBitRange& hashBitRange, + const raw_vector& hashes, + std::vector& partitions) { + partitions.resize(hashes.size()); + for (auto index = 0; index < hashes.size(); ++index) { + partitions[index] = hashBitRange.partition(hashes[index]); + } +} + +} // namespace + +void rangeReduction( + const uint64_t* hashes, + uint32_t* partitions, + int size, + uint32_t numPartitions) { + rangeReductionImpl(hashes, partitions, size, numPartitions); +} + +OptimizedHashPartitionFunction::OptimizedHashPartitionFunction( + bool localExchange, + int numPartitions, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues) + : localExchange_{localExchange}, numPartitions_{numPartitions} { + init(inputType, keyChannels, constValues); +} + +OptimizedHashPartitionFunction::OptimizedHashPartitionFunction( + const HashBitRange& hashBitRange, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues) + : localExchange_{false}, + numPartitions_{hashBitRange.numPartitions()}, + hashBitRange_(hashBitRange) { + VELOX_CHECK_GT(hashBitRange.numPartitions(), 0); + VELOX_CHECK(!keyChannels.empty()); + init(inputType, keyChannels, constValues); +} + +void OptimizedHashPartitionFunction::init( + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues) { + hashers_.reserve(keyChannels.size()); + size_t constChannel{0}; + for (const auto channel : keyChannels) { + if (channel != kConstantChannel) { + hashers_.emplace_back( + OptimizedVectorHasher::create(inputType->childAt(channel), channel)); + } else { + const auto& constValue = constValues[constChannel++]; + hashers_.emplace_back( + OptimizedVectorHasher::create(constValue->type(), channel)); + hashers_.back()->precompute(*constValue); + } + } +} + +std::optional OptimizedHashPartitionFunction::partition( + const RowVector& input, + std::vector& partitions) { + if (hashers_.empty()) { + return 0u; + } + + const auto size = input.size(); + if (size == 0) { + partitions.clear(); + return std::nullopt; + } + + if (!hashBitRange_.has_value() && numPartitions_ == 1) { + return 0u; + } + + rows_.resize(size); + rows_.setAll(); + + hashes_.resize(size); + for (auto i = 0; i < hashers_.size(); ++i) { + auto& hasher = hashers_[i]; + if (hasher->channel() != kConstantChannel) { + hashers_[i]->decode(*input.childAt(hasher->channel()), rows_); + hashers_[i]->hash(rows_, i > 0, hashes_); + } else { + hashers_[i]->hashPrecomputed(i > 0, hashes_); + } + } + + if (localExchange_) { + applyLocalExchangeHash(hashes_); + } + + if (hashBitRange_.has_value()) { + applyHashBitRange(*hashBitRange_, hashes_, partitions); + } else { + partitions.resize(size); + rangeReduction(hashes_.data(), partitions.data(), size, numPartitions_); + } + + return std::nullopt; +} + +} // namespace facebook::velox::exec diff --git a/velox/exec/OptimizedHashPartitionFunction.h b/velox/exec/OptimizedHashPartitionFunction.h new file mode 100644 index 00000000000..3744cd88f76 --- /dev/null +++ b/velox/exec/OptimizedHashPartitionFunction.h @@ -0,0 +1,72 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "velox/exec/HashPartitionFunction.h" +#include "velox/exec/OptimizedVectorHasher.h" + +namespace facebook::velox::exec { + +/// Maps hashes to partitions using range reduction. Visible for testing. +void rangeReduction( + const uint64_t* hashes, + uint32_t* partitions, + int size, + uint32_t numPartitions); + +/// Calculates partition numbers using OptimizedVectorHasher. +class OptimizedHashPartitionFunction : public HashPartitionFunctionBase { + public: + OptimizedHashPartitionFunction( + bool localExchange, + int numPartitions, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues = {}); + + OptimizedHashPartitionFunction( + const HashBitRange& hashBitRange, + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues = {}); + + ~OptimizedHashPartitionFunction() override = default; + + std::optional partition( + const RowVector& input, + std::vector& partitions) override; + + int numPartitions() const override { + return numPartitions_; + } + + private: + void init( + const RowTypePtr& inputType, + const std::vector& keyChannels, + const std::vector& constValues); + + const bool localExchange_; + const int numPartitions_; + const std::optional hashBitRange_ = std::nullopt; + std::vector> hashers_; + + // Reusable memory. + SelectivityVector rows_; + raw_vector hashes_; +}; + +} // namespace facebook::velox::exec diff --git a/velox/exec/OptimizedPartitionedOutput.cpp b/velox/exec/OptimizedPartitionedOutput.cpp index e825ea94633..321a168a082 100644 --- a/velox/exec/OptimizedPartitionedOutput.cpp +++ b/velox/exec/OptimizedPartitionedOutput.cpp @@ -54,7 +54,11 @@ OptimizedPartitionedOutput::OptimizedPartitionedOutput( partitionFunction_( numDestinations_ == 1 ? nullptr - : planNode->partitionFunctionSpec().create(numDestinations_)) { + : planNode->partitionFunctionSpec().create( + numDestinations_, + /*localExchange=*/false, + ctx->queryConfig() + .optimizedHashPartitionFunctionEnabled())) { if (!planNode->isPartitioned()) { VELOX_USER_CHECK_EQ(numDestinations_, 1); } diff --git a/velox/exec/OptimizedVectorHasher.cpp b/velox/exec/OptimizedVectorHasher.cpp index 507ffc9edb1..891ebd522fe 100644 --- a/velox/exec/OptimizedVectorHasher.cpp +++ b/velox/exec/OptimizedVectorHasher.cpp @@ -383,6 +383,34 @@ void OptimizedVectorHasher::hashPrecomputed( }); } +uint64_t OptimizedVectorHasher::hashPrecomputed(bool mix, uint64_t result) + const { + return mix ? bits::hashMix(result, precomputedHash_) : precomputedHash_; +} + +std::optional OptimizedVectorHasher::hashConstant( + bool mix, + uint64_t result) const { + if (!decoded_.isConstantMapping()) { + return std::nullopt; + } + + VELOX_DCHECK_GT(decoded_.size(), 0); + + uint64_t hash; + if (decoded_.isNullAt(0) || typeKind_ == TypeKind::UNKNOWN) { + hash = kNullHash; + } else if (typeProvidesCustomComparison_) { + hash = VELOX_DYNAMIC_TEMPLATE_TYPE_DISPATCH( + hashOne, true, typeKind_, decoded_, 0); + } else { + hash = VELOX_DYNAMIC_TEMPLATE_TYPE_DISPATCH( + hashOne, false, typeKind_, decoded_, 0); + } + + return mix ? bits::hashMix(result, hash) : hash; +} + void OptimizedVectorHasher::precompute(const BaseVector& value) { if (value.isNullAt(0)) { precomputedHash_ = kNullHash; diff --git a/velox/exec/OptimizedVectorHasher.h b/velox/exec/OptimizedVectorHasher.h index 830b453abe8..5a75c054212 100644 --- a/velox/exec/OptimizedVectorHasher.h +++ b/velox/exec/OptimizedVectorHasher.h @@ -47,6 +47,12 @@ class OptimizedVectorHasher { void hashPrecomputed(bool mix, raw_vector& result) const; + // Computes one hash from a precomputed single value. + uint64_t hashPrecomputed(bool mix, uint64_t result) const; + + // Computes one hash when the decoded vector has constant mapping. + std::optional hashConstant(bool mix, uint64_t result) const; + void precompute(const BaseVector& value); static constexpr uint64_t kNullHash = BaseVector::kNullHash; diff --git a/velox/exec/PartitionedOutput.cpp b/velox/exec/PartitionedOutput.cpp index ba4e23d738b..74320389489 100644 --- a/velox/exec/PartitionedOutput.cpp +++ b/velox/exec/PartitionedOutput.cpp @@ -207,10 +207,13 @@ PartitionedOutput::PartitionedOutput( numDestinations_(planNode->numPartitions()), replicateNullsAndAny_(planNode->isReplicateNullsAndAny()), partitionFunction_( - numDestinations_ == 1 ? nullptr - : planNode->partitionFunctionSpec().create( - numDestinations_, - /*localExchange=*/false)), + numDestinations_ == 1 + ? nullptr + : planNode->partitionFunctionSpec().create( + numDestinations_, + /*localExchange=*/false, + ctx->queryConfig() + .optimizedHashPartitionFunctionEnabled())), outputChannels_(calculateOutputChannels( planNode->inputType(), planNode->outputType(), diff --git a/velox/exec/RoundRobinPartitionFunction.h b/velox/exec/RoundRobinPartitionFunction.h index b84c6d2ffaf..a13ed529f55 100644 --- a/velox/exec/RoundRobinPartitionFunction.h +++ b/velox/exec/RoundRobinPartitionFunction.h @@ -43,7 +43,8 @@ class RoundRobinPartitionFunctionSpec : public core::PartitionFunctionSpec { public: std::unique_ptr create( int numPartitions, - bool /*localExchange*/) const override { + bool /*localExchange*/, + bool /*useOptimizedPartitionFunction*/ = false) const override { return std::make_unique( numPartitions); } diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp index cd2cd4ce36a..04427975120 100644 --- a/velox/exec/RowNumber.cpp +++ b/velox/exec/RowNumber.cpp @@ -449,8 +449,14 @@ void RowNumber::setupInputSpiller( keyChannels.push_back(hasher->channel()); } - spillHashFunction_ = std::make_unique( - inputSpiller_->hashBits(), inputType_, keyChannels); + spillHashFunction_ = createHashPartitionFunction( + inputSpiller_->hashBits(), + inputType_, + keyChannels, + {}, + operatorCtx_->driverCtx() + ->queryConfig() + .optimizedHashPartitionFunctionEnabled()); } void RowNumber::spill() { diff --git a/velox/exec/RowNumber.h b/velox/exec/RowNumber.h index b34fc9d9c20..8e53713fc77 100644 --- a/velox/exec/RowNumber.h +++ b/velox/exec/RowNumber.h @@ -142,7 +142,7 @@ class RowNumber : public Operator { SpillPartitionSet spillInputPartitionSet_; // Used to calculate the spill partition numbers of the inputs. - std::unique_ptr spillHashFunction_; + std::unique_ptr spillHashFunction_; // The cpu may be voluntarily yield after running too long when processing // input from spilled file. diff --git a/velox/exec/ScaleWriterLocalPartition.cpp b/velox/exec/ScaleWriterLocalPartition.cpp index 7530ff403a0..1764adabf6a 100644 --- a/velox/exec/ScaleWriterLocalPartition.cpp +++ b/velox/exec/ScaleWriterLocalPartition.cpp @@ -57,7 +57,10 @@ ScaleWriterPartitioningLocalPartition::ScaleWriterPartitioningLocalPartition( ? nullptr : planNode->partitionFunctionSpec().create( numTablePartitions_, - /*localExchange=*/true); + /*localExchange=*/true, + operatorCtx_->driverCtx() + ->queryConfig() + .optimizedHashPartitionFunctionEnabled()); } void ScaleWriterPartitioningLocalPartition::initialize() { diff --git a/velox/exec/SubPartitionedSortWindowBuild.cpp b/velox/exec/SubPartitionedSortWindowBuild.cpp index 2f2a247a8d4..db437748fbb 100644 --- a/velox/exec/SubPartitionedSortWindowBuild.cpp +++ b/velox/exec/SubPartitionedSortWindowBuild.cpp @@ -22,6 +22,7 @@ namespace facebook::velox::exec { SubPartitionedSortWindowBuild::SubPartitionedSortWindowBuild( const std::shared_ptr& node, int32_t numSubPartitions, + const core::QueryConfig& queryConfig, velox::memory::MemoryPool* pool, common::PrefixSortConfig&& prefixSortConfig, const common::SpillConfig* spillConfig, @@ -40,8 +41,13 @@ SubPartitionedSortWindowBuild::SubPartitionedSortWindowBuild( for (int i = 0; i < numPartitionKeys_; i++) { keyChannels[i] = inputChannels_[i]; } - subPartitioningFunction_ = std::make_unique( - false, numSubPartitions_, node->inputType(), keyChannels); + subPartitioningFunction_ = createHashPartitionFunction( + /*localExchange=*/false, + numSubPartitions_, + node->inputType(), + keyChannels, + {}, + queryConfig.optimizedHashPartitionFunctionEnabled()); subWindowBuilds_.resize(numSubPartitions_); for (int i = 0; i < numSubPartitions_; i++) { subWindowBuilds_[i] = std::make_unique( @@ -59,7 +65,12 @@ void SubPartitionedSortWindowBuild::addInput(RowVectorPtr input) { VELOX_CHECK_LT(currentSubPartition_, 0); subPartitionIdsBuffer_.resize(input->size()); - subPartitioningFunction_->partition(*input, subPartitionIdsBuffer_); + std::optional singlePartition = + subPartitioningFunction_->partition(*input, subPartitionIdsBuffer_); + if (singlePartition.has_value()) { + simd::simdFill( + subPartitionIdsBuffer_.data(), singlePartition.value(), input->size()); + } for (auto i = 0; i < inputChannels_.size(); ++i) { decodedInputVectors_[i].decode(*input->childAt(inputChannels_[i])); diff --git a/velox/exec/SubPartitionedSortWindowBuild.h b/velox/exec/SubPartitionedSortWindowBuild.h index 8735f438d30..f0da95bdf95 100644 --- a/velox/exec/SubPartitionedSortWindowBuild.h +++ b/velox/exec/SubPartitionedSortWindowBuild.h @@ -33,6 +33,7 @@ class SubPartitionedSortWindowBuild : public WindowBuild { SubPartitionedSortWindowBuild( const std::shared_ptr& node, int32_t numSubPartitions, + const core::QueryConfig& queryConfig, velox::memory::MemoryPool* pool, common::PrefixSortConfig&& prefixSortConfig, const common::SpillConfig* spillConfig, @@ -80,7 +81,7 @@ class SubPartitionedSortWindowBuild : public WindowBuild { exec::SpillStats* const spillStats_; // Divide input rows to the corresponding sub partitions. - std::unique_ptr subPartitioningFunction_; + std::unique_ptr subPartitioningFunction_; // WindowBuilds for each sub partition. std::vector> subWindowBuilds_; diff --git a/velox/exec/Window.cpp b/velox/exec/Window.cpp index f9107522f0a..b763371a801 100644 --- a/velox/exec/Window.cpp +++ b/velox/exec/Window.cpp @@ -75,6 +75,7 @@ Window::Window( windowBuild_ = std::make_unique( windowNode, numSubPartitions, + driverCtx->queryConfig(), pool(), makePrefixSortConfig(driverCtx->queryConfig()), spillConfig, diff --git a/velox/exec/benchmarks/CMakeLists.txt b/velox/exec/benchmarks/CMakeLists.txt index 3962d439833..3ccff61baae 100644 --- a/velox/exec/benchmarks/CMakeLists.txt +++ b/velox/exec/benchmarks/CMakeLists.txt @@ -29,6 +29,18 @@ target_link_libraries( Folly::follybenchmark ) +add_executable( + velox_exec_optimized_hash_partition_function_benchmark + OptimizedHashPartitionFunctionBenchmark.cpp +) + +target_link_libraries( + velox_exec_optimized_hash_partition_function_benchmark + velox_exec + velox_vector_test_lib + Folly::follybenchmark +) + add_executable(velox_filter_project_benchmark FilterProjectBenchmark.cpp) target_link_libraries( diff --git a/velox/exec/benchmarks/OptimizedHashPartitionFunctionBenchmark.cpp b/velox/exec/benchmarks/OptimizedHashPartitionFunctionBenchmark.cpp new file mode 100644 index 00000000000..1b6f3631fe9 --- /dev/null +++ b/velox/exec/benchmarks/OptimizedHashPartitionFunctionBenchmark.cpp @@ -0,0 +1,465 @@ +/* + * Copyright (c) International Business Machines Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include + +#include "velox/exec/OptimizedHashPartitionFunction.h" +#include "velox/vector/BaseVector.h" +#include "velox/vector/tests/utils/VectorMaker.h" + +// Add the following definitions to allow Clion runs. +DEFINE_bool(gtest_color, false, ""); +DEFINE_string(gtest_filter, "*", ""); + +using namespace facebook; +using namespace facebook::velox; +using namespace facebook::velox::exec; +using namespace facebook::velox::test; + +namespace { + +constexpr vector_size_t kSize = 10'000; +constexpr vector_size_t kDictionarySize = kSize / 5; + +enum class FunctionKind { + kNormal, + kOptimized, +}; + +enum class EncodingMode { + kFlat, + kDictionary, + kConstant, +}; + +enum class NullMode { + kNoNulls, + kHalfNulls, + kAllNulls, +}; + +enum class PartitionMode { + kRemote, + kLocalExchange, + kHashBitRangeFirst8, + kHashBitRangeLast8, +}; + +template +T makeValue(vector_size_t row) { + return static_cast((row * 8191) ^ (row >> 3)); +} + +template <> +bool makeValue(vector_size_t row) { + return (row & 1) == 0; +} + +template <> +StringView makeValue(vector_size_t row) { + thread_local std::array buffer; + const auto length = 5 + row % 16; + for (vector_size_t index = 0; index < length; ++index) { + buffer[index] = 'a' + (row + index * 7) % 26; + } + return StringView(buffer.data(), length); +} + +std::function makeNulls(NullMode nullMode) { + switch (nullMode) { + case NullMode::kNoNulls: + return nullptr; + case NullMode::kHalfNulls: + return [](vector_size_t row) { return (row & 1) == 0; }; + case NullMode::kAllNulls: + return [](vector_size_t /*row*/) { return true; }; + } + + VELOX_UNREACHABLE(); +} + +VectorPtr wrapInDictionary( + const VectorPtr& base, + vector_size_t size, + memory::MemoryPool* pool, + NullMode nullMode = NullMode::kNoNulls) { + auto indices = AlignedBuffer::allocate(size, pool); + auto* rawIndices = indices->asMutable(); + const auto baseSize = base->size(); + for (vector_size_t row = 0; row < size; ++row) { + rawIndices[row] = (size - row - 1) % baseSize; + } + + BufferPtr nulls; + if (nullMode == NullMode::kHalfNulls) { + nulls = AlignedBuffer::allocate(size, pool); + auto* rawNulls = nulls->asMutable(); + bits::fillBits(rawNulls, 0, size, bits::kNotNull); + for (vector_size_t row = 0; row < size; row += 2) { + bits::setNull(rawNulls, row); + } + } else if (nullMode == NullMode::kAllNulls) { + nulls = AlignedBuffer::allocate(size, pool); + auto* rawNulls = nulls->asMutable(); + bits::fillBits(rawNulls, 0, size, bits::kNull); + } + + return BaseVector::wrapInDictionary(nulls, indices, size, base); +} + +template +VectorPtr makeValuesVector( + VectorMaker& vectorMaker, + memory::MemoryPool* pool, + EncodingMode encodingMode, + NullMode nullMode, + vector_size_t size) { + const auto flatSize = + encodingMode == EncodingMode::kDictionary ? kDictionarySize : size; + auto flat = vectorMaker.flatVector( + flatSize, + [](vector_size_t row) { return makeValue(row); }, + makeNulls(nullMode)); + + switch (encodingMode) { + case EncodingMode::kFlat: + return flat; + case EncodingMode::kDictionary: + return wrapInDictionary(flat, size, pool); + case EncodingMode::kConstant: + if (nullMode == NullMode::kAllNulls) { + return BaseVector::createNullConstant( + CppToType::create(), size, pool); + } + if (nullMode == NullMode::kHalfNulls) { + auto constant = BaseVector::wrapInConstant(size, 1, flat); + // ConstantVector has one nullness for all logical rows. Use a + // dictionary wrapper to express alternating nulls while keeping the + // repeated-value payload constant. + return wrapInDictionary(constant, size, pool, nullMode); + } + return BaseVector::wrapInConstant(size, 0, flat); + } + + VELOX_UNREACHABLE(); +} + +template +std::unique_ptr makePartitionFunction( + PartitionMode partitionMode, + const RowTypePtr& inputType, + int numPartitions) { + switch (partitionMode) { + case PartitionMode::kRemote: + if constexpr (Kind == FunctionKind::kNormal) { + return std::make_unique( + false, numPartitions, inputType, std::vector{0}); + } else { + return std::make_unique( + false, numPartitions, inputType, std::vector{0}); + } + case PartitionMode::kLocalExchange: + if constexpr (Kind == FunctionKind::kNormal) { + return std::make_unique( + true, numPartitions, inputType, std::vector{0}); + } else { + return std::make_unique( + true, numPartitions, inputType, std::vector{0}); + } + case PartitionMode::kHashBitRangeFirst8: + if constexpr (Kind == FunctionKind::kNormal) { + return std::make_unique( + HashBitRange{0, 8}, inputType, std::vector{0}); + } else { + return std::make_unique( + HashBitRange{0, 8}, inputType, std::vector{0}); + } + case PartitionMode::kHashBitRangeLast8: + if constexpr (Kind == FunctionKind::kNormal) { + return std::make_unique( + HashBitRange{56, 64}, inputType, std::vector{0}); + } else { + return std::make_unique( + HashBitRange{56, 64}, inputType, std::vector{0}); + } + } + + VELOX_UNREACHABLE(); +} + +void normalRangeReduction( + const uint64_t* hashes, + uint32_t* partitions, + int size, + uint32_t numPartitions) { + for (int index = 0; index < size; ++index) { + partitions[index] = hashes[index] % numPartitions; + } +} + +template +void runRangeReductionBenchmark(uint32_t iterations, uint32_t numPartitions) { + folly::BenchmarkSuspender suspender; + + std::vector hashes(kSize); + std::vector partitions(kSize); + for (vector_size_t row = 0; row < kSize; ++row) { + hashes[row] = (static_cast(row * 8191) << 32) ^ + static_cast(row * 1315423911ULL + 17); + } + + suspender.dismiss(); + + for (uint32_t iteration = 0; iteration < iterations; ++iteration) { + if constexpr (Kind == FunctionKind::kNormal) { + normalRangeReduction( + hashes.data(), partitions.data(), kSize, numPartitions); + } else { + rangeReduction(hashes.data(), partitions.data(), kSize, numPartitions); + } + folly::doNotOptimizeAway(partitions.data()); + } +} + +template +void runPartitionBenchmark( + uint32_t iterations, + PartitionMode partitionMode, + EncodingMode encodingMode, + NullMode nullMode, + int numPartitions) { + folly::BenchmarkSuspender suspender; + + auto pool = memory::memoryManager()->addLeafPool(); + VectorMaker vectorMaker(pool.get()); + auto values = makeValuesVector( + vectorMaker, pool.get(), encodingMode, nullMode, kSize); + auto input = vectorMaker.rowVector({values}); + auto partitionFunction = makePartitionFunction( + partitionMode, asRowType(input->type()), numPartitions); + std::vector partitions; + + suspender.dismiss(); + + for (uint32_t iteration = 0; iteration < iterations; ++iteration) { + partitionFunction->partition(*input, partitions); + folly::doNotOptimizeAway(partitions.data()); + } +} + +template +void benchmarkNormalHashPartitionFunction( + uint32_t iterations, + PartitionMode partitionMode, + EncodingMode encodingMode, + NullMode nullMode, + int numPartitions) { + runPartitionBenchmark( + iterations, partitionMode, encodingMode, nullMode, numPartitions); +} + +template +void benchmarkOptimizedHashPartitionFunction( + uint32_t iterations, + PartitionMode partitionMode, + EncodingMode encodingMode, + NullMode nullMode, + int numPartitions) { + runPartitionBenchmark( + iterations, partitionMode, encodingMode, nullMode, numPartitions); +} + +#define REGISTER_PARTITION_PAIR( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + ENCODING_MODE, \ + ENCODING_NAME, \ + NULL_MODE, \ + NULL_NAME) \ + BENCHMARK( \ + partition_##TYPE_NAME##_##PARTITION_NAME##_##NUM_PARTITIONS_NAME##_##ENCODING_NAME##_##NULL_NAME, \ + iterations) { \ + benchmarkNormalHashPartitionFunction( \ + iterations, PARTITION_MODE, ENCODING_MODE, NULL_MODE, NUM_PARTITIONS); \ + } \ + BENCHMARK_RELATIVE( \ + optimized_partition_##TYPE_NAME##_##PARTITION_NAME##_##NUM_PARTITIONS_NAME##_##ENCODING_NAME##_##NULL_NAME, \ + iterations) { \ + benchmarkOptimizedHashPartitionFunction( \ + iterations, PARTITION_MODE, ENCODING_MODE, NULL_MODE, NUM_PARTITIONS); \ + } \ + BENCHMARK_DRAW_LINE(); + +#define REGISTER_PARTITION_NULL_MODES( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + ENCODING_MODE, \ + ENCODING_NAME) \ + REGISTER_PARTITION_PAIR( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + ENCODING_MODE, \ + ENCODING_NAME, \ + NullMode::kNoNulls, \ + no_null) \ + REGISTER_PARTITION_PAIR( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + ENCODING_MODE, \ + ENCODING_NAME, \ + NullMode::kHalfNulls, \ + half_null) \ + REGISTER_PARTITION_PAIR( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + ENCODING_MODE, \ + ENCODING_NAME, \ + NullMode::kAllNulls, \ + all_null) + +#define REGISTER_PARTITION_ENCODINGS( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME) \ + REGISTER_PARTITION_NULL_MODES( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + EncodingMode::kFlat, \ + flat) \ + REGISTER_PARTITION_NULL_MODES( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + EncodingMode::kDictionary, \ + dictionary) \ + REGISTER_PARTITION_NULL_MODES( \ + T, \ + TYPE_NAME, \ + PARTITION_MODE, \ + PARTITION_NAME, \ + NUM_PARTITIONS, \ + NUM_PARTITIONS_NAME, \ + EncodingMode::kConstant, \ + constant) + +#define REGISTER_PARTITION_COUNTS( \ + T, TYPE_NAME, PARTITION_MODE, PARTITION_NAME) \ + REGISTER_PARTITION_ENCODINGS( \ + T, TYPE_NAME, PARTITION_MODE, PARTITION_NAME, 1, p1) \ + REGISTER_PARTITION_ENCODINGS( \ + T, TYPE_NAME, PARTITION_MODE, PARTITION_NAME, 4, p4) \ + REGISTER_PARTITION_ENCODINGS( \ + T, TYPE_NAME, PARTITION_MODE, PARTITION_NAME, 16, p16) \ + REGISTER_PARTITION_ENCODINGS( \ + T, TYPE_NAME, PARTITION_MODE, PARTITION_NAME, 100, p100) \ + REGISTER_PARTITION_ENCODINGS( \ + T, TYPE_NAME, PARTITION_MODE, PARTITION_NAME, 1'000, p1000) \ + REGISTER_PARTITION_ENCODINGS( \ + T, TYPE_NAME, PARTITION_MODE, PARTITION_NAME, 1'024, p1024) + +#define REGISTER_PARTITION_MODES(T, TYPE_NAME) \ + REGISTER_PARTITION_COUNTS(T, TYPE_NAME, PartitionMode::kRemote, remote) \ + REGISTER_PARTITION_COUNTS( \ + T, TYPE_NAME, PartitionMode::kLocalExchange, local_exchange) \ + REGISTER_PARTITION_ENCODINGS( \ + T, \ + TYPE_NAME, \ + PartitionMode::kHashBitRangeFirst8, \ + hashbits_0_8, \ + 0, \ + hashbits) \ + REGISTER_PARTITION_ENCODINGS( \ + T, \ + TYPE_NAME, \ + PartitionMode::kHashBitRangeLast8, \ + hashbits_last_8, \ + 0, \ + hashbits) + +REGISTER_PARTITION_MODES(bool, bool) +REGISTER_PARTITION_MODES(int8_t, tinyint) +REGISTER_PARTITION_MODES(int16_t, smallint) +REGISTER_PARTITION_MODES(int32_t, integer) +REGISTER_PARTITION_MODES(int64_t, bigint) +REGISTER_PARTITION_MODES(StringView, varchar) + +#define REGISTER_RANGE_REDUCTION_PAIR(NUM_PARTITIONS, NUM_PARTITIONS_NAME) \ + BENCHMARK(normal_range_reduction_##NUM_PARTITIONS_NAME, iterations) { \ + runRangeReductionBenchmark( \ + iterations, NUM_PARTITIONS); \ + } \ + BENCHMARK_RELATIVE( \ + optimized_range_reduction_##NUM_PARTITIONS_NAME, iterations) { \ + runRangeReductionBenchmark( \ + iterations, NUM_PARTITIONS); \ + } \ + BENCHMARK_DRAW_LINE(); + +REGISTER_RANGE_REDUCTION_PAIR(1, p1) +REGISTER_RANGE_REDUCTION_PAIR(4, p4) +REGISTER_RANGE_REDUCTION_PAIR(16, p16) +REGISTER_RANGE_REDUCTION_PAIR(100, p100) +REGISTER_RANGE_REDUCTION_PAIR(1'000, p1000) +REGISTER_RANGE_REDUCTION_PAIR(1'024, p1024) + +#undef REGISTER_PARTITION_MODES +#undef REGISTER_PARTITION_COUNTS +#undef REGISTER_PARTITION_ENCODINGS +#undef REGISTER_PARTITION_NULL_MODES +#undef REGISTER_PARTITION_PAIR +#undef REGISTER_RANGE_REDUCTION_PAIR + +} // namespace + +int main(int argc, char** argv) { + folly::Init init{&argc, &argv}; + memory::MemoryManager::initialize(memory::MemoryManager::Options{}); + folly::runBenchmarks(); + return 0; +} diff --git a/velox/exec/tests/CMakeLists.txt b/velox/exec/tests/CMakeLists.txt index 119fcecc0bd..189e7fc8680 100644 --- a/velox/exec/tests/CMakeLists.txt +++ b/velox/exec/tests/CMakeLists.txt @@ -65,6 +65,7 @@ set( EnforceDistinctTest.cpp TraceUtilTest.cpp HashPartitionFunctionTest.cpp + OptimizedHashPartitionFunctionTest.cpp SpatialIndexTest.cpp ValuesTest.cpp ParallelProjectTest.cpp diff --git a/velox/exec/tests/OptimizedHashPartitionFunctionTest.cpp b/velox/exec/tests/OptimizedHashPartitionFunctionTest.cpp new file mode 100644 index 00000000000..bcb10aa28df --- /dev/null +++ b/velox/exec/tests/OptimizedHashPartitionFunctionTest.cpp @@ -0,0 +1,186 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/exec/OptimizedHashPartitionFunction.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook; +using namespace facebook::velox; +using namespace facebook::velox::exec; + +class OptimizedHashPartitionFunctionTest : public velox::test::VectorTestBase, + public testing::Test { + protected: + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{}); + } +}; + +TEST_F( + OptimizedHashPartitionFunctionTest, + powerOfTwoRangeReductionMatchesMultiplyHigh) { + const std::vector hashes = { + 0, + 1, + 0x0000'0001'0000'0000ULL, + 0x1234'5678'9abc'def0ULL, + 0xffff'ffff'ffff'ffffULL, + }; + + for (const auto numPartitions : {1, 2, 4, 1'024}) { + std::vector partitions(hashes.size()); + rangeReduction( + hashes.data(), + partitions.data(), + static_cast(hashes.size()), + numPartitions); + + std::vector expected; + expected.reserve(hashes.size()); + for (const auto hash : hashes) { + const auto mixedHash = + static_cast(hash) ^ static_cast(hash >> 32); + expected.push_back( + (static_cast(mixedHash) * numPartitions) >> 32); + } + + EXPECT_EQ(partitions, expected); + } +} + +TEST_F( + OptimizedHashPartitionFunctionTest, + optimizedHashBitRangeMatchesRegular) { + const auto numRows = 10'000; + auto input = makeRowVector( + {makeNullableFlatVector([&] { + std::vector> values; + values.reserve(numRows); + for (auto row = 0; row < numRows; ++row) { + values.emplace_back( + row % 17 == 0 ? std::nullopt : std::optional(row * 13)); + } + return values; + }()), + makeFlatVector(numRows, [](auto row) { + return StringView::makeInline(fmt::format("value_{}", row % 97)); + })}); + const auto rowType = asRowType(input->type()); + + HashPartitionFunction regular(HashBitRange{0, 5}, rowType, {0, 1}); + OptimizedHashPartitionFunction optimized(HashBitRange{0, 5}, rowType, {0, 1}); + + std::vector regularPartitions; + std::vector optimizedPartitions; + EXPECT_EQ( + regular.partition(*input, regularPartitions), + optimized.partition(*input, optimizedPartitions)); + EXPECT_EQ(regularPartitions, optimizedPartitions); +} + +TEST_F(OptimizedHashPartitionFunctionTest, onePartitionReturnsConstantResult) { + auto input = makeRowVector({makeConstant(true, 10'000)}); + const auto rowType = asRowType(input->type()); + OptimizedHashPartitionFunction partitionFunction( + /*localExchange=*/true, 1, rowType, {0}); + + std::vector partitions{123}; + EXPECT_EQ(partitionFunction.partition(*input, partitions), 0u); + EXPECT_EQ(partitions, std::vector{123}); +} + +TEST_F(OptimizedHashPartitionFunctionTest, constantKeyReturnsConstantResult) { + const auto numRows = 10'000; + for (const auto& vector : { + makeConstant(true, numRows), + BaseVector::createNullConstant(BOOLEAN(), numRows, pool()), + }) { + auto input = makeRowVector({vector}); + const auto rowType = asRowType(input->type()); + OptimizedHashPartitionFunction optimized( + /*localExchange=*/true, 16, rowType, {0}); + + std::vector optimizedPartitions{123}; + const auto optimizedPartition = + optimized.partition(*input, optimizedPartitions); + ASSERT_TRUE(optimizedPartition.has_value()); + EXPECT_LT(optimizedPartition.value(), 16); + EXPECT_EQ(optimizedPartitions, std::vector{123}); + } +} + +TEST_F(OptimizedHashPartitionFunctionTest, emptyConstantKeyReturnsEmptyResult) { + auto input = makeRowVector({makeConstant(true, 0)}); + const auto rowType = asRowType(input->type()); + OptimizedHashPartitionFunction optimized( + /*localExchange=*/true, 16, rowType, {0}); + + std::vector optimizedPartitions{123}; + EXPECT_EQ(optimized.partition(*input, optimizedPartitions), std::nullopt); + EXPECT_TRUE(optimizedPartitions.empty()); +} + +TEST_F(OptimizedHashPartitionFunctionTest, constantKeyMatchesFlatKey) { + constexpr auto numRows = 10'000; + auto constantInput = makeRowVector({makeConstant(123, numRows)}); + auto flatInput = makeRowVector( + {makeFlatVector(numRows, [](auto /*row*/) { return 123; })}); + const auto rowType = asRowType(constantInput->type()); + + for (const bool localExchange : {false, true}) { + OptimizedHashPartitionFunction constantPartitionFunction( + localExchange, 16, rowType, {0}); + OptimizedHashPartitionFunction flatPartitionFunction( + localExchange, 16, rowType, {0}); + + std::vector constantPartitions{123}; + const auto constantPartition = + constantPartitionFunction.partition(*constantInput, constantPartitions); + ASSERT_TRUE(constantPartition.has_value()); + EXPECT_EQ(constantPartitions, std::vector{123}); + + std::vector flatPartitions; + EXPECT_EQ( + flatPartitionFunction.partition(*flatInput, flatPartitions), + std::nullopt); + EXPECT_EQ( + flatPartitions, std::vector(numRows, *constantPartition)); + } +} + +TEST_F(OptimizedHashPartitionFunctionTest, specUsesConfiguredImplementation) { + auto input = makeRowVector( + {makeFlatVector({1, 2, 3, 4}), + makeFlatVector({"a", "b", "c", "d"})}); + const auto rowType = asRowType(input->type()); + HashPartitionFunctionSpec spec(rowType, std::vector{0, 1}); + auto optimizedFunction = spec.create(8, /*localExchange=*/false, true); + ASSERT_NE( + dynamic_cast(optimizedFunction.get()), + nullptr); + + auto regularFunction = spec.create(8, /*localExchange=*/false); + ASSERT_NE( + dynamic_cast(regularFunction.get()), nullptr); + + std::vector optimizedPartitions; + ASSERT_EQ( + optimizedFunction->partition(*input, optimizedPartitions), std::nullopt); + ASSERT_EQ(optimizedPartitions.size(), input->size()); + for (const auto partition : optimizedPartitions) { + EXPECT_LT(partition, 8); + } +} diff --git a/velox/exec/tests/utils/PlanBuilder.cpp b/velox/exec/tests/utils/PlanBuilder.cpp index 9117cba55ee..468ae20bf79 100644 --- a/velox/exec/tests/utils/PlanBuilder.cpp +++ b/velox/exec/tests/utils/PlanBuilder.cpp @@ -1689,7 +1689,8 @@ class RoundRobinRowPartitionFunctionSpec : public core::PartitionFunctionSpec { public: std::unique_ptr create( int numPartitions, - bool /*localExchange*/) const override { + bool /*localExchange*/, + bool /*useOptimizedPartitionFunction*/ = false) const override { return std::make_unique(numPartitions); }