diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index 7e2d3d151..c70d3310c 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -20,9 +20,11 @@ #include "iceberg/parquet/parquet_writer.h" #include +#include #include #include +#include #include #include #include @@ -62,6 +64,14 @@ Result<::arrow::Compression::type> ParseCompression(const WriterProperties& prop } } +Status CheckCompressionAvailable(std::string_view compression_name, + ::arrow::Compression::type compression) { + ICEBERG_PRECHECK(::arrow::util::Codec::IsAvailable(compression), + "Parquet compression codec {} is not available in the current build", + compression_name); + return {}; +} + Result> ParseCodecLevel(const WriterProperties& properties) { auto level_str = properties.Get(WriterProperties::kParquetCompressionLevel); if (level_str.empty()) { @@ -98,6 +108,9 @@ class ParquetWriter::Impl { auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>( schema_descriptor->schema_root()); + ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable( + options.properties.Get(WriterProperties::kParquetCompression), compression)); + ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options)); auto file_writer = ::parquet::ParquetFileWriter::Open( output_stream_, std::move(schema_node), std::move(writer_properties), diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 65a4602d8..70fb9880f 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -18,6 +18,9 @@ */ #include +#include +#include +#include #include #include @@ -26,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -124,6 +128,27 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr s ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data"; } +struct ParquetCodec { + std::string name; + ::arrow::Compression::type compression; +}; + +std::optional FirstUnavailableParquetCodec() { + const std::vector codecs = { + {.name = "snappy", .compression = ::arrow::Compression::SNAPPY}, + {.name = "gzip", .compression = ::arrow::Compression::GZIP}, + {.name = "brotli", .compression = ::arrow::Compression::BROTLI}, + {.name = "lz4", .compression = ::arrow::Compression::LZ4}, + {.name = "zstd", .compression = ::arrow::Compression::ZSTD}, + }; + for (const auto& codec : codecs) { + if (!::arrow::util::Codec::IsAvailable(codec.compression)) { + return codec; + } + } + return std::nullopt; +} + } // namespace class ParquetReaderTest : public TempFileTestBase { @@ -461,6 +486,29 @@ TEST_F(ParquetReadWrite, EmptyStruct) { IsError(ErrorKind::kNotImplemented)); } +TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) { + auto unavailable_codec = FirstUnavailableParquetCodec(); + if (!unavailable_codec.has_value()) { + GTEST_SKIP() << "All optional Parquet compression codecs are available"; + } + + auto schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, unavailable_codec->name); + + auto writer = WriterFactoryRegistry::Open( + FileFormatType::kParquet, {.path = "unavailable_codec.parquet", + .schema = schema, + .io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(), + .properties = std::move(writer_properties)}); + + EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(writer, + HasErrorMessage("Parquet compression codec " + unavailable_codec->name + + " is not available in the current build")); +} + TEST_F(ParquetReadWrite, SimpleStructRoundTrip) { auto schema = std::make_shared(std::vector{ SchemaField::MakeOptional(1, "a",