Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 237 additions & 5 deletions crates/iceberg/src/spec/manifest/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,21 @@ use crate::{Error, ErrorKind};
/// with the actual snapshot ID before it is committed.
const UNASSIGNED_SNAPSHOT_ID: i64 = -1;

fn u64_to_i64_row_id(value: u64) -> Result<i64> {
i64::try_from(value).map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Row ID {value} exceeds i64::MAX and cannot be encoded as DataFile.first_row_id"
),
)
})
}

/// The builder used to create a [`ManifestWriter`].
pub struct ManifestWriterBuilder {
output: OutputFile,
first_row_id: Option<u64>,
snapshot_id: Option<i64>,
key_metadata: Option<Vec<u8>>,
schema: SchemaRef,
Expand All @@ -60,13 +72,20 @@ impl ManifestWriterBuilder {
) -> Self {
Self {
output,
first_row_id: None,
snapshot_id,
key_metadata,
schema,
partition_spec,
}
}

/// Seed the manifest-level `first_row_id` for a v3 data manifest.
pub fn with_first_row_id(mut self, first_row_id: u64) -> Self {
self.first_row_id = Some(first_row_id);
self
}

/// Build a [`ManifestWriter`] for format version 1.
pub fn build_v1(self) -> ManifestWriter {
let metadata = ManifestMetadata::builder()
Expand Down Expand Up @@ -121,7 +140,7 @@ impl ManifestWriterBuilder {
)
}

/// Build a [`ManifestWriter`] for format version 2, data content.
/// Build a [`ManifestWriter`] for format version 3, data content.
pub fn build_v3_data(self) -> ManifestWriter {
let metadata = ManifestMetadata::builder()
.schema_id(self.schema.schema_id())
Expand All @@ -135,9 +154,7 @@ impl ManifestWriterBuilder {
self.snapshot_id,
self.key_metadata,
metadata,
// First row id is assigned by the [`ManifestListWriter`] when the manifest
// is added to the list.
None,
self.first_row_id,
)
}

Expand Down Expand Up @@ -173,6 +190,7 @@ pub struct ManifestWriter {
deleted_files: u32,
deleted_rows: u64,
first_row_id: Option<u64>,
next_data_file_row_id: Option<u64>,

min_seq_num: Option<i64>,

Expand Down Expand Up @@ -202,6 +220,7 @@ impl ManifestWriter {
deleted_files: 0,
deleted_rows: 0,
first_row_id,
next_data_file_row_id: first_row_id,
min_seq_num: None,
key_metadata,
manifest_entries: Vec::new(),
Expand Down Expand Up @@ -366,7 +385,7 @@ impl ManifestWriter {
Ok(())
}

fn add_entry_inner(&mut self, entry: ManifestEntry) -> Result<()> {
fn add_entry_inner(&mut self, mut entry: ManifestEntry) -> Result<()> {
// Check if the entry has sequence number
if (entry.status == ManifestStatus::Deleted || entry.status == ManifestStatus::Existing)
&& (entry.sequence_number.is_none() || entry.file_sequence_number.is_none())
Expand All @@ -377,6 +396,8 @@ impl ManifestWriter {
));
}

self.assign_data_file_first_row_id(&mut entry)?;

// Update the statistics
match entry.status {
ManifestStatus::Added => {
Expand All @@ -401,6 +422,36 @@ impl ManifestWriter {
Ok(())
}

fn assign_data_file_first_row_id(&mut self, entry: &mut ManifestEntry) -> Result<()> {
if self.metadata.format_version != FormatVersion::V3
|| self.metadata.content != ManifestContentType::Data
{
return Ok(());
}
let Some(cursor) = self.next_data_file_row_id else {
return Ok(());
};
if entry.status != ManifestStatus::Added {
return Ok(());
}
if entry.data_file.first_row_id.is_none() {
entry.data_file.first_row_id = Some(u64_to_i64_row_id(cursor)?);
}
self.next_data_file_row_id = cursor
.checked_add(entry.data_file.record_count)
.ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
format!(
"Row ID overflow assigning DataFile.first_row_id (cursor={cursor}, record_count={})",
entry.data_file.record_count
),
)
})
.map(Some)?;
Ok(())
}

/// Write manifest file and return it.
pub async fn write_manifest_file(mut self) -> Result<ManifestFile> {
// Create the avro writer
Expand Down Expand Up @@ -804,4 +855,185 @@ mod tests {
ManifestContentType::Deletes,
);
}

fn simple_v3_schema() -> Arc<Schema> {
Arc::new(
Schema::builder()
.with_fields(vec![Arc::new(NestedField::optional(
1,
"id",
Type::Primitive(PrimitiveType::Long),
))])
.build()
.unwrap(),
)
}

fn added_data_entry(path: &str, record_count: u64) -> ManifestEntry {
ManifestEntry {
status: ManifestStatus::Added,
snapshot_id: None,
sequence_number: None,
file_sequence_number: None,
data_file: DataFile {
content: DataContentType::Data,
file_path: path.to_string(),
file_format: DataFileFormat::Parquet,
partition: Struct::empty(),
record_count,
file_size_in_bytes: 100,
column_sizes: HashMap::new(),
value_counts: HashMap::new(),
null_value_counts: HashMap::new(),
nan_value_counts: HashMap::new(),
lower_bounds: HashMap::new(),
upper_bounds: HashMap::new(),
key_metadata: None,
split_offsets: None,
equality_ids: None,
sort_order_id: None,
partition_spec_id: 0,
first_row_id: None,
referenced_data_file: None,
content_offset: None,
content_size_in_bytes: None,
},
}
}

#[tokio::test]
async fn test_b1_writer_assigns_per_file_first_row_id() {
let schema = simple_v3_schema();
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.build()
.unwrap();
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("b1.avro");
let output_file = FileIO::new_with_fs()
.new_output(path.to_str().unwrap())
.unwrap();

let mut writer =
ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec)
.with_first_row_id(100)
.build_v3_data();

writer
.add_entry(added_data_entry("f1.parquet", 10))
.unwrap();
writer
.add_entry(added_data_entry("f2.parquet", 20))
.unwrap();
writer
.add_entry(added_data_entry("f3.parquet", 30))
.unwrap();
let manifest_file = writer.write_manifest_file().await.unwrap();

assert_eq!(manifest_file.added_rows_count, Some(60));
assert_eq!(manifest_file.first_row_id, Some(100));

let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap();
let ids: Vec<Option<i64>> = manifest
.entries()
.iter()
.map(|e| e.data_file().first_row_id())
.collect();
assert_eq!(ids, vec![Some(100), Some(110), Some(130)]);
}

#[tokio::test]
async fn test_b3_writer_preserves_preset_first_row_id() {
let schema = simple_v3_schema();
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.build()
.unwrap();
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("b3.avro");
let output_file = FileIO::new_with_fs()
.new_output(path.to_str().unwrap())
.unwrap();

let mut writer =
ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec)
.with_first_row_id(100)
.build_v3_data();

let mut preset = added_data_entry("f1.parquet", 10);
preset.data_file.first_row_id = Some(500);
writer.add_entry(preset).unwrap();
writer
.add_entry(added_data_entry("f2.parquet", 20))
.unwrap();
writer.write_manifest_file().await.unwrap();

let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap();
let ids: Vec<Option<i64>> = manifest
.entries()
.iter()
.map(|e| e.data_file().first_row_id())
.collect();
assert_eq!(ids, vec![Some(500), Some(110)]);
}

