Skip to content

Commit 6a5d056

Browse files
committed
feat(transaction): Add retention control for snapshot tags
Add a `with_tag_retention()` method to `FastAppendAction` to allow controlling tag expiration independently from table defaults.

Tags created with `with_tag()` default to the table's `history.expire.max-ref-age-ms` property. Users can override this with `with_tag_retention(i64::MAX)` to make a tag never expire, or specify a custom retention period in milliseconds.

Example usage:

```rust
// Uses table defaults
.with_tag("v1.0.0")

// Never expires
.with_tag("v1.0.0")
.with_tag_retention(i64::MAX)

// Custom 30-day retention
.with_tag("v1.0.0")
.with_tag_retention(30 * 24 * 60 * 60 * 1000)
```
1 parent f1ffb2e commit 6a5d056

File tree

2 files changed

+82
-10
lines changed

2 files changed

+82
-10
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 77 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ pub struct FastAppendAction {
3838
snapshot_properties: HashMap<String, String>,
3939
added_data_files: Vec<DataFile>,
4040
// Optional tag name to create atomically with the snapshot.
41-
tag_ref: Option<String>,
41+
tag_name: Option<String>,
42+
// Optional retention for the tag in milliseconds.
43+
tag_retention: Option<i64>,
4244
}
4345

4446
impl FastAppendAction {
@@ -49,7 +51,8 @@ impl FastAppendAction {
4951
key_metadata: None,
5052
snapshot_properties: HashMap::default(),
5153
added_data_files: vec![],
52-
tag_ref: None,
54+
tag_name: None,
55+
tag_retention: None,
5356
}
5457
}
5558

@@ -84,22 +87,37 @@ impl FastAppendAction {
8487
}
8588

8689
/// Set a tag name to be created atomically with the snapshot.
90+
/// The tag will use the table's default retention policy (history.expire.max-ref-age-ms).
91+
///
92+
/// Use `with_tag_retention()` to override the retention policy.
8793
pub fn with_tag(mut self, tag_name: impl Into<String>) -> Self {
88-
self.tag_ref = Some(tag_name.into());
94+
self.tag_name = Some(tag_name.into());
95+
self
96+
}
97+
98+
/// Set the retention period for the tag in milliseconds.
99+
/// This overrides the table's default retention policy.
100+
/// Use i64::MAX to make the tag never expire.
101+
///
102+
/// Can be called before or after `with_tag()`. Only takes effect if a tag name is set.
103+
pub fn with_tag_retention(mut self, max_ref_age_ms: i64) -> Self {
104+
self.tag_retention = Some(max_ref_age_ms);
89105
self
90106
}
91107
}
92108

93109
#[async_trait]
94110
impl TransactionAction for FastAppendAction {
95111
async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
112+
let tag_ref = self.tag_name.as_ref().map(|name| (name.clone(), self.tag_retention));
113+
96114
let snapshot_producer = SnapshotProducer::new(
97115
table,
98116
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
99117
self.key_metadata.clone(),
100118
self.snapshot_properties.clone(),
101119
self.added_data_files.clone(),
102-
self.tag_ref.clone(),
120+
tag_ref,
103121
);
104122

105123
// validate added files
@@ -160,7 +178,8 @@ mod tests {
160178
use std::sync::Arc;
161179

162180
use crate::spec::{
163-
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct,
181+
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, SnapshotRetention,
182+
Struct,
164183
};
165184
use crate::transaction::tests::make_v2_minimal_table;
166185
use crate::transaction::{Transaction, TransactionAction};
@@ -400,6 +419,59 @@ mod tests {
400419
assert_eq!(ref_name, "v1.0.0");
401420
assert_eq!(reference.snapshot_id, snapshot_id);
402421
assert!(!reference.is_branch()); // Should be a tag, not a branch
422+
// Should use table defaults (None)
423+
if let SnapshotRetention::Tag { max_ref_age_ms } = &reference.retention {
424+
assert_eq!(max_ref_age_ms, &None);
425+
} else {
426+
panic!("Expected Tag retention");
427+
}
428+
} else {
429+
panic!("Expected SetSnapshotRef for tag as third update");
430+
}
431+
}
432+
433+
#[tokio::test]
434+
async fn test_fast_append_with_tag_retention() {
435+
let table = make_v2_minimal_table();
436+
let tx = Transaction::new(&table);
437+
438+
let data_file = DataFileBuilder::default()
439+
.content(DataContentType::Data)
440+
.file_path("test/tagged_never_expire.parquet".to_string())
441+
.file_format(DataFileFormat::Parquet)
442+
.file_size_in_bytes(100)
443+
.record_count(1)
444+
.partition_spec_id(table.metadata().default_partition_spec_id())
445+
.partition(Struct::from_iter([Some(Literal::long(300))]))
446+
.build()
447+
.unwrap();
448+
449+
let action = tx
450+
.fast_append()
451+
.add_data_files(vec![data_file])
452+
.with_tag("v2.0.0")
453+
.with_tag_retention(i64::MAX); // Never expire
454+
455+
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
456+
let updates = action_commit.take_updates();
457+
458+
// Should have 3 updates: AddSnapshot, SetSnapshotRef (main), SetSnapshotRef (tag)
459+
assert_eq!(updates.len(), 3);
460+
461+
// Third update: SetSnapshotRef for tag with custom retention
462+
if let TableUpdate::SetSnapshotRef {
463+
ref_name,
464+
reference,
465+
} = &updates[2]
466+
{
467+
assert_eq!(ref_name, "v2.0.0");
468+
assert!(!reference.is_branch());
469+
// Should have custom retention set to i64::MAX
470+
if let SnapshotRetention::Tag { max_ref_age_ms } = &reference.retention {
471+
assert_eq!(max_ref_age_ms, &Some(i64::MAX));
472+
} else {
473+
panic!("Expected Tag retention");
474+
}
403475
} else {
404476
panic!("Expected SetSnapshotRef for tag as third update");
405477
}

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,8 @@ pub(crate) struct SnapshotProducer<'a> {
118118
// It starts from 0 and increments for each new manifest file.
119119
// Note: This counter is limited to the range of (0..u64::MAX).
120120
manifest_counter: RangeFrom<u64>,
121-
// Optional tag name to create atomically with the snapshot.
122-
tag_ref: Option<String>,
121+
// Optional tag (name, retention) to create atomically with the snapshot.
122+
tag_ref: Option<(String, Option<i64>)>,
123123
}
124124

125125
impl<'a> SnapshotProducer<'a> {
@@ -129,7 +129,7 @@ impl<'a> SnapshotProducer<'a> {
129129
key_metadata: Option<Vec<u8>>,
130130
snapshot_properties: HashMap<String, String>,
131131
added_data_files: Vec<DataFile>,
132-
tag_ref: Option<String>,
132+
tag_ref: Option<(String, Option<i64>)>,
133133
) -> Self {
134134
Self {
135135
table,
@@ -502,11 +502,11 @@ impl<'a> SnapshotProducer<'a> {
502502
},
503503
];
504504

505-
if let Some(tag_name) = self.tag_ref {
505+
if let Some((tag_name, max_ref_age_ms)) = self.tag_ref {
506506
updates.push(TableUpdate::SetSnapshotRef {
507507
ref_name: tag_name,
508508
reference: SnapshotReference::new(self.snapshot_id, SnapshotRetention::Tag {
509-
max_ref_age_ms: None,
509+
max_ref_age_ms,
510510
}),
511511
});
512512
}

0 commit comments

Comments
 (0)