Skip to content

Commit bc30cbe

Browse files
author
infvg
committed
Added iceberg write configs
1 parent 8b04f51 commit bc30cbe

4 files changed

Lines changed: 139 additions & 70 deletions

File tree

backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,4 +751,40 @@ object VeloxConfig extends ConfigRegistry {
751751
.doc("Maps table field names to file field names using names, not indices for Parquet files.")
752752
.booleanConf
753753
.createWithDefault(true)
754+
755+
val ICEBERG_WRITE_TARGET_FILE_SIZE_BYTES =
756+
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.target-file-size-bytes")
757+
.doc("Target file size in bytes for Iceberg write operations.")
758+
.bytesConf(ByteUnit.BYTE)
759+
.createWithDefaultString("512MB")
760+
761+
val ICEBERG_WRITE_PARQUET_COMPRESSION_CODEC =
762+
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-codec")
763+
.doc("Compression codec to use for Iceberg Parquet write operations.")
764+
.stringConf
765+
.createWithDefault("zstd")
766+
767+
val ICEBERG_WRITE_PARQUET_COMPRESSION_LEVEL =
768+
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-level")
769+
.doc("Compression level for Iceberg Parquet write operations.")
770+
.intConf
771+
.createOptional
772+
773+
val ICEBERG_WRITE_PARQUET_ROW_GROUP_SIZE_BYTES =
774+
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.row-group-size-bytes")
775+
.doc("Row group size in bytes for Iceberg Parquet write operations.")
776+
.bytesConf(ByteUnit.BYTE)
777+
.createWithDefaultString("128MB")
778+
779+
val ICEBERG_WRITE_PARQUET_PAGE_SIZE_BYTES =
780+
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-size-bytes")
781+
.doc("Page size in bytes for Iceberg Parquet write operations.")
782+
.bytesConf(ByteUnit.BYTE)
783+
.createWithDefaultString("1MB")
784+
785+
val ICEBERG_WRITE_PARQUET_PAGE_ROW_LIMIT =
786+
buildConf("spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-row-limit")
787+
.doc("Maximum number of rows per page for Iceberg Parquet write operations.")
788+
.intConf
789+
.createWithDefault(20000)
754790
}

cpp/velox/config/VeloxConfig.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,23 @@ const std::string kMemoryPoolCapacityTransferAcrossTasks =
166166
const std::string kOrcUseColumnNames = "spark.gluten.sql.columnar.backend.velox.orcUseColumnNames";
167167
const std::string kParquetUseColumnNames = "spark.gluten.sql.columnar.backend.velox.parquetUseColumnNames";
168168

169-
// write fies
169+
// write files
170170
const std::string kMaxPartitions = "spark.gluten.sql.columnar.backend.velox.maxPartitionsPerWritersSession";
171171

172+
// Iceberg write configs
173+
const std::string kIcebergWriteTargetFileSizeBytes =
174+
"spark.gluten.sql.columnar.backend.velox.iceberg.write.target-file-size-bytes";
175+
const std::string kIcebergWriteParquetCompressionCodec =
176+
"spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-codec";
177+
const std::string kIcebergWriteParquetCompressionLevel =
178+
"spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.compression-level";
179+
const std::string kIcebergWriteParquetRowGroupSizeBytes =
180+
"spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.row-group-size-bytes";
181+
const std::string kIcebergWriteParquetPageSizeBytes =
182+
"spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-size-bytes";
183+
const std::string kIcebergWriteParquetPageRowLimit =
184+
"spark.gluten.sql.columnar.backend.velox.iceberg.write.parquet.page-row-limit";
185+
172186
const std::string kGlogVerboseLevel = "spark.gluten.sql.columnar.backend.velox.glogVerboseLevel";
173187
const uint32_t kGlogVerboseLevelDefault = 0;
174188
const uint32_t kGlogVerboseLevelMaximum = 99;

cpp/velox/utils/ConfigExtractor.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,19 @@ std::shared_ptr<facebook::velox::config::ConfigBase> createHiveConnectorSessionC
240240
configs[facebook::velox::connector::hive::HiveConfig::kOrcUseColumnNamesSession] =
241241
conf->get<bool>(kOrcUseColumnNames, true) ? "true" : "false";
242242

243+
if (conf->isValueExists(kIcebergWriteTargetFileSizeBytes)) {
244+
configs[facebook::velox::connector::hive::HiveConfig::kMaxTargetFileSizeSession] =
245+
conf->get<std::string>(kIcebergWriteTargetFileSizeBytes);
246+
}
247+
if (conf->isValueExists(kIcebergWriteParquetPageSizeBytes)) {
248+
configs[facebook::velox::parquet::WriterOptions::kParquetSessionWritePageSize] =
249+
conf->get<std::string>(kIcebergWriteParquetPageSizeBytes);
250+
}
251+
if (conf->isValueExists(kIcebergWriteParquetRowGroupSizeBytes)) {
252+
configs[facebook::velox::parquet::WriterOptions::kParquetSessionWriteBatchSize] =
253+
conf->get<std::string>(kIcebergWriteParquetRowGroupSizeBytes);
254+
}
255+
243256
overwriteVeloxConf(conf.get(), configs, kDynamicBackendConfPrefix);
244257
return std::make_shared<facebook::velox::config::ConfigBase>(std::move(configs));
245258
}

0 commit comments

Comments
 (0)