Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use arrow::ipc::writer::StreamWriter;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{
DataFusionError, Result, internal_datafusion_err, internal_err, not_impl_err,
};
Expand All @@ -34,7 +35,6 @@ use datafusion_expr::WindowFrame;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_plan::expressions::{
BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
Expand Down Expand Up @@ -324,8 +324,33 @@ pub fn serialize_physical_expr_with_converter(
}

// Snapshot the expr in case it has dynamic predicate state so
// it can be serialized
let value = snapshot_physical_expr(Arc::clone(value))?;
// it can be serialized.
//
// BUT: `DynamicFilterPhysicalExpr` has its own dedicated wire format
// (`PhysicalDynamicFilterNode`) and a dedup pipeline that lets multiple
// references on the data-server side share one `Arc<Inner>` across the
// wire. If we let the generic snapshot walker fold a nested
// `DynamicFilterPhysicalExpr` to its `current()` literal here, the
// wrapper is gone and the data server can never observe a live update
// again. Skip `DynamicFilterPhysicalExpr` during the snapshot walk so
// it survives to the downcast chain below (which has a dedicated branch
// serializing it as `PhysicalDynamicFilterNode`).
//
// Upstream apache/datafusion#21929 fixed this structurally by removing
// the top-level `snapshot_physical_expr` call entirely and routing
// through per-expression `try_to_proto` hooks. Until atlas adopts that
// refactor, this is the minimal patch.
let value = Arc::clone(value)
.transform_up(|e| {
if e.as_any().is::<DynamicFilterPhysicalExpr>() {
Ok(Transformed::no(e))
} else if let Some(snapshot) = e.snapshot()? {
Ok(Transformed::yes(snapshot))
} else {
Ok(Transformed::no(e))
}
})
.data()?;
let expr = value.as_any();

// HashTableLookupExpr is used for dynamic filter pushdown in hash joins.
Expand Down
108 changes: 108 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3486,6 +3486,114 @@ fn dynamic_filter_dedup_distinct_children_via_with_new_children() -> Result<()>
Ok(())
}

/// Regression for the X-2935 ny2 prod symptom: a `DynamicFilterPhysicalExpr`
/// nested inside a `BinaryExpr` (the shape produced when `FilterPushdown(Post)`
/// re-pushes a SortExec self-filter into a FileScan that already had a static
/// WHERE) must survive proto roundtrip as a live wrapper, NOT be folded into
/// a `Literal` snapshot.
///
/// Before this patch, `serialize_physical_expr_with_converter`'s top-level
/// `snapshot_physical_expr(value)` call walked the whole expression tree and
/// replaced every `DynamicFilterPhysicalExpr` with its `current()` literal.
/// The dedicated `DynamicFilterPhysicalExpr` branch in the downcast chain
/// only fired when the wrapper was the OUTERMOST node; any nesting
/// (BinaryExpr/CastExpr/NotExpr/...) folded the inner wrapper to a literal,
/// breaking the live link between SortExec and the FileScan predicate on
/// the data-server side.
///
/// Upstream apache/datafusion#21929 (commit 077f08a9a, merged 2026-05-22)
/// fixed this structurally by removing that top-level call and routing each
/// expression through `try_to_proto` hooks. This test pins the minimal patch
/// in atlas's DF fork: skip `DynamicFilterPhysicalExpr` during the snapshot
/// walk so it survives to the downcast chain.
#[test]
fn dynamic_filter_nested_in_binary_expr_survives_proto_roundtrip() -> Result<()> {
    use datafusion::logical_expr::Operator;
    use datafusion::physical_plan::expressions::Column;
    use datafusion_physical_expr::expressions::{BinaryExpr, DynamicFilterPhysicalExpr};
    use datafusion_physical_expr_common::physical_expr::snapshot_generation;
    use datafusion_proto::physical_plan::{
        DeduplicatingDeserializer, DeduplicatingSerializer,
        PhysicalProtoConverterExtension,
    };

    let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);

    // Build the dynamic filter: it tracks column `a` and starts out as the
    // literal `true`.
    let initial: Arc<dyn PhysicalExpr> = lit(true);
    let col_a: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
    let dyn_filter: Arc<dyn PhysicalExpr> = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![Arc::clone(&col_a)],
        Arc::clone(&initial),
    ));

    // Mimic the prod shape: a static WHERE ANDed with the TopK dyn filter.
    // The static side is `a > 0`; the dynamic side is the wrapper.
    let static_where: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&col_a),
        Operator::Gt,
        lit(0_i64),
    ));
    let nested: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        static_where,
        Operator::And,
        Arc::clone(&dyn_filter),
    ));

    // Serialize, then deserialize, through the deduplicating converters.
    let codec = DefaultPhysicalExtensionCodec {};
    let serializer = DeduplicatingSerializer::new();
    let proto = serializer.physical_expr_to_proto(&nested, &codec)?;

    let ctx = SessionContext::new().task_ctx();
    let deserializer = DeduplicatingDeserializer::new();
    let decoded = deserializer.proto_to_physical_expr(&proto, &ctx, &schema, &codec)?;

    // The decoded predicate must still be `BinaryExpr(static, AND, dyn)` --
    // crucially, the right side must be a live `DynamicFilterPhysicalExpr`,
    // NOT a `Literal(true)` snapshot from before the patch.
    let binary = decoded
        .as_any()
        .downcast_ref::<BinaryExpr>()
        .expect("decoded predicate must be a BinaryExpr");
    assert_eq!(*binary.op(), Operator::And, "op preserved");

    // Downcast the right operand exactly once: the same reference both
    // proves the wrapper survived and is reused for the `update()` call
    // further down, so no second downcast + `unwrap()` is needed.
    let decoded_dyn = binary
        .right()
        .as_any()
        .downcast_ref::<DynamicFilterPhysicalExpr>()
        .expect(
            "BinaryExpr.right must remain a DynamicFilterPhysicalExpr after \
             proto roundtrip. If it's a Literal here, the snapshot walker in \
             to_proto.rs folded the wrapper -- that is the X-2935 ny2 bug.",
        );

    // And tree-wide `snapshot_generation` must still be nonzero (i.e.
    // dynamic), and `update()` on the decoded dyn must bump it.
    let g_before = snapshot_generation(&decoded);
    assert!(
        g_before > 0,
        "decoded predicate must report a dynamic snapshot_generation"
    );

    // Boolean-typed update -- the wrapped predicate is Boolean, so the
    // refreshed inner expr must stay Boolean too (DynamicFilterPhysicalExpr
    // caches data_type and asserts immutability in test builds).
    decoded_dyn.update(lit(false))?;
    let g_after = snapshot_generation(&decoded);
    assert!(
        g_after > g_before,
        "update on decoded dyn must bump tree snapshot_generation \
         (before={g_before}, after={g_after})"
    );

    Ok(())
}

/// Without the deduplicating codec, two decodes still both reconstruct the
/// wrapper (no snapshotting) but get distinct Arcs. Guards the invariant
/// that the wire format itself is dedup-agnostic; dedup is the codec's
Expand Down
Loading