Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 226 additions & 5 deletions crates/iceberg/src/arrow/reader/row_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,19 @@ impl ArrowReader {

/// Filters row groups by byte range to support Iceberg's file splitting.
///
/// Iceberg splits large files at row group boundaries, so we only read row groups
/// whose byte ranges overlap with [start, start+length).
/// Engines split a data file into multiple scan tasks, each covering a byte range
/// `[start, start+length)`. Normally Iceberg planning aligns these splits to row group
/// boundaries using the data file's `split_offsets` metadata, so a task's range never
/// bisects a row group. But when `split_offsets` is missing (e.g. a manually written or
/// non-conforming file), planning falls back to tiling the file at the requested split
/// size, and a task's range can land in the middle of a row group.
///
/// A row group must be read by exactly one task, otherwise its rows are duplicated. We
/// assign ownership by the row group's midpoint: a task owns a row group only if its range
/// contains that midpoint. Because the tasks tile the file contiguously and disjointly,
/// each midpoint falls in exactly one task. This matches parquet-mr's `BlockMetaData`
/// midpoint semantics. For a whole-file task (`start=0, length=fileSize`, as iceberg-rust's
/// own planner emits) every midpoint lies in range, so all row groups are selected.
pub(super) fn filter_row_groups_by_byte_range(
parquet_metadata: &Arc<ParquetMetaData>,
start: u64,
Expand All @@ -176,13 +187,15 @@ impl ArrowReader {

for (idx, row_group) in row_groups.iter().enumerate() {
let row_group_size = row_group.compressed_size() as u64;
let row_group_end = current_byte_offset + row_group_size;
let row_group_midpoint = current_byte_offset + row_group_size / 2;

if current_byte_offset < end && start < row_group_end {
// Half-open ownership: a midpoint on a task boundary belongs to the upper task,
// so exactly one task ever claims a given row group.
if start <= row_group_midpoint && row_group_midpoint < end {
selected.push(idx);
}

current_byte_offset = row_group_end;
current_byte_offset += row_group_size;
}

Ok(selected)
Expand Down Expand Up @@ -604,4 +617,212 @@ mod tests {
assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
}
}

/// A single data file split into multiple sub-row-group byte ranges (as Spark/Iceberg
/// planning produces when split-size is smaller than a row group) must still yield each
/// row exactly once. The previous overlap-based selection let every split whose byte range
/// touched a row group read it, duplicating rows; ownership by midpoint reads each row group
/// from exactly one split.
#[tokio::test]
async fn test_sub_row_group_splits_do_not_duplicate_rows() {
use arrow_array::Int32Array;

let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));

let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().to_str().unwrap().to_string();
let file_path = format!("{table_location}/sub_split.parquet");

// Three row groups of 100 rows each (ids 0..300).
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_row_count(Some(100))
.build();
let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
for chunk in [0..100, 100..200, 200..300] {
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from(chunk.collect::<Vec<i32>>()),
)])
.unwrap();
writer.write(&batch).expect("Writing batch");
}
writer.close().unwrap();

let file_size = std::fs::metadata(&file_path).unwrap().len();
let file_io = FileIO::new_with_fs();
let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build();

// Tile the whole file into 64-byte splits, mirroring Spark's split-size planning, and
// read every split. A 64-byte split is far smaller than a row group, so each row group
// is touched by several splits but must be owned (read) by exactly one.
let mut ids = Vec::new();
let split_size = 64u64;
let mut start = 0u64;
while start < file_size {
let length = split_size.min(file_size - start);
let task = FileScanTask::builder()
.with_file_size_in_bytes(file_size)
.with_start(start)
Comment thread
mbutrovich marked this conversation as resolved.
.with_length(length)
.with_data_file_path(file_path.clone())
.with_data_file_format(DataFileFormat::Parquet)
.with_schema(schema.clone())
.with_project_field_ids(vec![1])
.with_case_sensitive(false)
.build();

let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream;
let batches = reader
.clone()
.read(tasks)
.unwrap()
.stream()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

for batch in &batches {
let col = batch
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
ids.extend(col.values().iter().copied());
}

start += length;
}

ids.sort_unstable();
assert_eq!(
ids,
(0..300).collect::<Vec<i32>>(),
"each row must be read exactly once across all splits, got {} rows",
ids.len()
);
}

