Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 103 additions & 31 deletions cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,36 +90,111 @@ std::string ParquetVersionToString(ParquetVersion::type ver) {
return "UNKNOWN";
}

namespace {

// Tells the statistics readers which pair of min/max fields (if any) of a
// Thrift Statistics struct should be used for a given column.
enum class StatsMinMaxMode {
  // The ordering is unknown or unsupported: drop every min/max field.
  kDiscard,
  // The file carries no column orders: read the legacy `min`/`max` fields.
  kLegacy,
  // The column has a well-defined order: read `min_value`/`max_value`.
  kNormal,
};

// Chooses how min/max statistics should be read for `descr`.
//
// The column order selects a candidate mode (TYPE_DEFINED_ORDER -> kNormal,
// UNDEFINED -> kLegacy, anything else -> kDiscard). A candidate other than
// kDiscard is only kept when the column's sort order is not UNKNOWN.
StatsMinMaxMode GetStatsMinMaxMode(const ColumnDescriptor& descr) {
  StatsMinMaxMode mode = StatsMinMaxMode::kDiscard;
  switch (descr.column_order().get_order()) {
    case ColumnOrder::TYPE_DEFINED_ORDER:
      mode = StatsMinMaxMode::kNormal;
      break;
    case ColumnOrder::UNDEFINED:
      mode = StatsMinMaxMode::kLegacy;
      break;
    case ColumnOrder::UNKNOWN:
      break;
  }
  // The sort order only matters when the column order did not already rule
  // the statistics out.
  if (mode != StatsMinMaxMode::kDiscard && descr.sort_order() == SortOrder::UNKNOWN) {
    mode = StatsMinMaxMode::kDiscard;
  }
  return mode;
}

} // namespace

static EncodedStatistics EncodedStatisticsFromThrift(const format::Statistics& statistics,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just put this in the anonymous namespace above instead of marking it static.

StatsMinMaxMode min_max) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, can we reconcile this with the existing function in thrift_internal.h?

EncodedStatistics out;

switch (min_max) {
case StatsMinMaxMode::kNormal:
if (statistics.__isset.max_value) {
out.set_max(statistics.max_value);
if (statistics.__isset.is_max_value_exact) {
out.is_max_value_exact = statistics.is_max_value_exact;
}
}
if (statistics.__isset.min_value) {
out.set_min(statistics.min_value);
if (statistics.__isset.is_min_value_exact) {
out.is_min_value_exact = statistics.is_min_value_exact;
}
}
break;
case StatsMinMaxMode::kLegacy:
if (statistics.__isset.max) {
out.set_max(statistics.max);
}
if (statistics.__isset.min) {
out.set_min(statistics.min);
}
break;
case StatsMinMaxMode::kDiscard:
break;
}
if (statistics.__isset.null_count) {
out.set_null_count(statistics.null_count);
}
if (statistics.__isset.distinct_count) {
out.set_distinct_count(statistics.distinct_count);
}

return out;
}

template <typename DType>
static std::shared_ptr<Statistics> MakeTypedColumnStats(
const format::ColumnMetaData& metadata, const ColumnDescriptor* descr,
::arrow::MemoryPool* pool) {
std::optional<bool> min_exact =
metadata.statistics.__isset.is_min_value_exact
? std::optional<bool>(metadata.statistics.is_min_value_exact)
: std::nullopt;
std::optional<bool> max_exact =
metadata.statistics.__isset.is_max_value_exact
? std::optional<bool>(metadata.statistics.is_max_value_exact)
: std::nullopt;
// If ColumnOrder is defined, return max_value and min_value
if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
return MakeStatistics<DType>(
descr, metadata.statistics.min_value, metadata.statistics.max_value,
metadata.num_values - metadata.statistics.null_count,
metadata.statistics.null_count, metadata.statistics.distinct_count,
metadata.statistics.__isset.max_value && metadata.statistics.__isset.min_value,
metadata.statistics.__isset.null_count,
metadata.statistics.__isset.distinct_count, min_exact, max_exact, pool);
}
// Default behavior
const auto& statistics = metadata.statistics;
const std::string kEmpty = "";
const std::string* encoded_min = &kEmpty;
const std::string* encoded_max = &kEmpty;
bool has_min_max = false;
std::optional<bool> min_exact = std::nullopt;
std::optional<bool> max_exact = std::nullopt;

