Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,10 @@ impl ExecutionPlan for FilterExec {
return FilterExecBuilder::from(self)
.with_input(make_with_child(projection, self.input())?)
.with_predicate(new_predicate)
// The projection copied from the original FilterExec indexes into its old
// input. After the swap, the new input is the ProjectionExec, which already
// handles column selection, so the stale projection is cleared here.
.apply_projection(None)?
.build()
.map(|e| Some(Arc::new(e) as _));
}
Expand Down Expand Up @@ -2265,4 +2269,68 @@ mod tests {

Ok(())
}

/// Regression test for swapping a `ProjectionExec` with the `FilterExec`
/// beneath it when that filter carries its own explicit projection:
/// `try_swapping_with_projection` must complete without panicking.
///
/// Prior to the fix, `FilterExecBuilder::from(self)` inherited the filter's
/// old projection (e.g. `[0, 1, 2]`). Once `.with_input` substituted the
/// narrower, two-column ProjectionExec as the input, `.build()` validated the
/// stale `[0, 1, 2]` projection against that two-column schema and panicked
/// with "project index 2 out of bounds, max field 2".
#[test]
fn test_filter_with_projection_swap_does_not_panic() -> Result<()> {
    use crate::projection::ProjectionExpr;
    use datafusion_physical_expr::expressions::col;

    // Three-column input: [ts: Int64, tokens: Int64, svc: Utf8]
    let fields = vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("tokens", DataType::Int64, false),
        Field::new("svc", DataType::Utf8, false),
    ];
    let schema = Arc::new(Schema::new(fields));
    let input = Arc::new(EmptyExec::new(Arc::clone(&schema)));

    // Predicate `ts > 0`, with a filter projection that keeps all three
    // columns: projection=[ts@0, tokens@1, svc@2].
    let ts_column = Arc::new(Column::new("ts", 0));
    let zero = Arc::new(Literal::new(ScalarValue::Int64(Some(0))));
    let predicate = Arc::new(BinaryExpr::new(ts_column, Operator::Gt, zero));
    let builder = FilterExecBuilder::new(predicate, input)
        .apply_projection(Some(vec![0, 1, 2]))?;
    let filter = Arc::new(builder.build()?);

    // ProjectionExec above the filter keeps only [ts, tokens]; svc is dropped.
    let filter_schema = filter.schema();
    let proj_exprs = ["ts", "tokens"]
        .into_iter()
        .map(|name| {
            col(name, &filter_schema).map(|expr| ProjectionExpr {
                expr,
                alias: name.to_string(),
            })
        })
        .collect::<Result<Vec<_>>>()?;
    let projection =
        Arc::new(ProjectionExec::try_new(proj_exprs, Arc::clone(&filter) as _)?);

    // The swap itself must neither panic nor be declined.
    let new_plan = filter
        .try_swapping_with_projection(&projection)?
        .expect("swap should succeed");

    // The rewritten plan must still expose exactly [ts, tokens].
    let out_schema = new_plan.schema();
    let out_names: Vec<&str> = out_schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(out_names, ["ts", "tokens"]);

    Ok(())
}
}
Loading