From 44b859742adc8e8f32e6e3c0a02b4c791bfb04e6 Mon Sep 17 00:00:00 2001 From: Huaijin Date: Wed, 8 Apr 2026 19:29:37 +0800 Subject: [PATCH] fix: FilterExec should drop projection when apply projection pushdown (cherry picked from commit 330d57ff4d1752cfc60ba5faa5d24391ab4cca86) --- datafusion/physical-plan/src/filter.rs | 68 ++++++++++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 21f5727d866e4..6bd779e3d5023 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -606,6 +606,10 @@ impl ExecutionPlan for FilterExec { return FilterExecBuilder::from(self) .with_input(make_with_child(projection, self.input())?) .with_predicate(new_predicate) + // The original FilterExec projection referenced columns from its old + // input. After the swap the new input is the ProjectionExec which + // already handles column selection, so clear the projection here. + .apply_projection(None)? .build() .map(|e| Some(Arc::new(e) as _)); } @@ -2265,4 +2269,68 @@ mod tests { Ok(()) } + + /// Regression test: ProjectionExec on top of a FilterExec that already has + /// an explicit projection must not panic when `try_swapping_with_projection` + /// attempts to swap the two nodes. + /// + /// Before the fix, `FilterExecBuilder::from(self)` copied the old projection + /// (e.g. `[0, 1, 2]`) from the FilterExec. After `.with_input` replaced the + /// input with the narrower ProjectionExec (2 columns), `.build()` tried to + /// validate the stale `[0, 1, 2]` projection against the 2-column schema and + /// panicked with "project index 2 out of bounds, max field 2". + #[test] + fn test_filter_with_projection_swap_does_not_panic() -> Result<()> { + use crate::projection::ProjectionExpr; + use datafusion_physical_expr::expressions::col; + + // Schema: [ts: Int64, tokens: Int64, svc: Utf8] + let schema = Arc::new(Schema::new(vec![ + Field::new("ts", DataType::Int64, false), + Field::new("tokens", DataType::Int64, false), + Field::new("svc", DataType::Utf8, false), + ])); + let input = Arc::new(EmptyExec::new(Arc::clone(&schema))); + + // FilterExec: ts > 0, projection=[ts@0, tokens@1, svc@2] (all 3 cols) + let predicate = Arc::new(BinaryExpr::new( + Arc::new(Column::new("ts", 0)), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int64(Some(0)))), + )); + let filter = Arc::new( + FilterExecBuilder::new(predicate, input) + .apply_projection(Some(vec![0, 1, 2]))? + .build()?, + ); + + // ProjectionExec: narrows to [ts, tokens] (drops svc) + let proj_exprs = vec![ + ProjectionExpr { + expr: col("ts", &filter.schema())?, + alias: "ts".to_string(), + }, + ProjectionExpr { + expr: col("tokens", &filter.schema())?, + alias: "tokens".to_string(), + }, + ]; + let projection = Arc::new(ProjectionExec::try_new( + proj_exprs, + Arc::clone(&filter) as _, + )?); + + // This must not panic + let result = filter.try_swapping_with_projection(&projection)?; + assert!(result.is_some(), "swap should succeed"); + + let new_plan = result.unwrap(); + // Output schema must still be [ts, tokens] + let out_schema = new_plan.schema(); + assert_eq!(out_schema.fields().len(), 2); + assert_eq!(out_schema.field(0).name(), "ts"); + assert_eq!(out_schema.field(1).name(), "tokens"); + + Ok(()) + } }