Skip to content

Commit a38da04

Browse files
committed
fix(manifest): align managers with Java review feedback
Squash the review-fix commits after 2b124ca3 and the current tracked working-tree changes into one commit. This aligns ManifestFilterManager and ManifestMergeManager more closely with Java by: - adding object-level DeleteFile support and FilesToBeDeleted tracking - renaming DeletesFiles to ContainsDeletes and adding FailAnyDelete - separating merge grouping by content and adding kDeletes coverage - matching ListPacker.packEnd bin-packing behavior - aligning defined public APIs and row-filter semantics with Java, including manager-wide CaseSensitive handling and residual-based metrics evaluation - applying doc and reviewer follow-up fixes
1 parent e8abb0c commit a38da04

6 files changed

Lines changed: 500 additions & 166 deletions

src/iceberg/manifest/manifest_filter_manager.cc

Lines changed: 134 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
#include "iceberg/expression/inclusive_metrics_evaluator.h"
2727
#include "iceberg/expression/manifest_evaluator.h"
28+
#include "iceberg/expression/residual_evaluator.h"
29+
#include "iceberg/expression/strict_metrics_evaluator.h"
2830
#include "iceberg/manifest/manifest_entry.h"
2931
#include "iceberg/manifest/manifest_list.h"
3032
#include "iceberg/manifest/manifest_reader.h"
@@ -39,15 +41,29 @@ ManifestFilterManager::ManifestFilterManager(ManifestContent content,
3941
std::shared_ptr<FileIO> file_io)
4042
: manifest_content_(content), file_io_(std::move(file_io)) {}
4143

42-
void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr,
43-
bool case_sensitive) {
44-
delete_exprs_.push_back({std::move(expr), case_sensitive});
44+
void ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> expr) {
45+
ICEBERG_CHECK_OR_DIE(expr != nullptr, "Cannot delete files using filter: null");
46+
delete_exprs_.push_back({.expr = std::move(expr)});
47+
}
48+
49+
void ManifestFilterManager::CaseSensitive(bool case_sensitive) {
50+
case_sensitive_ = case_sensitive;
51+
manifest_evaluator_cache_.clear();
52+
residual_evaluator_cache_.clear();
4553
}
4654

4755
void ManifestFilterManager::DeleteFile(std::string_view path) {
48-
std::string p(path);
49-
delete_paths_.insert(p);
50-
pending_paths_.insert(std::move(p));
56+
delete_paths_.insert(std::string(path));
57+
}
58+
59+
void ManifestFilterManager::DeleteFile(std::shared_ptr<DataFile> file) {
60+
ICEBERG_CHECK_OR_DIE(file != nullptr, "Cannot delete file: null");
61+
delete_paths_.insert(file->file_path);
62+
delete_files_.insert(std::move(file));
63+
}
64+
65+
const DataFileSet& ManifestFilterManager::FilesToBeDeleted() const {
66+
return delete_files_;
5167
}
5268

