Follow-up to #21807: thread PhysicalProtoConverterExtension into PhysicalExtensionCodec expression-level methods #22920

@zhuqi-lucas

Edit 2: Clarifying scope after re-reading the upstream encode/decode path. This gap only matters for downstream-defined custom PhysicalExpr types. Custom file sources and custom plan extension types serialize through plan-level try_encode / try_decode, which already take proto_converter — they are unaffected. The remaining gap is specifically the expression-level fallback path.

Context

#21807 was motivated by distributed execution (see its "Informs: datafusion-contrib/datafusion-distributed#180" link). The dedup pipeline lets a SortExec(TopK) running on one worker share the same Arc<DynamicFilterPhysicalExpr> with the FileScan predicate that FilterPushdown cloned from it, so heap-max updates still propagate to row-group pruning after the plan crosses a network/proto boundary.

For distributed engines built on top of datafusion-proto (datafusion-distributed, ballista, in-house distributed query engines), the benefit is only fully realized when every plan and expression in the wire format flows through the dedup pipeline. The current trait surface leaves a gap at the expression-level extension codec fallback, which affects downstream-defined custom PhysicalExpr types whose codec needs to serialize nested PhysicalExprNode fields.

Why this is expression-level only

Looking at serialize_physical_expr_with_converter in datafusion-proto's to_proto.rs, the codec's try_encode_expr is only reached as a fallback after the built-in path:

pub fn serialize_physical_expr_with_converter(
    value: &Arc<dyn PhysicalExpr>,
    codec: &dyn PhysicalExtensionCodec,
    proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<protobuf::PhysicalExprNode> {
    let expr_id = value.expression_id();

    // 1. Built-in `try_to_proto` path -- DynamicFilterPhysicalExpr,
    //    Column, BinaryExpr, etc. take this and return immediately with
    //    expr_id stamped.
    if let Some(node) = expr.try_to_proto(&ctx)? { return Ok(node); }

    // 2. ScalarFunctionExpr downcast, ...

    } else {
        // 3. Extension fallback -- only entered if 1+2 didn't match,
        //    i.e. `value` is a downstream-defined custom PhysicalExpr.
        match codec.try_encode_expr(value, &mut buf, /* no proto_converter */) { ... }
    }
}

So:

  • A custom file source carrying a DynamicFilterPhysicalExpr predicate (or any other built-in expr) is serialized at the plan level via try_encode; the file source's codec then routes its nested predicate through proto_converter.physical_expr_to_proto(...), which goes through path 1 above and stamps expr_id correctly. No change needed there.
  • A custom plan extension type is the same story: plan-level try_encode already takes proto_converter, and its codec routes nested exprs through the converter's physical_expr_to_proto. No change needed there.
  • A custom PhysicalExpr type (e.g. a Wrapper/Annotated/Lifted expression that downstream defines) is the case where try_encode_expr is actually reached. If its codec carries nested PhysicalExprNode fields and wants those nested exprs to participate in dedup, it needs proto_converter available inside try_encode_expr to call proto_converter.physical_expr_to_proto on them. Today the trait method signature doesn't expose it.

The decode side has the same shape: parse_physical_expr_with_converter enters codec.try_decode_expr(...) only for the ExprType::Extension variant on the wire, which is the inverse of the encode-side fallback above, so it has the same gap.

Gap

Looking at PhysicalExtensionCodec on main today:

pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
        proto_converter: &dyn PhysicalProtoConverterExtension,  // ✅ plan-level OK
    ) -> Result<Arc<dyn ExecutionPlan>>;

    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
        proto_converter: &dyn PhysicalProtoConverterExtension,  // ✅ plan-level OK
    ) -> Result<()>;

    // ...

    fn try_decode_expr(
        &self,
        _buf: &[u8],
        _inputs: &[Arc<dyn PhysicalExpr>],
        // ❌ no proto_converter -- breaks dedup for custom PhysicalExpr types
    ) -> Result<Arc<dyn PhysicalExpr>> { ... }

    fn try_encode_expr(
        &self,
        _node: &Arc<dyn PhysicalExpr>,
        _buf: &mut Vec<u8>,
        // ❌ no proto_converter -- breaks dedup for custom PhysicalExpr types
    ) -> Result<()> { ... }
}

When a custom-PhysicalExpr codec serializes a nested PhysicalExprNode inside its try_encode_expr blob, the only helper available is the free serialize_physical_expr, hardwired to DefaultPhysicalProtoConverter. Even when an outer DeduplicatingProtoConverter is in effect, that nested PhysicalExprNode gets expr_id: None on the wire and the dedup cache never finds a hit, so refs that travel through a custom-expr blob reconstruct as distinct Arc<Inner> allocations.

The wire format already has room for this (PhysicalExprNode.expr_id is an optional uint64); the gap is purely in the Rust trait API for the expr-level methods.
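
To make the cache miss concrete, here is a minimal std-only sketch of the decode side. It is a toy model, not datafusion-proto code: `Inner`, `WireNode`, `DedupDecoder` and `decode_pair` are hypothetical stand-ins, and the only behavior assumed from the description above is that a node with an expr_id is looked up in a cache while a node with expr_id: None always gets a fresh allocation.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the shared state inside a dynamic filter.
#[derive(Debug, Default)]
pub struct Inner {
    threshold: AtomicU64,
}

impl Inner {
    pub fn update(&self, v: u64) {
        self.threshold.store(v, Ordering::SeqCst);
    }
    pub fn current(&self) -> u64 {
        self.threshold.load(Ordering::SeqCst)
    }
}

/// Hypothetical wire node: only the optional id matters for this sketch.
#[derive(Debug, Clone, Copy)]
pub struct WireNode {
    pub expr_id: Option<u64>,
}

/// Hypothetical decoder that deduplicates by expr_id.
#[derive(Default)]
pub struct DedupDecoder {
    cache: HashMap<u64, Arc<Inner>>,
}

impl DedupDecoder {
    pub fn decode(&mut self, node: WireNode) -> Arc<Inner> {
        match node.expr_id {
            // Same id: hand back the cached allocation (creating it on first use).
            Some(id) => self.cache.entry(id).or_default().clone(),
            // No id on the wire: nothing to look up, so allocate a fresh Inner.
            None => Arc::new(Inner::default()),
        }
    }
}

/// Decodes two wire nodes with one shared decoder.
pub fn decode_pair(a: WireNode, b: WireNode) -> (Arc<Inner>, Arc<Inner>) {
    let mut decoder = DedupDecoder::default();
    (decoder.decode(a), decoder.decode(b))
}
```

In this model, two nodes stamped with the same id decode to one allocation, so an update() through one handle is visible via current() on the other. If either node arrives with expr_id: None, the handles are distinct and updates no longer propagate, which is the symptom described above.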

Demonstration sketch

// A minimal custom expression whose only logical content is a nested
// Arc<dyn PhysicalExpr>. Its codec needs to serialize the nested
// PhysicalExprNode inside `try_encode_expr` / `try_decode_expr`.
struct WrapperExpr { inner: Arc<dyn PhysicalExpr>, /* ... */ }

struct WrapperCodec;
impl PhysicalExtensionCodec for WrapperCodec {
    fn try_encode_expr(&self, node: &Arc<dyn PhysicalExpr>, buf: &mut Vec<u8>) -> Result<()> {
        let w = node.downcast_ref::<WrapperExpr>().unwrap();
        // The only available helper is the Default-converter one --
        // no way to forward the active DeduplicatingProtoConverter.
        let inner_proto = serialize_physical_expr(&w.inner, self)?;
        WrapperExprProto { inner: Some(Box::new(inner_proto)) }.encode(buf)?;
        Ok(())
    }
    // ...
}

// Build a composite that references the same dyn filter both bare and wrapped:
let dyn_filter = make_dynamic_filter();
let composite = BinaryExpr::new(
    Arc::clone(&dyn_filter),
    Operator::And,
    Arc::new(WrapperExpr { inner: Arc::clone(&dyn_filter) }),
);

// Roundtrip through DeduplicatingProtoConverter:
let proto = converter.physical_expr_to_proto(&composite, &codec)?;
let decoded = converter.proto_to_physical_expr(&proto, &schema, &decode_ctx)?;

// After decode, the bare and wrapped references point at DIFFERENT Inner
// allocations: update() on one is NOT visible from current() on the other.

Proposed fix

Add the proto_converter parameter to the two expr-level methods, matching the breaking-change pattern already applied to try_encode / try_decode:

fn try_decode_expr(
    &self,
    _buf: &[u8],
    _inputs: &[Arc<dyn PhysicalExpr>],
    _proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn PhysicalExpr>> {
    not_impl_err!("PhysicalExtensionCodec is not provided")
}

fn try_encode_expr(
    &self,
    _node: &Arc<dyn PhysicalExpr>,
    _buf: &mut Vec<u8>,
    _proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
    not_impl_err!("PhysicalExtensionCodec is not provided")
}

The two internal call sites in datafusion-proto change accordingly:

  1. serialize_physical_expr_with_converter (the Extension fallback): codec.try_encode_expr(value, &mut buf, proto_converter).
  2. parse_physical_expr_with_converter (the ExprType::Extension arm): ctx.codec().try_decode_expr(extension.expr.as_slice(), &inputs, proto_converter).

Downstream codecs that override these methods then route nested expr (de)serialization through proto_converter.physical_expr_to_proto / proto_converter.proto_to_physical_expr, and the dedup cache extends naturally through the custom-expr boundary.
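
The encode-side effect of threading the converter can be sketched with toy traits. Everything below is a simplified stand-in rather than the real API: `Node`, `Expr`, `Converter`, `Codec`, `DefaultConverter`, `DedupConverter`, `HardwiredCodec` and `ForwardingCodec` are hypothetical names, and "stamping" is reduced to copying an id onto a leaf node.

```rust
/// Hypothetical wire node (stand-in for PhysicalExprNode).
#[derive(Debug, PartialEq)]
pub struct Node {
    pub expr_id: Option<u64>,
    pub children: Vec<Node>,
}

/// Hypothetical expression tree (stand-in for Arc<dyn PhysicalExpr>).
pub enum Expr {
    /// Built-in leaf carrying an identity.
    Leaf { id: u64 },
    /// Downstream-defined wrapper, only reachable through the codec fallback.
    Wrapper(Box<Expr>),
}

/// Stand-in for PhysicalProtoConverterExtension.
pub trait Converter {
    fn expr_to_proto(&self, expr: &Expr, codec: &dyn Codec) -> Node;
}

/// Stand-in for PhysicalExtensionCodec with the proposed extra parameter.
pub trait Codec {
    fn try_encode_expr(&self, expr: &Expr, converter: &dyn Converter) -> Node;
}

/// Shared dispatch: built-in leaves are handled directly, wrappers fall back to the codec.
fn encode_with(conv: &dyn Converter, stamp: bool, expr: &Expr, codec: &dyn Codec) -> Node {
    match expr {
        Expr::Leaf { id } => Node { expr_id: stamp.then_some(*id), children: vec![] },
        Expr::Wrapper(_) => codec.try_encode_expr(expr, conv),
    }
}

/// Never stamps ids (models the default converter).
pub struct DefaultConverter;
impl Converter for DefaultConverter {
    fn expr_to_proto(&self, expr: &Expr, codec: &dyn Codec) -> Node {
        encode_with(self, false, expr, codec)
    }
}

/// Stamps ids on built-in leaves (models the deduplicating converter).
pub struct DedupConverter;
impl Converter for DedupConverter {
    fn expr_to_proto(&self, expr: &Expr, codec: &dyn Codec) -> Node {
        encode_with(self, true, expr, codec)
    }
}

/// Codec that ignores the active converter, as it must today.
pub struct HardwiredCodec;
impl Codec for HardwiredCodec {
    fn try_encode_expr(&self, expr: &Expr, _converter: &dyn Converter) -> Node {
        let Expr::Wrapper(inner) = expr else { panic!("not a wrapper") };
        Node { expr_id: None, children: vec![DefaultConverter.expr_to_proto(inner, self)] }
    }
}

/// Codec that forwards the active converter to its nested expression.
pub struct ForwardingCodec;
impl Codec for ForwardingCodec {
    fn try_encode_expr(&self, expr: &Expr, converter: &dyn Converter) -> Node {
        let Expr::Wrapper(inner) = expr else { panic!("not a wrapper") };
        Node { expr_id: None, children: vec![converter.expr_to_proto(inner, self)] }
    }
}
```

With `DedupConverter` as the outer converter, the nested leaf under `HardwiredCodec` is emitted with expr_id: None, while under `ForwardingCodec` it keeps its id. The only difference between the two codecs is which converter the nested call goes through.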

This is the same breaking-change shape that try_encode / try_decode already use — codecs that override the expr-level methods will need to add the new parameter. Codecs that only override the plan-level methods are unaffected. Wire format is unchanged.

Status

I have a local implementation plus a roundtrip test (extension_codec_expr_participates_in_deduplication), opened in #22922. The test uses DeduplicatingProtoConverter and asserts referential integrity via update() / current() across the wrapped/bare boundary. Happy to take direction on whether the breaking-change pattern (matching plan-level) or an additive _with_converter variant with default forwarding is preferred.
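
For comparison, the additive alternative could look roughly like the sketch below. This is a toy trait, not a proposal for the exact signatures: `Converter`, `Codec`, `LegacyCodec`, `AwareCodec`, `call_site` and the method name `try_encode_expr_with_converter` are all hypothetical, and expressions are reduced to `&str` to keep the example self-contained.

```rust
/// Hypothetical placeholder for the active proto converter.
pub struct Converter;

pub trait Codec {
    /// Existing method, signature unchanged.
    fn try_encode_expr(&self, node: &str, buf: &mut Vec<u8>) -> Result<(), String> {
        let _ = (node, buf);
        Err("not implemented".to_string())
    }

    /// Hypothetical additive variant: by default it forwards to the old
    /// method, so codecs that never heard of converters keep working.
    fn try_encode_expr_with_converter(
        &self,
        node: &str,
        buf: &mut Vec<u8>,
        _converter: &Converter,
    ) -> Result<(), String> {
        self.try_encode_expr(node, buf)
    }
}

/// Overrides only the old method; compiles unchanged under the additive design.
pub struct LegacyCodec;
impl Codec for LegacyCodec {
    fn try_encode_expr(&self, node: &str, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(node.as_bytes());
        Ok(())
    }
}

/// Overrides the new method and could route nested exprs through the converter.
pub struct AwareCodec;
impl Codec for AwareCodec {
    fn try_encode_expr_with_converter(
        &self,
        node: &str,
        buf: &mut Vec<u8>,
        _converter: &Converter,
    ) -> Result<(), String> {
        // The prefix only marks that this path was taken.
        buf.extend_from_slice(b"dedup:");
        buf.extend_from_slice(node.as_bytes());
        Ok(())
    }
}

/// Stand-in for the library call site, which would always call the new variant.
pub fn call_site(codec: &dyn Codec, node: &str) -> Result<Vec<u8>, String> {
    let mut buf = Vec::new();
    codec.try_encode_expr_with_converter(node, &mut buf, &Converter)?;
    Ok(buf)
}
```

The trade-off in this sketch is that existing codecs keep compiling, at the cost of two overlapping methods on the trait and a silent fallback to the non-dedup path for any codec that has not opted in.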
