Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4072,24 +4072,49 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer {
where
Self: Sized,
{
if let Some(expr_id) = proto.expr_id {
// Check cache first
if let Some(cached) = self.cache.borrow().get(&expr_id) {
return Ok(Arc::clone(cached));
}
// Deserialize and cache
let expr = parse_physical_expr_with_converter(
let Some(expr_id) = proto.expr_id else {
return parse_physical_expr_with_converter(
proto,
ctx,
input_schema,
codec,
self,
)?;
self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr));
Ok(expr)
} else {
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)
);
};

// Always parse the proto body first. The cache hit path below uses
// `parsed.children()` (which carry this occurrence's column refs as
// they appeared in the proto -- e.g. file-schema indices after
// FilterPushdown rewrote them), so we can't short-circuit before
// parsing.
let parsed =
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?;

let mut cache = self.cache.borrow_mut();
if let Some(cached) = cache.get(&expr_id) {
// Since expressions may manage their own internal state when
// deriving expressions via `with_new_children`, we use
// `with_new_children` to opt into the same behavior.
//
// For example, one `DynamicFilterPhysicalExpr` may be derived
// from another resulting in shared references (e.g. a SortExec
// dynamic filter at the parent schema and a pushed-down FileScan
// predicate at the file schema, sharing the same `Inner` Arc
// but with different children). Using `with_new_children`
// preserves those references: the cached `Arc<Inner>` is kept,
// while this occurrence's children (parsed from the proto body)
// are installed onto a fresh outer wrapper.
//
// Ported from apache/datafusion#21807; without this the cache
// hit path returned the entire cached outer Arc and silently
// discarded the proto body's children, breaking column ref
// alignment in plans where FilterPushdown rewrote them.
let children: Vec<_> = parsed.children().into_iter().cloned().collect();
return Arc::clone(cached).with_new_children(children);
}

cache.insert(expr_id, Arc::clone(&parsed));
Ok(parsed)
}