#[tokio::test]
async fn test_b4_v3_delete_manifest_never_assigns_per_file_id() {
let schema = simple_v3_schema();
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.build()
.unwrap();
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("b4.avro");
let output_file = FileIO::new_with_fs()
.new_output(path.to_str().unwrap())
.unwrap();

let mut writer =
ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec)
.with_first_row_id(100)
.build_v3_deletes();

let mut entry = added_data_entry("d1.parquet", 10);
entry.data_file.content = DataContentType::PositionDeletes;
writer.add_entry(entry).unwrap();
writer.write_manifest_file().await.unwrap();

let manifest = Manifest::parse_avro(&fs::read(path).unwrap()).unwrap();
assert!(
manifest
.entries()
.iter()
.all(|e| e.data_file().first_row_id().is_none())
);
}

#[tokio::test]
async fn test_b6_writer_first_row_id_overflow() {
let schema = simple_v3_schema();
let partition_spec = PartitionSpec::builder(schema.clone())
.with_spec_id(0)
.build()
.unwrap();
let tmp_dir = TempDir::new().unwrap();
let path = tmp_dir.path().join("b6.avro");
let output_file = FileIO::new_with_fs()
.new_output(path.to_str().unwrap())
.unwrap();

let mut writer =
ManifestWriterBuilder::new(output_file, Some(1), None, schema, partition_spec)
.with_first_row_id(u64::MAX - 5)
.build_v3_data();

let err = writer
.add_entry(added_data_entry("f1.parquet", 100))
.expect_err("expected overflow / range error");
let msg = format!("{err:?}");
assert!(
msg.contains("overflow") || msg.contains("exceeds i64"),
"unexpected error: {msg}",
);
}
}
Loading
Loading