diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 7db8f5890f..03a8b8e8d1 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -35,7 +35,7 @@ pub const MAIN_BRANCH: &str = "main"; /// Reference to [`Snapshot`]. pub type SnapshotRef = Arc; -#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)] +#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)] #[serde(rename_all = "lowercase")] /// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots. pub enum Operation { diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 36fde117ab..197d0a8d9d 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -121,8 +121,8 @@ impl SnapshotProduceOperation for FastAppendOperation { } async fn existing_manifest( - &self, - snapshot_produce: &SnapshotProducer<'_>, + &mut self, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> Result> { let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { return Ok(vec![]); diff --git a/crates/iceberg/src/transaction/delete_aware.rs b/crates/iceberg/src/transaction/delete_aware.rs new file mode 100644 index 0000000000..d46c9559eb --- /dev/null +++ b/crates/iceberg/src/transaction/delete_aware.rs @@ -0,0 +1,275 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashSet;
+use std::future::Future;
+
+use futures::SinkExt;
+use futures::future::try_join_all;
+use once_cell::sync::Lazy;
+
+use crate::delete_file_index::DeleteFileIndex;
+use crate::error::Result;
+use crate::scan::DeleteFileContext;
+use crate::spec::{
+    DataContentType, DataFile, FormatVersion, INITIAL_SEQUENCE_NUMBER, ManifestContentType,
+    ManifestFile, ManifestStatus, Operation,
+};
+use crate::table::Table;
+use crate::transaction::manifest_filter::ManifestFilterManager;
+use crate::transaction::snapshot::SnapshotProduceOperation;
+use crate::util::snapshot::ancestors_between;
+use crate::{Error, ErrorKind};
+
+/// Operations whose snapshots may add delete files.
+static VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Lazy<HashSet<Operation>> =
+    Lazy::new(|| HashSet::from([Operation::Overwrite, Operation::Delete]));
+
+/// An additive sub-trait of [`SnapshotProduceOperation`] implemented only by delete-class
+/// operations (delete, overwrite, rewrite).
+///
+/// Append-only operations implement only the base [`SnapshotProduceOperation`] and carry
+/// none of the validation/filtering surface defined here. A `DeleteAwareOperation` adds two
+/// capabilities on top of the base trait:
+///
+/// - **Write-time conflict validation** via [`validate`](DeleteAwareOperation::validate),
+///   invoked by the action against the refreshed base table before any snapshot is written.
+/// - **Manifest filtering** via the owned [`ManifestFilterManager`]s reached through
+///   [`data_filter`](DeleteAwareOperation::data_filter) and
+///   [`delete_filter`](DeleteAwareOperation::delete_filter), which rewrite carried-forward
+///   manifests to drop the files the operation removes.
+///
+/// The filter managers are owned fields on the operation, populated incrementally as the
+/// action builds the operation. Because the operation is rebuilt on each commit attempt, the
+/// managers are reconstructed deterministically per attempt from the action's stored inputs.
+#[allow(unused)]
+pub(crate) trait DeleteAwareOperation: SnapshotProduceOperation {
+    /// Per-operation conflict check against the refreshed base table, run before any write.
+    ///
+    /// Implemented per operation; each operation composes the reusable validation helpers to
+    /// detect concurrent changes that would make the pending operation incorrect.
+    fn validate(
+        &self,
+        base: &Table,
+        parent_snapshot_id: Option<i64>,
+    ) -> impl Future<Output = Result<()>> + Send;
+
+    /// Accessor to the operation's owned data-manifest filter manager.
+    ///
+    /// Built up as the operation is constructed and mutated during manifest production.
+    fn data_filter(&mut self) -> &mut ManifestFilterManager;
+
+    /// Accessor to the operation's owned delete-manifest filter manager.
+    ///
+    /// Built up as the operation is constructed and mutated during manifest production.
+    fn delete_filter(&mut self) -> &mut ManifestFilterManager;
+
+    /// Retrieves the history of snapshots between two points with matching operations and content
+    /// type.
+    ///
+    /// # Arguments
+    ///
+    /// * `base` - The base table to retrieve history from.
+    /// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the
+    ///   beginning.
+    /// * `to_snapshot_id` - The ending snapshot ID (inclusive).
+    /// * `matching_operations` - Set of operations to match when collecting snapshots.
+    /// * `manifest_content_type` - The content type of manifests to collect.
+    ///
+    /// # Returns
+    ///
+    /// A tuple containing:
+    /// * A vector of manifest files matching the criteria.
+    /// * A set of snapshot IDs that were collected.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the history between the snapshots cannot be determined.
+    fn validation_history<'a>(
+        &'a self,
+        base: &'a Table,
+        from_snapshot_id: Option<i64>,
+        to_snapshot_id: i64,
+        matching_operations: &'a HashSet<Operation>,
+        manifest_content_type: ManifestContentType,
+    ) -> impl Future<Output = Result<(Vec<ManifestFile>, HashSet<i64>)>> + Send + 'a {
+        async move {
+            let mut manifests: Vec<ManifestFile> = vec![];
+            let mut new_snapshots = HashSet::new();
+            let mut last_snapshot = None;
+
+            let metadata = base.metadata_ref();
+            let snapshots = ancestors_between(&metadata, to_snapshot_id, from_snapshot_id);
+
+            for current_snapshot in snapshots {
+                last_snapshot = Some(current_snapshot.clone());
+
+                // Find all snapshots with the matching operations
+                // and their manifest files with the matching content type
+                if matching_operations.contains(&current_snapshot.summary().operation) {
+                    new_snapshots.insert(current_snapshot.snapshot_id());
+
+                    let manifest_list = base.manifest_list_reader(&current_snapshot).load().await?;
+
+                    for manifest in manifest_list.entries() {
+                        if manifest.content == manifest_content_type
+                            && manifest.added_snapshot_id == current_snapshot.snapshot_id()
+                        {
+                            manifests.push(manifest.clone());
+                        }
+                    }
+                }
+            }
+
+            if let Some(last_snapshot) = last_snapshot
+                && last_snapshot.parent_snapshot_id() != from_snapshot_id
+            {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot determine history between starting snapshot {} and the last known ancestor {}",
+                        from_snapshot_id.unwrap_or(-1),
+                        last_snapshot.snapshot_id()
+                    ),
+                ));
+            }
+
+            Ok((manifests, new_snapshots))
+        }
+    }
+
+    /// Validates that there are no new delete files for the given data files.
+    ///
+    /// # Arguments
+    ///
+    /// * `base` - The base table to validate against.
+    /// * `from_snapshot_id` - The starting snapshot ID (exclusive), or None to start from the
+    ///   beginning.
+    /// * `to_snapshot_id` - The ending snapshot ID (inclusive), or None if there is no current
+    ///   table state.
+    /// * `data_files` - The data files to check for conflicting delete files.
+    /// * `ignore_equality_deletes` - Whether to ignore equality deletes and only check for
+    ///   positional deletes.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` indicating success or an error if validation fails.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if new delete files are found for any of the data files.
+    fn validate_no_new_deletes_for_data_files<'a>(
+        &'a self,
+        base: &'a Table,
+        from_snapshot_id: Option<i64>,
+        to_snapshot_id: Option<i64>,
+        data_files: &'a [DataFile],
+        ignore_equality_deletes: bool,
+    ) -> impl Future<Output = Result<()>> + Send + 'a {
+        async move {
+            // Delete files only exist in format version 2 and above. If there is no current table
+            // state, or the table is V1, there cannot be any new delete files to conflict with.
+            let Some(to_snapshot_id) = to_snapshot_id else {
+                return Ok(());
+            };
+            if base.metadata().format_version() == FormatVersion::V1 {
+                return Ok(());
+            }
+
+            // Get matching delete files that have been added since the from_snapshot_id
+            let (delete_manifests, _) = self
+                .validation_history(
+                    base,
+                    from_snapshot_id,
+                    to_snapshot_id,
+                    &VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
+                    ManifestContentType::Deletes,
+                )
+                .await?;
+
+            // Index only the live entries of the matching delete manifests: `Deleted`
+            // entries record removals and must not be reported as new deletes.
+            // `DeleteFileIndex::new` populates the index from the channel until *all* senders
+            // are dropped, and queries block until then, so the sender is scoped to this
+            // block; holding it past this point would deadlock.
+            let delete_file_index = {
+                let (delete_file_index, mut delete_file_tx) =
+                    DeleteFileIndex::new(base.runtime().clone());
+                let manifests = try_join_all(
+                    delete_manifests
+                        .iter()
+                        .map(|f| f.load_manifest(base.file_io())),
+                )
+                .await?;
+                let live_entries = manifests
+                    .iter()
+                    .flat_map(|manifest| manifest.entries())
+                    .filter(|entry| entry.status() != ManifestStatus::Deleted);
+                for entry in live_entries {
+                    let delete_file_ctx = DeleteFileContext {
+                        manifest_entry: entry.clone(),
+                        partition_spec_id: entry.data_file().partition_spec_id,
+                    };
+                    delete_file_tx.send(delete_file_ctx).await?;
+                }
+                // `delete_file_tx` is dropped here as the block ends, closing the channel.
+                delete_file_index
+            };
+
+            // Get starting seq num from starting snapshot if available
+            let starting_sequence_number = if let Some(from_snapshot_id) = from_snapshot_id {
+                match base.metadata().snapshot_by_id(from_snapshot_id) {
+                    Some(snapshot) => snapshot.sequence_number(),
+                    None => INITIAL_SEQUENCE_NUMBER,
+                }
+            } else {
+                INITIAL_SEQUENCE_NUMBER
+            };
+
+            // Validate if there are deletes using delete file index
+            for data_file in data_files {
+                let delete_files = delete_file_index
+                    .get_deletes_for_data_file(data_file, Some(starting_sequence_number))
+                    .await;
+
+                if ignore_equality_deletes {
+                    if delete_files.iter().any(|delete_file| {
+                        delete_file.file_type == DataContentType::PositionDeletes
+                    }) {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            format!(
+                                "Cannot commit, found new positional delete for added data file: {}",
+                                data_file.file_path
+                            ),
+                        ));
+                    }
+                } else if !delete_files.is_empty() {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "Cannot commit, found new delete for added data file: {}",
+                            data_file.file_path
+                        ),
+                    ));
+                }
+            }
+
+            Ok(())
+        }
+    }
+}
diff --git
a/crates/iceberg/src/transaction/manifest_filter.rs b/crates/iceberg/src/transaction/manifest_filter.rs new file mode 100644 index 0000000000..67b358d1df --- /dev/null +++ b/crates/iceberg/src/transaction/manifest_filter.rs @@ -0,0 +1,379 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +use crate::error::Result; +use crate::spec::{DataFile, ManifestFile, ManifestStatus}; +use crate::transaction::snapshot::SnapshotProducer; + +/// Accumulates the set of files an operation removes from a table and rewrites the +/// affected manifests during manifest production. +/// +/// A delete-class operation owns one `ManifestFilterManager` for data manifests and one for +/// delete manifests. Removed files are recorded via [`ManifestFilterManager::delete_file`] +/// keyed by their file path; the later filtering pass drops the matching manifest entries +/// and re-emits the survivors. +/// +/// A `DataFile` describes both data files and delete (positional/equality) files, so the +/// same entry point serves data-manifest and delete-manifest filtering. 
The manager's only +/// durable state lives within a single filtering pass, which matches the operation's +/// per-attempt lifetime (it is reconstructed deterministically on each commit retry). +#[derive(Debug, Default)] +pub(crate) struct ManifestFilterManager { + /// Files to drop, keyed by file path. `DataFile` covers both data and delete files. + deleted_files: HashMap, +} + +impl ManifestFilterManager { + /// Record a file for removal. + /// + /// `DataFile` covers both data and delete files, so the same entry point serves + /// data-manifest and delete-manifest filtering. Removals are keyed by file path; + /// recording the same path more than once keeps a single removal entry. + #[allow(unused)] + pub(crate) fn delete_file(&mut self, file: DataFile) { + self.deleted_files + .insert(file.file_path().to_string(), file); + } + + /// Returns `true` if no files are recorded for removal. + #[allow(unused)] + pub(crate) fn is_empty(&self) -> bool { + self.deleted_files.is_empty() + } + + /// Returns `true` if the file at `path` is recorded for removal. + #[allow(unused)] + pub(crate) fn is_removed(&self, path: &str) -> bool { + self.deleted_files.contains_key(path) + } + + /// Rewrite the given `manifests`, dropping any entries recorded for removal and + /// re-emitting the survivors via a producer-provided manifest writer. + /// + /// Manifests that contain none of the removed files are passed through unchanged so the + /// common (non-conflicting) case avoids a rewrite. For each manifest that does reference + /// a removed file, a new manifest is written that: + /// + /// - skips entries already marked [`ManifestStatus::Deleted`] (they are informational + /// only and are not carried forward), and + /// - skips entries whose file path is in the removed set. + /// + /// All remaining (alive, not-removed) entries are re-emitted as existing entries. If a + /// rewrite drops every entry, the manifest is omitted from the returned vector entirely. 
+ /// + /// This mirrors `#1606`'s per-manifest rewrite loop, moved off `SnapshotProducer` and + /// onto the manager that owns the removal set. + #[allow(unused)] + pub(crate) async fn filter_manifests( + &mut self, + sp: &mut SnapshotProducer<'_>, + manifests: Vec, + ) -> Result> { + // Nothing recorded for removal: every manifest is carried forward verbatim. + if self.deleted_files.is_empty() { + return Ok(manifests); + } + + let file_io = sp.table.file_io().clone(); + let mut filtered = Vec::with_capacity(manifests.len()); + + for manifest_file in manifests { + let manifest = manifest_file.load_manifest(&file_io).await?; + let entries = manifest.entries(); + + // Pass the manifest through unchanged when none of its live entries reference a + // removed file. Deleted entries are ignored for this decision. + let has_removed_entry = entries.iter().any(|entry| { + entry.status() != ManifestStatus::Deleted && self.is_removed(entry.file_path()) + }); + if !has_removed_entry { + filtered.push(manifest_file); + continue; + } + + // Rewrite the manifest, re-emitting the survivors as existing entries. + let mut writer = sp.new_manifest_writer(manifest_file.content)?; + let mut survivors = 0; + for entry in entries { + // Deleted entries are informational only; never carried forward. + if entry.status() == ManifestStatus::Deleted { + continue; + } + // Drop entries whose file is being removed by this operation. + if self.is_removed(entry.file_path()) { + continue; + } + + writer.add_existing_file( + entry.data_file().clone(), + entry.snapshot_id().unwrap_or_default(), + entry.sequence_number().unwrap_or_default(), + entry.file_sequence_number, + )?; + survivors += 1; + } + + // If every entry was dropped, omit the manifest from the output entirely. 
+ if survivors == 0 { + continue; + } + + filtered.push(writer.write_manifest_file().await?); + } + + Ok(filtered) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::fs; + + use tempfile::TempDir; + use uuid::Uuid; + + use super::*; + use crate::TableIdent; + use crate::io::FileIO; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, ManifestStatus, + ManifestWriterBuilder, Struct, TableMetadata, + }; + use crate::table::Table; + use crate::test_utils::test_runtime; + + fn data_file(path: &str) -> DataFile { + DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(path.to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(0))])) + .partition_spec_id(0) + .build() + .unwrap() + } + + /// Builds a v2 table backed by the local filesystem in a `TempDir`, so manifests can be + /// written to and loaded from real files (as `filter_manifests` requires). 
+    fn make_fs_table() -> (Table, TempDir) {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().join("table1");
+        let manifest_list_location = table_location.join("metadata/manifests_list_1.avro");
+        let table_metadata_location = table_location.join("metadata/v1.json");
+
+        let file_io = FileIO::new_with_fs();
+
+        let template = fs::read_to_string(format!(
+            "{}/testdata/example_table_metadata_v2.json",
+            env!("CARGO_MANIFEST_DIR")
+        ))
+        .unwrap();
+        let metadata_json = template
+            .replace("{{ table_location }}", table_location.to_str().unwrap())
+            .replace(
+                "{{ manifest_list_1_location }}",
+                manifest_list_location.to_str().unwrap(),
+            )
+            .replace(
+                "{{ manifest_list_2_location }}",
+                manifest_list_location.to_str().unwrap(),
+            )
+            .replace(
+                "{{ table_metadata_1_location }}",
+                table_metadata_location.to_str().unwrap(),
+            );
+        let table_metadata = serde_json::from_str::<TableMetadata>(&metadata_json).unwrap();
+
+        let table = Table::builder()
+            .metadata(table_metadata)
+            .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+            .file_io(file_io)
+            .metadata_location(table_metadata_location.to_str().unwrap())
+            .runtime(test_runtime())
+            .build()
+            .unwrap();
+
+        (table, tmp_dir)
+    }
+
+    /// Writes a data manifest to disk containing one `Added` data entry per path in `paths`
+    /// and returns the resulting [`ManifestFile`].
+    async fn write_data_manifest(table: &Table, paths: &[&str]) -> ManifestFile {
+        let current_snapshot = table.metadata().current_snapshot().unwrap();
+        let schema = current_snapshot.schema(table.metadata()).unwrap();
+        let partition_spec = table.metadata().default_partition_spec();
+        let table_location_str = table.metadata().location().to_string();
+
+        let output_file = table
+            .file_io()
+            .new_output(format!(
+                "{table_location_str}/metadata/manifest_{}.avro",
+                Uuid::new_v4()
+            ))
+            .unwrap();
+
+        let mut writer = ManifestWriterBuilder::new(
+            output_file,
+            Some(current_snapshot.snapshot_id()),
+            None,
+            schema.clone(),
+            partition_spec.as_ref().clone(),
+        )
+        .build_v2_data();
+
+        for path in paths {
+            writer
+                .add_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Added)
+                        .data_file(
+                            DataFileBuilder::default()
+                                .partition_spec_id(0)
+                                .content(DataContentType::Data)
+                                .file_path(path.to_string())
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(Struct::from_iter([Some(Literal::long(100))]))
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+        }
+
+        writer.write_manifest_file().await.unwrap()
+    }
+
+    /// Collects the file paths of all alive entries in a manifest on disk.
+    async fn manifest_file_paths(table: &Table, manifest_file: &ManifestFile) -> Vec<String> {
+        let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap();
+        manifest
+            .entries()
+            .iter()
+            .filter(|entry| entry.status() != ManifestStatus::Deleted)
+            .map(|entry| entry.file_path().to_string())
+            .collect()
+    }
+
+    #[test]
+    fn test_default_is_empty_no_op() {
+        let manager = ManifestFilterManager::default();
+        assert!(manager.is_empty());
+        assert!(!manager.is_removed("test/1.parquet"));
+    }
+
+    #[test]
+    fn test_delete_file_records_removal_by_path() {
+        let mut manager = ManifestFilterManager::default();
+        manager.delete_file(data_file("test/1.parquet"));
+
+        assert!(!manager.is_empty());
+        assert!(manager.is_removed("test/1.parquet"));
+        assert!(!manager.is_removed("test/2.parquet"));
+    }
+
+    #[test]
+    fn test_delete_file_same_path_dedupes() {
+        let mut manager = ManifestFilterManager::default();
+        manager.delete_file(data_file("test/1.parquet"));
+        manager.delete_file(data_file("test/1.parquet"));
+
+        assert_eq!(manager.deleted_files.len(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_filter_manifests_no_op_when_nothing_removed() {
+        let (table, _tmp_dir) = make_fs_table();
+        let manifest = write_data_manifest(&table, &["data/a.parquet", "data/b.parquet"]).await;
+
+        let mut producer =
+            SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![]);
+
+        // Nothing recorded for removal: the manifest must be returned verbatim.
+        let mut manager = ManifestFilterManager::default();
+        let result = manager
+            .filter_manifests(&mut producer, vec![manifest.clone()])
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].manifest_path, manifest.manifest_path);
+    }
+
+    #[tokio::test]
+    async fn test_filter_manifests_removing_all_entries_drops_manifest() {
+        let (table, _tmp_dir) = make_fs_table();
+        let manifest = write_data_manifest(&table, &["data/a.parquet", "data/b.parquet"]).await;
+
+        let mut producer =
+            SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![]);
+
+        // Remove every entry of the manifest.
+        let mut manager = ManifestFilterManager::default();
+        manager.delete_file(data_file("data/a.parquet"));
+        manager.delete_file(data_file("data/b.parquet"));
+
+        let result = manager
+            .filter_manifests(&mut producer, vec![manifest])
+            .await
+            .unwrap();
+
+        // The manifest is dropped from the output entirely once all entries are removed.
+        assert!(result.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_filter_manifests_removing_subset_rewrites_survivors() {
+        let (table, _tmp_dir) = make_fs_table();
+        let original = write_data_manifest(&table, &[
+            "data/a.parquet",
+            "data/b.parquet",
+            "data/c.parquet",
+        ])
+        .await;
+
+        let mut producer =
+            SnapshotProducer::new(&table, Uuid::now_v7(), None, HashMap::new(), vec![]);
+
+        // Remove only a subset of the entries.
+        let mut manager = ManifestFilterManager::default();
+        manager.delete_file(data_file("data/b.parquet"));
+
+        let result = manager
+            .filter_manifests(&mut producer, vec![original.clone()])
+            .await
+            .unwrap();
+
+        // A rewritten manifest is produced (distinct from the original) holding survivors.
+        assert_eq!(result.len(), 1);
+        assert_ne!(result[0].manifest_path, original.manifest_path);
+
+        let mut survivors = manifest_file_paths(&table, &result[0]).await;
+        survivors.sort();
+        assert_eq!(survivors, vec![
+            "data/a.parquet".to_string(),
+            "data/c.parquet".to_string(),
+        ]);
+    }
+}
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index b68a53e5e3..9fbb75d64c 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -54,6 +54,9 @@ mod action;
 
 pub use action::*;
 mod append;
