diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs index 5a4b9c1ded..e63cf27e11 100644 --- a/crates/iceberg/src/arrow/reader/row_filter.rs +++ b/crates/iceberg/src/arrow/reader/row_filter.rs @@ -160,8 +160,19 @@ impl ArrowReader { /// Filters row groups by byte range to support Iceberg's file splitting. /// - /// Iceberg splits large files at row group boundaries, so we only read row groups - /// whose byte ranges overlap with [start, start+length). + /// Engines split a data file into multiple scan tasks, each covering a byte range + /// `[start, start+length)`. Normally Iceberg planning aligns these splits to row group + /// boundaries using the data file's `split_offsets` metadata, so a task's range never + /// bisects a row group. But when `split_offsets` is missing (e.g. a manually written or + /// non-conforming file), planning falls back to tiling the file at the requested split + /// size, and a task's range can land in the middle of a row group. + /// + /// A row group must be read by exactly one task, otherwise its rows are duplicated. We + /// assign ownership by the row group's midpoint: a task owns a row group only if its range + /// contains that midpoint. Because the tasks tile the file contiguously and disjointly, + /// each midpoint falls in exactly one task. This matches parquet-mr's `BlockMetaData` + /// midpoint semantics. For a whole-file task (`start=0, length=fileSize`, as iceberg-rust's + /// own planner emits) every midpoint lies in range, so all row groups are selected. pub(super) fn filter_row_groups_by_byte_range( parquet_metadata: &Arc, start: u64, @@ -176,13 +187,15 @@ impl ArrowReader { for (idx, row_group) in row_groups.iter().enumerate() { let row_group_size = row_group.compressed_size() as u64; - let row_group_end = current_byte_offset + row_group_size; + let row_group_midpoint = current_byte_offset + row_group_size / 2; - if current_byte_offset < end && start < row_group_end { + // Half-open ownership: a midpoint on a task boundary belongs to the upper task, + // so exactly one task ever claims a given row group. + if start <= row_group_midpoint && row_group_midpoint < end { selected.push(idx); } - current_byte_offset = row_group_end; + current_byte_offset += row_group_size; } Ok(selected) @@ -604,4 +617,212 @@ mod tests { assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0"); } } + + /// A single data file split into multiple sub-row-group byte ranges (as Spark/Iceberg + /// planning produces when split-size is smaller than a row group) must still yield each + /// row exactly once. The previous overlap-based selection let every split whose byte range + /// touched a row group read it, duplicating rows; ownership by midpoint reads each row group + /// from exactly one split. + #[tokio::test] + async fn test_sub_row_group_splits_do_not_duplicate_rows() { + use arrow_array::Int32Array; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/sub_split.parquet"); + + // Three row groups of 100 rows each (ids 0..300). + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_row_count(Some(100)) + .build(); + let file = File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + for chunk in [0..100, 100..200, 200..300] { + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from(chunk.collect::>()), + )]) + .unwrap(); + writer.write(&batch).expect("Writing batch"); + } + writer.close().unwrap(); + + let file_size = std::fs::metadata(&file_path).unwrap().len(); + let file_io = FileIO::new_with_fs(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); + + // Tile the whole file into 64-byte splits, mirroring Spark's split-size planning, and + // read every split. A 64-byte split is far smaller than a row group, so each row group + // is touched by several splits but must be owned (read) by exactly one. + let mut ids = Vec::new(); + let split_size = 64u64; + let mut start = 0u64; + while start < file_size { + let length = split_size.min(file_size - start); + let task = FileScanTask::builder() + .with_file_size_in_bytes(file_size) + .with_start(start) + .with_length(length) + .with_data_file_path(file_path.clone()) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema.clone()) + .with_project_field_ids(vec![1]) + .with_case_sensitive(false) + .build(); + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let batches = reader + .clone() + .read(tasks) + .unwrap() + .stream() + .try_collect::>() + .await + .unwrap(); + + for batch in &batches { + let col = batch + .column(0) + .as_primitive::(); + ids.extend(col.values().iter().copied()); + } + + start += length; + } + + ids.sort_unstable(); + assert_eq!( + ids, + (0..300).collect::>(), + "each row must be read exactly once across all splits, got {} rows", + ids.len() + ); + } + + /// When a split boundary lands exactly on a row group's midpoint, half-open ownership + /// (`start <= midpoint < end`) must hand that row group to the upper split only: the lower + /// split ends at the midpoint and so excludes it, the upper split starts at the midpoint and + /// so claims it. Two splits meeting exactly at the middle row group's midpoint must therefore + /// read every row once, with the middle row group going to the upper split. + #[tokio::test] + async fn test_split_boundary_on_row_group_midpoint() { + use arrow_array::Int32Array; + use parquet::file::reader::{FileReader, SerializedFileReader}; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_path = format!("{table_location}/midpoint.parquet"); + + // Three row groups of 100 rows each (ids 0..300). + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_row_count(Some(100)) + .build(); + let file = File::create(&file_path).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + for chunk in [0..100, 100..200, 200..300] { + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new( + Int32Array::from(chunk.collect::>()), + )]) + .unwrap(); + writer.write(&batch).expect("Writing batch"); + } + writer.close().unwrap(); + + // Locate the middle row group's exact midpoint. Row groups are stored back to back after + // the 4-byte magic header. + let metadata = SerializedFileReader::new(File::open(&file_path).unwrap()) + .unwrap() + .metadata() + .clone(); + assert_eq!(metadata.num_row_groups(), 3); + let rg1_start = 4 + metadata.row_group(0).compressed_size() as u64; + let rg1_size = metadata.row_group(1).compressed_size() as u64; + let rg1_midpoint = rg1_start + rg1_size / 2; + let file_end = rg1_start + rg1_size + metadata.row_group(2).compressed_size() as u64; + + let file_size = std::fs::metadata(&file_path).unwrap().len(); + let file_io = FileIO::new_with_fs(); + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); + + // Two splits meeting exactly at rg1's midpoint. The lower split ends there, the upper + // starts there; the middle row group must fall to the upper split alone. + let mut per_split = Vec::new(); + for (start, end) in [(0, rg1_midpoint), (rg1_midpoint, file_end)] { + let task = FileScanTask::builder() + .with_file_size_in_bytes(file_size) + .with_start(start) + .with_length(end - start) + .with_data_file_path(file_path.clone()) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema.clone()) + .with_project_field_ids(vec![1]) + .with_case_sensitive(false) + .build(); + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let batches = reader + .clone() + .read(tasks) + .unwrap() + .stream() + .try_collect::>() + .await + .unwrap(); + + let mut ids = Vec::new(); + for batch in &batches { + let col = batch + .column(0) + .as_primitive::(); + ids.extend(col.values().iter().copied()); + } + per_split.push(ids); + } + + assert_eq!( + per_split[0], + (0..100).collect::>(), + "lower split, ending at rg1's midpoint, must read only rg0" + ); + assert_eq!( + per_split[1], + (100..300).collect::>(), + "upper split, starting at rg1's midpoint, must read rg1 and rg2" + ); + } }