From 87ab5cfffbfe53f4ab0ed21152a851c193cd247b Mon Sep 17 00:00:00 2001 From: liujiwen-up Date: Mon, 15 Jun 2026 22:58:46 +0800 Subject: [PATCH 1/3] feat: add conservative ORC predicate pushdown --- crates/integration_tests/tests/read_tables.rs | 139 +++++ crates/paimon/src/arrow/format/mod.rs | 3 +- crates/paimon/src/arrow/format/orc.rs | 570 +++++++++++++++++- crates/paimon/src/table/read_builder.rs | 11 +- 4 files changed, 710 insertions(+), 13 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index e5c3b05d..236eaf6e 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -96,8 +96,19 @@ async fn scan_and_read_with_fs_catalog( async fn scan_and_read_with_filter( table: &paimon::Table, filter: Predicate, +) -> (Plan, Vec) { + scan_and_read_with_projection_and_filter(table, None, filter).await +} + +async fn scan_and_read_with_projection_and_filter( + table: &paimon::Table, + projection: Option<&[&str]>, + filter: Predicate, ) -> (Plan, Vec) { let mut read_builder = table.new_read_builder(); + if let Some(cols) = projection { + read_builder.with_projection(cols); + } read_builder.with_filter(filter); let scan = read_builder.new_scan(); let plan = scan.plan().await.expect("Failed to plan scan"); @@ -2935,6 +2946,134 @@ async fn test_read_full_types_table() { assert_eq!(r.18, ("carol".into(), 300)); // struct } +#[tokio::test] +async fn test_read_orc_with_filter_only_column_projection() { + use paimon::spec::{Datum, PredicateBuilder}; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "full_types_table").await; + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .equal("id", Datum::Int(2)) + .expect("Failed to build id predicate"); + + let (_, batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["col_string"]), filter).await; + + let mut values = Vec::new(); + for batch in &batches { + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "col_string"); + let col_string = batch + .column_by_name("col_string") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("Expected StringArray for col_string"); + values.extend((0..batch.num_rows()).map(|row| col_string.value(row).to_string())); + } + + assert_eq!(values, vec!["orc-world"]); +} + +async fn assert_full_types_orc_filter_matches( + filter: Predicate, + projected_column: &str, + expected_string_values: &[&str], +) { + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "full_types_table").await; + + let (_, batches) = + scan_and_read_with_projection_and_filter(&table, Some(&[projected_column]), filter).await; + + let mut values = Vec::new(); + for batch in &batches { + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), projected_column); + let column = batch + .column_by_name(projected_column) + .and_then(|c| c.as_any().downcast_ref::()) + .expect("Expected StringArray for projected column"); + values.extend((0..batch.num_rows()).map(|row| column.value(row).to_string())); + } + + assert_eq!(values, expected_string_values); +} + +#[tokio::test] +async fn test_read_orc_with_supported_predicate_pushdown_types() { + use paimon::spec::{Datum, PredicateBuilder}; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "full_types_table").await; + let pb = PredicateBuilder::new(table.schema().fields()); + + let cases = vec![ + pb.equal("col_boolean", Datum::Bool(false)) + .expect("build boolean predicate"), + pb.equal("col_tinyint", Datum::TinyInt(2)) + .expect("build tinyint predicate"), + pb.equal("col_smallint", Datum::SmallInt(200)) + .expect("build smallint predicate"), + pb.equal("col_int", Datum::Int(2000)) + .expect("build int predicate"), + pb.equal("col_bigint", Datum::Long(200000)) + .expect("build bigint predicate"), + pb.greater_or_equal("col_string", Datum::String("orc-world".to_string())) + .expect("build string lower-bound predicate"), + pb.less_or_equal("col_string", Datum::String("orc-world".to_string())) + .expect("build string upper-bound predicate"), + ]; + + for filter in cases { + let (_, batches) = + scan_and_read_with_projection_and_filter(&table, Some(&["col_string"]), filter).await; + + let mut values = Vec::new(); + for batch in &batches { + assert_eq!(batch.num_columns(), 1); + assert_eq!(batch.schema().field(0).name(), "col_string"); + let col_string = batch + .column_by_name("col_string") + .and_then(|c| c.as_any().downcast_ref::()) + .expect("Expected StringArray for col_string"); + values.extend((0..batch.num_rows()).map(|row| col_string.value(row).to_string())); + } + + assert_eq!(values, vec!["orc-world"]); + } +} + +#[tokio::test] +async fn test_read_orc_with_unsupported_date_predicate_remains_residual() { + use paimon::spec::{Datum, PredicateBuilder}; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "full_types_table").await; + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb + .greater_or_equal("col_date", Datum::Date(19889)) + .expect("build date predicate"); + + assert_full_types_orc_filter_matches(filter, "col_string", &["orc-world", "avro-test"]).await; +} + +#[tokio::test] +async fn test_read_orc_predicate_pushdown_remains_conservative() { + use paimon::spec::{Datum, PredicateBuilder}; + + let catalog = create_file_system_catalog(); + let table = get_table_from_catalog(&catalog, "full_types_table").await; + let pb = PredicateBuilder::new(table.schema().fields()); + let filter = pb.equal("id", Datum::Int(2)).expect("build id predicate"); + + assert!( + !table.new_read_builder().is_exact_filter_pushdown(&filter), + "ORC reader pruning must not make data predicates exact at the table boundary" + ); + + assert_full_types_orc_filter_matches(filter, "col_string", &["orc-world"]).await; +} + #[tokio::test] async fn test_read_full_types_boundary_table() { use arrow_array::{ diff --git a/crates/paimon/src/arrow/format/mod.rs b/crates/paimon/src/arrow/format/mod.rs index 17552e1b..e568bc5a 100644 --- a/crates/paimon/src/arrow/format/mod.rs +++ b/crates/paimon/src/arrow/format/mod.rs @@ -43,7 +43,8 @@ pub(crate) struct FilePredicates { /// /// Each implementation (Parquet, ORC, ...) handles: /// - Column projection -/// - Predicate pushdown (row-group/stripe pruning + row-level filtering) +/// - Predicate pushdown where supported (row-group/stripe pruning and, for +/// some formats, row-level filtering) /// - Row range selection #[async_trait] pub(crate) trait FormatFileReader: Send + Sync { diff --git a/crates/paimon/src/arrow/format/orc.rs b/crates/paimon/src/arrow/format/orc.rs index 8806fec1..f4b078c0 100644 --- a/crates/paimon/src/arrow/format/orc.rs +++ b/crates/paimon/src/arrow/format/orc.rs @@ -17,16 +17,20 @@ use super::{FilePredicates, FormatFileReader}; use crate::io::FileRead; -use crate::spec::DataField; +use crate::spec::{DataField, DataType, Datum, Predicate, PredicateOperator}; use crate::table::{ArrowRecordBatchStream, RowRange}; use crate::Error; +use arrow_array::RecordBatch; use async_trait::async_trait; use bytes::Bytes; use futures::{future::BoxFuture, StreamExt}; +use orc_rust::predicate::PredicateValue; use orc_rust::projection::ProjectionMask; use orc_rust::reader::AsyncChunkReader; use orc_rust::ArrowReaderBuilder; +const ORC_IN_PREDICATE_MAX_LITERALS: usize = 20; + pub(crate) struct OrcFormatReader; #[async_trait] @@ -36,8 +40,7 @@ impl FormatFileReader for OrcFormatReader { reader: Box, file_size: u64, read_fields: &[DataField], - // TODO: support predicate pushdown for ORC (stripe pruning + row-level filtering) - _predicates: Option<&FilePredicates>, + predicates: Option<&FilePredicates>, batch_size: Option, row_selection: Option>, ) -> crate::Result { @@ -50,12 +53,21 @@ impl FormatFileReader for OrcFormatReader { source: Some(Box::new(e)), })?; - let projected_names: Vec<&str> = read_fields.iter().map(|f| f.name()).collect(); + let mut projected_names: Vec = + read_fields.iter().map(|f| f.name().to_string()).collect(); + let orc_predicate = build_orc_predicate(predicates); + if let Some(ref predicate) = orc_predicate { + collect_orc_predicate_columns(predicate, &mut projected_names); + } let projection = ProjectionMask::named_roots(builder.file_metadata().root_data_type(), &projected_names); let mut builder = builder.with_projection(projection); + if let Some(predicate) = orc_predicate { + builder = builder.with_predicate(predicate); + } + if let Some(size) = batch_size { builder = builder.with_batch_size(size); } @@ -72,17 +84,263 @@ impl FormatFileReader for OrcFormatReader { } let stream = builder.build_async(); + let requested_names: Vec = + read_fields.iter().map(|f| f.name().to_string()).collect(); Ok(stream - .map(|r| { - r.map_err(|e| Error::UnexpectedError { + .map(move |r| { + let batch = r.map_err(|e| Error::UnexpectedError { message: format!("ORC read error: {e}"), source: Some(Box::new(e)), - }) + })?; + project_orc_batch_to_requested_fields(batch, &requested_names) }) .boxed()) } } +// --------------------------------------------------------------------------- +// Paimon predicates → orc-rust conservative row-group predicates. +// +// orc-rust evaluates these predicates against row-group statistics and may keep +// non-matching rows from a selected row group. Exact residual filtering remains +// the caller's responsibility. +// --------------------------------------------------------------------------- + +fn build_orc_predicate( + predicates: Option<&FilePredicates>, +) -> Option { + let predicates = predicates?; + let mut orc_predicates = Vec::new(); + for predicate in &predicates.predicates { + if let Some(predicate) = build_orc_predicate_inner( + predicate, + &predicates.file_fields, + CompoundPredicateMode::RootAnd, + ) { + orc_predicates.push(predicate); + } + } + + match orc_predicates.len() { + 0 => None, + 1 => orc_predicates.pop(), + _ => Some(orc_rust::predicate::Predicate::and(orc_predicates)), + } +} + +fn build_orc_predicate_inner( + predicate: &Predicate, + file_fields: &[DataField], + mode: CompoundPredicateMode, +) -> Option { + match predicate { + Predicate::Leaf { .. } => build_orc_leaf_predicate(predicate, file_fields), + Predicate::And(children) => build_orc_and_predicate(children, file_fields, mode), + Predicate::Or(children) => build_orc_or_predicate(children, file_fields), + Predicate::AlwaysTrue => None, + Predicate::AlwaysFalse | Predicate::Not(_) => None, + } +} + +#[derive(Clone, Copy)] +enum CompoundPredicateMode { + RootAnd, + RequireExact, +} + +fn build_orc_and_predicate( + children: &[Predicate], + file_fields: &[DataField], + mode: CompoundPredicateMode, +) -> Option { + let require_exact = matches!(mode, CompoundPredicateMode::RequireExact); + + let mut converted = Vec::with_capacity(children.len()); + for child in children { + match build_orc_predicate_inner(child, file_fields, CompoundPredicateMode::RootAnd) { + Some(predicate) => converted.push(predicate), + None if require_exact => return None, + None => {} + } + } + + match converted.len() { + 0 => None, + 1 => converted.pop(), + _ => Some(orc_rust::predicate::Predicate::and(converted)), + } +} + +fn build_orc_or_predicate( + children: &[Predicate], + file_fields: &[DataField], +) -> Option { + let mut converted = Vec::with_capacity(children.len()); + for child in children { + converted.push(build_orc_predicate_inner( + child, + file_fields, + CompoundPredicateMode::RequireExact, + )?); + } + + match converted.len() { + 0 => None, + 1 => converted.pop(), + _ => Some(orc_rust::predicate::Predicate::or(converted)), + } +} + +fn build_orc_leaf_predicate( + predicate: &Predicate, + file_fields: &[DataField], +) -> Option { + let Predicate::Leaf { + index, + op, + literals, + .. + } = predicate + else { + return None; + }; + let file_field = file_fields.get(*index)?; + let column = file_field.name(); + + match op { + PredicateOperator::IsNotNull + if data_type_supported_for_orc_predicate(file_field.data_type()) => + { + Some(orc_rust::predicate::Predicate::is_not_null(column)) + } + PredicateOperator::Eq + | PredicateOperator::Lt + | PredicateOperator::LtEq + | PredicateOperator::Gt + | PredicateOperator::GtEq => { + if *op == PredicateOperator::Eq + && matches!( + file_field.data_type(), + DataType::Float(_) | DataType::Double(_) + ) + { + return None; + } + let literal = literals.first()?; + let value = datum_to_orc_value(literal, file_field.data_type())?; + Some(match op { + PredicateOperator::Eq => orc_rust::predicate::Predicate::eq(column, value), + PredicateOperator::Lt => orc_rust::predicate::Predicate::lt(column, value), + PredicateOperator::LtEq => orc_rust::predicate::Predicate::lte(column, value), + PredicateOperator::Gt => orc_rust::predicate::Predicate::gt(column, value), + PredicateOperator::GtEq => orc_rust::predicate::Predicate::gte(column, value), + _ => unreachable!(), + }) + } + PredicateOperator::In => { + if literals.is_empty() || literals.len() > ORC_IN_PREDICATE_MAX_LITERALS { + return None; + } + let mut values = Vec::with_capacity(literals.len()); + for literal in literals { + values.push(orc_rust::predicate::Predicate::eq( + column, + datum_to_orc_value(literal, file_field.data_type())?, + )); + } + Some(orc_rust::predicate::Predicate::or(values)) + } + PredicateOperator::IsNull | PredicateOperator::NotEq | PredicateOperator::NotIn => None, + PredicateOperator::IsNotNull => None, + } +} + +fn data_type_supported_for_orc_predicate(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::Boolean(_) + | DataType::TinyInt(_) + | DataType::SmallInt(_) + | DataType::Int(_) + | DataType::BigInt(_) + | DataType::Float(_) + | DataType::Double(_) + | DataType::Char(_) + | DataType::VarChar(_) + ) +} + +fn datum_to_orc_value(datum: &Datum, data_type: &DataType) -> Option { + match (datum, data_type) { + (Datum::Bool(value), DataType::Boolean(_)) => Some(PredicateValue::Boolean(Some(*value))), + (Datum::TinyInt(value), DataType::TinyInt(_)) => Some(PredicateValue::Int8(Some(*value))), + (Datum::SmallInt(value), DataType::SmallInt(_)) => { + Some(PredicateValue::Int16(Some(*value))) + } + (Datum::Int(value), DataType::Int(_)) => Some(PredicateValue::Int32(Some(*value))), + (Datum::Long(value), DataType::BigInt(_)) => Some(PredicateValue::Int64(Some(*value))), + (Datum::Float(value), DataType::Float(_)) => Some(PredicateValue::Float32(Some(*value))), + (Datum::Double(value), DataType::Double(_)) => Some(PredicateValue::Float64(Some(*value))), + (Datum::String(value), DataType::Char(_) | DataType::VarChar(_)) => { + Some(PredicateValue::Utf8(Some(value.clone()))) + } + _ => None, + } +} + +fn collect_orc_predicate_columns( + predicate: &orc_rust::predicate::Predicate, + projected_names: &mut Vec, +) { + collect_orc_predicate_columns_inner(predicate, projected_names); +} + +fn collect_orc_predicate_columns_inner( + predicate: &orc_rust::predicate::Predicate, + projected_names: &mut Vec, +) { + match predicate { + orc_rust::predicate::Predicate::Comparison { column, .. } + | orc_rust::predicate::Predicate::IsNull { column } + | orc_rust::predicate::Predicate::IsNotNull { column } => { + if !projected_names.iter().any(|name| name == column) { + projected_names.push(column.clone()); + } + } + orc_rust::predicate::Predicate::And(children) + | orc_rust::predicate::Predicate::Or(children) => { + for child in children { + collect_orc_predicate_columns_inner(child, projected_names); + } + } + orc_rust::predicate::Predicate::Not(child) => { + collect_orc_predicate_columns_inner(child, projected_names); + } + } +} + +fn project_orc_batch_to_requested_fields( + batch: RecordBatch, + requested_names: &[String], +) -> crate::Result { + let indices: Vec = requested_names + .iter() + .map(|name| { + batch + .schema() + .index_of(name) + .map_err(|e| Error::UnexpectedError { + message: format!("ORC batch is missing requested column '{name}': {e}"), + source: Some(Box::new(e)), + }) + }) + .collect::>()?; + batch.project(&indices).map_err(|e| Error::UnexpectedError { + message: format!("Failed to project ORC batch: {e}"), + source: Some(Box::new(e)), + }) +} + // --------------------------------------------------------------------------- // Row ranges → orc_rust::RowSelection // --------------------------------------------------------------------------- @@ -156,7 +414,33 @@ impl AsyncChunkReader for OrcFileReader { #[cfg(test)] mod tests { use super::*; + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType as ArrowDataType, Field, Schema}; use orc_rust::row_selection::RowSelector; + use std::sync::Arc; + + use crate::spec::{DateType, DecimalType, DoubleType, FloatType, IntType}; + + fn field(index: i32, name: &str, data_type: DataType) -> DataField { + DataField::new(index, name.to_string(), data_type) + } + + fn leaf(index: usize, op: PredicateOperator, literals: Vec) -> Predicate { + Predicate::Leaf { + column: format!("c{index}"), + index, + data_type: DataType::Int(IntType::new()), + op, + literals, + } + } + + fn file_predicates(predicates: Vec, file_fields: Vec) -> FilePredicates { + FilePredicates { + predicates, + file_fields, + } + } #[test] fn test_build_range_row_selection_single_range() { @@ -192,4 +476,276 @@ mod tests { let expected: orc_rust::row_selection::RowSelection = vec![RowSelector::skip(5)].into(); assert_eq!(sel, expected); } + + #[test] + fn test_build_orc_predicate_supported_leaf() { + let predicates = file_predicates( + vec![leaf(0, PredicateOperator::GtEq, vec![Datum::Int(7)])], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + let predicate = build_orc_predicate(Some(&predicates)).unwrap(); + assert_eq!( + predicate, + orc_rust::predicate::Predicate::gte("id", PredicateValue::Int32(Some(7))) + ); + } + + #[test] + fn test_build_orc_predicate_type_mismatch_fails_open() { + let predicates = file_predicates( + vec![leaf(0, PredicateOperator::Eq, vec![Datum::Long(7)])], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_unsupported_type_fails_open() { + let predicates = file_predicates( + vec![Predicate::Leaf { + column: "dt".to_string(), + index: 0, + data_type: DataType::Date(DateType::new()), + op: PredicateOperator::Eq, + literals: vec![Datum::Date(1)], + }], + vec![field(0, "dt", DataType::Date(DateType::new()))], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_unsupported_operator_fails_open() { + let predicates = file_predicates( + vec![leaf(0, PredicateOperator::NotEq, vec![Datum::Int(7)])], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_float_eq_fails_open() { + let float_predicates = file_predicates( + vec![Predicate::Leaf { + column: "f".to_string(), + index: 0, + data_type: DataType::Float(FloatType::new()), + op: PredicateOperator::Eq, + literals: vec![Datum::Float(1.5)], + }], + vec![field(0, "f", DataType::Float(FloatType::new()))], + ); + let double_predicates = file_predicates( + vec![Predicate::Leaf { + column: "d".to_string(), + index: 0, + data_type: DataType::Double(DoubleType::new()), + op: PredicateOperator::Eq, + literals: vec![Datum::Double(2.5)], + }], + vec![field(0, "d", DataType::Double(DoubleType::new()))], + ); + + assert!(build_orc_predicate(Some(&float_predicates)).is_none()); + assert!(build_orc_predicate(Some(&double_predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_is_not_null_requires_supported_type() { + let decimal_type = DataType::Decimal(DecimalType::new(10, 2).unwrap()); + let predicates = file_predicates( + vec![Predicate::Leaf { + column: "amount".to_string(), + index: 0, + data_type: decimal_type.clone(), + op: PredicateOperator::IsNotNull, + literals: vec![], + }], + vec![field(0, "amount", decimal_type)], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_index_out_of_bounds_fails_open() { + let predicates = file_predicates( + vec![leaf(1, PredicateOperator::Eq, vec![Datum::Int(7)])], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_and_pushes_supported_children() { + let predicates = file_predicates( + vec![Predicate::and(vec![ + leaf(0, PredicateOperator::Gt, vec![Datum::Int(1)]), + leaf(0, PredicateOperator::NotEq, vec![Datum::Int(7)]), + ])], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + let predicate = build_orc_predicate(Some(&predicates)).unwrap(); + assert_eq!( + predicate, + orc_rust::predicate::Predicate::gt("id", PredicateValue::Int32(Some(1))) + ); + } + + #[test] + fn test_build_orc_predicate_top_level_and_pushes_supported_predicates() { + let predicates = file_predicates( + vec![ + leaf(0, PredicateOperator::Gt, vec![Datum::Int(1)]), + leaf(0, PredicateOperator::LtEq, vec![Datum::Int(9)]), + leaf(0, PredicateOperator::NotEq, vec![Datum::Int(7)]), + ], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + let predicate = build_orc_predicate(Some(&predicates)).unwrap(); + assert_eq!( + predicate, + orc_rust::predicate::Predicate::and(vec![ + orc_rust::predicate::Predicate::gt("id", PredicateValue::Int32(Some(1))), + orc_rust::predicate::Predicate::lte("id", PredicateValue::Int32(Some(9))), + ]) + ); + } + + #[test] + fn test_build_orc_predicate_or_requires_all_children_supported() { + let predicates = file_predicates( + vec![Predicate::or(vec![ + leaf(0, PredicateOperator::Lt, vec![Datum::Int(1)]), + Predicate::Not(Box::new(leaf( + 0, + PredicateOperator::Eq, + vec![Datum::Int(7)], + ))), + ])], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_or_with_nested_and_requires_exact_children() { + let predicates = file_predicates( + vec![Predicate::or(vec![ + Predicate::and(vec![ + leaf(0, PredicateOperator::Gt, vec![Datum::Int(1)]), + leaf(0, PredicateOperator::NotEq, vec![Datum::Int(7)]), + ]), + leaf(0, PredicateOperator::Lt, vec![Datum::Int(0)]), + ])], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_in_limit() { + let predicates = file_predicates( + vec![leaf( + 0, + PredicateOperator::In, + (0..=ORC_IN_PREDICATE_MAX_LITERALS) + .map(|value| Datum::Int(value as i32)) + .collect(), + )], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + assert!(build_orc_predicate(Some(&predicates)).is_none()); + } + + #[test] + fn test_build_orc_predicate_in_supported_literals() { + let predicates = file_predicates( + vec![leaf( + 0, + PredicateOperator::In, + vec![Datum::Int(1), Datum::Int(3)], + )], + vec![field(0, "id", DataType::Int(IntType::new()))], + ); + + let predicate = build_orc_predicate(Some(&predicates)).unwrap(); + assert_eq!( + predicate, + orc_rust::predicate::Predicate::or(vec![ + orc_rust::predicate::Predicate::eq("id", PredicateValue::Int32(Some(1))), + orc_rust::predicate::Predicate::eq("id", PredicateValue::Int32(Some(3))), + ]) + ); + } + + #[test] + fn test_collect_orc_predicate_columns_adds_filter_columns_once() { + let predicate = orc_rust::predicate::Predicate::and(vec![ + orc_rust::predicate::Predicate::eq( + "category", + PredicateValue::Utf8(Some("a".to_string())), + ), + orc_rust::predicate::Predicate::gt("score", PredicateValue::Int32(Some(1))), + ]); + let mut projected_names = vec!["name".to_string()]; + + collect_orc_predicate_columns(&predicate, &mut projected_names); + collect_orc_predicate_columns(&predicate, &mut projected_names); + + assert_eq!(projected_names, vec!["name", "category", "score"]); + } + + #[test] + fn test_project_orc_batch_to_requested_fields() { + let schema = Arc::new(Schema::new(vec![ + Field::new("name", ArrowDataType::Utf8, true), + Field::new("id", ArrowDataType::Int32, true), + ])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(StringArray::from(vec!["a", "b"])), + Arc::new(Int32Array::from(vec![1, 2])), + ], + ) + .unwrap(); + + let projected = project_orc_batch_to_requested_fields(batch, &["name".to_string()]) + .expect("project ORC batch"); + + assert_eq!(projected.num_columns(), 1); + assert_eq!(projected.schema().field(0).name(), "name"); + } + + #[test] + fn test_project_orc_batch_to_requested_fields_errors_on_missing_column() { + let schema = Arc::new(Schema::new(vec![Field::new( + "name", + ArrowDataType::Utf8, + true, + )])); + let batch = + RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(vec!["a"]))]).unwrap(); + + let error = project_orc_batch_to_requested_fields(batch, &["id".to_string()]) + .expect_err("missing requested column should error"); + + assert!( + error + .to_string() + .contains("ORC batch is missing requested column 'id'"), + "unexpected error: {error}" + ); + } } diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 3e7684d1..a884d3e7 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -150,11 +150,12 @@ impl<'a> ReadBuilder<'a> { /// Stats pruning is per file. Files with a different `schema_id`, /// incompatible stats layout, or inconclusive stats are kept. /// - /// [`TableRead`] may use supported non-partition data predicates only on - /// the regular Parquet read path for conservative row-group pruning and - /// native Parquet row filtering. Unsupported predicates, non-Parquet - /// reads, and data-evolution reads remain residual and should still be - /// applied by the caller if exact filtering semantics are required. + /// [`TableRead`] may use supported non-partition data predicates on regular + /// Parquet and ORC read paths for conservative row-group pruning. Parquet + /// may also use native row filtering. Unsupported predicates, formats + /// without reader pruning, and data-evolution reads remain residual and + /// should still be applied by the caller if exact filtering semantics are + /// required. pub fn with_filter(&mut self, filter: Predicate) -> &mut Self { self.filter = normalize_filter(self.table, filter); self.try_extract_row_id_ranges(); From 48e01e9d1ed2978755da152d6f3c71f6fcdb2829 Mon Sep 17 00:00:00 2001 From: liujiwen-up Date: Tue, 16 Jun 2026 11:17:59 +0800 Subject: [PATCH 2/3] test: fix ORC predicate expected rows --- crates/integration_tests/tests/read_tables.rs | 53 +++++++++++++------ 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 236eaf6e..52eb7e31 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -3008,23 +3008,44 @@ async fn test_read_orc_with_supported_predicate_pushdown_types() { let pb = PredicateBuilder::new(table.schema().fields()); let cases = vec![ - pb.equal("col_boolean", Datum::Bool(false)) - .expect("build boolean predicate"), - pb.equal("col_tinyint", Datum::TinyInt(2)) - .expect("build tinyint predicate"), - pb.equal("col_smallint", Datum::SmallInt(200)) - .expect("build smallint predicate"), - pb.equal("col_int", Datum::Int(2000)) - .expect("build int predicate"), - pb.equal("col_bigint", Datum::Long(200000)) - .expect("build bigint predicate"), - pb.greater_or_equal("col_string", Datum::String("orc-world".to_string())) - .expect("build string lower-bound predicate"), - pb.less_or_equal("col_string", Datum::String("orc-world".to_string())) - .expect("build string upper-bound predicate"), + ( + pb.equal("col_boolean", Datum::Bool(false)) + .expect("build boolean predicate"), + vec!["orc-world"], + ), + ( + pb.equal("col_tinyint", Datum::TinyInt(2)) + .expect("build tinyint predicate"), + vec!["orc-world"], + ), + ( + pb.equal("col_smallint", Datum::SmallInt(200)) + .expect("build smallint predicate"), + vec!["orc-world"], + ), + ( + pb.equal("col_int", Datum::Int(2000)) + .expect("build int predicate"), + vec!["orc-world"], + ), + ( + pb.equal("col_bigint", Datum::Long(200000)) + .expect("build bigint predicate"), + vec!["orc-world"], + ), + ( + pb.greater_or_equal("col_string", Datum::String("orc-world".to_string())) + .expect("build string lower-bound predicate"), + vec!["orc-world"], + ), + ( + pb.less_or_equal("col_string", Datum::String("orc-world".to_string())) + .expect("build string upper-bound predicate"), + vec!["parquet-hello", "orc-world"], + ), ]; - for filter in cases { + for (filter, expected_string_values) in cases { let (_, batches) = scan_and_read_with_projection_and_filter(&table, Some(&["col_string"]), filter).await; @@ -3039,7 +3060,7 @@ async fn test_read_orc_with_supported_predicate_pushdown_types() { values.extend((0..batch.num_rows()).map(|row| col_string.value(row).to_string())); } - assert_eq!(values, vec!["orc-world"]); + assert_eq!(values, expected_string_values); } } From 31ccf78ba358f127e10d08783b17a8252bad7618 Mon Sep 17 00:00:00 2001 From: liujiwen-up Date: Tue, 16 Jun 2026 11:28:49 +0800 Subject: [PATCH 3/3] test: align ORC string predicate expectations --- crates/integration_tests/tests/read_tables.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/integration_tests/tests/read_tables.rs b/crates/integration_tests/tests/read_tables.rs index 52eb7e31..c9e941a3 100644 --- a/crates/integration_tests/tests/read_tables.rs +++ b/crates/integration_tests/tests/read_tables.rs @@ -3009,43 +3009,50 @@ async fn test_read_orc_with_supported_predicate_pushdown_types() { let cases = vec![ ( + "col_boolean_eq", pb.equal("col_boolean", Datum::Bool(false)) .expect("build boolean predicate"), vec!["orc-world"], ), ( + "col_tinyint_eq", pb.equal("col_tinyint", Datum::TinyInt(2)) .expect("build tinyint predicate"), vec!["orc-world"], ), ( + "col_smallint_eq", pb.equal("col_smallint", Datum::SmallInt(200)) .expect("build smallint predicate"), vec!["orc-world"], ), ( + "col_int_eq", pb.equal("col_int", Datum::Int(2000)) .expect("build int predicate"), vec!["orc-world"], ), ( + "col_bigint_eq", pb.equal("col_bigint", Datum::Long(200000)) .expect("build bigint predicate"), vec!["orc-world"], ), ( + "col_string_gte", pb.greater_or_equal("col_string", Datum::String("orc-world".to_string())) .expect("build string lower-bound predicate"), - vec!["orc-world"], + vec!["parquet-hello", "orc-world"], ), ( + "col_string_lte", pb.less_or_equal("col_string", Datum::String("orc-world".to_string())) .expect("build string upper-bound predicate"), - vec!["parquet-hello", "orc-world"], + vec!["orc-world", "avro-test"], ), ]; - for (filter, expected_string_values) in cases { + for (case_name, filter, expected_string_values) in cases { let (_, batches) = scan_and_read_with_projection_and_filter(&table, Some(&["col_string"]), filter).await; @@ -3060,7 +3067,7 @@ async fn test_read_orc_with_supported_predicate_pushdown_types() { values.extend((0..batch.num_rows()).map(|row| col_string.value(row).to_string())); } - assert_eq!(values, expected_string_values); + assert_eq!(values, expected_string_values, "case {case_name}"); } }