From 7832d39a97fd67981cbcc363405ba0f732df38c1 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 10 Jun 2026 13:46:46 -0400 Subject: [PATCH 1/3] fix(arrow): assign row groups to splits by midpoint, not byte-range overlap --- crates/iceberg/src/arrow/reader/row_filter.rs | 90 +++++++++++++++++-- 1 file changed, 85 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs index 5a4b9c1ded..3a28b088bf 100644 --- a/crates/iceberg/src/arrow/reader/row_filter.rs +++ b/crates/iceberg/src/arrow/reader/row_filter.rs @@ -160,8 +160,14 @@ impl ArrowReader { /// Filters row groups by byte range to support Iceberg's file splitting. /// - /// Iceberg splits large files at row group boundaries, so we only read row groups - /// whose byte ranges overlap with [start, start+length). + /// External engines (e.g. Spark via Comet) split a data file into multiple scan tasks, + /// each covering a byte range `[start, start+length)`. A row group must be read by exactly + /// one task, otherwise its rows are duplicated. We assign ownership by the row group's + /// midpoint: a task owns a row group only if its range contains that midpoint. Because the + /// tasks tile the file contiguously and disjointly, each midpoint falls in exactly one task. + /// This matches parquet-mr's `BlockMetaData` midpoint semantics. For a whole-file task + /// (`start=0, length=fileSize`, as iceberg-rust's own planner emits) every midpoint lies in + /// range, so all row groups are selected. pub(super) fn filter_row_groups_by_byte_range( parquet_metadata: &Arc, start: u64, @@ -176,13 +182,15 @@ impl ArrowReader { for (idx, row_group) in row_groups.iter().enumerate() { let row_group_size = row_group.compressed_size() as u64; - let row_group_end = current_byte_offset + row_group_size; + let row_group_midpoint = current_byte_offset + row_group_size / 2; - if current_byte_offset < end && start < row_group_end { + // Half-open ownership: a midpoint on a task boundary belongs to the upper task, + // so exactly one task ever claims a given row group. + if start <= row_group_midpoint && row_group_midpoint < end { selected.push(idx); } - current_byte_offset = row_group_end; + current_byte_offset += row_group_size; } Ok(selected) @@ -604,4 +612,76 @@ mod tests { assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0"); } } + + /// Reproduces row duplication when a single row group is subdivided into multiple + /// byte-range splits (e.g. Spark/Iceberg planning with a split-size smaller than the + /// row group size). Each row group must be owned by exactly one split. With the + /// previous overlap-based selection, every split whose byte range touched the row + /// group claimed it, so the row group was read multiple times and its rows duplicated. + #[tokio::test] + async fn test_sub_row_group_splits_assign_each_row_group_once() { + use std::sync::Arc as StdArc; + + use arrow_array::Int32Array; + use parquet::file::metadata::ParquetMetaData; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/sub_split.parquet"); + + // Three row groups so we exercise both interior and boundary midpoints. + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_row_count(Some(100)) + .build(); + let file = File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + for chunk in [0..100, 100..200, 200..300] { + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from(chunk.collect::>()), + )]) + .unwrap(); + writer.write(&batch).expect("Writing batch"); + } + writer.close().unwrap(); + + let file = File::open(&file_path).unwrap(); + let reader = SerializedFileReader::new(file).unwrap(); + let metadata: StdArc = StdArc::new(reader.metadata().clone()); + let num_row_groups = metadata.num_row_groups(); + assert_eq!(num_row_groups, 3, "Expected 3 row groups"); + + // Tile the whole file into 64-byte splits, mirroring Spark's split-size=64 planning. + let file_size = std::fs::metadata(&file_path).unwrap().len(); + let split_size = 64u64; + + // Count how many splits claim each row group. Every row group must be claimed exactly + // once; more than once is the duplication bug, zero means data is silently dropped. + let mut claims = vec![0usize; num_row_groups]; + let mut start = 0u64; + while start < file_size { + let length = split_size.min(file_size - start); + let selected = + ArrowReader::filter_row_groups_by_byte_range(&metadata, start, length).unwrap(); + for idx in selected { + claims[idx] += 1; + } + start += length; + } + + for (idx, &count) in claims.iter().enumerate() { + assert_eq!( + count, 1, + "row group {idx} was claimed by {count} splits, expected exactly 1" + ); + } + } } From 2b27ebe2667bd812379af551769ff1d2e3f75619 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 10 Jun 2026 13:53:13 -0400 Subject: [PATCH 2/3] refactor test, cleanup --- crates/iceberg/src/arrow/reader/row_filter.rs | 90 ++++++++++++------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs index 3a28b088bf..a959db1608 100644 --- a/crates/iceberg/src/arrow/reader/row_filter.rs +++ b/crates/iceberg/src/arrow/reader/row_filter.rs @@ -613,18 +613,24 @@ mod tests { } } - /// Reproduces row duplication when a single row group is subdivided into multiple - /// byte-range splits (e.g. Spark/Iceberg planning with a split-size smaller than the - /// row group size). Each row group must be owned by exactly one split. With the - /// previous overlap-based selection, every split whose byte range touched the row - /// group claimed it, so the row group was read multiple times and its rows duplicated. + /// A single data file split into multiple sub-row-group byte ranges (as Spark/Iceberg + /// planning produces when split-size is smaller than a row group) must still yield each + /// row exactly once. The previous overlap-based selection let every split whose byte range + /// touched a row group read it, duplicating rows; ownership by midpoint reads each row group + /// from exactly one split. #[tokio::test] - async fn test_sub_row_group_splits_assign_each_row_group_once() { - use std::sync::Arc as StdArc; - + async fn test_sub_row_group_splits_do_not_duplicate_rows() { use arrow_array::Int32Array; - use parquet::file::metadata::ParquetMetaData; - use parquet::file::reader::{FileReader, SerializedFileReader}; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); let arrow_schema = Arc::new(ArrowSchema::new(vec![ Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( @@ -637,7 +643,7 @@ mod tests { let table_location = tmp_dir.path().to_str().unwrap().to_string(); let file_path = format!("{table_location}/sub_split.parquet"); - // Three row groups so we exercise both interior and boundary midpoints. + // Three row groups of 100 rows each (ids 0..300). let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) .set_max_row_group_row_count(Some(100)) @@ -653,35 +659,55 @@ mod tests { } writer.close().unwrap(); - let file = File::open(&file_path).unwrap(); - let reader = SerializedFileReader::new(file).unwrap(); - let metadata: StdArc = StdArc::new(reader.metadata().clone()); - let num_row_groups = metadata.num_row_groups(); - assert_eq!(num_row_groups, 3, "Expected 3 row groups"); - - // Tile the whole file into 64-byte splits, mirroring Spark's split-size=64 planning. let file_size = std::fs::metadata(&file_path).unwrap().len(); - let split_size = 64u64; + let file_io = FileIO::new_with_fs(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); - // Count how many splits claim each row group. Every row group must be claimed exactly - // once; more than once is the duplication bug, zero means data is silently dropped. - let mut claims = vec![0usize; num_row_groups]; + // Tile the whole file into 64-byte splits, mirroring Spark's split-size planning, and + // read every split. A 64-byte split is far smaller than a row group, so each row group + // is touched by several splits but must be owned (read) by exactly one. + let mut ids = Vec::new(); + let split_size = 64u64; let mut start = 0u64; while start < file_size { let length = split_size.min(file_size - start); - let selected = - ArrowReader::filter_row_groups_by_byte_range(&metadata, start, length).unwrap(); - for idx in selected { - claims[idx] += 1; + let task = FileScanTask::builder() + .with_file_size_in_bytes(file_size) + .with_start(start) + .with_length(length) + .with_data_file_path(file_path.clone()) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema.clone()) + .with_project_field_ids(vec![1]) + .with_case_sensitive(false) + .build(); + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let batches = reader + .clone() + .read(tasks) + .unwrap() + .stream() + .try_collect::>() + .await + .unwrap(); + + for batch in &batches { + let col = batch + .column(0) + .as_primitive::(); + ids.extend(col.values().iter().copied()); } + start += length; } - for (idx, &count) in claims.iter().enumerate() { - assert_eq!( - count, 1, - "row group {idx} was claimed by {count} splits, expected exactly 1" - ); - } + ids.sort_unstable(); + assert_eq!( + ids, + (0..300).collect::>(), + "each row must be read exactly once across all splits, got {} rows", + ids.len() + ); } } From 46dde2cc9eb128f66940878d183e06cb60ffc283 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 11 Jun 2026 06:18:11 -0400 Subject: [PATCH 3/3] address PR feedback: clarifying docs, add test for exact midpoint split --- crates/iceberg/src/arrow/reader/row_filter.rs | 131 ++++++++++++++++-- 1 file changed, 123 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs index a959db1608..e63cf27e11 100644 --- a/crates/iceberg/src/arrow/reader/row_filter.rs +++ b/crates/iceberg/src/arrow/reader/row_filter.rs @@ -160,14 +160,19 @@ impl ArrowReader { /// Filters row groups by byte range to support Iceberg's file splitting. /// - /// External engines (e.g. Spark via Comet) split a data file into multiple scan tasks, - /// each covering a byte range `[start, start+length)`. A row group must be read by exactly - /// one task, otherwise its rows are duplicated. We assign ownership by the row group's - /// midpoint: a task owns a row group only if its range contains that midpoint. Because the - /// tasks tile the file contiguously and disjointly, each midpoint falls in exactly one task. - /// This matches parquet-mr's `BlockMetaData` midpoint semantics. For a whole-file task - /// (`start=0, length=fileSize`, as iceberg-rust's own planner emits) every midpoint lies in - /// range, so all row groups are selected. + /// Engines split a data file into multiple scan tasks, each covering a byte range + /// `[start, start+length)`. Normally Iceberg planning aligns these splits to row group + /// boundaries using the data file's `split_offsets` metadata, so a task's range never + /// bisects a row group. But when `split_offsets` is missing (e.g. a manually written or + /// non-conforming file), planning falls back to tiling the file at the requested split + /// size, and a task's range can land in the middle of a row group. + /// + /// A row group must be read by exactly one task, otherwise its rows are duplicated. We + /// assign ownership by the row group's midpoint: a task owns a row group only if its range + /// contains that midpoint. Because the tasks tile the file contiguously and disjointly, + /// each midpoint falls in exactly one task. This matches parquet-mr's `BlockMetaData` + /// midpoint semantics. For a whole-file task (`start=0, length=fileSize`, as iceberg-rust's + /// own planner emits) every midpoint lies in range, so all row groups are selected. pub(super) fn filter_row_groups_by_byte_range( parquet_metadata: &Arc, start: u64, @@ -710,4 +715,114 @@ mod tests { ids.len() ); } + + /// When a split boundary lands exactly on a row group's midpoint, half-open ownership + /// (`start <= midpoint < end`) must hand that row group to the upper split only: the lower + /// split ends at the midpoint and so excludes it, the upper split starts at the midpoint and + /// so claims it. Two splits meeting exactly at the middle row group's midpoint must therefore + /// read every row once, with the middle row group going to the upper split. + #[tokio::test] + async fn test_split_boundary_on_row_group_midpoint() { + use arrow_array::Int32Array; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/midpoint.parquet"); + + // Three row groups of 100 rows each (ids 0..300). + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_row_count(Some(100)) + .build(); + let file = File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + for chunk in [0..100, 100..200, 200..300] { + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from(chunk.collect::>()), + )]) + .unwrap(); + writer.write(&batch).expect("Writing batch"); + } + writer.close().unwrap(); + + // Locate the middle row group's exact midpoint. Row groups are stored back to back after + // the 4-byte magic header. + let metadata = SerializedFileReader::new(File::open(&file_path).unwrap()) + .unwrap() + .metadata() + .clone(); + assert_eq!(metadata.num_row_groups(), 3); + let rg1_start = 4 + metadata.row_group(0).compressed_size() as u64; + let rg1_size = metadata.row_group(1).compressed_size() as u64; + let rg1_midpoint = rg1_start + rg1_size / 2; + let file_end = rg1_start + rg1_size + metadata.row_group(2).compressed_size() as u64; + + let file_size = std::fs::metadata(&file_path).unwrap().len(); + let file_io = FileIO::new_with_fs(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); + + // Two splits meeting exactly at rg1's midpoint. The lower split ends there, the upper + // starts there; the middle row group must fall to the upper split alone. + let mut per_split = Vec::new(); + for (start, end) in [(0, rg1_midpoint), (rg1_midpoint, file_end)] { + let task = FileScanTask::builder() + .with_file_size_in_bytes(file_size) + .with_start(start) + .with_length(end - start) + .with_data_file_path(file_path.clone()) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema.clone()) + .with_project_field_ids(vec![1]) + .with_case_sensitive(false) + .build(); + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let batches = reader + .clone() + .read(tasks) + .unwrap() + .stream() + .try_collect::>() + .await + .unwrap(); + + let mut ids = Vec::new(); + for batch in &batches { + let col = batch + .column(0) + .as_primitive::(); + ids.extend(col.values().iter().copied()); + } + per_split.push(ids); + } + + assert_eq!( + per_split[0], + (0..100).collect::>(), + "lower split, ending at rg1's midpoint, must read only rg0" + ); + assert_eq!( + per_split[1], + (100..300).collect::>(), + "upper split, starting at rg1's midpoint, must read rg1 and rg2" + ); + } }