Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/iceberg/parquet/parquet_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
#include "iceberg/parquet/parquet_writer.h"

#include <memory>
#include <string_view>

#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <arrow/util/compression.h>
#include <arrow/util/key_value_metadata.h>
#include <parquet/arrow/schema.h>
#include <parquet/arrow/writer.h>
Expand Down Expand Up @@ -62,6 +64,14 @@ Result<::arrow::Compression::type> ParseCompression(const WriterProperties& prop
}
}

Status CheckCompressionAvailable(std::string_view compression_name,
::arrow::Compression::type compression) {
ICEBERG_PRECHECK(::arrow::util::Codec::IsAvailable(compression),
"Parquet compression codec {} is not available in the current build",
compression_name);
return {};
}

Result<std::optional<int32_t>> ParseCodecLevel(const WriterProperties& properties) {
auto level_str = properties.Get(WriterProperties::kParquetCompressionLevel);
if (level_str.empty()) {
Expand Down Expand Up @@ -98,6 +108,9 @@ class ParquetWriter::Impl {
auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>(
schema_descriptor->schema_root());

ICEBERG_RETURN_UNEXPECTED(CheckCompressionAvailable(
options.properties.Get(WriterProperties::kParquetCompression), compression));

ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
auto file_writer = ::parquet::ParquetFileWriter::Open(
output_stream_, std::move(schema_node), std::move(writer_properties),
Expand Down
48 changes: 48 additions & 0 deletions src/iceberg/test/parquet_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/

#include <optional>
#include <string>
#include <utility>
#include <vector>

#include <arrow/array.h>
#include <arrow/c/bridge.h>
Expand All @@ -26,6 +29,7 @@
#include <arrow/record_batch.h>
#include <arrow/table.h>
#include <arrow/type.h>
#include <arrow/util/compression.h>
#include <arrow/util/key_value_metadata.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
Expand Down Expand Up @@ -124,6 +128,27 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
}

struct ParquetCodec {
std::string name;
::arrow::Compression::type compression;
};

std::optional<ParquetCodec> FirstUnavailableParquetCodec() {
const std::vector<ParquetCodec> codecs = {
{.name = "snappy", .compression = ::arrow::Compression::SNAPPY},
{.name = "gzip", .compression = ::arrow::Compression::GZIP},
{.name = "brotli", .compression = ::arrow::Compression::BROTLI},
{.name = "lz4", .compression = ::arrow::Compression::LZ4},
{.name = "zstd", .compression = ::arrow::Compression::ZSTD},
};
for (const auto& codec : codecs) {
if (!::arrow::util::Codec::IsAvailable(codec.compression)) {
return codec;
}
}
return std::nullopt;
}

} // namespace

class ParquetReaderTest : public TempFileTestBase {
Expand Down Expand Up @@ -461,6 +486,29 @@ TEST_F(ParquetReadWrite, EmptyStruct) {
IsError(ErrorKind::kNotImplemented));
}

TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) {
auto unavailable_codec = FirstUnavailableParquetCodec();
if (!unavailable_codec.has_value()) {
GTEST_SKIP() << "All optional Parquet compression codecs are available";
}

auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
WriterProperties writer_properties;
writer_properties.Set(WriterProperties::kParquetCompression, unavailable_codec->name);

auto writer = WriterFactoryRegistry::Open(
FileFormatType::kParquet, {.path = "unavailable_codec.parquet",
.schema = schema,
.io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(),
.properties = std::move(writer_properties)});

EXPECT_THAT(writer, IsError(ErrorKind::kInvalidArgument));
EXPECT_THAT(writer,
HasErrorMessage("Parquet compression codec " + unavailable_codec->name +
" is not available in the current build"));
}

TEST_F(ParquetReadWrite, SimpleStructRoundTrip) {
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeOptional(1, "a",
Expand Down
Loading