From 4b71fd515feb928938f878b54738c3e7943bae9d Mon Sep 17 00:00:00 2001 From: Rong Ma Date: Wed, 25 Mar 2026 15:46:51 +0000 Subject: [PATCH] use immutable cudf config --- cpp/velox/compute/VeloxBackend.cc | 67 ++++++++++++++----- cpp/velox/compute/VeloxBackend.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 12 ++-- .../apache/gluten/config/GlutenConfig.scala | 4 +- 4 files changed, 57 insertions(+), 27 deletions(-) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index 4738d2e3a6a5..fb21b08ab914 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -39,6 +39,7 @@ #include "jni/JniFileSystem.h" #include "memory/GlutenBufferedInputBuilder.h" #include "operators/functions/SparkExprToSubfieldFilterParser.h" +#include "operators/plannodes/RowVectorStream.h" #include "shuffle/ArrowShuffleDictionaryWriter.h" #include "udf/UdfLoader.h" #include "utils/Exception.h" @@ -47,7 +48,6 @@ #include "velox/connectors/hive/BufferedInputBuilder.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveDataSource.h" -#include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" @@ -73,6 +73,17 @@ using namespace facebook; namespace gluten { namespace { + +bool hasCudaRuntimeAndDevice() { +#ifdef GLUTEN_ENABLE_GPU + int count = 0; + cudaError_t err = cudaGetDeviceCount(&count); + return err == cudaSuccess && count > 0; +#else + return false; +#endif +} + MemoryManager* veloxMemoryManagerFactory(const std::string& kind, std::unique_ptr listener) { return new VeloxMemoryManager(kind, std::move(listener), *VeloxBackend::get()->getBackendConf()); } @@ -81,13 +92,20 @@ void veloxMemoryManagerReleaser(MemoryManager* memoryManager) { delete memoryManager; } -Runtime* veloxRuntimeFactory( - const std::string& kind, - MemoryManager* memoryManager, - const std::unordered_map& sessionConf) { - auto* vmm = dynamic_cast(memoryManager); - GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager"); - return new VeloxRuntime(kind, vmm, sessionConf); +Runtime::Factory createVeloxRuntimeFactory(std::unordered_map immutableConf) { + return [immutableConf = std::move(immutableConf)]( + const std::string& kind, + MemoryManager* memoryManager, + const std::unordered_map& sessionConf) -> Runtime* { + auto* vmm = dynamic_cast(memoryManager); + GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager"); + + std::unordered_map confMap(immutableConf); + // immutableConf takes precedence; sessionConf only fills missing keys. + confMap.insert(sessionConf.begin(), sessionConf.end()); + + return new VeloxRuntime(kind, vmm, confMap); + }; } void veloxRuntimeReleaser(Runtime* runtime) { @@ -105,7 +123,15 @@ void VeloxBackend::init( // Register factories. MemoryManager::registerFactory(kVeloxBackendKind, veloxMemoryManagerFactory, veloxMemoryManagerReleaser); - Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, veloxRuntimeReleaser); + + // Set immutable configurations from backend conf. + const bool enableCudf = backendConf_->get(kCudfEnabled, kCudfEnabledDefault) && hasCudaRuntimeAndDevice(); + const bool enableCudfTableScan = + enableCudf && backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault); + std::unordered_map immutableConf = { + {kCudfEnabled, enableCudf ? "true" : "false"}, {kCudfEnableTableScan, enableCudfTableScan ? "true" : "false"}}; + Runtime::registerFactory( + kVeloxBackendKind, createVeloxRuntimeFactory(std::move(immutableConf)), veloxRuntimeReleaser); if (backendConf_->get(kDebugModeEnabled, false)) { LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigs()); @@ -170,7 +196,7 @@ void VeloxBackend::init( #endif #ifdef GLUTEN_ENABLE_GPU - if (backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { + if (enableCudf) { std::unordered_map options = { {velox::cudf_velox::CudfConfig::kCudfEnabled, "true"}, {velox::cudf_velox::CudfConfig::kCudfDebugEnabled, backendConf_->get(kDebugCudf, kDebugCudfDefault)}, @@ -189,6 +215,12 @@ void VeloxBackend::init( initJolFilesystem(); initConnector(hiveConf); +#ifdef GLUTEN_ENABLE_GPU + if (enableCudfTableScan) { + initCudfConnector(hiveConf); + } +#endif + velox::dwio::common::registerFileSinks(); velox::parquet::registerParquetReaderFactory(); velox::parquet::registerParquetWriterFactory(); @@ -318,20 +350,19 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); - + // Register value-stream connector for runtime iterator-based inputs auto valueStreamDynamicFilterEnabled = backendConf_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); velox::connector::registerConnector( std::make_shared(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled)); - +} + +void VeloxBackend::initCudfConnector(const std::shared_ptr& hiveConf) { #ifdef GLUTEN_ENABLE_GPU - if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && - backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { - facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; - auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get()); - facebook::velox::connector::registerConnector(hiveConnector); - } + facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory; + auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get()); + facebook::velox::connector::registerConnector(hiveConnector); #endif } diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index 9176b369778c..554ea05720ea 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -73,6 +73,7 @@ class VeloxBackend { void init(std::unique_ptr listener, const std::unordered_map& conf); void initCache(); void initConnector(const std::shared_ptr& hiveConf); + void initCudfConnector(const std::shared_ptr& hiveConf); void initUdf(); std::unique_ptr initSsdCache(uint64_t ssdSize); diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 3c0505263159..83a5662ce7d4 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -25,14 +25,13 @@ #include "velox/exec/PlanNodeStats.h" #ifdef GLUTEN_ENABLE_GPU #include +#include "cudf/GpuLock.h" #include "velox/experimental/cudf/CudfConfig.h" #include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h" #include "velox/experimental/cudf/exec/ToCudf.h" -#include "cudf/GpuLock.h" #endif #include "operators/plannodes/RowVectorStream.h" - using namespace facebook; namespace gluten { @@ -358,14 +357,15 @@ void WholeStageResultIterator::constructPartitionColumns( } void WholeStageResultIterator::addIteratorSplits(const std::vector>& inputIterators) { - GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); + GLUTEN_CHECK( + !allSplitsAdded_, + "Method addIteratorSplits should not be called since all splits has been added to the Velox task."); // Create IteratorConnectorSplit for each iterator for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) { if (inputIterators[i] == nullptr) { continue; } - auto connectorSplit = std::make_shared( - kIteratorConnectorId, inputIterators[i]); + auto connectorSplit = std::make_shared(kIteratorConnectorId, inputIterators[i]); exec::Split split(folly::copy(connectorSplit), -1); task_->addSplit(streamIds_[i], std::move(split)); } @@ -385,7 +385,7 @@ void WholeStageResultIterator::noMoreSplits() { for (const auto& scanNodeId : scanNodeIds_) { task_->noMoreSplits(scanNodeId); } - + // Mark no more splits for all stream nodes for (const auto& streamId : streamIds_) { task_->noMoreSplits(streamId); diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala index 371c2b96091a..46cf2aeed912 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala @@ -468,7 +468,6 @@ object GlutenConfig extends ConfigRegistry { GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key, COLUMNAR_MAX_BATCH_SIZE.key, SHUFFLE_WRITER_BUFFER_SIZE.key, - COLUMNAR_CUDF_ENABLED.key, SQLConf.LEGACY_SIZE_OF_NULL.key, SQLConf.LEGACY_STATISTICAL_AGGREGATE.key, SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key, @@ -515,8 +514,7 @@ object GlutenConfig extends ConfigRegistry { "spark.gluten.sql.columnar.backend.velox.memoryUseHugePages", "spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct", "spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks", - "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes", - "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan" + "spark.gluten.sql.columnar.backend.velox.preferredBatchBytes" ) /** Get dynamic configs. */