Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions scripts/setup-helper-functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ function github_checkout {
# The values that CPU_ARCH can take are as follows:
# arm64 : Target Apple silicon.
# aarch64: Target general 64 bit arm cpus.
# avx: Target Intel CPUs with AVX.
# avx512: Target Intel CPUs with AVX-512.
# avx: Target Intel CPUs with AVX2.
# sse: Target Intel CPUs with sse.
# Echo's the appropriate compiler flags which can be captured as so
# CXX_FLAGS=$(get_cxx_flags) or
Expand All @@ -102,7 +103,12 @@ function get_cxx_flags {
else # x86_64
local CPU_CAPABILITIES
CPU_CAPABILITIES=$(sysctl -a | grep machdep.cpu.features | awk '{print tolower($0)}')
if [[ $CPU_CAPABILITIES =~ "avx" ]]; then
if [[ $CPU_CAPABILITIES =~ "avx512f" ]] &&
[[ $CPU_CAPABILITIES =~ "avx512dq" ]] &&
[[ $CPU_CAPABILITIES =~ "avx512bw" ]] &&
[[ $CPU_CAPABILITIES =~ "avx512vl" ]]; then
CPU_ARCH="avx512"
elif [[ $CPU_CAPABILITIES =~ "avx" ]]; then
CPU_ARCH="avx"
else
CPU_ARCH="sse"
Expand All @@ -114,7 +120,12 @@ function get_cxx_flags {
else # x86_64
local CPU_CAPABILITIES
CPU_CAPABILITIES=$(cat /proc/cpuinfo | grep flags | head -n 1 | awk '{print tolower($0)}')
if [[ $CPU_CAPABILITIES =~ "avx" ]]; then
if [[ $CPU_CAPABILITIES =~ "avx512f" ]] &&
[[ $CPU_CAPABILITIES =~ "avx512dq" ]] &&
[[ $CPU_CAPABILITIES =~ "avx512bw" ]] &&
[[ $CPU_CAPABILITIES =~ "avx512vl" ]]; then
CPU_ARCH="avx512"
elif [[ $CPU_CAPABILITIES =~ "avx" ]]; then
CPU_ARCH="avx"
elif [[ $CPU_CAPABILITIES =~ "sse" ]]; then
CPU_ARCH="sse"
Expand All @@ -131,8 +142,12 @@ function get_cxx_flags {
echo -n "-mcpu=apple-m1+crc"
;;

"avx512")
echo -n "-mavx512f -mavx512dq -mavx512bw -mavx512vl -mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2"
;;

"avx")
echo -n "-mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2"
echo -n "-mavx2 -mfma -mavx -mf16c -mlzcnt -mbmi2"
;;

"sse")
Expand Down
11 changes: 11 additions & 0 deletions velox/common/process/ProcessBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ DECLARE_bool(avx2); // Enables use of AVX2 when available NOLINT

DECLARE_bool(bmi2); // Enables use of BMI2 when available NOLINT

DECLARE_bool(avx512f);

namespace facebook {
namespace velox {
namespace process {
Expand Down Expand Up @@ -106,6 +108,7 @@ uint64_t threadCpuNanos() {
namespace {
bool bmi2CpuFlag = folly::CpuId().bmi2();
bool avx2CpuFlag = folly::CpuId().avx2();
bool avx512fCpuFlag = folly::CpuId().avx512f();
} // namespace

bool hasAvx2() {
Expand All @@ -124,6 +127,14 @@ bool hasBmi2() {
#endif
}

bool hasAvx512f() {
#ifdef __AVX512F__
return avx512fCpuFlag && FLAGS_avx512f;
#else
return false;
#endif
}

} // namespace process
} // namespace velox
} // namespace facebook
4 changes: 4 additions & 0 deletions velox/common/process/ProcessBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ uint64_t threadCpuNanos();
/// by flag.
bool hasAvx2();

/// True if the machine has Intel AVX512F instructions and these are not
/// disabled by flag.
bool hasAvx512f();

/// True if the machine has Intel BMI2 instructions and these are not disabled
/// by flag.
bool hasBmi2();
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ void HiveConnector::registerSerDe() {

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
int numPartitions,
bool localExchange) const {
bool localExchange,
bool /*useOptimizedPartitionFunction*/) const {
std::vector<int> bucketToPartitions;
if (bucketToPartition_.empty()) {
// NOTE: if hive partition function spec doesn't specify bucket to partition
Expand Down
3 changes: 2 additions & 1 deletion velox/connectors/hive/HiveConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ class HivePartitionFunctionSpec : public core::PartitionFunctionSpec {

std::unique_ptr<core::PartitionFunction> create(
int numPartitions,
bool localExchange) const override;
bool localExchange,
bool useOptimizedPartitionFunction = false) const override;

std::string toString() const override;

Expand Down
9 changes: 7 additions & 2 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -2500,9 +2500,13 @@ class PartitionFunctionSpec : public ISerializable {
public:
/// If 'localExchange' is true, the partition function is used for local
/// exchange within a velox task.
/// TODO: useOptimizedPartitionFunction = true is only supported in
/// HashPartitionFunction now. Will extend the optimization to other
/// PartitionFunctions soon.
virtual std::unique_ptr<PartitionFunction> create(
int numPartitions,
bool localExchange = false) const = 0;
bool localExchange = false,
bool useOptimizedPartitionFunction = false) const = 0;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitionFunctionSpec::create() is intended to be a generic virtual interface and should be ideally implementation-agnostic. While useOptimizedPartitionFunction is specific to HashPartitionFunctionSpec, it exposes implementation details at the interface level. Would there be a cleaner approach here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PartitionFunctionSpec::create() is intended to be a generic virtual interface and should be ideally implementation-agnostic. While useOptimizedPartitionFunction is specific to HashPartitionFunctionSpec, it exposes implementation details at the interface level. Would there be a cleaner approach here?

I will make optimized versions for other PartitionFunctions in near future too, so this is generic for all PartitionFunctions. Otherwise, do you have other suggestions?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment in PlanNode.h:

  /// TODO: useOptimizedPartitionFunction = true is only supported in
  /// HashPartitionFunction now. Will extend the optimization to other
  /// PartitionFunctions soon.


virtual ~PartitionFunctionSpec() = default;

Expand All @@ -2515,7 +2519,8 @@ class GatherPartitionFunctionSpec : public PartitionFunctionSpec {
public:
std::unique_ptr<PartitionFunction> create(
int /*numPartitions*/,
bool /*localExchange*/) const override {
bool /*localExchange*/,
bool /*useOptimizedPartitionFunction*/ = false) const override {
VELOX_UNREACHABLE();
}

Expand Down
1 change: 1 addition & 0 deletions velox/core/QueryConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ const std::vector<config::ConfigProperty>& QueryConfig::registeredProperties() {

// Partitioned output.
VELOX_REGISTER_QUERY_CONFIG(kPartitionedOutputEagerFlush);
VELOX_REGISTER_QUERY_CONFIG(kOptimizedHashPartitionFunctionEnabled);
VELOX_REGISTER_QUERY_CONFIG(kMaxPartitionedOutputBufferSize);
VELOX_REGISTER_QUERY_CONFIG(kMaxOutputBufferSize);

Expand Down
10 changes: 10 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,16 @@ class QueryConfig {
false,
"Flush PartitionedOutput rows eagerly without buffering.")

/// If true, use OptimizedHashPartitionFunction in place of
/// HashPartitionFunction.
VELOX_QUERY_CONFIG(
kOptimizedHashPartitionFunctionEnabled,
optimizedHashPartitionFunctionEnabled,
"optimized_hash_partition_function_enabled",
bool,
false,
"Use OptimizedHashPartitionFunction instead of HashPartitionFunction.")

/// The maximum number of bytes to buffer in PartitionedOutput operator to
/// avoid creating tiny SerializedPages.
VELOX_QUERY_CONFIG(
Expand Down
1 change: 1 addition & 0 deletions velox/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ velox_add_library(
OperatorTraceScan.cpp
OperatorTraceWriter.cpp
OperatorUtils.cpp
OptimizedHashPartitionFunction.cpp
OptimizedPartitionedOutput.cpp
OptimizedVectorHasher.cpp
OrderBy.cpp
Expand Down
47 changes: 42 additions & 5 deletions velox/exec/HashPartitionFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <velox/exec/HashPartitionFunction.h>
#include <velox/exec/VectorHasher.h>
#include "velox/exec/HashPartitionFunction.h"

#include "velox/exec/OptimizedHashPartitionFunction.h"
#include "velox/exec/VectorHasher.h"

#define XXH_INLINE_ALL
#include <xxhash.h> // @manual=third-party//xxHash:xxhash
Expand Down Expand Up @@ -123,9 +125,15 @@ std::optional<uint32_t> HashPartitionFunction::partition(

std::unique_ptr<core::PartitionFunction> HashPartitionFunctionSpec::create(
int numPartitions,
bool localExchange) const {
return std::make_unique<exec::HashPartitionFunction>(
localExchange, numPartitions, inputType_, keyChannels_, constValues_);
bool localExchange,
bool useOptimizedPartitionFunction) const {
return createHashPartitionFunction(
localExchange,
numPartitions,
inputType_,
keyChannels_,
constValues_,
useOptimizedPartitionFunction);
}

std::string HashPartitionFunctionSpec::toString() const {
Expand Down Expand Up @@ -180,4 +188,33 @@ core::PartitionFunctionSpecPtr HashPartitionFunctionSpec::deserialize(
return std::make_shared<HashPartitionFunctionSpec>(
ISerializable::deserialize<RowType>(obj["inputType"]), keys, constValues);
}

std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
bool localExchange,
int numPartitions,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues,
bool useOptimizedPartitionFunction) {
if (useOptimizedPartitionFunction) {
return std::make_unique<OptimizedHashPartitionFunction>(
localExchange, numPartitions, inputType, keyChannels, constValues);
}
return std::make_unique<HashPartitionFunction>(
localExchange, numPartitions, inputType, keyChannels, constValues);
}

std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
const HashBitRange& hashBitRange,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues,
bool useOptimizedPartitionFunction) {
if (useOptimizedPartitionFunction) {
return std::make_unique<OptimizedHashPartitionFunction>(
hashBitRange, inputType, keyChannels, constValues);
}
return std::make_unique<HashPartitionFunction>(
hashBitRange, inputType, keyChannels, constValues);
}
} // namespace facebook::velox::exec
38 changes: 33 additions & 5 deletions velox/exec/HashPartitionFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
#pragma once

#include <velox/exec/HashBitRange.h>
#include <velox/exec/VectorHasher.h>
#include "velox/core/PlanNode.h"
#include "velox/exec/HashBitRange.h"
#include "velox/exec/VectorHasher.h"

namespace facebook::velox::exec {

Expand All @@ -27,7 +27,16 @@ namespace facebook::velox::exec {
/// numPartitions allows the keyChannels argument to be empty. If keyChannels is
/// empty, then the resulting partition number of partition() will always be
/// zero.
class HashPartitionFunction : public core::PartitionFunction {
/// Extends PartitionFunction with access to the configured number of
/// partitions.
class HashPartitionFunctionBase : public core::PartitionFunction {
public:
~HashPartitionFunctionBase() override = default;

virtual int numPartitions() const = 0;
};

class HashPartitionFunction : public HashPartitionFunctionBase {
public:
HashPartitionFunction(
bool localExchange,
Expand All @@ -48,7 +57,7 @@ class HashPartitionFunction : public core::PartitionFunction {
const RowVector& input,
std::vector<uint32_t>& partitions) override;

int numPartitions() const {
int numPartitions() const override {
return numPartitions_;
}

Expand Down Expand Up @@ -85,7 +94,8 @@ class HashPartitionFunctionSpec : public core::PartitionFunctionSpec {

std::unique_ptr<core::PartitionFunction> create(
int numPartitions,
bool localExchange) const override;
bool localExchange,
bool useOptimizedPartitionFunction = false) const override;

std::string toString() const override;

Expand All @@ -100,4 +110,22 @@ class HashPartitionFunctionSpec : public core::PartitionFunctionSpec {
const std::vector<column_index_t> keyChannels_;
const std::vector<VectorPtr> constValues_;
};

/// Creates either HashPartitionFunction or OptimizedHashPartitionFunction
/// based on 'useOptimizedPartitionFunction'.
std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
bool localExchange,
int numPartitions,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues = {},
bool useOptimizedPartitionFunction = false);

std::unique_ptr<HashPartitionFunctionBase> createHashPartitionFunction(
const HashBitRange& hashBitRange,
const RowTypePtr& inputType,
const std::vector<column_index_t>& keyChannels,
const std::vector<VectorPtr>& constValues = {},
bool useOptimizedPartitionFunction = false);

} // namespace facebook::velox::exec
11 changes: 7 additions & 4 deletions velox/exec/LocalPartition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,13 @@ LocalPartition::LocalPartition(
ctx->task->getLocalExchangeQueues(ctx->splitGroupId, planNode->id())},
numPartitions_{queues_.size()},
partitionFunction_(
numPartitions_ == 1 ? nullptr
: planNode->partitionFunctionSpec().create(
numPartitions_,
/*localExchange=*/true)),
numPartitions_ == 1
? nullptr
: planNode->partitionFunctionSpec().create(
numPartitions_,
/*localExchange=*/true,
ctx->queryConfig()
.optimizedHashPartitionFunctionEnabled())),
singlePartitionBufferSize_{
(numPartitions_ <
ctx->queryConfig()
Expand Down
10 changes: 8 additions & 2 deletions velox/exec/MarkDistinct.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,14 @@ void MarkDistinct::setupInputSpiller(
&spillConfig_.value(),
spillStats_.get());

spillHashFunction_ = std::make_unique<HashPartitionFunction>(
inputSpiller_->hashBits(), inputType_, distinctKeyChannels_);
spillHashFunction_ = createHashPartitionFunction(
inputSpiller_->hashBits(),
inputType_,
distinctKeyChannels_,
{},
operatorCtx_->driverCtx()
->queryConfig()
.optimizedHashPartitionFunctionEnabled());
}

void MarkDistinct::spill() {
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/MarkDistinct.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class MarkDistinct : public Operator {

SpillPartitionSet spillInputPartitionSet_;

std::unique_ptr<HashPartitionFunction> spillHashFunction_;
std::unique_ptr<HashPartitionFunctionBase> spillHashFunction_;

SpillPartitionSet spillHashTablePartitionSet_;

Expand Down
Loading
Loading