From 7a58afee5274f81a71fe9770fcdc14ac98cdccf1 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 12 Jun 2026 10:56:57 +0800 Subject: [PATCH] feat(proto): plumb PhysicalProtoConverterExtension into try_encode_expr / try_decode_expr Closes #22920. #21807 introduced the `DynamicFilterPhysicalExpr` dedup pipeline so that identical references on the wire reconstruct to one shared `Arc` on the decode side via `expression_id` cache keys. The pipeline works end-to-end for plans built from upstream types, and #22011 hooked it through Sort/Aggregate/HashJoin plan codecs. However, the pipeline stops at the `PhysicalExtensionCodec` boundary. Downstream users with custom file sources or plan extension types encode/decode their nested `PhysicalExprNode` fields via `try_encode_expr` / `try_decode_expr`, which did not receive the active `PhysicalProtoConverterExtension`. The only entry points available inside such a codec were the free `serialize_physical_expr` / `parse_physical_expr` helpers, hardwired to `DefaultPhysicalProtoConverter`. So any expression inside an extension codec's serialized blob got `expr_id = None` on the wire, and the deduplicating deserializer's cache never hit for refs that crossed an extension boundary -- even when an outer `DeduplicatingProtoConverter` was in effect on the rest of the plan. Two references to the same `DynamicFilterPhysicalExpr` (one inside an extension expr, one outside) reconstructed as distinct `Inner` allocations, so heap-max updates from a TopK SortExec failed to propagate to a FileScan predicate carried by a custom file source. This matches the existing plan-level pattern (`try_encode` and `try_decode` already take a `proto_converter` parameter, added when the plan codec was wired into the converter pipeline). This change extends the expr-level methods the same way: fn try_decode_expr( &self, buf: &[u8], inputs: &[Arc], proto_converter: &dyn PhysicalProtoConverterExtension, // new ) -> Result>; fn try_encode_expr( &self, node: &Arc, buf: &mut Vec, proto_converter: &dyn PhysicalProtoConverterExtension, // new ) -> Result<()>; The two internal call sites in `serialize_physical_expr_with_converter` and `parse_physical_expr_with_converter` are updated to thread the active converter through. Downstream codecs can now route nested expression encode/decode through `proto_converter.physical_expr_to_proto` / `proto_converter.proto_to_physical_expr`, picking up dedup automatically. This is a breaking change for downstream codecs that override `try_encode_expr` / `try_decode_expr` -- they will need to add the new parameter (and may pass `_proto_converter` if they don't carry nested expressions). Codecs that only override `try_encode` / `try_decode` (plan-level) are unaffected. Wire format is unchanged (`PhysicalExprNode.expr_id` is unchanged); the fix is purely on the Rust trait API. Test: a new `extension_codec_expr_participates_in_deduplication` proto-roundtrip test constructs a `BinaryExpr` whose left operand is a bare `DynamicFilterPhysicalExpr` and right operand is a custom `WrapperExpr` whose extension codec embeds the same dynamic filter inside its serialized blob. After a `DeduplicatingProtoConverter` roundtrip, an `update()` applied to the bare-side decoded filter is observed by `current()` on the wrapped-side decoded filter, proving both refs back the same `Inner`. Without the converter-aware extension codec, this test fails because the two refs end up with distinct `Inner` allocations. --- .../proto/src/physical_plan/from_proto.rs | 7 +- datafusion/proto/src/physical_plan/mod.rs | 18 ++ .../proto/src/physical_plan/to_proto.rs | 2 +- .../tests/cases/roundtrip_physical_plan.rs | 238 ++++++++++++++++++ 4 files changed, 262 insertions(+), 3 deletions(-) diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 36751d8a61a3e..990e0ff1e6549 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -412,8 +412,11 @@ pub fn parse_physical_expr_with_converter( .iter() .map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx)) .collect::>()?; - ctx.codec() - .try_decode_expr(extension.expr.as_slice(), &inputs)? as _ + ctx.codec().try_decode_expr( + extension.expr.as_slice(), + &inputs, + proto_converter, + )? as _ } }; diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 9efcd25fcb412..fb36d13951839 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3940,18 +3940,36 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any { Ok(()) } + /// Decode a custom extension expression from `buf`. + /// + /// Implementations whose proto carries nested `PhysicalExprNode` fields + /// should route those through `proto_converter.proto_to_physical_expr` + /// (rather than the free `parse_physical_expr` function) so that an + /// active `DeduplicatingDeserializer` can cache-hit on matching + /// `expr_id`s and re-share `Arc` (e.g. a + /// `DynamicFilterPhysicalExpr` referenced both from a SortExec.filter + /// and from a custom file-source's predicate field). fn try_decode_expr( &self, _buf: &[u8], _inputs: &[Arc], + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { not_impl_err!("PhysicalExtensionCodec is not provided") } + /// Encode a custom extension expression into `buf`. + /// + /// Implementations whose proto carries nested `PhysicalExprNode` fields + /// should route those through `proto_converter.physical_expr_to_proto` + /// (rather than the free `serialize_physical_expr` function) so that + /// an active `DeduplicatingProtoConverter` can stamp matching `expr_id`s + /// for shared inner expressions. See [`Self::try_decode_expr`]. fn try_encode_expr( &self, _node: &Arc, _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { not_impl_err!("PhysicalExtensionCodec is not provided") } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index d9315af431e22..d3e23280c6127 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -365,7 +365,7 @@ pub fn serialize_physical_expr_with_converter( }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(value, &mut buf) { + match codec.try_encode_expr(value, &mut buf, proto_converter) { Ok(_) => { let inputs: Vec = value .children() diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 8e80467788598..c440f9ae53740 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1161,6 +1161,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, buf: &[u8], inputs: &[Arc], + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result> { if buf == "CustomPredicateExpr".as_bytes() { Ok(Arc::new(CustomPredicateExpr { @@ -1175,6 +1176,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { &self, node: &Arc, buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result<()> { if node.downcast_ref::().is_some() { buf.extend_from_slice("CustomPredicateExpr".as_bytes()); @@ -4127,3 +4129,239 @@ fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { Ok(()) } + +/// Demonstrates that `try_encode_expr` / `try_decode_expr` extension codecs +/// can now participate in the dedup pipeline by routing their nested +/// `PhysicalExprNode` fields through the active +/// `PhysicalProtoConverterExtension`. +/// +/// Without the `proto_converter` parameter on the expr-level codec methods, +/// any nested `PhysicalExprNode` inside a custom expression's serialized +/// blob would skip dedup (`expr_id` would be `None` on the wire). Two +/// references to the same `DynamicFilterPhysicalExpr` (one inside an +/// extension expr, one outside) would reconstruct as two distinct Arcs on +/// the decode side, breaking heap-max propagation across plan-node +/// boundaries in distributed execution. +/// +/// With the converter parameter plumbed through, the extension codec +/// delegates nested expr serialization to the active deduplicating +/// converter, and both refs cache-hit on the same `expr_id` on decode. +#[test] +fn extension_codec_expr_participates_in_deduplication() -> Result<()> { + use datafusion_common::DataFusionError; + use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; + use prost::Message; + use std::fmt::{self, Display}; + + // A minimal custom expression whose only logical content is a nested + // `Arc`. Its codec routes that nested expr through + // the active `PhysicalProtoConverterExtension`. + #[derive(Debug)] + struct WrapperExpr { + inner: Arc, + } + impl Display for WrapperExpr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "WrapperExpr({})", self.inner) + } + } + impl std::hash::Hash for WrapperExpr { + fn hash(&self, state: &mut H) { + self.inner.dyn_hash(state); + } + } + impl PartialEq for WrapperExpr { + fn eq(&self, other: &Self) -> bool { + self.inner.eq(&other.inner) + } + } + impl Eq for WrapperExpr {} + impl PhysicalExpr for WrapperExpr { + fn data_type(&self, input_schema: &Schema) -> Result { + self.inner.data_type(input_schema) + } + fn nullable(&self, input_schema: &Schema) -> Result { + self.inner.nullable(input_schema) + } + fn evaluate( + &self, + _batch: &arrow::record_batch::RecordBatch, + ) -> Result { + internal_err!("WrapperExpr is not executable in this test") + } + fn children(&self) -> Vec<&Arc> { + vec![&self.inner] + } + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(WrapperExpr { + inner: children[0].clone(), + })) + } + fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{self}") + } + } + + // Wire layout: a single nested PhysicalExprNode field. + #[derive(Clone, PartialEq, prost::Message)] + struct WrapperExprProto { + #[prost(message, optional, boxed, tag = "1")] + inner: Option>, + } + + #[derive(Debug)] + struct WrapperCodec; + impl PhysicalExtensionCodec for WrapperCodec { + fn try_decode( + &self, + _buf: &[u8], + _inputs: &[Arc], + _ctx: &TaskContext, + _proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result> { + internal_err!("not used") + } + fn try_encode( + &self, + _node: Arc, + _buf: &mut Vec, + _proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { + internal_err!("not used") + } + fn try_decode_expr( + &self, + buf: &[u8], + _inputs: &[Arc], + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result> { + let proto = WrapperExprProto::decode(buf).map_err(|e| { + DataFusionError::Internal(format!("decode WrapperExprProto: {e}")) + })?; + let inner_proto = proto + .inner + .ok_or_else(|| DataFusionError::Internal("missing inner".into()))?; + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + let decode_ctx = PhysicalPlanDecodeContext::new(task_ctx.as_ref(), self); + let inner = proto_converter.proto_to_physical_expr( + &inner_proto, + &schema, + &decode_ctx, + )?; + Ok(Arc::new(WrapperExpr { inner })) + } + fn try_encode_expr( + &self, + node: &Arc, + buf: &mut Vec, + proto_converter: &dyn PhysicalProtoConverterExtension, + ) -> Result<()> { + let wrapper = node + .downcast_ref::() + .ok_or_else(|| DataFusionError::Internal("not WrapperExpr".into()))?; + let inner_proto = + proto_converter.physical_expr_to_proto(&wrapper.inner, self)?; + let proto = WrapperExprProto { + inner: Some(Box::new(inner_proto)), + }; + proto.encode(buf).map_err(|e| { + DataFusionError::Internal(format!("encode WrapperExprProto: {e}")) + })?; + Ok(()) + } + } + + // Build a single composite expression that holds TWO references to the + // same `Arc`: a bare one on the left of a + // BinaryExpr, and one wrapped inside our extension expr on the right. + // Roundtripping the whole composite in a single decode call exercises + // a shared dedup cache. + let dyn_filter = make_dynamic_filter(); + let wrapper: Arc = Arc::new(WrapperExpr { + inner: Arc::clone(&dyn_filter), + }); + let composite: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&dyn_filter), + Operator::And, + Arc::clone(&wrapper), + )); + + let codec = WrapperCodec; + let converter = DeduplicatingProtoConverter {}; + + let proto = converter.physical_expr_to_proto(&composite, &codec)?; + // Round-trip through prost bytes to mimic the wire. + let bytes = proto.encode_to_vec(); + let decoded_proto = + datafusion_proto::protobuf::PhysicalExprNode::decode(bytes.as_slice()).unwrap(); + + let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]); + let ctx = SessionContext::new(); + let task_ctx = ctx.task_ctx(); + let decode_ctx = PhysicalPlanDecodeContext::new(task_ctx.as_ref(), &codec); + let decoded = + converter.proto_to_physical_expr(&decoded_proto, &schema, &decode_ctx)?; + + let binary = decoded + .downcast_ref::() + .expect("must decode back to BinaryExpr"); + let decoded_left = Arc::clone(binary.left()); + let decoded_right = Arc::clone(binary.right()); + let decoded_wrapper = decoded_right + .downcast_ref::() + .expect("right side must decode back to WrapperExpr"); + + // Semantic referential-integrity check: both decoded sides must + // observe the same logical filter. The bare side is a + // `DynamicFilterPhysicalExpr`; the wrapped side's inner is too. An + // `update()` on one must be visible from the other via + // `current()`, proving they share the same `Inner` (the entire + // point of #21807's dedup pipeline). + // + // Without the `proto_converter` parameter on `try_encode_expr` / + // `try_decode_expr`, the wrapper codec would route its nested + // expression through `DefaultPhysicalProtoConverter` and skip dedup + // entirely, so `decoded_left` and `decoded_wrapper.inner` would + // back distinct `Inner` allocations and an update would not + // propagate across the extension boundary. + let left_dyn = decoded_left + .downcast_ref::() + .expect("decoded bare expr must still be DynamicFilterPhysicalExpr"); + let right_dyn = decoded_wrapper + .inner + .downcast_ref::() + .expect("decoded wrapped inner must still be DynamicFilterPhysicalExpr"); + + // Sanity: matching expression_id is necessary for dedup to fire. + assert_eq!( + left_dyn.expression_id(), + right_dyn.expression_id(), + "shared `Inner` must report the same expression_id" + ); + + // The load-bearing check: prove `Inner` really is shared by updating + // one side and reading from the other. + let original_generation = left_dyn.snapshot_generation(); + let new_expr: Arc = lit(false); + left_dyn.update(Arc::clone(&new_expr))?; + assert_eq!( + right_dyn.snapshot_generation(), + original_generation + 1, + "update on the bare ref must be visible from the wrapped ref \ + (same Inner)" + ); + let observed = right_dyn.current()?; + assert_eq!( + format!("{observed}"), + format!("{new_expr}"), + "wrapped ref's current() must reflect the update applied to the \ + bare ref" + ); + + Ok(()) +}