feat(proto): plumb PhysicalProtoConverterExtension into try_encode_expr / try_decode_expr#22922
feat(proto): plumb PhysicalProtoConverterExtension into try_encode_expr / try_decode_expr#22922zhuqi-lucas wants to merge 1 commit into
Conversation
|
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
There was a problem hiding this comment.
Pull request overview
This PR extends DataFusion’s physical-expression protobuf conversion pipeline so extension codecs can participate in expression deduplication (notably for DynamicFilterPhysicalExpr) by threading the active PhysicalProtoConverterExtension through PhysicalExtensionCodec::{try_encode_expr, try_decode_expr}. This closes the gap where expressions serialized inside extension-codec blobs could not benefit from a deduplicating converter on decode, breaking referential integrity across extension boundaries.
Changes:
- Add a
proto_converter: &dyn PhysicalProtoConverterExtensionparameter toPhysicalExtensionCodec::try_encode_exprandtry_decode_expr. - Plumb the converter through the internal extension-codec expression fallback paths in
serialize_physical_expr_with_converterandparse_physical_expr_with_converter. - Add a regression test proving a nested extension expression can share the same decoded
DynamicFilterPhysicalExpr::Innervia deduplication.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| datafusion/proto/src/physical_plan/mod.rs | Extends PhysicalExtensionCodec expr-level API to accept proto_converter and documents intended usage. |
| datafusion/proto/src/physical_plan/to_proto.rs | Forwards the active converter into codec.try_encode_expr from the extension-expression fallback path. |
| datafusion/proto/src/physical_plan/from_proto.rs | Forwards the active converter into codec.try_decode_expr when decoding extension expressions. |
| datafusion/proto/tests/cases/roundtrip_physical_plan.rs | Adds a proto-roundtrip test validating deduplication works across an extension-expr boundary; updates an existing codec impl for the new signature. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…pr / try_decode_expr Closes apache#22920. apache#21807 introduced the `DynamicFilterPhysicalExpr` dedup pipeline so that identical references on the wire reconstruct to one shared `Arc<Inner>` on the decode side via `expression_id` cache keys. The pipeline works end-to-end for plans built from upstream types, and apache#22011 hooked it through Sort/Aggregate/HashJoin plan codecs. However, the pipeline stops at the `PhysicalExtensionCodec` boundary. Downstream users with custom file sources or plan extension types encode/decode their nested `PhysicalExprNode` fields via `try_encode_expr` / `try_decode_expr`, which did not receive the active `PhysicalProtoConverterExtension`. The only entry points available inside such a codec were the free `serialize_physical_expr` / `parse_physical_expr` helpers, hardwired to `DefaultPhysicalProtoConverter`. So any expression inside an extension codec's serialized blob got `expr_id = None` on the wire, and the deduplicating deserializer's cache never hit for refs that crossed an extension boundary -- even when an outer `DeduplicatingProtoConverter` was in effect on the rest of the plan. Two references to the same `DynamicFilterPhysicalExpr` (one inside an extension expr, one outside) reconstructed as distinct `Inner` allocations, so heap-max updates from a TopK SortExec failed to propagate to a FileScan predicate carried by a custom file source. This matches the existing plan-level pattern (`try_encode` and `try_decode` already take a `proto_converter` parameter, added when the plan codec was wired into the converter pipeline). This change extends the expr-level methods the same way: fn try_decode_expr( &self, buf: &[u8], inputs: &[Arc<dyn PhysicalExpr>], proto_converter: &dyn PhysicalProtoConverterExtension, // new ) -> Result<Arc<dyn PhysicalExpr>>; fn try_encode_expr( &self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>, proto_converter: &dyn PhysicalProtoConverterExtension, // new ) -> Result<()>; The two internal call sites in `serialize_physical_expr_with_converter` and `parse_physical_expr_with_converter` are updated to thread the active converter through. Downstream codecs can now route nested expression encode/decode through `proto_converter.physical_expr_to_proto` / `proto_converter.proto_to_physical_expr`, picking up dedup automatically. This is a breaking change for downstream codecs that override `try_encode_expr` / `try_decode_expr` -- they will need to add the new parameter (and may pass `_proto_converter` if they don't carry nested expressions). Codecs that only override `try_encode` / `try_decode` (plan-level) are unaffected. Wire format is unchanged (`PhysicalExprNode.expr_id` is unchanged); the fix is purely on the Rust trait API. Test: a new `extension_codec_expr_participates_in_deduplication` proto-roundtrip test constructs a `BinaryExpr` whose left operand is a bare `DynamicFilterPhysicalExpr` and right operand is a custom `WrapperExpr` whose extension codec embeds the same dynamic filter inside its serialized blob. After a `DeduplicatingProtoConverter` roundtrip, an `update()` applied to the bare-side decoded filter is observed by `current()` on the wrapped-side decoded filter, proving both refs back the same `Inner`. Without the converter-aware extension codec, this test fails because the two refs end up with distinct `Inner` allocations.
3ca0edd to
7a58afe
Compare
|
I want to take a deep look at this. It points out a real problem but I worry that there are like 4 different traits now going around... and I want to make sure we pass the right one / the API shapes are what we need. |
Which issue does this PR close?
Closes #22920.
Rationale for this change
#21807 introduced the
DynamicFilterPhysicalExprdedup pipeline so that identical references on the wire reconstruct to one sharedArc<Inner>on the decode side viaexpression_idcache keys. The pipeline works end-to-end for plans built from upstream types, and #22011 hooked it throughSortExec/AggregateExec/HashJoinExecplan codecs.In
to_proto.rs::serialize_physical_expr_with_converter, the extension codec'stry_encode_expris reached only as a fallback after the built-in path (expr.try_to_proto(&ctx)?) and theScalarFunctionExprdowncast. Sotry_encode_expronly fires for downstream-defined customPhysicalExprtypes. The same applies symmetrically toparse_physical_expr_with_converterandtry_decode_expron the decode side.When such a custom-
PhysicalExprcodec needs to serialize nestedPhysicalExprNodefields inside its blob, the only helper available today is the freeserialize_physical_expr(hardwired toDefaultPhysicalProtoConverter). So nested expressions inside a custom-PhysicalExprblob getexpr_id: Noneon the wire, and theDeduplicatingDeserializer's cache never hits for refs that travel through a custom-expr boundary — even when an outerDeduplicatingProtoConverteris in effect on the rest of the plan.Concretely: two references to the same
DynamicFilterPhysicalExpr(one inside a custom-expr's nested field, one outside) reconstruct as distinctInnerallocations after roundtrip, so heap-max updates from aSortExecfail to propagate to the wrapped reference.What changes are included in this PR?
Matches the existing plan-level pattern.
try_encodeandtry_decodealready take aproto_converterparameter; this change extends the expr-level methods the same way:The two internal call sites in
serialize_physical_expr_with_converterandparse_physical_expr_with_converterare updated to thread the active converter through. Downstream codecs whose customPhysicalExprtypes embed nestedPhysicalExprNodefields can now route those throughproto_converter.physical_expr_to_proto/proto_converter.proto_to_physical_exprand pick up dedup automatically.Are these changes tested?
Yes — a new
extension_codec_expr_participates_in_deduplicationproto-roundtrip test constructs aBinaryExprwhose left operand is a bareDynamicFilterPhysicalExprand right operand is a customWrapperExprwhose extension codec embeds the same dynamic filter inside its serialized blob. After aDeduplicatingProtoConverterroundtrip, anupdate()applied to the bare-side decoded filter is observed bycurrent()on the wrapped-side decoded filter, proving both refs back the sameInner. Without the converter-aware extension codec the test fails because the two refs end up with distinctInnerallocations.Are there any user-facing changes?
Yes — this is a breaking change for downstream codecs that override
try_encode_expr/try_decode_expr(i.e. codecs that handle customPhysicalExprtypes). They need to add the newproto_converterparameter (and may name it_proto_converterif they don't carry nested expressions). Codecs that only override the plan-leveltry_encode/try_decodeare unaffected.Wire format is unchanged (
PhysicalExprNode.expr_idis unchanged); the fix is purely on the Rust trait API.