fn physical_expr_to_proto(
Expand Down
194 changes: 186 additions & 8 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3215,26 +3215,64 @@ fn dynamic_filter_dedup_with_deduplicating_codec() -> Result<()> {
let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?;
let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?;

assert!(
Arc::ptr_eq(&d1, &d2),
"DeduplicatingDeserializer must return the same Arc for two refs with the same expr_id"
);

// d1 and d2 may be distinct OUTER Arcs because the cache-hit path
// rewraps via `with_new_children` to preserve each occurrence's
// children. The invariant that matters is shared INNER (TopK updates
// visible to both). expression_id equality is necessary but not
// sufficient (the wire carries it across decode), so we also assert
// observable shared state via `update()` -> `snapshot_generation()`.
let d1_df = d1
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect(
"decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed",
);
let d2_df = d2
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect(
"decoded expr must be DynamicFilterPhysicalExpr; snapshot path is bypassed",
);
assert_eq!(
d1_df.inner().expression_id,
id_before,
"expression_id must survive proto roundtrip"
);
assert_dynamic_filter_inner_is_shared(d1_df, d2_df)?;

Ok(())
}

/// Asserts that two decoded [`DynamicFilterPhysicalExpr`] outer Arcs back
/// the same `Arc<RwLock<Inner>>`. expression_id equality alone is not
/// sufficient (the wire format carries the value across decode, so two
/// independently-allocated `Inner`s can have the same id). The only
/// observable proof of shared inner state is that an `update()` on one
/// side bumps the `snapshot_generation()` of the other.
fn assert_dynamic_filter_inner_is_shared(
left: &datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr,
right: &datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr,
) -> Result<()> {
let gen_before = left.snapshot_generation();
assert_eq!(
gen_before,
right.snapshot_generation(),
"left and right must start at the same generation if they share Inner"
);
left.update(lit(123_i64))?;
let gen_after_left = left.snapshot_generation();
let gen_after_right = right.snapshot_generation();
assert!(
gen_after_left > gen_before,
"left's update must bump its own generation"
);
assert_eq!(
gen_after_left, gen_after_right,
"right must observe left's update (proves Arc<Inner> is shared, not just equal expression_id)"
);
Ok(())
}

/// Two distinct outer Arcs that share the same `Inner` (e.g. via
/// `with_new_children`) must still dedup to the same decoded Arc, because
/// `DeduplicatingSerializer` now hashes on `expression_id` (which is
Expand Down Expand Up @@ -3300,11 +3338,151 @@ fn dynamic_filter_dedup_distinct_outer_arcs_same_inner() -> Result<()> {
let deserializer = DeduplicatingDeserializer::new();
let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?;
let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?;
assert!(
Arc::ptr_eq(&d1, &d2),
"Distinct-outer same-Inner refs must reconstruct to one Arc"
// d1 and d2 may be distinct OUTER Arcs (cache-hit path rewraps via
// `with_new_children`); the invariant is shared INNER. Verify via
// observable update propagation, not just expression_id equality.
let d1_df = d1
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.unwrap();
let d2_df = d2
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.unwrap();
assert_eq!(d1_df.inner().expression_id, expected_id);
assert_dynamic_filter_inner_is_shared(d1_df, d2_df)?;

Ok(())
}

/// Two outer `Arc<DynamicFilterPhysicalExpr>` sharing the same Inner but
/// carrying DIFFERENT children must roundtrip to two outer Arcs whose
/// children match their original occurrence's children. This is the shape
/// `FilterPushdown` actually produces: it clones a SortExec's dyn filter and
/// rewrites the children column refs to match the FileScan's schema (e.g.
/// `ticker@0` at SortExec input → `ticker@12` at file schema). Both outer
/// wrappers share the same `Inner` Arc (so TopK heap-max updates propagate)
/// but the column-ref indices differ.
///
/// Without `DeduplicatingDeserializer::proto_to_physical_expr` applying
/// `with_new_children(parsed.children())` on cache hits, the second decode
/// silently returns the FIRST occurrence's children and discards the proto
/// body's children -- `prune_by_statistics` then resolves column refs against
/// the wrong file-schema positions and pruning becomes a no-op. Observed on
/// X-2935 ny2 staging 2026-06-15: `row_groups_pruned_statistics=0`,
/// `bytes_scanned=91 MB`, `time_elapsed_processing=31 s`.
#[test]
fn dynamic_filter_dedup_distinct_children_via_with_new_children() -> Result<()> {
use datafusion::physical_plan::expressions::Column;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
use datafusion_proto::physical_plan::{
DeduplicatingDeserializer, DeduplicatingSerializer,
PhysicalProtoConverterExtension,
};

// Schema where the `a` column sits at physical index 1 (with a filler
// `extra` at index 0). Mimics a FilterPushdown step where the parent
// sees `a@0` against its narrower input schema, while the FileScan
// below it has the same `a` column at index 1 in its broader file
// schema. Physical `Column` resolves by index, so each occurrence
// needs to keep its OWN index after roundtrip; this is what the dedup
// cache-hit's `with_new_children` step preserves.
let schema = Schema::new(vec![
Field::new("extra", DataType::Int64, false), // physical position 0
Field::new("a", DataType::Int64, false), // physical position 1
]);

let initial: Arc<dyn PhysicalExpr> = lit(true);
let col_a_idx0: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
let col_a_idx1: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 1));

// outer A: SortExec.filter-style (children at index 0).
let df_arc1: Arc<dyn PhysicalExpr> = Arc::new(DynamicFilterPhysicalExpr::new(
vec![Arc::clone(&col_a_idx0)],
Arc::clone(&initial),
));
// outer B: FileScan.predicate-style (children at index 1) sharing Inner.
let df_arc2 =
Arc::clone(&df_arc1).with_new_children(vec![Arc::clone(&col_a_idx1)])?;
assert_eq!(
df_arc1
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.unwrap()
.inner()
.expression_id,
df_arc2
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.unwrap()
.inner()
.expression_id,
"shared Inner must produce identical expression_id"
);

let codec = DefaultPhysicalExtensionCodec {};
let serializer = DeduplicatingSerializer::new();
let proto1 = serializer.physical_expr_to_proto(&df_arc1, &codec)?;
let proto2 = serializer.physical_expr_to_proto(&df_arc2, &codec)?;
assert_eq!(
proto1.expr_id, proto2.expr_id,
"same Inner must stamp same wire expr_id"
);

let ctx = SessionContext::new().task_ctx();
let deserializer = DeduplicatingDeserializer::new();
let d1 = deserializer.proto_to_physical_expr(&proto1, &ctx, &schema, &codec)?;
let d2 = deserializer.proto_to_physical_expr(&proto2, &ctx, &schema, &codec)?;

let d1_df = d1
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("d1 must remain a DynamicFilterPhysicalExpr after roundtrip");
let d2_df = d2
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.expect("d2 must remain a DynamicFilterPhysicalExpr after roundtrip");

// Children preservation (the core bug): each outer wrapper must carry
// its OWN occurrence's children. d1 was serialized with Column index 0,
// d2 with index 1. Without the cache-hit `with_new_children` block in
// `proto_to_physical_expr`, the second decode would return the first
// decode's outer Arc and silently keep its children.
fn child_column_index(expr: &Arc<dyn PhysicalExpr>) -> usize {
let dyn_filter = expr
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.unwrap();
dyn_filter
.children()
.first()
.unwrap()
.as_any()
.downcast_ref::<Column>()
.unwrap()
.index()
}
assert_eq!(
child_column_index(&d1),
0,
"decoded d1 must carry its OCCURRENCE's children (Column at index 0); \
if this is 1, dedup-by-Inner is missing and the cache hit clobbered \
the children with d2's"
);
assert_eq!(
child_column_index(&d2),
1,
"decoded d2 must carry its OCCURRENCE's children (Column at index 1); \
if this is 0, dedup-by-Inner is missing and the cache hit returned \
d1's children unchanged"
);

// Inner sharing (the related invariant): an `update()` on one decoded
// wrapper must bump the other's `snapshot_generation()`. This proves
// they back the same `Arc<RwLock<Inner>>`, not just two independent
// Inners that happen to carry the same `expression_id` from the wire.
assert_dynamic_filter_inner_is_shared(d1_df, d2_df)?;

Ok(())
}

Expand Down
Loading