perf: Introduce OptimizedHashPartitionFunction#2016
Conversation
| int numPartitions, | ||
| bool localExchange) const override; | ||
| bool localExchange, | ||
| const core::QueryConfig* queryConfig) const override; |
There was a problem hiding this comment.
HivePartitionFunctionTest doesn't compile due to no default value provided for queryConfig. We need to either add the default value here or pass nullptr in HivePartitionFunctionTest.
| } | ||
| } | ||
|
|
||
| bool hasConstantKeys( |
There was a problem hiding this comment.
The name hasConstantKeys is a bit confusing. This function returns true only when all partition keys are constant or backed by a constant-encoded input vector. It might be clearer to rename it to allConstantKeys.
| return 0u; | ||
| } | ||
|
|
||
| const auto size = input.size(); |
There was a problem hiding this comment.
The return value needs to be handled since it may return a single partition now.
There was a problem hiding this comment.
Replaced this with
std::optional<uint32_t> singlePartition =
subPartitioningFunction_->partition(*input, subPartitionIdsBuffer_);
if (singlePartition.has_value()) {
simd::simdFill<uint32_t>(
subPartitionIdsBuffer_.data(), singlePartition.value(), input->size());
}
xin-zhang2
left a comment
There was a problem hiding this comment.
@yingsu00 I left a few comments, please take a look when you have a chance. Thanks.
68f3d0a to
aca5238
Compare
aca5238 to
799d1f6
Compare
| {{core::QueryConfig::kOptimizedHashPartitionFunctionEnabled, "true"}}); | ||
|
|
||
| auto optimizedFunction = | ||
| spec.create(8, /*localExchange=*/false, &optimizedConfig); |
There was a problem hiding this comment.
The third argument is bool, so we can directly pass true to it, or use optimizedConfig.optimizedPartitionedOutputEnabled().
799d1f6 to
d7e5827
Compare
|
@xin-zhang2 Thanks for reviewing. I have updated the PR. |
| } | ||
| } | ||
|
|
||
| std::optional<uint32_t> OptimizedHashPartitionFunction::partition( |
There was a problem hiding this comment.
OptimizedHashpartitionFunction is intended to be a drop-in replacement for HashPartitionFunction, it might be expected to produce the same partition assignments as HashPartitionFunction. However, since it uses a different hash algorithm for non-hashBitRange cases, the resulting partition assignments might be different from HashPartitionFunction. Do you think we should add a comment to clarify this?
There was a problem hiding this comment.
different hash algorithm for non-hashBitRange cases
No, the hash algorithms are the same. Both are using Folly's hash functions:
template <bool typeProvidesCustomComparison, TypeKind Kind>
uint64_t hashOne(const DecodedVector& decoded, vector_size_t index) {
if constexpr (
Kind == TypeKind::ROW || Kind == TypeKind::ARRAY ||
Kind == TypeKind::MAP) {
return decoded.base()->hashValueAt(decoded.index(index));
} else {
using T = typename KindToFlatVector<Kind>::HashRowType;
const T value = decoded.valueAt<T>(index);
if constexpr (typeProvidesCustomComparison) {
return static_cast<const CanProvideCustomComparisonType<Kind>*>(
decoded.base()->type().get())
->hash(value);
} else if constexpr (std::is_floating_point_v<T>) {
return util::floating_point::NaNAwareHash<T>()(value);
} else {
return folly::hasher<T>()(value);
}
}
}
It's just the loops that were optimized.
There was a problem hiding this comment.
Maybe I didn't make this clear in my previous comment. I'm not referring to the algorithm used to compute the hash value, but rather how the partition is determined from the hash value.
In HashPartitionFunction, the partition is calculated as
partitions[i] = hashes_[i] % numPartitions_
whereas in OptimizedPartitionFunction, it uses
partitions[index] = reduceRange(hashes[index], numPartitions);
where
FOLLY_ALWAYS_INLINE uint32_t mixedHash(uint64_t hash) {
return static_cast<uint32_t>(hash) ^ static_cast<uint32_t>(hash >> 32);
}
FOLLY_ALWAYS_INLINE uint32_t
reduceRange(uint64_t hash, uint32_t numPartitions) {
return (static_cast<uint64_t>(mixedHash(hash)) * numPartitions) >> 32;
}
These two approaches are not equivalent, so for the same input hash value, they may produce different partition assignments.
| int numPartitions, | ||
| bool localExchange = false) const = 0; | ||
| bool localExchange = false, | ||
| bool useOptimizedPartitionFunction = false) const = 0; |
There was a problem hiding this comment.
PartitionFunctionSpec::create() is intended to be a generic virtual interface and should be ideally implementation-agnostic. While useOptimizedPartitionFunction is specific to HashPartitionFunctionSpec, it exposes implementation details at the interface level. Would there be a cleaner approach here?
There was a problem hiding this comment.
PartitionFunctionSpec::create() is intended to be a generic virtual interface and should be ideally implementation-agnostic. While useOptimizedPartitionFunction is specific to HashPartitionFunctionSpec, it exposes implementation details at the interface level. Would there be a cleaner approach here?
I will make optimized versions for other PartitionFunctions in near future too, so this is generic for all PartitionFunctions. Otherwise, do you have other suggestions?
There was a problem hiding this comment.
I added a comment in PlanNode.h:
/// TODO: useOptimizedPartitionFunction = true is only supported in
/// HashPartitionFunction now. Will extend the optimization to other
/// PartitionFunctions soon.
| return std::nullopt; | ||
| } | ||
|
|
||
| uint64_t hash; |
There was a problem hiding this comment.
It would be better to add a VELOX_DCHECK_GT(decoded_.size(), 0) check before calling isNullAt(0)
d7e5827 to
15b6e1a
Compare
Introduce OptimizedHashPartitionFunction as a faster drop-in replacement for HashPartitionFunction, gated behind a new query config flag optimized_hash_partition_function_enabled (default false). partition() is improved from 50% to over 200x. Add HashPartitionFunctionBase as a common base exposing numPartitions(), and createHashPartitionFunction() factories that select the implementation based on the flag. Thread QueryConfig* through PartitionFunctionSpec::create() and update callsites (LocalPartition, PartitionedOutput, MarkDistinct, RowNumber, Window, SubPartitionedSortWindowBuild, HiveConnector) to construct partition functions via the factory. Register CMake targets for the new test and benchmark binaries.
15b6e1a to
24bd9b8
Compare
| suspender.dismiss(); | ||
|
|
||
| for (uint32_t iteration = 0; iteration < iterations; ++iteration) { | ||
| partitionFunction->partition(*input, partitions); |
There was a problem hiding this comment.
We need logic to handle the case when it returns a single value.
There is another call site in PartitionedVectorTestBase::partitionRowVectors that doesn't handle this. That function is not currenlty used, but it would be better to either remove the function or address it as well.
This PR contains 2 commits:
perf: Add AVX512 support
Introduce OptimizedHashPartitionFunction
Introduce OptimizedHashPartitionFunction as a faster drop-in replacement
for HashPartitionFunction, gated behind a new query config flag
optimized_hash_partition_function_enabled (default false). partition()
is improved from 50% to over 200x.
Add HashPartitionFunctionBase as a common base exposing numPartitions(),
and createHashPartitionFunction() factories that select the
implementation based on the flag. Thread QueryConfig* through
PartitionFunctionSpec::create() and update callsites (LocalPartition,
PartitionedOutput, MarkDistinct, RowNumber, Window,
SubPartitionedSortWindowBuild, HiveConnector) to construct partition
functions via the factory.
Register CMake targets for the new test and benchmark binaries.