Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 49 additions & 18 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "jni/JniFileSystem.h"
#include "memory/GlutenBufferedInputBuilder.h"
#include "operators/functions/SparkExprToSubfieldFilterParser.h"
#include "operators/plannodes/RowVectorStream.h"
#include "shuffle/ArrowShuffleDictionaryWriter.h"
#include "udf/UdfLoader.h"
#include "utils/Exception.h"
Expand All @@ -47,7 +48,6 @@
#include "velox/connectors/hive/BufferedInputBuilder.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual
#include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
Expand All @@ -73,6 +73,17 @@ using namespace facebook;
namespace gluten {

namespace {

bool hasCudaRuntimeAndDevice() {
#ifdef GLUTEN_ENABLE_GPU
int count = 0;
cudaError_t err = cudaGetDeviceCount(&count);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this code would execute on CPU node, but is the function executes by header, if not, the cuda library does not exist in CPU node, I'm not sure if it can run successfully. If you very it can run well on CPU node without CUDA environment, we may need to add a comment on it.

And the common way is to check if nvidia-smi command exists, if exists, we can check further.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we execute the GPU build on a cpu node without CUDA Runtime installed, the process will fail early when loading libvelox.so and reporting the cuda library is missing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new problem, we should not require user to install CUDA in CPU node, the build pipeline may also need to be updated

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create an issue to track this, thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added #11844

return err == cudaSuccess && count > 0;
#else
return false;
#endif
}

MemoryManager* veloxMemoryManagerFactory(const std::string& kind, std::unique_ptr<AllocationListener> listener) {
return new VeloxMemoryManager(kind, std::move(listener), *VeloxBackend::get()->getBackendConf());
}
Expand All @@ -81,13 +92,20 @@ void veloxMemoryManagerReleaser(MemoryManager* memoryManager) {
delete memoryManager;
}

Runtime* veloxRuntimeFactory(
const std::string& kind,
MemoryManager* memoryManager,
const std::unordered_map<std::string, std::string>& sessionConf) {
auto* vmm = dynamic_cast<VeloxMemoryManager*>(memoryManager);
GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager");
return new VeloxRuntime(kind, vmm, sessionConf);
Runtime::Factory createVeloxRuntimeFactory(std::unordered_map<std::string, std::string> immutableConf) {
return [immutableConf = std::move(immutableConf)](
const std::string& kind,
MemoryManager* memoryManager,
const std::unordered_map<std::string, std::string>& sessionConf) -> Runtime* {
auto* vmm = dynamic_cast<VeloxMemoryManager*>(memoryManager);
GLUTEN_CHECK(vmm != nullptr, "Not a Velox memory manager");

std::unordered_map<std::string, std::string> confMap(immutableConf);
// immutableConf takes precedence; sessionConf only fills missing keys.
confMap.insert(sessionConf.begin(), sessionConf.end());

return new VeloxRuntime(kind, vmm, confMap);
};
}

void veloxRuntimeReleaser(Runtime* runtime) {
Expand All @@ -105,7 +123,15 @@ void VeloxBackend::init(

// Register factories.
MemoryManager::registerFactory(kVeloxBackendKind, veloxMemoryManagerFactory, veloxMemoryManagerReleaser);
Runtime::registerFactory(kVeloxBackendKind, veloxRuntimeFactory, veloxRuntimeReleaser);

// Set immutable configurations from backend conf.
const bool enableCudf = backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault) && hasCudaRuntimeAndDevice();
const bool enableCudfTableScan =
enableCudf && backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault);
std::unordered_map<std::string, std::string> immutableConf = {
{kCudfEnabled, enableCudf ? "true" : "false"}, {kCudfEnableTableScan, enableCudfTableScan ? "true" : "false"}};
Runtime::registerFactory(
kVeloxBackendKind, createVeloxRuntimeFactory(std::move(immutableConf)), veloxRuntimeReleaser);

if (backendConf_->get<bool>(kDebugModeEnabled, false)) {
LOG(INFO) << "VeloxBackend config:" << printConfig(backendConf_->rawConfigs());
Expand Down Expand Up @@ -170,7 +196,7 @@ void VeloxBackend::init(
#endif

#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
if (enableCudf) {
std::unordered_map<std::string, std::string> options = {
{velox::cudf_velox::CudfConfig::kCudfEnabled, "true"},
{velox::cudf_velox::CudfConfig::kCudfDebugEnabled, backendConf_->get(kDebugCudf, kDebugCudfDefault)},
Expand All @@ -189,6 +215,12 @@ void VeloxBackend::init(
initJolFilesystem();
initConnector(hiveConf);

#ifdef GLUTEN_ENABLE_GPU
if (enableCudfTableScan) {
initCudfConnector(hiveConf);
}
#endif

velox::dwio::common::registerFileSinks();
velox::parquet::registerParquetReaderFactory();
velox::parquet::registerParquetWriterFactory();
Expand Down Expand Up @@ -318,20 +350,19 @@ void VeloxBackend::initConnector(const std::shared_ptr<velox::config::ConfigBase
}
velox::connector::registerConnector(
std::make_shared<velox::connector::hive::HiveConnector>(kHiveConnectorId, hiveConf, ioExecutor_.get()));

// Register value-stream connector for runtime iterator-based inputs
auto valueStreamDynamicFilterEnabled =
backendConf_->get<bool>(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault);
velox::connector::registerConnector(
std::make_shared<ValueStreamConnector>(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled));

}

void VeloxBackend::initCudfConnector(const std::shared_ptr<velox::config::ConfigBase>& hiveConf) {
#ifdef GLUTEN_ENABLE_GPU
if (backendConf_->get<bool>(kCudfEnableTableScan, kCudfEnableTableScanDefault) &&
backendConf_->get<bool>(kCudfEnabled, kCudfEnabledDefault)) {
facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory;
auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get());
facebook::velox::connector::registerConnector(hiveConnector);
}
facebook::velox::cudf_velox::connector::hive::CudfHiveConnectorFactory factory;
auto hiveConnector = factory.newConnector(kCudfHiveConnectorId, hiveConf, ioExecutor_.get());
facebook::velox::connector::registerConnector(hiveConnector);
#endif
}

Expand Down
1 change: 1 addition & 0 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class VeloxBackend {
void init(std::unique_ptr<AllocationListener> listener, const std::unordered_map<std::string, std::string>& conf);
void initCache();
void initConnector(const std::shared_ptr<facebook::velox::config::ConfigBase>& hiveConf);
void initCudfConnector(const std::shared_ptr<facebook::velox::config::ConfigBase>& hiveConf);
void initUdf();
std::unique_ptr<facebook::velox::cache::SsdCache> initSsdCache(uint64_t ssdSize);

Expand Down
12 changes: 6 additions & 6 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@
#include "velox/exec/PlanNodeStats.h"
#ifdef GLUTEN_ENABLE_GPU
#include <cudf/io/types.hpp>
#include "cudf/GpuLock.h"
#include "velox/experimental/cudf/CudfConfig.h"
#include "velox/experimental/cudf/connectors/hive/CudfHiveConnectorSplit.h"
#include "velox/experimental/cudf/exec/ToCudf.h"
#include "cudf/GpuLock.h"
#endif
#include "operators/plannodes/RowVectorStream.h"


using namespace facebook;

namespace gluten {
Expand Down Expand Up @@ -358,14 +357,15 @@ void WholeStageResultIterator::constructPartitionColumns(
}

void WholeStageResultIterator::addIteratorSplits(const std::vector<std::shared_ptr<ResultIterator>>& inputIterators) {
GLUTEN_CHECK(!allSplitsAdded_, "Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
GLUTEN_CHECK(
!allSplitsAdded_,
"Method addIteratorSplits should not be called since all splits has been added to the Velox task.");
// Create IteratorConnectorSplit for each iterator
for (size_t i = 0; i < streamIds_.size() && i < inputIterators.size(); ++i) {
if (inputIterators[i] == nullptr) {
continue;
}
auto connectorSplit = std::make_shared<IteratorConnectorSplit>(
kIteratorConnectorId, inputIterators[i]);
auto connectorSplit = std::make_shared<IteratorConnectorSplit>(kIteratorConnectorId, inputIterators[i]);
exec::Split split(folly::copy(connectorSplit), -1);
task_->addSplit(streamIds_[i], std::move(split));
}
Expand All @@ -385,7 +385,7 @@ void WholeStageResultIterator::noMoreSplits() {
for (const auto& scanNodeId : scanNodeIds_) {
task_->noMoreSplits(scanNodeId);
}

// Mark no more splits for all stream nodes
for (const auto& streamId : streamIds_) {
task_->noMoreSplits(streamId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,6 @@ object GlutenConfig extends ConfigRegistry {
GlutenCoreConfig.COLUMNAR_TASK_OFFHEAP_SIZE_IN_BYTES.key,
COLUMNAR_MAX_BATCH_SIZE.key,
SHUFFLE_WRITER_BUFFER_SIZE.key,
COLUMNAR_CUDF_ENABLED.key,
SQLConf.LEGACY_SIZE_OF_NULL.key,
SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key,
Expand Down Expand Up @@ -515,8 +514,7 @@ object GlutenConfig extends ConfigRegistry {
"spark.gluten.sql.columnar.backend.velox.memoryUseHugePages",
"spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct",
"spark.gluten.sql.columnar.backend.velox.memoryPoolCapacityTransferAcrossTasks",
"spark.gluten.sql.columnar.backend.velox.preferredBatchBytes",
"spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan"
"spark.gluten.sql.columnar.backend.velox.preferredBatchBytes"
)

/** Get dynamic configs. */
Expand Down
Loading