diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index e48afbfd7..dd869a44c 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$" set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc + delete_file_index.cc expression/aggregate.cc expression/binder.cc expression/evaluator.cc @@ -79,6 +80,7 @@ set(ICEBERG_SOURCES update/update_sort_order.cc update/update_properties.cc util/bucket_util.cc + util/content_file_util.cc util/conversions.cc util/decimal.cc util/gzip_internal.cc diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc new file mode 100644 index 000000000..4f0ad42b9 --- /dev/null +++ b/src/iceberg/delete_file_index.cc @@ -0,0 +1,776 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/delete_file_index.h" + +#include +#include +#include +#include + +#include "iceberg/expression/expression.h" +#include "iceberg/expression/manifest_evaluator.h" +#include "iceberg/expression/projections.h" +#include "iceberg/file_io.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/util/checked_cast.h" +#include "iceberg/util/content_file_util.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +namespace internal { + +Status EqualityDeleteFile::ConvertBoundsIfNeeded() const { + if (bounds_converted) { + return {}; + } + + // Convert bounds for equality field IDs only + for (int32_t field_id : wrapped.data_file->equality_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto field, schema->FindFieldById(field_id)); + if (!field.has_value()) { + continue; + } + + const auto& schema_field = field.value().get(); + if (schema_field.type()->is_nested()) { + continue; + } + + const auto primitive_type = checked_pointer_cast(schema_field.type()); + + // Convert lower bound + if (auto it = wrapped.data_file->lower_bounds.find(field_id); + it != wrapped.data_file->lower_bounds.cend() && !it->second.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto lower, + Literal::Deserialize(it->second, primitive_type)); + lower_bounds.emplace(field_id, std::move(lower)); + } + + // Convert upper bound + if (auto it = wrapped.data_file->upper_bounds.find(field_id); + it != wrapped.data_file->upper_bounds.cend() && !it->second.empty()) { + ICEBERG_ASSIGN_OR_RAISE(auto upper, + Literal::Deserialize(it->second, primitive_type)); + upper_bounds.emplace(field_id, std::move(upper)); + } + } + + bounds_converted = true; + return {}; +} + +// Check if an equality delete file can contain deletes for a data file. 
+Result CanContainEqDeletesForFile(const DataFile& data_file, + const EqualityDeleteFile& delete_file) { + // Whether to check data ranges or to assume that the ranges match. If upper/lower + // bounds are missing, null counts may still be used to determine delete files can be + // skipped. + bool check_ranges = !data_file.lower_bounds.empty() && + !data_file.upper_bounds.empty() && + delete_file.HasLowerAndUpperBounds(); + + const auto* wrapped_delete_file = delete_file.wrapped.data_file.get(); + + for (int32_t field_id : wrapped_delete_file->equality_ids) { + ICEBERG_ASSIGN_OR_RAISE(auto found_field, + delete_file.schema->FindFieldById(field_id)); + if (!found_field.has_value()) { + continue; + } + + const auto& field = found_field.value().get(); + if (field.type()->is_nested()) { + continue; + } + + bool is_required = !field.optional(); + bool data_contains_null = + ContainsNull(data_file.null_value_counts, field_id, is_required); + bool delete_contains_null = + ContainsNull(wrapped_delete_file->null_value_counts, field_id, is_required); + + if (data_contains_null && delete_contains_null) { + // Both have nulls - delete may apply + continue; + } + + if (AllNull(data_file.null_value_counts, data_file.value_counts, field_id, + is_required) && + AllNonNull(wrapped_delete_file->null_value_counts, field_id, is_required)) { + return false; // Data is all null, delete has no nulls - cannot match + } + + if (AllNull(wrapped_delete_file->null_value_counts, wrapped_delete_file->value_counts, + field_id, is_required) && + AllNonNull(data_file.null_value_counts, field_id, is_required)) { + return false; // Delete is all null, data has no nulls - cannot match + } + + if (!check_ranges) { + continue; + } + + // Check range overlap + auto data_lower_it = data_file.lower_bounds.find(field_id); + auto data_upper_it = data_file.upper_bounds.find(field_id); + if (data_lower_it == data_file.lower_bounds.cend() || data_lower_it->second.empty() || + data_upper_it == 
data_file.upper_bounds.cend() || data_upper_it->second.empty()) { + continue; // Missing bounds, assume may match + } + + auto delete_lower = delete_file.LowerBound(field_id); + auto delete_upper = delete_file.UpperBound(field_id); + if (!delete_lower.has_value() || !delete_upper.has_value()) { + continue; // Missing bounds, assume may match + } + + // Convert data bounds + auto primitive_type = checked_pointer_cast(field.type()); + ICEBERG_ASSIGN_OR_RAISE(auto data_lower, + Literal::Deserialize(data_lower_it->second, primitive_type)); + ICEBERG_ASSIGN_OR_RAISE(auto data_upper, + Literal::Deserialize(data_upper_it->second, primitive_type)); + + if (!RangesOverlap(data_lower, data_upper, delete_lower->value().get(), + delete_upper->value().get())) { + return false; // Ranges don't overlap - cannot match + } + } + + return true; +} + +// PositionDeletes implementation + +Status PositionDeletes::Add(ManifestEntry&& entry) { + ICEBERG_PRECHECK(entry.sequence_number.has_value(), + "Missing sequence number for position delete: {}", + entry.data_file->file_path); + files_.emplace_back(std::move(entry)); + indexed_ = false; + return {}; +} + +std::vector> PositionDeletes::Filter(int64_t seq) { + IndexIfNeeded(); + + size_t start = FindStartIndex(seqs_, seq); + if (start >= files_.size()) { + return {}; + } + + return files_ | std::views::drop(start) | + std::views::transform(&ManifestEntry::data_file) | + std::ranges::to>>(); +} + +std::vector> PositionDeletes::ReferencedDeleteFiles() { + IndexIfNeeded(); + return std::ranges::transform_view(files_, + [](const auto& entry) { return entry.data_file; }) | + std::ranges::to>>(); +} + +void PositionDeletes::IndexIfNeeded() { + if (indexed_) { + return; + } + + // Sort by data sequence number + std::ranges::sort(files_, [](const auto& a, const auto& b) { + return a.sequence_number.value() < b.sequence_number.value(); + }); + + // Build sequence number array for binary search + seqs_ = std::ranges::transform_view( + files_, 
[](const auto& entry) { return entry.sequence_number.value(); }) | + std::ranges::to>(); + + indexed_ = true; +} + +// EqualityDeletes implementation + +Status EqualityDeletes::Add(ManifestEntry&& entry) { + ICEBERG_PRECHECK(entry.sequence_number.has_value(), + "Missing sequence number for equality delete: {}", + entry.data_file->file_path); + files_.emplace_back(&schema_, std::move(entry)); + indexed_ = false; + return {}; +} + +Result>> EqualityDeletes::Filter( + int64_t seq, const DataFile& data_file) { + IndexIfNeeded(); + + size_t start = FindStartIndex(seqs_, seq); + if (start >= files_.size()) { + return {}; + } + + std::vector> result; + result.reserve(files_.size() - start); + for (size_t i = start; i < files_.size(); ++i) { + const auto& delete_file = files_[i]; + ICEBERG_ASSIGN_OR_RAISE(bool may_contain, + CanContainEqDeletesForFile(data_file, delete_file)); + if (may_contain) { + result.push_back(delete_file.wrapped.data_file); + } + } + + return result; +} + +std::vector> EqualityDeletes::ReferencedDeleteFiles() { + IndexIfNeeded(); + return std::ranges::transform_view( + files_, [](const auto& file) { return file.wrapped.data_file; }) | + std::ranges::to>>(); +} + +void EqualityDeletes::IndexIfNeeded() { + if (indexed_) { + return; + } + + // Sort by apply sequence number + std::ranges::sort(files_, [](const auto& a, const auto& b) { + return a.apply_sequence_number < b.apply_sequence_number; + }); + + // Build sequence number array for binary search + seqs_ = std::ranges::transform_view( + files_, [](const auto& file) { return file.apply_sequence_number; }) | + std::ranges::to>(); + + indexed_ = true; +} + +} // namespace internal + +// DeleteFileIndex implementation + +DeleteFileIndex::DeleteFileIndex( + std::unique_ptr global_deletes, + std::unique_ptr>> + eq_deletes_by_partition, + std::unique_ptr>> + pos_deletes_by_partition, + std::unique_ptr< + std::unordered_map>> + pos_deletes_by_path, + std::unique_ptr> dv_by_path) + : 
global_deletes_(std::move(global_deletes)), + eq_deletes_by_partition_(std::move(eq_deletes_by_partition)), + pos_deletes_by_partition_(std::move(pos_deletes_by_partition)), + pos_deletes_by_path_(std::move(pos_deletes_by_path)), + dv_by_path_(std::move(dv_by_path)) { + has_eq_deletes_ = (global_deletes_ && !global_deletes_->empty()) || + (eq_deletes_by_partition_ && !eq_deletes_by_partition_->empty()); + has_pos_deletes_ = (pos_deletes_by_partition_ && !pos_deletes_by_partition_->empty()) || + (pos_deletes_by_path_ && !pos_deletes_by_path_->empty()) || + (dv_by_path_ && !dv_by_path_->empty()); + is_empty_ = !has_eq_deletes_ && !has_pos_deletes_; +} + +DeleteFileIndex::~DeleteFileIndex() = default; +DeleteFileIndex::DeleteFileIndex(DeleteFileIndex&&) noexcept = default; +DeleteFileIndex& DeleteFileIndex::operator=(DeleteFileIndex&&) noexcept = default; + +bool DeleteFileIndex::empty() const { return is_empty_; } + +bool DeleteFileIndex::has_equality_deletes() const { return has_eq_deletes_; } + +bool DeleteFileIndex::has_position_deletes() const { return has_pos_deletes_; } + +std::vector> DeleteFileIndex::ReferencedDeleteFiles() const { + std::vector> result; + + if (global_deletes_) { + auto files = global_deletes_->ReferencedDeleteFiles(); + std::ranges::move(files, std::back_inserter(result)); + } + + if (eq_deletes_by_partition_) { + for (const auto& [_, deletes] : *eq_deletes_by_partition_) { + auto files = deletes->ReferencedDeleteFiles(); + std::ranges::move(files, std::back_inserter(result)); + } + } + + if (pos_deletes_by_partition_) { + for (const auto& [_, deletes] : *pos_deletes_by_partition_) { + auto files = deletes->ReferencedDeleteFiles(); + std::ranges::move(files, std::back_inserter(result)); + } + } + + if (pos_deletes_by_path_) { + for (auto& [_, deletes] : *pos_deletes_by_path_) { + auto files = deletes->ReferencedDeleteFiles(); + std::ranges::move(files, std::back_inserter(result)); + } + } + + if (dv_by_path_) { + for (const auto& [_, dv] : 
*dv_by_path_) { + result.push_back(dv.data_file); + } + } + + return result; +} + +Result>> DeleteFileIndex::ForEntry( + const ManifestEntry& entry) const { + ICEBERG_PRECHECK(entry.data_file != nullptr, "Manifest entry has null data file"); + ICEBERG_PRECHECK(entry.sequence_number.has_value(), + "Missing sequence number for data file: {}", + entry.data_file->file_path); + return ForDataFile(entry.sequence_number.value(), *entry.data_file); +} + +Result>> DeleteFileIndex::ForDataFile( + int64_t sequence_number, const DataFile& file) const { + if (is_empty_) { + return {}; + } + + ICEBERG_ASSIGN_OR_RAISE(auto global, FindGlobalDeletes(sequence_number, file)); + ICEBERG_ASSIGN_OR_RAISE(auto eq_partition, + FindEqPartitionDeletes(sequence_number, file)); + ICEBERG_ASSIGN_OR_RAISE(auto dv, FindDV(sequence_number, file)); + + if (dv && global.empty() && eq_partition.empty()) { + return std::vector>{std::move(dv)}; + } + + std::vector> result; + result.reserve(global.size() + eq_partition.size() + 1); + + std::ranges::move(global, std::back_inserter(result)); + std::ranges::move(eq_partition, std::back_inserter(result)); + + if (dv) { + result.push_back(dv); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto pos_partition, + FindPosPartitionDeletes(sequence_number, file)); + ICEBERG_ASSIGN_OR_RAISE(auto pos_path, FindPathDeletes(sequence_number, file)); + std::ranges::move(pos_partition, std::back_inserter(result)); + std::ranges::move(pos_path, std::back_inserter(result)); + } + + return result; +} + +Result>> DeleteFileIndex::FindGlobalDeletes( + int64_t seq, const DataFile& data_file) const { + if (!global_deletes_) { + return {}; + } + return global_deletes_->Filter(seq, data_file); +} + +Result>> DeleteFileIndex::FindEqPartitionDeletes( + int64_t seq, const DataFile& data_file) const { + if (!eq_deletes_by_partition_) { + return {}; + } + + auto deletes = + eq_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition); + if (!deletes.has_value()) { + return 
{}; + } + return deletes->get()->Filter(seq, data_file); +} + +Result>> DeleteFileIndex::FindPosPartitionDeletes( + int64_t seq, const DataFile& data_file) const { + if (!pos_deletes_by_partition_) { + return {}; + } + + auto deletes = + pos_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition); + if (!deletes.has_value()) { + return {}; + } + + return deletes->get()->Filter(seq); +} + +Result>> DeleteFileIndex::FindPathDeletes( + int64_t seq, const DataFile& data_file) const { + if (!pos_deletes_by_path_) { + return {}; + } + + auto it = pos_deletes_by_path_->find(data_file.file_path); + if (it == pos_deletes_by_path_->end()) { + return {}; + } + + return it->second->Filter(seq); +} + +Result> DeleteFileIndex::FindDV( + int64_t seq, const DataFile& data_file) const { + if (!dv_by_path_) { + return nullptr; + } + + auto it = dv_by_path_->find(data_file.file_path); + if (it == dv_by_path_->end()) { + return nullptr; + } + + ICEBERG_CHECK(it->second.sequence_number.value() >= seq, + "DV data sequence number {} must be greater than or equal to data file " + "sequence number {}", + it->second.sequence_number.value(), seq); + + return it->second.data_file; +} + +Result DeleteFileIndex::BuilderFor( + std::shared_ptr io, std::vector delete_manifests) { + ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null"); + return Builder(std::move(io), std::move(delete_manifests)); +} + +// Builder implementation + +DeleteFileIndex::Builder::Builder(std::shared_ptr io, + std::vector delete_manifests) + : io_(std::move(io)), delete_manifests_(std::move(delete_manifests)) {} + +DeleteFileIndex::Builder::~Builder() = default; +DeleteFileIndex::Builder::Builder(Builder&&) noexcept = default; +DeleteFileIndex::Builder& DeleteFileIndex::Builder::operator=(Builder&&) noexcept = + default; + +DeleteFileIndex::Builder& DeleteFileIndex::Builder::SpecsById( + std::unordered_map> specs_by_id) { + specs_by_id_ = std::move(specs_by_id); + return *this; +} + 
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::WithSchema( + std::shared_ptr schema) { + schema_ = std::move(schema); + return *this; +} + +DeleteFileIndex::Builder& DeleteFileIndex::Builder::AfterSequenceNumber(int64_t seq) { + min_sequence_number_ = seq; + return *this; +} + +DeleteFileIndex::Builder& DeleteFileIndex::Builder::DataFilter( + std::shared_ptr filter) { + if (data_filter_) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(data_filter_, + And::Make(data_filter_, std::move(filter))); + } else { + data_filter_ = std::move(filter); + } + return *this; +} + +DeleteFileIndex::Builder& DeleteFileIndex::Builder::PartitionFilter( + std::shared_ptr filter) { + if (partition_filter_) { + ICEBERG_BUILDER_ASSIGN_OR_RETURN(partition_filter_, + And::Make(partition_filter_, std::move(filter))); + } else { + partition_filter_ = std::move(filter); + } + return *this; +} + +DeleteFileIndex::Builder& DeleteFileIndex::Builder::FilterPartitions( + std::shared_ptr partition_set) { + partition_set_ = std::move(partition_set); + return *this; +} + +DeleteFileIndex::Builder& DeleteFileIndex::Builder::CaseSensitive(bool case_sensitive) { + case_sensitive_ = case_sensitive; + return *this; +} + +DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() { + ignore_residuals_ = true; + return *this; +} + +Result> DeleteFileIndex::Builder::LoadDeleteFiles() { + ICEBERG_PRECHECK(io_ != nullptr, "FileIO is required to load delete files"); + ICEBERG_PRECHECK(!specs_by_id_.empty(), + "Partition specs are required to load delete files"); + ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to load delete files"); + + // Build expression caches per spec ID + std::unordered_map> part_expr_cache; + std::unordered_map> eval_cache; + + auto data_filter = ignore_residuals_ ? 
True::Instance() : data_filter_; + + // Filter and read manifests into manifest entries + std::vector files; + for (const auto& manifest : delete_manifests_) { + if (manifest.content != ManifestContent::kDeletes) { + continue; + } + if (!manifest.has_added_files() && !manifest.has_existing_files()) { + continue; + } + + const int32_t spec_id = manifest.partition_spec_id; + auto spec_iter = specs_by_id_.find(spec_id); + ICEBERG_CHECK(spec_iter != specs_by_id_.cend(), + "Partition spec ID {} not found when loading delete files", spec_id); + + const auto& spec = spec_iter->second; + + // Get or compute projected partition expression + if (!part_expr_cache.contains(spec_id) && data_filter_) { + auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_); + ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_)); + part_expr_cache[spec_id] = std::move(projected); + } + + // Get or create manifest evaluator + if (!eval_cache.contains(spec_id)) { + auto filter = partition_filter_; + if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { + if (filter) { + ICEBERG_ASSIGN_OR_RAISE(filter, And::Make(filter, it->second)); + } else { + filter = it->second; + } + } + if (filter) { + ICEBERG_ASSIGN_OR_RAISE(auto evaluator, + ManifestEvaluator::MakePartitionFilter( + std::move(filter), spec, *schema_, case_sensitive_)); + eval_cache[spec_id] = std::move(evaluator); + } + } + + // Evaluate manifest against filter + if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) { + ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest)); + if (!should_match) { + continue; // Manifest doesn't match filter + } + } + + // Read manifest entries + ICEBERG_ASSIGN_OR_RAISE(auto reader, + ManifestReader::Make(manifest, io_, schema_, spec)); + + auto partition_filter = partition_filter_; + if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) { + if (partition_filter) { + 
ICEBERG_ASSIGN_OR_RAISE(partition_filter, + And::Make(partition_filter, it->second)); + } else { + partition_filter = it->second; + } + } + if (partition_filter) { + reader->FilterPartitions(std::move(partition_filter)); + } + if (partition_set_) { + reader->FilterPartitions(partition_set_); + } + reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats(); + + ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries()); + files.reserve(files.size() + entries.size()); + + for (auto& entry : entries) { + ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); + ICEBERG_CHECK(entry.sequence_number.has_value(), + "Missing sequence number for delete file: {}", + entry.data_file->file_path); + if (entry.sequence_number.value() > min_sequence_number_) { + files.emplace_back(std::move(entry)); + } + } + } + + return files; +} + +Status DeleteFileIndex::Builder::AddDV( + std::unordered_map& dv_by_path, ManifestEntry&& entry) { + ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); + ICEBERG_PRECHECK(entry.sequence_number.has_value(), "Missing sequence number for DV {}", + entry.data_file->file_path); + + const auto& path = entry.data_file->referenced_data_file; + ICEBERG_PRECHECK(path.has_value(), "DV must have a referenced data file"); + + std::string referenced_path = path.value(); + auto [it, inserted] = dv_by_path.emplace(referenced_path, std::move(entry)); + if (!inserted) { + return ValidationFailed("Can't index multiple DVs for {}", referenced_path); + } + return {}; +} + +Status DeleteFileIndex::Builder::AddPositionDelete( + std::unordered_map>& + deletes_by_path, + PartitionMap>& deletes_by_partition, + ManifestEntry&& entry) { + ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); + ICEBERG_PRECHECK(entry.sequence_number.has_value(), + "Missing sequence number for position delete {}", + entry.data_file->file_path); + + ICEBERG_ASSIGN_OR_RAISE(auto 
referenced_path, + ContentFileUtil::ReferencedDataFile(*entry.data_file)); + + if (referenced_path.has_value()) { + // File-scoped position delete + auto& deletes = deletes_by_path[referenced_path.value()]; + if (!deletes) { + deletes = std::make_unique(); + } + ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry))); + } else { + // Partition-scoped position delete + int32_t spec_id = entry.data_file->partition_spec_id; + const auto& partition = entry.data_file->partition; + + auto existing = deletes_by_partition.get(spec_id, partition); + if (existing.has_value()) { + ICEBERG_RETURN_UNEXPECTED(existing->get()->Add(std::move(entry))); + } else { + auto deletes = std::make_unique(); + ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry))); + deletes_by_partition.put(spec_id, partition, std::move(deletes)); + } + } + + return {}; +} + +Status DeleteFileIndex::Builder::AddEqualityDelete( + internal::EqualityDeletes& global_deletes, + PartitionMap>& deletes_by_partition, + ManifestEntry&& entry) { + ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); + ICEBERG_PRECHECK(entry.sequence_number.has_value(), + "Missing sequence number for equality delete {}", + entry.data_file->file_path); + + int32_t spec_id = entry.data_file->partition_spec_id; + + auto spec_it = specs_by_id_.find(spec_id); + if (spec_it == specs_by_id_.end()) { + return InvalidArgument("Unknown partition spec ID: {}", spec_id); + } + const auto& spec = spec_it->second; + + if (spec->fields().empty()) { + // Global equality delete for unpartitioned tables + ICEBERG_RETURN_UNEXPECTED(global_deletes.Add(std::move(entry))); + } else { + // Partition-scoped equality delete + const auto& partition = entry.data_file->partition; + + auto existing = deletes_by_partition.get(spec_id, partition); + if (existing.has_value()) { + ICEBERG_RETURN_UNEXPECTED(existing->get()->Add(std::move(entry))); + } else { + auto deletes = std::make_unique(*schema_); + 
ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry))); + deletes_by_partition.put(spec_id, partition, std::move(deletes)); + } + } + + return {}; +} + +Result> DeleteFileIndex::Builder::Build() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to build DeleteFileIndex"); + + std::vector entries; + if (!delete_manifests_.empty()) { + ICEBERG_ASSIGN_OR_RAISE(entries, LoadDeleteFiles()); + } + + // Build index structures + auto global_deletes = std::make_unique(*schema_); + auto eq_deletes_by_partition = + std::make_unique>>(); + auto pos_deletes_by_partition = + std::make_unique>>(); + auto pos_deletes_by_path = std::make_unique< + std::unordered_map>>(); + auto dv_by_path = std::make_unique>(); + + for (auto& entry : entries) { + ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file"); + + switch (entry.data_file->content) { + case DataFile::Content::kPositionDeletes: + if (ContentFileUtil::IsDV(*entry.data_file)) { + ICEBERG_RETURN_UNEXPECTED(AddDV(*dv_by_path, std::move(entry))); + } else { + ICEBERG_RETURN_UNEXPECTED(AddPositionDelete( + *pos_deletes_by_path, *pos_deletes_by_partition, std::move(entry))); + } + break; + + case DataFile::Content::kEqualityDeletes: + ICEBERG_RETURN_UNEXPECTED(AddEqualityDelete( + *global_deletes, *eq_deletes_by_partition, std::move(entry))); + break; + + default: + return NotSupported("Unsupported content type: {}", + static_cast(entry.data_file->content)); + } + } + + return std::unique_ptr(new DeleteFileIndex( + global_deletes->empty() ? nullptr : std::move(global_deletes), + eq_deletes_by_partition->empty() ? nullptr : std::move(eq_deletes_by_partition), + pos_deletes_by_partition->empty() ? nullptr : std::move(pos_deletes_by_partition), + pos_deletes_by_path->empty() ? nullptr : std::move(pos_deletes_by_path), + dv_by_path->empty() ? 
nullptr : std::move(dv_by_path))); +} + +} // namespace iceberg diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h new file mode 100644 index 000000000..2b97a4c0e --- /dev/null +++ b/src/iceberg/delete_file_index.h @@ -0,0 +1,406 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/delete_file_index.h +/// An index of delete files by sequence number. + +#include +#include +#include +#include +#include +#include +#include + +#include "iceberg/expression/literal.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/partition_value_util.h" + +namespace iceberg { + +namespace internal { + +/// \brief Wrapper for equality delete files that caches converted bounds. 
+struct ICEBERG_EXPORT EqualityDeleteFile { + const Schema* schema; + ManifestEntry wrapped; + int64_t apply_sequence_number; // = data_sequence_number - 1 + + // Lazily converted bounds for pruning + mutable std::unordered_map lower_bounds; + mutable std::unordered_map upper_bounds; + mutable bool bounds_converted = false; + + EqualityDeleteFile(const Schema* schema, ManifestEntry&& entry) + : schema(schema), + wrapped(std::move(entry)), + apply_sequence_number(wrapped.sequence_number.value() - 1) {} + + /// \brief Check if this delete file has both lower and upper bounds. + bool HasLowerAndUpperBounds() const { + return !wrapped.data_file->lower_bounds.empty() && + !wrapped.data_file->upper_bounds.empty(); + } + + /// \brief Get the lower bound for a field ID. + Result>> LowerBound( + int32_t id) const { + ICEBERG_RETURN_UNEXPECTED(ConvertBoundsIfNeeded()); + auto it = lower_bounds.find(id); + return it != lower_bounds.cend() ? std::make_optional(std::cref(it->second)) + : std::nullopt; + } + + /// \brief Get the upper bound for a field ID. + Result>> UpperBound( + int32_t id) const { + ICEBERG_RETURN_UNEXPECTED(ConvertBoundsIfNeeded()); + auto it = upper_bounds.find(id); + return it != upper_bounds.cend() ? std::make_optional(std::cref(it->second)) + : std::nullopt; + } + + private: + /// \brief Convert bounds from binary to Literal. Implemented in .cc file. + Status ConvertBoundsIfNeeded() const; +}; + +/// \brief Find the start index in a sorted array where all elements from that +/// index onward have sequence numbers >= the given sequence number. +inline size_t FindStartIndex(const std::vector& seqs, int64_t seq) { + auto it = std::ranges::lower_bound(seqs, seq); + return static_cast(std::ranges::distance(seqs.cbegin(), it)); +} + +/// \brief Check if two ranges overlap. 
+inline bool RangesOverlap(const Literal& data_lower, const Literal& data_upper, + const Literal& delete_lower, const Literal& delete_upper) { + if (data_lower > delete_upper) { + return false; + } + if (delete_lower > data_upper) { + return false; + } + return true; +} + +/// \brief Check if a value count map indicates all values are null. +inline bool AllNull(const std::map& null_counts, + const std::map& value_counts, int32_t field_id, + bool is_required) { + if (is_required) { + return false; + } + + auto null_it = null_counts.find(field_id); + auto value_it = value_counts.find(field_id); + if (null_it == null_counts.cend() || value_it == value_counts.cend()) { + return false; + } + + return null_it->second == value_it->second; +} + +/// \brief Check if all values are non-null. +inline bool AllNonNull(const std::map& null_counts, int32_t field_id, + bool is_required) { + if (is_required) { + return true; + } + + auto it = null_counts.find(field_id); + if (it == null_counts.cend()) { + return false; + } + + return it->second <= 0; +} + +/// \brief Check if the column contains any null values. +inline bool ContainsNull(const std::map& null_counts, int32_t field_id, + bool is_required) { + if (is_required) { + return false; + } + + auto it = null_counts.find(field_id); + if (it == null_counts.cend()) { + return true; // Unknown, assume may contain null + } + + return it->second > 0; +} + +/// \brief Check if an equality delete file can contain deletes for a data file. +ICEBERG_EXPORT Result CanContainEqDeletesForFile( + const DataFile& data_file, const EqualityDeleteFile& delete_file); + +/// \brief A group of position delete files sorted by the sequence number they apply to. +/// +/// Position delete files apply to data files with a sequence number <= the delete +/// file's data sequence number. +class ICEBERG_EXPORT PositionDeletes { + public: + PositionDeletes() = default; + + /// \brief Add a position delete file to this group. 
+ [[nodiscard]] Status Add(ManifestEntry&& entry); + + /// \brief Returns all delete files with data_sequence_number >= the given sequence + /// number. + std::vector> Filter(int64_t seq); + + /// \brief Get all delete files in this group. + std::vector> ReferencedDeleteFiles(); + + /// \brief Check if this group is empty. + bool empty() const { return files_.empty(); } + + private: + void IndexIfNeeded(); + + std::vector files_; + std::vector seqs_; + bool indexed_ = false; +}; + +/// \brief A group of equality delete files sorted by apply sequence number. +/// +/// Equality deletes apply to data files with sequence number < the delete's +/// data sequence number (i.e., apply_sequence_number = data_sequence_number - 1). +class ICEBERG_EXPORT EqualityDeletes { + public: + explicit EqualityDeletes(const Schema& schema) : schema_(schema) {} + + /// \brief Add an equality delete file to this group. + [[nodiscard]] Status Add(ManifestEntry&& entry); + + /// \brief Filter equality deletes that apply to the given data file. + /// + /// Returns delete files where: + /// 1. apply_sequence_number >= the data file's sequence number + /// 2. The delete file's bounds may overlap with the data file + Result>> Filter(int64_t seq, + const DataFile& data_file); + + /// \brief Get all delete files in this group. + std::vector> ReferencedDeleteFiles(); + + /// \brief Check if this group is empty. + bool empty() const { return files_.empty(); } + + private: + void IndexIfNeeded(); + + const Schema& schema_; + std::vector files_; + std::vector seqs_; + bool indexed_ = false; +}; + +} // namespace internal + +/// \brief An index of delete files by sequence number. +/// +/// Use `DeleteFileIndex::Builder` to construct an index, and `ForDataFile()` +/// or `ForEntry()` to get the delete files to apply to a given data file. 
+/// +/// The index organizes delete files by: +/// - Global equality deletes (apply to all partitions) +/// - Partitioned equality deletes (apply to specific partitions) +/// - Partitioned position deletes (apply to specific partitions) +/// - File-scoped position deletes (apply to specific data files) +/// - Deletion vectors (DVs) that reference specific data files +class ICEBERG_EXPORT DeleteFileIndex { + public: + class Builder; + + ~DeleteFileIndex(); + + DeleteFileIndex(DeleteFileIndex&&) noexcept; + DeleteFileIndex& operator=(DeleteFileIndex&&) noexcept; + DeleteFileIndex(const DeleteFileIndex&) = delete; + DeleteFileIndex& operator=(const DeleteFileIndex&) = delete; + + /// \brief Check if this index is empty (has no delete files). + bool empty() const; + + /// \brief Check if this index has any equality delete files. + bool has_equality_deletes() const; + + /// \brief Check if this index has any position delete files. + bool has_position_deletes() const; + + /// \brief Get all delete files referenced by this index. + /// TODO(gangwu): use lazy iterator to avoid large memory allocation. + std::vector> ReferencedDeleteFiles() const; + + /// \brief Get the delete files that apply to a manifest entry. + /// + /// \param entry The manifest entry to find delete files for + /// \return Delete files that should be applied when reading the data file + Result>> ForEntry( + const ManifestEntry& entry) const; + + /// \brief Get the delete files that apply to a data file with a specific sequence + /// number. + /// + /// \param sequence_number The data sequence number of the data file + /// \param file The data file to find delete files for + /// \return Delete files that should be applied when reading the data file + Result>> ForDataFile(int64_t sequence_number, + const DataFile& file) const; + + /// \brief Create a builder for constructing a DeleteFileIndex from manifest files. 
+ /// + /// \param io The FileIO to use for reading manifests + /// \param delete_manifests The delete manifests to index + /// \return A Builder instance + static Result BuilderFor(std::shared_ptr io, + std::vector delete_manifests); + + private: + friend class Builder; + + // Private constructor used by Builder + DeleteFileIndex( + std::unique_ptr global_deletes, + std::unique_ptr>> + eq_deletes_by_partition, + std::unique_ptr>> + pos_deletes_by_partition, + std::unique_ptr< + std::unordered_map>> + pos_deletes_by_path, + std::unique_ptr> dv_by_path); + + // Helper methods for finding delete files + Result>> FindGlobalDeletes( + int64_t seq, const DataFile& data_file) const; + Result>> FindEqPartitionDeletes( + int64_t seq, const DataFile& data_file) const; + Result>> FindPosPartitionDeletes( + int64_t seq, const DataFile& data_file) const; + Result>> FindPathDeletes( + int64_t seq, const DataFile& data_file) const; + Result> FindDV(int64_t seq, const DataFile& data_file) const; + + // Index data structures + std::unique_ptr global_deletes_; + std::unique_ptr>> + eq_deletes_by_partition_; + std::unique_ptr>> + pos_deletes_by_partition_; + std::unique_ptr< + std::unordered_map>> + pos_deletes_by_path_; + std::unique_ptr> dv_by_path_; + + bool has_eq_deletes_ = false; + bool has_pos_deletes_ = false; + bool is_empty_ = true; +}; + +class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector { + public: + /// \brief Construct a builder from manifest files. + Builder(std::shared_ptr io, std::vector delete_manifests); + + ~Builder() override; + + Builder(Builder&&) noexcept; + Builder& operator=(Builder&&) noexcept; + Builder(const Builder&) = delete; + Builder& operator=(const Builder&) = delete; + + /// \brief Set the partition specs by ID. + Builder& SpecsById( + std::unordered_map> specs_by_id); + + /// \brief Set the table schema. + /// + /// Required for filtering and expression evaluation. 
+ Builder& WithSchema(std::shared_ptr schema); + + /// \brief Set the minimum sequence number for delete files. + /// + /// Only delete files with sequence number > min_sequence_number will be included. + Builder& AfterSequenceNumber(int64_t seq); + + /// \brief Set a row-level data filter. + /// + /// This filter is projected to partition expressions for manifest pruning and + /// then residuals are applied to data files. + Builder& DataFilter(std::shared_ptr filter); + + /// \brief Set a partition filter expression. + Builder& PartitionFilter(std::shared_ptr filter); + + /// \brief Set a partition set to filter manifests. + Builder& FilterPartitions(std::shared_ptr partition_set); + + /// \brief Set case sensitivity for column name matching. + Builder& CaseSensitive(bool case_sensitive); + + /// \brief Ignore residual expressions after partition filtering. + Builder& IgnoreResiduals(); + + /// \brief Build the DeleteFileIndex. + Result> Build(); + + private: + // Load delete files from manifests + Result> LoadDeleteFiles(); + + // Add a DV to the index + Status AddDV(std::unordered_map& dv_by_path, + ManifestEntry&& entry); + + // Add a position delete file to the index + Status AddPositionDelete( + std::unordered_map>& + deletes_by_path, + PartitionMap>& deletes_by_partition, + ManifestEntry&& entry); + + // Add an equality delete file to the index + Status AddEqualityDelete( + internal::EqualityDeletes& global_deletes, + PartitionMap>& deletes_by_partition, + ManifestEntry&& entry); + + std::shared_ptr io_; + std::vector delete_manifests_; + int64_t min_sequence_number_ = 0; + std::unordered_map> specs_by_id_; + std::shared_ptr schema_; + std::shared_ptr data_filter_; + std::shared_ptr partition_filter_; + std::shared_ptr partition_set_; + bool case_sensitive_ = true; + bool ignore_residuals_ = false; +}; + +} // namespace iceberg diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc index 2ba377fa2..2dd22b02a 100644 --- 
a/src/iceberg/manifest/manifest_reader.cc +++ b/src/iceberg/manifest/manifest_reader.cc @@ -39,6 +39,7 @@ #include "iceberg/schema_field.h" #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" +#include "iceberg/util/content_file_util.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -557,9 +558,9 @@ Result> ParseManifestEntry( return manifest_entries; } -const std::vector kStatsColumns = {"value_counts", "null_value_counts", - "nan_value_counts", "lower_bounds", - "upper_bounds", "record_count"}; +const std::vector kStatsColumns = { + "value_counts", "null_value_counts", "nan_value_counts", "lower_bounds", + "upper_bounds", "column_sizes", "record_count"}; bool RequireStatsProjection(const std::shared_ptr& row_filter, const std::vector& columns) { @@ -592,6 +593,29 @@ Result> ProjectSchema(std::shared_ptr schema, } // namespace +bool ManifestReader::ShouldDropStats(const std::vector& columns) { + // Stats may be dropped only when the projection names no stats column at all. + // If any stats column other than record_count was selected, the stats are kept, + // even when only some of the stats columns were requested. + // record_count is the exception: it is a primitive, so selecting it alone is not a + // reason to keep the stats maps, which could be huge.
+ if (!columns.empty()) { + const std::unordered_set selected(columns.cbegin(), columns.cend()); + if (selected.contains(ManifestReader::kAllColumns)) { + return false; + } + std::unordered_set intersection; + for (const auto& col : kStatsColumns) { + if (selected.contains(col)) { + intersection.insert(col); + } + } + return intersection.empty() || + (intersection.size() == 1 && intersection.contains("record_count")); + } + return false; +} + std::vector ManifestReader::WithStatsColumns( const std::vector& columns) { if (std::ranges::contains(columns, ManifestReader::kAllColumns)) { @@ -645,6 +669,11 @@ ManifestReader& ManifestReaderImpl::CaseSensitive(bool case_sensitive) { return *this; } +ManifestReader& ManifestReaderImpl::TryDropStats() { + drop_stats_ = true; + return *this; +} + bool ManifestReaderImpl::HasPartitionFilter() const { ICEBERG_DCHECK(part_filter_, "Partition filter is not set"); return part_filter_->op() != Expression::Operation::kTrue; @@ -773,6 +802,8 @@ Result> ManifestReaderImpl::ReadEntries(bool only_liv ICEBERG_ASSIGN_OR_RAISE(metrics_evaluator, GetMetricsEvaluator()); } + bool drop_stats = drop_stats_ && ShouldDropStats(columns_); + while (true) { ICEBERG_ASSIGN_OR_RAISE(auto result, file_reader_->Next()); if (!result.has_value()) { @@ -812,6 +843,10 @@ Result> ManifestReaderImpl::ReadEntries(bool only_liv } } + if (drop_stats) { + ContentFileUtil::DropStats(*entry.data_file); + } + manifest_entries.push_back(std::move(entry)); } } diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h index a35c1fb94..ffe1ba98c 100644 --- a/src/iceberg/manifest/manifest_reader.h +++ b/src/iceberg/manifest/manifest_reader.h @@ -74,6 +74,15 @@ class ICEBERG_EXPORT ManifestReader { /// \brief Set case sensitivity for column name matching. virtual ManifestReader& CaseSensitive(bool case_sensitive) = 0; + /// \brief Try to drop stats from returned DataFile objects. 
+ virtual ManifestReader& TryDropStats() = 0; + + /// \brief Determine whether stats should be dropped based on selected columns. + /// + /// Returns true if the selected columns do not include any stats columns, or only + /// include record_count (which is a primitive, not a large map). + static bool ShouldDropStats(const std::vector& columns); + /// \brief Creates a reader for a manifest file. /// \param manifest A ManifestFile object containing metadata about the manifest. /// \param file_io File IO implementation to use. diff --git a/src/iceberg/manifest/manifest_reader_internal.h b/src/iceberg/manifest/manifest_reader_internal.h index ba804ffbf..42263808d 100644 --- a/src/iceberg/manifest/manifest_reader_internal.h +++ b/src/iceberg/manifest/manifest_reader_internal.h @@ -74,6 +74,8 @@ class ManifestReaderImpl : public ManifestReader { ManifestReader& CaseSensitive(bool case_sensitive) override; + ManifestReader& TryDropStats() override; + private: /// \brief Read entries with optional live-only filtering. 
Result> ReadEntries(bool only_live); @@ -111,6 +113,7 @@ class ManifestReaderImpl : public ManifestReader { std::shared_ptr row_filter_{True::Instance()}; std::shared_ptr partition_set_; bool case_sensitive_{true}; + bool drop_stats_{false}; // Lazy fields std::unique_ptr file_reader_; diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 7c1011fc5..56fc737f7 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -42,6 +42,7 @@ iceberg_include_dir = include_directories('..') iceberg_sources = files( 'arrow_c_data_guard_internal.cc', 'catalog/memory/in_memory_catalog.cc', + 'delete_file_index.cc', 'expression/aggregate.cc', 'expression/binder.cc', 'expression/evaluator.cc', @@ -101,6 +102,7 @@ iceberg_sources = files( 'update/update_properties.cc', 'update/update_sort_order.cc', 'util/bucket_util.cc', + 'util/content_file_util.cc', 'util/conversions.cc', 'util/decimal.cc', 'util/gzip_internal.cc', diff --git a/src/iceberg/metadata_columns.h b/src/iceberg/metadata_columns.h index 18aeb2b8f..61f07c488 100644 --- a/src/iceberg/metadata_columns.h +++ b/src/iceberg/metadata_columns.h @@ -39,19 +39,22 @@ struct ICEBERG_EXPORT MetadataColumns { constexpr static int32_t kInt32Max = std::numeric_limits::max(); // IDs kInt32Max - (1-100) are used for metadata columns - inline static const SchemaField kFilePath = - SchemaField::MakeRequired(kInt32Max - 1, "_file", iceberg::string(), - "Path of the file in which a row is stored"); + constexpr static int32_t kFilePathColumnId = kInt32Max - 1; + inline static const SchemaField kFilePath = SchemaField::MakeRequired( + kFilePathColumnId, "_file", string(), "Path of the file in which a row is stored"); + constexpr static int32_t kFilePositionColumnId = kInt32Max - 2; inline static const SchemaField kRowPosition = - SchemaField::MakeRequired(kInt32Max - 2, "_pos", iceberg::int64(), + SchemaField::MakeRequired(kFilePositionColumnId, "_pos", int64(), "Ordinal position of a row in the source data file"); + 
constexpr static int32_t kIsDeletedColumnId = kInt32Max - 3; inline static const SchemaField kIsDeleted = SchemaField::MakeRequired( - kInt32Max - 3, "_deleted", iceberg::binary(), "Whether the row has been deleted"); + kIsDeletedColumnId, "_deleted", boolean(), "Whether the row has been deleted"); + constexpr static int32_t kSpecIdColumnId = kInt32Max - 4; inline static const SchemaField kSpecId = - SchemaField::MakeRequired(kInt32Max - 4, "_spec_id", iceberg::int32(), + SchemaField::MakeRequired(kSpecIdColumnId, "_spec_id", int32(), "Spec ID used to track the file containing a row"); // The partition column type depends on all specs in the table @@ -64,35 +67,41 @@ struct ICEBERG_EXPORT MetadataColumns { constexpr static int32_t kContentSizeInBytesColumnId = kInt32Max - 7; // IDs kInt32Max - (101-200) are used for reserved columns + constexpr static int32_t kDeleteFilePathColumnId = kInt32Max - 101; inline static const SchemaField kDeleteFilePath = - SchemaField::MakeRequired(kInt32Max - 101, "file_path", iceberg::string(), + SchemaField::MakeRequired(kDeleteFilePathColumnId, "file_path", string(), "Path of a file in which a deleted row is stored"); + constexpr static int32_t kDeleteFilePosColumnId = kInt32Max - 102; inline static const SchemaField kDeleteFilePos = - SchemaField::MakeRequired(kInt32Max - 102, "pos", iceberg::int64(), + SchemaField::MakeRequired(kDeleteFilePosColumnId, "pos", int64(), "Ordinal position of a deleted row in the data file"); // The row column type depends on the table schema - constexpr static int32_t kDeleteFileRowFieldId = kInt32Max - 103; + constexpr static int32_t kDeleteFileRowColumnId = kInt32Max - 103; constexpr static std::string_view kDeleteFileRowFieldName = "row"; constexpr static std::string_view kDeleteFileRowDoc = "Deleted row values"; + constexpr static int32_t kChangeTypeColumnId = kInt32Max - 104; inline static const SchemaField kChangeType = SchemaField::MakeRequired( - kInt32Max - 104, "_change_type", 
iceberg::string(), "Record type in changelog"); + kChangeTypeColumnId, "_change_type", string(), "Record type in changelog"); - inline static const SchemaField kChangeOrdinal = - SchemaField::MakeOptional(kInt32Max - 105, "_change_ordinal", iceberg::int32(), - "Change ordinal in changelog"); + constexpr static int32_t kChangeOrdinalColumnId = kInt32Max - 105; + inline static const SchemaField kChangeOrdinal = SchemaField::MakeOptional( + kChangeOrdinalColumnId, "_change_ordinal", int32(), "Change ordinal in changelog"); + constexpr static int32_t kCommitSnapshotIdColumnId = kInt32Max - 106; inline static const SchemaField kCommitSnapshotId = SchemaField::MakeOptional( - kInt32Max - 106, "_commit_snapshot_id", iceberg::int64(), "Commit snapshot ID"); + kCommitSnapshotIdColumnId, "_commit_snapshot_id", int64(), "Commit snapshot ID"); + constexpr static int32_t kRowIdColumnId = kInt32Max - 107; inline static const SchemaField kRowId = - SchemaField::MakeOptional(kInt32Max - 107, "_row_id", iceberg::int64(), + SchemaField::MakeOptional(kRowIdColumnId, "_row_id", int64(), "Implicit row ID that is automatically assigned"); + constexpr static int32_t kLastUpdatedSequenceNumberColumnId = kInt32Max - 108; inline static const SchemaField kLastUpdatedSequenceNumber = SchemaField::MakeOptional( - kInt32Max - 108, "_last_updated_sequence_number", iceberg::int64(), + kLastUpdatedSequenceNumberColumnId, "_last_updated_sequence_number", int64(), "Sequence number when the row was last updated"); /// \brief Get the set of metadata field IDs. 
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 7e71310de..f2e50d8aa 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -120,11 +120,7 @@ if(ICEBERG_BUILD_BUNDLE) avro_data_test.cc avro_test.cc avro_schema_test.cc - avro_stream_test.cc - manifest_list_versions_test.cc - manifest_reader_stats_test.cc - manifest_reader_test.cc - manifest_writer_versions_test.cc) + avro_stream_test.cc) add_iceberg_test(arrow_test USE_BUNDLE @@ -143,6 +139,15 @@ if(ICEBERG_BUILD_BUNDLE) eval_expr_test.cc evaluator_test.cc) + add_iceberg_test(manifest_test + USE_BUNDLE + SOURCES + delete_file_index_test.cc + manifest_list_versions_test.cc + manifest_reader_stats_test.cc + manifest_reader_test.cc + manifest_writer_versions_test.cc) + add_iceberg_test(parquet_test USE_BUNDLE SOURCES diff --git a/src/iceberg/test/delete_file_index_test.cc b/src/iceberg/test/delete_file_index_test.cc new file mode 100644 index 000000000..69b6ce46e --- /dev/null +++ b/src/iceberg/test/delete_file_index_test.cc @@ -0,0 +1,1176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/delete_file_index.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "iceberg/arrow/arrow_file_io.h" +#include "iceberg/avro/avro_register.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/manifest/manifest_list.h" +#include "iceberg/manifest/manifest_reader.h" +#include "iceberg/manifest/manifest_writer.h" +#include "iceberg/partition_spec.h" +#include "iceberg/schema.h" +#include "iceberg/test/matchers.h" +#include "iceberg/transform.h" +#include "iceberg/type.h" + +namespace iceberg { + +class DeleteFileIndexTest : public testing::TestWithParam { + protected: + void SetUp() override { + avro::RegisterAll(); + + file_io_ = arrow::MakeMockFileIO(); + + // Schema with id and data fields + schema_ = std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/1, "id", int32()), + SchemaField::MakeRequired(/*field_id=*/2, "data", string())}); + + // Partitioned spec: bucket by data + ICEBERG_UNWRAP_OR_FAIL( + partitioned_spec_, + PartitionSpec::Make( + /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000, + "data_bucket", Transform::Bucket(16))})); + + // Unpartitioned spec + unpartitioned_spec_ = PartitionSpec::Unpartitioned(); + + // Create sample data files + file_a_ = MakeDataFile("/path/to/data-a.parquet", PartitionValues({Literal::Int(0)}), + partitioned_spec_->spec_id()); + file_b_ = MakeDataFile("/path/to/data-b.parquet", PartitionValues({Literal::Int(1)}), + partitioned_spec_->spec_id()); + file_c_ = MakeDataFile("/path/to/data-c.parquet", PartitionValues({Literal::Int(2)}), + partitioned_spec_->spec_id()); + unpartitioned_file_ = MakeDataFile("/path/to/data-unpartitioned.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + } + + std::string MakeManifestPath() { + static int counter = 0; + return std::format("manifest-{}-{}.avro", counter++, + std::chrono::system_clock::now().time_since_epoch().count()); + } + + 
std::shared_ptr MakeDataFile(const std::string& path, + const PartitionValues& partition, + int32_t spec_id, int64_t record_count = 1) { + return std::make_shared(DataFile{ + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = record_count, + .file_size_in_bytes = 10, + .sort_order_id = 0, + .partition_spec_id = spec_id, + }); + } + + std::shared_ptr MakePositionDeleteFile( + const std::string& path, const PartitionValues& partition, int32_t spec_id, + std::optional referenced_file = std::nullopt) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .partition_spec_id = spec_id, + .referenced_data_file = referenced_file, + }); + } + + std::shared_ptr MakeEqualityDeleteFile(const std::string& path, + const PartitionValues& partition, + int32_t spec_id, + std::vector equality_ids = {1}) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = path, + .file_format = FileFormatType::kParquet, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .equality_ids = std::move(equality_ids), + .partition_spec_id = spec_id, + }); + } + + std::shared_ptr MakeDV(const std::string& path, + const PartitionValues& partition, int32_t spec_id, + const std::string& referenced_file, + int64_t content_offset = 4L, + int64_t content_size = 6L) { + return std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = path, + .file_format = FileFormatType::kPuffin, + .partition = partition, + .record_count = 1, + .file_size_in_bytes = 10, + .partition_spec_id = spec_id, + .referenced_data_file = referenced_file, + .content_offset = content_offset, + .content_size_in_bytes = content_size, + }); + } + + ManifestEntry MakeDeleteEntry(int64_t snapshot_id, int64_t 
sequence_number, + std::shared_ptr file, + ManifestStatus status = ManifestStatus::kAdded) { + return ManifestEntry{ + .status = status, + .snapshot_id = snapshot_id, + .sequence_number = sequence_number, + .file_sequence_number = sequence_number, + .data_file = std::move(file), + }; + } + + ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id, + std::vector entries, + std::shared_ptr spec) { + const std::string manifest_path = MakeManifestPath(); + + Result> writer_result = + NotSupported("Format version: {}", format_version); + + if (format_version == 2) { + writer_result = ManifestWriter::MakeV2Writer( + snapshot_id, manifest_path, file_io_, spec, schema_, ManifestContent::kDeletes); + } else if (format_version == 3) { + writer_result = ManifestWriter::MakeV3Writer( + snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, spec, + schema_, ManifestContent::kDeletes); + } + + EXPECT_THAT(writer_result, IsOk()); + auto writer = std::move(writer_result.value()); + + for (const auto& entry : entries) { + EXPECT_THAT(writer->WriteEntry(entry), IsOk()); + } + + EXPECT_THAT(writer->Close(), IsOk()); + auto manifest_result = writer->ToManifestFile(); + EXPECT_THAT(manifest_result, IsOk()); + return std::move(manifest_result.value()); + } + + std::unordered_map> GetSpecsById() { + return {{partitioned_spec_->spec_id(), partitioned_spec_}, + {unpartitioned_spec_->spec_id(), unpartitioned_spec_}}; + } + + Result> BuildIndex( + std::vector delete_manifests, + std::optional after_sequence_number = std::nullopt) { + ICEBERG_ASSIGN_OR_RAISE( + auto builder, DeleteFileIndex::BuilderFor(file_io_, std::move(delete_manifests))); + builder.SpecsById(GetSpecsById()).WithSchema(schema_); + if (after_sequence_number.has_value()) { + builder.AfterSequenceNumber(after_sequence_number.value()); + } + return builder.Build(); + } + + std::shared_ptr file_io_; + std::shared_ptr schema_; + std::shared_ptr partitioned_spec_; + std::shared_ptr 
unpartitioned_spec_; + + std::shared_ptr file_a_; + std::shared_ptr file_b_; + std::shared_ptr file_c_; + std::shared_ptr unpartitioned_file_; + + // Helper to extract paths from delete files for comparison + static std::vector GetPaths( + const std::vector>& files) { + return std::ranges::transform_view(files, + [](const auto& f) { return f->file_path; }) | + std::ranges::to>(); + } +}; + +TEST_P(DeleteFileIndexTest, TestEmptyIndex) { + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({})); + + EXPECT_TRUE(index->empty()); + EXPECT_FALSE(index->has_equality_deletes()); + EXPECT_FALSE(index->has_position_deletes()); + + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + EXPECT_TRUE(deletes.empty()); +} + +TEST_P(DeleteFileIndexTest, TestMinSequenceNumberFilteringForFiles) { + int version = GetParam(); + + auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4, eq_delete_1)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, eq_delete_2)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + unpartitioned_spec_); + + // Build index with afterSequenceNumber = 4 + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}, /*after_sequence_number=*/4)); + + EXPECT_TRUE(index->has_equality_deletes()); + EXPECT_FALSE(index->has_position_deletes()); + + // Only delete file with seq > 4 should be included (seq=6) + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete-2.parquet"); +} + +TEST_P(DeleteFileIndexTest, 
TestUnpartitionedDeletes) { + int version = GetParam(); + + auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + auto pos_delete_1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + auto pos_delete_2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4, eq_delete_1)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, eq_delete_2)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/5, pos_delete_1)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, pos_delete_2)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + unpartitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_TRUE(index->has_equality_deletes()); + EXPECT_TRUE(index->has_position_deletes()); + + // All deletes should apply to seq 0 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 4); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre( + "/path/to/eq-delete-1.parquet", "/path/to/eq-delete-2.parquet", + "/path/to/pos-delete-1.parquet", "/path/to/pos-delete-2.parquet")); + } + + // All deletes should apply to seq 3 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(3, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 4); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre( + "/path/to/eq-delete-1.parquet", 
"/path/to/eq-delete-2.parquet", + "/path/to/pos-delete-1.parquet", "/path/to/pos-delete-2.parquet")); + } + + // Last 3 deletes should apply to seq 4 (eq_delete_2, pos_delete_1, pos_delete_2) + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(4, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 3); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet", + "/path/to/pos-delete-1.parquet", + "/path/to/pos-delete-2.parquet")); + } + + // Last 3 deletes should apply to seq 5 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(5, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 3); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet", + "/path/to/pos-delete-1.parquet", + "/path/to/pos-delete-2.parquet")); + } + + // Last delete should apply to seq 6 (only pos_delete_2) + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(6, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete-2.parquet"); + } + + // No deletes should apply to seq 7 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(7, *unpartitioned_file_)); + EXPECT_TRUE(deletes.empty()); + } + + // Global equality deletes should apply to a partitioned file + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + // Only equality deletes are global, position deletes are not + EXPECT_EQ(deletes.size(), 2); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/eq-delete-1.parquet", + "/path/to/eq-delete-2.parquet")); + } +} + +TEST_P(DeleteFileIndexTest, TestPartitionedDeleteIndex) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet", partition_a, + partitioned_spec_->spec_id()); + auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet", partition_a, + 
partitioned_spec_->spec_id()); + auto pos_delete_1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet", partition_a, + partitioned_spec_->spec_id()); + auto pos_delete_2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet", partition_a, + partitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4, eq_delete_1)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, eq_delete_2)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/5, pos_delete_1)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, pos_delete_2)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_TRUE(index->has_equality_deletes()); + EXPECT_TRUE(index->has_position_deletes()); + + // All deletes should apply to file_a_ at seq 0 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + EXPECT_EQ(deletes.size(), 4); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre( + "/path/to/eq-delete-1.parquet", "/path/to/eq-delete-2.parquet", + "/path/to/pos-delete-1.parquet", "/path/to/pos-delete-2.parquet")); + } + + // All deletes should apply to file_a_ at seq 3 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(3, *file_a_)); + EXPECT_EQ(deletes.size(), 4); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre( + "/path/to/eq-delete-1.parquet", "/path/to/eq-delete-2.parquet", + "/path/to/pos-delete-1.parquet", "/path/to/pos-delete-2.parquet")); + } + + // Last 3 deletes should apply to file_a_ at seq 4 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(4, *file_a_)); + EXPECT_EQ(deletes.size(), 3); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet", + 
"/path/to/pos-delete-1.parquet", + "/path/to/pos-delete-2.parquet")); + } + + // Last 3 deletes should apply to file_a_ at seq 5 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(5, *file_a_)); + EXPECT_EQ(deletes.size(), 3); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet", + "/path/to/pos-delete-1.parquet", + "/path/to/pos-delete-2.parquet")); + } + + // Last delete should apply to file_a_ at seq 6 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(6, *file_a_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete-2.parquet"); + } + + // No deletes should apply to file_a_ at seq 7 + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(7, *file_a_)); + EXPECT_TRUE(deletes.empty()); + } + + // No deletes should apply to file_b_ (different partition) + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_b_)); + EXPECT_TRUE(deletes.empty()); + } + + // No deletes should apply to file_c_ (different partition) + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_c_)); + EXPECT_TRUE(deletes.empty()); + } + + // No deletes should apply to unpartitioned file (different spec) + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *unpartitioned_file_)); + EXPECT_TRUE(deletes.empty()); + } +} + +TEST_P(DeleteFileIndexTest, TestPartitionedTableWithPartitionPosDeletes) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, pos_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_FALSE(index->has_equality_deletes()); + 
EXPECT_TRUE(index->has_position_deletes()); + + // Position delete should apply to file_a_ at seq 1 + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet"); +} + +TEST_P(DeleteFileIndexTest, TestPartitionedTableWithPartitionEqDeletes) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, eq_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_TRUE(index->has_equality_deletes()); + EXPECT_FALSE(index->has_position_deletes()); + + // Equality delete should apply to file_a_ at seq 1 + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete.parquet"); +} + +TEST_P(DeleteFileIndexTest, TestPartitionedTableWithUnrelatedPartitionDeletes) { + int version = GetParam(); + + // Create deletes for partition A + auto partition_a = PartitionValues({Literal::Int(0)}); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, pos_delete)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, eq_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto 
index, BuildIndex({manifest})); + + // No deletes should apply to file_b_ (different partition) + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_b_)); + EXPECT_TRUE(deletes.empty()); +} + +TEST_P(DeleteFileIndexTest, TestPartitionedTableWithOlderPartitionDeletes) { + int version = GetParam(); + if (version >= 3) { + GTEST_SKIP() << "DVs are not filtered using sequence numbers in V3+"; + } + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + + // Delete files have sequence number 1 + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, eq_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + // Data file with sequence number 2 should not have any deletes applied + // (deletes were committed before the data file) + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(2, *file_a_)); + EXPECT_TRUE(deletes.empty()); +} + +TEST_P(DeleteFileIndexTest, TestPartitionedTableScanWithGlobalDeletes) { + int version = GetParam(); + if (version >= 3) { + GTEST_SKIP() << "Different behavior for position deletes in V3"; + } + + // Create unpartitioned equality and position deletes + auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + 
MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, eq_delete)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, pos_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + unpartitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + // Only global equality deletes should apply to partitioned file_a_ + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete.parquet"); +} + +TEST_P(DeleteFileIndexTest, TestPartitionedTableScanWithGlobalAndPartitionDeletes) { + int version = GetParam(); + if (version >= 3) { + GTEST_SKIP() << "Different behavior for position deletes in V3"; + } + + // Create partition-scoped equality delete + auto partition_a = PartitionValues({Literal::Int(0)}); + auto partition_eq_delete = MakeEqualityDeleteFile( + "/path/to/partition-eq-delete.parquet", partition_a, partitioned_spec_->spec_id()); + + std::vector partition_entries; + partition_entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, partition_eq_delete)); + + auto partition_manifest = WriteDeleteManifest( + version, /*snapshot_id=*/1000L, std::move(partition_entries), partitioned_spec_); + + // Create unpartitioned equality and position deletes + auto global_eq_delete = MakeEqualityDeleteFile("/path/to/global-eq-delete.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + auto global_pos_delete = MakePositionDeleteFile("/path/to/global-pos-delete.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + std::vector global_entries; + global_entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/3, global_eq_delete)); + global_entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/3, global_pos_delete)); + + auto global_manifest = 
WriteDeleteManifest( + version, /*snapshot_id=*/1001L, std::move(global_entries), unpartitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({partition_manifest, global_manifest})); + + // Both partition-scoped and global equality deletes should apply to file_a_ + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_)); + EXPECT_EQ(deletes.size(), 2); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/partition-eq-delete.parquet", + "/path/to/global-eq-delete.parquet")); +} + +TEST_P(DeleteFileIndexTest, TestPartitionedTableSequenceNumbers) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + + // Both data and deletes have same sequence number (same commit) + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, eq_delete)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + // Data file with sequence number 1 should only have position deletes applied + // (equality deletes apply to data with seq < delete seq) + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet"); +} + +TEST_P(DeleteFileIndexTest, TestUnpartitionedTableSequenceNumbers) { + int version = GetParam(); + if (version >= 3) { + GTEST_SKIP() << "Different behavior in V3"; + } + + auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", + PartitionValues(std::vector{}), + 
unpartitioned_spec_->spec_id()); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + // Both have same sequence number + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, eq_delete)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + unpartitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + // Data file with sequence number 1 should only have position deletes applied + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *unpartitioned_file_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet"); +} + +TEST_P(DeleteFileIndexTest, TestPositionDeletesGroup) { + internal::PositionDeletes group; + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto file1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet", partition_a, + partitioned_spec_->spec_id()); + auto file2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet", partition_a, + partitioned_spec_->spec_id()); + auto file3 = MakePositionDeleteFile("/path/to/pos-delete-3.parquet", partition_a, + partitioned_spec_->spec_id()); + auto file4 = MakePositionDeleteFile("/path/to/pos-delete-4.parquet", partition_a, + partitioned_spec_->spec_id()); + + // Add files out of order + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 4, file4)), IsOk()); + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 2, file2)), IsOk()); + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 1, file1)), IsOk()); + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 3, file3)), IsOk()); + + // Group must not be empty + EXPECT_FALSE(group.empty()); + + // All files must be reported as referenced + auto referenced = group.ReferencedDeleteFiles(); + EXPECT_EQ(referenced.size(), 
4); + EXPECT_THAT(GetPaths(referenced), + testing::UnorderedElementsAre( + "/path/to/pos-delete-1.parquet", "/path/to/pos-delete-2.parquet", + "/path/to/pos-delete-3.parquet", "/path/to/pos-delete-4.parquet")); + + // Position deletes are indexed by their data sequence numbers + { + auto filtered = group.Filter(0); + EXPECT_EQ(filtered.size(), 4); + EXPECT_THAT(GetPaths(filtered), + testing::UnorderedElementsAre( + "/path/to/pos-delete-1.parquet", "/path/to/pos-delete-2.parquet", + "/path/to/pos-delete-3.parquet", "/path/to/pos-delete-4.parquet")); + } + { + auto filtered = group.Filter(1); + EXPECT_EQ(filtered.size(), 4); + EXPECT_THAT(GetPaths(filtered), + testing::UnorderedElementsAre( + "/path/to/pos-delete-1.parquet", "/path/to/pos-delete-2.parquet", + "/path/to/pos-delete-3.parquet", "/path/to/pos-delete-4.parquet")); + } + { + auto filtered = group.Filter(2); + EXPECT_EQ(filtered.size(), 3); + EXPECT_THAT(GetPaths(filtered), + testing::UnorderedElementsAre("/path/to/pos-delete-2.parquet", + "/path/to/pos-delete-3.parquet", + "/path/to/pos-delete-4.parquet")); + } + { + auto filtered = group.Filter(3); + EXPECT_EQ(filtered.size(), 2); + EXPECT_THAT(GetPaths(filtered), + testing::UnorderedElementsAre("/path/to/pos-delete-3.parquet", + "/path/to/pos-delete-4.parquet")); + } + { + auto filtered = group.Filter(4); + EXPECT_EQ(filtered.size(), 1); + EXPECT_EQ(filtered[0]->file_path, "/path/to/pos-delete-4.parquet"); + } + { + auto filtered = group.Filter(5); + EXPECT_EQ(filtered.size(), 0); + } +} + +TEST_P(DeleteFileIndexTest, TestEqualityDeletesGroup) { + internal::EqualityDeletes group(*schema_); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto file1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet", partition_a, + partitioned_spec_->spec_id()); + auto file2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet", partition_a, + partitioned_spec_->spec_id()); + auto file3 = MakeEqualityDeleteFile("/path/to/eq-delete-3.parquet", 
partition_a, + partitioned_spec_->spec_id()); + auto file4 = MakeEqualityDeleteFile("/path/to/eq-delete-4.parquet", partition_a, + partitioned_spec_->spec_id()); + + // Add files out of order + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 4, file4)), IsOk()); + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 2, file2)), IsOk()); + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 1, file1)), IsOk()); + EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 3, file3)), IsOk()); + + // Group must not be empty + EXPECT_FALSE(group.empty()); + + // All files must be reported as referenced + auto referenced = group.ReferencedDeleteFiles(); + EXPECT_EQ(referenced.size(), 4); + EXPECT_THAT(GetPaths(referenced), + testing::UnorderedElementsAre( + "/path/to/eq-delete-1.parquet", "/path/to/eq-delete-2.parquet", + "/path/to/eq-delete-3.parquet", "/path/to/eq-delete-4.parquet")); + + // Equality deletes are indexed by data sequence number - 1 to apply to next snapshots + { + ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(0, *file_a_)); + EXPECT_EQ(filtered.size(), 4); + EXPECT_THAT(GetPaths(filtered), + testing::UnorderedElementsAre( + "/path/to/eq-delete-1.parquet", "/path/to/eq-delete-2.parquet", + "/path/to/eq-delete-3.parquet", "/path/to/eq-delete-4.parquet")); + } + { + ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(1, *file_a_)); + EXPECT_EQ(filtered.size(), 3); + EXPECT_THAT(GetPaths(filtered), + testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet", + "/path/to/eq-delete-3.parquet", + "/path/to/eq-delete-4.parquet")); + } + { + ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(2, *file_a_)); + EXPECT_EQ(filtered.size(), 2); + EXPECT_THAT(GetPaths(filtered), + testing::UnorderedElementsAre("/path/to/eq-delete-3.parquet", + "/path/to/eq-delete-4.parquet")); + } + { + ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(3, *file_a_)); + EXPECT_EQ(filtered.size(), 1); + EXPECT_EQ(filtered[0]->file_path, "/path/to/eq-delete-4.parquet"); + } + { + ICEBERG_UNWRAP_OR_FAIL(auto 
filtered, group.Filter(4, *file_a_)); + EXPECT_EQ(filtered.size(), 0); + } +} + +TEST_P(DeleteFileIndexTest, TestMixDeleteFilesAndDVs) { + int version = GetParam(); + if (version < 3) { + GTEST_SKIP() << "DVs only supported in V3+"; + } + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto partition_b = PartitionValues({Literal::Int(1)}); + + // Position delete for file_a_ + auto pos_delete_a = MakePositionDeleteFile("/path/to/pos-delete-a.parquet", partition_a, + partitioned_spec_->spec_id()); + // DV for file_a_ (should take precedence) + auto dv_a = MakeDV("/path/to/dv-a.puffin", partition_a, partitioned_spec_->spec_id(), + file_a_->file_path); + // Position deletes for file_b_ (no DV) + auto pos_delete_b1 = MakePositionDeleteFile("/path/to/pos-delete-b1.parquet", + partition_b, partitioned_spec_->spec_id()); + auto pos_delete_b2 = MakePositionDeleteFile("/path/to/pos-delete-b2.parquet", + partition_b, partitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete_a)); + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, dv_a)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete_b1)); + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, pos_delete_b2)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + // Only DV should apply to file_a_ + { + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + EXPECT_EQ(deletes.size(), 1); + EXPECT_TRUE(deletes[0]->content_offset.has_value()); // DV has content_offset + EXPECT_EQ(deletes[0]->referenced_data_file, file_a_->file_path); + EXPECT_EQ(deletes[0]->file_path, "/path/to/dv-a.puffin"); + } + + // Two delete files should apply to file_b_ + { + 
ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_b_)); + EXPECT_EQ(deletes.size(), 2); + EXPECT_FALSE(deletes[0]->content_offset.has_value()); // Not DVs + EXPECT_FALSE(deletes[1]->content_offset.has_value()); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/pos-delete-b1.parquet", + "/path/to/pos-delete-b2.parquet")); + } +} + +TEST_P(DeleteFileIndexTest, TestMultipleDVs) { + int version = GetParam(); + if (version < 3) { + GTEST_SKIP() << "DVs only supported in V3+"; + } + + auto partition_a = PartitionValues({Literal::Int(0)}); + + auto dv1 = MakeDV("/path/to/dv1.puffin", partition_a, partitioned_spec_->spec_id(), + file_a_->file_path); + auto dv2 = MakeDV("/path/to/dv2.puffin", partition_a, partitioned_spec_->spec_id(), + file_a_->file_path); + + std::vector entries; + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, dv1)); + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, dv2)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + auto index_result = BuildIndex({manifest}); + EXPECT_THAT(index_result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(index_result, HasErrorMessage("Can't index multiple DVs")); + EXPECT_THAT(index_result, HasErrorMessage(file_a_->file_path)); +} + +TEST_P(DeleteFileIndexTest, TestInvalidDVSequenceNumber) { + int version = GetParam(); + if (version < 3) { + GTEST_SKIP() << "DVs only supported in V3+"; + } + + auto partition_a = PartitionValues({Literal::Int(0)}); + + auto dv = MakeDV("/path/to/dv.puffin", partition_a, partitioned_spec_->spec_id(), + file_a_->file_path); + + std::vector entries; + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, dv)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + 
// Querying with sequence number > DV sequence number should fail + auto result = index->ForDataFile(2, *file_a_); + EXPECT_THAT(result, IsError(ErrorKind::kInvalid)); + EXPECT_THAT(result, HasErrorMessage( + "must be greater than or equal to data file sequence number")); +} + +TEST_P(DeleteFileIndexTest, TestReferencedDeleteFiles) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + auto global_eq_delete = MakeEqualityDeleteFile("/path/to/global-eq-delete.parquet", + PartitionValues(std::vector{}), + unpartitioned_spec_->spec_id()); + + std::vector partition_entries; + partition_entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, eq_delete)); + partition_entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete)); + + auto partition_manifest = WriteDeleteManifest( + version, /*snapshot_id=*/1000L, std::move(partition_entries), partitioned_spec_); + + std::vector global_entries; + global_entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/2, global_eq_delete)); + + auto global_manifest = WriteDeleteManifest( + version, /*snapshot_id=*/1001L, std::move(global_entries), unpartitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({partition_manifest, global_manifest})); + + auto referenced = index->ReferencedDeleteFiles(); + EXPECT_EQ(referenced.size(), 3); + EXPECT_THAT(GetPaths(referenced), + testing::UnorderedElementsAre("/path/to/eq-delete.parquet", + "/path/to/pos-delete.parquet", + "/path/to/global-eq-delete.parquet")); +} + +TEST_P(DeleteFileIndexTest, TestExistingDeleteFiles) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto eq_delete = 
MakeEqualityDeleteFile("/path/to/eq-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", partition_a, + partitioned_spec_->spec_id()); + + std::vector entries; + // Use ManifestStatus::kExisting to simulate files that were merged from a previous + // manifest + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, + eq_delete, ManifestStatus::kExisting)); + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, + pos_delete, ManifestStatus::kExisting)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_TRUE(index->has_equality_deletes()); + EXPECT_TRUE(index->has_position_deletes()); + + // Both delete files should be correctly loaded and applied to file_a_ + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + EXPECT_EQ(deletes.size(), 2); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/eq-delete.parquet", + "/path/to/pos-delete.parquet")); +} + +TEST_P(DeleteFileIndexTest, TestDeletedStatusExcluded) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + auto eq_delete_added = MakeEqualityDeleteFile( + "/path/to/eq-delete-added.parquet", partition_a, partitioned_spec_->spec_id()); + auto eq_delete_deleted = MakeEqualityDeleteFile( + "/path/to/eq-delete-deleted.parquet", partition_a, partitioned_spec_->spec_id()); + auto pos_delete_added = MakePositionDeleteFile( + "/path/to/pos-delete-added.parquet", partition_a, partitioned_spec_->spec_id()); + auto pos_delete_deleted = MakePositionDeleteFile( + "/path/to/pos-delete-deleted.parquet", partition_a, partitioned_spec_->spec_id()); + + std::vector entries; + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, + eq_delete_added, 
ManifestStatus::kAdded)); + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, + eq_delete_deleted, ManifestStatus::kDeleted)); + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, + pos_delete_added, ManifestStatus::kAdded)); + entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, + pos_delete_deleted, ManifestStatus::kDeleted)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_TRUE(index->has_equality_deletes()); + EXPECT_TRUE(index->has_position_deletes()); + + // Only the non-deleted (ADDED) delete files should be loaded + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + EXPECT_EQ(deletes.size(), 2); + EXPECT_THAT(GetPaths(deletes), + testing::UnorderedElementsAre("/path/to/eq-delete-added.parquet", + "/path/to/pos-delete-added.parquet")); +} + +// TODO(gangwu): enable this test after ManifestGroup has been added. 
+TEST_P(DeleteFileIndexTest, DISABLED_TestPositionDeleteDiscardMetrics) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + + // Scenario: index a position delete file that carries all six per-column stats + // maps, look it up for file_a_, and expect record_count to be kept while every + // stats map comes back empty. + // Create a position delete file with full metrics + auto pos_delete = std::make_shared(DataFile{ + .content = DataFile::Content::kPositionDeletes, + .file_path = "/path/to/pos-delete-with-metrics.parquet", + .file_format = FileFormatType::kParquet, + .partition = partition_a, + .record_count = 100, + .file_size_in_bytes = 1024, + // Add stats for multiple columns + .column_sizes = {{1, 100}, {2, 200}, {3, 300}}, + .value_counts = {{1, 10}, {2, 20}, {3, 30}}, + .null_value_counts = {{1, 1}, {2, 2}, {3, 3}}, + .nan_value_counts = {{1, 0}, {2, 0}, {3, 0}}, + .lower_bounds = {{1, {0x01}}, {2, {0x02}}, {3, {0x03}}}, + .upper_bounds = {{1, {0xFF}}, {2, {0xFE}}, {3, {0xFD}}}, + .partition_spec_id = partitioned_spec_->spec_id(), + }); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, pos_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_TRUE(index->has_position_deletes()); + + // Get the delete files from the index + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + // ASSERT rather than EXPECT: deletes[0] is read right below. 
+ ASSERT_EQ(deletes.size(), 1); + + const auto& returned_file = deletes[0]; + EXPECT_EQ(returned_file->file_path, "/path/to/pos-delete-with-metrics.parquet"); + // record_count should be preserved + EXPECT_EQ(returned_file->record_count, 100); + // Stats maps should be cleared (empty) after building the index + EXPECT_TRUE(returned_file->column_sizes.empty()); + EXPECT_TRUE(returned_file->value_counts.empty()); + EXPECT_TRUE(returned_file->null_value_counts.empty()); + EXPECT_TRUE(returned_file->nan_value_counts.empty()); + EXPECT_TRUE(returned_file->lower_bounds.empty()); + 
EXPECT_TRUE(returned_file->upper_bounds.empty()); +} + +// TODO(gangwu): enable this test after ManifestGroup has been added. +// Scenario: index an equality delete file (equality_ids = {1}) that carries all +// six per-column stats maps, look it up for file_a_, and expect record_count to +// be kept while every stats map comes back empty. +TEST_P(DeleteFileIndexTest, DISABLED_TestEqualityDeleteDiscardMetrics) { + int version = GetParam(); + + auto partition_a = PartitionValues({Literal::Int(0)}); + + // Create an equality delete file with full metrics + auto eq_delete = std::make_shared(DataFile{ + .content = DataFile::Content::kEqualityDeletes, + .file_path = "/path/to/eq-delete-with-metrics.parquet", + .file_format = FileFormatType::kParquet, + .partition = partition_a, + .record_count = 50, + .file_size_in_bytes = 512, + // Add stats for multiple columns + .column_sizes = {{1, 100}, {2, 200}, {3, 300}}, + .value_counts = {{1, 10}, {2, 20}, {3, 30}}, + .null_value_counts = {{1, 1}, {2, 2}, {3, 3}}, + .nan_value_counts = {{1, 0}, {2, 0}, {3, 0}}, + .lower_bounds = {{1, {0x01}}, {2, {0x02}}, {3, {0x03}}}, + .upper_bounds = {{1, {0xFF}}, {2, {0xFE}}, {3, {0xFD}}}, + .equality_ids = {1}, // equality field IDs + .partition_spec_id = partitioned_spec_->spec_id(), + }); + + std::vector entries; + entries.push_back( + MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, eq_delete)); + + auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, std::move(entries), + partitioned_spec_); + + ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest})); + + EXPECT_TRUE(index->has_equality_deletes()); + + // Get the delete files from the index + ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_)); + // ASSERT rather than EXPECT: deletes[0] is read right below. 
+ ASSERT_EQ(deletes.size(), 1); + + const auto& returned_file = deletes[0]; + EXPECT_EQ(returned_file->file_path, "/path/to/eq-delete-with-metrics.parquet"); + // record_count should be preserved + EXPECT_EQ(returned_file->record_count, 50); + // Stats maps should be cleared (empty) after building the index + EXPECT_TRUE(returned_file->column_sizes.empty()); + EXPECT_TRUE(returned_file->value_counts.empty()); + 
EXPECT_TRUE(returned_file->null_value_counts.empty()); + EXPECT_TRUE(returned_file->nan_value_counts.empty()); + EXPECT_TRUE(returned_file->lower_bounds.empty()); + EXPECT_TRUE(returned_file->upper_bounds.empty()); +} +/* Runs each DeleteFileIndexTest case once per listed value (2 and 3); the tests + * read the value through GetParam() as version. */ +INSTANTIATE_TEST_SUITE_P(DeleteFileIndexVersions, DeleteFileIndexTest, + testing::Values(2, 3)); + +} // namespace iceberg diff --git a/src/iceberg/test/manifest_reader_test.cc b/src/iceberg/test/manifest_reader_test.cc index ab4798db9..762654c52 100644 --- a/src/iceberg/test/manifest_reader_test.cc +++ b/src/iceberg/test/manifest_reader_test.cc @@ -375,6 +375,72 @@ TEST_P(TestManifestReader, TestInvalidUsage) { EXPECT_THAT(reader_result, HasErrorMessage("has no snapshot ID")); } +TEST_P(TestManifestReader, TestDropStats) { + int version = GetParam(); + + // Scenario: write one data file with all six stats maps populated, read the + // manifest back through a reader that selects only record_count and calls + // TryDropStats(), then expect record_count kept and every stats map empty. + // Create a data file with full metrics + auto file_with_stats = std::make_unique(DataFile{ + .file_path = "/path/to/data-with-stats.parquet", + .file_format = FileFormatType::kParquet, + .partition = PartitionValues({Literal::Int(0)}), + .record_count = 100, + .file_size_in_bytes = 1024, + // Add stats for multiple columns + .column_sizes = {{1, 100}, {2, 200}, {3, 300}}, + .value_counts = {{1, 10}, {2, 20}, {3, 30}}, + .null_value_counts = {{1, 1}, {2, 2}, {3, 3}}, + .nan_value_counts = {{1, 0}, {2, 0}, {3, 0}}, + .lower_bounds = {{1, {0x01}}, {2, {0x02}}, {3, {0x03}}}, + .upper_bounds = {{1, {0xFF}}, {2, {0xFE}}, {3, {0xFD}}}, + .sort_order_id = 0, + }); + + auto entry = MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, + std::move(file_with_stats)); + + std::vector entries; + entries.push_back(std::move(entry)); + + auto manifest = WriteManifest(version, /*snapshot_id=*/1000L, std::move(entries)); + + auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, spec_); + // ASSERT rather than EXPECT: reader_result.value() is taken right below. 
+ ASSERT_THAT(reader_result, IsOk()); + auto reader = std::move(reader_result.value()); + reader->Select({"record_count"}).TryDropStats(); + + ICEBERG_UNWRAP_OR_FAIL(auto read_entries, reader->Entries()); + 
ASSERT_EQ(read_entries.size(), 1); + const auto& read_entry = read_entries[0]; + + // record_count should be preserved + EXPECT_EQ(read_entry.data_file->record_count, 100); + + // Stats maps should be cleared + EXPECT_TRUE(read_entry.data_file->column_sizes.empty()); + EXPECT_TRUE(read_entry.data_file->value_counts.empty()); + EXPECT_TRUE(read_entry.data_file->null_value_counts.empty()); + EXPECT_TRUE(read_entry.data_file->nan_value_counts.empty()); + EXPECT_TRUE(read_entry.data_file->lower_bounds.empty()); + EXPECT_TRUE(read_entry.data_file->upper_bounds.empty()); +} + +// Pins the expected ShouldDropStats() answer for a set of column selections. +TEST(ManifestReaderStaticTest, TestShouldDropStats) { + // An empty selection and a selection of kAllColumns both expect stats to be kept. + EXPECT_FALSE(ManifestReader::ShouldDropStats({})); + EXPECT_FALSE( + ManifestReader::ShouldDropStats({std::string(ManifestReader::kAllColumns)})); + // Selections that name none of the stats maps expect a drop; selecting + // record_count does not prevent it. + EXPECT_TRUE(ManifestReader::ShouldDropStats({"file_path", "file_format", "partition"})); + EXPECT_TRUE( + ManifestReader::ShouldDropStats({"file_path", "file_format", "record_count"})); + // Naming value_counts, null_value_counts or lower_bounds expects stats to be + // kept, also when record_count is selected alongside. + EXPECT_FALSE( + ManifestReader::ShouldDropStats({"file_path", "file_format", "value_counts"})); + EXPECT_FALSE( + ManifestReader::ShouldDropStats({"file_path", "file_format", "lower_bounds"})); + EXPECT_FALSE(ManifestReader::ShouldDropStats( + {"file_path", "value_counts", "null_value_counts", "lower_bounds"})); + EXPECT_FALSE( + ManifestReader::ShouldDropStats({"file_path", "record_count", "value_counts"})); +} + INSTANTIATE_TEST_SUITE_P(ManifestReaderVersions, TestManifestReader, testing::Values(1, 2, 3)); diff --git a/src/iceberg/util/content_file_util.cc b/src/iceberg/util/content_file_util.cc new file mode 100644 index 000000000..9f45b77dd --- /dev/null +++ b/src/iceberg/util/content_file_util.cc @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/util/content_file_util.h" + +#include + +#include "iceberg/file_format.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/metadata_columns.h" +#include "iceberg/util/conversions.h" +#include "iceberg/util/macros.h" + +namespace iceberg { +/* A delete file is classified as a DV solely by its file format being Puffin. */ +bool ContentFileUtil::IsDV(const DataFile& file) { + return file.file_format == FileFormatType::kPuffin; +} +/* Resolves the one data file that a delete file points at, if any. + * Equality deletes yield nullopt. An explicitly set referenced_data_file is + * returned as is. Otherwise the path is recovered from the bounds stored for + * the delete file-path metadata column, but only when a non-empty lower bound + * and a non-empty upper bound both exist and are byte-equal; every other case + * yields nullopt. The only fallible step visible here is decoding the bound via + * Conversions::FromBytes; ICEBERG_ASSIGN_OR_RAISE presumably hands that error + * back to the caller (macro defined elsewhere, to be confirmed). 
*/ +Result> ContentFileUtil::ReferencedDataFile( + const DataFile& file) { + // Equality deletes don't reference a specific data file + if (file.content == DataFile::Content::kEqualityDeletes) { + return std::nullopt; + } + + // If referenced_data_file is set, return it + if (file.referenced_data_file.has_value()) { + return file.referenced_data_file; + } + + // Try to derive from lower/upper bounds on file_path column + auto lower_it = file.lower_bounds.find(MetadataColumns::kDeleteFilePathColumnId); + if (lower_it == file.lower_bounds.end() || lower_it->second.empty()) { + return std::nullopt; + } + + auto upper_it = file.upper_bounds.find(MetadataColumns::kDeleteFilePathColumnId); + if (upper_it == file.upper_bounds.end() || upper_it->second.empty()) { + return std::nullopt; + } + + // Check if lower and upper bounds are equal + if (lower_it->second == upper_it->second) { + // Convert the binary bound to a string + ICEBERG_ASSIGN_OR_RAISE(auto string_literal, + Conversions::FromBytes(*string(), 
lower_it->second)); + return std::get(string_literal); + } + + return std::nullopt; +} +/* File-scoped means ReferencedDataFile() produced a path. A failure of that + * lookup is not mapped to false; ICEBERG_ASSIGN_OR_RAISE presumably returns it + * to the caller (macro defined elsewhere, to be confirmed). */ +Result ContentFileUtil::IsFileScoped(const DataFile& file) { + ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, ReferencedDataFile(file)); + return referenced_data_file.has_value(); +} +/* True only for a one-element span whose element is a DV. files[0] is + * dereferenced without a null check, so that element must be non-null. */ +bool ContentFileUtil::ContainsSingleDV(std::span> files) { + return files.size() == 1 && IsDV(*files[0]); +} +/* Builds a one-line description of a DV. An unset offset or length is printed + * as -1 and an unset referenced data file as an empty string. */ +std::string ContentFileUtil::DVDesc(const DataFile& file) { + return std::format("DV{{location={}, offset={}, length={}, referencedDataFile={}}}", + file.file_path, file.content_offset.value_or(-1), + file.content_size_in_bytes.value_or(-1), + file.referenced_data_file.value_or("")); +} +/* Empties the six per-column stats maps in place; no other field of data_file + * is touched here. */ +void ContentFileUtil::DropStats(DataFile& data_file) { + data_file.column_sizes.clear(); + data_file.value_counts.clear(); + data_file.null_value_counts.clear(); + data_file.nan_value_counts.clear(); + data_file.lower_bounds.clear(); + data_file.upper_bounds.clear(); +} + +} // namespace iceberg diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h new file mode 100644 index 000000000..191008887 --- /dev/null +++ b/src/iceberg/util/content_file_util.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/util/content_file_util.h +/// Utility functions for content files (data files and delete files). + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Stateless static helpers for content files (data files and delete files). +struct ICEBERG_EXPORT ContentFileUtil { + /// \brief Check if a delete file is a deletion vector (DV), i.e. a Puffin-format file. + static bool IsDV(const DataFile& file); + + /// \brief Path of the single data file a delete file references, or nullopt if none. + static Result> ReferencedDataFile(const DataFile& file); + + /// \brief Check if a delete file is file-scoped, i.e. ReferencedDataFile has a value. + static Result IsFileScoped(const DataFile& file); + + /// \brief Check if the collection is exactly one delete file and that file is a DV. + static bool ContainsSingleDV(std::span> files); + + /// \brief Describe a DV as text; unset offset/length print as -1, unset path as empty. + static std::string DVDesc(const DataFile& file); + + /// \brief Drop stats in place: clears column sizes, the count maps, and both bounds. + static void DropStats(DataFile& data_file); +}; + +} // namespace iceberg diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build index 9f3277533..48477925e 100644 --- a/src/iceberg/util/meson.build +++ b/src/iceberg/util/meson.build @@ -20,6 +20,7 @@ install_headers( 'bucket_util.h', 'checked_cast.h', 'config.h', + 'content_file_util.h', 'conversions.h', 'decimal.h', 'endian.h',