5369
void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues partition) {
@@ -58,7 +74,9 @@ void ManifestFilterManager::FailMissingDeletePaths() {
5874
fail_missing_delete_paths_ = true;
5975
}
6076

61-
bool ManifestFilterManager::DeletesFiles() const {
77+
void ManifestFilterManager::FailAnyDelete() { fail_any_delete_ = true; }
78+
79+
bool ManifestFilterManager::ContainsDeletes() const {
6280
return !delete_exprs_.empty() || !delete_paths_.empty() || !drop_partitions_.empty();
6381
}
6482

@@ -68,20 +86,22 @@ bool ManifestFilterManager::CanContainDroppedFiles() const {
6886

6987
bool ManifestFilterManager::CanContainDroppedPartitions(const ManifestFile& manifest) {
7088
if (drop_partitions_.empty()) return false;
71-
// Without a partition filter helper, we conservatively say yes.
72-
// A full implementation would use ManifestFileUtil::canContainAny; for now
73-
// we open the manifest and let per-entry checks decide.
74-
(void)manifest;
75-
return true;
89+
// Only manifests whose partition spec matches a registered drop can contain
90+
// entries for that partition. PartitionKey is pair<spec_id, values>.
91+
int32_t spec_id = manifest.partition_spec_id;
92+
for (const auto& key : drop_partitions_) {
93+
if (key.first == spec_id) return true;
94+
}
95+
return false;
7696
}
7797

7898
bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& manifest,
7999
const TableMetadata& metadata) {
80100
if (delete_exprs_.empty()) return false;
81101
int32_t spec_id = manifest.partition_spec_id;
82-
for (size_t i = 0; i < delete_exprs_.size(); ++i) {
83-
auto* evaluator_ptr = GetManifestEvaluator(metadata, spec_id, delete_exprs_[i])
84-
.value_or(nullptr);
102+
for (const auto& delete_expr : delete_exprs_) {
103+
auto* evaluator_ptr =
104+
GetManifestEvaluator(metadata, spec_id, delete_expr).value_or(nullptr);
85105
if (evaluator_ptr == nullptr) return true; // conservative on error
86106
auto result = evaluator_ptr->Evaluate(manifest);
87107
if (!result.has_value() || result.value()) return true;
@@ -92,8 +112,12 @@ bool ManifestFilterManager::CanContainExpressionDeletes(const ManifestFile& mani
92112
bool ManifestFilterManager::CanContainDeletedFiles(const ManifestFile& manifest,
93113
const TableMetadata& metadata) {
94114
// A manifest with no live files cannot contain files to delete.
95-
bool has_live = (manifest.added_files_count.value_or(0) > 0) ||
96-
(manifest.existing_files_count.value_or(0) > 0);
115+
// Null counts mean the count is unknown — treat conservatively as possibly non-zero
116+
// (matches Java's ManifestFile.hasAddedFiles / hasExistingFiles behaviour).
117+
bool has_live = !manifest.added_files_count.has_value() ||
118+
manifest.added_files_count.value() > 0 ||
119+
!manifest.existing_files_count.has_value() ||
120+
manifest.existing_files_count.value() > 0;
97121
if (!has_live) return false;
98122

99123
return CanContainDroppedFiles() || CanContainExpressionDeletes(manifest, metadata) ||
@@ -111,51 +135,79 @@ Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
111135
ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
112136
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
113137
ICEBERG_ASSIGN_OR_RAISE(vec[idx], ManifestEvaluator::MakeRowFilter(
114-
de.expr, spec, *schema, de.case_sensitive));
138+
de.expr, spec, *schema, case_sensitive_));
115139
}
116140
return vec[idx].get();
117141
}
118142

119-
Result<InclusiveMetricsEvaluator*> ManifestFilterManager::GetMetricsEvaluator(
143+
Result<ResidualEvaluator*> ManifestFilterManager::GetResidualEvaluator(
120144
const TableMetadata& metadata, int32_t spec_id, const DeleteExpr& de) {
121-
auto& vec = metrics_evaluator_cache_[spec_id];
145+
auto& vec = residual_evaluator_cache_[spec_id];
122146
size_t idx = &de - delete_exprs_.data();
123147
if (idx >= vec.size()) {
124148
vec.resize(delete_exprs_.size());
125149
}
126150
if (!vec[idx]) {
151+
ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
127152
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
128-
ICEBERG_ASSIGN_OR_RAISE(vec[idx],
129-
InclusiveMetricsEvaluator::Make(de.expr, *schema,
130-
de.case_sensitive));
153+
ICEBERG_ASSIGN_OR_RAISE(
154+
vec[idx], ResidualEvaluator::Make(de.expr, *spec, *schema, case_sensitive_));
131155
}
132156
return vec[idx].get();
133157
}
134158

135-
bool ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
136-
const TableMetadata& metadata,
137-
int32_t manifest_spec_id) {
159+
Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
160+
const TableMetadata& metadata,
161+
int32_t manifest_spec_id) {
138162
if (!entry.data_file) return false;
139163
const DataFile& file = *entry.data_file;
140-
141-
// Path-based check
142-
if (delete_paths_.count(file.file_path)) {
143-
pending_paths_.erase(file.file_path);
144-
return true;
164+
int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
165+
std::shared_ptr<Schema> schema;
166+
if (!delete_exprs_.empty()) {
167+
ICEBERG_ASSIGN_OR_RAISE(schema, metadata.Schema());
145168
}
146169

147-
// Partition-drop check
148-
int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
149-
if (drop_partitions_.contains(spec_id, file.partition)) {
170+
// Path-based and partition-drop checks
171+
if (delete_paths_.count(file.file_path) ||
172+
drop_partitions_.contains(spec_id, file.partition)) {
173+
if (fail_any_delete_) {
174+
return InvalidArgument("Operation would delete existing data: {}", file.file_path);
175+
}
150176
return true;
151177
}
152178

153-
// Expression-based check (inclusive metrics)
179+
// Expression-based check.
180+
// Java semantics: compute a partition residual first, then use strict/inclusive
181+
// metrics on that residual. Data manifests reject partial matches; delete manifests
182+
// tolerate them because only data-file deletes require all-rows-match validation.
154183
for (const auto& de : delete_exprs_) {
155-
auto* eval = GetMetricsEvaluator(metadata, spec_id, de).value_or(nullptr);
156-
if (eval == nullptr) return true; // conservative on error
157-
auto result = eval->Evaluate(file);
158-
if (!result.has_value() || result.value()) return true;
184+
ICEBERG_ASSIGN_OR_RAISE(auto* residual_eval,
185+
GetResidualEvaluator(metadata, spec_id, de));
186+
ICEBERG_ASSIGN_OR_RAISE(auto residual_expr,
187+
residual_eval->ResidualFor(file.partition));
188+
ICEBERG_ASSIGN_OR_RAISE(
189+
auto strict_eval,
190+
StrictMetricsEvaluator::Make(residual_expr, schema, case_sensitive_));
191+
ICEBERG_ASSIGN_OR_RAISE(auto strict_match, strict_eval->Evaluate(file));
192+
if (strict_match) {
193+
if (fail_any_delete_) {
194+
return InvalidArgument("Operation would delete existing data: {}",
195+
file.file_path);
196+
}
197+
return true;
198+
}
199+
200+
ICEBERG_ASSIGN_OR_RAISE(auto incl_eval, InclusiveMetricsEvaluator::Make(
201+
residual_expr, *schema, case_sensitive_));
202+
ICEBERG_ASSIGN_OR_RAISE(auto incl_match, incl_eval->Evaluate(file));
203+
if (incl_match) {
204+
if (manifest_content_ == ManifestContent::kDeletes) {
205+
continue;
206+
}
207+
return InvalidArgument(
208+
"Cannot delete file where some, but not all, rows match filter: {}",
209+
file.file_path);
210+
}
159211
}
160212

161213
return false;
@@ -164,12 +216,25 @@ bool ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
164216
Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
165217
const TableMetadata& metadata, const std::shared_ptr<Snapshot>& base_snapshot,
166218
const ManifestWriterFactory& writer_factory) {
219+
// Validate required delete paths before any early return — even an empty base
220+
// snapshot must report missing required paths (matches Java's validateRequiredDeletes).
221+
if (fail_missing_delete_paths_ && !delete_paths_.empty() && !base_snapshot) {
222+
return InvalidArgument("Missing delete paths: {}", [&] {
223+
std::string s;
224+
for (const auto& p : delete_paths_) {
225+
if (!s.empty()) s += ", ";
226+
s += p;
227+
}
228+
return s;
229+
}());
230+
}
231+
167232
// No base snapshot → nothing to filter
168233
if (!base_snapshot) return std::vector<ManifestFile>{};
169234

170235
// Load the relevant manifests from the manifest list
171-
ICEBERG_ASSIGN_OR_RAISE(auto list_reader,
172-
ManifestListReader::Make(base_snapshot->manifest_list, file_io_));
236+
ICEBERG_ASSIGN_OR_RAISE(
237+
auto list_reader, ManifestListReader::Make(base_snapshot->manifest_list, file_io_));
173238
ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files());
174239

175240
// Keep only manifests for this manager's content type
@@ -180,12 +245,16 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
180245
}
181246

182247
// No conditions registered → return unchanged
183-
if (!DeletesFiles()) return manifests;
248+
if (!ContainsDeletes()) return manifests;
184249

185250
ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
186251

187252
std::vector<ManifestFile> result;
188253
result.reserve(manifests.size());
254+
// Track which registered delete paths were actually found across all manifests.
255+
// Using delete_paths_ as the immutable source of truth makes FilterManifests
256+
// idempotent across commit retries (matches Java's validateRequiredDeletes design).
257+
std::unordered_set<std::string> found_paths;
189258

190259
for (const auto& manifest : manifests) {
191260
// Fast path: metadata skip
@@ -199,13 +268,14 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
199268

200269
// Read all live entries from the manifest
201270
ICEBERG_ASSIGN_OR_RAISE(auto reader,
202-
ManifestReader::Make(manifest, file_io_, schema, spec));
271+
ManifestReader::Make(manifest, file_io_, schema, spec));
203272
ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
204273

205274
// Check whether any entry should be deleted
206275
bool has_deletes = false;
207276
for (const auto& entry : entries) {
208-
if (ShouldDelete(entry, metadata, spec_id)) {
277+
ICEBERG_ASSIGN_OR_RAISE(auto should_delete, ShouldDelete(entry, metadata, spec_id));
278+
if (should_delete) {
209279
has_deletes = true;
210280
break;
211281
}
@@ -216,10 +286,17 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
216286
continue;
217287
}
218288

219-
// Rewrite the manifest with deleted entries marked
289+
// Rewrite the manifest with deleted entries marked; record found paths.
220290
ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, manifest_content_));
221291
for (const auto& entry : entries) {
222-
if (ShouldDelete(entry, metadata, spec_id)) {
292+
ICEBERG_ASSIGN_OR_RAISE(auto should_delete, ShouldDelete(entry, metadata, spec_id));
293+
if (should_delete) {
294+
if (entry.data_file && delete_paths_.count(entry.data_file->file_path)) {
295+
found_paths.insert(entry.data_file->file_path);
296+
}
297+
if (entry.data_file) {
298+
delete_files_.insert(std::make_shared<DataFile>(*entry.data_file));
299+
}
223300
ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
224301
} else {
225302
ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
@@ -230,14 +307,19 @@ Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
230307
result.push_back(std::move(filtered_manifest));
231308
}
232309

233-
// Validate that all registered delete paths were found
234-
if (fail_missing_delete_paths_ && !pending_paths_.empty()) {
310+
// Validate that all registered delete paths were found. Uses delete_paths_ (not a
311+
// consumed set) so repeated calls on the same manager produce the same result.
312+
if (fail_missing_delete_paths_) {
235313
std::string missing;
236-
for (const auto& p : pending_paths_) {
237-
if (!missing.empty()) missing += ", ";
238-
missing += p;
314+
for (const auto& p : delete_paths_) {
315+
if (!found_paths.count(p)) {
316+
if (!missing.empty()) missing += ", ";
317+
missing += p;
318+
}
319+
}
320+
if (!missing.empty()) {
321+
return InvalidArgument("Missing delete paths: {}", missing);
239322
}
240-
return InvalidArgument("Missing delete paths: {}", missing);
241323
}
242324

243325
return result;

0 commit comments

Comments
 (0)