From 4e29a81e8dec829dc3f1a5896db7b57de101ecbf Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Fri, 12 Jun 2026 17:52:12 +0800 Subject: [PATCH 1/9] bench(parquet): add row filter baseline cases --- parquet/benches/arrow_reader_row_filter.rs | 1444 +++++++++++++++++++- parquet/benches/row_selection_cursor.rs | 148 ++ 2 files changed, 1559 insertions(+), 33 deletions(-) diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs index 2b5a09eebcb3..6ae7c816e56b 100644 --- a/parquet/benches/arrow_reader_row_filter.rs +++ b/parquet/benches/arrow_reader_row_filter.rs @@ -52,20 +52,24 @@ //! - unsel_clustered: for Unselective Clustered – in each 10K-row block, rows with an offset >= 1000 are "unsel_clustered". //! -use arrow::array::{ArrayRef, BooleanArray, Float64Array, Int64Array, TimestampMillisecondArray}; -use arrow::compute::and; +use arrow::array::{ + ArrayRef, BooleanArray, Float64Array, Int64Array, StructArray, TimestampMillisecondArray, +}; use arrow::compute::kernels::cmp::{eq, gt, lt, neq}; +use arrow::compute::{and, or}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; use arrow_array::StringViewArray; use arrow_array::builder::{ArrayBuilder, StringViewBuilder}; -use arrow_cast::pretty::pretty_format_batches; use bytes::Bytes; -use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use criterion::{ + BenchmarkGroup, BenchmarkId, Criterion, criterion_group, criterion_main, measurement::WallTime, +}; use futures::future::BoxFuture; use futures::{FutureExt, StreamExt}; use parquet::arrow::arrow_reader::{ ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, + RowSelectionPolicy, }; use parquet::arrow::async_reader::AsyncFileReader; use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask}; @@ -76,6 +80,9 @@ use rand::{Rng, SeedableRng, rngs::StdRng}; use std::ops::Range; use std::sync::Arc; +const COLUMN_NAMES: [&str; 4] = ["int64", "float64", "utf8View", "ts"]; +const UTF8_VIEW_MISSING_VALUE: &str = "__arrow_rs_missing__"; + /// Generates a random string. Has a 50% chance to generate a short string (3–11 characters) /// or a long string (13–20 characters). fn random_string(rng: &mut StdRng) -> String { @@ -188,32 +195,77 @@ const ROW_GROUP_SIZE: usize = 100_000; /// Writes the RecordBatch to an in memory buffer, returning the buffer fn write_parquet_file() -> Vec { - let batch = create_record_batch(TOTAL_ROWS); - println!("Batch created with {TOTAL_ROWS} rows, row group size = {ROW_GROUP_SIZE}"); - println!( - "First 100 rows:\n{}", - pretty_format_batches(&[batch.clone().slice(0, 100)]).unwrap() - ); + write_parquet_file_with_rows(TOTAL_ROWS, ROW_GROUP_SIZE) +} + +/// Writes a RecordBatch with a configurable shape to an in memory buffer, +/// returning the buffer. +fn write_parquet_file_with_rows(total_rows: usize, row_group_size: usize) -> Vec { + let batch = create_record_batch(total_rows); + write_record_batch_to_parquet(&batch, row_group_size) +} + +fn write_record_batch_to_parquet(batch: &RecordBatch, row_group_size: usize) -> Vec { let schema = batch.schema(); let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) - .set_max_row_group_row_count(Some(ROW_GROUP_SIZE)) + .set_max_row_group_row_count(Some(row_group_size)) .build(); let mut buffer = vec![]; { let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap(); - writer.write(&batch).unwrap(); + writer.write(batch).unwrap(); writer.close().unwrap(); } buffer } +fn create_nested_record_batch(size: usize) -> RecordBatch { + let tag = Arc::new(StringViewArray::from_iter_values( + (0..size).map(|idx| format!("tag_{}", idx % 7)), + )) as ArrayRef; + let payload = StructArray::from(vec![ + ( + Arc::new(Field::new("id", DataType::Int64, false)), + Arc::new(Int64Array::from_iter_values( + (0..size).map(|idx| idx as i64 + 1_000), + )) as ArrayRef, + ), + ( + Arc::new(Field::new("label", DataType::Utf8View, false)), + Arc::new(StringViewArray::from_iter_values( + (0..size).map(|idx| format!("payload_{idx}")), + )) as ArrayRef, + ), + ]); + let payload = Arc::new(payload) as ArrayRef; + let value = Arc::new(Int64Array::from_iter_values( + (0..size).map(|idx| idx as i64 + 10_000), + )) as ArrayRef; + + RecordBatch::try_from_iter(vec![("tag", tag), ("payload", payload), ("value", value)]).unwrap() +} + +fn write_nested_parquet_file_with_rows(total_rows: usize, row_group_size: usize) -> Vec { + let batch = create_nested_record_batch(total_rows); + write_record_batch_to_parquet(&batch, row_group_size) +} + /// ProjectionCase defines the projection mode for the benchmark: /// either projecting all columns or excluding the column that is used for filtering. -#[derive(Clone)] +#[derive(Clone, Copy)] enum ProjectionCase { AllColumns, ExcludeFilterColumn, + FilterColumnsOnly, + CountOnly, + FixedColumns, + Float64AndTs, + Float64Only, + Int64AndFloat64, + Int64AndUtf8, + TsAndUtf8, + Utf8Only, } impl std::fmt::Display for ProjectionCase { @@ -221,6 +273,53 @@ impl std::fmt::Display for ProjectionCase { match self { ProjectionCase::AllColumns => write!(f, "all_columns"), ProjectionCase::ExcludeFilterColumn => write!(f, "exclude_filter_column"), + ProjectionCase::FilterColumnsOnly => write!(f, "filter_columns_only"), + ProjectionCase::CountOnly => write!(f, "count_only"), + ProjectionCase::FixedColumns => write!(f, "fixed_columns"), + ProjectionCase::Float64AndTs => write!(f, "float64_and_ts"), + ProjectionCase::Float64Only => write!(f, "float64_only"), + ProjectionCase::Int64AndFloat64 => write!(f, "int64_and_float64"), + ProjectionCase::Int64AndUtf8 => write!(f, "int64_and_utf8"), + ProjectionCase::TsAndUtf8 => write!(f, "ts_and_utf8"), + ProjectionCase::Utf8Only => write!(f, "utf8_only"), + } + } +} + +#[derive(Clone, Copy)] +enum SyncStrategy { + FullPostFilter, + PushdownAuto, + PushdownSelectors, + PushdownMask, +} + +impl std::fmt::Display for SyncStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SyncStrategy::FullPostFilter => write!(f, "full_post_filter"), + SyncStrategy::PushdownAuto => write!(f, "pushdown_auto"), + SyncStrategy::PushdownSelectors => write!(f, "pushdown_selectors"), + SyncStrategy::PushdownMask => write!(f, "pushdown_mask"), + } + } +} + +#[derive(Clone, Copy)] +enum AsyncStrategy { + FullPostFilter, + PushdownAuto, + PushdownSelectors, + PushdownMask, +} + +impl std::fmt::Display for AsyncStrategy { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + AsyncStrategy::FullPostFilter => write!(f, "full_post_filter"), + AsyncStrategy::PushdownAuto => write!(f, "pushdown_auto"), + AsyncStrategy::PushdownSelectors => write!(f, "pushdown_selectors"), + AsyncStrategy::PushdownMask => write!(f, "pushdown_mask"), } } } @@ -334,6 +433,71 @@ enum FilterType { /// [ClickBench]: https://github.com/ClickHouse/ClickBench /// [Q21-Q27]: https://github.com/apache/datafusion/blob/b7177234e65cbbb2dcc04c252f6acd80bb026362/benchmarks/queries/clickbench/queries.sql#L22-L28 Utf8ViewNonEmpty, + /// Sparse variable-width predicate shaped like TPC-DS Q83 dynamic + /// `i_item_id` filters, where the predicate column is also projected. + Utf8ViewMissing, + /// Scalar-only part of ClickBench Q37: + /// + /// ```sql + /// WHERE CounterID = 62 + /// AND EventDate BETWEEN ... + /// AND DontCountHits = 0 + /// AND IsRefresh = 0 + /// AND Title <> '' + /// ``` + /// + /// DataFusion `Auto` does not push down the `Title <> ''` string predicate, + /// but it can push down the scalar prefix to defer decoding `Title`. + /// This synthetic predicate keeps that reader-level shape: cheap scalar + /// filter columns protect an expensive `Utf8View` output column. + ClickBenchQ37ScalarPrefix, + /// Shape of ClickBench extended Q6 under DataFusion row-filter pushdown: + /// an early cheap fixed-width predicate can prune almost all rows before a + /// later unprojected variable-width predicate is decoded. + ClickBenchQ6MixedPredicates, + /// Same scalar + variable-width predicate columns as [`Self::ClickBenchQ6MixedPredicates`], + /// but with the variable-width predicate evaluated first. This anchors the + /// static post-filter gate against predicate-order drift. + ClickBenchQ6VarWidthFirst, + /// Shape of ClickBench Q41-like fixed-width filters: sparse fragmented + /// scalar predicates with a cheap fixed-width output projection. + ClickBenchQ41SparseFixedOutput, + /// Shape of ClickBench Q40: multiple cheap scalar predicates, very small + /// output, and one projected predicate column used later by grouping. + ClickBenchQ40ScalarGroupBy, + /// Shape of TPC-DS Q41: a complex OR predicate over dictionary/string-like + /// and scalar columns where predicate evaluation dominates reader time. + TpcdsQ41ComplexOr, + /// Shape of TPC-DS Q20 catalog_sales after dynamic filters: multiple + /// fixed-width predicates where predicate columns are also projected. + TpcdsQ20ProjectedDynamicFilters, + /// Shape of TPC-DS Q21 after dynamic-filter pruning: sparse fragmented + /// fixed-width predicates where the final projection still includes the + /// predicate columns. This protects against choosing selectors for columns + /// that were already decoded/cached by predicate evaluation. + TpcdsQ21ProjectedFixedOutput, + /// Shape of TPC-DS Q2 fact scans: the dynamic filter applies to the date + /// key, the same date key is projected, and an additional fixed-width sales + /// value can still be deferred by predicate pushdown. + TpcdsQ2ProjectedPredicate5Pct, + TpcdsQ2ProjectedPredicate8Pct, + TpcdsQ2ProjectedPredicate10Pct, + TpcdsQ2ProjectedPredicate20Pct, + TpcdsQ2ProjectedPredicate30Pct, + TpcdsQ2ProjectedPredicate40Pct, + TpcdsQ2ProjectedPredicate50Pct, + /// Scalar range predicate shaped like TPC-DS Q9 `ss_quantity BETWEEN ...` + /// subqueries. The selected rows are random and moderately selective, and + /// benchmark projections cover both count-only and numeric aggregate cases. + TpcdsQ9QuantityRange, + /// Exact shape for the projected-predicate moderate-selectivity gate: + /// a clustered 20% timestamp predicate where the predicate column is + /// projected and the deferred output is variable-width. + ProjectedTs8PctClustered, + ProjectedTs20PctClustered, + /// Very sparse projected fixed-width scan shaped like TPC-DS fact-table + /// filters where the predicate column is also needed in the output projection. + TpcdsSparseProjectedFactScan, } impl std::fmt::Display for FilterType { @@ -347,6 +511,50 @@ impl std::fmt::Display for FilterType { FilterType::UnselectiveClustered => "ts < 9000", FilterType::Composite => "float64 > 99.0 AND ts >= 9000", FilterType::Utf8ViewNonEmpty => "utf8View <> ''", + FilterType::Utf8ViewMissing => "utf8View == ''", + FilterType::ClickBenchQ37ScalarPrefix => "int64 == 62 AND ts < 9000", + FilterType::ClickBenchQ6MixedPredicates => "int64 == 9999 AND utf8View <> ''", + FilterType::ClickBenchQ6VarWidthFirst => "utf8View <> '' AND int64 == 9999", + FilterType::ClickBenchQ41SparseFixedOutput => "int64 < 8 AND ts < 9000", + FilterType::ClickBenchQ40ScalarGroupBy => { + "int64 == 62 AND float64 > 10.0 AND ts < 9000" + } + FilterType::TpcdsQ41ComplexOr => { + "(utf8View <> '' AND int64 < 8) OR (ts < 100 AND float64 > 95.0)" + } + FilterType::TpcdsQ20ProjectedDynamicFilters => { + "int64 < 12 AND ts < 9000 projected dynamic filters" + } + FilterType::TpcdsQ21ProjectedFixedOutput => { + "int64 < 8 AND ts < 9000 projected predicates" + } + FilterType::TpcdsQ2ProjectedPredicate10Pct => { + "int64 < 10 projected predicate with fixed output" + } + FilterType::TpcdsQ2ProjectedPredicate5Pct => { + "int64 < 5 projected predicate with fixed output" + } + FilterType::TpcdsQ2ProjectedPredicate8Pct => { + "int64 < 8 projected predicate with fixed output" + } + FilterType::TpcdsQ2ProjectedPredicate20Pct => { + "int64 < 20 projected predicate with fixed output" + } + FilterType::TpcdsQ2ProjectedPredicate30Pct => { + "int64 < 30 projected predicate with fixed output" + } + FilterType::TpcdsQ2ProjectedPredicate40Pct => { + "int64 < 40 projected predicate with fixed output" + } + FilterType::TpcdsQ2ProjectedPredicate50Pct => { + "int64 < 50 projected predicate with fixed output" + } + FilterType::TpcdsQ9QuantityRange => "int64 > 0 AND int64 < 21", + FilterType::ProjectedTs20PctClustered => { + "ts < 2000 projected predicate with utf8 output" + } + FilterType::ProjectedTs8PctClustered => "ts < 800 projected predicate with utf8 output", + FilterType::TpcdsSparseProjectedFactScan => "ts % 1000 == 0", }; write!(f, "{s}") } @@ -401,6 +609,113 @@ impl FilterType { let scalar = StringViewArray::new_scalar(""); neq(array, &scalar) } + FilterType::Utf8ViewMissing => { + let array = batch.column(batch.schema().index_of("utf8View")?); + let scalar = StringViewArray::new_scalar(UTF8_VIEW_MISSING_VALUE); + eq(array, &scalar) + } + // ClickBenchQ37ScalarPrefix: a cheap fragmented scalar predicate + // evaluated before decoding a variable-width output column. + FilterType::ClickBenchQ37ScalarPrefix => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let ts = batch.column(batch.schema().index_of("ts")?); + let counter_match = eq(int64, &Int64Array::new_scalar(62))?; + let date_like_range = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; + and(&counter_match, &date_like_range) + } + FilterType::ClickBenchQ6MixedPredicates | FilterType::ClickBenchQ6VarWidthFirst => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let utf8 = batch.column(batch.schema().index_of("utf8View")?); + let cheap_prefix = eq(int64, &Int64Array::new_scalar(9999))?; + let string_suffix = neq(utf8, &StringViewArray::new_scalar(""))?; + and(&cheap_prefix, &string_suffix) + } + FilterType::ClickBenchQ41SparseFixedOutput + | FilterType::TpcdsQ21ProjectedFixedOutput => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let ts = batch.column(batch.schema().index_of("ts")?); + let counter_like = lt(int64, &Int64Array::new_scalar(8))?; + let date_like = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; + and(&counter_like, &date_like) + } + FilterType::ClickBenchQ40ScalarGroupBy => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let float64 = batch.column(batch.schema().index_of("float64")?); + let ts = batch.column(batch.schema().index_of("ts")?); + let counter_match = eq(int64, &Int64Array::new_scalar(62))?; + let width_match = gt(float64, &Float64Array::new_scalar(10.0))?; + let date_like = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; + and(&and(&counter_match, &width_match)?, &date_like) + } + FilterType::TpcdsQ41ComplexOr => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let float64 = batch.column(batch.schema().index_of("float64")?); + let utf8 = batch.column(batch.schema().index_of("utf8View")?); + let ts = batch.column(batch.schema().index_of("ts")?); + let string_branch = and( + &neq(utf8, &StringViewArray::new_scalar(""))?, + <(int64, &Int64Array::new_scalar(8))?, + )?; + let scalar_branch = and( + <(ts, &TimestampMillisecondArray::new_scalar(100))?, + >(float64, &Float64Array::new_scalar(95.0))?, + )?; + or(&string_branch, &scalar_branch) + } + FilterType::TpcdsQ20ProjectedDynamicFilters => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let ts = batch.column(batch.schema().index_of("ts")?); + let item_like = lt(int64, &Int64Array::new_scalar(12))?; + let date_like = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; + and(&item_like, &date_like) + } + FilterType::TpcdsQ2ProjectedPredicate5Pct + | FilterType::TpcdsQ2ProjectedPredicate8Pct + | FilterType::TpcdsQ2ProjectedPredicate10Pct + | FilterType::TpcdsQ2ProjectedPredicate20Pct + | FilterType::TpcdsQ2ProjectedPredicate30Pct + | FilterType::TpcdsQ2ProjectedPredicate40Pct + | FilterType::TpcdsQ2ProjectedPredicate50Pct => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let threshold = match self { + FilterType::TpcdsQ2ProjectedPredicate5Pct => 5, + FilterType::TpcdsQ2ProjectedPredicate8Pct => 8, + FilterType::TpcdsQ2ProjectedPredicate10Pct => 10, + FilterType::TpcdsQ2ProjectedPredicate20Pct => 20, + FilterType::TpcdsQ2ProjectedPredicate30Pct => 30, + FilterType::TpcdsQ2ProjectedPredicate40Pct => 40, + FilterType::TpcdsQ2ProjectedPredicate50Pct => 50, + _ => unreachable!(), + }; + lt(int64, &Int64Array::new_scalar(threshold)) + } + FilterType::TpcdsQ9QuantityRange => { + let int64 = batch.column(batch.schema().index_of("int64")?); + let lower = gt(int64, &Int64Array::new_scalar(0))?; + let upper = lt(int64, &Int64Array::new_scalar(21))?; + and(&lower, &upper) + } + FilterType::ProjectedTs8PctClustered => { + let ts = batch.column(batch.schema().index_of("ts")?); + lt(ts, &TimestampMillisecondArray::new_scalar(800)) + } + FilterType::ProjectedTs20PctClustered => { + let ts = batch.column(batch.schema().index_of("ts")?); + lt(ts, &TimestampMillisecondArray::new_scalar(2000)) + } + FilterType::TpcdsSparseProjectedFactScan => { + let ts = batch + .column(batch.schema().index_of("ts")?) + .as_any() + .downcast_ref::() + .unwrap(); + Ok(BooleanArray::from( + ts.values() + .iter() + .map(|value| value % 1000 == 0) + .collect::>(), + )) + } } } @@ -414,7 +729,26 @@ impl FilterType { FilterType::UnselectiveUnclustered => &[1], FilterType::UnselectiveClustered => &[3], FilterType::Composite => &[1, 3], // Use float64 column and ts column as representative for composite - FilterType::Utf8ViewNonEmpty => &[2], + FilterType::Utf8ViewNonEmpty | FilterType::Utf8ViewMissing => &[2], + FilterType::ClickBenchQ37ScalarPrefix => &[0, 3], + FilterType::ClickBenchQ6MixedPredicates | FilterType::ClickBenchQ6VarWidthFirst => { + &[0, 2] + } + FilterType::ClickBenchQ40ScalarGroupBy => &[0, 1, 3], + FilterType::ClickBenchQ41SparseFixedOutput + | FilterType::TpcdsQ20ProjectedDynamicFilters + | FilterType::TpcdsQ21ProjectedFixedOutput => &[0, 3], + FilterType::TpcdsQ41ComplexOr => &[0, 1, 2, 3], + FilterType::TpcdsQ2ProjectedPredicate5Pct + | FilterType::TpcdsQ2ProjectedPredicate8Pct + | FilterType::TpcdsQ2ProjectedPredicate10Pct + | FilterType::TpcdsQ2ProjectedPredicate20Pct + | FilterType::TpcdsQ2ProjectedPredicate30Pct + | FilterType::TpcdsQ2ProjectedPredicate40Pct + | FilterType::TpcdsQ2ProjectedPredicate50Pct => &[0], + FilterType::TpcdsQ9QuantityRange => &[0], + FilterType::ProjectedTs8PctClustered | FilterType::ProjectedTs20PctClustered => &[3], + FilterType::TpcdsSparseProjectedFactScan => &[3], } } } @@ -449,17 +783,8 @@ fn benchmark_filters_and_projections(c: &mut Criterion) { for filter_type in filter_types { for proj_case in &projection_cases { - // All indices corresponding to the 10 columns. - let all_indices = vec![0, 1, 2, 3]; let filter_col = filter_type.filter_projection().to_vec(); - // For the projection, either select all columns or exclude the filter column(s). - let output_projection: Vec = match proj_case { - ProjectionCase::AllColumns => all_indices.clone(), - ProjectionCase::ExcludeFilterColumn => all_indices - .into_iter() - .filter(|i| !filter_col.contains(i)) - .collect(), - }; + let output_projection = output_projection_for(filter_type, proj_case); let reader = InMemoryReader::try_new(&parquet_file).unwrap(); let metadata = Arc::clone(reader.metadata()); @@ -510,6 +835,820 @@ fn benchmark_filters_and_projections(c: &mut Criterion) { } } +/// Compare full scan plus post-filtering against row-level pushdown strategies. +/// +/// This group is intentionally sync-only and smaller than +/// [`benchmark_filters_and_projections`]. It tracks the cases most likely to +/// inform a future default `Auto` policy: selective random filters, clustered +/// filters, ClickBench-like string filters, and the forced selector strategy +/// that originally motivated apache/arrow-rs#8565. +fn benchmark_sync_strategy_matrix(c: &mut Criterion) { + let parquet_file = Bytes::from(write_parquet_file()); + let filter_types = [ + FilterType::SelectiveUnclustered, + FilterType::ModeratelySelectiveClustered, + FilterType::ModeratelySelectiveUnclustered, + FilterType::Utf8ViewNonEmpty, + ]; + let strategies = [ + SyncStrategy::FullPostFilter, + SyncStrategy::PushdownAuto, + SyncStrategy::PushdownSelectors, + SyncStrategy::PushdownMask, + ]; + + let mut group = c.benchmark_group("arrow_reader_row_filter_strategy_matrix"); + + for filter_type in filter_types { + for projection_case in [ + ProjectionCase::AllColumns, + ProjectionCase::ExcludeFilterColumn, + ] { + let reader = InMemoryReader::try_new(&parquet_file).unwrap(); + let metadata = Arc::clone(reader.metadata()); + let schema_descr = metadata.file_metadata().schema_descr(); + let output_projection = output_projection_for(filter_type, &projection_case); + let read_projection = full_post_filter_read_projection(filter_type, &output_projection); + let output_column_names = projection_names(&output_projection); + let projection_mask = ProjectionMask::roots(schema_descr, output_projection); + let read_projection_mask = ProjectionMask::roots(schema_descr, read_projection); + let pred_mask = ProjectionMask::roots( + schema_descr, + filter_type.filter_projection().iter().copied(), + ); + + for strategy in strategies { + let bench_id = BenchmarkId::new( + format!("{filter_type}/{projection_case}"), + strategy.to_string(), + ); + + group.bench_function(bench_id, |b| { + b.iter(|| { + let reader = reader.clone(); + let pred_mask = pred_mask.clone(); + let projection_mask = projection_mask.clone(); + let read_projection_mask = read_projection_mask.clone(); + let output_column_names = output_column_names.clone(); + + match strategy { + SyncStrategy::FullPostFilter => benchmark_sync_reader_post_filter( + reader, + read_projection_mask, + output_column_names, + filter_type, + ), + SyncStrategy::PushdownAuto => { + let row_filter = row_filter_for(filter_type, pred_mask); + benchmark_sync_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::default(), + ) + } + SyncStrategy::PushdownSelectors => { + let row_filter = row_filter_for(filter_type, pred_mask); + benchmark_sync_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::Selectors, + ) + } + SyncStrategy::PushdownMask => { + let row_filter = row_filter_for(filter_type, pred_mask); + benchmark_sync_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::Mask, + ) + } + } + }); + }); + } + } + } +} + +/// Compare async full scan plus post-filtering against async row-level pushdown +/// strategies. This is the matrix that exercises the current reader `Auto` +/// policy through the async stream backed by the push decoder row-group pipeline. +fn benchmark_async_strategy_matrix(c: &mut Criterion) { + let parquet_file = Bytes::from(write_parquet_file()); + let filter_types = [ + FilterType::SelectiveUnclustered, + FilterType::ModeratelySelectiveClustered, + FilterType::ModeratelySelectiveUnclustered, + FilterType::Utf8ViewNonEmpty, + ]; + let strategies = [ + AsyncStrategy::FullPostFilter, + AsyncStrategy::PushdownAuto, + AsyncStrategy::PushdownSelectors, + AsyncStrategy::PushdownMask, + ]; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let mut group = c.benchmark_group("arrow_reader_row_filter_async_strategy_matrix"); + + for filter_type in filter_types { + for projection_case in [ + ProjectionCase::AllColumns, + ProjectionCase::ExcludeFilterColumn, + ] { + let reader = InMemoryReader::try_new(&parquet_file).unwrap(); + let metadata = Arc::clone(reader.metadata()); + let schema_descr = metadata.file_metadata().schema_descr(); + let output_projection = output_projection_for(filter_type, &projection_case); + let read_projection = full_post_filter_read_projection(filter_type, &output_projection); + let output_column_names = projection_names(&output_projection); + let projection_mask = ProjectionMask::roots(schema_descr, output_projection); + let read_projection_mask = ProjectionMask::roots(schema_descr, read_projection); + let pred_mask = ProjectionMask::roots( + schema_descr, + filter_type.filter_projection().iter().copied(), + ); + + for strategy in strategies { + let bench_id = BenchmarkId::new( + format!("{filter_type}/{projection_case}"), + strategy.to_string(), + ); + let rt_captured = rt.handle().clone(); + + group.bench_function(bench_id, |b| { + b.iter(|| { + let reader = reader.clone(); + let pred_mask = pred_mask.clone(); + let projection_mask = projection_mask.clone(); + let read_projection_mask = read_projection_mask.clone(); + let output_column_names = output_column_names.clone(); + + rt_captured.block_on(async { + match strategy { + AsyncStrategy::FullPostFilter => { + benchmark_async_reader_post_filter( + reader, + read_projection_mask, + output_column_names, + filter_type, + ) + .await + } + AsyncStrategy::PushdownAuto => { + let row_filter = row_filter_for(filter_type, pred_mask); + benchmark_async_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::default(), + ) + .await + } + AsyncStrategy::PushdownSelectors => { + let row_filter = row_filter_for(filter_type, pred_mask); + benchmark_async_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::Selectors, + ) + .await + } + AsyncStrategy::PushdownMask => { + let row_filter = row_filter_for(filter_type, pred_mask); + benchmark_async_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::Mask, + ) + .await + } + } + }) + }); + }); + } + } + } +} + +/// A small async-only matrix that isolates the cases most relevant to the +/// row-filter Auto policy. This is intentionally narrower than +/// [`benchmark_async_strategy_matrix`]: it keeps the benchmark output focused +/// on cases where later PRs may teach `Auto` to switch execution modes or +/// explicitly keep predicate pushdown. +/// +/// The `profile_*` cases are derived from DataFusion ClickBench and TPC-DS +/// comparisons. They keep the reader-level shapes worth tracking while +/// excluding query regressions that did not construct a Parquet `RowFilter`. +fn benchmark_async_auto_policy_focus(c: &mut Criterion) { + const SMALL_TOTAL_ROWS: usize = 20_000; + const SMALL_ROW_GROUP_SIZE: usize = 5_000; + + let parquet_file = Bytes::from(write_parquet_file()); + let small_parquet_file = Bytes::from(write_parquet_file_with_rows( + SMALL_TOTAL_ROWS, + SMALL_ROW_GROUP_SIZE, + )); + let cases = [ + AsyncFocusCase::new( + "utf8_non_empty", + parquet_file.clone(), + FilterType::Utf8ViewNonEmpty, + ProjectionCase::ExcludeFilterColumn, + ), + AsyncFocusCase::new( + "utf8_non_empty", + parquet_file.clone(), + FilterType::Utf8ViewNonEmpty, + ProjectionCase::AllColumns, + ), + AsyncFocusCase::new( + "high_selectivity_float64", + parquet_file.clone(), + FilterType::UnselectiveUnclustered, + ProjectionCase::ExcludeFilterColumn, + ), + AsyncFocusCase::new( + "high_selectivity_ts_clustered", + parquet_file.clone(), + FilterType::UnselectiveClustered, + ProjectionCase::ExcludeFilterColumn, + ), + AsyncFocusCase::new( + "fragmented_int64_10pct", + parquet_file.clone(), + FilterType::ModeratelySelectiveUnclustered, + ProjectionCase::ExcludeFilterColumn, + ), + AsyncFocusCase::new( + "selective_float64_1pct", + parquet_file.clone(), + FilterType::SelectiveUnclustered, + ProjectionCase::ExcludeFilterColumn, + ), + AsyncFocusCase::new( + "profile_q37_scalar_utf8", + parquet_file.clone(), + FilterType::ClickBenchQ37ScalarPrefix, + ProjectionCase::Utf8Only, + ), + // Historical Q6 focus case: cheap fixed-width predicate before the + // unprojected variable-width predicate. + AsyncFocusCase::new( + "profile_q6_mixed_predicates", + parquet_file.clone(), + FilterType::ClickBenchQ6MixedPredicates, + ProjectionCase::Float64Only, + ), + AsyncFocusCase::new( + "profile_varwidth_then_fixed_prefix", + parquet_file.clone(), + FilterType::ClickBenchQ6VarWidthFirst, + ProjectionCase::Float64Only, + ), + AsyncFocusCase::new( + "profile_q40_scalar_group_by", + parquet_file.clone(), + FilterType::ClickBenchQ40ScalarGroupBy, + ProjectionCase::Float64AndTs, + ), + AsyncFocusCase::new( + "profile_q41_sparse_fixed_output", + parquet_file.clone(), + FilterType::ClickBenchQ41SparseFixedOutput, + ProjectionCase::Float64Only, + ), + AsyncFocusCase::new( + "profile_tpcds_q41_complex_or", + parquet_file.clone(), + FilterType::TpcdsQ41ComplexOr, + ProjectionCase::Float64Only, + ), + AsyncFocusCase::new( + "profile_tpcds_q20_projected_dynamic_filters", + parquet_file.clone(), + FilterType::TpcdsQ20ProjectedDynamicFilters, + ProjectionCase::FixedColumns, + ), + AsyncFocusCase::new( + "profile_q21_projected_predicate_fixed_output", + parquet_file.clone(), + FilterType::TpcdsQ21ProjectedFixedOutput, + ProjectionCase::FixedColumns, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_5pct", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate5Pct, + ProjectionCase::Int64AndFloat64, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_8pct_filter_only", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate8Pct, + ProjectionCase::FilterColumnsOnly, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_8pct_fixed_output", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate8Pct, + ProjectionCase::Int64AndFloat64, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_8pct_varwidth_output", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate8Pct, + ProjectionCase::Int64AndUtf8, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_10pct", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate10Pct, + ProjectionCase::Int64AndFloat64, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_20pct", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate20Pct, + ProjectionCase::Int64AndFloat64, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_20pct_varwidth_output", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate20Pct, + ProjectionCase::Int64AndUtf8, + ), + AsyncFocusCase::new( + "profile_projected_ts_8pct_fixed_output", + parquet_file.clone(), + FilterType::ProjectedTs8PctClustered, + ProjectionCase::Float64AndTs, + ), + AsyncFocusCase::new( + "profile_projected_ts_8pct_varwidth_output", + parquet_file.clone(), + FilterType::ProjectedTs8PctClustered, + ProjectionCase::TsAndUtf8, + ), + AsyncFocusCase::new( + "profile_projected_ts_20pct_fixed_output", + parquet_file.clone(), + FilterType::ProjectedTs20PctClustered, + ProjectionCase::Float64AndTs, + ), + AsyncFocusCase::new( + "profile_projected_ts_20pct_varwidth_output", + parquet_file.clone(), + FilterType::ProjectedTs20PctClustered, + ProjectionCase::TsAndUtf8, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_30pct", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate30Pct, + ProjectionCase::Int64AndFloat64, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_40pct", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate40Pct, + ProjectionCase::Int64AndFloat64, + ), + AsyncFocusCase::new( + "profile_q2_projected_predicate_50pct", + parquet_file.clone(), + FilterType::TpcdsQ2ProjectedPredicate50Pct, + ProjectionCase::Int64AndFloat64, + ), + AsyncFocusCase::new( + "profile_q1_count_only", + parquet_file.clone(), + FilterType::ClickBenchQ41SparseFixedOutput, + ProjectionCase::CountOnly, + ), + AsyncFocusCase::new( + "profile_q19_no_defer", + parquet_file.clone(), + FilterType::PointLookup, + ProjectionCase::FilterColumnsOnly, + ), + AsyncFocusCase::new( + "profile_sparse_fixed_deferred_output", + parquet_file.clone(), + FilterType::PointLookup, + ProjectionCase::Float64Only, + ), + AsyncFocusCase::new( + "profile_tpcds_sparse_projected_fact_scan", + parquet_file.clone(), + FilterType::TpcdsSparseProjectedFactScan, + ProjectionCase::FixedColumns, + ), + AsyncFocusCase::new( + "profile_q83_sparse_utf8_projected", + parquet_file.clone(), + FilterType::Utf8ViewMissing, + ProjectionCase::AllColumns, + ), + AsyncFocusCase::new( + "profile_small_scalar_no_defer", + small_parquet_file.clone(), + FilterType::ModeratelySelectiveUnclustered, + ProjectionCase::FilterColumnsOnly, + ), + AsyncFocusCase::new( + "profile_small_q37_scalar_utf8", + small_parquet_file, + FilterType::ClickBenchQ37ScalarPrefix, + ProjectionCase::Utf8Only, + ), + AsyncFocusCase::new( + "profile_q9_quantity_count", + parquet_file.clone(), + FilterType::TpcdsQ9QuantityRange, + ProjectionCase::FilterColumnsOnly, + ), + AsyncFocusCase::new( + "profile_q9_quantity_avg", + parquet_file, + FilterType::TpcdsQ9QuantityRange, + ProjectionCase::Float64Only, + ), + ]; + let strategies = [ + AsyncStrategy::FullPostFilter, + AsyncStrategy::PushdownAuto, + AsyncStrategy::PushdownMask, + AsyncStrategy::PushdownSelectors, + ]; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let mut group = c.benchmark_group("arrow_reader_row_filter_async_auto_policy_focus"); + + for case in cases { + benchmark_async_focus_case(&mut group, &rt, case, &strategies); + } +} + +/// Isolate projected scans that do not construct a [`RowFilter`]. +/// +/// This tracks the reader-level shape seen in TPC-DS Q83 return-table scans: +/// a narrow primitive projection where row-level pushdown metrics are zero. +/// It deliberately lives outside the adaptive-materialization matrix because there is no +/// filter strategy to choose. +fn benchmark_projection_scan_focus(c: &mut Criterion) { + let parquet_file = Bytes::from(write_parquet_file()); + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let mut group = c.benchmark_group("arrow_reader_projection_scan_focus"); + + let case_name = "profile_q83_return_scan_primitives"; + let projection = vec![0, 1, 3]; + let reader = InMemoryReader::try_new(&parquet_file).unwrap(); + let metadata = Arc::clone(reader.metadata()); + let schema_descr = metadata.file_metadata().schema_descr(); + let projection_mask = ProjectionMask::roots(schema_descr, projection); + + let bench_id = BenchmarkId::new(case_name, "async"); + let rt_captured = rt.handle().clone(); + group.bench_function(bench_id, |b| { + b.iter(|| { + let reader = reader.clone(); + let projection_mask = projection_mask.clone(); + rt_captured.block_on(benchmark_async_reader_projected(reader, projection_mask)); + }); + }); + + let bench_id = BenchmarkId::new(case_name, "sync"); + group.bench_function(bench_id, |b| { + b.iter(|| { + let reader = reader.clone(); + let projection_mask = projection_mask.clone(); + benchmark_sync_reader_projected(reader, projection_mask); + }); + }); +} + +struct AsyncFocusCase { + case_name: &'static str, + parquet_file: Bytes, + filter_type: FilterType, + projection_case: ProjectionCase, +} + +impl AsyncFocusCase { + fn new( + case_name: &'static str, + parquet_file: Bytes, + filter_type: FilterType, + projection_case: ProjectionCase, + ) -> Self { + Self { + case_name, + parquet_file, + filter_type, + projection_case, + } + } +} + +fn benchmark_async_focus_case( + group: &mut BenchmarkGroup<'_, WallTime>, + rt: &tokio::runtime::Runtime, + case: AsyncFocusCase, + strategies: &[AsyncStrategy], +) { + let AsyncFocusCase { + case_name, + parquet_file, + filter_type, + projection_case, + } = case; + + let reader = InMemoryReader::try_new(&parquet_file).unwrap(); + let metadata = Arc::clone(reader.metadata()); + let schema_descr = metadata.file_metadata().schema_descr(); + let output_projection = output_projection_for(filter_type, &projection_case); + let read_projection = full_post_filter_read_projection(filter_type, &output_projection); + let output_column_names = projection_names(&output_projection); + let projection_mask = ProjectionMask::roots(schema_descr, output_projection); + let read_projection_mask = ProjectionMask::roots(schema_descr, read_projection); + let pred_mask = ProjectionMask::roots( + schema_descr, + filter_type.filter_projection().iter().copied(), + ); + let q6_int64_pred_mask = ProjectionMask::roots(schema_descr, [0]); + let q6_utf8_pred_mask = ProjectionMask::roots(schema_descr, [2]); + let q41_int64_pred_mask = ProjectionMask::roots(schema_descr, [0]); + let q41_ts_pred_mask = ProjectionMask::roots(schema_descr, [3]); + let q40_float64_pred_mask = ProjectionMask::roots(schema_descr, [1]); + + for strategy in strategies.iter().copied() { + let bench_id = BenchmarkId::new( + format!("{case_name}/{projection_case}"), + strategy.to_string(), + ); + let rt_captured = rt.handle().clone(); + + group.bench_function(bench_id, |b| { + b.iter(|| { + let reader = reader.clone(); + let pred_mask = pred_mask.clone(); + let q6_int64_pred_mask = q6_int64_pred_mask.clone(); + let q6_utf8_pred_mask = q6_utf8_pred_mask.clone(); + let q41_int64_pred_mask = q41_int64_pred_mask.clone(); + let q41_ts_pred_mask = q41_ts_pred_mask.clone(); + let q40_float64_pred_mask = q40_float64_pred_mask.clone(); + let projection_mask = projection_mask.clone(); + let read_projection_mask = read_projection_mask.clone(); + let output_column_names = output_column_names.clone(); + + rt_captured.block_on(async { + match strategy { + AsyncStrategy::FullPostFilter => { + benchmark_async_reader_post_filter( + reader, + read_projection_mask, + output_column_names, + filter_type, + ) + .await + } + AsyncStrategy::PushdownAuto => { + let row_filter = row_filter_for_focus_case( + filter_type, + pred_mask, + q6_int64_pred_mask, + q6_utf8_pred_mask, + q41_int64_pred_mask, + q41_ts_pred_mask, + q40_float64_pred_mask, + ); + benchmark_async_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::default(), + ) + .await + } + AsyncStrategy::PushdownSelectors => { + let row_filter = row_filter_for_focus_case( + filter_type, + pred_mask, + q6_int64_pred_mask, + q6_utf8_pred_mask, + q41_int64_pred_mask, + q41_ts_pred_mask, + q40_float64_pred_mask, + ); + benchmark_async_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::Selectors, + ) + .await + } + AsyncStrategy::PushdownMask => { + let row_filter = row_filter_for_focus_case( + filter_type, + pred_mask, + q6_int64_pred_mask, + q6_utf8_pred_mask, + q41_int64_pred_mask, + q41_ts_pred_mask, + q40_float64_pred_mask, + ); + benchmark_async_reader_with_policy( + reader, + projection_mask, + row_filter, + RowSelectionPolicy::Mask, + ) + .await + } + } + }) + }); + }); + } +} + +fn output_projection_for(filter_type: FilterType, projection_case: &ProjectionCase) -> Vec { + let filter_columns = filter_type.filter_projection(); + match projection_case { + ProjectionCase::AllColumns | ProjectionCase::ExcludeFilterColumn => COLUMN_NAMES + .iter() + .enumerate() + .map(|(idx, _)| idx) + .filter(move |idx| { + matches!(projection_case, ProjectionCase::AllColumns) + || !filter_columns.contains(idx) + }) + .collect(), + ProjectionCase::FilterColumnsOnly => filter_columns.to_vec(), + ProjectionCase::CountOnly => vec![], + ProjectionCase::FixedColumns => vec![0, 1, 3], + ProjectionCase::Float64AndTs => vec![1, 3], + ProjectionCase::Float64Only => vec![1], + ProjectionCase::Int64AndFloat64 => vec![0, 1], + ProjectionCase::Int64AndUtf8 => vec![0, 2], + ProjectionCase::TsAndUtf8 => vec![2, 3], + ProjectionCase::Utf8Only => vec![2], + } +} + +fn full_post_filter_read_projection( + filter_type: FilterType, + output_projection: &[usize], +) -> Vec { + let mut read_projection = output_projection.to_vec(); + for filter_idx in filter_type.filter_projection() { + if !read_projection.contains(filter_idx) { + read_projection.push(*filter_idx); + } + } + read_projection.sort_unstable(); + read_projection +} + +fn projection_names(projection: &[usize]) -> Vec<&'static str> { + projection.iter().map(|idx| COLUMN_NAMES[*idx]).collect() +} + +fn row_filter_for(filter_type: FilterType, pred_mask: ProjectionMask) -> RowFilter { + let filter = ArrowPredicateFn::new(pred_mask, move |batch| filter_type.filter_batch(&batch)); + RowFilter::new(vec![Box::new(filter)]) +} + +fn row_filter_for_focus_case( + filter_type: FilterType, + pred_mask: ProjectionMask, + q6_int64_pred_mask: ProjectionMask, + q6_utf8_pred_mask: ProjectionMask, + q41_int64_pred_mask: ProjectionMask, + q41_ts_pred_mask: ProjectionMask, + q40_float64_pred_mask: ProjectionMask, +) -> RowFilter { + match filter_type { + FilterType::ClickBenchQ6MixedPredicates | FilterType::ClickBenchQ6VarWidthFirst => { + let int64_filter = + ArrowPredicateFn::new(q6_int64_pred_mask, move |batch: RecordBatch| { + let int64 = batch.column(batch.schema().index_of("int64")?); + eq(int64, &Int64Array::new_scalar(9999)) + }); + let utf8_filter = + ArrowPredicateFn::new(q6_utf8_pred_mask, move |batch: RecordBatch| { + let utf8 = batch.column(batch.schema().index_of("utf8View")?); + neq(utf8, &StringViewArray::new_scalar("")) + }); + + match filter_type { + FilterType::ClickBenchQ6MixedPredicates => { + RowFilter::new(vec![Box::new(int64_filter), Box::new(utf8_filter)]) + } + FilterType::ClickBenchQ6VarWidthFirst => { + RowFilter::new(vec![Box::new(utf8_filter), Box::new(int64_filter)]) + } + _ => unreachable!(), + } + } + FilterType::ClickBenchQ40ScalarGroupBy => { + let int64_filter = + ArrowPredicateFn::new(q41_int64_pred_mask, move |batch: RecordBatch| { + let int64 = batch.column(batch.schema().index_of("int64")?); + eq(int64, &Int64Array::new_scalar(62)) + }); + let float64_filter = + ArrowPredicateFn::new(q40_float64_pred_mask, move |batch: RecordBatch| { + let float64 = batch.column(batch.schema().index_of("float64")?); + gt(float64, &Float64Array::new_scalar(10.0)) + }); + let ts_filter = ArrowPredicateFn::new(q41_ts_pred_mask, move |batch: RecordBatch| { + let ts = batch.column(batch.schema().index_of("ts")?); + lt(ts, &TimestampMillisecondArray::new_scalar(9000)) + }); + + RowFilter::new(vec![ + Box::new(int64_filter), + Box::new(float64_filter), + Box::new(ts_filter), + ]) + } + FilterType::ClickBenchQ41SparseFixedOutput + | FilterType::TpcdsQ20ProjectedDynamicFilters + | FilterType::TpcdsQ21ProjectedFixedOutput => { + let int64_filter = + ArrowPredicateFn::new(q41_int64_pred_mask, move |batch: RecordBatch| { + let int64 = batch.column(batch.schema().index_of("int64")?); + let scalar = match filter_type { + FilterType::TpcdsQ20ProjectedDynamicFilters => 12, + _ => 8, + }; + lt(int64, &Int64Array::new_scalar(scalar)) + }); + let ts_filter = ArrowPredicateFn::new(q41_ts_pred_mask, move |batch: RecordBatch| { + let ts = batch.column(batch.schema().index_of("ts")?); + lt(ts, &TimestampMillisecondArray::new_scalar(9000)) + }); + + RowFilter::new(vec![Box::new(int64_filter), Box::new(ts_filter)]) + } + _ => row_filter_for(filter_type, pred_mask), + } +} + +#[derive(Clone, Copy)] +enum NestedFilterType { + AlwaysTrueTag, + TagNotZero, +} + +impl std::fmt::Display for NestedFilterType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::AlwaysTrueTag => write!(f, "always_true_tag"), + Self::TagNotZero => write!(f, "tag_not_zero"), + } + } +} + +impl NestedFilterType { + fn filter_batch(self, batch: &RecordBatch) -> arrow::error::Result { + match self { + Self::AlwaysTrueTag => Ok(BooleanArray::from(vec![true; batch.num_rows()])), + Self::TagNotZero => { + let tag = batch.column(batch.schema().index_of("tag")?); + let scalar = StringViewArray::new_scalar("tag_0"); + neq(tag, &scalar) + } + } + } +} + +fn nested_row_filter_for(filter_type: NestedFilterType, pred_mask: ProjectionMask) -> RowFilter { + let filter = ArrowPredicateFn::new(pred_mask, move |batch| filter_type.filter_batch(&batch)); + RowFilter::new(vec![Box::new(filter)]) +} + /// Use async API async fn benchmark_async_reader( reader: InMemoryReader, @@ -529,6 +1668,94 @@ async fn benchmark_async_reader( } } +async fn benchmark_async_reader_with_policy( + reader: InMemoryReader, + projection_mask: ProjectionMask, + row_filter: RowFilter, + row_selection_policy: RowSelectionPolicy, +) { + let mut stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .with_batch_size(8192) + .with_projection(projection_mask) + .with_row_filter(row_filter) + .with_row_selection_policy(row_selection_policy) + .build() + .unwrap(); + while let Some(b) = stream.next().await { + b.unwrap(); // consume the batches, no buffering + } +} + +async fn benchmark_async_reader_post_filter( + reader: InMemoryReader, + read_projection: ProjectionMask, + output_column_names: Vec<&'static str>, + filter_type: FilterType, +) { + let mut stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .with_batch_size(8192) + .with_projection(read_projection) + .build() + .unwrap(); + + while let Some(b) = stream.next().await { + let batch = b.unwrap(); + let filter = filter_type.filter_batch(&batch).unwrap(); + let filtered = arrow_select::filter::filter_record_batch(&batch, &filter).unwrap(); + let output_projection = output_column_names + .iter() + .map(|name| filtered.schema().index_of(name).unwrap()) + .collect::>(); + let output = filtered.project(&output_projection).unwrap(); + std::hint::black_box(output.num_rows()); + } +} + +async fn benchmark_async_reader_post_filter_nested( + reader: InMemoryReader, + read_projection: ProjectionMask, + output_column_names: &[&str], + filter_type: NestedFilterType, +) { + let mut stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .with_batch_size(8192) + .with_projection(read_projection) + .build() + .unwrap(); + + while let Some(b) = stream.next().await { + let batch = b.unwrap(); + let filter = filter_type.filter_batch(&batch).unwrap(); + let filtered = arrow_select::filter::filter_record_batch(&batch, &filter).unwrap(); + let output_projection = output_column_names + .iter() + .map(|name| filtered.schema().index_of(name).unwrap()) + .collect::>(); + let output = filtered.project(&output_projection).unwrap(); + std::hint::black_box(output.num_rows()); + } +} + +async fn benchmark_async_reader_projected(reader: InMemoryReader, projection_mask: ProjectionMask) { + let mut stream = ParquetRecordBatchStreamBuilder::new(reader) + .await + .unwrap() + .with_batch_size(8192) + .with_projection(projection_mask) + .build() + .unwrap(); + while let Some(b) = stream.next().await { + let batch = b.unwrap(); + std::hint::black_box(batch.num_rows()); + } +} + /// Like [`benchmark_async_reader`] but also threads `with_limit(limit)` into /// the stream builder. Used by the `LIMIT` benchmark below. async fn benchmark_async_reader_with_limit( @@ -569,6 +1796,65 @@ fn benchmark_sync_reader( } } +fn benchmark_sync_reader_with_policy( + reader: InMemoryReader, + projection_mask: ProjectionMask, + row_filter: RowFilter, + row_selection_policy: RowSelectionPolicy, +) { + let stream = ParquetRecordBatchReaderBuilder::try_new(reader.into_inner()) + .unwrap() + .with_batch_size(8192) + .with_projection(projection_mask) + .with_row_filter(row_filter) + .with_row_selection_policy(row_selection_policy) + .build() + .unwrap(); + for b in stream { + b.unwrap(); // consume the batches, no buffering + } +} + +fn benchmark_sync_reader_post_filter( + reader: InMemoryReader, + read_projection: ProjectionMask, + output_column_names: Vec<&'static str>, + filter_type: FilterType, +) { + let stream = ParquetRecordBatchReaderBuilder::try_new(reader.into_inner()) + .unwrap() + .with_batch_size(8192) + .with_projection(read_projection) + .build() + .unwrap(); + + for b in stream { + let batch = b.unwrap(); + let filter = filter_type.filter_batch(&batch).unwrap(); + let filtered = arrow_select::filter::filter_record_batch(&batch, &filter).unwrap(); + let output_projection = output_column_names + .iter() + .map(|name| filtered.schema().index_of(name).unwrap()) + .collect::>(); + let output = filtered.project(&output_projection).unwrap(); + std::hint::black_box(output.num_rows()); + } +} + +fn benchmark_sync_reader_projected(reader: InMemoryReader, projection_mask: ProjectionMask) { + let stream = ParquetRecordBatchReaderBuilder::try_new(reader.into_inner()) + .unwrap() + .with_batch_size(8192) + .with_projection(projection_mask) + .build() + .unwrap(); + + for b in stream { + let batch = b.unwrap(); + std::hint::black_box(batch.num_rows()); + } +} + /// Adapter to read asynchronously from in memory bytes and always loads the /// metadata with page indexes. #[derive(Debug, Clone)] @@ -636,7 +1922,6 @@ fn benchmark_filters_with_limit(c: &mut Criterion) { ProjectionCase::AllColumns, ProjectionCase::ExcludeFilterColumn, ]; - let all_indices = vec![0, 1, 2, 3]; let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -648,14 +1933,7 @@ fn benchmark_filters_with_limit(c: &mut Criterion) { for filter_type in filter_types { for proj_case in &projection_cases { let filter_col = filter_type.filter_projection().to_vec(); - let output_projection: Vec = match proj_case { - ProjectionCase::AllColumns => all_indices.clone(), - ProjectionCase::ExcludeFilterColumn => all_indices - .iter() - .copied() - .filter(|i| !filter_col.contains(i)) - .collect(), - }; + let output_projection = output_projection_for(filter_type, proj_case); let reader = InMemoryReader::try_new(&parquet_file).unwrap(); let metadata = Arc::clone(reader.metadata()); @@ -693,9 +1971,109 @@ fn benchmark_filters_with_limit(c: &mut Criterion) { } } +/// Focused nested-output case for comparing manual post-filtering against +/// row-filter pushdown policies. +/// +/// The predicate column is an unprojected variable-width scalar column, and the +/// output is a whole nested `Struct` root. This isolates the reader case enabled +/// by root-aware post-filter projection without requiring recursive nested-child +/// projection. +fn benchmark_async_nested_post_filter_focus(c: &mut Criterion) { + let parquet_file = Bytes::from(write_nested_parquet_file_with_rows( + TOTAL_ROWS, + ROW_GROUP_SIZE, + )); + let strategies = [ + AsyncStrategy::FullPostFilter, + AsyncStrategy::PushdownAuto, + AsyncStrategy::PushdownMask, + AsyncStrategy::PushdownSelectors, + ]; + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let mut group = c.benchmark_group("arrow_reader_row_filter_async_nested_post_filter_focus"); + let reader = InMemoryReader::try_new(&parquet_file).unwrap(); + let metadata = Arc::clone(reader.metadata()); + let schema_descr = metadata.file_metadata().schema_descr(); + let output_projection = ProjectionMask::columns(schema_descr, ["payload"]); + let read_projection = ProjectionMask::columns(schema_descr, ["tag", "payload"]); + let pred_mask = ProjectionMask::columns(schema_descr, ["tag"]); + let filter_cases = [ + NestedFilterType::AlwaysTrueTag, + NestedFilterType::TagNotZero, + ]; + + for filter_case in filter_cases { + for strategy in strategies { + let bench_id = BenchmarkId::new( + format!("whole_struct_output/{filter_case}"), + strategy.to_string(), + ); + let rt_captured = rt.handle().clone(); + group.bench_function(bench_id, |b| { + b.iter(|| { + let reader = reader.clone(); + let pred_mask = pred_mask.clone(); + let output_projection = output_projection.clone(); + let read_projection = read_projection.clone(); + rt_captured.block_on(async { + match strategy { + AsyncStrategy::FullPostFilter => { + benchmark_async_reader_post_filter_nested( + reader, + read_projection, + &["payload"], + filter_case, + ) + .await + } + AsyncStrategy::PushdownAuto => { + benchmark_async_reader_with_policy( + reader, + output_projection, + nested_row_filter_for(filter_case, pred_mask), + RowSelectionPolicy::default(), + ) + .await + } + AsyncStrategy::PushdownSelectors => { + benchmark_async_reader_with_policy( + reader, + output_projection, + nested_row_filter_for(filter_case, pred_mask), + RowSelectionPolicy::Selectors, + ) + .await + } + AsyncStrategy::PushdownMask => { + benchmark_async_reader_with_policy( + reader, + output_projection, + nested_row_filter_for(filter_case, pred_mask), + RowSelectionPolicy::Mask, + ) + .await + } + } + }) + }); + }); + } + } +} + criterion_group!( benches, benchmark_filters_and_projections, + benchmark_sync_strategy_matrix, + benchmark_async_strategy_matrix, + benchmark_async_auto_policy_focus, + benchmark_projection_scan_focus, benchmark_filters_with_limit, + benchmark_async_nested_post_filter_focus, ); criterion_main!(benches); diff --git a/parquet/benches/row_selection_cursor.rs b/parquet/benches/row_selection_cursor.rs index 49c9e6d68acf..b5d73eefb6b6 100644 --- a/parquet/benches/row_selection_cursor.rs +++ b/parquet/benches/row_selection_cursor.rs @@ -34,6 +34,7 @@ const TOTAL_ROWS: usize = 1 << 20; const BATCH_SIZE: usize = 1 << 10; const BASE_SEED: u64 = 0xA55AA55A; const AVG_SELECTOR_LENGTHS: &[usize] = &[4, 8, 12, 16, 20, 24, 28, 32, 36, 40]; +const SHAPE_FOCUS_SELECTED_RUN_LENGTHS: &[usize] = &[1, 2, 4, 8, 32]; const COLUMN_WIDTHS: &[usize] = &[2, 4, 8, 16, 32]; const UTF8VIEW_LENS: &[usize] = &[4, 8, 16, 32, 64, 128, 256]; const BENCH_MODES: &[BenchMode] = &[BenchMode::ReadSelector, BenchMode::ReadMask]; @@ -203,6 +204,87 @@ fn criterion_benchmark(c: &mut Criterion) { BASE_SEED ^ ((offset as u64) << 40), ); } + + bench_shape_focus(c); +} + +fn bench_shape_focus(c: &mut Criterion) { + let scenarios = [ + ShapeFocusScenario { + name: "sparse10", + select_ratio: 0.1, + start_with_select: false, + }, + ShapeFocusScenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + }, + ShapeFocusScenario { + name: "moderate40", + select_ratio: 0.4, + start_with_select: false, + }, + ShapeFocusScenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + }, + ]; + + let profiles = [ + DataProfile { + name: "int32", + build_batch: build_int32_batch, + }, + DataProfile { + name: "utf8view", + build_batch: build_utf8view_batch, + }, + ]; + + for profile in profiles { + let parquet_data = build_parquet_data(TOTAL_ROWS, profile.build_batch); + for scenario in &scenarios { + for &selected_run_len in SHAPE_FOCUS_SELECTED_RUN_LENGTHS { + let selectors = + generate_shape_focus_selectors(selected_run_len, TOTAL_ROWS, scenario); + if selectors.is_empty() { + continue; + } + + let stats = SelectorStats::new(&selectors); + let selection = RowSelection::from(selectors); + let suffix = format!( + "shape-focus-{}-{}-run{:02}-avg{:.1}-sel{:02}", + scenario.name, + profile.name, + selected_run_len, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ); + + let bench_input = BenchInput { + parquet_data: parquet_data.clone(), + selection, + }; + + for &mode in BENCH_MODES { + c.bench_with_input( + BenchmarkId::new(mode.label(), &suffix), + &bench_input, + |b, input| { + b.iter(|| { + let total = + run_read(&input.parquet_data, &input.selection, mode.policy()); + hint::black_box(total); + }); + }, + ); + } + } + } + } } fn bench_over_lengths( @@ -349,6 +431,12 @@ struct Scenario { distribution: RunDistribution, } +struct ShapeFocusScenario { + name: &'static str, + select_ratio: f64, + start_with_select: bool, +} + #[derive(Clone)] enum RunDistribution { Constant, @@ -409,6 +497,66 @@ fn generate_selectors( selection.into() } +fn generate_shape_focus_selectors( + selected_run_len: usize, + total_rows: usize, + scenario: &ShapeFocusScenario, +) -> Vec { + const CYCLE_ROWS: usize = 1_000; + + assert!(selected_run_len > 0); + assert!( + (0.0..=1.0).contains(&scenario.select_ratio), + "select_ratio must be in [0, 1]" + ); + + let mut selectors = Vec::new(); + let mut remaining_rows = total_rows; + + while remaining_rows > 0 { + let cycle_rows = CYCLE_ROWS.min(remaining_rows); + let selected_rows = (cycle_rows as f64 * scenario.select_ratio).round() as usize; + if selected_rows == 0 { + selectors.push(RowSelector::skip(cycle_rows)); + remaining_rows -= cycle_rows; + continue; + } + if selected_rows >= cycle_rows { + selectors.push(RowSelector::select(cycle_rows)); + remaining_rows -= cycle_rows; + continue; + } + + let selected_runs = selected_rows.div_ceil(selected_run_len); + let skipped_rows = cycle_rows - selected_rows; + if skipped_rows < selected_runs { + return Vec::new(); + } + + let base_skip_len = skipped_rows / selected_runs; + let extra_skip_runs = skipped_rows % selected_runs; + let mut remaining_selected_rows = selected_rows; + + for run_idx in 0..selected_runs { + let skip_len = base_skip_len + usize::from(run_idx < extra_skip_runs); + let select_len = selected_run_len.min(remaining_selected_rows); + if scenario.start_with_select { + selectors.push(RowSelector::select(select_len)); + selectors.push(RowSelector::skip(skip_len)); + } else { + selectors.push(RowSelector::skip(skip_len)); + selectors.push(RowSelector::select(select_len)); + } + remaining_selected_rows -= select_len; + } + + remaining_rows -= cycle_rows; + } + + let selection: RowSelection = selectors.into(); + selection.into() +} + fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize { match distribution { RunDistribution::Constant => mean.round().max(1.0) as usize, From f11b48e99e234354b292471d20a87ce55d5495b0 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Fri, 12 Jun 2026 18:54:50 +0800 Subject: [PATCH 2/9] ci: install cargo-msrv with locked dependencies --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 77fccdbebc46..964b16b4fc9d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -118,7 +118,7 @@ jobs: uses: ./.github/actions/setup-builder - name: Install cargo-msrv (if needed) # cargo-msrv binary may be cached by the cargo cache step in setup-builder, and cargo install will error if it is already installed - run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv ; fi + run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv --locked ; fi - name: Check all packages run: | # run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies From c8a627ced95613be91182850f1314ad38701ea1b Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Fri, 12 Jun 2026 19:00:51 +0800 Subject: [PATCH 3/9] Revert "ci: install cargo-msrv with locked dependencies" This reverts commit f11b48e99e234354b292471d20a87ce55d5495b0. --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 964b16b4fc9d..77fccdbebc46 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -118,7 +118,7 @@ jobs: uses: ./.github/actions/setup-builder - name: Install cargo-msrv (if needed) # cargo-msrv binary may be cached by the cargo cache step in setup-builder, and cargo install will error if it is already installed - run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv --locked ; fi + run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv ; fi - name: Check all packages run: | # run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies From faa058f146e09f0932808bd4e400413f4a88012a Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Fri, 12 Jun 2026 19:46:35 +0800 Subject: [PATCH 4/9] ci: install cargo-msrv with locked dependencies --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 77fccdbebc46..964b16b4fc9d 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -118,7 +118,7 @@ jobs: uses: ./.github/actions/setup-builder - name: Install cargo-msrv (if needed) # cargo-msrv binary may be cached by the cargo cache step in setup-builder, and cargo install will error if it is already installed - run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv ; fi + run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv --locked ; fi - name: Check all packages run: | # run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies From d95eddffc9f05fa2e03782767da3f3cf2fa680f9 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Fri, 12 Jun 2026 21:29:33 +0800 Subject: [PATCH 5/9] Revert "ci: install cargo-msrv with locked dependencies" This reverts commit faa058f146e09f0932808bd4e400413f4a88012a. --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 964b16b4fc9d..77fccdbebc46 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -118,7 +118,7 @@ jobs: uses: ./.github/actions/setup-builder - name: Install cargo-msrv (if needed) # cargo-msrv binary may be cached by the cargo cache step in setup-builder, and cargo install will error if it is already installed - run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv --locked ; fi + run: if which cargo-msrv ; then echo "using existing cargo-msrv binary" ; else cargo install cargo-msrv ; fi - name: Check all packages run: | # run `cargo msrv verify --manifest-path "path/to/Cargo.toml"` to see problematic dependencies From 40727a52c64517d2831a2e4d992c4531dfc4c9ca Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Sat, 13 Jun 2026 00:26:24 +0800 Subject: [PATCH 6/9] Expose parquet benchmark helpers and streamline post-filtering --- parquet/benches/arrow_reader_row_filter.rs | 78 ++++++++++++++-------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs index 6ae7c816e56b..57ce949dcb2d 100644 --- a/parquet/benches/arrow_reader_row_filter.rs +++ b/parquet/benches/arrow_reader_row_filter.rs @@ -55,7 +55,7 @@ use arrow::array::{ ArrayRef, BooleanArray, Float64Array, Int64Array, StructArray, TimestampMillisecondArray, }; -use arrow::compute::kernels::cmp::{eq, gt, lt, neq}; +use arrow::compute::kernels::cmp::{eq, gt, lt, lt_eq, neq}; use arrow::compute::{and, or}; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::record_batch::RecordBatch; @@ -115,6 +115,14 @@ fn create_float64_array(size: usize) -> ArrayRef { Arc::new(Float64Array::from(values)) as ArrayRef } +fn append_utf8_view_value(builder: &mut StringViewBuilder, value: &str) { + if builder.len() % 1_000 == 0 { + builder.append_value(UTF8_VIEW_MISSING_VALUE); + } else { + builder.append_value(value); + } +} + /// Creates a utf8View array of a given size with random strings. /// /// This is modeled after the "SearchPhrase" column in the ClickBench benchmark. @@ -146,11 +154,11 @@ fn create_utf8_view_array(size: usize) -> ArrayRef { let choice = rng.random_range(0..100); if choice < EMPTY_DENSITY { for _ in 0..run_length { - builder.append_value(""); + append_utf8_view_value(&mut builder, ""); } } else { for _ in 0..run_length { - builder.append_value(random_string(&mut rng)); + append_utf8_view_value(&mut builder, &random_string(&mut rng)); } } } @@ -165,7 +173,7 @@ fn create_ts_array(size: usize) -> ArrayRef { } /// Creates a RecordBatch with 100K rows and 4 columns: int64, float64, utf8View, and ts. -fn create_record_batch(size: usize) -> RecordBatch { +pub(crate) fn create_record_batch(size: usize) -> RecordBatch { let fields = vec![ Field::new("int64", DataType::Int64, false), Field::new("float64", DataType::Float64, false), @@ -327,7 +335,7 @@ impl std::fmt::Display for AsyncStrategy { /// FilterType encapsulates the different filter comparisons. /// The variants correspond to the different filter patterns. #[derive(Clone, Copy, Debug)] -enum FilterType { +pub(crate) enum FilterType { /// "Point Lookup": selects a single row /// ```text /// ┌───────────────┐ ┌───────────────┐ @@ -562,7 +570,7 @@ impl std::fmt::Display for FilterType { impl FilterType { /// Applies the specified filter on the given RecordBatch and returns a BooleanArray mask. - fn filter_batch(&self, batch: &RecordBatch) -> arrow::error::Result { + pub(crate) fn filter_batch(&self, batch: &RecordBatch) -> arrow::error::Result { match self { // Point Lookup on int64 column FilterType::PointLookup => { @@ -590,7 +598,7 @@ impl FilterType { // Unselective Unclustered on float64 column: NOT (float64 > 99.0) FilterType::UnselectiveUnclustered => { let array = batch.column(batch.schema().index_of("float64")?); - gt(array, &Float64Array::new_scalar(99.0)) + lt_eq(array, &Float64Array::new_scalar(99.0)) } // Unselective Clustered on ts column: ts < 9000 FilterType::UnselectiveClustered => { @@ -1534,6 +1542,32 @@ fn projection_names(projection: &[usize]) -> Vec<&'static str> { projection.iter().map(|idx| COLUMN_NAMES[*idx]).collect() } +pub(crate) fn filter_projected_record_batch( + batch: &RecordBatch, + filter: &BooleanArray, + output_column_names: &[&str], +) -> arrow::error::Result { + let output_projection = output_column_names + .iter() + .map(|name| batch.schema().index_of(name)) + .collect::>>()?; + let output = batch.project(&output_projection)?; + arrow_select::filter::filter_record_batch(&output, filter) +} + +pub(crate) fn post_filter_projected_num_rows( + batch: &RecordBatch, + filter: &BooleanArray, + output_column_names: &[&str], +) -> arrow::error::Result { + if output_column_names.is_empty() { + return Ok(filter.true_count()); + } + + let output = filter_projected_record_batch(batch, filter, output_column_names)?; + Ok(output.num_rows()) +} + fn row_filter_for(filter_type: FilterType, pred_mask: ProjectionMask) -> RowFilter { let filter = ArrowPredicateFn::new(pred_mask, move |batch| filter_type.filter_batch(&batch)); RowFilter::new(vec![Box::new(filter)]) @@ -1705,13 +1739,9 @@ async fn benchmark_async_reader_post_filter( while let Some(b) = stream.next().await { let batch = b.unwrap(); let filter = filter_type.filter_batch(&batch).unwrap(); - let filtered = arrow_select::filter::filter_record_batch(&batch, &filter).unwrap(); - let output_projection = output_column_names - .iter() - .map(|name| filtered.schema().index_of(name).unwrap()) - .collect::>(); - let output = filtered.project(&output_projection).unwrap(); - std::hint::black_box(output.num_rows()); + let output_rows = + post_filter_projected_num_rows(&batch, &filter, &output_column_names).unwrap(); + std::hint::black_box(output_rows); } } @@ -1732,13 +1762,9 @@ async fn benchmark_async_reader_post_filter_nested( while let Some(b) = stream.next().await { let batch = b.unwrap(); let filter = filter_type.filter_batch(&batch).unwrap(); - let filtered = arrow_select::filter::filter_record_batch(&batch, &filter).unwrap(); - let output_projection = output_column_names - .iter() - .map(|name| filtered.schema().index_of(name).unwrap()) - .collect::>(); - let output = filtered.project(&output_projection).unwrap(); - std::hint::black_box(output.num_rows()); + let output_rows = + post_filter_projected_num_rows(&batch, &filter, output_column_names).unwrap(); + std::hint::black_box(output_rows); } } @@ -1831,13 +1857,9 @@ fn benchmark_sync_reader_post_filter( for b in stream { let batch = b.unwrap(); let filter = filter_type.filter_batch(&batch).unwrap(); - let filtered = arrow_select::filter::filter_record_batch(&batch, &filter).unwrap(); - let output_projection = output_column_names - .iter() - .map(|name| filtered.schema().index_of(name).unwrap()) - .collect::>(); - let output = filtered.project(&output_projection).unwrap(); - std::hint::black_box(output.num_rows()); + let output_rows = + post_filter_projected_num_rows(&batch, &filter, &output_column_names).unwrap(); + std::hint::black_box(output_rows); } } From a5c636bf07faad3686f84fde5194481b7c8b08ea Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Sat, 13 Jun 2026 00:54:46 +0800 Subject: [PATCH 7/9] Refactor parquet row selection shape-focus benchmarks --- parquet/benches/row_selection_cursor.rs | 102 ++++++++++++++---------- 1 file changed, 62 insertions(+), 40 deletions(-) diff --git a/parquet/benches/row_selection_cursor.rs b/parquet/benches/row_selection_cursor.rs index b5d73eefb6b6..8b5b13cdeea5 100644 --- a/parquet/benches/row_selection_cursor.rs +++ b/parquet/benches/row_selection_cursor.rs @@ -35,9 +35,36 @@ const BATCH_SIZE: usize = 1 << 10; const BASE_SEED: u64 = 0xA55AA55A; const AVG_SELECTOR_LENGTHS: &[usize] = &[4, 8, 12, 16, 20, 24, 28, 32, 36, 40]; const SHAPE_FOCUS_SELECTED_RUN_LENGTHS: &[usize] = &[1, 2, 4, 8, 32]; +const DENSE_SHAPE_FOCUS_SELECTED_RUN_LENGTHS: &[usize] = &[4, 8, 32]; const COLUMN_WIDTHS: &[usize] = &[2, 4, 8, 16, 32]; const UTF8VIEW_LENS: &[usize] = &[4, 8, 16, 32, 64, 128, 256]; const BENCH_MODES: &[BenchMode] = &[BenchMode::ReadSelector, BenchMode::ReadMask]; +const SHAPE_FOCUS_SCENARIOS: &[ShapeFocusScenario] = &[ + ShapeFocusScenario { + name: "sparse10", + select_ratio: 0.1, + start_with_select: false, + selected_run_lengths: SHAPE_FOCUS_SELECTED_RUN_LENGTHS, + }, + ShapeFocusScenario { + name: "sparse20", + select_ratio: 0.2, + start_with_select: false, + selected_run_lengths: SHAPE_FOCUS_SELECTED_RUN_LENGTHS, + }, + ShapeFocusScenario { + name: "moderate40", + select_ratio: 0.4, + start_with_select: false, + selected_run_lengths: SHAPE_FOCUS_SELECTED_RUN_LENGTHS, + }, + ShapeFocusScenario { + name: "dense80", + select_ratio: 0.8, + start_with_select: true, + selected_run_lengths: DENSE_SHAPE_FOCUS_SELECTED_RUN_LENGTHS, + }, +]; struct DataProfile { name: &'static str, @@ -209,29 +236,6 @@ fn criterion_benchmark(c: &mut Criterion) { } fn bench_shape_focus(c: &mut Criterion) { - let scenarios = [ - ShapeFocusScenario { - name: "sparse10", - select_ratio: 0.1, - start_with_select: false, - }, - ShapeFocusScenario { - name: "sparse20", - select_ratio: 0.2, - start_with_select: false, - }, - ShapeFocusScenario { - name: "moderate40", - select_ratio: 0.4, - start_with_select: false, - }, - ShapeFocusScenario { - name: "dense80", - select_ratio: 0.8, - start_with_select: true, - }, - ]; - let profiles = [ DataProfile { name: "int32", @@ -245,25 +249,21 @@ fn bench_shape_focus(c: &mut Criterion) { for profile in profiles { let parquet_data = build_parquet_data(TOTAL_ROWS, profile.build_batch); - for scenario in &scenarios { - for &selected_run_len in SHAPE_FOCUS_SELECTED_RUN_LENGTHS { + for scenario in shape_focus_scenarios() { + for &selected_run_len in scenario.selected_run_lengths { let selectors = generate_shape_focus_selectors(selected_run_len, TOTAL_ROWS, scenario); - if selectors.is_empty() { - continue; - } - - let stats = SelectorStats::new(&selectors); - let selection = RowSelection::from(selectors); - let suffix = format!( - "shape-focus-{}-{}-run{:02}-avg{:.1}-sel{:02}", + assert!( + !selectors.is_empty(), + "invalid shape focus case {} maxrun {}", scenario.name, - profile.name, - selected_run_len, - stats.average_selector_len, - (stats.select_ratio * 100.0).round() as u32 + selected_run_len ); + let suffix = + shape_focus_suffix(scenario, profile.name, selected_run_len, &selectors); + let selection = RowSelection::from(selectors); + let bench_input = BenchInput { parquet_data: parquet_data.clone(), selection, @@ -431,10 +431,11 @@ struct Scenario { distribution: RunDistribution, } -struct ShapeFocusScenario { - name: &'static str, +pub(crate) struct ShapeFocusScenario { + pub(crate) name: &'static str, select_ratio: f64, start_with_select: bool, + pub(crate) selected_run_lengths: &'static [usize], } #[derive(Clone)] @@ -497,7 +498,11 @@ fn generate_selectors( selection.into() } -fn generate_shape_focus_selectors( +pub(crate) fn shape_focus_scenarios() -> &'static [ShapeFocusScenario] { + SHAPE_FOCUS_SCENARIOS +} + +pub(crate) fn generate_shape_focus_selectors( selected_run_len: usize, total_rows: usize, scenario: &ShapeFocusScenario, @@ -557,6 +562,23 @@ fn generate_shape_focus_selectors( selection.into() } +pub(crate) fn shape_focus_suffix( + scenario: &ShapeFocusScenario, + profile_name: &str, + selected_run_len: usize, + selectors: &[RowSelector], +) -> String { + let stats = SelectorStats::new(selectors); + format!( + "shape-focus-{}-{}-maxrun{:02}-avg{:.1}-sel{:02}", + scenario.name, + profile_name, + selected_run_len, + stats.average_selector_len, + (stats.select_ratio * 100.0).round() as u32 + ) +} + fn sample_length(mean: f64, distribution: &RunDistribution, rng: &mut StdRng) -> usize { match distribution { RunDistribution::Constant => mean.round().max(1.0) as usize, From fe69b217559040e1edc631ffebaa5ae0a0f45076 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Mon, 15 Jun 2026 00:48:19 +0800 Subject: [PATCH 8/9] Rename parquet row-filter benchmark cases for clarity --- parquet/benches/arrow_reader_row_filter.rs | 801 +++++++++++++-------- parquet/benches/row_selection_cursor.rs | 17 + 2 files changed, 535 insertions(+), 283 deletions(-) diff --git a/parquet/benches/arrow_reader_row_filter.rs b/parquet/benches/arrow_reader_row_filter.rs index 57ce949dcb2d..cb26443fa3b0 100644 --- a/parquet/benches/arrow_reader_row_filter.rs +++ b/parquet/benches/arrow_reader_row_filter.rs @@ -35,22 +35,27 @@ //! //! [Efficient Filter Pushdown in Parquet]: https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/ //! -//! The benchmark creates an in-memory Parquet file with 100K rows and ten columns. -//! The first four columns are: -//! - int64: random integers (range: 0..100) generated with a fixed seed. -//! - float64: random floating-point values (range: 0.0..100.0) generated with a fixed seed. -//! - utf8View: random strings with some empty values and occasional constant "const" values. -//! - ts: sequential timestamps in milliseconds. -//! -//! The following six columns (for filtering) are generated to mimic different -//! filter selectivity and clustering patterns: -//! - pt: for Point Lookup – exactly one row is set to "unique_point", all others are random strings. -//! - sel: for Selective Unclustered – exactly 1% of rows (those with i % 100 == 0) are "selected". -//! - mod_clustered: for Moderately Selective Clustered – in each 10K-row block, the first 10 rows are "mod_clustered". -//! - mod_unclustered: for Moderately Selective Unclustered – exactly 10% of rows (those with i % 10 == 1) are "mod_unclustered". -//! - unsel_unclustered: for Unselective Unclustered – exactly 99% of rows (those with i % 100 != 0) are "unsel_unclustered". -//! - unsel_clustered: for Unselective Clustered – in each 10K-row block, rows with an offset >= 1000 are "unsel_clustered". +//! The benchmark creates an in-memory Parquet file with 500K rows and four root +//! columns: +//! - `int64`: random integers with an injected point-lookup value. +//! - `float64`: random floating-point values used for sparse and dense filters. +//! - `utf8View`: ClickBench-like string values with sparse sentinel values. +//! - `ts`: sequential timestamps used for clustered filters. //! +//! The benchmark groups cover a few distinct reader-level questions: +//! - `arrow_reader_row_filter`: baseline filter/projection combinations. +//! - `arrow_reader_row_filter_{async_,}strategy_matrix`: full post-filtering +//! versus row-filter pushdown with `Auto`, forced `Selectors`, and forced +//! `Mask`. +//! - `arrow_reader_row_filter_async_auto_policy_focus`: focused synthetic shapes +//! derived from ClickBench and TPC-DS regressions, including sparse and dense +//! filters, clustered and fragmented selections, variable-width predicates, +//! projected predicate columns, count/filter-only outputs, and mixed predicate +//! order. +//! - `arrow_reader_projection_scan_focus`: projection-only scans that do not +//! construct a `RowFilter`. +//! - `arrow_reader_row_filter_async_nested_post_filter_focus`: nested root output +//! with a separate predicate column. use arrow::array::{ ArrayRef, BooleanArray, Float64Array, Int64Array, StructArray, TimestampMillisecondArray, @@ -441,10 +446,10 @@ pub(crate) enum FilterType { /// [ClickBench]: https://github.com/ClickHouse/ClickBench /// [Q21-Q27]: https://github.com/apache/datafusion/blob/b7177234e65cbbb2dcc04c252f6acd80bb026362/benchmarks/queries/clickbench/queries.sql#L22-L28 Utf8ViewNonEmpty, - /// Sparse variable-width predicate shaped like TPC-DS Q83 dynamic - /// `i_item_id` filters, where the predicate column is also projected. - Utf8ViewMissing, - /// Scalar-only part of ClickBench Q37: + + // Deferred-output shapes. Predicate columns are separate from the output, + // so rejected rows can skip output-column decoding. + /// Scalar-prefix shape derived from DataFusion ClickBench Q37: /// /// ```sql /// WHERE CounterID = 62 @@ -456,56 +461,263 @@ pub(crate) enum FilterType { /// /// DataFusion `Auto` does not push down the `Title <> ''` string predicate, /// but it can push down the scalar prefix to defer decoding `Title`. - /// This synthetic predicate keeps that reader-level shape: cheap scalar - /// filter columns protect an expensive `Utf8View` output column. - ClickBenchQ37ScalarPrefix, - /// Shape of ClickBench extended Q6 under DataFusion row-filter pushdown: + /// Fragmented ~0.9% selection: approx 4,500 selected rows in 500K. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + ScalarPrefixUtf8Output, + /// Sparse fragmented scalar predicates (~7%, approx 36,000 selected rows + /// in 500K) with a cheap fixed-width output projection, derived from a + /// ClickBench Q41-like shape. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + SparseScalarFixedOutput, + /// Scalar range predicate derived from TPC-DS Q9 `ss_quantity BETWEEN ...` + /// subqueries. The selected rows are random and moderately selective, and + /// benchmark projections cover both count-only and numeric aggregate cases. + /// Fragmented ~20% selection: approx 100,000 selected rows in 500K. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + QuantityRangePredicate, + + // Multi-predicate shapes. These focus predicate ordering and predicate + // evaluation cost independently of projection cost. + /// Predicate-order shape derived from DataFusion ClickBench extended Q6: /// an early cheap fixed-width predicate can prune almost all rows before a /// later unprojected variable-width predicate is decoded. - ClickBenchQ6MixedPredicates, - /// Same scalar + variable-width predicate columns as [`Self::ClickBenchQ6MixedPredicates`], - /// but with the variable-width predicate evaluated first. This anchors the - /// static post-filter gate against predicate-order drift. - ClickBenchQ6VarWidthFirst, - /// Shape of ClickBench Q41-like fixed-width filters: sparse fragmented - /// scalar predicates with a cheap fixed-width output projection. - ClickBenchQ41SparseFixedOutput, - /// Shape of ClickBench Q40: multiple cheap scalar predicates, very small - /// output, and one projected predicate column used later by grouping. - ClickBenchQ40ScalarGroupBy, - /// Shape of TPC-DS Q41: a complex OR predicate over dictionary/string-like - /// and scalar columns where predicate evaluation dominates reader time. - TpcdsQ41ComplexOr, - /// Shape of TPC-DS Q20 catalog_sales after dynamic filters: multiple - /// fixed-width predicates where predicate columns are also projected. - TpcdsQ20ProjectedDynamicFilters, + /// Point-lookup prefix: at most 1 row reaches the variable-width predicate. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ ... │ + /// │ │ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ ... │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + FixedThenVarWidthPredicates, + /// Same scalar + variable-width predicate columns as + /// [`Self::FixedThenVarWidthPredicates`], but with the variable-width + /// predicate evaluated first. This anchors the static post-filter gate + /// against predicate-order drift. + /// At most 1 row survives the final point lookup. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ ... │ + /// │ │ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ ... │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + VarWidthThenFixedPredicates, + /// Multiple cheap scalar predicates, very small output, and projected + /// predicate columns used later by grouping. Derived from ClickBench Q40. + /// Fragmented ~0.8% selection: approx 4,000 selected rows in 500K. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + MultiScalarProjectedKey, + /// Complex OR predicate over dictionary/string-like and scalar columns + /// where predicate evaluation dominates reader time. Derived from TPC-DS + /// Q41. + /// Mixed string/scalar OR branches select approx 1% of rows. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ ... │ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// │ │ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ ... │ │ │ + /// │ │ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + ComplexOrMixedPredicates, + + // Projected-predicate shapes. At least one predicate column is also needed + // in the final projection. + /// Multiple fixed-width dynamic filters where predicate columns are also + /// projected. Derived from TPC-DS Q20 catalog_sales. + /// Fragmented ~11% selection: approx 54,000 selected rows in 500K. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + ProjectedDynamicFilters, /// Shape of TPC-DS Q21 after dynamic-filter pruning: sparse fragmented /// fixed-width predicates where the final projection still includes the /// predicate columns. This protects against choosing selectors for columns /// that were already decoded/cached by predicate evaluation. - TpcdsQ21ProjectedFixedOutput, - /// Shape of TPC-DS Q2 fact scans: the dynamic filter applies to the date - /// key, the same date key is projected, and an additional fixed-width sales - /// value can still be deferred by predicate pushdown. - TpcdsQ2ProjectedPredicate5Pct, - TpcdsQ2ProjectedPredicate8Pct, - TpcdsQ2ProjectedPredicate10Pct, - TpcdsQ2ProjectedPredicate20Pct, - TpcdsQ2ProjectedPredicate30Pct, - TpcdsQ2ProjectedPredicate40Pct, - TpcdsQ2ProjectedPredicate50Pct, - /// Scalar range predicate shaped like TPC-DS Q9 `ss_quantity BETWEEN ...` - /// subqueries. The selected rows are random and moderately selective, and - /// benchmark projections cover both count-only and numeric aggregate cases. - TpcdsQ9QuantityRange, + /// Fragmented ~7% selection: approx 36,000 selected rows in 500K. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + SparseProjectedPredicatesFixedOutput, + /// Projected-predicate shape derived from TPC-DS Q2 fact scans: the + /// dynamic filter applies to the date key, the same date key is projected, + /// and an additional fixed-width sales value can still be deferred by + /// predicate pushdown. + /// Selectivity ranges from 5% to 50%: approx 25K to 250K selected rows in + /// 500K. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// └───────────────┘ └───────────────┘ + /// ``` + ProjectedPredicate5Pct, + ProjectedPredicate8Pct, + ProjectedPredicate10Pct, + ProjectedPredicate20Pct, + ProjectedPredicate30Pct, + ProjectedPredicate40Pct, + ProjectedPredicate50Pct, /// Exact shape for the projected-predicate moderate-selectivity gate: /// a clustered 20% timestamp predicate where the predicate column is /// projected and the deferred output is variable-width. - ProjectedTs8PctClustered, - ProjectedTs20PctClustered, + /// Clustered 8% or 20% selection: 40,000 or 100,000 selected rows in 500K. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ │ │ │ + /// │ │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + ClusteredTs8PctProjectedPredicate, + ClusteredTs20PctProjectedPredicate, + /// Sparse variable-width predicate shaped like TPC-DS Q83 dynamic + /// `i_item_id` filters, where the predicate column is also projected. + /// Sparse 0.1% selection: 500 sentinel rows in 500K, one every 1,000 rows. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// └───────────────┘ └───────────────┘ + /// ``` + Utf8ViewMissing, /// Very sparse projected fixed-width scan shaped like TPC-DS fact-table /// filters where the predicate column is also needed in the output projection. - TpcdsSparseProjectedFactScan, + /// Sparse 0.1% selection: 500 rows in 500K, one timestamp match every + /// 1,000 rows. + /// + /// ```text + /// ┌───────────────┐ ┌───────────────┐ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// │ │ │ │ + /// │ │ │ │ + /// │ ... │ │ ... │ + /// │ │ │ │ + /// │ │ │ │ + /// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ + /// └───────────────┘ └───────────────┘ + /// ``` + SparseProjectedFactScan, } impl std::fmt::Display for FilterType { @@ -520,49 +732,45 @@ impl std::fmt::Display for FilterType { FilterType::Composite => "float64 > 99.0 AND ts >= 9000", FilterType::Utf8ViewNonEmpty => "utf8View <> ''", FilterType::Utf8ViewMissing => "utf8View == ''", - FilterType::ClickBenchQ37ScalarPrefix => "int64 == 62 AND ts < 9000", - FilterType::ClickBenchQ6MixedPredicates => "int64 == 9999 AND utf8View <> ''", - FilterType::ClickBenchQ6VarWidthFirst => "utf8View <> '' AND int64 == 9999", - FilterType::ClickBenchQ41SparseFixedOutput => "int64 < 8 AND ts < 9000", - FilterType::ClickBenchQ40ScalarGroupBy => { - "int64 == 62 AND float64 > 10.0 AND ts < 9000" - } - FilterType::TpcdsQ41ComplexOr => { + FilterType::ScalarPrefixUtf8Output => "int64 == 62 AND ts < 9000", + FilterType::FixedThenVarWidthPredicates => "int64 == 9999 AND utf8View <> ''", + FilterType::VarWidthThenFixedPredicates => "utf8View <> '' AND int64 == 9999", + FilterType::SparseScalarFixedOutput => "int64 < 8 AND ts < 9000", + FilterType::MultiScalarProjectedKey => "int64 == 62 AND float64 > 10.0 AND ts < 9000", + FilterType::ComplexOrMixedPredicates => { "(utf8View <> '' AND int64 < 8) OR (ts < 100 AND float64 > 95.0)" } - FilterType::TpcdsQ20ProjectedDynamicFilters => { + FilterType::ProjectedDynamicFilters => { "int64 < 12 AND ts < 9000 projected dynamic filters" } - FilterType::TpcdsQ21ProjectedFixedOutput => { + FilterType::SparseProjectedPredicatesFixedOutput => { "int64 < 8 AND ts < 9000 projected predicates" } - FilterType::TpcdsQ2ProjectedPredicate10Pct => { + FilterType::ProjectedPredicate10Pct => { "int64 < 10 projected predicate with fixed output" } - FilterType::TpcdsQ2ProjectedPredicate5Pct => { - "int64 < 5 projected predicate with fixed output" - } - FilterType::TpcdsQ2ProjectedPredicate8Pct => { - "int64 < 8 projected predicate with fixed output" - } - FilterType::TpcdsQ2ProjectedPredicate20Pct => { + FilterType::ProjectedPredicate5Pct => "int64 < 5 projected predicate with fixed output", + FilterType::ProjectedPredicate8Pct => "int64 < 8 projected predicate with fixed output", + FilterType::ProjectedPredicate20Pct => { "int64 < 20 projected predicate with fixed output" } - FilterType::TpcdsQ2ProjectedPredicate30Pct => { + FilterType::ProjectedPredicate30Pct => { "int64 < 30 projected predicate with fixed output" } - FilterType::TpcdsQ2ProjectedPredicate40Pct => { + FilterType::ProjectedPredicate40Pct => { "int64 < 40 projected predicate with fixed output" } - FilterType::TpcdsQ2ProjectedPredicate50Pct => { + FilterType::ProjectedPredicate50Pct => { "int64 < 50 projected predicate with fixed output" } - FilterType::TpcdsQ9QuantityRange => "int64 > 0 AND int64 < 21", - FilterType::ProjectedTs20PctClustered => { + FilterType::QuantityRangePredicate => "int64 > 0 AND int64 < 21", + FilterType::ClusteredTs20PctProjectedPredicate => { "ts < 2000 projected predicate with utf8 output" } - FilterType::ProjectedTs8PctClustered => "ts < 800 projected predicate with utf8 output", - FilterType::TpcdsSparseProjectedFactScan => "ts % 1000 == 0", + FilterType::ClusteredTs8PctProjectedPredicate => { + "ts < 800 projected predicate with utf8 output" + } + FilterType::SparseProjectedFactScan => "ts % 1000 == 0", }; write!(f, "{s}") } @@ -622,31 +830,31 @@ impl FilterType { let scalar = StringViewArray::new_scalar(UTF8_VIEW_MISSING_VALUE); eq(array, &scalar) } - // ClickBenchQ37ScalarPrefix: a cheap fragmented scalar predicate + // ScalarPrefixUtf8Output: a cheap fragmented scalar predicate // evaluated before decoding a variable-width output column. - FilterType::ClickBenchQ37ScalarPrefix => { + FilterType::ScalarPrefixUtf8Output => { let int64 = batch.column(batch.schema().index_of("int64")?); let ts = batch.column(batch.schema().index_of("ts")?); let counter_match = eq(int64, &Int64Array::new_scalar(62))?; let date_like_range = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; and(&counter_match, &date_like_range) } - FilterType::ClickBenchQ6MixedPredicates | FilterType::ClickBenchQ6VarWidthFirst => { + FilterType::FixedThenVarWidthPredicates | FilterType::VarWidthThenFixedPredicates => { let int64 = batch.column(batch.schema().index_of("int64")?); let utf8 = batch.column(batch.schema().index_of("utf8View")?); let cheap_prefix = eq(int64, &Int64Array::new_scalar(9999))?; let string_suffix = neq(utf8, &StringViewArray::new_scalar(""))?; and(&cheap_prefix, &string_suffix) } - FilterType::ClickBenchQ41SparseFixedOutput - | FilterType::TpcdsQ21ProjectedFixedOutput => { + FilterType::SparseScalarFixedOutput + | FilterType::SparseProjectedPredicatesFixedOutput => { let int64 = batch.column(batch.schema().index_of("int64")?); let ts = batch.column(batch.schema().index_of("ts")?); let counter_like = lt(int64, &Int64Array::new_scalar(8))?; let date_like = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; and(&counter_like, &date_like) } - FilterType::ClickBenchQ40ScalarGroupBy => { + FilterType::MultiScalarProjectedKey => { let int64 = batch.column(batch.schema().index_of("int64")?); let float64 = batch.column(batch.schema().index_of("float64")?); let ts = batch.column(batch.schema().index_of("ts")?); @@ -655,7 +863,7 @@ impl FilterType { let date_like = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; and(&and(&counter_match, &width_match)?, &date_like) } - FilterType::TpcdsQ41ComplexOr => { + FilterType::ComplexOrMixedPredicates => { let int64 = batch.column(batch.schema().index_of("int64")?); let float64 = batch.column(batch.schema().index_of("float64")?); let utf8 = batch.column(batch.schema().index_of("utf8View")?); @@ -670,48 +878,48 @@ impl FilterType { )?; or(&string_branch, &scalar_branch) } - FilterType::TpcdsQ20ProjectedDynamicFilters => { + FilterType::ProjectedDynamicFilters => { let int64 = batch.column(batch.schema().index_of("int64")?); let ts = batch.column(batch.schema().index_of("ts")?); let item_like = lt(int64, &Int64Array::new_scalar(12))?; let date_like = lt(ts, &TimestampMillisecondArray::new_scalar(9000))?; and(&item_like, &date_like) } - FilterType::TpcdsQ2ProjectedPredicate5Pct - | FilterType::TpcdsQ2ProjectedPredicate8Pct - | FilterType::TpcdsQ2ProjectedPredicate10Pct - | FilterType::TpcdsQ2ProjectedPredicate20Pct - | FilterType::TpcdsQ2ProjectedPredicate30Pct - | FilterType::TpcdsQ2ProjectedPredicate40Pct - | FilterType::TpcdsQ2ProjectedPredicate50Pct => { + FilterType::ProjectedPredicate5Pct + | FilterType::ProjectedPredicate8Pct + | FilterType::ProjectedPredicate10Pct + | FilterType::ProjectedPredicate20Pct + | FilterType::ProjectedPredicate30Pct + | FilterType::ProjectedPredicate40Pct + | FilterType::ProjectedPredicate50Pct => { let int64 = batch.column(batch.schema().index_of("int64")?); let threshold = match self { - FilterType::TpcdsQ2ProjectedPredicate5Pct => 5, - FilterType::TpcdsQ2ProjectedPredicate8Pct => 8, - FilterType::TpcdsQ2ProjectedPredicate10Pct => 10, - FilterType::TpcdsQ2ProjectedPredicate20Pct => 20, - FilterType::TpcdsQ2ProjectedPredicate30Pct => 30, - FilterType::TpcdsQ2ProjectedPredicate40Pct => 40, - FilterType::TpcdsQ2ProjectedPredicate50Pct => 50, + FilterType::ProjectedPredicate5Pct => 5, + FilterType::ProjectedPredicate8Pct => 8, + FilterType::ProjectedPredicate10Pct => 10, + FilterType::ProjectedPredicate20Pct => 20, + FilterType::ProjectedPredicate30Pct => 30, + FilterType::ProjectedPredicate40Pct => 40, + FilterType::ProjectedPredicate50Pct => 50, _ => unreachable!(), }; lt(int64, &Int64Array::new_scalar(threshold)) } - FilterType::TpcdsQ9QuantityRange => { + FilterType::QuantityRangePredicate => { let int64 = batch.column(batch.schema().index_of("int64")?); let lower = gt(int64, &Int64Array::new_scalar(0))?; let upper = lt(int64, &Int64Array::new_scalar(21))?; and(&lower, &upper) } - FilterType::ProjectedTs8PctClustered => { + FilterType::ClusteredTs8PctProjectedPredicate => { let ts = batch.column(batch.schema().index_of("ts")?); lt(ts, &TimestampMillisecondArray::new_scalar(800)) } - FilterType::ProjectedTs20PctClustered => { + FilterType::ClusteredTs20PctProjectedPredicate => { let ts = batch.column(batch.schema().index_of("ts")?); lt(ts, &TimestampMillisecondArray::new_scalar(2000)) } - FilterType::TpcdsSparseProjectedFactScan => { + FilterType::SparseProjectedFactScan => { let ts = batch .column(batch.schema().index_of("ts")?) .as_any() @@ -738,25 +946,26 @@ impl FilterType { FilterType::UnselectiveClustered => &[3], FilterType::Composite => &[1, 3], // Use float64 column and ts column as representative for composite FilterType::Utf8ViewNonEmpty | FilterType::Utf8ViewMissing => &[2], - FilterType::ClickBenchQ37ScalarPrefix => &[0, 3], - FilterType::ClickBenchQ6MixedPredicates | FilterType::ClickBenchQ6VarWidthFirst => { + FilterType::ScalarPrefixUtf8Output => &[0, 3], + FilterType::FixedThenVarWidthPredicates | FilterType::VarWidthThenFixedPredicates => { &[0, 2] } - FilterType::ClickBenchQ40ScalarGroupBy => &[0, 1, 3], - FilterType::ClickBenchQ41SparseFixedOutput - | FilterType::TpcdsQ20ProjectedDynamicFilters - | FilterType::TpcdsQ21ProjectedFixedOutput => &[0, 3], - FilterType::TpcdsQ41ComplexOr => &[0, 1, 2, 3], - FilterType::TpcdsQ2ProjectedPredicate5Pct - | FilterType::TpcdsQ2ProjectedPredicate8Pct - | FilterType::TpcdsQ2ProjectedPredicate10Pct - | FilterType::TpcdsQ2ProjectedPredicate20Pct - | FilterType::TpcdsQ2ProjectedPredicate30Pct - | FilterType::TpcdsQ2ProjectedPredicate40Pct - | FilterType::TpcdsQ2ProjectedPredicate50Pct => &[0], - FilterType::TpcdsQ9QuantityRange => &[0], - FilterType::ProjectedTs8PctClustered | FilterType::ProjectedTs20PctClustered => &[3], - FilterType::TpcdsSparseProjectedFactScan => &[3], + FilterType::MultiScalarProjectedKey => &[0, 1, 3], + FilterType::SparseScalarFixedOutput + | FilterType::ProjectedDynamicFilters + | FilterType::SparseProjectedPredicatesFixedOutput => &[0, 3], + FilterType::ComplexOrMixedPredicates => &[0, 1, 2, 3], + FilterType::ProjectedPredicate5Pct + | FilterType::ProjectedPredicate8Pct + | FilterType::ProjectedPredicate10Pct + | FilterType::ProjectedPredicate20Pct + | FilterType::ProjectedPredicate30Pct + | FilterType::ProjectedPredicate40Pct + | FilterType::ProjectedPredicate50Pct => &[0], + FilterType::QuantityRangePredicate => &[0], + FilterType::ClusteredTs8PctProjectedPredicate + | FilterType::ClusteredTs20PctProjectedPredicate => &[3], + FilterType::SparseProjectedFactScan => &[3], } } } @@ -1049,15 +1258,18 @@ fn benchmark_async_strategy_matrix(c: &mut Criterion) { } } -/// A small async-only matrix that isolates the cases most relevant to the +/// A focused async-only matrix that isolates the cases most relevant to the /// row-filter Auto policy. This is intentionally narrower than /// [`benchmark_async_strategy_matrix`]: it keeps the benchmark output focused /// on cases where later PRs may teach `Auto` to switch execution modes or /// explicitly keep predicate pushdown. /// -/// The `profile_*` cases are derived from DataFusion ClickBench and TPC-DS -/// comparisons. They keep the reader-level shapes worth tracking while -/// excluding query regressions that did not construct a Parquet `RowFilter`. +/// The cases use structure-oriented names. Comments on [`FilterType`] keep the +/// ClickBench and TPC-DS provenance, but these are synthetic reader shapes, not +/// end-to-end query benchmarks. +/// +/// Individual [`FilterType`] variants include shaded-row diagrams for the +/// representative selection shapes. fn benchmark_async_auto_policy_focus(c: &mut Criterion) { const SMALL_TOTAL_ROWS: usize = 20_000; const SMALL_ROW_GROUP_SIZE: usize = 5_000; @@ -1068,6 +1280,7 @@ fn benchmark_async_auto_policy_focus(c: &mut Criterion) { SMALL_ROW_GROUP_SIZE, )); let cases = [ + // Baseline selectivity shapes. AsyncFocusCase::new( "utf8_non_empty", parquet_file.clone(), @@ -1104,194 +1317,200 @@ fn benchmark_async_auto_policy_focus(c: &mut Criterion) { FilterType::SelectiveUnclustered, ProjectionCase::ExcludeFilterColumn, ), + // Filter-only and count-only shapes. These guard the cases where there + // is no deferred output column to amortize the cost of row selection. AsyncFocusCase::new( - "profile_q37_scalar_utf8", + "point_lookup_filter_only", parquet_file.clone(), - FilterType::ClickBenchQ37ScalarPrefix, - ProjectionCase::Utf8Only, + FilterType::PointLookup, + ProjectionCase::FilterColumnsOnly, ), - // Historical Q6 focus case: cheap fixed-width predicate before the - // unprojected variable-width predicate. AsyncFocusCase::new( - "profile_q6_mixed_predicates", + "projected_predicate_8pct_filter_only", parquet_file.clone(), - FilterType::ClickBenchQ6MixedPredicates, - ProjectionCase::Float64Only, + FilterType::ProjectedPredicate8Pct, + ProjectionCase::FilterColumnsOnly, ), AsyncFocusCase::new( - "profile_varwidth_then_fixed_prefix", + "sparse_scalar_count_only", parquet_file.clone(), - FilterType::ClickBenchQ6VarWidthFirst, - ProjectionCase::Float64Only, + FilterType::SparseScalarFixedOutput, + ProjectionCase::CountOnly, + ), + AsyncFocusCase::new( + "small_fragmented_scalar_filter_only", + small_parquet_file.clone(), + FilterType::ModeratelySelectiveUnclustered, + ProjectionCase::FilterColumnsOnly, ), AsyncFocusCase::new( - "profile_q40_scalar_group_by", + "quantity_range_filter_columns_only", parquet_file.clone(), - FilterType::ClickBenchQ40ScalarGroupBy, - ProjectionCase::Float64AndTs, + FilterType::QuantityRangePredicate, + ProjectionCase::FilterColumnsOnly, ), + // Deferred-output shapes. Predicate columns are not part of the output, + // so pushdown can skip decoding projected columns for rejected rows. AsyncFocusCase::new( - "profile_q41_sparse_fixed_output", + "scalar_prefix_utf8_output", parquet_file.clone(), - FilterType::ClickBenchQ41SparseFixedOutput, + FilterType::ScalarPrefixUtf8Output, + ProjectionCase::Utf8Only, + ), + AsyncFocusCase::new( + "small_scalar_prefix_utf8_output", + small_parquet_file.clone(), + FilterType::ScalarPrefixUtf8Output, + ProjectionCase::Utf8Only, + ), + AsyncFocusCase::new( + "point_lookup_deferred_fixed_output", + parquet_file.clone(), + FilterType::PointLookup, ProjectionCase::Float64Only, ), AsyncFocusCase::new( - "profile_tpcds_q41_complex_or", + "sparse_scalar_fixed_output", parquet_file.clone(), - FilterType::TpcdsQ41ComplexOr, + FilterType::SparseScalarFixedOutput, ProjectionCase::Float64Only, ), AsyncFocusCase::new( - "profile_tpcds_q20_projected_dynamic_filters", + "quantity_range_numeric_output", parquet_file.clone(), - FilterType::TpcdsQ20ProjectedDynamicFilters, - ProjectionCase::FixedColumns, + FilterType::QuantityRangePredicate, + ProjectionCase::Float64Only, ), + // Multi-predicate shapes. These make predicate order and predicate + // evaluation cost visible separately from projection cost. AsyncFocusCase::new( - "profile_q21_projected_predicate_fixed_output", + "fixed_then_varwidth_predicates", parquet_file.clone(), - FilterType::TpcdsQ21ProjectedFixedOutput, - ProjectionCase::FixedColumns, + FilterType::FixedThenVarWidthPredicates, + ProjectionCase::Float64Only, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_5pct", + "varwidth_then_fixed_predicates", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate5Pct, - ProjectionCase::Int64AndFloat64, + FilterType::VarWidthThenFixedPredicates, + ProjectionCase::Float64Only, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_8pct_filter_only", + "multi_scalar_projected_key", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate8Pct, - ProjectionCase::FilterColumnsOnly, + FilterType::MultiScalarProjectedKey, + ProjectionCase::Float64AndTs, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_8pct_fixed_output", + "complex_or_mixed_predicates", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate8Pct, - ProjectionCase::Int64AndFloat64, + FilterType::ComplexOrMixedPredicates, + ProjectionCase::Float64Only, ), + // Projected-predicate shapes. The predicate column is also projected, + // so pushdown must not assume the predicate decode is purely overhead. AsyncFocusCase::new( - "profile_q2_projected_predicate_8pct_varwidth_output", + "projected_dynamic_filters", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate8Pct, - ProjectionCase::Int64AndUtf8, + FilterType::ProjectedDynamicFilters, + ProjectionCase::FixedColumns, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_10pct", + "sparse_projected_predicates_fixed_output", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate10Pct, + FilterType::SparseProjectedPredicatesFixedOutput, + ProjectionCase::FixedColumns, + ), + AsyncFocusCase::new( + "projected_predicate_5pct_fixed_output", + parquet_file.clone(), + FilterType::ProjectedPredicate5Pct, ProjectionCase::Int64AndFloat64, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_20pct", + "projected_predicate_8pct_fixed_output", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate20Pct, + FilterType::ProjectedPredicate8Pct, ProjectionCase::Int64AndFloat64, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_20pct_varwidth_output", + "projected_predicate_8pct_varwidth_output", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate20Pct, + FilterType::ProjectedPredicate8Pct, ProjectionCase::Int64AndUtf8, ), AsyncFocusCase::new( - "profile_projected_ts_8pct_fixed_output", + "projected_predicate_10pct_fixed_output", parquet_file.clone(), - FilterType::ProjectedTs8PctClustered, - ProjectionCase::Float64AndTs, + FilterType::ProjectedPredicate10Pct, + ProjectionCase::Int64AndFloat64, ), AsyncFocusCase::new( - "profile_projected_ts_8pct_varwidth_output", + "projected_predicate_20pct_fixed_output", parquet_file.clone(), - FilterType::ProjectedTs8PctClustered, - ProjectionCase::TsAndUtf8, + FilterType::ProjectedPredicate20Pct, + ProjectionCase::Int64AndFloat64, ), AsyncFocusCase::new( - "profile_projected_ts_20pct_fixed_output", + "projected_predicate_20pct_varwidth_output", parquet_file.clone(), - FilterType::ProjectedTs20PctClustered, - ProjectionCase::Float64AndTs, + FilterType::ProjectedPredicate20Pct, + ProjectionCase::Int64AndUtf8, ), AsyncFocusCase::new( - "profile_projected_ts_20pct_varwidth_output", + "projected_predicate_30pct_fixed_output", parquet_file.clone(), - FilterType::ProjectedTs20PctClustered, - ProjectionCase::TsAndUtf8, + FilterType::ProjectedPredicate30Pct, + ProjectionCase::Int64AndFloat64, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_30pct", + "projected_predicate_40pct_fixed_output", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate30Pct, + FilterType::ProjectedPredicate40Pct, ProjectionCase::Int64AndFloat64, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_40pct", + "projected_predicate_50pct_fixed_output", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate40Pct, + FilterType::ProjectedPredicate50Pct, ProjectionCase::Int64AndFloat64, ), AsyncFocusCase::new( - "profile_q2_projected_predicate_50pct", + "clustered_ts_8pct_fixed_output", parquet_file.clone(), - FilterType::TpcdsQ2ProjectedPredicate50Pct, - ProjectionCase::Int64AndFloat64, + FilterType::ClusteredTs8PctProjectedPredicate, + ProjectionCase::Float64AndTs, ), AsyncFocusCase::new( - "profile_q1_count_only", + "clustered_ts_8pct_varwidth_output", parquet_file.clone(), - FilterType::ClickBenchQ41SparseFixedOutput, - ProjectionCase::CountOnly, + FilterType::ClusteredTs8PctProjectedPredicate, + ProjectionCase::TsAndUtf8, ), AsyncFocusCase::new( - "profile_q19_no_defer", + "clustered_ts_20pct_fixed_output", parquet_file.clone(), - FilterType::PointLookup, - ProjectionCase::FilterColumnsOnly, + FilterType::ClusteredTs20PctProjectedPredicate, + ProjectionCase::Float64AndTs, ), AsyncFocusCase::new( - "profile_sparse_fixed_deferred_output", + "clustered_ts_20pct_varwidth_output", parquet_file.clone(), - FilterType::PointLookup, - ProjectionCase::Float64Only, + FilterType::ClusteredTs20PctProjectedPredicate, + ProjectionCase::TsAndUtf8, ), AsyncFocusCase::new( - "profile_tpcds_sparse_projected_fact_scan", + "sparse_projected_fact_scan", parquet_file.clone(), - FilterType::TpcdsSparseProjectedFactScan, + FilterType::SparseProjectedFactScan, ProjectionCase::FixedColumns, ), AsyncFocusCase::new( - "profile_q83_sparse_utf8_projected", + "sparse_utf8_projected_predicate", parquet_file.clone(), FilterType::Utf8ViewMissing, ProjectionCase::AllColumns, ), - AsyncFocusCase::new( - "profile_small_scalar_no_defer", - small_parquet_file.clone(), - FilterType::ModeratelySelectiveUnclustered, - ProjectionCase::FilterColumnsOnly, - ), - AsyncFocusCase::new( - "profile_small_q37_scalar_utf8", - small_parquet_file, - FilterType::ClickBenchQ37ScalarPrefix, - ProjectionCase::Utf8Only, - ), - AsyncFocusCase::new( - "profile_q9_quantity_count", - parquet_file.clone(), - FilterType::TpcdsQ9QuantityRange, - ProjectionCase::FilterColumnsOnly, - ), - AsyncFocusCase::new( - "profile_q9_quantity_avg", - parquet_file, - FilterType::TpcdsQ9QuantityRange, - ProjectionCase::Float64Only, - ), ]; let strategies = [ AsyncStrategy::FullPostFilter, @@ -1318,6 +1537,21 @@ fn benchmark_async_auto_policy_focus(c: &mut Criterion) { /// a narrow primitive projection where row-level pushdown metrics are zero. /// It deliberately lives outside the adaptive-materialization matrix because there is no /// filter strategy to choose. +/// +/// ```text +/// no RowFilter projected primitive columns +/// ┌───────────────┐ ┌───────────────┐ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// │ ... │ │ ... │ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ │▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│ +/// └───────────────┘ └───────────────┘ +/// ``` fn benchmark_projection_scan_focus(c: &mut Criterion) { let parquet_file = Bytes::from(write_parquet_file()); let rt = tokio::runtime::Builder::new_multi_thread() @@ -1327,7 +1561,7 @@ fn benchmark_projection_scan_focus(c: &mut Criterion) { let mut group = c.benchmark_group("arrow_reader_projection_scan_focus"); - let case_name = "profile_q83_return_scan_primitives"; + let case_name = "primitive_projection_only"; let projection = vec![0, 1, 3]; let reader = InMemoryReader::try_new(&parquet_file).unwrap(); let metadata = Arc::clone(reader.metadata()); @@ -1402,11 +1636,11 @@ fn benchmark_async_focus_case( schema_descr, filter_type.filter_projection().iter().copied(), ); - let q6_int64_pred_mask = ProjectionMask::roots(schema_descr, [0]); - let q6_utf8_pred_mask = ProjectionMask::roots(schema_descr, [2]); - let q41_int64_pred_mask = ProjectionMask::roots(schema_descr, [0]); - let q41_ts_pred_mask = ProjectionMask::roots(schema_descr, [3]); - let q40_float64_pred_mask = ProjectionMask::roots(schema_descr, [1]); + let fixed_pred_mask = ProjectionMask::roots(schema_descr, [0]); + let varwidth_pred_mask = ProjectionMask::roots(schema_descr, [2]); + let sparse_int64_pred_mask = ProjectionMask::roots(schema_descr, [0]); + let sparse_ts_pred_mask = ProjectionMask::roots(schema_descr, [3]); + let scalar_float64_pred_mask = ProjectionMask::roots(schema_descr, [1]); for strategy in strategies.iter().copied() { let bench_id = BenchmarkId::new( @@ -1419,11 +1653,11 @@ fn benchmark_async_focus_case( b.iter(|| { let reader = reader.clone(); let pred_mask = pred_mask.clone(); - let q6_int64_pred_mask = q6_int64_pred_mask.clone(); - let q6_utf8_pred_mask = q6_utf8_pred_mask.clone(); - let q41_int64_pred_mask = q41_int64_pred_mask.clone(); - let q41_ts_pred_mask = q41_ts_pred_mask.clone(); - let q40_float64_pred_mask = q40_float64_pred_mask.clone(); + let fixed_pred_mask = fixed_pred_mask.clone(); + let varwidth_pred_mask = varwidth_pred_mask.clone(); + let sparse_int64_pred_mask = sparse_int64_pred_mask.clone(); + let sparse_ts_pred_mask = sparse_ts_pred_mask.clone(); + let scalar_float64_pred_mask = scalar_float64_pred_mask.clone(); let projection_mask = projection_mask.clone(); let read_projection_mask = read_projection_mask.clone(); let output_column_names = output_column_names.clone(); @@ -1443,11 +1677,11 @@ fn benchmark_async_focus_case( let row_filter = row_filter_for_focus_case( filter_type, pred_mask, - q6_int64_pred_mask, - q6_utf8_pred_mask, - q41_int64_pred_mask, - q41_ts_pred_mask, - q40_float64_pred_mask, + fixed_pred_mask, + varwidth_pred_mask, + sparse_int64_pred_mask, + sparse_ts_pred_mask, + scalar_float64_pred_mask, ); benchmark_async_reader_with_policy( reader, @@ -1461,11 +1695,11 @@ fn benchmark_async_focus_case( let row_filter = row_filter_for_focus_case( filter_type, pred_mask, - q6_int64_pred_mask, - q6_utf8_pred_mask, - q41_int64_pred_mask, - q41_ts_pred_mask, - q40_float64_pred_mask, + fixed_pred_mask, + varwidth_pred_mask, + sparse_int64_pred_mask, + sparse_ts_pred_mask, + scalar_float64_pred_mask, ); benchmark_async_reader_with_policy( reader, @@ -1479,11 +1713,11 @@ fn benchmark_async_focus_case( let row_filter = row_filter_for_focus_case( filter_type, pred_mask, - q6_int64_pred_mask, - q6_utf8_pred_mask, - q41_int64_pred_mask, - q41_ts_pred_mask, - q40_float64_pred_mask, + fixed_pred_mask, + varwidth_pred_mask, + sparse_int64_pred_mask, + sparse_ts_pred_mask, + scalar_float64_pred_mask, ); benchmark_async_reader_with_policy( reader, @@ -1576,50 +1810,50 @@ fn row_filter_for(filter_type: FilterType, pred_mask: ProjectionMask) -> RowFilt fn row_filter_for_focus_case( filter_type: FilterType, pred_mask: ProjectionMask, - q6_int64_pred_mask: ProjectionMask, - q6_utf8_pred_mask: ProjectionMask, - q41_int64_pred_mask: ProjectionMask, - q41_ts_pred_mask: ProjectionMask, - q40_float64_pred_mask: ProjectionMask, + fixed_pred_mask: ProjectionMask, + varwidth_pred_mask: ProjectionMask, + sparse_int64_pred_mask: ProjectionMask, + sparse_ts_pred_mask: ProjectionMask, + scalar_float64_pred_mask: ProjectionMask, ) -> RowFilter { match filter_type { - FilterType::ClickBenchQ6MixedPredicates | FilterType::ClickBenchQ6VarWidthFirst => { - let int64_filter = - ArrowPredicateFn::new(q6_int64_pred_mask, move |batch: RecordBatch| { - let int64 = batch.column(batch.schema().index_of("int64")?); - eq(int64, &Int64Array::new_scalar(9999)) - }); + FilterType::FixedThenVarWidthPredicates | FilterType::VarWidthThenFixedPredicates => { + let int64_filter = ArrowPredicateFn::new(fixed_pred_mask, move |batch: RecordBatch| { + let int64 = batch.column(batch.schema().index_of("int64")?); + eq(int64, &Int64Array::new_scalar(9999)) + }); let utf8_filter = - ArrowPredicateFn::new(q6_utf8_pred_mask, move |batch: RecordBatch| { + ArrowPredicateFn::new(varwidth_pred_mask, move |batch: RecordBatch| { let utf8 = batch.column(batch.schema().index_of("utf8View")?); neq(utf8, &StringViewArray::new_scalar("")) }); match filter_type { - FilterType::ClickBenchQ6MixedPredicates => { + FilterType::FixedThenVarWidthPredicates => { RowFilter::new(vec![Box::new(int64_filter), Box::new(utf8_filter)]) } - FilterType::ClickBenchQ6VarWidthFirst => { + FilterType::VarWidthThenFixedPredicates => { RowFilter::new(vec![Box::new(utf8_filter), Box::new(int64_filter)]) } _ => unreachable!(), } } - FilterType::ClickBenchQ40ScalarGroupBy => { + FilterType::MultiScalarProjectedKey => { let int64_filter = - ArrowPredicateFn::new(q41_int64_pred_mask, move |batch: RecordBatch| { + ArrowPredicateFn::new(sparse_int64_pred_mask, move |batch: RecordBatch| { let int64 = batch.column(batch.schema().index_of("int64")?); eq(int64, &Int64Array::new_scalar(62)) }); let float64_filter = - ArrowPredicateFn::new(q40_float64_pred_mask, move |batch: RecordBatch| { + ArrowPredicateFn::new(scalar_float64_pred_mask, move |batch: RecordBatch| { let float64 = batch.column(batch.schema().index_of("float64")?); gt(float64, &Float64Array::new_scalar(10.0)) }); - let ts_filter = ArrowPredicateFn::new(q41_ts_pred_mask, move |batch: RecordBatch| { - let ts = batch.column(batch.schema().index_of("ts")?); - lt(ts, &TimestampMillisecondArray::new_scalar(9000)) - }); + let ts_filter = + ArrowPredicateFn::new(sparse_ts_pred_mask, move |batch: RecordBatch| { + let ts = batch.column(batch.schema().index_of("ts")?); + lt(ts, &TimestampMillisecondArray::new_scalar(9000)) + }); RowFilter::new(vec![ Box::new(int64_filter), @@ -1627,22 +1861,23 @@ fn row_filter_for_focus_case( Box::new(ts_filter), ]) } - FilterType::ClickBenchQ41SparseFixedOutput - | FilterType::TpcdsQ20ProjectedDynamicFilters - | FilterType::TpcdsQ21ProjectedFixedOutput => { + FilterType::SparseScalarFixedOutput + | FilterType::ProjectedDynamicFilters + | FilterType::SparseProjectedPredicatesFixedOutput => { let int64_filter = - ArrowPredicateFn::new(q41_int64_pred_mask, move |batch: RecordBatch| { + ArrowPredicateFn::new(sparse_int64_pred_mask, move |batch: RecordBatch| { let int64 = batch.column(batch.schema().index_of("int64")?); let scalar = match filter_type { - FilterType::TpcdsQ20ProjectedDynamicFilters => 12, + FilterType::ProjectedDynamicFilters => 12, _ => 8, }; lt(int64, &Int64Array::new_scalar(scalar)) }); - let ts_filter = ArrowPredicateFn::new(q41_ts_pred_mask, move |batch: RecordBatch| { - let ts = batch.column(batch.schema().index_of("ts")?); - lt(ts, &TimestampMillisecondArray::new_scalar(9000)) - }); + let ts_filter = + ArrowPredicateFn::new(sparse_ts_pred_mask, move |batch: RecordBatch| { + let ts = batch.column(batch.schema().index_of("ts")?); + lt(ts, &TimestampMillisecondArray::new_scalar(9000)) + }); RowFilter::new(vec![Box::new(int64_filter), Box::new(ts_filter)]) } diff --git a/parquet/benches/row_selection_cursor.rs b/parquet/benches/row_selection_cursor.rs index 8b5b13cdeea5..c458f5c12a52 100644 --- a/parquet/benches/row_selection_cursor.rs +++ b/parquet/benches/row_selection_cursor.rs @@ -15,6 +15,15 @@ // specific language governing permissions and limitations // under the License. +//! Benchmarks the cost of applying `RowSelection` as selector queues versus +//! boolean masks. +//! +//! The broad sweep varies selector length, selection density, run-length +//! distribution, data type, projected column count, and `Utf8View` payload size. +//! The shape-focus suite keeps the data shape narrower and varies the maximum +//! selected-run length (`maxrun`) so the results can show where +//! `RowSelectionPolicy::Auto` should prefer `Selectors` or `Mask`. + use std::hint; use std::sync::Arc; @@ -35,6 +44,8 @@ const BATCH_SIZE: usize = 1 << 10; const BASE_SEED: u64 = 0xA55AA55A; const AVG_SELECTOR_LENGTHS: &[usize] = &[4, 8, 12, 16, 20, 24, 28, 32, 36, 40]; const SHAPE_FOCUS_SELECTED_RUN_LENGTHS: &[usize] = &[1, 2, 4, 8, 32]; +// At 80% selectivity, maxrun1 and maxrun2 cannot be represented without +// zero-length skip runs, so the dense-focused cases start at maxrun4. const DENSE_SHAPE_FOCUS_SELECTED_RUN_LENGTHS: &[usize] = &[4, 8, 32]; const COLUMN_WIDTHS: &[usize] = &[2, 4, 8, 16, 32]; const UTF8VIEW_LENS: &[usize] = &[4, 8, 16, 32, 64, 128, 256]; @@ -235,6 +246,12 @@ fn criterion_benchmark(c: &mut Criterion) { bench_shape_focus(c); } +/// Focused selector-shape matrix for `Selectors` versus `Mask`. +/// +/// It fixes the input profile to `int32` and `utf8view`, then varies +/// selectivity and the requested maximum selected-run length. The benchmark +/// suffix reports this as `maxrunNN` because the final selected run may be +/// shorter than the requested maximum. fn bench_shape_focus(c: &mut Criterion) { let profiles = [ DataProfile { From a717d7f8ee74bedb4ecf5195e93edef2d5846442 Mon Sep 17 00:00:00 2001 From: Qiwei Huang Date: Mon, 15 Jun 2026 01:00:37 +0800 Subject: [PATCH 9/9] ci(parquet): lock pyspark cargo installs --- .github/workflows/parquet.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/parquet.yml b/.github/workflows/parquet.yml index 9ae7d47eddc6..a0d8495170eb 100644 --- a/.github/workflows/parquet.yml +++ b/.github/workflows/parquet.yml @@ -175,8 +175,8 @@ jobs: rustup default ${{ matrix.rust }} - name: Install binary for checking run: | - cargo install --path parquet --bin parquet-show-bloom-filter --features=cli - cargo install --path parquet --bin parquet-fromcsv --features=arrow,cli + cargo install --path parquet --bin parquet-show-bloom-filter --features=cli --locked + cargo install --path parquet --bin parquet-fromcsv --features=arrow,cli --locked - name: Run pytest run: | cd parquet/pytest