From 01099b98c00f84f85a554bf54c610aa5136e3f01 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 16 Jun 2026 11:12:08 +0800 Subject: [PATCH 1/3] [X-2935] proto: preserve nested DynamicFilterPhysicalExpr through snapshot walk `serialize_physical_expr_with_converter` only honored the top-level `DynamicFilterPhysicalExpr` special case; any nested wrapper inside a `BinaryExpr` / `CastExpr` / etc. was folded into its current() snapshot by the unconditional `snapshot_physical_expr(value)` call right after. When a `FilterPushdown(Post)` pass re-pushes a SortExec's self-filter into a FileScan that already carries a static WHERE, the resulting predicate is `BinaryExpr(AND, static_WHERE, dyn_filter)`. The old code serialized this as `BinaryExpr(AND, static_WHERE, lit(true))` on the wire (since `current()` returns `lit(true)` before TopK runs), and the data-server side decoded a fully static predicate with no live link to TopK -- the IncrementalRowGroupPruner saw no `snapshot_generation` movement and never tightened pruning. Replace the unconditional snapshot walker with a `transform_up` that skips `DynamicFilterPhysicalExpr` nodes. Other dynamic exprs (`HashTableLookupExpr`, etc.) still get snapshotted as before; the DynamicFilter wrapper survives to the downcast chain, where the existing line-318 branch serializes it as `PhysicalDynamicFilterNode` at whatever nesting depth. Adds `dynamic_filter_nested_in_binary_expr_survives_proto_roundtrip` covering the `BinaryExpr(static_WHERE, AND, dyn_filter)` shape. Upstream apache/datafusion#21929 (commit 077f08a9a, merged 2026-05-22) fixed this structurally by removing the top-level snapshot call and routing every PhysicalExpr through `try_to_proto` hooks. This patch is the minimal port until the atlas DF fork adopts that refactor. --- .../proto/src/physical_plan/to_proto.rs | 32 +++++- .../tests/cases/roundtrip_physical_plan.rs | 104 ++++++++++++++++++ 2 files changed, 133 insertions(+), 3 deletions(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index d9905954678fa..b492fa41fb100 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -34,7 +34,7 @@ use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, @@ -324,8 +324,34 @@ pub fn serialize_physical_expr_with_converter( } // Snapshot the expr in case it has dynamic predicate state so - // it can be serialized - let value = snapshot_physical_expr(Arc::clone(value))?; + // it can be serialized. + // + // BUT: `DynamicFilterPhysicalExpr` has its own dedicated wire format + // (`PhysicalDynamicFilterNode`) and a dedup pipeline that lets multiple + // references on the data-server side share one `Arc` across the + // wire. If we let the generic snapshot walker fold a nested + // `DynamicFilterPhysicalExpr` to its `current()` literal here, the + // wrapper is gone and the data server can never observe a live update + // again -- which is the X-2935 ny2 prod regression in atlas's distributed + // pipeline. Skip `DynamicFilterPhysicalExpr` during the snapshot walk so + // it survives to the downcast chain below (which has a dedicated branch + // serializing it as `PhysicalDynamicFilterNode`). + // + // Upstream apache/datafusion#21929 fixed this structurally by removing + // the top-level `snapshot_physical_expr` call entirely and routing + // through per-expression `try_to_proto` hooks. Until atlas adopts that + // refactor, this is the minimal patch. + let value = Arc::clone(value) + .transform_up(|e| { + if e.as_any().is::() { + Ok(Transformed::no(e)) + } else if let Some(snapshot) = e.snapshot()? { + Ok(Transformed::yes(snapshot)) + } else { + Ok(Transformed::no(e)) + } + }) + .data()?; let expr = value.as_any(); // HashTableLookupExpr is used for dynamic filter pushdown in hash joins. diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 247687f9a7a5b..8833ec6e5601a 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3486,6 +3486,110 @@ fn dynamic_filter_dedup_distinct_children_via_with_new_children() -> Result<()> Ok(()) } +/// Regression for the X-2935 ny2 prod symptom: a `DynamicFilterPhysicalExpr` +/// nested inside a `BinaryExpr` (the shape produced when `FilterPushdown(Post)` +/// re-pushes a SortExec self-filter into a FileScan that already had a static +/// WHERE) must survive proto roundtrip as a live wrapper, NOT be folded into +/// a `Literal` snapshot. +/// +/// Before this patch, `serialize_physical_expr_with_converter`'s top-level +/// `snapshot_physical_expr(value)` call walked the whole expression tree and +/// replaced every `DynamicFilterPhysicalExpr` with its current() literal. +/// The top-level special case at line 318 only saved the wrapper when it was +/// the OUTERMOST node; any nesting (BinaryExpr/CastExpr/NotExpr/...) folded +/// the inner wrapper to a literal, breaking the live link between SortExec +/// and the FileScan predicate on the data-server side. +/// +/// Upstream apache/datafusion#21929 (commit 077f08a9a, merged 2026-05-22) +/// fixed this structurally by removing that top-level call and routing each +/// expression through `try_to_proto` hooks. This test pins the minimal patch +/// in atlas's DF fork: skip `DynamicFilterPhysicalExpr` during the snapshot +/// walk so it survives to the downcast chain. +#[test] +fn dynamic_filter_nested_in_binary_expr_survives_proto_roundtrip() -> Result<()> { + use datafusion::logical_expr::Operator; + use datafusion::physical_plan::expressions::Column; + use datafusion_physical_expr::expressions::{BinaryExpr, DynamicFilterPhysicalExpr}; + use datafusion_proto::physical_plan::{ + DeduplicatingDeserializer, DeduplicatingSerializer, + PhysicalProtoConverterExtension, + }; + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + + let initial: Arc = lit(true); + let col_a: Arc = Arc::new(Column::new("a", 0)); + let dyn_filter: Arc = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + Arc::clone(&initial), + )); + + // Mimic the prod shape: a static WHERE ANDed with the TopK dyn filter. + // The static side is `a > 0`; the dynamic side is the wrapper. + let static_where: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + Operator::Gt, + lit(0_i64), + )); + let nested: Arc = Arc::new(BinaryExpr::new( + static_where, + Operator::And, + Arc::clone(&dyn_filter), + )); + + let codec = DefaultPhysicalExtensionCodec {}; + let serializer = DeduplicatingSerializer::new(); + let proto = serializer.physical_expr_to_proto(&nested, &codec)?; + + let ctx = SessionContext::new().task_ctx(); + let deserializer = DeduplicatingDeserializer::new(); + let decoded = deserializer.proto_to_physical_expr(&proto, &ctx, &schema, &codec)?; + + // The decoded predicate must still be `BinaryExpr(static, AND, dyn)` -- + // crucially, the right side must be a live `DynamicFilterPhysicalExpr`, + // NOT a `Literal(true)` snapshot from before the patch. + let binary = decoded + .as_any() + .downcast_ref::() + .expect("decoded predicate must be a BinaryExpr"); + assert_eq!(*binary.op(), Operator::And, "op preserved"); + + let right_is_dyn = binary + .right() + .as_any() + .downcast_ref::() + .is_some(); + assert!( + right_is_dyn, + "BinaryExpr.right must remain a DynamicFilterPhysicalExpr after \ + proto roundtrip. If it's a Literal here, the snapshot walker in \ + to_proto.rs folded the wrapper -- that is the X-2935 ny2 bug." + ); + + // And tree-wide `snapshot_generation` must still be nonzero (i.e. + // dynamic), and `update()` on the decoded dyn must bump it. + use datafusion_physical_expr_common::physical_expr::snapshot_generation; + let g_before = snapshot_generation(&decoded); + assert!( + g_before > 0, + "decoded predicate must report a dynamic snapshot_generation" + ); + let decoded_dyn = binary + .right() + .as_any() + .downcast_ref::() + .unwrap(); + decoded_dyn.update(lit(42_i64))?; + let g_after = snapshot_generation(&decoded); + assert!( + g_after > g_before, + "update on decoded dyn must bump tree snapshot_generation \ + (before={g_before}, after={g_after})" + ); + + Ok(()) +} + /// Without the deduplicating codec, two decodes still both reconstruct the /// wrapper (no snapshotting) but get distinct Arcs. Guards the invariant /// that the wire format itself is dedup-agnostic; dedup is the codec's From 1e8e7bd0962b3d82c86f913c57a828b5ac878157 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 16 Jun 2026 11:28:28 +0800 Subject: [PATCH 2/3] fmt: reorder imports per cargo fmt --- datafusion/proto/src/physical_plan/to_proto.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index b492fa41fb100..8c73daa328bfc 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use arrow::array::RecordBatch; use arrow::datatypes::Schema; use arrow::ipc::writer::StreamWriter; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err, }; @@ -34,7 +35,6 @@ use datafusion_expr::WindowFrame; use datafusion_physical_expr::ScalarFunctionExpr; use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr}; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, From 21d2509252021b1bd78a4d2a6ad4c11adf20bb62 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Tue, 16 Jun 2026 11:31:17 +0800 Subject: [PATCH 3/3] address Copilot review: drop hardcoded line number, use Boolean update, drop env label --- datafusion/proto/src/physical_plan/to_proto.rs | 3 +-- .../proto/tests/cases/roundtrip_physical_plan.rs | 16 ++++++++++------ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 8c73daa328bfc..d585a04876e89 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -332,8 +332,7 @@ pub fn serialize_physical_expr_with_converter( // wire. If we let the generic snapshot walker fold a nested // `DynamicFilterPhysicalExpr` to its `current()` literal here, the // wrapper is gone and the data server can never observe a live update - // again -- which is the X-2935 ny2 prod regression in atlas's distributed - // pipeline. Skip `DynamicFilterPhysicalExpr` during the snapshot walk so + // again. Skip `DynamicFilterPhysicalExpr` during the snapshot walk so // it survives to the downcast chain below (which has a dedicated branch // serializing it as `PhysicalDynamicFilterNode`). // diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8833ec6e5601a..0f3634bdb6c3e 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3494,11 +3494,12 @@ fn dynamic_filter_dedup_distinct_children_via_with_new_children() -> Result<()> /// /// Before this patch, `serialize_physical_expr_with_converter`'s top-level /// `snapshot_physical_expr(value)` call walked the whole expression tree and -/// replaced every `DynamicFilterPhysicalExpr` with its current() literal. -/// The top-level special case at line 318 only saved the wrapper when it was -/// the OUTERMOST node; any nesting (BinaryExpr/CastExpr/NotExpr/...) folded -/// the inner wrapper to a literal, breaking the live link between SortExec -/// and the FileScan predicate on the data-server side. +/// replaced every `DynamicFilterPhysicalExpr` with its `current()` literal. +/// The dedicated `DynamicFilterPhysicalExpr` branch in the downcast chain +/// only fired when the wrapper was the OUTERMOST node; any nesting +/// (BinaryExpr/CastExpr/NotExpr/...) folded the inner wrapper to a literal, +/// breaking the live link between SortExec and the FileScan predicate on +/// the data-server side. /// /// Upstream apache/datafusion#21929 (commit 077f08a9a, merged 2026-05-22) /// fixed this structurally by removing that top-level call and routing each @@ -3579,7 +3580,10 @@ fn dynamic_filter_nested_in_binary_expr_survives_proto_roundtrip() -> Result<()> .as_any() .downcast_ref::() .unwrap(); - decoded_dyn.update(lit(42_i64))?; + // Boolean-typed update -- the wrapped predicate is Boolean, so the + // refreshed inner expr must stay Boolean too (DynamicFilterPhysicalExpr + // caches data_type and asserts immutability in test builds). + decoded_dyn.update(lit(false))?; let g_after = snapshot_generation(&decoded); assert!( g_after > g_before,