switch (GetStatsMinMaxMode(*descr)) {
case StatsMinMaxMode::kNormal:
encoded_min = &statistics.min_value;
encoded_max = &statistics.max_value;
has_min_max = statistics.__isset.max_value && statistics.__isset.min_value;
min_exact = statistics.__isset.is_min_value_exact
? std::optional<bool>(statistics.is_min_value_exact)
: std::nullopt;
max_exact = statistics.__isset.is_max_value_exact
? std::optional<bool>(statistics.is_max_value_exact)
: std::nullopt;
break;
case StatsMinMaxMode::kLegacy:
encoded_min = &statistics.min;
encoded_max = &statistics.max;
has_min_max = statistics.__isset.max && statistics.__isset.min;
break;
case StatsMinMaxMode::kDiscard:
break;
}

return MakeStatistics<DType>(
descr, metadata.statistics.min, metadata.statistics.max,
metadata.num_values - metadata.statistics.null_count,
metadata.statistics.null_count, metadata.statistics.distinct_count,
metadata.statistics.__isset.max && metadata.statistics.__isset.min,
metadata.statistics.__isset.null_count, metadata.statistics.__isset.distinct_count,
min_exact, max_exact, pool);
descr, *encoded_min, *encoded_max, metadata.num_values - statistics.null_count,
statistics.null_count, statistics.distinct_count, has_min_max,
statistics.__isset.null_count, statistics.__isset.distinct_count, min_exact,
max_exact, pool);
}

