From cc23637eefa011d028ef8135e0aa0b95901fb7f5 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Fri, 20 Mar 2026 19:55:37 +0100 Subject: [PATCH 1/8] Add ExpressionAnalyzer for pluggable expression-level statistics Introduce ExpressionAnalyzer, a chain-of-responsibility framework for expression-level statistics estimation (NDV, selectivity, min/max). Framework: - ExpressionAnalyzer trait with registry parameter for chain delegation - ExpressionAnalyzerRegistry to chain analyzers (first Computed wins) - DefaultExpressionAnalyzer: Selinger-style estimation for columns, literals, binary expressions, NOT, boolean predicates Integration: - ExpressionAnalyzerRegistry stored in SessionState, initialized once - ProjectionExprs stores optional registry (non-breaking, no signature changes to project_statistics) - ProjectionExec sets registry via Projector, injected by planner - FilterExec uses registry for selectivity when interval analysis cannot handle the predicate - Custom nodes get builtin analyzer as fallback when registry is absent --- datafusion/common/src/config.rs | 7 + .../core/src/execution/session_state.rs | 38 ++ datafusion/core/src/physical_planner.rs | 57 ++- .../src/expression_analyzer/default.rs | 269 +++++++++++++++ .../src/expression_analyzer/mod.rs | 274 +++++++++++++++ .../src/expression_analyzer/tests.rs | 326 ++++++++++++++++++ datafusion/physical-expr/src/lib.rs | 1 + datafusion/physical-expr/src/projection.rs | 117 ++++++- datafusion/physical-plan/src/filter.rs | 43 ++- datafusion/physical-plan/src/projection.rs | 12 + 10 files changed, 1131 insertions(+), 13 deletions(-) create mode 100644 datafusion/physical-expr/src/expression_analyzer/default.rs create mode 100644 datafusion/physical-expr/src/expression_analyzer/mod.rs create mode 100644 datafusion/physical-expr/src/expression_analyzer/tests.rs diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 0551cbbb15ae1..6b44d0e21001c 100644 --- 
a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -963,6 +963,13 @@ config_namespace! { /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the physical planner will use the ExpressionAnalyzer + /// framework for expression-level statistics estimation (NDV, selectivity, + /// min/max, null fraction) in projections and filters. When false, projections + /// return unknown statistics for non-column expressions and filters use the + /// default selectivity heuristic. + pub enable_expression_analyzer: bool, default = false + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side. This /// filter can add additional overhead when the file format does not fully support diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index f0888e01049ad..acdaaad43c732 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -67,6 +67,7 @@ use datafusion_optimizer::{ Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule, }; use datafusion_physical_expr::create_physical_expr; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_optimizer::optimizer::PhysicalOptimizer; @@ -189,6 +190,8 @@ pub struct SessionState { /// /// It will be invoked on `CREATE FUNCTION` statements. /// thus, changing dialect o PostgreSql is required + /// Registry for expression-level statistics analyzers (NDV, selectivity, etc.) 
+ expression_analyzer_registry: Arc, function_factory: Option>, cache_factory: Option>, /// Cache logical plans of prepared statements for later execution. @@ -207,6 +210,10 @@ impl Debug for SessionState { .field("runtime_env", &self.runtime_env) .field("catalog_list", &self.catalog_list) .field("serializer_registry", &self.serializer_registry) + .field( + "expression_analyzer_registry", + &self.expression_analyzer_registry, + ) .field("file_formats", &self.file_formats) .field("execution_props", &self.execution_props) .field("table_options", &self.table_options) @@ -918,6 +925,11 @@ impl SessionState { &self.serializer_registry } + /// Return the [`ExpressionAnalyzerRegistry`] for expression-level statistics + pub fn expression_analyzer_registry(&self) -> &Arc { + &self.expression_analyzer_registry + } + /// Return version of the cargo package that produced this query pub fn version(&self) -> &str { env!("CARGO_PKG_VERSION") @@ -998,6 +1010,7 @@ pub struct SessionStateBuilder { window_functions: Option>>, extension_types: Option, serializer_registry: Option>, + expression_analyzer_registry: Option>, file_formats: Option>>, config: Option, table_options: Option, @@ -1039,6 +1052,7 @@ impl SessionStateBuilder { window_functions: None, extension_types: None, serializer_registry: None, + expression_analyzer_registry: None, file_formats: None, table_options: None, config: None, @@ -1095,6 +1109,7 @@ impl SessionStateBuilder { window_functions: Some(existing.window_functions.into_values().collect_vec()), extension_types: Some(existing.extension_types), serializer_registry: Some(existing.serializer_registry), + expression_analyzer_registry: Some(existing.expression_analyzer_registry), file_formats: Some(existing.file_formats.into_values().collect_vec()), config: Some(new_config), table_options: Some(existing.table_options), @@ -1352,6 +1367,15 @@ impl SessionStateBuilder { self } + /// Set the [`ExpressionAnalyzerRegistry`] for expression-level statistics + pub fn 
with_expression_analyzer_registry( + mut self, + expression_analyzer_registry: Arc, + ) -> Self { + self.expression_analyzer_registry = Some(expression_analyzer_registry); + self + } + /// Set the map of [`FileFormatFactory`]s pub fn with_file_formats( mut self, @@ -1483,6 +1507,7 @@ impl SessionStateBuilder { window_functions, extension_types, serializer_registry, + expression_analyzer_registry, file_formats, table_options, config, @@ -1521,6 +1546,8 @@ impl SessionStateBuilder { extension_types: Arc::new(MemoryExtensionTypeRegistry::default()), serializer_registry: serializer_registry .unwrap_or_else(|| Arc::new(EmptySerializerRegistry)), + expression_analyzer_registry: expression_analyzer_registry + .unwrap_or_else(|| Arc::new(ExpressionAnalyzerRegistry::new())), file_formats: HashMap::new(), table_options: table_options.unwrap_or_else(|| { TableOptions::default_from_session_config(config.options()) @@ -1707,6 +1734,13 @@ impl SessionStateBuilder { &mut self.serializer_registry } + /// Returns the current expression_analyzer_registry value + pub fn expression_analyzer_registry( + &mut self, + ) -> &mut Option> { + &mut self.expression_analyzer_registry + } + /// Returns the current file_formats value pub fn file_formats(&mut self) -> &mut Option>> { &mut self.file_formats @@ -1782,6 +1816,10 @@ impl Debug for SessionStateBuilder { .field("runtime_env", &self.runtime_env) .field("catalog_list", &self.catalog_list) .field("serializer_registry", &self.serializer_registry) + .field( + "expression_analyzer_registry", + &self.expression_analyzer_registry, + ) .field("file_formats", &self.file_formats) .field("execution_props", &self.execution_props) .field("table_options", &self.table_options) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index e25969903521c..2087ab88aabfd 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -1105,12 +1105,23 @@ impl 
DefaultPhysicalPlanner { input_schema.as_arrow(), )? { PlanAsyncExpr::Sync(PlannedExprResult::Expr(runtime_expr)) => { - FilterExecBuilder::new( + let builder = FilterExecBuilder::new( Arc::clone(&runtime_expr[0]), physical_input, ) - .with_batch_size(session_state.config().batch_size()) - .build()? + .with_batch_size(session_state.config().batch_size()); + let builder = if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + builder.with_expression_analyzer_registry(Arc::clone( + session_state.expression_analyzer_registry(), + )) + } else { + builder + }; + builder.build()? } PlanAsyncExpr::Async( async_map, @@ -1120,7 +1131,7 @@ impl DefaultPhysicalPlanner { async_map.async_exprs, physical_input, )?; - FilterExecBuilder::new( + let builder = FilterExecBuilder::new( Arc::clone(&runtime_expr[0]), Arc::new(async_exec), ) @@ -1129,8 +1140,19 @@ impl DefaultPhysicalPlanner { .apply_projection(Some( (0..input.schema().fields().len()).collect::>(), ))? - .with_batch_size(session_state.config().batch_size()) - .build()? + .with_batch_size(session_state.config().batch_size()); + let builder = if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + builder.with_expression_analyzer_registry(Arc::clone( + session_state.expression_analyzer_registry(), + )) + } else { + builder + }; + builder.build()? 
} _ => { return internal_err!( @@ -2898,7 +2920,17 @@ impl DefaultPhysicalPlanner { .into_iter() .map(|(expr, alias)| ProjectionExpr { expr, alias }) .collect(); - Ok(Arc::new(ProjectionExec::try_new(proj_exprs, input_exec)?)) + let mut proj_exec = ProjectionExec::try_new(proj_exprs, input_exec)?; + if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + proj_exec = proj_exec.with_expression_analyzer_registry(Arc::clone( + session_state.expression_analyzer_registry(), + )); + } + Ok(Arc::new(proj_exec)) } PlanAsyncExpr::Async( async_map, @@ -2910,8 +2942,17 @@ impl DefaultPhysicalPlanner { .into_iter() .map(|(expr, alias)| ProjectionExpr { expr, alias }) .collect(); - let new_proj_exec = + let mut new_proj_exec = ProjectionExec::try_new(proj_exprs, Arc::new(async_exec))?; + if session_state + .config_options() + .optimizer + .enable_expression_analyzer + { + new_proj_exec = new_proj_exec.with_expression_analyzer_registry( + Arc::clone(session_state.expression_analyzer_registry()), + ); + } Ok(Arc::new(new_proj_exec)) } _ => internal_err!("Unexpected PlanAsyncExpressions variant"), diff --git a/datafusion/physical-expr/src/expression_analyzer/default.rs b/datafusion/physical-expr/src/expression_analyzer/default.rs new file mode 100644 index 0000000000000..29b651a493560 --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/default.rs @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Default expression analyzer with Selinger-style estimation. + +use std::sync::Arc; + +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion_expr::Operator; + +use crate::PhysicalExpr; +use crate::expressions::{BinaryExpr, Column, Literal, NotExpr}; + +use super::{AnalysisResult, ExpressionAnalyzer, ExpressionAnalyzerRegistry}; + +/// Default expression analyzer with Selinger-style estimation. +/// +/// Handles common expression types: +/// - Column references (uses column statistics) +/// - Binary expressions (AND, OR, comparison operators) +/// - Literals (constant selectivity/NDV) +/// - NOT expressions (1 - child selectivity) +#[derive(Debug, Default, Clone)] +pub struct DefaultExpressionAnalyzer; + +impl DefaultExpressionAnalyzer { + /// Get column index from a Column expression + fn get_column_index(expr: &Arc) -> Option { + expr.as_any().downcast_ref::().map(|c| c.index()) + } + + /// Get column statistics for an expression if it's a column reference + fn get_column_stats<'a>( + expr: &Arc, + input_stats: &'a Statistics, + ) -> Option<&'a ColumnStatistics> { + Self::get_column_index(expr) + .and_then(|idx| input_stats.column_statistics.get(idx)) + } + + /// Recursive selectivity estimation through the registry chain + fn estimate_selectivity_recursive( + &self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> f64 { + registry.get_selectivity(expr, input_stats).unwrap_or(0.5) + } +} + +impl ExpressionAnalyzer for DefaultExpressionAnalyzer { + fn get_selectivity( + 
&self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Binary expressions: AND, OR, comparisons + if let Some(binary) = expr.as_any().downcast_ref::() { + let left_sel = + self.estimate_selectivity_recursive(binary.left(), input_stats, registry); + let right_sel = self.estimate_selectivity_recursive( + binary.right(), + input_stats, + registry, + ); + + let sel = match binary.op() { + // Logical operators + Operator::And => left_sel * right_sel, + Operator::Or => left_sel + right_sel - (left_sel * right_sel), + + // Equality: selectivity = 1/NDV + Operator::Eq => { + let ndv = Self::get_column_stats(binary.left(), input_stats) + .or_else(|| Self::get_column_stats(binary.right(), input_stats)) + .and_then(|s| s.distinct_count.get_value()) + .filter(|&&ndv| ndv > 0); + if let Some(ndv) = ndv { + return AnalysisResult::Computed(1.0 / (*ndv as f64)); + } + 0.1 // Default equality selectivity + } + + // Inequality: selectivity = 1 - 1/NDV + Operator::NotEq => { + let ndv = Self::get_column_stats(binary.left(), input_stats) + .or_else(|| Self::get_column_stats(binary.right(), input_stats)) + .and_then(|s| s.distinct_count.get_value()) + .filter(|&&ndv| ndv > 0); + if let Some(ndv) = ndv { + return AnalysisResult::Computed(1.0 - (1.0 / (*ndv as f64))); + } + 0.9 + } + + // Range predicates: classic 1/3 estimate + Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => 0.33, + + // LIKE: depends on pattern, use conservative estimate + Operator::LikeMatch | Operator::ILikeMatch => 0.25, + Operator::NotLikeMatch | Operator::NotILikeMatch => 0.75, + + // Other operators: default + _ => 0.5, + }; + + return AnalysisResult::Computed(sel); + } + + // NOT expression: 1 - child selectivity + if let Some(not_expr) = expr.as_any().downcast_ref::() { + let child_sel = self.estimate_selectivity_recursive( + not_expr.arg(), + input_stats, + registry, + ); + return AnalysisResult::Computed(1.0 - child_sel); + } + + 
// Literal boolean: 0.0 or 1.0 + if let Some(b) = expr + .as_any() + .downcast_ref::() + .and_then(|lit| match lit.value() { + ScalarValue::Boolean(Some(b)) => Some(*b), + _ => None, + }) + { + return AnalysisResult::Computed(if b { 1.0 } else { 0.0 }); + } + + // Column reference as predicate (boolean column) + if expr.as_any().downcast_ref::().is_some() { + return AnalysisResult::Computed(0.5); + } + + AnalysisResult::Delegate + } + + fn get_distinct_count( + &self, + expr: &Arc, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Column reference: use column NDV + if let Some(ndv) = Self::get_column_stats(expr, input_stats) + .and_then(|col_stats| col_stats.distinct_count.get_value().copied()) + { + return AnalysisResult::Computed(ndv); + } + + // Literal: NDV = 1 + if expr.as_any().downcast_ref::().is_some() { + return AnalysisResult::Computed(1); + } + + // BinaryExpr: for arithmetic with a literal operand, treat as injective + // (preserves NDV). This is an approximation: col * 0 or col % 1 are + // technically not injective, but the common case (col + 1, col * 2, etc.) 
is + if let Some(binary) = expr.as_any().downcast_ref::() { + let is_arithmetic = matches!( + binary.op(), + Operator::Plus + | Operator::Minus + | Operator::Multiply + | Operator::Divide + | Operator::Modulo + ); + + if is_arithmetic { + // If one side is a literal, the operation is injective on the other side + let left_is_literal = binary.left().as_any().is::(); + let right_is_literal = binary.right().as_any().is::(); + + if left_is_literal + && let Some(ndv) = + registry.get_distinct_count(binary.right(), input_stats) + { + return AnalysisResult::Computed(ndv); + } else if right_is_literal + && let Some(ndv) = + registry.get_distinct_count(binary.left(), input_stats) + { + return AnalysisResult::Computed(ndv); + } + // Both sides are non-literals: could combine, but delegate for now + } + } + + AnalysisResult::Delegate + } + + fn get_min_max( + &self, + expr: &Arc, + input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult<(ScalarValue, ScalarValue)> { + // Column reference: use column min/max + if let Some((min, max)) = + Self::get_column_stats(expr, input_stats).and_then(|col_stats| { + match ( + col_stats.min_value.get_value(), + col_stats.max_value.get_value(), + ) { + (Some(min), Some(max)) => Some((min.clone(), max.clone())), + _ => None, + } + }) + { + return AnalysisResult::Computed((min, max)); + } + + // Literal: min = max = value + if let Some(lit_expr) = expr.as_any().downcast_ref::() { + let val = lit_expr.value().clone(); + return AnalysisResult::Computed((val.clone(), val)); + } + + AnalysisResult::Delegate + } + + fn get_null_fraction( + &self, + expr: &Arc, + input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + // Column reference: null_count / num_rows + if let Some(fraction) = + Self::get_column_stats(expr, input_stats).and_then(|col_stats| { + let null_count = col_stats.null_count.get_value().copied()?; + let num_rows = input_stats.num_rows.get_value().copied()?; + if 
num_rows > 0 { + Some(null_count as f64 / num_rows as f64) + } else { + None + } + }) + { + return AnalysisResult::Computed(fraction); + } + + // Literal: null fraction depends on whether it's null + if let Some(lit_expr) = expr.as_any().downcast_ref::() { + let is_null = lit_expr.value().is_null(); + return AnalysisResult::Computed(if is_null { 1.0 } else { 0.0 }); + } + + AnalysisResult::Delegate + } +} diff --git a/datafusion/physical-expr/src/expression_analyzer/mod.rs b/datafusion/physical-expr/src/expression_analyzer/mod.rs new file mode 100644 index 0000000000000..0103342134c4d --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/mod.rs @@ -0,0 +1,274 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Pluggable expression-level statistics analysis. +//! +//! This module provides an extensible mechanism for computing expression-level +//! statistics metadata (selectivity, NDV, min/max bounds) following the chain +//! of responsibility pattern. +//! +//! # Overview +//! +//! Different expressions have different statistical properties: +//! +//! - **Injective functions** (UPPER, LOWER, ABS on non-negative): preserve NDV +//! 
- **Non-injective functions** (FLOOR, YEAR, SUBSTRING): reduce NDV +//! - **Monotonic functions**: allow min/max bound propagation +//! - **Constants**: NDV = 1, selectivity depends on value +//! +//! The default implementation uses classic Selinger-style estimation. Users can +//! register custom [`ExpressionAnalyzer`] implementations to: +//! +//! 1. Provide statistics for custom UDFs +//! 2. Override default estimation with domain-specific knowledge +//! 3. Plug in advanced approaches (e.g., histogram-based estimation) +//! +//! # Example +//! +//! ```ignore +//! use datafusion_physical_expr::expression_analyzer::*; +//! +//! // Create registry with default analyzer +//! let mut registry = ExpressionAnalyzerRegistry::new(); +//! +//! // Register custom analyzer (higher priority) +//! registry.register(Arc::new(MyCustomAnalyzer)); +//! +//! // Query expression statistics +//! let selectivity = registry.get_selectivity(&predicate, &input_stats); +//! ``` + +mod default; + +#[cfg(test)] +mod tests; + +pub use default::DefaultExpressionAnalyzer; + +use std::fmt::Debug; +use std::sync::Arc; + +use datafusion_common::{ScalarValue, Statistics}; + +use crate::PhysicalExpr; + +/// Result of expression analysis - either computed or delegate to next analyzer. +#[derive(Debug, Clone)] +pub enum AnalysisResult<T> { + /// Analysis was performed, here's the result + Computed(T), + /// This analyzer doesn't handle this expression; delegate to next + Delegate, +} + +/// Expression-level metadata analysis. +/// +/// Implementations can handle specific expression types or provide domain +/// knowledge for custom UDFs. The chain of analyzers is traversed until one +/// returns [`AnalysisResult::Computed`]. +/// +/// The `registry` parameter allows analyzers to delegate sub-expression +/// analysis back through the full chain, rather than hard-coding a specific +/// analyzer. 
For example, a function analyzer can ask the registry for the +/// NDV of its input argument, which will traverse the full chain (including +/// any custom analyzers the user registered). +/// +/// # Implementing a Custom Analyzer +/// +/// ```ignore +/// #[derive(Debug)] +/// struct MyUdfAnalyzer; +/// +/// impl ExpressionAnalyzer for MyUdfAnalyzer { +/// fn get_selectivity( +/// &self, +/// expr: &Arc, +/// input_stats: &Statistics, +/// registry: &ExpressionAnalyzerRegistry, +/// ) -> AnalysisResult { +/// // Recognize my custom is_valid_email() UDF +/// if is_my_email_validator(expr) { +/// return AnalysisResult::Computed(0.8); // ~80% valid +/// } +/// AnalysisResult::Delegate +/// } +/// } +/// ``` +pub trait ExpressionAnalyzer: Debug + Send + Sync { + /// Estimate selectivity when this expression is used as a predicate. + /// + /// Returns a value in [0.0, 1.0] representing the fraction of rows + /// that satisfy the predicate. + fn get_selectivity( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } + + /// Estimate the number of distinct values in the expression's output. + /// + /// Properties: + /// - Injective functions preserve input NDV + /// - Non-injective functions reduce NDV (e.g., FLOOR, YEAR) + /// - Constants have NDV = 1 + fn get_distinct_count( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } + + /// Estimate min/max bounds of the expression's output. 
+ /// + /// Monotonic functions can transform input bounds: + /// - Increasing: (f(min), f(max)) + /// - Decreasing: (f(max), f(min)) + /// - Non-monotonic: may need wider bounds or return Delegate + fn get_min_max( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult<(ScalarValue, ScalarValue)> { + AnalysisResult::Delegate + } + + /// Estimate the fraction of null values in the expression's output. + /// + /// Returns a value in [0.0, 1.0]. + fn get_null_fraction( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Delegate + } +} + +/// Registry that chains [`ExpressionAnalyzer`] implementations. +/// +/// Analyzers are tried in order; the first to return [`AnalysisResult::Computed`] +/// wins. Register domain-specific analyzers before the default for override. +#[derive(Debug, Clone)] +pub struct ExpressionAnalyzerRegistry { + analyzers: Vec>, +} + +impl Default for ExpressionAnalyzerRegistry { + fn default() -> Self { + Self::new() + } +} + +impl ExpressionAnalyzerRegistry { + /// Create a new registry with the [`DefaultExpressionAnalyzer`]. + pub fn new() -> Self { + Self { + analyzers: vec![Arc::new(DefaultExpressionAnalyzer)], + } + } + + /// Create a registry with only the given analyzers (no builtins). + pub fn with_analyzers(analyzers: Vec>) -> Self { + Self { analyzers } + } + + /// Create a registry with custom analyzers followed by the + /// [`DefaultExpressionAnalyzer`] as fallback. + pub fn with_analyzers_and_default( + analyzers: impl IntoIterator>, + ) -> Self { + let mut all: Vec> = analyzers.into_iter().collect(); + all.push(Arc::new(DefaultExpressionAnalyzer)); + Self { analyzers: all } + } + + /// Register an analyzer at the front of the chain (higher priority). 
+ pub fn register(&mut self, analyzer: Arc) { + self.analyzers.insert(0, analyzer); + } + + /// Get selectivity through the analyzer chain. + pub fn get_selectivity( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(sel) = + analyzer.get_selectivity(expr, input_stats, self) + { + return Some(sel); + } + } + None + } + + /// Get distinct count through the analyzer chain. + pub fn get_distinct_count( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(ndv) = + analyzer.get_distinct_count(expr, input_stats, self) + { + return Some(ndv); + } + } + None + } + + /// Get min/max bounds through the analyzer chain. + pub fn get_min_max( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option<(ScalarValue, ScalarValue)> { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(bounds) = + analyzer.get_min_max(expr, input_stats, self) + { + return Some(bounds); + } + } + None + } + + /// Get null fraction through the analyzer chain. + pub fn get_null_fraction( + &self, + expr: &Arc, + input_stats: &Statistics, + ) -> Option { + for analyzer in &self.analyzers { + if let AnalysisResult::Computed(frac) = + analyzer.get_null_fraction(expr, input_stats, self) + { + return Some(frac); + } + } + None + } +} diff --git a/datafusion/physical-expr/src/expression_analyzer/tests.rs b/datafusion/physical-expr/src/expression_analyzer/tests.rs new file mode 100644 index 0000000000000..703953a211bf1 --- /dev/null +++ b/datafusion/physical-expr/src/expression_analyzer/tests.rs @@ -0,0 +1,326 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::*; +use crate::PhysicalExpr; +use crate::expressions::{BinaryExpr, Column, Literal, NotExpr}; +use datafusion_common::stats::Precision; +use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; +use datafusion_expr::Operator; +use std::sync::Arc; + +fn make_stats_with_ndv(num_rows: usize, ndv: usize) -> Statistics { + Statistics { + num_rows: Precision::Exact(num_rows), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Exact(ndv), + byte_size: Precision::Absent, + }], + } +} + +// NDV tests + +#[test] +fn test_column_ndv() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + assert_eq!(registry.get_distinct_count(&col, &stats), Some(100)); +} + +#[test] +fn test_literal_ndv() { + let stats = make_stats_with_ndv(1000, 100); + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + assert_eq!(registry.get_distinct_count(&lit, &stats), Some(1)); +} + +#[test] +fn test_arithmetic_ndv() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + 
Arc::new(Literal::new(ScalarValue::Int64(Some(1)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + // col + 1: injective, preserves NDV + let plus = Arc::new(BinaryExpr::new( + Arc::clone(&col), + Operator::Plus, + Arc::clone(&lit), + )) as Arc; + assert_eq!(registry.get_distinct_count(&plus, &stats), Some(100)); + + // 1 + col: also injective (literal on left) + let plus_rev = + Arc::new(BinaryExpr::new(lit, Operator::Plus, col)) as Arc; + assert_eq!(registry.get_distinct_count(&plus_rev, &stats), Some(100)); +} + +// Selectivity tests + +#[test] +fn test_equality_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.01).abs() < 0.001); // 1/NDV = 1/100 +} + +#[test] +fn test_equality_selectivity_column_on_right() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(lit, Operator::Eq, col)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.01).abs() < 0.001); +} + +#[test] +fn test_and_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))) as Arc; + + let eq = Arc::new(BinaryExpr::new(Arc::clone(&col), Operator::Eq, lit1)) + as Arc; + let gt = Arc::new(BinaryExpr::new(col, Operator::Gt, lit2)) as Arc; + let and_expr = + Arc::new(BinaryExpr::new(eq, Operator::And, gt)) as Arc; + + 
let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&and_expr, &stats).unwrap(); + assert!((sel - 0.0033).abs() < 0.001); // 0.01 * 0.33 +} + +#[test] +fn test_or_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit1 = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let lit2 = + Arc::new(Literal::new(ScalarValue::Int32(Some(10)))) as Arc; + + let eq = Arc::new(BinaryExpr::new(Arc::clone(&col), Operator::Eq, lit1)) + as Arc; + let gt = Arc::new(BinaryExpr::new(col, Operator::Gt, lit2)) as Arc; + let or_expr = + Arc::new(BinaryExpr::new(eq, Operator::Or, gt)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&or_expr, &stats).unwrap(); + assert!((sel - 0.3367).abs() < 0.001); // 0.01 + 0.33 - 0.01*0.33 +} + +#[test] +fn test_not_selectivity() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + let not_expr = Arc::new(NotExpr::new(eq)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(¬_expr, &stats).unwrap(); + assert!((sel - 0.99).abs() < 0.001); // 1 - 0.01 +} + +// Min/max tests + +#[test] +fn test_column_min_max() { + let stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + min_value: Precision::Exact(ScalarValue::Int32(Some(1))), + max_value: Precision::Exact(ScalarValue::Int32(Some(100))), + distinct_count: Precision::Absent, + null_count: Precision::Exact(0), + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + assert_eq!( + 
registry.get_min_max(&col, &stats), + Some((ScalarValue::Int32(Some(1)), ScalarValue::Int32(Some(100)))) + ); +} + +#[test] +fn test_literal_min_max() { + let stats = make_stats_with_ndv(100, 10); + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + assert_eq!( + registry.get_min_max(&lit, &stats), + Some((ScalarValue::Int32(Some(42)), ScalarValue::Int32(Some(42)))) + ); +} + +// Null fraction tests + +#[test] +fn test_column_null_fraction() { + let stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + null_count: Precision::Exact(250), + min_value: Precision::Absent, + max_value: Precision::Absent, + sum_value: Precision::Absent, + distinct_count: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let col = Arc::new(Column::new("a", 0)) as Arc; + let registry = ExpressionAnalyzerRegistry::new(); + + let frac = registry.get_null_fraction(&col, &stats).unwrap(); + assert!((frac - 0.25).abs() < 0.001); +} + +#[test] +fn test_literal_null_fraction() { + let stats = make_stats_with_ndv(100, 10); + let registry = ExpressionAnalyzerRegistry::new(); + + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + assert_eq!(registry.get_null_fraction(&lit, &stats), Some(0.0)); + + let null_lit = + Arc::new(Literal::new(ScalarValue::Int32(None))) as Arc; + assert_eq!(registry.get_null_fraction(&null_lit, &stats), Some(1.0)); +} + +// Custom analyzer tests + +#[derive(Debug)] +struct FixedSelectivityAnalyzer(f64); + +impl ExpressionAnalyzer for FixedSelectivityAnalyzer { + fn get_selectivity( + &self, + _expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + AnalysisResult::Computed(self.0) + } +} + +#[test] +fn test_custom_analyzer_overrides_default() { + let stats = make_stats_with_ndv(1000, 100); + let col = 
Arc::new(Column::new("a", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + let eq = Arc::new(BinaryExpr::new(col, Operator::Eq, lit)) as Arc; + + let mut registry = ExpressionAnalyzerRegistry::new(); + registry.register(Arc::new(FixedSelectivityAnalyzer(0.42))); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + assert!((sel - 0.42).abs() < 0.001); +} + +#[derive(Debug)] +struct ColumnAOnlyAnalyzer; + +impl ExpressionAnalyzer for ColumnAOnlyAnalyzer { + fn get_selectivity( + &self, + expr: &Arc, + _input_stats: &Statistics, + _registry: &ExpressionAnalyzerRegistry, + ) -> AnalysisResult { + if let Some(binary) = expr.as_any().downcast_ref::() + && let Some(col) = binary.left().as_any().downcast_ref::() + && col.name() == "a" + && matches!(binary.op(), Operator::Eq) + { + return AnalysisResult::Computed(0.99); + } + AnalysisResult::Delegate + } +} + +#[test] +fn test_custom_analyzer_delegates_to_default() { + let stats = make_stats_with_ndv(1000, 100); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 0)) as Arc; + let lit = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; + + let eq_a = Arc::new(BinaryExpr::new(col_a, Operator::Eq, Arc::clone(&lit))) + as Arc; + let eq_b = + Arc::new(BinaryExpr::new(col_b, Operator::Eq, lit)) as Arc; + + let mut registry = ExpressionAnalyzerRegistry::new(); + registry.register(Arc::new(ColumnAOnlyAnalyzer)); + + let sel_a = registry.get_selectivity(&eq_a, &stats).unwrap(); + assert!((sel_a - 0.99).abs() < 0.001); + + let sel_b = registry.get_selectivity(&eq_b, &stats).unwrap(); + assert!((sel_b - 0.01).abs() < 0.001); +} + +#[test] +fn test_with_analyzers_and_default() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc; + + let registry = + ExpressionAnalyzerRegistry::with_analyzers_and_default(vec![Arc::new( + ColumnAOnlyAnalyzer, + ) + as Arc]); + + 
assert_eq!(registry.get_distinct_count(&col, &stats), Some(100)); +} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index bedd348dab92f..0e5f3945bf571 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -33,6 +33,7 @@ pub mod binary_map { } pub mod async_scalar_function; pub mod equivalence; +pub mod expression_analyzer; pub mod expressions; pub mod intervals; mod partitioning; diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index e133e5a849cd8..e3d1adfb58070 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -21,6 +21,7 @@ use std::ops::Deref; use std::sync::Arc; use crate::PhysicalExpr; +use crate::expression_analyzer::ExpressionAnalyzerRegistry; use crate::expressions::{Column, Literal}; use crate::utils::collect_columns; @@ -124,12 +125,22 @@ impl From for (Arc, String) { /// /// See [`ProjectionExprs::from_indices`] to select a subset of columns by /// indices. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone)] pub struct ProjectionExprs { /// [`Arc`] used for a cheap clone, which improves physical plan optimization performance. 
exprs: Arc<[ProjectionExpr]>, + /// Optional expression analyzer registry for statistics estimation + expression_analyzer_registry: Option>, } +impl PartialEq for ProjectionExprs { + fn eq(&self, other: &Self) -> bool { + self.exprs == other.exprs + } +} + +impl Eq for ProjectionExprs {} + impl std::fmt::Display for ProjectionExprs { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let exprs: Vec = self.exprs.iter().map(|e| e.to_string()).collect(); @@ -141,6 +152,7 @@ impl From> for ProjectionExprs { fn from(value: Vec) -> Self { Self { exprs: value.into(), + expression_analyzer_registry: None, } } } @@ -149,6 +161,7 @@ impl From<&[ProjectionExpr]> for ProjectionExprs { fn from(value: &[ProjectionExpr]) -> Self { Self { exprs: value.iter().cloned().collect(), + expression_analyzer_registry: None, } } } @@ -157,6 +170,7 @@ impl FromIterator for ProjectionExprs { fn from_iter>(exprs: T) -> Self { Self { exprs: exprs.into_iter().collect(), + expression_analyzer_registry: None, } } } @@ -172,6 +186,7 @@ impl ProjectionExprs { pub fn new(exprs: impl IntoIterator) -> Self { Self { exprs: exprs.into_iter().collect(), + expression_analyzer_registry: None, } } @@ -179,9 +194,26 @@ impl ProjectionExprs { pub fn from_expressions(exprs: impl Into>) -> Self { Self { exprs: exprs.into(), + expression_analyzer_registry: None, } } + /// Set the expression analyzer registry for statistics estimation. + /// + /// The physical planner injects the registry from [`SessionState`] when + /// creating projections. Projections created later by optimizer rules + /// do not receive the registry and fall back to + /// [`DefaultExpressionAnalyzer`]. Propagating the registry to all + /// operator construction sites requires an operator-level statistics + /// registry, which is orthogonal to this work. 
+ pub fn with_expression_analyzer_registry( + mut self, + registry: Arc, + ) -> Self { + self.expression_analyzer_registry = Some(registry); + self + } + /// Creates a [`ProjectionExpr`] from a list of column indices. /// /// This is a convenience method for creating simple column-only projections, where each projection expression is a reference to a column @@ -713,9 +745,35 @@ impl ProjectionExprs { byte_size, } } + } else if let Some(registry) = &self.expression_analyzer_registry { + // Use ExpressionAnalyzer to estimate statistics for arbitrary expressions + let distinct_count = registry + .get_distinct_count(expr, &stats) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent); + let (min_value, max_value) = registry + .get_min_max(expr, &stats) + .map(|(min, max)| (Precision::Inexact(min), Precision::Inexact(max))) + .unwrap_or((Precision::Absent, Precision::Absent)); + let null_count = registry + .get_null_fraction(expr, &stats) + .and_then(|frac| { + stats + .num_rows + .get_value() + .map(|&rows| (rows as f64 * frac).ceil() as usize) + }) + .map(Precision::Inexact) + .unwrap_or(Precision::Absent); + + ColumnStatistics { + distinct_count, + min_value, + max_value, + null_count, + ..ColumnStatistics::new_unknown() + } } else { - // TODO stats: estimate more statistics from expressions - // (expressions should compute their statistics themselves) ColumnStatistics::new_unknown() }; column_statistics.push(col_stats); @@ -806,6 +864,14 @@ impl Projector { pub fn projection(&self) -> &ProjectionExprs { &self.projection } + + /// Set the expression analyzer registry on the underlying projection + pub fn set_expression_analyzer_registry( + &mut self, + registry: Arc, + ) { + self.projection.expression_analyzer_registry = Some(registry); + } } /// Describes an immutable reference counted projection. 
@@ -2719,7 +2785,7 @@ pub(crate) mod tests { // Should have 2 column statistics assert_eq!(output_stats.column_statistics.len(), 2); - // First column (expression) should have unknown statistics + // First column (col0 + 1): no registry set, so statistics are unknown assert_eq!( output_stats.column_statistics[0].distinct_count, Precision::Absent @@ -2738,6 +2804,49 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn test_project_statistics_with_expression_analyzer() -> Result<()> { + let input_stats = get_stats(); + let input_schema = get_schema(); + + // Same projection as test_project_statistics_with_expressions, + // but with the analyzer registry enabled + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("col0", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), + )), + alias: "incremented".to_string(), + }, + ProjectionExpr { + expr: Arc::new(Column::new("col1", 1)), + alias: "text".to_string(), + }, + ]) + .with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new())); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + // With analyzer: col0 + 1 is injective, NDV preserved from col0 (= 5) + assert_eq!( + output_stats.column_statistics[0].distinct_count, + Precision::Inexact(5) + ); + + // Second column (col1) still preserves exact statistics + assert_eq!( + output_stats.column_statistics[1].distinct_count, + Precision::Exact(1) + ); + + Ok(()) + } + #[test] fn test_project_statistics_primitive_width_only() -> Result<()> { let input_stats = get_stats(); diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 275fddd5e4d53..cb9f22f3f2253 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -94,6 +94,10 @@ pub struct FilterExec { batch_size: usize, /// Number of rows to fetch fetch: Option, 
+ /// Optional expression analyzer registry for selectivity estimation + expression_analyzer_registry: Option< + Arc, + >, } /// Builder for [`FilterExec`] to set optional parameters @@ -104,6 +108,9 @@ pub struct FilterExecBuilder { default_selectivity: u8, batch_size: usize, fetch: Option, + expression_analyzer_registry: Option< + Arc, + >, } impl FilterExecBuilder { @@ -116,6 +123,7 @@ impl FilterExecBuilder { default_selectivity: FILTER_EXEC_DEFAULT_SELECTIVITY, batch_size: FILTER_EXEC_DEFAULT_BATCH_SIZE, fetch: None, + expression_analyzer_registry: None, } } @@ -174,6 +182,25 @@ impl FilterExecBuilder { self } + /// Set the expression analyzer registry for selectivity estimation. + /// + /// Same limitation as [`ProjectionExprs::with_expression_analyzer_registry`]: + /// the planner injects this from [`SessionState`], but filters created + /// by optimizer rules (e.g., filter pushdown into unions) fall back to + /// the default selectivity. An operator-level statistics registry is + /// needed for full coverage. 
+ /// + /// [`ProjectionExprs::with_expression_analyzer_registry`]: datafusion_physical_expr::projection::ProjectionExprs::with_expression_analyzer_registry + pub fn with_expression_analyzer_registry( + mut self, + registry: Arc< + datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry, + >, + ) -> Self { + self.expression_analyzer_registry = Some(registry); + self + } + /// Build the FilterExec, computing properties once with all configured parameters pub fn build(self) -> Result { // Validate predicate type @@ -213,6 +240,7 @@ impl FilterExecBuilder { projection: self.projection, batch_size: self.batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry, }) } } @@ -226,6 +254,7 @@ impl From<&FilterExec> for FilterExecBuilder { default_selectivity: exec.default_selectivity, batch_size: exec.batch_size, fetch: exec.fetch, + expression_analyzer_registry: exec.expression_analyzer_registry.clone(), // We could cache / copy over PlanProperties // here but that would require invalidating them in FilterExecBuilder::apply_projection, etc. // and currently every call to this method ends up invalidating them anyway. 
@@ -286,6 +315,7 @@ impl FilterExec { projection: self.projection.clone(), batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), }) } @@ -315,9 +345,16 @@ impl FilterExec { input_stats: Statistics, predicate: &Arc, default_selectivity: u8, + expression_analyzer_registry: Option< + &datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry, + >, ) -> Result { if !check_support(predicate, schema) { - let selectivity = default_selectivity as f64 / 100.0; + // Use ExpressionAnalyzer for better selectivity when available, + // fall back to the configured default_selectivity + let selectivity = expression_analyzer_registry + .and_then(|r| r.get_selectivity(predicate, &input_stats)) + .unwrap_or(default_selectivity as f64 / 100.0); let mut stats = input_stats.to_inexact(); stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity); stats.total_byte_size = stats @@ -365,6 +402,7 @@ impl FilterExec { Arc::unwrap_or_clone(input.partition_statistics(None)?), predicate, default_selectivity, + None, )?; let mut eq_properties = input.equivalence_properties().clone(); let (equal_pairs, _) = collect_columns_from_predicate_inner(predicate); @@ -550,6 +588,7 @@ impl ExecutionPlan for FilterExec { input_stats, self.predicate(), self.default_selectivity, + self.expression_analyzer_registry.as_deref(), )?; Ok(Arc::new(stats.project(self.projection.as_ref()))) } @@ -710,6 +749,7 @@ impl ExecutionPlan for FilterExec { projection: self.projection.clone(), batch_size: self.batch_size, fetch: self.fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), }; Some(Arc::new(new) as _) }; @@ -734,6 +774,7 @@ impl ExecutionPlan for FilterExec { projection: self.projection.clone(), batch_size: self.batch_size, fetch, + expression_analyzer_registry: self.expression_analyzer_registry.clone(), })) } diff --git a/datafusion/physical-plan/src/projection.rs 
b/datafusion/physical-plan/src/projection.rs index cd80277156fcb..0ab63105fab05 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -179,6 +179,18 @@ impl ProjectionExec { &self.input } + /// Set the expression analyzer registry for statistics estimation. + /// The registry is stored on the underlying [`ProjectionExprs`]. + pub fn with_expression_analyzer_registry( + mut self, + registry: Arc< + datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry, + >, + ) -> Self { + self.projector.set_expression_analyzer_registry(registry); + self + } + /// This function creates the cache object that stores the plan properties such as schema, equivalence properties, ordering, partitioning, etc. fn compute_properties( input: &Arc, From 011f44122707743242c65b0cb483f219359ed00f Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Tue, 24 Mar 2026 09:52:18 +0100 Subject: [PATCH 2/8] Fix CI: update config docs, sqllogictest, and doc links - Regenerate configs.md for new enable_expression_analyzer option - Add enable_expression_analyzer to information_schema.slt expected output - Fix unresolved doc links to SessionState and DefaultExpressionAnalyzer (cross-crate references use backticks instead of doc links) - Simplify config description --- datafusion/common/src/config.rs | 5 ++--- datafusion/physical-expr/src/projection.rs | 4 ++-- datafusion/physical-plan/src/filter.rs | 2 +- datafusion/sqllogictest/test_files/information_schema.slt | 2 ++ docs/source/user-guide/configs.md | 1 + 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 6b44d0e21001c..e99c8e1588bdd 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -965,9 +965,8 @@ config_namespace! 
{ /// When set to true, the physical planner will use the ExpressionAnalyzer /// framework for expression-level statistics estimation (NDV, selectivity, - /// min/max, null fraction) in projections and filters. When false, projections - /// return unknown statistics for non-column expressions and filters use the - /// default selectivity heuristic. + /// min/max, null fraction). When false, existing behavior without + /// expression-level statistics support is used. pub enable_expression_analyzer: bool, default = false /// When set to true, the optimizer will insert filters before a join between diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index e3d1adfb58070..6ce88d34dbf24 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -200,10 +200,10 @@ impl ProjectionExprs { /// Set the expression analyzer registry for statistics estimation. /// - /// The physical planner injects the registry from [`SessionState`] when + /// The physical planner injects the registry from `SessionState` when /// creating projections. Projections created later by optimizer rules /// do not receive the registry and fall back to - /// [`DefaultExpressionAnalyzer`]. Propagating the registry to all + /// `DefaultExpressionAnalyzer`. Propagating the registry to all /// operator construction sites requires an operator-level statistics /// registry, which is orthogonal to this work. pub fn with_expression_analyzer_registry( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index cb9f22f3f2253..dcb0caad59c53 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -185,7 +185,7 @@ impl FilterExecBuilder { /// Set the expression analyzer registry for selectivity estimation. 
/// /// Same limitation as [`ProjectionExprs::with_expression_analyzer_registry`]: - /// the planner injects this from [`SessionState`], but filters created + /// the planner injects this from `SessionState`, but filters created /// by optimizer rules (e.g., filter pushdown into unions) fall back to /// the default selectivity. An operator-level statistics registry is /// needed for full coverage. diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index ba1f1403450a9..1a12aabb46cc4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -298,6 +298,7 @@ datafusion.optimizer.default_filter_selectivity 20 datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true +datafusion.optimizer.enable_expression_analyzer false datafusion.optimizer.enable_join_dynamic_filter_pushdown true datafusion.optimizer.enable_leaf_expression_pushdown true datafusion.optimizer.enable_piecewise_merge_join false @@ -440,6 +441,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. 
For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. +datafusion.optimizer.enable_expression_analyzer false When set to true, the physical planner will use the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction). When false, existing behavior without expression-level statistics support is used. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. 
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 69627e3cb9148..dc7e0f2cd604e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -143,6 +143,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. | | datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. | | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | +| datafusion.optimizer.enable_expression_analyzer | false | When set to true, the physical planner will use the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction). When false, existing behavior without expression-level statistics support is used. 
| | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. | From f6f27acb2c1bc9f6f6aa01bb5e2cfff826b9f30a Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Wed, 1 Apr 2026 19:11:10 +0200 Subject: [PATCH 3/8] Fix doc placement, import path, extract helper, defer selectivity computation - Fix expression_analyzer_registry doc comment misplaced between function_factory's doc comment and field declaration - Fix module doc example import path (physical_plan -> physical_expr) - Extract expression_analyzer_registry() helper in planner to avoid repeating the config check 4 times - Defer left_sel/right_sel computation to AND/OR arms only, avoiding unnecessary sub-expression selectivity estimation for comparison operators --- .../core/src/execution/session_state.rs | 4 +- datafusion/core/src/physical_planner.rs | 54 +++++++++---------- .../src/expression_analyzer/default.rs | 38 +++++++++---- .../src/expression_analyzer/mod.rs | 2 +- 4 files changed, 54 insertions(+), 44 deletions(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index acdaaad43c732..4f80e547bd314 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -186,12 +186,12 @@ pub struct SessionState { table_factories: HashMap>, /// Runtime environment runtime_env: Arc, + /// Registry for expression-level 
statistics analyzers (NDV, selectivity, etc.) + expression_analyzer_registry: Arc, /// [FunctionFactory] to support pluggable user defined function handler. /// /// It will be invoked on `CREATE FUNCTION` statements. /// thus, changing dialect o PostgreSql is required - /// Registry for expression-level statistics analyzers (NDV, selectivity, etc.) - expression_analyzer_registry: Arc, function_factory: Option>, cache_factory: Option>, /// Cache logical plans of prepared statements for later execution. diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 2087ab88aabfd..da880f511b07a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -56,6 +56,7 @@ use crate::physical_plan::{ WindowExpr, displayable, windows, }; use crate::schema_equivalence::schema_satisfied_by; +use datafusion_physical_expr::expression_analyzer::ExpressionAnalyzerRegistry; use arrow::array::{RecordBatch, builder::StringBuilder}; use arrow::compute::SortOptions; @@ -1110,17 +1111,11 @@ impl DefaultPhysicalPlanner { physical_input, ) .with_batch_size(session_state.config().batch_size()); - let builder = if session_state - .config_options() - .optimizer - .enable_expression_analyzer - { - builder.with_expression_analyzer_registry(Arc::clone( - session_state.expression_analyzer_registry(), - )) - } else { - builder - }; + let builder = + match Self::expression_analyzer_registry(session_state) { + Some(r) => builder.with_expression_analyzer_registry(r), + None => builder, + }; builder.build()? } PlanAsyncExpr::Async( @@ -1141,17 +1136,11 @@ impl DefaultPhysicalPlanner { (0..input.schema().fields().len()).collect::>(), ))? 
.with_batch_size(session_state.config().batch_size()); - let builder = if session_state - .config_options() - .optimizer - .enable_expression_analyzer - { - builder.with_expression_analyzer_registry(Arc::clone( - session_state.expression_analyzer_registry(), - )) - } else { - builder - }; + let builder = + match Self::expression_analyzer_registry(session_state) { + Some(r) => builder.with_expression_analyzer_registry(r), + None => builder, + }; builder.build()? } _ => { @@ -2861,6 +2850,17 @@ impl DefaultPhysicalPlanner { Ok(mem_exec) } + /// Returns the expression analyzer registry if the config option is enabled. + fn expression_analyzer_registry( + session_state: &SessionState, + ) -> Option> { + session_state + .config_options() + .optimizer + .enable_expression_analyzer + .then(|| Arc::clone(session_state.expression_analyzer_registry())) + } + fn create_project_physical_exec( &self, session_state: &SessionState, @@ -2921,14 +2921,8 @@ impl DefaultPhysicalPlanner { .map(|(expr, alias)| ProjectionExpr { expr, alias }) .collect(); let mut proj_exec = ProjectionExec::try_new(proj_exprs, input_exec)?; - if session_state - .config_options() - .optimizer - .enable_expression_analyzer - { - proj_exec = proj_exec.with_expression_analyzer_registry(Arc::clone( - session_state.expression_analyzer_registry(), - )); + if let Some(r) = Self::expression_analyzer_registry(session_state) { + proj_exec = proj_exec.with_expression_analyzer_registry(r); } Ok(Arc::new(proj_exec)) } diff --git a/datafusion/physical-expr/src/expression_analyzer/default.rs b/datafusion/physical-expr/src/expression_analyzer/default.rs index 29b651a493560..3c215d963aada 100644 --- a/datafusion/physical-expr/src/expression_analyzer/default.rs +++ b/datafusion/physical-expr/src/expression_analyzer/default.rs @@ -72,18 +72,34 @@ impl ExpressionAnalyzer for DefaultExpressionAnalyzer { ) -> AnalysisResult { // Binary expressions: AND, OR, comparisons if let Some(binary) = expr.as_any().downcast_ref::() { - 
let left_sel = - self.estimate_selectivity_recursive(binary.left(), input_stats, registry); - let right_sel = self.estimate_selectivity_recursive( - binary.right(), - input_stats, - registry, - ); - let sel = match binary.op() { - // Logical operators - Operator::And => left_sel * right_sel, - Operator::Or => left_sel + right_sel - (left_sel * right_sel), + // Logical operators: need child selectivities + Operator::And => { + let left_sel = self.estimate_selectivity_recursive( + binary.left(), + input_stats, + registry, + ); + let right_sel = self.estimate_selectivity_recursive( + binary.right(), + input_stats, + registry, + ); + left_sel * right_sel + } + Operator::Or => { + let left_sel = self.estimate_selectivity_recursive( + binary.left(), + input_stats, + registry, + ); + let right_sel = self.estimate_selectivity_recursive( + binary.right(), + input_stats, + registry, + ); + left_sel + right_sel - (left_sel * right_sel) + } // Equality: selectivity = 1/NDV Operator::Eq => { diff --git a/datafusion/physical-expr/src/expression_analyzer/mod.rs b/datafusion/physical-expr/src/expression_analyzer/mod.rs index 0103342134c4d..e06871a37dd48 100644 --- a/datafusion/physical-expr/src/expression_analyzer/mod.rs +++ b/datafusion/physical-expr/src/expression_analyzer/mod.rs @@ -40,7 +40,7 @@ //! # Example //! //! ```ignore -//! use datafusion_physical_plan::expression_analyzer::*; +//! use datafusion_physical_expr::expression_analyzer::*; //! //! // Create registry with default analyzer //! 
let mut registry = ExpressionAnalyzerRegistry::new(); From c62a5cbcf6830718f52d4537202a47e08ece7949 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 2 Apr 2026 18:16:33 +0200 Subject: [PATCH 4/8] Snapshot column stats for analyzer to avoid ordering dependency --- datafusion/physical-expr/src/projection.rs | 124 ++++++++++++++++++++- 1 file changed, 120 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 6ce88d34dbf24..6d83a41a42c06 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -688,6 +688,15 @@ impl ProjectionExprs { mut stats: Statistics, output_schema: &Schema, ) -> Result { + // Snapshot for analyzer lookups + let original_stats = + self.expression_analyzer_registry + .as_ref() + .map(|_| Statistics { + num_rows: stats.num_rows, + total_byte_size: stats.total_byte_size, + column_statistics: stats.column_statistics.clone(), + }); let mut column_statistics = vec![]; for proj_expr in self.exprs.iter() { @@ -747,18 +756,19 @@ impl ProjectionExprs { } } else if let Some(registry) = &self.expression_analyzer_registry { // Use ExpressionAnalyzer to estimate statistics for arbitrary expressions + let original_stats = original_stats.as_ref().unwrap(); let distinct_count = registry - .get_distinct_count(expr, &stats) + .get_distinct_count(expr, original_stats) .map(Precision::Inexact) .unwrap_or(Precision::Absent); let (min_value, max_value) = registry - .get_min_max(expr, &stats) + .get_min_max(expr, original_stats) .map(|(min, max)| (Precision::Inexact(min), Precision::Inexact(max))) .unwrap_or((Precision::Absent, Precision::Absent)); let null_count = registry - .get_null_fraction(expr, &stats) + .get_null_fraction(expr, original_stats) .and_then(|frac| { - stats + original_stats .num_rows .get_value() .map(|&rows| (rows as f64 * frac).ceil() as usize) @@ -3147,4 +3157,110 @@ pub(crate) mod tests { Ok(()) } + + 
#[test] + fn test_project_statistics_column_then_expression() -> Result<()> { + // SELECT a, a + 1: bare column first, then expression on same column + let input_stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Exact(100), + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + max_value: Precision::Exact(ScalarValue::Int64(Some(1000))), + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let input_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(Column::new("a", 0)), + alias: "a".to_string(), + }, + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), + )), + alias: "a_plus_1".to_string(), + }, + ]) + .with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new())); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + // Bare column: exact stats preserved + assert_eq!( + output_stats.column_statistics[0].distinct_count, + Precision::Exact(100) + ); + + // Expression on same column: analyzer should still see a's NDV + assert_eq!( + output_stats.column_statistics[1].distinct_count, + Precision::Inexact(100) + ); + + Ok(()) + } + + #[test] + fn test_project_statistics_expression_then_column() -> Result<()> { + // SELECT a + 1, a: expression first, then bare column + let input_stats = Statistics { + num_rows: Precision::Exact(1000), + total_byte_size: Precision::Absent, + column_statistics: vec![ColumnStatistics { + distinct_count: Precision::Exact(100), + null_count: Precision::Exact(0), + min_value: Precision::Exact(ScalarValue::Int64(Some(1))), + max_value: 
Precision::Exact(ScalarValue::Int64(Some(1000))), + sum_value: Precision::Absent, + byte_size: Precision::Absent, + }], + }; + let input_schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let projection = ProjectionExprs::new(vec![ + ProjectionExpr { + expr: Arc::new(BinaryExpr::new( + Arc::new(Column::new("a", 0)), + Operator::Plus, + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))), + )), + alias: "a_plus_1".to_string(), + }, + ProjectionExpr { + expr: Arc::new(Column::new("a", 0)), + alias: "a".to_string(), + }, + ]) + .with_expression_analyzer_registry(Arc::new(ExpressionAnalyzerRegistry::new())); + + let output_stats = projection.project_statistics( + input_stats, + &projection.project_schema(&input_schema)?, + )?; + + // Expression: analyzer sees a's NDV (no take yet) + assert_eq!( + output_stats.column_statistics[0].distinct_count, + Precision::Inexact(100) + ); + + // Bare column: exact stats preserved + assert_eq!( + output_stats.column_statistics[1].distinct_count, + Precision::Exact(100) + ); + + Ok(()) + } } From eeb3b503e96722e28967d8254eacfb413edf0e72 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 2 Apr 2026 18:52:55 +0200 Subject: [PATCH 5/8] Use registry NDV for equality/inequality selectivity on expressions --- .../src/expression_analyzer/default.rs | 47 ++++++++++++++----- .../src/expression_analyzer/tests.rs | 38 +++++++++++++++ 2 files changed, 73 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-expr/src/expression_analyzer/default.rs b/datafusion/physical-expr/src/expression_analyzer/default.rs index 3c215d963aada..9681f24118664 100644 --- a/datafusion/physical-expr/src/expression_analyzer/default.rs +++ b/datafusion/physical-expr/src/expression_analyzer/default.rs @@ -52,6 +52,27 @@ impl DefaultExpressionAnalyzer { .and_then(|idx| input_stats.column_statistics.get(idx)) } + /// Resolve NDV for a binary expression: try direct column stats first, + /// then fall back to the registry for 
arbitrary expressions + fn resolve_ndv( + left: &Arc<dyn PhysicalExpr>, + right: &Arc<dyn PhysicalExpr>, + input_stats: &Statistics, + registry: &ExpressionAnalyzerRegistry, + ) -> Option<usize> { + Self::get_column_stats(left, input_stats) + .or_else(|| Self::get_column_stats(right, input_stats)) + .and_then(|s| s.distinct_count.get_value()) + .filter(|&&ndv| ndv > 0) + .copied() + .or_else(|| { + let l = registry.get_distinct_count(left, input_stats); + let r = registry.get_distinct_count(right, input_stats); + l.max(r) + }) + .filter(|&n| n > 0) + } + /// Recursive selectivity estimation through the registry chain fn estimate_selectivity_recursive( &self, @@ -103,24 +124,26 @@ impl ExpressionAnalyzer for DefaultExpressionAnalyzer { // Equality: selectivity = 1/NDV Operator::Eq => { - let ndv = Self::get_column_stats(binary.left(), input_stats) - .or_else(|| Self::get_column_stats(binary.right(), input_stats)) - .and_then(|s| s.distinct_count.get_value()) - .filter(|&&ndv| ndv > 0); - if let Some(ndv) = ndv { - return AnalysisResult::Computed(1.0 / (*ndv as f64)); + if let Some(ndv) = Self::resolve_ndv( + binary.left(), + binary.right(), + input_stats, + registry, + ) { + return AnalysisResult::Computed(1.0 / (ndv as f64)); } 0.1 // Default equality selectivity } // Inequality: selectivity = 1 - 1/NDV Operator::NotEq => { - let ndv = Self::get_column_stats(binary.left(), input_stats) - .or_else(|| Self::get_column_stats(binary.right(), input_stats)) - .and_then(|s| s.distinct_count.get_value()) - .filter(|&&ndv| ndv > 0); - if let Some(ndv) = ndv { - return AnalysisResult::Computed(1.0 - (1.0 / (*ndv as f64))); + if let Some(ndv) = Self::resolve_ndv( + binary.left(), + binary.right(), + input_stats, + registry, + ) { + return AnalysisResult::Computed(1.0 - (1.0 / (ndv as f64))); } 0.9 } diff --git a/datafusion/physical-expr/src/expression_analyzer/tests.rs b/datafusion/physical-expr/src/expression_analyzer/tests.rs index 703953a211bf1..a951ce084bffb 100644 ---
a/datafusion/physical-expr/src/expression_analyzer/tests.rs +++ b/datafusion/physical-expr/src/expression_analyzer/tests.rs @@ -162,6 +162,44 @@ fn test_not_selectivity() { assert!((sel - 0.99).abs() < 0.001); // 1 - 0.01 } +#[test] +fn test_equality_selectivity_expression_eq_literal() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>; + let one = + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>; + let forty_two = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>; + let a_plus_1 = + Arc::new(BinaryExpr::new(col, Operator::Plus, one)) as Arc<dyn PhysicalExpr>; + let eq = Arc::new(BinaryExpr::new(a_plus_1, Operator::Eq, forty_two)) + as Arc<dyn PhysicalExpr>; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + // NDV(a + 1) = NDV(a) = 100, so selectivity = 1/100 = 0.01 + assert!((sel - 0.01).abs() < 0.001); +} + +#[test] +fn test_inequality_selectivity_expression_neq_literal() { + let stats = make_stats_with_ndv(1000, 100); + let col = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>; + let one = + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>; + let forty_two = + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>; + let a_plus_1 = + Arc::new(BinaryExpr::new(col, Operator::Plus, one)) as Arc<dyn PhysicalExpr>; + let neq = Arc::new(BinaryExpr::new(a_plus_1, Operator::NotEq, forty_two)) + as Arc<dyn PhysicalExpr>; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&neq, &stats).unwrap(); + // NDV(a + 1) = 100, selectivity = 1 - 1/100 = 0.99 + assert!((sel - 0.99).abs() < 0.001); +} + // Min/max tests #[test] From 40ae37ed74693ae155e940fbfcc458e44b10ba11 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Thu, 2 Apr 2026 19:00:26 +0200 Subject: [PATCH 6/8] Narrow injective arithmetic to addition and subtraction only --- .../src/expression_analyzer/default.rs | 16 ++++------------ .../src/expression_analyzer/tests.rs | 8 ++++++++ 2 files changed, 12
insertions(+), 12 deletions(-) diff --git a/datafusion/physical-expr/src/expression_analyzer/default.rs b/datafusion/physical-expr/src/expression_analyzer/default.rs index 9681f24118664..f4fbfac1c47da 100644 --- a/datafusion/physical-expr/src/expression_analyzer/default.rs +++ b/datafusion/physical-expr/src/expression_analyzer/default.rs @@ -210,20 +210,12 @@ impl ExpressionAnalyzer for DefaultExpressionAnalyzer { return AnalysisResult::Computed(1); } - // BinaryExpr: for arithmetic with a literal operand, treat as injective - // (preserves NDV). This is an approximation: col * 0 or col % 1 are - // technically not injective, but the common case (col + 1, col * 2, etc.) is + // BinaryExpr: addition/subtraction with a literal is always injective + // TODO: support more injective operators (e.g. multiply by non-zero) if let Some(binary) = expr.as_any().downcast_ref::<BinaryExpr>() { - let is_arithmetic = matches!( - binary.op(), - Operator::Plus - | Operator::Minus - | Operator::Multiply - | Operator::Divide - | Operator::Modulo - ); + let is_injective = matches!(binary.op(), Operator::Plus | Operator::Minus); - if is_arithmetic { + if is_injective { // If one side is a literal, the operation is injective on the other side let left_is_literal = binary.left().as_any().is::<Literal>(); let right_is_literal = binary.right().as_any().is::<Literal>(); diff --git a/datafusion/physical-expr/src/expression_analyzer/tests.rs b/datafusion/physical-expr/src/expression_analyzer/tests.rs index a951ce084bffb..0fe84dd50c1dc 100644 --- a/datafusion/physical-expr/src/expression_analyzer/tests.rs +++ b/datafusion/physical-expr/src/expression_analyzer/tests.rs @@ -73,6 +73,14 @@ fn test_arithmetic_ndv() { )) as Arc<dyn PhysicalExpr>; assert_eq!(registry.get_distinct_count(&plus, &stats), Some(100)); + // col - 1: injective, preserves NDV + let minus = Arc::new(BinaryExpr::new( + Arc::clone(&col), + Operator::Minus, + Arc::clone(&lit), + )) as Arc<dyn PhysicalExpr>; + assert_eq!(registry.get_distinct_count(&minus, &stats), Some(100)); + // 1 + col:
also injective (literal on left) let plus_rev = Arc::new(BinaryExpr::new(lit, Operator::Plus, col)) as Arc; From 25e473ad377af52772cd9cceaee8da06088f0475 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Wed, 8 Apr 2026 15:42:17 +0200 Subject: [PATCH 7/8] Make resolve_ndv symmetric: use max NDV across both operands --- .../src/expression_analyzer/default.rs | 21 ++-- .../src/expression_analyzer/tests.rs | 100 +++++++++++++----- 2 files changed, 84 insertions(+), 37 deletions(-) diff --git a/datafusion/physical-expr/src/expression_analyzer/default.rs b/datafusion/physical-expr/src/expression_analyzer/default.rs index f4fbfac1c47da..8687495e0838b 100644 --- a/datafusion/physical-expr/src/expression_analyzer/default.rs +++ b/datafusion/physical-expr/src/expression_analyzer/default.rs @@ -52,25 +52,20 @@ impl DefaultExpressionAnalyzer { .and_then(|idx| input_stats.column_statistics.get(idx)) } - /// Resolve NDV for a binary expression: try direct column stats first, - /// then fall back to the registry for arbitrary expressions + /// Resolve NDV for a binary predicate by taking the max of both sides. 
+ /// + /// Using max is symmetric (order-independent) and handles column-vs-column, + /// column-vs-expression, and expression-vs-expression uniformly through the + /// registry chain fn resolve_ndv( left: &Arc<dyn PhysicalExpr>, right: &Arc<dyn PhysicalExpr>, input_stats: &Statistics, registry: &ExpressionAnalyzerRegistry, ) -> Option<usize> { - Self::get_column_stats(left, input_stats) - .or_else(|| Self::get_column_stats(right, input_stats)) - .and_then(|s| s.distinct_count.get_value()) - .filter(|&&ndv| ndv > 0) - .copied() - .or_else(|| { - let l = registry.get_distinct_count(left, input_stats); - let r = registry.get_distinct_count(right, input_stats); - l.max(r) - }) - .filter(|&n| n > 0) + let l = registry.get_distinct_count(left, input_stats); + let r = registry.get_distinct_count(right, input_stats); + l.max(r).filter(|&n| n > 0) } /// Recursive selectivity estimation through the registry chain diff --git a/datafusion/physical-expr/src/expression_analyzer/tests.rs b/datafusion/physical-expr/src/expression_analyzer/tests.rs index 0fe84dd50c1dc..928bb9a67be51 100644 --- a/datafusion/physical-expr/src/expression_analyzer/tests.rs +++ b/datafusion/physical-expr/src/expression_analyzer/tests.rs @@ -23,18 +23,21 @@ use datafusion_common::{ColumnStatistics, ScalarValue, Statistics}; use datafusion_expr::Operator; use std::sync::Arc; -fn make_stats_with_ndv(num_rows: usize, ndv: usize) -> Statistics { +fn make_stats_with_ndvs(num_rows: usize, ndvs: &[usize]) -> Statistics { Statistics { num_rows: Precision::Exact(num_rows), total_byte_size: Precision::Absent, - column_statistics: vec![ColumnStatistics { - null_count: Precision::Exact(0), - max_value: Precision::Absent, - min_value: Precision::Absent, - sum_value: Precision::Absent, - distinct_count: Precision::Exact(ndv), - byte_size: Precision::Absent, - }], + column_statistics: ndvs + .iter() + .map(|&ndv| ColumnStatistics { + null_count: Precision::Exact(0), + max_value: Precision::Absent, + min_value: Precision::Absent, + sum_value:
Precision::Absent, + distinct_count: Precision::Exact(ndv), + byte_size: Precision::Absent, + }) + .collect(), } } @@ -42,7 +45,7 @@ fn make_stats_with_ndv(num_rows: usize, ndv: usize) -> Statistics { #[test] fn test_column_ndv() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let registry = ExpressionAnalyzerRegistry::new(); assert_eq!(registry.get_distinct_count(&col, &stats), Some(100)); @@ -50,7 +53,7 @@ fn test_column_ndv() { #[test] fn test_literal_ndv() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; let registry = ExpressionAnalyzerRegistry::new(); @@ -59,7 +62,7 @@ fn test_literal_ndv() { #[test] fn test_arithmetic_ndv() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let lit = Arc::new(Literal::new(ScalarValue::Int64(Some(1)))) as Arc; @@ -91,7 +94,7 @@ fn test_arithmetic_ndv() { #[test] fn test_equality_selectivity() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; @@ -104,7 +107,7 @@ fn test_equality_selectivity() { #[test] fn test_equality_selectivity_column_on_right() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; @@ -117,7 +120,7 @@ fn test_equality_selectivity_column_on_right() { #[test] fn test_and_selectivity() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let lit1 = 
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; @@ -137,7 +140,7 @@ fn test_and_selectivity() { #[test] fn test_or_selectivity() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let lit1 = Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; @@ -157,7 +160,7 @@ fn test_or_selectivity() { #[test] fn test_not_selectivity() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; @@ -172,7 +175,7 @@ fn test_not_selectivity() { #[test] fn test_equality_selectivity_expression_eq_literal() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let one = Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc; @@ -191,7 +194,7 @@ fn test_equality_selectivity_expression_eq_literal() { #[test] fn test_inequality_selectivity_expression_neq_literal() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let one = Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc; @@ -208,6 +211,55 @@ fn test_inequality_selectivity_expression_neq_literal() { assert!((sel - 0.99).abs() < 0.001); } +// Tests for resolve_ndv symmetry (column-vs-column and column-vs-expression) + +#[test] +fn test_equality_selectivity_column_eq_column_symmetric() { + // a = b and b = a must give the same selectivity regardless of operand order + let stats = make_stats_with_ndvs(1000, &[50, 200]); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 1)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + + let eq_ab = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Eq, + 
Arc::clone(&col_b), + )) as Arc; + let eq_ba = Arc::new(BinaryExpr::new( + Arc::clone(&col_b), + Operator::Eq, + Arc::clone(&col_a), + )) as Arc; + + let sel_ab = registry.get_selectivity(&eq_ab, &stats).unwrap(); + let sel_ba = registry.get_selectivity(&eq_ba, &stats).unwrap(); + + assert_eq!(sel_ab, sel_ba); +} + +#[test] +fn test_equality_selectivity_column_eq_expression_uses_max_ndv() { + // col = (expr) should use max(ndv(col), ndv(expr)), not just ndv(col) + // Here col has ndv=10 and expr=(b+1) has ndv=200 from column b + let stats = make_stats_with_ndvs(1000, &[10, 200]); + let col_a = Arc::new(Column::new("a", 0)) as Arc; + let col_b = Arc::new(Column::new("b", 1)) as Arc; + let one = + Arc::new(Literal::new(ScalarValue::Int64(Some(1)))) as Arc; + let b_plus_1 = + Arc::new(BinaryExpr::new(col_b, Operator::Plus, one)) as Arc; + + let eq = + Arc::new(BinaryExpr::new(col_a, Operator::Eq, b_plus_1)) as Arc; + + let registry = ExpressionAnalyzerRegistry::new(); + let sel = registry.get_selectivity(&eq, &stats).unwrap(); + // max(ndv(a)=10, ndv(b+1)=200) = 200, so selectivity = 1/200 + assert_eq!(sel, 1.0 / 200.0); +} + // Min/max tests #[test] @@ -235,7 +287,7 @@ fn test_column_min_max() { #[test] fn test_literal_min_max() { - let stats = make_stats_with_ndv(100, 10); + let stats = make_stats_with_ndvs(100, &[10]); let lit = Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; let registry = ExpressionAnalyzerRegistry::new(); @@ -271,7 +323,7 @@ fn test_column_null_fraction() { #[test] fn test_literal_null_fraction() { - let stats = make_stats_with_ndv(100, 10); + let stats = make_stats_with_ndvs(100, &[10]); let registry = ExpressionAnalyzerRegistry::new(); let lit = @@ -301,7 +353,7 @@ impl ExpressionAnalyzer for FixedSelectivityAnalyzer { #[test] fn test_custom_analyzer_overrides_default() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let lit = 
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc; @@ -336,7 +388,7 @@ impl ExpressionAnalyzer for ColumnAOnlyAnalyzer { #[test] fn test_custom_analyzer_delegates_to_default() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col_a = Arc::new(Column::new("a", 0)) as Arc; let col_b = Arc::new(Column::new("b", 0)) as Arc; let lit = @@ -359,7 +411,7 @@ fn test_custom_analyzer_delegates_to_default() { #[test] fn test_with_analyzers_and_default() { - let stats = make_stats_with_ndv(1000, 100); + let stats = make_stats_with_ndvs(1000, &[100]); let col = Arc::new(Column::new("a", 0)) as Arc; let registry = From 54a0df74acc3dd60b8abc0f31e07f22b950bef89 Mon Sep 17 00:00:00 2001 From: Alessandro Solimando Date: Wed, 8 Apr 2026 21:47:00 +0200 Subject: [PATCH 8/8] Clarify enable_expression_analyzer scope in config and doc comments --- datafusion/common/src/config.rs | 7 ++++--- datafusion/physical-expr/src/projection.rs | 6 +++--- datafusion/physical-plan/src/filter.rs | 5 +++-- datafusion/sqllogictest/test_files/information_schema.slt | 2 +- docs/source/user-guide/configs.md | 2 +- 5 files changed, 12 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e99c8e1588bdd..3d93bc294a395 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -963,10 +963,11 @@ config_namespace! { /// So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. pub enable_dynamic_filter_pushdown: bool, default = true - /// When set to true, the physical planner will use the ExpressionAnalyzer + /// When set to true, the physical planner uses the ExpressionAnalyzer /// framework for expression-level statistics estimation (NDV, selectivity, - /// min/max, null fraction). 
When false, existing behavior without - /// expression-level statistics support is used. + /// min/max, null fraction) for operators created during logical-to-physical + /// translation. Optimizer-created operators fall back to built-in estimation. + /// When false, existing behavior is unchanged. pub enable_expression_analyzer: bool, default = false /// When set to true, the optimizer will insert filters before a join between diff --git a/datafusion/physical-expr/src/projection.rs b/datafusion/physical-expr/src/projection.rs index 6d83a41a42c06..811574ca3c3fd 100644 --- a/datafusion/physical-expr/src/projection.rs +++ b/datafusion/physical-expr/src/projection.rs @@ -203,9 +203,9 @@ impl ProjectionExprs { /// The physical planner injects the registry from `SessionState` when /// creating projections. Projections created later by optimizer rules /// do not receive the registry and fall back to - /// `DefaultExpressionAnalyzer`. Propagating the registry to all - /// operator construction sites requires an operator-level statistics - /// registry, which is orthogonal to this work. + /// `DefaultExpressionAnalyzer`. Full coverage requires an operator-level + /// statistics registry (tracked in + /// ). pub fn with_expression_analyzer_registry( mut self, registry: Arc, diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index dcb0caad59c53..4021ab962367f 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -187,8 +187,9 @@ impl FilterExecBuilder { /// Same limitation as [`ProjectionExprs::with_expression_analyzer_registry`]: /// the planner injects this from `SessionState`, but filters created /// by optimizer rules (e.g., filter pushdown into unions) fall back to - /// the default selectivity. An operator-level statistics registry is - /// needed for full coverage. + /// the default selectivity. Full coverage requires an operator-level + /// statistics registry (tracked in + /// ). 
/// /// [`ProjectionExprs::with_expression_analyzer_registry`]: datafusion_physical_expr::projection::ProjectionExprs::with_expression_analyzer_registry pub fn with_expression_analyzer_registry( diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 1a12aabb46cc4..c2e4eb186a658 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -441,7 +441,7 @@ datafusion.optimizer.default_filter_selectivity 20 The default filter selectivit datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. 
-datafusion.optimizer.enable_expression_analyzer false When set to true, the physical planner will use the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction). When false, existing behavior without expression-level statistics support is used. +datafusion.optimizer.enable_expression_analyzer false When set to true, the physical planner uses the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction) for operators created during logical-to-physical translation. Optimizer-created operators fall back to built-in estimation. When false, existing behavior is unchanged. datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes. datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter. diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index dc7e0f2cd604e..030b97de1829c 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -143,7 +143,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_join_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase. 
| | datafusion.optimizer.enable_aggregate_dynamic_filter_pushdown | true | When set to true, the optimizer will attempt to push down Aggregate dynamic filters into the file scan phase. | | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden. | -| datafusion.optimizer.enable_expression_analyzer | false | When set to true, the physical planner will use the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction). When false, existing behavior without expression-level statistics support is used. | +| datafusion.optimizer.enable_expression_analyzer | false | When set to true, the physical planner uses the ExpressionAnalyzer framework for expression-level statistics estimation (NDV, selectivity, min/max, null fraction) for operators created during logical-to-physical translation. Optimizer-created operators fall back to built-in estimation. When false, existing behavior is unchanged. | | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. 
This filter can add additional overhead when the file format does not fully support predicate push down. | | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level | | datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |