diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 19546d3150039..d8435e293249a 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -4072,24 +4072,49 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { - if let Some(expr_id) = proto.expr_id { - // Check cache first - if let Some(cached) = self.cache.borrow().get(&expr_id) { - return Ok(Arc::clone(cached)); - } - // Deserialize and cache - let expr = parse_physical_expr_with_converter( + let Some(expr_id) = proto.expr_id else { + return parse_physical_expr_with_converter( proto, ctx, input_schema, codec, self, - )?; - self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - Ok(expr) - } else { - parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) + ); + }; + + // Always parse the proto body first. The cache hit path below uses + // `parsed.children()` (which carry this occurrence's column refs as + // they appeared in the proto -- e.g. file-schema indices after + // FilterPushdown rewrote them), so we can't short-circuit before + // parsing. + let parsed = + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; + + let mut cache = self.cache.borrow_mut(); + if let Some(cached) = cache.get(&expr_id) { + // Since expressions may manage their own internal state when + // deriving expressions via `with_new_children`, we use + // `with_new_children` to opt into the same behavior. + // + // For example, one `DynamicFilterPhysicalExpr` may be derived + // from another resulting in shared references (e.g. a SortExec + // dynamic filter at the parent schema and a pushed-down FileScan + // predicate at the file schema, sharing the same `Inner` Arc + // but with different children). Using `with_new_children` + // preserves those references: the cached `Arc` is kept, + // while this occurrence's children (parsed from the proto body) + // are installed onto a fresh outer wrapper. + // + // Ported from apache/datafusion#21807; without this the cache + // hit path returned the entire cached outer Arc and silently + // discarded the proto body's children, breaking column ref + // alignment in plans where FilterPushdown rewrote them. + let children: Vec<_> = parsed.children().into_iter().cloned().collect(); + return Arc::clone(cached).with_new_children(children); } + + cache.insert(expr_id, Arc::clone(&parsed)); + Ok(parsed) } fn physical_expr_to_proto( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index b3e0a3ebcf88b..247687f9a7a5b 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3215,26 +3215,64 @@ fn dynamic_filter_dedup_with_deduplicating_codec() -> Result<()> { let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?; let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?; - assert!( - Arc::ptr_eq(&d1, &d2), - "DeduplicatingDeserializer must return the same Arc for two refs with the same expr_id" - ); - + // d1 and d2 may be distinct OUTER Arcs because the cache-hit path + // rewraps via `with_new_children` to preserve each occurrence's + // children. The invariant that matters is shared INNER (TopK updates + // visible to both). expression_id equality is necessary but not + // sufficient (the wire carries it across decode), so we also assert + // observable shared state via `update()` -> `snapshot_generation()`. let d1_df = d1 .as_any() .downcast_ref::() .expect( "decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed", ); + let d2_df = d2 + .as_any() + .downcast_ref::() + .expect( + "decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed", + ); assert_eq!( d1_df.inner().expression_id, id_before, "expression_id must survive proto roundtrip" ); + assert_dynamic_filter_inner_is_shared(d1_df, d2_df)?; Ok(()) } +/// Asserts that two decoded [`DynamicFilterPhysicalExpr`] outer Arcs back +/// the same `Arc>`. expression_id equality alone is not +/// sufficient (the wire format carries the value across decode, so two +/// independently-allocated `Inner`s can have the same id). The only +/// observable proof of shared inner state is that an `update()` on one +/// side bumps the `snapshot_generation()` of the other. +fn assert_dynamic_filter_inner_is_shared( + left: &datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr, + right: &datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr, +) -> Result<()> { + let gen_before = left.snapshot_generation(); + assert_eq!( + gen_before, + right.snapshot_generation(), + "left and right must start at the same generation if they share Inner" + ); + left.update(lit(123_i64))?; + let gen_after_left = left.snapshot_generation(); + let gen_after_right = right.snapshot_generation(); + assert!( + gen_after_left > gen_before, + "left's update must bump its own generation" + ); + assert_eq!( + gen_after_left, gen_after_right, + "right must observe left's update (proves Arc is shared, not just equal expression_id)" + ); + Ok(()) +} + /// Two distinct outer Arcs that share the same `Inner` (e.g. via /// `with_new_children`) must still dedup to the same decoded Arc, because /// `DeduplicatingSerializer` now hashes on `expression_id` (which is @@ -3300,11 +3338,151 @@ fn dynamic_filter_dedup_distinct_outer_arcs_same_inner() -> Result<()> { let deserializer = DeduplicatingDeserializer::new(); let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?; let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?; - assert!( - Arc::ptr_eq(&d1, &d2), - "Distinct-outer same-Inner refs must reconstruct to one Arc" + // d1 and d2 may be distinct OUTER Arcs (cache-hit path rewraps via + // `with_new_children`); the invariant is shared INNER. Verify via + // observable update propagation, not just expression_id equality. + let d1_df = d1 + .as_any() + .downcast_ref::() + .unwrap(); + let d2_df = d2 + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(d1_df.inner().expression_id, expected_id); + assert_dynamic_filter_inner_is_shared(d1_df, d2_df)?; + + Ok(()) +} + +/// Two outer `Arc` sharing the same Inner but +/// carrying DIFFERENT children must roundtrip to two outer Arcs whose +/// children match their original occurrence's children. This is the shape +/// `FilterPushdown` actually produces: it clones a SortExec's dyn filter and +/// rewrites the children column refs to match the FileScan's schema (e.g. +/// `ticker@0` at SortExec input → `ticker@12` at file schema). Both outer +/// wrappers share the same `Inner` Arc (so TopK heap-max updates propagate) +/// but the column-ref indices differ. +/// +/// Without `DeduplicatingDeserializer::proto_to_physical_expr` applying +/// `with_new_children(parsed.children())` on cache hits, the second decode +/// silently returns the FIRST occurrence's children and discards the proto +/// body's children -- `prune_by_statistics` then resolves column refs against +/// the wrong file-schema positions and pruning becomes a no-op. Observed on +/// X-2935 ny2 staging 2026-06-15: `row_groups_pruned_statistics=0`, +/// `bytes_scanned=91 MB`, `time_elapsed_processing=31 s`. +#[test] +fn dynamic_filter_dedup_distinct_children_via_with_new_children() -> Result<()> { + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + PhysicalProtoConverterExtension, + }; + + // Schema where the `a` column sits at physical index 1 (with a filler + // `extra` at index 0). Mimics a FilterPushdown step where the parent + // sees `a@0` against its narrower input schema, while the FileScan + // below it has the same `a` column at index 1 in its broader file + // schema. Physical `Column` resolves by index, so each occurrence + // needs to keep its OWN index after roundtrip; this is what the dedup + // cache-hit's `with_new_children` step preserves. + let schema = Schema::new(vec![ + Field::new("extra", DataType::Int64, false), // physical position 0 + Field::new("a", DataType::Int64, false), // physical position 1 + ]); + + let initial: Arc = lit(true); + let col_a_idx0: Arc = Arc::new(Column::new("a", 0)); + let col_a_idx1: Arc = Arc::new(Column::new("a", 1)); + + // outer A: SortExec.filter-style (children at index 0). + let df_arc1: Arc = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a_idx0)], + Arc::clone(&initial), + )); + // outer B: FileScan.predicate-style (children at index 1) sharing Inner. + let df_arc2 = + Arc::clone(&df_arc1).with_new_children(vec![Arc::clone(&col_a_idx1)])?; + assert_eq!( + df_arc1 + .as_any() + .downcast_ref::() + .unwrap() + .inner() + .expression_id, + df_arc2 + .as_any() + .downcast_ref::() + .unwrap() + .inner() + .expression_id, + "shared Inner must produce identical expression_id" ); + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto1 = serializer.physical_expr_to_proto(&df_arc1, &codec)?; + let proto2 = serializer.physical_expr_to_proto(&df_arc2, &codec)?; + assert_eq!( + proto1.expr_id, proto2.expr_id, + "same Inner must stamp same wire expr_id" + ); + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?; + let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?; + + let d1_df = d1 + .as_any() + .downcast_ref::() + .expect("d1 must remain a DynamicFilterPhysicalExpr after roundtrip"); + let d2_df = d2 + .as_any() + .downcast_ref::() + .expect("d2 must remain a DynamicFilterPhysicalExpr after roundtrip"); + + // Children preservation (the core bug): each outer wrapper must carry + // its OWN occurrence's children. d1 was serialized with Column index 0, + // d2 with index 1. Without the cache-hit `with_new_children` block in + // `proto_to_physical_expr`, the second decode would return the first + // decode's outer Arc and silently keep its children. + fn child_column_index(expr: &Arc) -> usize { + let dyn_filter = expr + .as_any() + .downcast_ref::() + .unwrap(); + dyn_filter + .children() + .first() + .unwrap() + .as_any() + .downcast_ref::() + .unwrap() + .index() + } + assert_eq!( + child_column_index(&d1), + 0, + "decoded d1 must carry its OCCURRENCE's children (Column at index 0); \ + if this is 1, dedup-by-Inner is missing and the cache hit clobbered \ + the children with d2's" + ); + assert_eq!( + child_column_index(&d2), + 1, + "decoded d2 must carry its OCCURRENCE's children (Column at index 1); \ + if this is 0, dedup-by-Inner is missing and the cache hit returned \ + d1's children unchanged" + ); + + // Inner sharing (the related invariant): an `update()` on one decoded + // wrapper must bump the other's `snapshot_generation()`. This proves + // they back the same `Arc>`, not just two independent + // Inners that happen to carry the same `expression_id` from the wire. + assert_dynamic_filter_inner_is_shared(d1_df, d2_df)?; + Ok(()) }