namespace {
Expand Down Expand Up @@ -337,11 +412,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
const std::lock_guard<std::mutex> guard(stats_mutex_);
if (possible_encoded_stats_ == nullptr) {
possible_encoded_stats_ =
std::make_shared<EncodedStatistics>(FromThrift(column_metadata_->statistics));
if (descr_->sort_order() == SortOrder::UNKNOWN) {
// If the column SortOrder is Unknown we can't trust max/min.
possible_encoded_stats_->ClearMinMax();
}
std::make_shared<EncodedStatistics>(EncodedStatisticsFromThrift(
column_metadata_->statistics, GetStatsMinMaxMode(*descr_)));
}
}
return writer_version_->HasCorrectStatistics(type(), *possible_encoded_stats_,
Expand Down Expand Up @@ -1037,7 +1109,7 @@ class FileMetaData::FileMetaDataImpl {
if (column_order.__isset.TYPE_ORDER) {
column_orders.push_back(ColumnOrder::type_defined_);
} else {
column_orders.push_back(ColumnOrder::undefined_);
column_orders.push_back(ColumnOrder::unknown_);
}
}
} else {
Expand Down
98 changes: 98 additions & 0 deletions cpp/src/parquet/metadata_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,104 @@ TEST(Metadata, TestBuildAccess) {
ASSERT_TRUE(f_accessor_1->Equals(*f_accessor->Subset({2, 0})));
}

namespace {

// Returns the in-memory bytes of `value` (native byte order) as a string of
// sizeof(int32_t) characters.
std::string EncodeInt32(int32_t value) {
  const char* first = reinterpret_cast<const char*>(&value);
  const char* last = first + sizeof(value);
  return std::string(first, last);
}

// Minimum and maximum values used for the statistics in the metadata tests.
constexpr int32_t kLegacyMin = 100;
constexpr int32_t kLegacyMax = 200;

// Serializes `thrift_metadata` through ThriftSerializer and returns the
// resulting bytes.
std::string SerializeMetadata(const format::FileMetaData& thrift_metadata) {
  ThriftSerializer serializer{};
  std::string serialized;
  serializer.SerializeToString(&thrift_metadata, &serialized);
  return serialized;
}

// Builds a FileMetaData from serialized Thrift bytes via FileMetaData::Make.
std::shared_ptr<FileMetaData> ParseMetadata(std::string serialized_metadata) {
  // The length is handed over by pointer, so it has to live in a local.
  auto metadata_len = static_cast<uint32_t>(serialized_metadata.size());
  const char* bytes = serialized_metadata.data();
  return FileMetaData::Make(bytes, &metadata_len);
}

// Builds Thrift file metadata with a single required INT32 column
// ("int_col"), one row group and one column chunk. The chunk's statistics
// have both the legacy min/max fields and min_value/max_value set to
// kLegacyMin/kLegacyMax, and the file declares one TYPE_ORDER column order.
format::FileMetaData SingleInt32MetadataWithStats() {
  format::FileMetaData thrift_metadata;

  // Schema: a root group followed by its only leaf.
  schema::NodeVector fields = {schema::Int32("int_col", Repetition::REQUIRED)};
  format::SchemaElement root_element;
  format::SchemaElement leaf_element;
  schema::GroupNode::Make("schema", Repetition::REPEATED, fields)
      ->ToParquet(&root_element);
  fields.back()->ToParquet(&leaf_element);
  thrift_metadata.schema = {std::move(root_element), std::move(leaf_element)};

  // One row group holding one column chunk with metadata and statistics.
  auto& row_group = thrift_metadata.row_groups.emplace_back();
  auto& column_chunk = row_group.columns.emplace_back();
  column_chunk.__isset.meta_data = true;
  auto& chunk_metadata = column_chunk.meta_data;
  chunk_metadata.__set_type(format::Type::INT32);
  chunk_metadata.__isset.statistics = true;

  // Legacy and current min/max fields start out with the same values.
  auto& stats = chunk_metadata.statistics;
  stats.__set_min(EncodeInt32(kLegacyMin));
  stats.__set_max(EncodeInt32(kLegacyMax));
  stats.__set_min_value(EncodeInt32(kLegacyMin));
  stats.__set_max_value(EncodeInt32(kLegacyMax));

  // Declare a type-defined order for the single column.
  auto& column_order = thrift_metadata.column_orders.emplace_back();
  column_order.__set_TYPE_ORDER(format::TypeDefinedOrder{});
  thrift_metadata.__isset.column_orders = true;
  return thrift_metadata;
}

// Returns column chunk 0 of row group 0 after checking (non-fatally) that
// schema column 0 reports `expected_order`.
std::unique_ptr<ColumnChunkMetaData> GetOnlyColumnChunk(
    const FileMetaData& metadata, ColumnOrder::type expected_order) {
  const auto actual_order = metadata.schema()->Column(0)->column_order().get_order();
  EXPECT_EQ(expected_order, actual_order);
  auto row_group = metadata.RowGroup(0);
  return row_group->ColumnChunk(0);
}

// Checks that the only column chunk exposes statistics objects but no
// min/max, both in its encoded statistics and in its decoded statistics.
void AssertColumnChunkHasNoMinMax(const FileMetaData& metadata,
                                  ColumnOrder::type expected_order) {
  auto column = GetOnlyColumnChunk(metadata, expected_order);

  auto encoded_stats = column->encoded_statistics();
  ASSERT_NE(nullptr, encoded_stats);
  EXPECT_FALSE(encoded_stats->has_min);
  EXPECT_FALSE(encoded_stats->has_max);

  auto stats = column->statistics();
  ASSERT_NE(nullptr, stats);
  EXPECT_FALSE(stats->HasMinMax());
}

// Checks that the only column chunk reports `min`/`max` (compared in their
// EncodeInt32 form) through both encoded_statistics() and statistics().
void AssertColumnChunkMinMax(const FileMetaData& metadata,
                             ColumnOrder::type expected_order, int32_t min, int32_t max) {
  auto column = GetOnlyColumnChunk(metadata, expected_order);
  const std::string expected_min = EncodeInt32(min);
  const std::string expected_max = EncodeInt32(max);

  auto encoded_stats = column->encoded_statistics();
  ASSERT_NE(nullptr, encoded_stats);
  EXPECT_EQ(expected_min, encoded_stats->min());
  EXPECT_EQ(expected_max, encoded_stats->max());

  auto stats = column->statistics();
  ASSERT_NE(nullptr, stats);
  EXPECT_EQ(expected_min, stats->EncodeMin());
  EXPECT_EQ(expected_max, stats->EncodeMax());
}

} // namespace

// A column order entry with no known field set must make readers ignore the
// min/max statistics of the column.
TEST(Metadata, UnknownColumnOrderIgnoresMinMax) {
  auto thrift_metadata = SingleInt32MetadataWithStats();

  // Simulate an unsupported ColumnOrder value: unknown union fields are skipped by
  // Thrift, leaving an entry with no known field set.
  auto& column_orders = thrift_metadata.column_orders;
  column_orders.clear();
  column_orders.resize(1);
  thrift_metadata.__isset.column_orders = true;

  const std::string serialized = SerializeMetadata(thrift_metadata);
  auto metadata = ParseMetadata(serialized);
  AssertColumnChunkHasNoMinMax(*metadata, ColumnOrder::UNKNOWN);
}

// Without any column orders in the file, readers must fall back to the
// legacy min/max statistics fields.
TEST(Metadata, MissingColumnOrderUsesLegacyMinMax) {
  auto thrift_metadata = SingleInt32MetadataWithStats();

  // Remove the column orders entirely.
  thrift_metadata.column_orders.clear();
  thrift_metadata.__isset.column_orders = false;

  // Give min_value/max_value different contents than the legacy fields so the
  // assertion below can tell which pair was read.
  auto& column_metadata = thrift_metadata.row_groups.at(0).columns.at(0).meta_data;
  column_metadata.statistics.__set_min_value(EncodeInt32(kLegacyMin - 100));
  column_metadata.statistics.__set_max_value(EncodeInt32(kLegacyMax + 100));

  const std::string serialized = SerializeMetadata(thrift_metadata);
  auto metadata = ParseMetadata(serialized);
  AssertColumnChunkMinMax(*metadata, ColumnOrder::UNDEFINED, kLegacyMin, kLegacyMax);
}

TEST(Metadata, TestV1Version) {
// PARQUET-839
parquet::schema::NodeVector fields;
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/parquet/page_index.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ namespace parquet {

namespace {

bool CanTrustPageIndexMinMax(const ColumnDescriptor& descr) {
const auto column_order = descr.column_order().get_order();
return column_order != ColumnOrder::UNKNOWN && column_order != ColumnOrder::UNDEFINED &&
descr.sort_order() != SortOrder::UNKNOWN;
}

template <typename DType>
void Decode(std::unique_ptr<typename EncodingTraits<DType>::Decoder>& decoder,
const std::string& input, std::vector<typename DType::c_type>* output,
Expand Down Expand Up @@ -973,6 +979,9 @@ std::unique_ptr<ColumnIndex> ColumnIndex::Make(const ColumnDescriptor& descr,
// Guard against UB when moving column_index
throw ParquetException("Invalid ColumnIndex boundary_order");
}
if (!CanTrustPageIndexMinMax(descr)) {
return nullptr;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem right for a factory function to be allowed to return NULL. It's also an API change.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note we're raising an exception if we encounter an unknown boundary order above.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I think the bottom line is that the ColumnIndex object only gives access to raw encoded statistics anyway. It's up to the caller to decide if they know how to handle them. So why check this here?

Instead, perhaps ColumnDescriptor can expose a method can_use_statistics or something similar.

}
Comment on lines +982 to +984
switch (descr.physical_type()) {
case Type::BOOLEAN:
return std::make_unique<TypedColumnIndexImpl<BooleanType>>(descr,
Expand Down
52 changes: 52 additions & 0 deletions cpp/src/parquet/page_index_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,58 @@ void TestWriteTypedColumnIndex(schema::NodePtr node,
}
}

template <typename T>
std::shared_ptr<Buffer> SerializedColumnIndex(const T& min, const T& max) {
auto encode = [](const T& value) {
return std::string(reinterpret_cast<const char*>(&value), sizeof(value));
};
format::ColumnIndex column_index;
column_index.__set_null_pages({false});
column_index.__set_min_values({encode(min)});
column_index.__set_max_values({encode(max)});
column_index.__set_boundary_order(format::BoundaryOrder::UNORDERED);

auto sink = CreateOutputStream();
ThriftSerializer{}.Serialize(&column_index, sink.get());
PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
return buffer;
}

// Asserts that ColumnIndex::Make yields nullptr for `buffer` when it is read
// with `read_descr` and the default reader properties.
void AssertColumnIndexIgnored(const ColumnDescriptor& read_descr,
                              const std::shared_ptr<Buffer>& buffer) {
  const auto serialized_size = static_cast<uint32_t>(buffer->size());
  auto column_index = ColumnIndex::Make(read_descr, buffer->data(), serialized_size,
                                        default_reader_properties());
  ASSERT_EQ(nullptr, column_index);
}

// Builds an INT32 column "c1" with `column_order` applied to its node and
// asserts that a serialized column index is ignored for that column.
void AssertColumnIndexIgnoredWithColumnOrder(ColumnOrder column_order) {
  auto node = schema::Int32("c1");
  auto primitive_node = std::static_pointer_cast<schema::PrimitiveNode>(node);
  primitive_node->SetColumnOrder(column_order);

  ColumnDescriptor read_descr(node, /*max_definition_level=*/1,
                              /*max_repetition_level=*/0);
  auto buffer = SerializedColumnIndex<int32_t>(/*min=*/1, /*max=*/2);
  AssertColumnIndexIgnored(read_descr, buffer);
}

// The column index must be ignored for both the unknown and the undefined
// column order.
TEST(PageIndex, ReadColumnIndexWithUnsupportedColumnOrder) {
  for (const ColumnOrder& column_order :
       {ColumnOrder::unknown_, ColumnOrder::undefined_}) {
    AssertColumnIndexIgnoredWithColumnOrder(column_order);
  }
}

// The column index must be ignored for a column whose sort order is UNKNOWN
// (checked here on a required INT96 column).
TEST(PageIndex, ReadColumnIndexWithUnknownSortOrder) {
  auto int96_node = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::INT96);
  ColumnDescriptor int96_descr(int96_node, /*max_definition_level=*/0,
                               /*max_repetition_level=*/0);
  ASSERT_EQ(SortOrder::UNKNOWN, int96_descr.sort_order());

  Int96 min_value{{1, 2, 3}};
  Int96 max_value{{4, 5, 6}};
  auto serialized_index = SerializedColumnIndex(min_value, max_value);

  AssertColumnIndexIgnored(int96_descr, serialized_index);
}

TEST(PageIndex, WriteInt32ColumnIndex) {
auto encode = [=](int32_t value) {
return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/parquet/statistics_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,13 @@ TEST(TestStatisticsSortOrder, UNKNOWN) {
ASSERT_EQ(1, enc_stats->null_count);
ASSERT_FALSE(enc_stats->is_max_value_exact.has_value());
ASSERT_FALSE(enc_stats->is_min_value_exact.has_value());

// Unknown sort order should not cause min/max to be set
std::shared_ptr<Statistics> stats = column_chunk->statistics();
ASSERT_NE(nullptr, stats);
ASSERT_FALSE(stats->HasMinMax());
ASSERT_TRUE(stats->HasNullCount());
ASSERT_EQ(1, stats->null_count());
}

// Test statistics for binary column with UNSIGNED sort order
Expand Down
1 change: 1 addition & 0 deletions cpp/src/parquet/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ SortOrder::type GetSortOrder(const std::shared_ptr<const LogicalType>& logical_t

ColumnOrder ColumnOrder::undefined_ = ColumnOrder(ColumnOrder::UNDEFINED);
ColumnOrder ColumnOrder::type_defined_ = ColumnOrder(ColumnOrder::TYPE_DEFINED_ORDER);
ColumnOrder ColumnOrder::unknown_ = ColumnOrder(ColumnOrder::UNKNOWN);

// Static methods for LogicalType class

Expand Down
10 changes: 9 additions & 1 deletion cpp/src/parquet/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,14 +599,22 @@ bool PageCanUseChecksum(PageType::type pageType);

class ColumnOrder {
public:
enum type { UNDEFINED, TYPE_DEFINED_ORDER };
enum type {
// File metadata has no column order, only legacy min/max in stats are defined.
UNDEFINED,
// File metadata uses TypeDefinedOrder from the Parquet format.
TYPE_DEFINED_ORDER,
// Column order value unsupported by this reader.
UNKNOWN
};
explicit ColumnOrder(ColumnOrder::type column_order) : column_order_(column_order) {}
// Default to Type Defined Order
ColumnOrder() : column_order_(type::TYPE_DEFINED_ORDER) {}
ColumnOrder::type get_order() { return column_order_; }

static ColumnOrder undefined_;
static ColumnOrder type_defined_;
static ColumnOrder unknown_;

private:
ColumnOrder::type column_order_;
Expand Down
Loading