diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 1b3b605fd8..30920bfe6b 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -40,9 +40,21 @@ use crate::{Error, ErrorKind}; /// with the actual snapshot ID before it is committed. const UNASSIGNED_SNAPSHOT_ID: i64 = -1; +fn u64_to_i64_row_id(value: u64) -> Result { + i64::try_from(value).map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID {value} exceeds i64::MAX and cannot be encoded as DataFile.first_row_id" + ), + ) + }) +} + /// The builder used to create a [`ManifestWriter`]. pub struct ManifestWriterBuilder { output: OutputFile, + first_row_id: Option, snapshot_id: Option, key_metadata: Option>, schema: SchemaRef, @@ -60,6 +72,7 @@ impl ManifestWriterBuilder { ) -> Self { Self { output, + first_row_id: None, snapshot_id, key_metadata, schema, @@ -67,6 +80,12 @@ impl ManifestWriterBuilder { } } + /// Seed the manifest-level `first_row_id` for a v3 data manifest. + pub fn with_first_row_id(mut self, first_row_id: u64) -> Self { + self.first_row_id = Some(first_row_id); + self + } + /// Build a [`ManifestWriter`] for format version 1. pub fn build_v1(self) -> ManifestWriter { let metadata = ManifestMetadata::builder() @@ -121,7 +140,7 @@ impl ManifestWriterBuilder { ) } - /// Build a [`ManifestWriter`] for format version 2, data content. + /// Build a [`ManifestWriter`] for format version 3, data content. pub fn build_v3_data(self) -> ManifestWriter { let metadata = ManifestMetadata::builder() .schema_id(self.schema.schema_id()) @@ -135,9 +154,7 @@ impl ManifestWriterBuilder { self.snapshot_id, self.key_metadata, metadata, - // First row id is assigned by the [`ManifestListWriter`] when the manifest - // is added to the list. - None, + self.first_row_id, ) } @@ -173,6 +190,7 @@ pub struct ManifestWriter { deleted_files: u32, deleted_rows: u64, first_row_id: Option, + next_data_file_row_id: Option, min_seq_num: Option, @@ -202,6 +220,7 @@ impl ManifestWriter { deleted_files: 0, deleted_rows: 0, first_row_id, + next_data_file_row_id: first_row_id, min_seq_num: None, key_metadata, manifest_entries: Vec::new(), @@ -366,7 +385,7 @@ impl ManifestWriter { Ok(()) } - fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> { + fn add_entry_inner(&mut self, mut entry: ManifestEntry) -> Result<()> { // Check if the entry has sequence number if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing) && (entry.sequence_number.is_none() || entry.file_sequence_number.is_none()) @@ -377,6 +396,8 @@ impl ManifestWriter { )); } + self.assign_data_file_first_row_id(&mut entry)?; + // Update the statistics match entry.status { ManifestStatus::Added => { @@ -401,6 +422,36 @@ impl ManifestWriter { Ok(()) } + fn assign_data_file_first_row_id(&mut self, entry: &mut ManifestEntry) -> Result<()> { + if self.metadata.format_version != FormatVersion::V3 + || self.metadata.content != ManifestContentType::Data + { + return Ok(()); + } + let Some(cursor) = self.next_data_file_row_id else { + return Ok(()); + }; + if entry.status != ManifestStatus::Added { + return Ok(()); + } + if entry.data_file.first_row_id.is_none() { + entry.data_file.first_row_id = Some(u64_to_i64_row_id(cursor)?); + } + self.next_data_file_row_id = cursor + .checked_add(entry.data_file.record_count) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow assigning DataFile.first_row_id (cursor={cursor}, record_count={})", + entry.data_file.record_count + ), + ) + }) + .map(Some)?; + Ok(()) + } + /// Write manifest file and return it. pub async fn write_manifest_file(mut self) -> Result { // Create the avro writer @@ -804,4 +855,185 @@ mod tests { ManifestContentType::Deletes, ); } + + fn simple_v3_schema() -> Arc { + Arc::new( + Schema::builder() + .with_fields(vec![Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + ))]) + .build() + .unwrap(), + ) + } + + fn added_data_entry(path: &str, record_count: u64) -> ManifestEntry { + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number: None, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: path.to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count, + file_size_in_bytes: 100, + column_sizes: HashMap::new(), + value_counts: HashMap::new(), + null_value_counts: HashMap::new(), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: None, + split_offsets: None, + equality_ids: None, + sort_order_id: None, + partition_spec_id: 0, + first_row_id: None, + referenced_data_file: None, + content_offset: None, + content_size_in_bytes: None, + }, + } + } + + #[tokio::test] + async fn test_b1_writer_assigns_per_file_first_row_id() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b1.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(100) + .build_v3_data(); + + writer + .add_entry(added_data_entry("f1.parquet", 10)) + .unwrap(); + writer + .add_entry(added_data_entry("f2.parquet", 20)) + .unwrap(); + writer + .add_entry(added_data_entry("f3.parquet", 30)) + .unwrap(); + let manifest_file = writer.write_manifest_file().await.unwrap(); + + assert_eq!(manifest_file.added_rows_count, Some(60)); + assert_eq!(manifest_file.first_row_id, Some(100)); + + let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(ids, vec![Some(100), Some(110), Some(130)]); + } + + #[tokio::test] + async fn test_b3_writer_preserves_preset_first_row_id() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b3.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(100) + .build_v3_data(); + + let mut preset = added_data_entry("f1.parquet", 10); + preset.data_file.first_row_id = Some(500); + writer.add_entry(preset).unwrap(); + writer + .add_entry(added_data_entry("f2.parquet", 20)) + .unwrap(); + writer.write_manifest_file().await.unwrap(); + + let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(ids, vec![Some(500), Some(110)]); + } + + #[tokio::test] + async fn test_b4_v3_delete_manifest_never_assigns_per_file_id() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b4.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(100) + .build_v3_deletes(); + + let mut entry = added_data_entry("d1.parquet", 10); + entry.data_file.content = DataContentType::PositionDeletes; + writer.add_entry(entry).unwrap(); + writer.write_manifest_file().await.unwrap(); + + let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap(); + assert!( + manifest + .entries() + .iter() + .all(|e| e.data_file().first_row_id().is_none()) + ); + } + + #[tokio::test] + async fn test_b6_writer_first_row_id_overflow() { + let schema = simple_v3_schema(); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("b6.avro"); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + + let mut writer = + ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec) + .with_first_row_id(u64::MAX - 5) + .build_v3_data(); + + let err = writer + .add_entry(added_data_entry("f1.parquet", 100)) + .expect_err("expected overflow / range error"); + let msg = format!("{err:?}"); + assert!( + msg.contains("overflow") || msg.contains("exceeds i64"), + "unexpected error: {msg}", + ); + } } diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 5f5fa3c148..d48ae569ec 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -341,9 +341,35 @@ impl ManifestListWriter { match manifest.content { ManifestContentType::Data => { match (self.next_row_id, manifest.first_row_id) { - (Some(_), Some(_)) => { - // Case: Manifest with already assigned first row ID. - // No need to increase next_row_id, as this manifest is already assigned. + (Some(writer_next_row_id), Some(manifest_first_row_id)) => { + match manifest_first_row_id.cmp(&writer_next_row_id) { + std::cmp::Ordering::Less => {} + std::cmp::Ordering::Equal => { + let (existing_rows_count, added_rows_count) = + require_row_counts_in_manifest(manifest)?; + self.next_row_id = writer_next_row_id + .checked_add(existing_rows_count) + .and_then(|sum| sum.checked_add(added_rows_count)) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow when computing next row ID for Manifest {}. Next Row ID: {writer_next_row_id}, Existing Rows Count: {existing_rows_count}, Added Rows Count: {added_rows_count}", + manifest.manifest_path + ), + ) + }).map(Some)?; + } + std::cmp::Ordering::Greater => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "first-row-id for Manifest {} is ahead of writer cursor (manifest={manifest_first_row_id}, writer next-row-id={writer_next_row_id}).", + manifest.manifest_path + ), + )); + } + } } (None, Some(manifest_first_row_id)) => { // Case: Assigned first row ID for data manifest, but the writer does not have a next-row-id assigned. @@ -890,6 +916,38 @@ impl ManifestFile { entry.inherit_data(self); } + // v3 row-lineage: stamp per-DataFile.first_row_id from manifest cursor for non-DELETED entries. + if matches!(metadata.format_version, FormatVersion::V3) + && metadata.content == ManifestContentType::Data + && let Some(mut cursor) = self.first_row_id + { + for entry in &mut entries { + if entry.status == crate::spec::ManifestStatus::Deleted { + continue; + } + let record_count = entry.data_file.record_count; + if entry.data_file.first_row_id.is_none() { + entry.data_file.first_row_id = + Some(i64::try_from(cursor).map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID {cursor} exceeds i64::MAX and cannot be encoded as DataFile.first_row_id" + ), + ) + })?); + } + cursor = cursor.checked_add(record_count).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow inheriting DataFile.first_row_id (cursor={cursor}, record_count={record_count})" + ), + ) + })?; + } + } + Ok(Manifest::new(metadata, entries)) } } @@ -2079,4 +2137,159 @@ mod test { assert_eq!(v2_manifest.partitions, None); assert_eq!(v2_manifest.key_metadata, None); } + + async fn write_v3_data_manifest_with_entries( + path: &std::path::Path, + entries: Vec, + ) -> ManifestFile { + use std::sync::Arc; + + use crate::spec::{ + ManifestWriterBuilder, NestedField, PartitionSpec, PrimitiveType, Schema, Type, + }; + + let schema = Arc::new( + Schema::builder() + .with_fields(vec![Arc::new(NestedField::optional( + 1, + "id", + Type::Primitive(PrimitiveType::Long), + ))]) + .build() + .unwrap(), + ); + let partition_spec = PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .build() + .unwrap(); + let output_file = FileIO::new_with_fs() + .new_output(path.to_str().unwrap()) + .unwrap(); + let mut writer = + ManifestWriterBuilder::new(output_file, Some(42), None, schema, partition_spec) + .build_v3_data(); + for entry in entries { + writer.add_entry(entry).unwrap(); + } + writer.write_manifest_file().await.unwrap() + } + + fn data_entry( + _status: crate::spec::ManifestStatus, + path: &str, + record_count: u64, + sequence_number: Option, + preset_first_row_id: Option, + ) -> crate::spec::ManifestEntry { + use std::collections::HashMap; + + use crate::spec::{ + DataContentType, DataFile, DataFileFormat, ManifestEntry, ManifestStatus, Struct, + }; + ManifestEntry { + status: ManifestStatus::Added, + snapshot_id: None, + sequence_number, + file_sequence_number: None, + data_file: DataFile { + content: DataContentType::Data, + file_path: path.to_string(), + file_format: DataFileFormat::Parquet, + partition: Struct::empty(), + record_count, + file_size_in_bytes: 100, + column_sizes: HashMap::new(), + value_counts: HashMap::new(), + null_value_counts: HashMap::new(), + nan_value_counts: HashMap::new(), + lower_bounds: HashMap::new(), + upper_bounds: HashMap::new(), + key_metadata: None, + split_offsets: None, + equality_ids: None, + sort_order_id: None, + partition_spec_id: 0, + first_row_id: preset_first_row_id, + referenced_data_file: None, + content_offset: None, + content_size_in_bytes: None, + }, + } + } + + #[tokio::test] + async fn test_c1_reader_inherits_per_file_first_row_id() { + use crate::spec::ManifestStatus; + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("c1.avro"); + let mut manifest_file = write_v3_data_manifest_with_entries(&path, vec![ + data_entry(ManifestStatus::Added, "f1.parquet", 10, None, None), + data_entry(ManifestStatus::Added, "f2.parquet", 20, None, None), + data_entry(ManifestStatus::Added, "f3.parquet", 30, None, None), + ]) + .await; + manifest_file.first_row_id = Some(100); + + let manifest = manifest_file + .load_manifest(&FileIO::new_with_fs()) + .await + .unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(ids, vec![Some(100), Some(110), Some(130)]); + } + + #[tokio::test] + async fn test_c2_reader_preserves_preset_per_file_id() { + use crate::spec::ManifestStatus; + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("c2.avro"); + let mut manifest_file = write_v3_data_manifest_with_entries(&path, vec![ + data_entry(ManifestStatus::Added, "f1.parquet", 10, None, Some(7)), + data_entry(ManifestStatus::Added, "f2.parquet", 20, None, None), + ]) + .await; + manifest_file.first_row_id = Some(100); + + let manifest = manifest_file + .load_manifest(&FileIO::new_with_fs()) + .await + .unwrap(); + let ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(ids, vec![Some(7), Some(110)]); + } + + #[tokio::test] + async fn test_c4_reader_no_inheritance_when_manifest_first_row_id_unset() { + use crate::spec::ManifestStatus; + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().join("c4.avro"); + let mut manifest_file = write_v3_data_manifest_with_entries(&path, vec![data_entry( + ManifestStatus::Added, + "f1.parquet", + 10, + None, + None, + )]) + .await; + manifest_file.first_row_id = None; + + let manifest = manifest_file + .load_manifest(&FileIO::new_with_fs()) + .await + .unwrap(); + assert!( + manifest + .entries() + .iter() + .all(|e| e.data_file().first_row_id().is_none()) + ); + } } diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index b68a53e5e3..8418e2eaa9 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -632,5 +632,24 @@ mod test_row_lineage { assert_eq!(manifest_list.entries().len(), 2); let manifest_file = &manifest_list.entries()[1]; assert_eq!(manifest_file.first_row_id, Some(30)); + + let manifest = manifest_file.load_manifest(table.file_io()).await.unwrap(); + let per_file_ids: Vec> = manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(per_file_ids, vec![Some(30), Some(47)]); + + let first_manifest = manifest_list.entries()[0] + .load_manifest(table.file_io()) + .await + .unwrap(); + let first_per_file_ids: Vec> = first_manifest + .entries() + .iter() + .map(|e| e.data_file().first_row_id()) + .collect(); + assert_eq!(first_per_file_ids, vec![Some(0)]); } } diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 752371804c..fb1db9d03c 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -120,6 +120,8 @@ pub(crate) struct SnapshotProducer<'a> { // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). manifest_counter: RangeFrom, + // v3 row-lineage cursor seeded from TableMetadata.next_row_id. + next_data_manifest_first_row_id: Option, } impl<'a> SnapshotProducer<'a> { @@ -130,6 +132,9 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties: HashMap, added_data_files: Vec, ) -> Self { + let next_data_manifest_first_row_id = + matches!(table.metadata().format_version(), FormatVersion::V3) + .then(|| table.metadata().next_row_id()); Self { table, snapshot_id: Self::generate_unique_snapshot_id(table), @@ -138,6 +143,7 @@ impl<'a> SnapshotProducer<'a> { snapshot_properties, added_data_files, manifest_counter: (0..), + next_data_manifest_first_row_id, } } @@ -242,7 +248,11 @@ impl<'a> SnapshotProducer<'a> { snapshot_id } - fn new_manifest_writer(&mut self, content: ManifestContentType) -> Result { + fn new_manifest_writer( + &mut self, + content: ManifestContentType, + added_rows: u64, + ) -> Result { let new_manifest_path = format!( "{}/{}/{}-m{}.{}", self.table.metadata().location(), @@ -270,7 +280,23 @@ impl<'a> SnapshotProducer<'a> { ManifestContentType::Deletes => Ok(builder.build_v2_deletes()), }, FormatVersion::V3 => match content { - ManifestContentType::Data => Ok(builder.build_v3_data()), + ManifestContentType::Data => { + let builder = if let Some(cursor) = self.next_data_manifest_first_row_id { + let next_cursor = cursor.checked_add(added_rows).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Row ID overflow assigning manifest first_row_id (cursor={cursor}, added_rows={added_rows})" + ), + ) + })?; + self.next_data_manifest_first_row_id = Some(next_cursor); + builder.with_first_row_id(cursor) + } else { + builder + }; + Ok(builder.build_v3_data()) + } ManifestContentType::Deletes => Ok(builder.build_v3_deletes()), }, } @@ -319,6 +345,7 @@ impl<'a> SnapshotProducer<'a> { let snapshot_id = self.snapshot_id; let format_version = self.table.metadata().format_version(); + let added_rows: u64 = added_data_files.iter().map(|df| df.record_count).sum(); let manifest_entries = added_data_files.into_iter().map(|data_file| { let builder = ManifestEntry::builder() .status(crate::spec::ManifestStatus::Added) @@ -331,7 +358,7 @@ impl<'a> SnapshotProducer<'a> { builder.build() } }); - let mut writer = self.new_manifest_writer(ManifestContentType::Data)?; + let mut writer = self.new_manifest_writer(ManifestContentType::Data, added_rows)?; for entry in manifest_entries { writer.add_entry(entry)?; }