From 0544243d307a167395f8b034594502e832f6d626 Mon Sep 17 00:00:00 2001 From: alexjbuck Date: Tue, 9 Dec 2025 00:20:06 -0500 Subject: [PATCH 1/3] feat(transaction): Add atomic snapshot tagging to FastAppendAction Add the ability to create a tag reference atomically in the same transaction that creates a new snapshot via fast_append. This adds a `with_tag()` method to `FastAppendAction` that allows specifying a tag name. When set, the tag will be created pointing to the newly created snapshot, all within a single atomic catalog update. Example usage: ```rust let tx = Transaction::new(&table); let action = tx .fast_append() .add_data_files(data_files) .with_tag("v1.0.0"); let tx = action.apply(tx)?; let table = tx.commit(&catalog).await?; ``` --- crates/iceberg/src/transaction/append.rs | 71 ++++++++++++++++++++++ crates/iceberg/src/transaction/snapshot.rs | 15 ++++- 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..01db2bab40 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -37,6 +37,8 @@ pub struct FastAppendAction { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + // Optional tag name to create atomically with the snapshot. + tag_ref: Option, } impl FastAppendAction { @@ -47,6 +49,7 @@ impl FastAppendAction { key_metadata: None, snapshot_properties: HashMap::default(), added_data_files: vec![], + tag_ref: None, } } @@ -79,6 +82,12 @@ impl FastAppendAction { self.snapshot_properties = snapshot_properties; self } + + /// Set a tag name to be created atomically with the snapshot. + pub fn with_tag(mut self, tag_name: impl Into) -> Self { + self.tag_ref = Some(tag_name.into()); + self + } } #[async_trait] @@ -90,6 +99,7 @@ impl TransactionAction for FastAppendAction { self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + self.tag_ref.clone(), ); // validate added files @@ -333,4 +343,65 @@ mod tests { ); assert_eq!(data_file, *manifest.entries()[0].data_file()); } + + #[tokio::test] + async fn test_fast_append_with_tag() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/tagged.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let action = tx + .fast_append() + .add_data_files(vec![data_file]) + .with_tag("v1.0.0"); + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + // Should have 3 updates: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag) + assert_eq!(updates.len(), 3); + + // First update: AddSnapshot + let snapshot_id = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot.snapshot_id() + } else { + panic!("Expected AddSnapshot as first update"); + }; + + // Second update: SetSnapshotRef for main branch + if let TableUpdate::SetSnapshotRef { + ref_name, + reference, + } = &updates[1] + { + assert_eq!(ref_name, MAIN_BRANCH); + assert_eq!(reference.snapshot_id, snapshot_id); + assert!(reference.is_branch()); + } else { + panic!("Expected SetSnapshotRef for main branch as second update"); + } + + // Third update: SetSnapshotRef for tag + if let TableUpdate::SetSnapshotRef { + ref_name, + reference, + } = &updates[2] + { + assert_eq!(ref_name, "v1.0.0"); + assert_eq!(reference.snapshot_id, snapshot_id); + assert!(!reference.is_branch()); // Should be a tag, not a branch + } else { + panic!("Expected SetSnapshotRef for tag as third update"); + } + } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..a9cd39648a 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, + // Optional tag name to create atomically with the snapshot. + tag_ref: Option, } impl<'a> SnapshotProducer<'a> { @@ -127,6 +129,7 @@ impl<'a> SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + tag_ref: Option, ) -> Self { Self { table, @@ -136,6 +139,7 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties, added_data_files, manifest_counter: (0..), + tag_ref, } } @@ -485,7 +489,7 @@ impl<'a> SnapshotProducer<'a> { new_snapshot.build() }; - let updates = vec![ + let mut updates = vec![ TableUpdate::AddSnapshot { snapshot: new_snapshot, }, @@ -498,6 +502,15 @@ impl<'a> SnapshotProducer<'a> { }, ]; + if let Some(tag_name) = self.tag_ref { + updates.push(TableUpdate::SetSnapshotRef { + ref_name: tag_name, + reference: SnapshotReference::new(self.snapshot_id, SnapshotRetention::Tag { + max_ref_age_ms: None, + }), + }); + } + let requirements = vec![ TableRequirement::UuidMatch { uuid: self.table.metadata().uuid(), From c44dd9d9d3a7ee631dcfdd2420a7add4b28a3f2f Mon Sep 17 00:00:00 2001 From: alexjbuck Date: Wed, 17 Dec 2025 11:42:30 -0500 Subject: [PATCH 2/3] feat(transaction): Add retention control for snapshot tags Add `with_tag_retention()` method to FastAppendAction to allow controlling tag expiration independently from table defaults. Tags created with `with_tag()` default to using the table's `history.expire.max-ref-age-ms` property. Users can override this with `with_tag_retention(i64::MAX)` to make tags never expire, or specify a custom retention period in milliseconds. Example usage: ```rust // Uses table defaults .with_tag("v1.0.0") // Never expires .with_tag("v1.0.0") .with_tag_retention(i64::MAX) // Custom 30-day retention .with_tag("v1.0.0") .with_tag_retention(30 * 24 * 60 * 60 * 1000) ``` --- crates/iceberg/src/transaction/append.rs | 82 ++++++++++++++++++++-- crates/iceberg/src/transaction/snapshot.rs | 10 +-- 2 files changed, 82 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 01db2bab40..fd33ab0d30 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -38,7 +38,9 @@ pub struct FastAppendAction { snapshot_properties: HashMap, added_data_files: Vec, // Optional tag name to create atomically with the snapshot. - tag_ref: Option, + tag_name: Option, + // Optional retention for the tag in milliseconds. + tag_retention: Option, } impl FastAppendAction { @@ -49,7 +51,8 @@ impl FastAppendAction { key_metadata: None, snapshot_properties: HashMap::default(), added_data_files: vec![], - tag_ref: None, + tag_name: None, + tag_retention: None, } } @@ -84,8 +87,21 @@ impl FastAppendAction { } /// Set a tag name to be created atomically with the snapshot. + /// The tag will use the table's default retention policy (history.expire.max-ref-age-ms). + /// + /// Use `with_tag_retention()` to override the retention policy. pub fn with_tag(mut self, tag_name: impl Into) -> Self { - self.tag_ref = Some(tag_name.into()); + self.tag_name = Some(tag_name.into()); + self + } + + /// Set the retention period for the tag in milliseconds. + /// This overrides the table's default retention policy. + /// Use i64::MAX to make the tag never expire. + /// + /// Can be called before or after `with_tag()`. Only takes effect if a tag name is set. + pub fn with_tag_retention(mut self, max_ref_age_ms: i64) -> Self { + self.tag_retention = Some(max_ref_age_ms); self } } @@ -93,13 +109,15 @@ impl FastAppendAction { #[async_trait] impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { + let tag_ref = self.tag_name.as_ref().map(|name| (name.clone(), self.tag_retention)); + let snapshot_producer = SnapshotProducer::new( table, self.commit_uuid.unwrap_or_else(Uuid::now_v7), self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), - self.tag_ref.clone(), + tag_ref, ); // validate added files @@ -160,7 +178,8 @@ mod tests { use std::sync::Arc; use crate::spec::{ - DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, + DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, SnapshotRetention, + Struct, }; use crate::transaction::tests::make_v2_minimal_table; use crate::transaction::{Transaction, TransactionAction}; @@ -400,6 +419,59 @@ mod tests { assert_eq!(ref_name, "v1.0.0"); assert_eq!(reference.snapshot_id, snapshot_id); assert!(!reference.is_branch()); // Should be a tag, not a branch + // Should use table defaults (None) + if let SnapshotRetention::Tag { max_ref_age_ms } = &reference.retention { + assert_eq!(max_ref_age_ms, &None); + } else { + panic!("Expected Tag retention"); + } + } else { + panic!("Expected SetSnapshotRef for tag as third update"); + } + } + + #[tokio::test] + async fn test_fast_append_with_tag_retention() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/tagged_never_expire.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build() + .unwrap(); + + let action = tx + .fast_append() + .add_data_files(vec![data_file]) + .with_tag("v2.0.0") + .with_tag_retention(i64::MAX); // Never expire + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + // Should have 3 updates: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag) + assert_eq!(updates.len(), 3); + + // Third update: SetSnapshotRef for tag with custom retention + if let TableUpdate::SetSnapshotRef { + ref_name, + reference, + } = &updates[2] + { + assert_eq!(ref_name, "v2.0.0"); + assert!(!reference.is_branch()); + // Should have custom retention set to i64::MAX + if let SnapshotRetention::Tag { max_ref_age_ms } = &reference.retention { + assert_eq!(max_ref_age_ms, &Some(i64::MAX)); + } else { + panic!("Expected Tag retention"); + } } else { panic!("Expected SetSnapshotRef for tag as third update"); } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index a9cd39648a..09d9415052 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -118,8 +118,8 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, - // Optional tag name to create atomically with the snapshot. - tag_ref: Option, + // Optional tag (name, retention) to create atomically with the snapshot. + tag_ref: Option<(String, Option)>, } impl<'a> SnapshotProducer<'a> { @@ -129,7 +129,7 @@ impl<'a> SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, - tag_ref: Option, + tag_ref: Option<(String, Option)>, ) -> Self { Self { table, @@ -502,11 +502,11 @@ impl<'a> SnapshotProducer<'a> { }, ]; - if let Some(tag_name) = self.tag_ref { + if let Some((tag_name, max_ref_age_ms)) = self.tag_ref { updates.push(TableUpdate::SetSnapshotRef { ref_name: tag_name, reference: SnapshotReference::new(self.snapshot_id, SnapshotRetention::Tag { - max_ref_age_ms: None, + max_ref_age_ms, }), }); } From 99a0143186f7cb9a45ff4f6c3118044be64d3a19 Mon Sep 17 00:00:00 2001 From: alexjbuck Date: Wed, 17 Dec 2025 11:48:25 -0500 Subject: [PATCH 3/3] Run cargo fmt --- crates/iceberg/src/transaction/append.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index fd33ab0d30..5c7b20ea5c 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -109,7 +109,10 @@ impl FastAppendAction { #[async_trait] impl TransactionAction for FastAppendAction { async fn commit(self: Arc, table: &Table) -> Result { - let tag_ref = self.tag_name.as_ref().map(|name| (name.clone(), self.tag_retention)); + let tag_ref = self + .tag_name + .as_ref() + .map(|name| (name.clone(), self.tag_retention)); let snapshot_producer = SnapshotProducer::new( table,