diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index d9905954678f..d585a04876e8 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use arrow::array::RecordBatch; use arrow::datatypes::Schema; use arrow::ipc::writer::StreamWriter; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err, }; @@ -34,7 +35,6 @@ use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, @@ -324,8 +324,33 @@ pub fn serialize_physical_expr_with_converter( } // Snapshot the expr in case it has dynamic predicate state so - // it can be serialized - let value = snapshot_physical_expr(Arc::clone(value))?; + // it can be serialized. + // + // BUT: `DynamicFilterPhysicalExpr` has its own dedicated wire format + // (`PhysicalDynamicFilterNode`) and a dedup pipeline that lets multiple + // references on the data-server side share one `Arc` across the + // wire. If we let the generic snapshot walker fold a nested + // `DynamicFilterPhysicalExpr` to its `current()` literal here, the + // wrapper is gone and the data server can never observe a live update + // again. Skip `DynamicFilterPhysicalExpr` during the snapshot walk so + // it survives to the downcast chain below (which has a dedicated branch + // serializing it as `PhysicalDynamicFilterNode`). + // + // Upstream apache/datafusion#21929 fixed this structurally by removing + // the top-level `snapshot_physical_expr` call entirely and routing + // through per-expression `try_to_proto` hooks. Until atlas adopts that + // refactor, this is the minimal patch. + let value = Arc::clone(value) + .transform_up(|e| { + if e.as_any().is::() { + Ok(Transformed::no(e)) + } else if let Some(snapshot) = e.snapshot()? { + Ok(Transformed::yes(snapshot)) + } else { + Ok(Transformed::no(e)) + } + }) + .data()?; let expr = value.as_any(); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 247687f9a7a5..0f3634bdb6c3 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3486,6 +3486,114 @@ fn dynamic_filter_dedup_distinct_children_via_with_new_children() -> Result<()> Ok(()) } +/// Regression for the X-2935 ny2 prod symptom: a `DynamicFilterPhysicalExpr` +/// nested inside a `BinaryExpr` (the shape produced when `FilterPushdown(Post)` +/// re-pushes a SortExec self-filter into a FileScan that already had a static +/// WHERE) must survive proto roundtrip as a live wrapper, NOT be folded into +/// a `Literal` snapshot. +/// +/// Before this patch, `serialize_physical_expr_with_converter`'s top-level +/// `snapshot_physical_expr(value)` call walked the whole expression tree and +/// replaced every `DynamicFilterPhysicalExpr` with its `current()` literal. +/// The dedicated `DynamicFilterPhysicalExpr` branch in the downcast chain +/// only fired when the wrapper was the OUTERMOST node; any nesting +/// (BinaryExpr/CastExpr/NotExpr/...) folded the inner wrapper to a literal, +/// breaking the live link between SortExec and the FileScan predicate on +/// the data-server side. +/// +/// Upstream apache/datafusion#21929 (commit 077f08a9a, merged 2026-05-22) +/// fixed this structurally by removing that top-level call and routing each +/// expression through `try_to_proto` hooks. This test pins the minimal patch +/// in atlas's DF fork: skip `DynamicFilterPhysicalExpr` during the snapshot +/// walk so it survives to the downcast chain. +#[test] +fn dynamic_filter_nested_in_binary_expr_survives_proto_roundtrip() -> Result<()> { + use datafusion::logical_expr::Operator; + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::{BinaryExpr, DynamicFilterPhysicalExpr}; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + PhysicalProtoConverterExtension, + }; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let initial: Arc = lit(true); + let col_a: Arc = Arc::new(Column::new("a", 0)); + let dyn_filter: Arc = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + Arc::clone(&initial), + )); + + // Mimic the prod shape: a static WHERE ANDed with the TopK dyn filter. + // The static side is `a > 0`; the dynamic side is the wrapper. + let static_where: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Gt, + lit(0_i64), + )); + let nested: Arc = Arc::new(BinaryExpr::new( + static_where, + Operator::And, + Arc::clone(&dyn_filter), + )); + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto = serializer.physical_expr_to_proto(&nested, &codec)?; + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let decoded = deserializer.proto_to_physical_expr(&proto, &ctx, &schema, &codec)?; + + // The decoded predicate must still be `BinaryExpr(static, AND, dyn)` -- + // crucially, the right side must be a live `DynamicFilterPhysicalExpr`, + // NOT a `Literal(true)` snapshot from before the patch. + let binary = decoded + .as_any() + .downcast_ref::() + .expect("decoded predicate must be a BinaryExpr"); + assert_eq!(*binary.op(), Operator::And, "op preserved"); + + let right_is_dyn = binary + .right() + .as_any() + .downcast_ref::() + .is_some(); + assert!( + right_is_dyn, + "BinaryExpr.right must remain a DynamicFilterPhysicalExpr after \ + proto roundtrip. If it's a Literal here, the snapshot walker in \ + to_proto.rs folded the wrapper -- that is the X-2935 ny2 bug." + ); + + // And tree-wide `snapshot_generation` must still be nonzero (i.e. + // dynamic), and `update()` on the decoded dyn must bump it. + use datafusion_physical_expr_common::physical_expr::snapshot_generation; + let g_before = snapshot_generation(&decoded); + assert!( + g_before > 0, + "decoded predicate must report a dynamic snapshot_generation" + ); + let decoded_dyn = binary + .right() + .as_any() + .downcast_ref::() + .unwrap(); + // Boolean-typed update -- the wrapped predicate is Boolean, so the + // refreshed inner expr must stay Boolean too (DynamicFilterPhysicalExpr + // caches data_type and asserts immutability in test builds). + decoded_dyn.update(lit(false))?; + let g_after = snapshot_generation(&decoded); + assert!( + g_after > g_before, + "update on decoded dyn must bump tree snapshot_generation \ + (before={g_before}, after={g_after})" + ); + + Ok(()) +} + /// Without the deduplicating codec, two decodes still both reconstruct the /// wrapper (no snapshotting) but get distinct Arcs. Guards the invariant /// that the wire format itself is dedup-agnostic; dedup is the codec's