From c71ace30b16692248b51a0b0c85994db7a192f6e Mon Sep 17 00:00:00 2001
From: lifulong
Date: Fri, 27 Mar 2026 06:27:28 +0000
Subject: [PATCH] Support configuring the Velox Parquet writer option storeDecimalAsInteger for compatibility with the Spark conf spark.sql.parquet.writeLegacyFormat

---
 .../datasources/velox/VeloxParquetWriterInjects.scala     | 3 +++
 cpp/core/config/GlutenConfig.h                             | 4 ++++
 cpp/velox/utils/VeloxWriterUtils.cc                        | 6 ++++++
 .../main/scala/org/apache/gluten/config/GlutenConfig.scala | 1 +
 4 files changed, 14 insertions(+)

diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
index 14f1c6d63b94..1244aab279de 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
+++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/datasources/velox/VeloxParquetWriterInjects.scala
@@ -31,6 +31,9 @@ class VeloxParquetWriterInjects extends VeloxFormatWriterInjects {
     // i.e., compression, block size, block rows.
     val sparkOptions = new mutable.HashMap[String, String]()
     sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec)
+    sparkOptions.put(
+      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
+      SQLConf.get.writeLegacyParquetFormat.toString)
     val blockSize = options.getOrElse(
       GlutenConfig.PARQUET_BLOCK_SIZE,
       GlutenConfig.get.columnarParquetWriteBlockSize.toString)
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index a082a720eadd..2ed044622e15 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -70,6 +70,10 @@ const std::string kParquetWriterVersion = "parquet.writer.version";
 
 const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec";
 
+/// Maps to Spark `spark.sql.parquet.writeLegacyFormat`; drives Velox
+/// `WriterOptions::storeDecimalAsInteger` (inverted: legacy true -> store as integer false).
+const std::string kParquetStoreDecimalAsInteger = "spark.sql.parquet.writeLegacyFormat";
+
 const std::string kColumnarToRowMemoryThreshold = "spark.gluten.sql.columnarToRowMemoryThreshold";
 
 const std::string kUGIUserName = "spark.gluten.ugi.username";
diff --git a/cpp/velox/utils/VeloxWriterUtils.cc b/cpp/velox/utils/VeloxWriterUtils.cc
index 026418a223c4..363a56416c1b 100644
--- a/cpp/velox/utils/VeloxWriterUtils.cc
+++ b/cpp/velox/utils/VeloxWriterUtils.cc
@@ -49,6 +49,12 @@ std::unique_ptr<parquet::WriterOptions> makeParquetWriteOption(
     const std::unordered_map<std::string, std::string>& sparkConfs) {
   auto writeOption = std::make_unique<parquet::WriterOptions>();
   writeOption->parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds /*micro*/;
+  bool writeLegacyParquetFormat = false;
+  if (auto it = sparkConfs.find(kParquetStoreDecimalAsInteger); it != sparkConfs.end()) {
+    writeLegacyParquetFormat = boost::iequals(it->second, "true");
+  }
+  // Spark legacy Parquet uses FLBA-style decimals; Velox uses INT32/INT64 when writeLegacyParquetFormat is false.
+  writeOption->storeDecimalAsInteger = !writeLegacyParquetFormat;
   auto compressionCodec = CompressionKind::CompressionKind_SNAPPY;
   if (auto it = sparkConfs.find(kParquetCompressionCodec); it != sparkConfs.end()) {
     auto compressionCodecStr = it->second;
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 371c2b96091a..d713084b5a06 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -629,6 +629,7 @@ object GlutenConfig extends ConfigRegistry {
       DEBUG_ENABLED.key,
       // datasource config
       SPARK_SQL_PARQUET_COMPRESSION_CODEC,
+      PARQUET_WRITE_LEGACY_FORMAT.key,
       // datasource config end
       GlutenCoreConfig.COLUMNAR_OVERHEAD_SIZE_IN_BYTES.key,
       GlutenCoreConfig.COLUMNAR_OFFHEAP_SIZE_IN_BYTES.key,
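
For reference, a minimal Spark-side usage sketch of the behavior this patch wires through. It is illustrative only: it assumes the Gluten jars are on the classpath and that the remaining Gluten settings (off-heap memory, etc.) are configured elsewhere; the output path and column name are hypothetical.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("write-legacy-decimal-sketch")
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin") // assumes Gluten Velox backend is available
      // true  -> native writer gets storeDecimalAsInteger = false (legacy FIXED_LEN_BYTE_ARRAY decimals)
      // false -> native writer gets storeDecimalAsInteger = true  (INT32/INT64 for short decimals)
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .getOrCreate()

    import spark.implicits._

    val df = Seq(BigDecimal("12345.67"), BigDecimal("0.01"))
      .toDF("amount")
      .selectExpr("CAST(amount AS DECIMAL(9, 2)) AS amount")

    // Hypothetical output path; with this patch the session-level conf is forwarded
    // through VeloxParquetWriterInjects into the Velox parquet WriterOptions.
    df.write.mode("overwrite").parquet("/tmp/legacy_decimal_demo")

    spark.stop()

With spark.sql.parquet.writeLegacyFormat left at its default (false), storeDecimalAsInteger stays true and short decimals are written as INT32/INT64, matching vanilla Spark's non-legacy Parquet output.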