From 0c9d7c7b8f31994cc395c7232f596bcad7f51f27 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Sat, 13 Jun 2026 20:39:01 +0800 Subject: [PATCH 1/2] init fix --- .../src/row_group_filter.rs | 76 ++++++++++++++++++- 1 file changed, 73 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 1e9b0636e59e9..7bcf4eb8094e0 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -18,7 +18,7 @@ use std::collections::HashSet; use std::sync::Arc; -use super::{ParquetAccessPlan, ParquetFileMetrics}; +use super::{ParquetAccessPlan, ParquetFileMetrics, RowGroupAccess}; // Re-exported so the existing `crate::row_group_filter::BloomFilterStatistics` // path keeps resolving for in-crate callers (e.g. `opener`). pub(crate) use crate::bloom_filter::BloomFilterStatistics; @@ -190,7 +190,11 @@ impl RowGroupAccessPlanFilter { // find a set of matching row groups that can satisfy the limit for &idx in self.access_plan.row_group_indexes().iter() { if self.access_plan.is_fully_matched(idx) { - let row_group_row_count = rg_metadata[idx].num_rows() as usize; + let row_group_row_count = match &self.access_plan.inner()[idx] { + RowGroupAccess::Skip => 0, + RowGroupAccess::Scan => rg_metadata[idx].num_rows() as usize, + RowGroupAccess::Selection(selection) => selection.row_count(), + }; fully_matched_row_group_indexes.push(idx); fully_matched_rows_count += row_group_row_count; if fully_matched_rows_count >= limit { @@ -211,7 +215,7 @@ impl RowGroupAccessPlanFilter { let mut new_access_plan = ParquetAccessPlan::new_none(rg_metadata.len()); for &idx in &fully_matched_row_group_indexes { - new_access_plan.scan(idx); + new_access_plan.set(idx, self.access_plan.inner()[idx].clone()); new_access_plan.mark_fully_matched(idx); } self.access_plan = new_access_plan; @@ -535,6 +539,7 @@ mod tests { use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use parquet::arrow::ArrowSchemaConverter; + use parquet::arrow::arrow_reader::{RowSelection, RowSelector}; use parquet::basic::LogicalType; use parquet::data_type::{ByteArray, FixedLenByteArray}; use parquet::file::metadata::ColumnChunkMetaData; @@ -701,6 +706,71 @@ mod tests { assert_eq!(row_groups.is_fully_matched(), &vec![false, true, false]); } + #[test] + fn prune_by_limit_preserves_row_selection() { + let field = PrimitiveTypeField::new("c1", PhysicalType::INT32); + let schema_descr = get_test_schema_descr(vec![field]); + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(None, None, None, Some(0), false)], + ); + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(None, None, None, Some(0), false)], + ); + let groups = &[rgm1, rgm2]; + + let selection = + RowSelection::from(vec![RowSelector::skip(900), RowSelector::select(100)]); + let mut access_plan = ParquetAccessPlan::new_all(2); + access_plan.scan_selection(0, selection.clone()); + access_plan.mark_fully_matched(0); + access_plan.mark_fully_matched(1); + + let metrics = parquet_file_metrics(); + let mut row_groups = RowGroupAccessPlanFilter::new(access_plan); + row_groups.prune_by_limit(50, groups, &metrics); + + assert_eq!(row_groups.access_plan.row_group_indexes(), vec![0]); + assert_eq!( + row_groups.access_plan.inner(), + &[RowGroupAccess::Selection(selection), RowGroupAccess::Skip] + ); + assert_eq!(row_groups.is_fully_matched(), &vec![true, false]); + } + + #[test] + fn prune_by_limit_counts_only_selected_rows() { + let field = PrimitiveTypeField::new("c1", PhysicalType::INT32); + let schema_descr = get_test_schema_descr(vec![field]); + let rgm1 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(None, None, None, Some(0), false)], + ); + let rgm2 = get_row_group_meta_data( + &schema_descr, + vec![ParquetStatistics::int32(None, None, None, Some(0), false)], + ); + let groups = &[rgm1, rgm2]; + + let selection = + RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(990)]); + let mut access_plan = ParquetAccessPlan::new_all(2); + access_plan.scan_selection(0, selection.clone()); + access_plan.mark_fully_matched(0); + + let metrics = parquet_file_metrics(); + let mut row_groups = RowGroupAccessPlanFilter::new(access_plan); + row_groups.prune_by_limit(50, groups, &metrics); + + assert_eq!(row_groups.access_plan.row_group_indexes(), vec![0, 1]); + assert_eq!( + row_groups.access_plan.inner(), + &[RowGroupAccess::Selection(selection), RowGroupAccess::Scan] + ); + assert_eq!(row_groups.is_fully_matched(), &vec![true, false]); + } + #[test] fn row_group_pruning_predicate_missing_stats() { use datafusion_expr::{col, lit}; From b8be15c9be231228a42d107b2641fe992b81b93d Mon Sep 17 00:00:00 2001 From: Huaijin Date: Sat, 13 Jun 2026 21:37:17 +0800 Subject: [PATCH 2/2] update --- datafusion/datasource-parquet/src/row_group_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs index 7bcf4eb8094e0..8893c86a5cc7a 100644 --- a/datafusion/datasource-parquet/src/row_group_filter.rs +++ b/datafusion/datasource-parquet/src/row_group_filter.rs @@ -191,7 +191,7 @@ impl RowGroupAccessPlanFilter { for &idx in self.access_plan.row_group_indexes().iter() { if self.access_plan.is_fully_matched(idx) { let row_group_row_count = match &self.access_plan.inner()[idx] { - RowGroupAccess::Skip => 0, + RowGroupAccess::Skip => continue, RowGroupAccess::Scan => rg_metadata[idx].num_rows() as usize, RowGroupAccess::Selection(selection) => selection.row_count(), };