diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..5c7b20ea5c 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -37,6 +37,10 @@ pub struct FastAppendAction { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + // Optional tag name to create atomically with the snapshot. + tag_name: Option, + // Optional retention for the tag in milliseconds. + tag_retention: Option, } impl FastAppendAction { @@ -47,6 +51,8 @@ impl FastAppendAction { key_metadata: None, snapshot_properties: HashMap::default(), added_data_files: vec![], + tag_name: None, + tag_retention: None, } } @@ -79,17 +85,42 @@ impl FastAppendAction { self.snapshot_properties = snapshot_properties; self } + + /// Set a tag name to be created atomically with the snapshot. + /// The tag will use the table's default retention policy (history.expire.max-ref-age-ms). + /// + /// Use `with_tag_retention()` to override the retention policy. + pub fn with_tag(mut self, tag_name: impl Into) -> Self { + self.tag_name = Some(tag_name.into()); + self + } + + /// Set the retention period for the tag in milliseconds. + /// This overrides the table's default retention policy. + /// Use i64::MAX to make the tag never expire. + /// + /// Can be called before or after `with_tag()`. Only takes effect if a tag name is set. + pub fn with_tag_retention(mut self, max_ref_age_ms: i64) -> Self { + self.tag_retention = Some(max_ref_age_ms); + self + } } #[async_trait] impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { + let tag_ref = self + .tag_name + .as_ref() + .map(|name| (name.clone(), self.tag_retention)); + let snapshot_producer = SnapshotProducer::new( table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + tag_ref, ); // validate added files @@ -150,7 +181,8 @@ mod tests { use std::sync::Arc; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, + DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, SnapshotRetention, + Struct, }; use crate::transaction::tests::make_v2_minimal_table; use crate::transaction::{Transaction, TransactionAction}; @@ -333,4 +365,118 @@ mod tests { ); assert_eq!(data_file, *manifest.entries()[0].data_file()); } + + #[tokio::test] + async fn test_fast_append_with_tag() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/tagged.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let action = tx + .fast_append() + .add_data_files(vec![data_file]) + .with_tag("v1.0.0"); + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + // Should have 3 updates: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag) + assert_eq!(updates.len(), 3); + + // First update: AddSnapshot + let snapshot_id = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot.snapshot_id() + } else { + panic!("Expected AddSnapshot as first update"); + }; + + // Second update: SetSnapshotRef for main branch + if let TableUpdate::SetSnapshotRef { + ref_name, + reference, + } = &updates[1] + { + assert_eq!(ref_name, MAIN_BRANCH); + assert_eq!(reference.snapshot_id, snapshot_id); + assert!(reference.is_branch()); + } else { + panic!("Expected SetSnapshotRef for main branch as second update"); + } + + // Third update: SetSnapshotRef for tag + if let TableUpdate::SetSnapshotRef { + ref_name, + reference, + } = &updates[2] + { + assert_eq!(ref_name, "v1.0.0"); + assert_eq!(reference.snapshot_id, snapshot_id); + assert!(!reference.is_branch()); // Should be a tag, not a branch + // Should use table defaults (None) + if let SnapshotRetention::Tag { max_ref_age_ms } = &reference.retention { + assert_eq!(max_ref_age_ms, &None); + } else { + panic!("Expected Tag retention"); + } + } else { + panic!("Expected SetSnapshotRef for tag as third update"); + } + } + + #[tokio::test] + async fn test_fast_append_with_tag_retention() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/tagged_never_expire.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let action = tx + .fast_append() + .add_data_files(vec![data_file]) + .with_tag("v2.0.0") + .with_tag_retention(i64::MAX); // Never expire + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + // Should have 3 updates: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag) + assert_eq!(updates.len(), 3); + + // Third update: SetSnapshotRef for tag with custom retention + if let TableUpdate::SetSnapshotRef { + ref_name, + reference, + } = &updates[2] + { + assert_eq!(ref_name, "v2.0.0"); + assert!(!reference.is_branch()); + // Should have custom retention set to i64::MAX + if let SnapshotRetention::Tag { max_ref_age_ms } = &reference.retention { + assert_eq!(max_ref_age_ms, &Some(i64::MAX)); + } else { + panic!("Expected Tag retention"); + } + } else { + panic!("Expected SetSnapshotRef for tag as third update"); + } + } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..09d9415052 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, + // Optional tag (name, retention) to create atomically with the snapshot. + tag_ref: Option<(String, Option)>, } impl<'a> SnapshotProducer<'a> { @@ -127,6 +129,7 @@ impl<'a> SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + tag_ref: Option<(String, Option)>, ) -> Self { Self { table, @@ -136,6 +139,7 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties, added_data_files, manifest_counter: (0..), + tag_ref, } } @@ -485,7 +489,7 @@ impl<'a> SnapshotProducer<'a> { new_snapshot.build() }; - let updates = vec![ + let mut updates = vec![ TableUpdate::AddSnapshot { snapshot: new_snapshot, }, @@ -498,6 +502,15 @@ impl<'a> SnapshotProducer<'a> { }, ]; + if let Some((tag_name, max_ref_age_ms)) = self.tag_ref { + updates.push(TableUpdate::SetSnapshotRef { + ref_name: tag_name, + reference: SnapshotReference::new(self.snapshot_id, SnapshotRetention::Tag { + max_ref_age_ms, + }), + }); + } + let requirements = vec![ TableRequirement::UuidMatch { uuid: self.table.metadata().uuid(),