From 52a9be16e40845fd614de24595cbb805b895a0d3 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 4 Jun 2026 00:09:27 +0530 Subject: [PATCH 1/4] feat(spec): assign per-DataFile.first_row_id in v3 data manifest writer --- crates/iceberg/src/spec/manifest/writer.rs | 256 ++++++++++++++++++++- 1 file changed, 251 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 1b3b605fd8..d70b20ef5f 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -40,9 +40,21 @@ use crate::{Error, ErrorKind}; /// with the actual snapshot ID before it is committed. const UNASSIGNED_SNAPSHOT_ID: i64 = -1; +fn u64_to_i64_row_id(value: u64) -> Result { + i64::try_from(value).map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID {value} exceeds i64::MAX and cannot be encoded as DataFile.first_row_id" + ), + ) + }) +} + /// The builder used to create a [`ManifestWriter`]. pub struct ManifestWriterBuilder { output: OutputFile, + first_row_id: Option, snapshot_id: Option, key_metadata: Option>, schema: SchemaRef, @@ -60,6 +72,7 @@ impl ManifestWriterBuilder { ) -> Self { Self { output, + first_row_id: None, snapshot_id, key_metadata, schema, @@ -67,6 +80,15 @@ impl ManifestWriterBuilder { } } + /// Seed the manifest-level `first_row_id` for a v3 data manifest. + /// + /// When set, ADDED entries whose `DataFile.first_row_id` is `None` are stamped with + /// `first_row_id + cumulative record_count of prior ADDED entries`. + pub fn with_first_row_id(mut self, first_row_id: u64) -> Self { + self.first_row_id = Some(first_row_id); + self + } + /// Build a [`ManifestWriter`] for format version 1. pub fn build_v1(self) -> ManifestWriter { let metadata = ManifestMetadata::builder() @@ -121,7 +143,12 @@ impl ManifestWriterBuilder { ) } - /// Build a [`ManifestWriter`] for format version 2, data content. + /// Build a [`ManifestWriter`] for format version 3, data content. + /// + /// If `with_first_row_id` was called on the builder, the writer assigns each ADDED + /// `DataFile.first_row_id` from a running cursor seeded with that value. Otherwise + /// the manifest-level `first_row_id` is assigned by the [`ManifestListWriter`] when + /// the manifest is added to the list and per-file ids are left as `None`. pub fn build_v3_data(self) -> ManifestWriter { let metadata = ManifestMetadata::builder() .schema_id(self.schema.schema_id()) @@ -135,9 +162,7 @@ impl ManifestWriterBuilder { self.snapshot_id, self.key_metadata, metadata, - // First row id is assigned by the [`ManifestListWriter`] when the manifest - // is added to the list. - None, + self.first_row_id, ) } @@ -173,6 +198,7 @@ pub struct ManifestWriter { deleted_files: u32, deleted_rows: u64, first_row_id: Option, + next_data_file_row_id: Option, min_seq_num: Option, @@ -202,6 +228,7 @@ impl ManifestWriter { deleted_files: 0, deleted_rows: 0, first_row_id, + next_data_file_row_id: first_row_id, min_seq_num: None, key_metadata, manifest_entries: Vec::new(), @@ -366,7 +393,7 @@ impl ManifestWriter { Ok(()) } - fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> { + fn add_entry_inner(&mut self, mut entry: ManifestEntry) -> Result<()> { // Check if the entry has sequence number if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing) && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) @@ -377,6 +404,8 @@ impl ManifestWriter { )); } + self.assign_data_file_first_row_id(&mut entry)?; + // Update the statistics match entry.status { ManifestStatus::Added => { @@ -401,6 +430,36 @@ impl ManifestWriter { Ok(()) } + fn assign_data_file_first_row_id(&mut self, entry: &mut ManifestEntry) -> Result<()> { + if self.metadata.format_version != FormatVersion::V3 + || self.metadata.content != ManifestContentType::Data + { + return Ok(()); + } + let Some(cursor) = self.next_data_file_row_id else { + return Ok(()); + }; + if entry.status != ManifestStatus::Added { + return Ok(()); + } + if entry.data_file.first_row_id.is_none() { + entry.data_file.first_row_id = Some(u64_to_i64_row_id(cursor)?); + } + self.next_data_file_row_id = cursor + .checked_add(entry.data_file.record_count) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow assigning DataFile.first_row_id (cursor={cursor}, record_count={})", + entry.data_file.record_count + ), + ) + }) + .map(Some)?; + Ok(()) + } + /// Write manifest file and return it. pub async fn write_manifest_file(mut self) -> Result { // Create the avro writer @@ -804,4 +863,191 @@ mod tests { ManifestContentType::Deletes, ); } + + fn simple_v3_schema() -> Arc { + Arc::new( + Schema::builder() + .with_fields(vec![Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + ))]) + .build() + .unwrap(), + ) + } + + fn added_data_entry(path: &str, record_count: u64) -> ManifestEntry { + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: path.to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count, + file_size_in_bytes: 100, + column_sizes: HashMap::new(), + value_counts: HashMap::new(), + null_value_counts: HashMap::new(), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: None, + split_offsets: None, + equality_ids: None, + sort_order_id: None, + partition_spec_id: 0, + first_row_id: None, + referenced_data_file: None, + content_offset: None, + content_size_in_bytes: None, + }, + } + } + + // B1: v3 data manifest, first_row_id=Some(100), three ADDED files (10,20,30) + // -> per-file first_row_id = 100, 110, 130; manifest added_rows_count = 60. + #[tokio::test] + async fn test_b1_writer_assigns_per_file_first_row_id() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b1.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(100) + .build_v3_data(); + + writer + .add_entry(added_data_entry("f1.parquet", 10)) + .unwrap(); + writer + .add_entry(added_data_entry("f2.parquet", 20)) + .unwrap(); + writer + .add_entry(added_data_entry("f3.parquet", 30)) + .unwrap(); + let manifest_file = writer.write_manifest_file().await.unwrap(); + + assert_eq!(manifest_file.added_rows_count, Some(60)); + assert_eq!(manifest_file.first_row_id, Some(100)); + + let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(ids, vec![Some(100), Some(110), Some(130)]); + } + + // B3: v3 data manifest, ADDED file already has first_row_id=Some(500) -> preserved. + #[tokio::test] + async fn test_b3_writer_preserves_preset_first_row_id() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b3.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(100) + .build_v3_data(); + + let mut preset = added_data_entry("f1.parquet", 10); + preset.data_file.first_row_id = Some(500); + writer.add_entry(preset).unwrap(); + writer + .add_entry(added_data_entry("f2.parquet", 20)) + .unwrap(); + writer.write_manifest_file().await.unwrap(); + + let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(ids, vec![Some(500), Some(110)]); + } + + // B4: v3 delete manifest never assigns per-file first_row_id. + #[tokio::test] + async fn test_b4_v3_delete_manifest_never_assigns_per_file_id() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b4.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(100) + .build_v3_deletes(); + + let mut entry = added_data_entry("d1.parquet", 10); + entry.data_file.content = DataContentType::PositionDeletes; + writer.add_entry(entry).unwrap(); + writer.write_manifest_file().await.unwrap(); + + let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap(); + assert!( + manifest + .entries() + .iter() + .all(|e| e.data_file().first_row_id().is_none()) + ); + } + + // B6: cursor near i64::MAX cannot be encoded as DataFile.first_row_id -> error. + // (DataFile.first_row_id is i64 per spec; we use u64 internally but range-check on assign.) + #[tokio::test] + async fn test_b6_writer_first_row_id_overflow() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b6.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(u64::MAX - 5) + .build_v3_data(); + + let err = writer + .add_entry(added_data_entry("f1.parquet", 100)) + .expect_err("expected overflow / range error"); + let msg = format!("{err:?}"); + assert!( + msg.contains("overflow") || msg.contains("exceeds i64"), + "unexpected error: {msg}", + ); + } } From 48b77d5e485f860193ef41b93c8883326ce0b5e8 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 4 Jun 2026 00:09:31 +0530 Subject: [PATCH 2/4] feat(spec): inherit per-file first_row_id on read and skip carry-over manifests --- crates/iceberg/src/spec/manifest_list.rs | 234 ++++++++++++++++++++++- 1 file changed, 231 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 5f5fa3c148..3bc2a7d8a2 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -341,9 +341,41 @@ impl ManifestListWriter { match manifest.content { ManifestContentType::Data => { match (self.next_row_id, manifest.first_row_id) { - (Some(_), Some(_)) => { - // Case: Manifest with already assigned first row ID. - // No need to increase next_row_id, as this manifest is already assigned. + (Some(writer_next_row_id), Some(manifest_first_row_id)) => { + // Carry-over manifests have a first-row-id strictly less than the + // writer cursor (cursor seeded with TableMetadata.next_row_id which + // is the high-water mark of all prior assignments). Leave them alone. + // + // Newly created data manifests pre-assigned upstream by the snapshot + // producer (ManifestWriterBuilder::with_first_row_id) must match the + // current cursor; advance the cursor by their row counts so subsequent + // unassigned manifests pick up the right value. + if manifest_first_row_id < writer_next_row_id { + // carry-over, no-op + } else if manifest_first_row_id == writer_next_row_id { + let (existing_rows_count, added_rows_count) = + require_row_counts_in_manifest(manifest)?; + self.next_row_id = writer_next_row_id + .checked_add(existing_rows_count) + .and_then(|sum| sum.checked_add(added_rows_count)) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}", + manifest.manifest_path + ), + ) + }).map(Some)?; + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "first-row-id for Manifest {} is ahead of writer cursor (manifest={manifest_first_row_id}, writer next-row-id={writer_next_row_id}).", + manifest.manifest_path + ), + )); + } } (None, Some(manifest_first_row_id)) => { // Case: Assigned first row ID for data manifest, but the writer does not have a next-row-id assigned. @@ -890,6 +922,40 @@ impl ManifestFile { entry.inherit_data(self); } + // v3 row lineage: assign per-DataFile.first_row_id by inheritance for ADDED/EXISTING + // entries in a v3 data manifest whose manifest-level first_row_id is set. Carry-over + // values on the DataFile are preserved; cursor advances past them. + if matches!(metadata.format_version, FormatVersion::V3) + && metadata.content == ManifestContentType::Data + && let Some(mut cursor) = self.first_row_id + { + for entry in &mut entries { + if entry.status == crate::spec::ManifestStatus::Deleted { + continue; + } + let record_count = entry.data_file.record_count; + if entry.data_file.first_row_id.is_none() { + entry.data_file.first_row_id = + Some(i64::try_from(cursor).map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID {cursor} exceeds i64::MAX and cannot be encoded as DataFile.first_row_id" + ), + ) + })?); + } + cursor = cursor.checked_add(record_count).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow inheriting DataFile.first_row_id (cursor={cursor}, record_count={record_count})" + ), + ) + })?; + } + } + Ok(Manifest::new(metadata, entries)) } } @@ -2079,4 +2145,166 @@ mod test { assert_eq!(v2_manifest.partitions, None); assert_eq!(v2_manifest.key_metadata, None); } + + // Helpers for C-series reader-inheritance tests. + async fn write_v3_data_manifest_with_entries( + path: &std::path::Path, + entries: Vec, + ) -> ManifestFile { + use std::sync::Arc; + + use crate::spec::{ + ManifestWriterBuilder, NestedField, PartitionSpec, PrimitiveType, Schema, Type, + }; + + let schema = Arc::new( + Schema::builder() + .with_fields(vec![Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + ))]) + .build() + .unwrap(), + ); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + let mut writer = + ManifestWriterBuilder::new(output_file, Some(42), None, schema, partition_spec) + .build_v3_data(); + for entry in entries { + writer.add_entry(entry).unwrap(); + } + writer.write_manifest_file().await.unwrap() + } + + fn data_entry( + status: crate::spec::ManifestStatus, + path: &str, + record_count: u64, + sequence_number: Option, + preset_first_row_id: Option, + ) -> crate::spec::ManifestEntry { + use std::collections::HashMap; + + use crate::spec::{ + DataContentType, DataFile, DataFileFormat, ManifestEntry, ManifestStatus, Struct, + }; + let _ = status; + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: path.to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count, + file_size_in_bytes: 100, + column_sizes: HashMap::new(), + value_counts: HashMap::new(), + null_value_counts: HashMap::new(), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: None, + split_offsets: None, + equality_ids: None, + sort_order_id: None, + partition_spec_id: 0, + first_row_id: preset_first_row_id, + referenced_data_file: None, + content_offset: None, + content_size_in_bytes: None, + }, + } + } + + // C1: manifest-level first_row_id=Some(100), per-file ids None + // -> readers see 100, 110, 130. + #[tokio::test] + async fn test_c1_reader_inherits_per_file_first_row_id() { + use crate::spec::ManifestStatus; + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("c1.avro"); + let mut manifest_file = write_v3_data_manifest_with_entries(&path, vec![ + data_entry(ManifestStatus::Added, "f1.parquet", 10, None, None), + data_entry(ManifestStatus::Added, "f2.parquet", 20, None, None), + data_entry(ManifestStatus::Added, "f3.parquet", 30, None, None), + ]) + .await; + manifest_file.first_row_id = Some(100); + + let manifest = manifest_file + .load_manifest(&FileIO::new_with_fs()) + .await + .unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(ids, vec![Some(100), Some(110), Some(130)]); + } + + // C2: per-file first_row_id already set -> preserved on read; cursor still advances past it. + #[tokio::test] + async fn test_c2_reader_preserves_preset_per_file_id() { + use crate::spec::ManifestStatus; + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("c2.avro"); + let mut manifest_file = write_v3_data_manifest_with_entries(&path, vec![ + data_entry(ManifestStatus::Added, "f1.parquet", 10, None, Some(7)), + data_entry(ManifestStatus::Added, "f2.parquet", 20, None, None), + ]) + .await; + manifest_file.first_row_id = Some(100); + + let manifest = manifest_file + .load_manifest(&FileIO::new_with_fs()) + .await + .unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + // First file kept its preset 7; second file gets 100 + 10 = 110 (cursor advanced past first). + assert_eq!(ids, vec![Some(7), Some(110)]); + } + + // C4: manifest-level first_row_id=None -> no inheritance; per-file ids stay None. + #[tokio::test] + async fn test_c4_reader_no_inheritance_when_manifest_first_row_id_unset() { + use crate::spec::ManifestStatus; + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("c4.avro"); + let mut manifest_file = write_v3_data_manifest_with_entries(&path, vec![data_entry( + ManifestStatus::Added, + "f1.parquet", + 10, + None, + None, + )]) + .await; + manifest_file.first_row_id = None; + + let manifest = manifest_file + .load_manifest(&FileIO::new_with_fs()) + .await + .unwrap(); + assert!( + manifest + .entries() + .iter() + .all(|e| e.data_file().first_row_id().is_none()) + ); + } } From a0cab01f58abfd84f526687e2f0299fcb80c6f40 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 4 Jun 2026 00:09:37 +0530 Subject: [PATCH 3/4] feat(transaction): seed v3 manifest first_row_id from snapshot producer --- crates/iceberg/src/transaction/mod.rs | 21 +++++++++++++ crates/iceberg/src/transaction/snapshot.rs | 36 ++++++++++++++++++++-- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index b68a53e5e3..f376598995 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -632,5 +632,26 @@ mod test_row_lineage { assert_eq!(manifest_list.entries().len(), 2); let manifest_file = &manifest_list.entries()[1]; assert_eq!(manifest_file.first_row_id, Some(30)); + + // Per-DataFile.first_row_id must be stamped on the new manifest: 30 then 30+17=47. + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + let per_file_ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(per_file_ids, vec![Some(30), Some(47)]); + + // Per-DataFile.first_row_id on the first snapshot's manifest should be 0. + let first_manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + let first_per_file_ids: Vec> = first_manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(first_per_file_ids, vec![Some(0)]); } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 752371804c..61eac406d5 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -120,6 +120,11 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, + // Running cursor for assigning manifest-level first_row_id to new v3 data manifests. + // Seeded from TableMetadata.next_row_id on construction; advances as each new data + // manifest is created so the snapshot producer and the ManifestListWriter agree on + // per-manifest first_row_id values. + next_data_manifest_first_row_id: Option, } impl<'a> SnapshotProducer<'a> { @@ -130,6 +135,9 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties: HashMap, added_data_files: Vec, ) -> Self { + let next_data_manifest_first_row_id = + matches!(table.metadata().format_version(), FormatVersion::V3) + .then(|| table.metadata().next_row_id()); Self { table, snapshot_id: Self::generate_unique_snapshot_id(table), @@ -138,6 +146,7 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties, added_data_files, manifest_counter: (0..), + next_data_manifest_first_row_id, } } @@ -242,7 +251,11 @@ impl<'a> SnapshotProducer<'a> { snapshot_id } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { + fn new_manifest_writer( + &mut self, + content: ManifestContentType, + added_rows: u64, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -270,7 +283,23 @@ impl<'a> SnapshotProducer<'a> { ManifestContentType::Deletes => Ok(builder.build_v2_deletes()), }, FormatVersion::V3 => match content { - ManifestContentType::Data => Ok(builder.build_v3_data()), + ManifestContentType::Data => { + let builder = if let Some(cursor) = self.next_data_manifest_first_row_id { + let next_cursor = cursor.checked_add(added_rows).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow assigning manifest first_row_id (cursor={cursor}, added_rows={added_rows})" + ), + ) + })?; + self.next_data_manifest_first_row_id = Some(next_cursor); + builder.with_first_row_id(cursor) + } else { + builder + }; + Ok(builder.build_v3_data()) + } ManifestContentType::Deletes => Ok(builder.build_v3_deletes()), }, } @@ -319,6 +348,7 @@ impl<'a> SnapshotProducer<'a> { let snapshot_id = self.snapshot_id; let format_version = self.table.metadata().format_version(); + let added_rows: u64 = added_data_files.iter().map(|df| df.record_count).sum(); let manifest_entries = added_data_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) @@ -331,7 +361,7 @@ impl<'a> SnapshotProducer<'a> { builder.build() } }); - let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; + let mut writer = self.new_manifest_writer(ManifestContentType::Data, added_rows)?; for entry in manifest_entries { writer.add_entry(entry)?; } From 73869fbd38262f978f9c98727dd85ef4e5daef7c Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 4 Jun 2026 00:13:04 +0530 Subject: [PATCH 4/4] chore: strip verbose comments per repo guidelines --- crates/iceberg/src/spec/manifest/writer.rs | 14 ----- crates/iceberg/src/spec/manifest_list.rs | 73 +++++++++------------- crates/iceberg/src/transaction/mod.rs | 2 - crates/iceberg/src/transaction/snapshot.rs | 5 +- 4 files changed, 30 insertions(+), 64 deletions(-) diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index d70b20ef5f..30920bfe6b 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -81,9 +81,6 @@ impl ManifestWriterBuilder { } /// Seed the manifest-level `first_row_id` for a v3 data manifest. - /// - /// When set, ADDED entries whose `DataFile.first_row_id` is `None` are stamped with - /// `first_row_id + cumulative record_count of prior ADDED entries`. pub fn with_first_row_id(mut self, first_row_id: u64) -> Self { self.first_row_id = Some(first_row_id); self @@ -144,11 +141,6 @@ impl ManifestWriterBuilder { } /// Build a [`ManifestWriter`] for format version 3, data content. - /// - /// If `with_first_row_id` was called on the builder, the writer assigns each ADDED - /// `DataFile.first_row_id` from a running cursor seeded with that value. Otherwise - /// the manifest-level `first_row_id` is assigned by the [`ManifestListWriter`] when - /// the manifest is added to the list and per-file ids are left as `None`. pub fn build_v3_data(self) -> ManifestWriter { let metadata = ManifestMetadata::builder() .schema_id(self.schema.schema_id()) @@ -909,8 +901,6 @@ mod tests { } } - // B1: v3 data manifest, first_row_id=Some(100), three ADDED files (10,20,30) - // -> per-file first_row_id = 100, 110, 130; manifest added_rows_count = 60. #[tokio::test] async fn test_b1_writer_assigns_per_file_first_row_id() { let schema = simple_v3_schema(); @@ -952,7 +942,6 @@ mod tests { assert_eq!(ids, vec![Some(100), Some(110), Some(130)]); } - // B3: v3 data manifest, ADDED file already has first_row_id=Some(500) -> preserved. #[tokio::test] async fn test_b3_writer_preserves_preset_first_row_id() { let schema = simple_v3_schema(); @@ -988,7 +977,6 @@ mod tests { assert_eq!(ids, vec![Some(500), Some(110)]); } - // B4: v3 delete manifest never assigns per-file first_row_id. #[tokio::test] async fn test_b4_v3_delete_manifest_never_assigns_per_file_id() { let schema = simple_v3_schema(); @@ -1021,8 +1009,6 @@ mod tests { ); } - // B6: cursor near i64::MAX cannot be encoded as DataFile.first_row_id -> error. - // (DataFile.first_row_id is i64 per spec; we use u64 internally but range-check on assign.) #[tokio::test] async fn test_b6_writer_first_row_id_overflow() { let schema = simple_v3_schema(); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 3bc2a7d8a2..d48ae569ec 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -342,39 +342,33 @@ impl ManifestListWriter { ManifestContentType::Data => { match (self.next_row_id, manifest.first_row_id) { (Some(writer_next_row_id), Some(manifest_first_row_id)) => { - // Carry-over manifests have a first-row-id strictly less than the - // writer cursor (cursor seeded with TableMetadata.next_row_id which - // is the high-water mark of all prior assignments). Leave them alone. - // - // Newly created data manifests pre-assigned upstream by the snapshot - // producer (ManifestWriterBuilder::with_first_row_id) must match the - // current cursor; advance the cursor by their row counts so subsequent - // unassigned manifests pick up the right value. - if manifest_first_row_id < writer_next_row_id { - // carry-over, no-op - } else if manifest_first_row_id == writer_next_row_id { - let (existing_rows_count, added_rows_count) = - require_row_counts_in_manifest(manifest)?; - self.next_row_id = writer_next_row_id - .checked_add(existing_rows_count) - .and_then(|sum| sum.checked_add(added_rows_count)) - .ok_or_else(|| { - Error::new( - ErrorKind::DataInvalid, - format!( - "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}", - manifest.manifest_path - ), - ) - }).map(Some)?; - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "first-row-id for Manifest {} is ahead of writer cursor (manifest={manifest_first_row_id}, writer next-row-id={writer_next_row_id}).", - manifest.manifest_path - ), - )); + match manifest_first_row_id.cmp(&writer_next_row_id) { + std::cmp::Ordering::Less => {} + std::cmp::Ordering::Equal => { + let (existing_rows_count, added_rows_count) = + require_row_counts_in_manifest(manifest)?; + self.next_row_id = writer_next_row_id + .checked_add(existing_rows_count) + .and_then(|sum| sum.checked_add(added_rows_count)) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}", + manifest.manifest_path + ), + ) + }).map(Some)?; + } + std::cmp::Ordering::Greater => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "first-row-id for Manifest {} is ahead of writer cursor (manifest={manifest_first_row_id}, writer next-row-id={writer_next_row_id}).", + manifest.manifest_path + ), + )); + } } } (None, Some(manifest_first_row_id)) => { @@ -922,9 +916,7 @@ impl ManifestFile { entry.inherit_data(self); } - // v3 row lineage: assign per-DataFile.first_row_id by inheritance for ADDED/EXISTING - // entries in a v3 data manifest whose manifest-level first_row_id is set. Carry-over - // values on the DataFile are preserved; cursor advances past them. + // v3 row-lineage: stamp per-DataFile.first_row_id from manifest cursor for non-DELETED entries. if matches!(metadata.format_version, FormatVersion::V3) && metadata.content == ManifestContentType::Data && let Some(mut cursor) = self.first_row_id @@ -2146,7 +2138,6 @@ mod test { assert_eq!(v2_manifest.key_metadata, None); } - // Helpers for C-series reader-inheritance tests. async fn write_v3_data_manifest_with_entries( path: &std::path::Path, entries: Vec, @@ -2184,7 +2175,7 @@ mod test { } fn data_entry( - status: crate::spec::ManifestStatus, + _status: crate::spec::ManifestStatus, path: &str, record_count: u64, sequence_number: Option, @@ -2195,7 +2186,6 @@ mod test { use crate::spec::{ DataContentType, DataFile, DataFileFormat, ManifestEntry, ManifestStatus, Struct, }; - let _ = status; ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, @@ -2227,8 +2217,6 @@ mod test { } } - // C1: manifest-level first_row_id=Some(100), per-file ids None - // -> readers see 100, 110, 130. #[tokio::test] async fn test_c1_reader_inherits_per_file_first_row_id() { use crate::spec::ManifestStatus; @@ -2254,7 +2242,6 @@ mod test { assert_eq!(ids, vec![Some(100), Some(110), Some(130)]); } - // C2: per-file first_row_id already set -> preserved on read; cursor still advances past it. #[tokio::test] async fn test_c2_reader_preserves_preset_per_file_id() { use crate::spec::ManifestStatus; @@ -2276,11 +2263,9 @@ mod test { .iter() .map(|e| e.data_file().first_row_id()) .collect(); - // First file kept its preset 7; second file gets 100 + 10 = 110 (cursor advanced past first). assert_eq!(ids, vec![Some(7), Some(110)]); } - // C4: manifest-level first_row_id=None -> no inheritance; per-file ids stay None. #[tokio::test] async fn test_c4_reader_no_inheritance_when_manifest_first_row_id_unset() { use crate::spec::ManifestStatus; diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index f376598995..8418e2eaa9 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -633,7 +633,6 @@ mod test_row_lineage { let manifest_file = &manifest_list.entries()[1]; assert_eq!(manifest_file.first_row_id, Some(30)); - // Per-DataFile.first_row_id must be stamped on the new manifest: 30 then 30+17=47. let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); let per_file_ids: Vec> = manifest .entries() @@ -642,7 +641,6 @@ mod test_row_lineage { .collect(); assert_eq!(per_file_ids, vec![Some(30), Some(47)]); - // Per-DataFile.first_row_id on the first snapshot's manifest should be 0. let first_manifest = manifest_list.entries()[0] .load_manifest(table.file_io()) .await diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 61eac406d5..fb1db9d03c 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -120,10 +120,7 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, - // Running cursor for assigning manifest-level first_row_id to new v3 data manifests. - // Seeded from TableMetadata.next_row_id on construction; advances as each new data - // manifest is created so the snapshot producer and the ManifestListWriter agree on - // per-manifest first_row_id values. + // v3 row-lineage cursor seeded from TableMetadata.next_row_id. next_data_manifest_first_row_id: Option, }