/// Boundary case for half-open midpoint ownership (`start <= midpoint < end`): when two
/// adjacent splits meet exactly on a row group's midpoint, the lower split (which ends there)
/// must not claim that row group, while the upper split (which starts there) must. With a
/// three-row-group file and the boundary placed on the middle row group's midpoint, the lower
/// split therefore reads only the first row group and the upper split reads the remaining two,
/// so every row is read exactly once.
#[tokio::test]
async fn test_split_boundary_on_row_group_midpoint() {
    use arrow_array::Int32Array;
    use parquet::file::reader::{FileReader, SerializedFileReader};

    // Iceberg-side schema: one required int column `id` carrying field id 1.
    let id_field = NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int));
    let iceberg_schema = Arc::new(
        Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![id_field.into()])
            .build()
            .unwrap(),
    );

    // Arrow-side schema with the Parquet field-id metadata pointing at field id 1.
    let field_id_meta = HashMap::from([(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        "1".to_string(),
    )]);
    let id_column = Field::new("id", DataType::Int32, false).with_metadata(field_id_meta);
    let parquet_schema = Arc::new(ArrowSchema::new(vec![id_column]));

    let scratch = TempDir::new().unwrap();
    let data_path = format!("{}/midpoint.parquet", scratch.path().to_str().unwrap());

    // Write ids 0..300 as three batches of 100 rows; the writer is capped at 100 rows per
    // row group, giving three row groups.
    let writer_props = WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_row_count(Some(100))
        .build();
    let mut parquet_writer = ArrowWriter::try_new(
        File::create(&data_path).unwrap(),
        parquet_schema.clone(),
        Some(writer_props),
    )
    .unwrap();
    for first_id in (0..300).step_by(100) {
        let batch = RecordBatch::try_new(parquet_schema.clone(), vec![Arc::new(
            Int32Array::from((first_id..first_id + 100).collect::<Vec<i32>>()),
        )])
        .unwrap();
        parquet_writer.write(&batch).expect("Writing batch");
    }
    parquet_writer.close().unwrap();

    // Work out the byte layout from the footer. Row groups are stored back to back after the
    // 4-byte magic header, so each one starts where the previous one ends.
    let footer = SerializedFileReader::new(File::open(&data_path).unwrap())
        .unwrap()
        .metadata()
        .clone();
    assert_eq!(footer.num_row_groups(), 3);
    let compressed_len = |idx: usize| footer.row_group(idx).compressed_size() as u64;
    let middle_start = 4 + compressed_len(0);
    let middle_midpoint = middle_start + compressed_len(1) / 2;
    let data_end = middle_start + compressed_len(1) + compressed_len(2);

    let file_size = std::fs::metadata(&data_path).unwrap().len();
    let reader = ArrowReaderBuilder::new(FileIO::new_with_fs(), Runtime::current()).build();

    // Two adjacent splits sharing the middle row group's midpoint as their common boundary:
    // the lower one ends on it, the upper one starts on it.
    let split_bounds = [(0, middle_midpoint), (middle_midpoint, data_end)];
    let mut ids_by_split: Vec<Vec<i32>> = Vec::with_capacity(split_bounds.len());
    for (lo, hi) in split_bounds {
        let scan_task = FileScanTask::builder()
            .with_file_size_in_bytes(file_size)
            .with_start(lo)
            .with_length(hi - lo)
            .with_data_file_path(data_path.clone())
            .with_data_file_format(DataFileFormat::Parquet)
            .with_schema(iceberg_schema.clone())
            .with_project_field_ids(vec![1])
            .with_case_sensitive(false)
            .build();

        let task_stream =
            Box::pin(futures::stream::iter(vec![Ok(scan_task)])) as FileScanTaskStream;
        let batches = reader
            .clone()
            .read(task_stream)
            .unwrap()
            .stream()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();

        // Flatten this split's batches into the ids it produced, in read order.
        let split_ids: Vec<i32> = batches
            .iter()
            .flat_map(|batch| {
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values()
                    .iter()
                    .copied()
            })
            .collect();
        ids_by_split.push(split_ids);
    }

    assert_eq!(
        ids_by_split[0],
        (0..100).collect::<Vec<i32>>(),
        "lower split, ending at rg1's midpoint, must read only rg0"
    );
    assert_eq!(
        ids_by_split[1],
        (100..300).collect::<Vec<i32>>(),
        "upper split, starting at rg1's midpoint, must read rg1 and rg2"
    );
}
}
Loading