From eb55b0f646c055b24fdc959ab7f1846461b3360b Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 18 May 2026 10:55:13 +0800 Subject: [PATCH 1/3] fix(parquet): check compression codec availability --- src/iceberg/parquet/parquet_writer.cc | 27 ++++++++++++---- src/iceberg/test/parquet_test.cc | 46 +++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index 7e2d3d151..5ff2ca7a1 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -20,9 +20,11 @@ #include "iceberg/parquet/parquet_writer.h" #include +#include #include #include +#include #include #include #include @@ -45,21 +47,31 @@ Result> OpenOutputStream( Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) { const auto& compression_name = properties.Get(WriterProperties::kParquetCompression); + ::arrow::Compression::type compression; if (compression_name == "uncompressed") { - return ::arrow::Compression::UNCOMPRESSED; + compression = ::arrow::Compression::UNCOMPRESSED; } else if (compression_name == "snappy") { - return ::arrow::Compression::SNAPPY; + compression = ::arrow::Compression::SNAPPY; } else if (compression_name == "gzip") { - return ::arrow::Compression::GZIP; + compression = ::arrow::Compression::GZIP; } else if (compression_name == "brotli") { - return ::arrow::Compression::BROTLI; + compression = ::arrow::Compression::BROTLI; } else if (compression_name == "lz4") { - return ::arrow::Compression::LZ4; + compression = ::arrow::Compression::LZ4; } else if (compression_name == "zstd") { - return ::arrow::Compression::ZSTD; + compression = ::arrow::Compression::ZSTD; } else { return InvalidArgument("Unsupported Parquet compression codec: {}", compression_name); } + return compression; +} + +Status CheckCompressionAvailable(std::string_view compression_name, + ::arrow::Compression::type compression) { + ICEBERG_PRECHECK(::arrow::util::Codec::IsAvailable(compression), + "Parquet compression codec {} is not available in the current build", + compression_name); + return {}; } Result> ParseCodecLevel(const WriterProperties& properties) { @@ -98,6 +110,9 @@ class ParquetWriter::Impl { auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>( schema_descriptor->schema_root()); + ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable( + options.properties.Get(WriterProperties::kParquetCompression), compression)); + ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options)); auto file_writer = ::parquet::ParquetFileWriter::Open( output_stream_, std::move(schema_node), std::move(writer_properties), diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 65a4602d8..957492370 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -18,6 +18,9 @@ */ #include +#include +#include +#include #include #include @@ -26,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -124,6 +128,25 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr s ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data"; } +struct ParquetCodec { + std::string name; + ::arrow::Compression::type compression; +}; + +std::optional UnavailableParquetCodec() { + const std::vector codecs = { + {"snappy", ::arrow::Compression::SNAPPY}, {"gzip", ::arrow::Compression::GZIP}, + {"brotli", ::arrow::Compression::BROTLI}, {"lz4", ::arrow::Compression::LZ4}, + {"zstd", ::arrow::Compression::ZSTD}, + }; + for (const auto& codec : codecs) { + if (!::arrow::util::Codec::IsAvailable(codec.compression)) { + return codec; + } + } + return std::nullopt; +} + } // namespace class ParquetReaderTest : public TempFileTestBase { @@ -461,6 +484,29 @@ TEST_F(ParquetReadWrite, EmptyStruct) { IsError(ErrorKind::kNotImplemented)); } +TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) { + auto unavailable_codec = UnavailableParquetCodec(); + if (!unavailable_codec.has_value()) { + GTEST_SKIP() << "All optional Parquet compression codecs are available"; + } + + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, unavailable_codec->name); + + auto writer = WriterFactoryRegistry::Open( + FileFormatType::kParquet, {.path = "unavailable_codec.parquet", + .schema = schema, + .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(), + .properties = std::move(writer_properties)}); + + EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer, + HasErrorMessage("Parquet compression codec " + unavailable_codec->name + + " is not available in the current build")); +} + TEST_F(ParquetReadWrite, SimpleStructRoundTrip) { auto schema = std::make_shared(std::vector{ SchemaField::MakeOptional(1, "a", From eca9a4f7cdbeda52f8677ab6823dc535405c88eb Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Mon, 18 May 2026 11:19:41 +0800 Subject: [PATCH 2/3] fix(parquet): satisfy codec test lint --- src/iceberg/test/parquet_test.cc | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 957492370..dba7f2551 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -135,9 +135,11 @@ struct ParquetCodec { std::optional UnavailableParquetCodec() { const std::vector codecs = { - {"snappy", ::arrow::Compression::SNAPPY}, {"gzip", ::arrow::Compression::GZIP}, - {"brotli", ::arrow::Compression::BROTLI}, {"lz4", ::arrow::Compression::LZ4}, - {"zstd", ::arrow::Compression::ZSTD}, + {.name = "snappy", .compression = ::arrow::Compression::SNAPPY}, + {.name = "gzip", .compression = ::arrow::Compression::GZIP}, + {.name = "brotli", .compression = ::arrow::Compression::BROTLI}, + {.name = "lz4", .compression = ::arrow::Compression::LZ4}, + {.name = "zstd", .compression = ::arrow::Compression::ZSTD}, }; for (const auto& codec : codecs) { if (!::arrow::util::Codec::IsAvailable(codec.compression)) { From ac8d96111a31f25acce3d856713f461b4d2f39bb Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Wed, 20 May 2026 11:23:56 +0800 Subject: [PATCH 3/3] chore(parquet): address codec review comments --- src/iceberg/parquet/parquet_writer.cc | 14 ++++++-------- src/iceberg/test/parquet_test.cc | 4 ++-- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index 5ff2ca7a1..c70d3310c 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -47,23 +47,21 @@ Result> OpenOutputStream( Result<::arrow::Compression::type> ParseCompression(const WriterProperties& properties) { const auto& compression_name = properties.Get(WriterProperties::kParquetCompression); - ::arrow::Compression::type compression; if (compression_name == "uncompressed") { - compression = ::arrow::Compression::UNCOMPRESSED; + return ::arrow::Compression::UNCOMPRESSED; } else if (compression_name == "snappy") { - compression = ::arrow::Compression::SNAPPY; + return ::arrow::Compression::SNAPPY; } else if (compression_name == "gzip") { - compression = ::arrow::Compression::GZIP; + return ::arrow::Compression::GZIP; } else if (compression_name == "brotli") { - compression = ::arrow::Compression::BROTLI; + return ::arrow::Compression::BROTLI; } else if (compression_name == "lz4") { - compression = ::arrow::Compression::LZ4; + return ::arrow::Compression::LZ4; } else if (compression_name == "zstd") { - compression = ::arrow::Compression::ZSTD; + return ::arrow::Compression::ZSTD; } else { return InvalidArgument("Unsupported Parquet compression codec: {}", compression_name); } - return compression; } Status CheckCompressionAvailable(std::string_view compression_name, diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index dba7f2551..70fb9880f 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -133,7 +133,7 @@ struct ParquetCodec { ::arrow::Compression::type compression; }; -std::optional UnavailableParquetCodec() { +std::optional FirstUnavailableParquetCodec() { const std::vector codecs = { {.name = "snappy", .compression = ::arrow::Compression::SNAPPY}, {.name = "gzip", .compression = ::arrow::Compression::GZIP}, @@ -487,7 +487,7 @@ TEST_F(ParquetReadWrite, EmptyStruct) { } TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) { - auto unavailable_codec = UnavailableParquetCodec(); + auto unavailable_codec = FirstUnavailableParquetCodec(); if (!unavailable_codec.has_value()) { GTEST_SKIP() << "All optional Parquet compression codecs are available"; }