+mod delete_aware;
+mod manifest_filter;
+mod rewrite_files;
 mod snapshot;
 mod sort_order;
 mod update_location;
@@ -73,6 +76,7 @@ use crate::spec::TableProperties;
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
+use crate::transaction::rewrite_files::RewriteFilesAction;
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -149,6 +153,15 @@ impl Transaction {
         FastAppendAction::new()
     }
 
+    /// Creates a rewrite files action.
+    ///
+    /// `RewriteFiles` replaces a set of existing files with a new, equivalent set (i.e.
+    /// compaction) in a single `Replace` snapshot, validating that no conflicting row-level
+    /// deletes have appeared for the rewritten data files since the starting snapshot.
+    pub fn rewrite_files(&self) -> RewriteFilesAction {
+        RewriteFilesAction::new()
+    }
+
     /// Creates replace sort order action.
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction { ReplaceSortOrderAction::new() diff --git a/crates/iceberg/src/transaction/rewrite_files.rs b/crates/iceberg/src/transaction/rewrite_files.rs new file mode 100644 index 0000000000..e71df5c8dc --- /dev/null +++ b/crates/iceberg/src/transaction/rewrite_files.rs @@ -0,0 +1,455 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{ + DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestFile, Operation, +}; +use crate::table::Table; +use crate::transaction::delete_aware::DeleteAwareOperation; +use crate::transaction::manifest_filter::ManifestFilterManager; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, +}; +use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind}; + +/// A transaction action that rewrites a set of existing files with a new, equivalent set +/// (i.e. compaction). +/// +/// `RewriteFiles` adds new files and deletes old ones in a single `Replace` snapshot. 
Added +/// and deleted files are bucketed by [`DataContentType`] as they come in: data files land in +/// the data buckets, while positional/equality delete files land in the delete buckets (this +/// split mirrors `#1606`). +/// +/// The action stores its inputs and rebuilds a fresh [`RewriteFilesOperation`] on each commit +/// attempt via [`build_operation`](RewriteFilesAction::build_operation), so the operation's +/// filter managers are repopulated deterministically and survive `do_commit` retries. +pub struct RewriteFilesAction { + // below are properties used to create SnapshotProducer when commit + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + // Added/deleted files split by content type, as in #1606. + added_data_files: Vec, + added_delete_files: Vec, + deleted_data_files: Vec, + deleted_delete_files: Vec, + // When set, only positional deletes count as conflicts during validation. + data_sequence_number: Option, + // The snapshot the rewrite started from; validation scans for new deletes since this id. + starting_snapshot_id: Option, +} + +impl RewriteFilesAction { + pub(crate) fn new() -> Self { + Self { + commit_uuid: None, + key_metadata: None, + snapshot_properties: HashMap::default(), + added_data_files: vec![], + added_delete_files: vec![], + deleted_data_files: vec![], + deleted_delete_files: vec![], + data_sequence_number: None, + starting_snapshot_id: None, + } + } + + /// Add files produced by the rewrite, bucketing each by its content type: data files go + /// to the data bucket; positional/equality delete files go to the delete bucket. 
+ pub fn add_data_files(mut self, files: impl IntoIterator) -> Result { + for f in files { + match f.content_type() { + DataContentType::Data => self.added_data_files.push(f), + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + self.added_delete_files.push(f) + } + } + } + Ok(self) + } + + /// Mark files removed by the rewrite, bucketing each by its content type: data files go + /// to the data bucket; positional/equality delete files go to the delete bucket. + pub fn delete_files(mut self, files: impl IntoIterator) -> Result { + for f in files { + match f.content_type() { + DataContentType::Data => self.deleted_data_files.push(f), + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + self.deleted_delete_files.push(f) + } + } + } + Ok(self) + } + + /// Set the snapshot the rewrite started from. Conflict validation scans for new delete + /// files added since this snapshot. + pub fn set_starting_snapshot_id(mut self, id: i64) -> Self { + self.starting_snapshot_id = Some(id); + self + } + + /// Pin the data sequence number for the rewrite. When set, only positional deletes are + /// treated as conflicts during validation. + pub fn set_data_sequence_number(mut self, seq: i64) -> Self { + self.data_sequence_number = Some(seq); + self + } + + /// Set commit UUID for the snapshot. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } + + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { + self.snapshot_properties = snapshot_properties; + self + } + + /// Build a fresh [`RewriteFilesOperation`] from the action's stored inputs. 
+ /// + /// A new operation is built per commit attempt; the filter managers are repopulated + /// deterministically from the deleted-file vectors (so the operation survives `do_commit` + /// retries). Removed files become filter-manager removals via + /// [`ManifestFilterManager::delete_file`] — data files into the data filter, delete files + /// into the delete filter (`DataFile` covers both kinds). + fn build_operation(&self) -> RewriteFilesOperation { + let mut op = RewriteFilesOperation { + deleted_data_files: self.deleted_data_files.clone(), + deleted_delete_files: self.deleted_delete_files.clone(), + starting_snapshot_id: self.starting_snapshot_id, + data_sequence_number: self.data_sequence_number, + data_filter: ManifestFilterManager::default(), + delete_filter: ManifestFilterManager::default(), + }; + + // Removed files become filter-manager removals (DataFile covers both kinds). + for f in &self.deleted_data_files { + op.data_filter.delete_file(f.clone()); + } + for f in &self.deleted_delete_files { + op.delete_filter.delete_file(f.clone()); + } + + op + } +} + +/// The operation driven by [`SnapshotProducer`](crate::transaction::snapshot::SnapshotProducer) +/// for a `RewriteFiles` commit. +/// +/// Rebuilt per commit attempt by [`RewriteFilesAction::build_operation`]. The +/// `SnapshotProduceOperation` and `DeleteAwareOperation` implementations are wired in +/// subsequent subtasks (5.2–5.4). 
+pub(crate) struct RewriteFilesOperation {
+    /// Data files removed by the rewrite; input to the conflict check in `validate`.
+    deleted_data_files: Vec<DataFile>,
+    /// Positional/equality delete files removed by the rewrite.
+    deleted_delete_files: Vec<DataFile>,
+    /// Snapshot the rewrite started from; lower bound of the conflict scan.
+    starting_snapshot_id: Option<i64>,
+    /// Pinned data sequence number; when set, equality deletes are not treated as conflicts.
+    data_sequence_number: Option<i64>,
+    /// Filter applied to carried-forward data manifests.
+    data_filter: ManifestFilterManager,
+    /// Filter applied to carried-forward delete manifests.
+    delete_filter: ManifestFilterManager,
+}
+
+impl SnapshotProduceOperation for RewriteFilesOperation {
+    /// A rewrite always produces a `Replace` snapshot.
+    fn operation(&self) -> Operation {
+        Operation::Replace
+    }
+
+    /// Always returns an empty list; see the comment in the body.
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        // Removed files are dropped by rewriting the carried-forward manifests in
+        // `existing_manifest` via the filter managers, so no separate delete entries are
+        // emitted here.
+        Ok(vec![])
+    }
+
+    /// Load the current snapshot's manifests and pass them through the filter managers,
+    /// returning the filtered data manifests followed by the filtered delete manifests.
+    ///
+    /// Returns an empty list when the table has no current snapshot.
+    async fn existing_manifest(
+        &mut self,
+        snapshot_produce: &mut SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        // No current snapshot means there are no manifests to carry forward.
+        let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
+            return Ok(vec![]);
+        };
+
+        let manifests = snapshot_produce
+            .table
+            .manifest_list_reader(snapshot)
+            .load()
+            .await?
+            .entries()
+            .to_vec();
+
+        // Partition the carried-forward manifests by content type so each filter manager
+        // rewrites only the manifests it owns: data manifests through `data_filter`,
+        // positional/equality delete manifests through `delete_filter`.
+        let (data, deletes): (Vec<ManifestFile>, Vec<ManifestFile>) = manifests
+            .into_iter()
+            .partition(|m| m.content == ManifestContentType::Data);
+
+        let mut out = self
+            .data_filter
+            .filter_manifests(snapshot_produce, data)
+            .await?;
+        out.extend(
+            self.delete_filter
+                .filter_manifests(snapshot_produce, deletes)
+                .await?,
+        );
+
+        Ok(out)
+    }
+}
+
+impl DeleteAwareOperation for RewriteFilesOperation {
+    /// Check the rewrite against the refreshed base table before anything is written.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::DataInvalid` when no file is marked for removal, and propagates
+    /// any error from `validate_no_new_deletes_for_data_files`.
+    async fn validate(&self, base: &Table, parent_snapshot_id: Option<i64>) -> Result<()> {
+        // Precondition: a rewrite must remove at least one file. (Intent from #1606.)
+        if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Files to delete cannot be empty",
+            ));
+        }
+
+        // When data files are being rewritten, ensure no new row-level deletes have appeared
+        // for them since the starting snapshot. When a data sequence number is pinned, only
+        // positional deletes count as conflicts (`ignore_equality_deletes`), following #1606.
+        if !self.deleted_data_files.is_empty() {
+            self.validate_no_new_deletes_for_data_files(
+                base,
+                self.starting_snapshot_id,
+                parent_snapshot_id,
+                &self.deleted_data_files,
+                self.data_sequence_number.is_some(),
+            )
+            .await?;
+        }
+
+        Ok(())
+    }
+
+    fn data_filter(&mut self) -> &mut ManifestFilterManager {
+        &mut self.data_filter
+    }
+
+    fn delete_filter(&mut self) -> &mut ManifestFilterManager {
+        &mut self.delete_filter
+    }
+}
+
+#[async_trait]
+impl TransactionAction for RewriteFilesAction {
+    /// Validate the rewrite against `table`, then produce the `Replace` snapshot.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::FeatureUnsupported` when delete files were added to the action,
+    /// and propagates validation and snapshot-production errors.
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        // Added delete files are bucketed by `add_data_files`, but the producer below is only
+        // handed `added_data_files`. Committing anyway would silently leave the new delete
+        // files out of the snapshot, so reject the commit instead of losing them.
+        if !self.added_delete_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "RewriteFiles does not support adding delete files yet",
+            ));
+        }
+
+        // Rebuild a fresh operation for this attempt; the filter managers are repopulated
+        // deterministically from the action's stored inputs (survives `do_commit` retries).
+        let op = self.build_operation();
+
+        // STEP 1 — conflict validation before any write. Scans the refreshed base table for
+        // concurrent changes (e.g. new row-level deletes for rewritten data files) since the
+        // starting snapshot.
+        op.validate(table, table.metadata().current_snapshot_id())
+            .await?;
+
+        // STEP 2 — produce the `Replace` snapshot. The producer drives the operation by `&mut`
+        // so the filter managers can rewrite the carried-forward manifests inside
+        // `existing_manifest`.
+        let snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            self.key_metadata.clone(),
+            self.snapshot_properties.clone(),
+            self.added_data_files.clone(),
+        );
+
+        snapshot_producer.commit(op, DefaultManifestProcess).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::spec::{DataFileBuilder, DataFileFormat, Literal, Struct};
+
+    fn file(path: &str, content: DataContentType) -> DataFile {
+        let mut builder = DataFileBuilder::default();
+        builder
+            .content(content)
+            .file_path(path.to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(1)
+            .partition(Struct::from_iter([Some(Literal::long(0))]))
+            .partition_spec_id(0);
+        // Positional/equality deletes require a referenced data file / equality ids to be
+        // valid, but for these builder-bucketing tests only content_type matters.
+        if content == DataContentType::EqualityDeletes {
+            builder.equality_ids(Some(vec![1]));
+        }
+        if content == DataContentType::PositionDeletes {
+            builder.referenced_data_file(Some("data/ref.parquet".to_string()));
+        }
+        builder.build().unwrap()
+    }
+
+    #[test]
+    fn test_add_data_files_buckets_by_content_type() {
+        let action = RewriteFilesAction::new()
+            .add_data_files(vec![
+                file("data/a.parquet", DataContentType::Data),
+                file("delete/pos.parquet", DataContentType::PositionDeletes),
+                file("delete/eq.parquet", DataContentType::EqualityDeletes),
+            ])
+            .unwrap();
+
+        assert_eq!(action.added_data_files.len(), 1);
+        assert_eq!(action.added_data_files[0].file_path(), "data/a.parquet");
+        assert_eq!(action.added_delete_files.len(), 2);
+        assert!(action.deleted_data_files.is_empty());
+        assert!(action.deleted_delete_files.is_empty());
+    }
+
+    #[test]
+    fn test_delete_files_buckets_by_content_type() {
+        let action = RewriteFilesAction::new()
+            .delete_files(vec![
+                file("data/a.parquet", DataContentType::Data),
+                file("data/b.parquet", DataContentType::Data),
file("delete/pos.parquet", DataContentType::PositionDeletes), + ]) + .unwrap(); + + assert_eq!(action.deleted_data_files.len(), 2); + assert_eq!(action.deleted_delete_files.len(), 1); + assert!(action.added_data_files.is_empty()); + assert!(action.added_delete_files.is_empty()); + } + + #[test] + fn test_setters_populate_fields() { + let uuid = Uuid::now_v7(); + let mut props = HashMap::new(); + props.insert("k".to_string(), "v".to_string()); + + let action = RewriteFilesAction::new() + .set_starting_snapshot_id(42) + .set_data_sequence_number(7) + .set_commit_uuid(uuid) + .set_key_metadata(vec![1, 2, 3]) + .set_snapshot_properties(props.clone()); + + assert_eq!(action.starting_snapshot_id, Some(42)); + assert_eq!(action.data_sequence_number, Some(7)); + assert_eq!(action.commit_uuid, Some(uuid)); + assert_eq!(action.key_metadata, Some(vec![1, 2, 3])); + assert_eq!(action.snapshot_properties, props); + } + + #[test] + fn test_build_operation_populates_filters_from_deleted_files() { + let action = RewriteFilesAction::new() + .delete_files(vec![ + file("data/a.parquet", DataContentType::Data), + file("data/b.parquet", DataContentType::Data), + file("delete/pos.parquet", DataContentType::PositionDeletes), + ]) + .unwrap() + .set_starting_snapshot_id(99) + .set_data_sequence_number(3); + + let op = action.build_operation(); + + // Carried-over scalar fields. + assert_eq!(op.starting_snapshot_id, Some(99)); + assert_eq!(op.data_sequence_number, Some(3)); + assert_eq!(op.deleted_data_files.len(), 2); + assert_eq!(op.deleted_delete_files.len(), 1); + + // Deleted data files routed into the data filter; delete files into delete filter. 
+ assert!(op.data_filter.is_removed("data/a.parquet")); + assert!(op.data_filter.is_removed("data/b.parquet")); + assert!(!op.data_filter.is_removed("delete/pos.parquet")); + + assert!(op.delete_filter.is_removed("delete/pos.parquet")); + assert!(!op.delete_filter.is_removed("data/a.parquet")); + } + + #[test] + fn test_build_operation_empty_when_nothing_deleted() { + let op = RewriteFilesAction::new().build_operation(); + + assert!(op.data_filter.is_empty()); + assert!(op.delete_filter.is_empty()); + assert!(op.deleted_data_files.is_empty()); + assert!(op.deleted_delete_files.is_empty()); + assert_eq!(op.starting_snapshot_id, None); + assert_eq!(op.data_sequence_number, None); + } + + #[tokio::test] + async fn test_validate_errors_when_no_files_deleted() { + let table = crate::transaction::tests::make_v2_minimal_table(); + let op = RewriteFilesAction::new().build_operation(); + + let err = op + .validate(&table, table.metadata().current_snapshot_id()) + .await + .unwrap_err(); + + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains("Files to delete cannot be empty"), + "unexpected error message: {}", + err.message() + ); + } + + #[tokio::test] + async fn test_validate_passes_when_no_data_files_deleted_on_minimal_table() { + // Only a delete file is removed (no data files), so the data-file conflict check is + // skipped and the precondition is satisfied. 
+ let table = crate::transaction::tests::make_v2_minimal_table(); + let op = RewriteFilesAction::new() + .delete_files(vec![file( + "delete/pos.parquet", + DataContentType::PositionDeletes, + )]) + .unwrap() + .build_operation(); + + op.validate(&table, table.metadata().current_snapshot_id()) + .await + .unwrap(); + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8e47226072..79a405fd1d 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -84,8 +84,8 @@ pub(crate) trait SnapshotProduceOperation: Send + Sync { /// - **Overwrite operations**: May exclude manifests for partitions being overwritten /// - **Delete operations**: May exclude manifests for partitions being deleted fn existing_manifest( - &self, - snapshot_produce: &SnapshotProducer<'_>, + &mut self, + snapshot_produce: &mut SnapshotProducer<'_>, ) -> impl Future>> + Send; } @@ -242,7 +242,10 @@ impl<'a> SnapshotProducer<'a> { snapshot_id } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { + pub(crate) fn new_manifest_writer( + &mut self, + content: ManifestContentType, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -340,7 +343,7 @@ impl<'a> SnapshotProducer<'a> { async fn manifest_file( &mut self, - snapshot_produce_operation: &OP, + snapshot_produce_operation: &mut OP, manifest_process: &MP, ) -> Result> { // Assert current snapshot producer contains new content to add to new snapshot. @@ -434,7 +437,7 @@ impl<'a> SnapshotProducer<'a> { /// Finished building the action and return the [`ActionCommit`] to the transaction. 
pub(crate) async fn commit( mut self, - snapshot_produce_operation: OP, + mut snapshot_produce_operation: OP, process: MP, ) -> Result { let manifest_list_path = self.generate_manifest_list_file_path(0); @@ -475,7 +478,7 @@ impl<'a> SnapshotProducer<'a> { })?; let new_manifests = self - .manifest_file(&snapshot_produce_operation, &process) + .manifest_file(&mut snapshot_produce_operation, &process) .await?; manifest_list_writer.add_manifests(new_manifests.into_iter())?; diff --git a/docs/rfcs/conflict-detection.md b/docs/rfcs/conflict-detection.md new file mode 100644 index 0000000000..004aa01eb8 --- /dev/null +++ b/docs/rfcs/conflict-detection.md @@ -0,0 +1,444 @@ + + +# Design Document: Snapshot Conflict Validation + +## 1. Problem Statement + +Snapshot production on `main` only supports append-only operations. There is no +machinery for the operations that **remove existing files** from a table — delete, +overwrite, and rewrite. Those operations need two capabilities that append does not: + +1. **Write-time conflict validation** — before writing a new snapshot, the commit must + check the refreshed base table for concurrent changes that would make the pending + operation incorrect (e.g. a concurrent commit that added delete files for data files + this operation is about to rewrite). +2. **Manifest filtering** — instead of carrying existing manifests forward verbatim, a + delete-class operation must rewrite manifests to drop the files it removes. + +iceberg-java packages all of this into a single `MergingSnapshotProducer` base class via +implementation inheritance. Rust has no implementation inheritance, and the append path is +already factored differently. We need a shape that adds validation + filtering for +delete-class operations **without disturbing the append-only path** and without recreating +a Java-style inheritance chain. + +This document defines the trait/struct structure for that shape. 
Concrete operations +(`OverwriteFiles`, `RowDelta`, `RewriteFiles`) and the `DeleteFileIndex` conflict-scoping +gap are out of scope here and left as placeholders. + +## 2. Existing Architecture and Components + +The snapshot-production machinery on `main` lives in `transaction/snapshot.rs` and +`transaction/append.rs`: + +- **`SnapshotProducer`** (`snapshot.rs`) — a concrete struct that *drives* one operation + plus one manifest process and writes the new snapshot (id generation, manifest writing, + manifest-list writing, summary). Its `commit(...)` method does **not** validate. +- **`SnapshotProduceOperation`** (`snapshot.rs`) — a plain trait with no supertrait, + exposing `operation()`, `delete_entries()`, and `existing_manifest()`. `existing_manifest` + today only *selects* which manifests to carry forward. +- **`ManifestProcess` + `DefaultManifestProcess`** (`snapshot.rs`) — a seam for + merge/compaction of the manifest set. `DefaultManifestProcess` is a no-op pass-through. +- **`FastAppendOperation`** (`append.rs`) — the only `SnapshotProduceOperation` impl; it + implements the base trait and nothing else. +- **`TransactionAction`** (`action.rs`) — `commit(self: Arc, table: &Table)`. The + action is the object `Arc`-cloned and reused across `Transaction::do_commit` retries; the + producer and operation are rebuilt from scratch on each attempt. + +There is no validator trait, no `validate.rs`, no filter manager, and no producer-side +validation call anywhere on `main`. + +## 3. Proposed Architecture + +```mermaid +classDiagram + direction TB + + class SnapshotProducer { + <> + +commit(&mut op, process) ActionCommit + } + class SnapshotProduceOperation { + <> + +operation() Operation + +existing_manifest(&mut self, sp) Vec~ManifestFile~ + } + class ManifestProcess { + <> + } + class FastAppendOperation { + <> + } + + class DeleteAwareOperation { + <> + +validate(base, parent) + +validation_history(...) default + +validate_no_new_deletes_for_data_files(...) 
default + +data_filter(&mut self) &mut ManifestFilterManager + +delete_filter(&mut self) &mut ManifestFilterManager + } + class ManifestFilterManager { + <> + +delete_file(file) + +filter_manifests(sp, manifests) + } + class OverwriteFilesOperation { + <> + } + class RowDeltaOperation { + <> + } + + SnapshotProducer ..> SnapshotProduceOperation : drives (&mut) + SnapshotProducer ..> ManifestProcess : drives + FastAppendOperation ..|> SnapshotProduceOperation + DeleteAwareOperation --|> SnapshotProduceOperation : sub-trait + DeleteAwareOperation ..> ManifestFilterManager : owns (data + delete) + OverwriteFilesOperation ..|> DeleteAwareOperation + RowDeltaOperation ..|> DeleteAwareOperation +``` + +Key points: + +- The new `DeleteAwareOperation` is an **additive sub-trait** of the existing + `SnapshotProduceOperation`. Append-only operations implement only the base trait and carry + none of the validation/filtering surface. +- `validate` is invoked by the **action** before it hands off to `SnapshotProducer::commit`, + so validation precedes every write. The producer stays oblivious to validation. +- The producer drives the operation by `&mut` so the filter managers can mutate during + manifest production. This is the one base-trait signature change: `existing_manifest` + takes `&mut self`; `FastAppendOperation` absorbs it trivially. + +## 4. Proposed Components + +### 4.1 `DeleteAwareOperation` (new trait) + +A sub-trait of `SnapshotProduceOperation`, implemented only by delete-class operations. It +provides three things: + +- **`validate`** — the per-operation conflict check (real polymorphism; each operation + implements its own). +- **Reusable validation helpers with default implementations** — `validation_history` and + `validate_no_new_deletes_for_data_files`. 
A single `validate()` is not enough: each
+  delete-class operation composes the same lower-level checks (scan the snapshots added since
+  the validation window, find conflicting deletes for a set of data files). Putting these on
+  the trait as defaults lets each `validate` reuse them instead of re-deriving the logic.
+  (See `#2590` for the reference shape of these helpers.)
+- **Filter accessors** — `data_filter` / `delete_filter` reach the operation's owned filter
+  managers so the above can be wired without each implementor re-plumbing them.
+
+```rust
+// NEW — does not exist on main
+trait DeleteAwareOperation: SnapshotProduceOperation {
+    /// Per-operation conflict check against the refreshed base table, run before any write.
+    /// Implemented per operation; composes the helpers below.
+    async fn validate(&self, base: &Table, parent_snapshot_id: Option<i64>) -> Result<()>;
+
+    /// Default helper: collect manifests + snapshot ids between two points, filtered by
+    /// operation set and manifest content type. Reused by the checks below.
+    async fn validation_history(
+        &self,
+        base: &Table,
+        from_snapshot_id: Option<i64>,
+        to_snapshot_id: i64,
+        matching_operations: &HashSet<Operation>,
+        content_type: ManifestContentType,
+    ) -> Result<(Vec<ManifestFile>, HashSet<i64>)> {
+        // default implementation
+    }
+
+    /// Default helper: fail if any delete added since `from_snapshot_id` targets the
+    /// given data files. When `ignore_equality_deletes` is true, only positional deletes
+    /// count as conflicts.
+    async fn validate_no_new_deletes_for_data_files(
+        &self,
+        base: &Table,
+        from_snapshot_id: Option<i64>,
+        to_snapshot_id: Option<i64>,
+        data_files: &[DataFile],
+        ignore_equality_deletes: bool,
+    ) -> Result<()> {
+        // default implementation, built on validation_history
+    }
+
+    /// Accessors to the operation's owned managers. Built up as the operation is
+    /// constructed and mutated during manifest production.
+    fn data_filter(&mut self) -> &mut ManifestFilterManager;
+    fn delete_filter(&mut self) -> &mut ManifestFilterManager;
+}
+```
+
+The managers are **owned fields on the operation**, populated incrementally as the action
+builds the operation, e.g.:
+
+```rust
+struct RowDelta {
+    data_filter: ManifestFilterManager,
+    delete_filter: ManifestFilterManager,
+    // ... added files, conflict-detection config (`targets`, `starting_snapshot_id`), etc.
+}
+
+impl RowDelta {
+    fn remove_rows(&mut self, file: DataFile) {
+        self.delete_filter().delete_file(file);
+    }
+}
+
+impl DeleteAwareOperation for RowDelta {
+    async fn validate(&self, base: &Table, parent: Option<i64>) -> Result<()> {
+        // composes the default helpers, e.g. scan from the starting snapshot up to the
+        // current parent, counting equality deletes as conflicts too
+        self.validate_no_new_deletes_for_data_files(
+            base,
+            self.starting_snapshot_id,
+            parent,
+            &self.targets,
+            false,
+        )
+        .await
+    }
+    fn data_filter(&mut self) -> &mut ManifestFilterManager { &mut self.data_filter }
+    fn delete_filter(&mut self) -> &mut ManifestFilterManager { &mut self.delete_filter }
+}
+```
+
+Because the operation is rebuilt each commit attempt, the managers are reconstructed
+deterministically per attempt from the action's stored inputs — no state leaks from one
+attempt into the next.
+
+### 4.2 `ManifestFilterManager` (new struct)
+
+A concrete struct (not a trait) that an operation owns — one for data manifests, one for
+delete manifests. It accumulates which files/entries to drop and rewrites the affected
+manifests during manifest production. Its state lives only for a single filtering pass,
+which matches the operation's per-attempt lifetime.
+
+```rust
+// NEW — concrete struct
+struct ManifestFilterManager {
+    // files/entries to drop, drop predicate, within-pass tracking, ...
+}
+
+impl ManifestFilterManager {
+    /// Record a file for removal. `DataFile` covers both data and delete files, so the
+    /// same entry point serves data-manifest and delete-manifest filtering.
+    fn delete_file(&mut self, file: DataFile) { /* record removal */ }
+
+    /// Rewrite the given manifests, dropping removed entries; writes rewritten
+    /// manifests via a producer-provided helper.
+    async fn filter_manifests(
+        &mut self,
+        sp: &mut SnapshotProducer<'_>,
+        manifests: Vec<ManifestFile>,
+    ) -> Result<Vec<ManifestFile>> { /* ... */ }
+}
+```
+
+### 4.3 `SnapshotProducer` (existing — one signature change)
+
+`SnapshotProducer` stays a concrete driver. `commit` still takes the operation by value,
+but binds it mutably and passes it to `manifest_file` by `&mut`, so a delete-class operation
+can run its filter managers inside `existing_manifest`. The append-only path is unchanged in
+behavior.
+
+```rust
+// existing trait method, now taking `&mut self` and `&mut SnapshotProducer`
+fn existing_manifest(
+    &mut self,
+    sp: &mut SnapshotProducer<'_>,
+) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+```
+
+### 4.4 Action wiring (placeholder)
+
+Delete-class actions build a `DeleteAwareOperation` and call `validate` on it explicitly
+before handing it to the producer; append actions do neither.
+
+```rust
+// NEW delete-class action — placeholder
+impl TransactionAction for OverwriteFilesAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        let op = self.build_operation(); // build delete_aware_op
+        op.validate(table, table.metadata().current_snapshot_id()).await?;
+        let producer = SnapshotProducer::new(table, /* ... */);
+        producer.commit(op, DefaultManifestProcess).await
+    }
+}
+
+// FastAppendAction is unchanged: no validate, no DeleteAwareOperation.
+```
+
+### 4.5 Worked example: `RewriteFilesAction`
+
+`RewriteFiles` replaces a set of existing files with a new, equivalent set (compaction). It
+adds new files and deletes old ones in one `Replace` snapshot, and must validate that no new
+row-level deletes have appeared for the data files it is rewriting since its starting
+snapshot. The reference implementation is `#1606`; the field set, the builder split of added
+/ deleted files by content type, the empty-file precondition checks, and the
+`ignore_equality_deletes = data_sequence_number.is_some()` flag are taken directly from it.
+The shape below keeps that behavior but expresses it through `DeleteAwareOperation` (rather +than `#1606`'s separate `SnapshotValidator` supertrait) and routes the removed files through +`ManifestFilterManager` (rather than carrying `deleted_*` vectors on `SnapshotProducer`). + +```rust +// NEW — placeholder. Added/deleted files are split by content type, as in #1606. +pub struct RewriteFilesAction { + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + added_data_files: Vec, + added_delete_files: Vec, + deleted_data_files: Vec, + deleted_delete_files: Vec, + data_sequence_number: Option, + starting_snapshot_id: Option, +} + +impl RewriteFilesAction { + /// Split incoming files into data vs delete buckets (content type), as in #1606. + pub fn add_data_files(mut self, files: impl IntoIterator) -> Result { + for f in files { + match f.content_type() { + DataContentType::Data => self.added_data_files.push(f), + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + self.added_delete_files.push(f) + } + } + } + Ok(self) + } + + pub fn delete_files(mut self, files: impl IntoIterator) -> Result { + for f in files { + match f.content_type() { + DataContentType::Data => self.deleted_data_files.push(f), + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + self.deleted_delete_files.push(f) + } + } + } + Ok(self) + } + + pub fn set_starting_snapshot_id(mut self, id: i64) -> Self { /* ... */ self } + pub fn set_data_sequence_number(mut self, seq: i64) -> Self { /* ... */ self } + // set_commit_uuid / set_key_metadata / set_snapshot_properties omitted + + /// Rebuild a fresh operation per attempt; the managers are repopulated + /// deterministically from the action's stored inputs (survives do_commit retries). 
+ fn build_operation(&self) -> RewriteFilesOperation { + let mut op = RewriteFilesOperation { + deleted_data_files: self.deleted_data_files.clone(), + deleted_delete_files: self.deleted_delete_files.clone(), + starting_snapshot_id: self.starting_snapshot_id, + data_sequence_number: self.data_sequence_number, + data_filter: ManifestFilterManager::default(), + delete_filter: ManifestFilterManager::default(), + }; + // Removed files become filter-manager removals (DataFile covers both kinds). + for f in &self.deleted_data_files { + op.data_filter.delete_file(f.clone()); + } + for f in &self.deleted_delete_files { + op.delete_filter.delete_file(f.clone()); + } + op + } +} + +pub struct RewriteFilesOperation { + deleted_data_files: Vec, + deleted_delete_files: Vec, + starting_snapshot_id: Option, + data_sequence_number: Option, + data_filter: ManifestFilterManager, + delete_filter: ManifestFilterManager, +} + +impl SnapshotProduceOperation for RewriteFilesOperation { + fn operation(&self) -> Operation { Operation::Replace } + + async fn existing_manifest( + &mut self, + sp: &mut SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = sp.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + let manifests = snapshot + .load_manifest_list(sp.table.file_io(), sp.table.metadata()) + .await? + .entries() + .to_vec(); + + // Each manager rewrites the manifests it owns, dropping the removed files and + // re-emitting the survivors (this is #1606's per-manifest rewrite loop, moved + // into ManifestFilterManager::filter_manifests). 
+ let (data, deletes): (Vec<_>, Vec<_>) = + manifests.into_iter().partition(|m| m.content == ManifestContentType::Data); + let mut out = self.data_filter.filter_manifests(sp, data).await?; + out.extend(self.delete_filter.filter_manifests(sp, deletes).await?); + Ok(out) + } +} + +impl DeleteAwareOperation for RewriteFilesOperation { + async fn validate(&self, base: &Table, parent_snapshot_id: Option) -> Result<()> { + // Precondition checks (verbatim intent from #1606). + if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() { + return Err(Error::new(ErrorKind::DataInvalid, "Files to delete cannot be empty")); + } + + // If data files are being rewritten, there must be no new row-level deletes for + // them since the starting snapshot. ignore_equality_deletes follows #1606: when a + // data_sequence_number is pinned, only positional deletes count as conflicts. + if !self.deleted_data_files.is_empty() { + self.validate_no_new_deletes_for_data_files( + base, + self.starting_snapshot_id, + parent_snapshot_id, + &self.deleted_data_files, + self.data_sequence_number.is_some(), + ) + .await?; + } + Ok(()) + } + + fn data_filter(&mut self) -> &mut ManifestFilterManager { &mut self.data_filter } + fn delete_filter(&mut self) -> &mut ManifestFilterManager { &mut self.delete_filter } +} + +#[async_trait] +impl TransactionAction for RewriteFilesAction { + async fn commit(self: Arc, table: &Table) -> Result { + let op = self.build_operation(); + + // STEP 1 — conflict validation before any write. + op.validate(table, table.metadata().current_snapshot_id()).await?; + + // STEP 2 — produce the snapshot. The producer drives the op by &mut so the + // filter managers can rewrite manifests inside existing_manifest. 
+ let producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.added_data_files.clone(), + ); + producer.commit(op, DefaultManifestProcess).await + } +} +``` + +This exercises every component: `validate` runs the precondition checks then composes the +`validate_no_new_deletes_for_data_files` default helper (which itself calls +`validation_history`); the two `ManifestFilterManager`s take the removed data and delete +files via `delete_file` and rewrite the carried-forward manifests in `existing_manifest` +(driven by `&mut`); and `FastAppend` continues to need none of it. +