Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,11 @@ pub fn parse_physical_expr_with_converter(
.iter()
.map(|e| proto_converter.proto_to_physical_expr(e, input_schema, ctx))
.collect::<Result<_>>()?;
ctx.codec()
.try_decode_expr(extension.expr.as_slice(), &inputs)? as _
ctx.codec().try_decode_expr(
extension.expr.as_slice(),
&inputs,
proto_converter,
)? as _
}
};

Expand Down
18 changes: 18 additions & 0 deletions datafusion/proto/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3940,18 +3940,36 @@ pub trait PhysicalExtensionCodec: Debug + Send + Sync + Any {
Ok(())
}

/// Decode a custom extension expression from `buf`.
///
/// Implementations whose proto carries nested `PhysicalExprNode` fields
/// should route those through `proto_converter.proto_to_physical_expr`
/// (rather than the free `parse_physical_expr` function) so that an
/// active `DeduplicatingDeserializer` can cache-hit on matching
/// `expr_id`s and re-share `Arc<dyn PhysicalExpr>` (e.g. a
/// `DynamicFilterPhysicalExpr` referenced both from a SortExec.filter
/// and from a custom file-source's predicate field).
///
/// # Arguments
///
/// * `_buf` - the serialized extension payload, i.e. the bytes an
///   implementation wrote in [`Self::try_encode_expr`].
/// * `_inputs` - the child expressions of the extension node. The caller
///   in `parse_physical_expr_with_converter` decodes these through the
///   same `proto_converter` before invoking this method.
/// * `_proto_converter` - the converter driving the current decode; use
///   it for any `PhysicalExprNode` embedded inside `_buf`.
///
/// # Errors
///
/// The default implementation decodes nothing and always returns the
/// error built by `not_impl_err!`; codecs that support custom
/// expressions must override it.
fn try_decode_expr(
&self,
_buf: &[u8],
_inputs: &[Arc<dyn PhysicalExpr>],
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn PhysicalExpr>> {
// Default: no custom expressions are supported by this codec.
not_impl_err!("PhysicalExtensionCodec is not provided")
}

/// Encode a custom extension expression into `buf`.
///
/// Implementations whose proto carries nested `PhysicalExprNode` fields
/// should route those through `proto_converter.physical_expr_to_proto`
/// (rather than the free `serialize_physical_expr` function) so that
/// an active `DeduplicatingProtoConverter` can stamp matching `expr_id`s
/// for shared inner expressions. See [`Self::try_decode_expr`].
///
/// # Arguments
///
/// * `_node` - the expression to serialize.
/// * `_buf` - output buffer that receives the encoded payload. The
///   caller in `serialize_physical_expr_with_converter` passes a fresh,
///   empty `Vec<u8>`.
/// * `_proto_converter` - the converter driving the current encode; use
///   it for any expression nested inside `_node`'s payload.
///
/// # Errors
///
/// The default implementation encodes nothing and always returns the
/// error built by `not_impl_err!`; codecs that support custom
/// expressions must override it.
fn try_encode_expr(
&self,
_node: &Arc<dyn PhysicalExpr>,
_buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
// Default: no custom expressions are supported by this codec.
not_impl_err!("PhysicalExtensionCodec is not provided")
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/physical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ pub fn serialize_physical_expr_with_converter(
})
} else {
let mut buf: Vec<u8> = vec![];
match codec.try_encode_expr(value, &mut buf) {
match codec.try_encode_expr(value, &mut buf, proto_converter) {
Ok(_) => {
let inputs: Vec<protobuf::PhysicalExprNode> = value
.children()
Expand Down
238 changes: 238 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
&self,
buf: &[u8],
inputs: &[Arc<dyn PhysicalExpr>],
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<Arc<dyn PhysicalExpr>> {
if buf == "CustomPredicateExpr".as_bytes() {
Ok(Arc::new(CustomPredicateExpr {
Expand All @@ -1175,6 +1176,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
&self,
node: &Arc<dyn PhysicalExpr>,
buf: &mut Vec<u8>,
_proto_converter: &dyn PhysicalProtoConverterExtension,
) -> Result<()> {
if node.downcast_ref::<CustomPredicateExpr>().is_some() {
buf.extend_from_slice("CustomPredicateExpr".as_bytes());
Expand Down Expand Up @@ -4127,3 +4129,239 @@ fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> {

Ok(())
}

/// Demonstrates that `try_encode_expr` / `try_decode_expr` extension codecs
/// can participate in the dedup pipeline by routing their nested
/// `PhysicalExprNode` fields through the active
/// `PhysicalProtoConverterExtension`.
///
/// Without the `proto_converter` parameter on the expr-level codec methods,
/// any nested `PhysicalExprNode` inside a custom expression's serialized
/// blob would skip dedup (`expr_id` would be `None` on the wire). Two
/// references to the same `DynamicFilterPhysicalExpr` (one inside an
/// extension expr, one outside) would reconstruct as two distinct Arcs on
/// the decode side, breaking heap-max propagation across plan-node
/// boundaries in distributed execution.
///
/// With the converter parameter plumbed through, the extension codec
/// delegates nested expr serialization to the active deduplicating
/// converter, and both refs cache-hit on the same `expr_id` on decode.
///
/// The test builds `dyn_filter AND WrapperExpr(dyn_filter)`, round-trips it
/// through protobuf bytes, and then checks that an `update()` applied to the
/// decoded bare filter is observable through the decoded wrapped filter.
#[test]
fn extension_codec_expr_participates_in_deduplication() -> Result<()> {
    use datafusion_common::DataFusionError;
    use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
    use prost::Message;
    use std::fmt::{self, Display};

    // A minimal custom expression whose only logical content is a nested
    // `Arc<dyn PhysicalExpr>`. Its codec routes that nested expr through
    // the active `PhysicalProtoConverterExtension`.
    #[derive(Debug)]
    struct WrapperExpr {
        inner: Arc<dyn PhysicalExpr>,
    }
    impl Display for WrapperExpr {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "WrapperExpr({})", self.inner)
        }
    }
    // Hash / equality simply delegate to the wrapped expression.
    impl std::hash::Hash for WrapperExpr {
        fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
            self.inner.dyn_hash(state);
        }
    }
    impl PartialEq for WrapperExpr {
        fn eq(&self, other: &Self) -> bool {
            self.inner.eq(&other.inner)
        }
    }
    impl Eq for WrapperExpr {}
    impl PhysicalExpr for WrapperExpr {
        fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
            self.inner.data_type(input_schema)
        }
        fn nullable(&self, input_schema: &Schema) -> Result<bool> {
            self.inner.nullable(input_schema)
        }
        fn evaluate(
            &self,
            _batch: &arrow::record_batch::RecordBatch,
        ) -> Result<datafusion::physical_plan::ColumnarValue> {
            internal_err!("WrapperExpr is not executable in this test")
        }
        fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
            vec![&self.inner]
        }
        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn PhysicalExpr>>,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            // Move the first child out of the vector instead of indexing:
            // an empty `children` becomes an error rather than a panic, and
            // no extra `Arc` refcount bump is needed.
            let inner = children.into_iter().next().ok_or_else(|| {
                DataFusionError::Internal("WrapperExpr requires one child".into())
            })?;
            Ok(Arc::new(WrapperExpr { inner }))
        }
        fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "{self}")
        }
    }

    // Wire layout: a single nested PhysicalExprNode field.
    #[derive(Clone, PartialEq, prost::Message)]
    struct WrapperExprProto {
        #[prost(message, optional, boxed, tag = "1")]
        inner: Option<Box<datafusion_proto::protobuf::PhysicalExprNode>>,
    }

    #[derive(Debug)]
    struct WrapperCodec;
    impl PhysicalExtensionCodec for WrapperCodec {
        // Plan-level hooks are never reached: the test only round-trips
        // expressions.
        fn try_decode(
            &self,
            _buf: &[u8],
            _inputs: &[Arc<dyn ExecutionPlan>],
            _ctx: &TaskContext,
            _proto_converter: &dyn PhysicalProtoConverterExtension,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            internal_err!("not used")
        }
        fn try_encode(
            &self,
            _node: Arc<dyn ExecutionPlan>,
            _buf: &mut Vec<u8>,
            _proto_converter: &dyn PhysicalProtoConverterExtension,
        ) -> Result<()> {
            internal_err!("not used")
        }
        fn try_decode_expr(
            &self,
            buf: &[u8],
            _inputs: &[Arc<dyn PhysicalExpr>],
            proto_converter: &dyn PhysicalProtoConverterExtension,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            let proto = WrapperExprProto::decode(buf).map_err(|e| {
                DataFusionError::Internal(format!("decode WrapperExprProto: {e}"))
            })?;
            let inner_proto = proto
                .inner
                .ok_or_else(|| DataFusionError::Internal("missing inner".into()))?;
            // The codec method receives neither schema nor task context, so
            // the test rebuilds equivalents locally; the dedup behaviour
            // under test comes from `proto_converter`, not from this context.
            let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
            let ctx = SessionContext::new();
            let task_ctx = ctx.task_ctx();
            let decode_ctx = PhysicalPlanDecodeContext::new(task_ctx.as_ref(), self);
            // Route the nested node through the *active* converter rather
            // than a free function so shared expressions can be re-shared.
            let inner = proto_converter.proto_to_physical_expr(
                &inner_proto,
                &schema,
                &decode_ctx,
            )?;
            Ok(Arc::new(WrapperExpr { inner }))
        }
        fn try_encode_expr(
            &self,
            node: &Arc<dyn PhysicalExpr>,
            buf: &mut Vec<u8>,
            proto_converter: &dyn PhysicalProtoConverterExtension,
        ) -> Result<()> {
            let wrapper = node
                .downcast_ref::<WrapperExpr>()
                .ok_or_else(|| DataFusionError::Internal("not WrapperExpr".into()))?;
            // Mirror of the decode path: let the active converter serialize
            // the nested expression.
            let inner_proto =
                proto_converter.physical_expr_to_proto(&wrapper.inner, self)?;
            let proto = WrapperExprProto {
                inner: Some(Box::new(inner_proto)),
            };
            proto.encode(buf).map_err(|e| {
                DataFusionError::Internal(format!("encode WrapperExprProto: {e}"))
            })?;
            Ok(())
        }
    }

    // Build a single composite expression that holds TWO references to the
    // same `Arc<DynamicFilterPhysicalExpr>`: a bare one on the left of a
    // BinaryExpr, and one wrapped inside our extension expr on the right.
    // Roundtripping the whole composite in a single decode call exercises
    // a shared dedup cache.
    let dyn_filter = make_dynamic_filter();
    let wrapper: Arc<dyn PhysicalExpr> = Arc::new(WrapperExpr {
        inner: Arc::clone(&dyn_filter),
    });
    let composite: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::clone(&dyn_filter),
        Operator::And,
        Arc::clone(&wrapper),
    ));

    let codec = WrapperCodec;
    let converter = DeduplicatingProtoConverter {};

    let proto = converter.physical_expr_to_proto(&composite, &codec)?;
    // Round-trip through prost bytes to mimic the wire. A decode failure is
    // reported through the test's `Result` like every other prost error in
    // this test, instead of panicking via `unwrap()`.
    let bytes = proto.encode_to_vec();
    let decoded_proto =
        datafusion_proto::protobuf::PhysicalExprNode::decode(bytes.as_slice()).map_err(
            |e| DataFusionError::Internal(format!("decode PhysicalExprNode: {e}")),
        )?;

    let schema = Schema::new(vec![Field::new("a", DataType::Int64, false)]);
    let ctx = SessionContext::new();
    let task_ctx = ctx.task_ctx();
    let decode_ctx = PhysicalPlanDecodeContext::new(task_ctx.as_ref(), &codec);
    let decoded =
        converter.proto_to_physical_expr(&decoded_proto, &schema, &decode_ctx)?;

    let binary = decoded
        .downcast_ref::<BinaryExpr>()
        .expect("must decode back to BinaryExpr");
    let decoded_left = Arc::clone(binary.left());
    let decoded_right = Arc::clone(binary.right());
    let decoded_wrapper = decoded_right
        .downcast_ref::<WrapperExpr>()
        .expect("right side must decode back to WrapperExpr");

    // Semantic referential-integrity check: both decoded sides must
    // observe the same logical filter. The bare side is a
    // `DynamicFilterPhysicalExpr`; the wrapped side's inner is too. An
    // `update()` on one must be visible from the other via
    // `current()`, proving they share the same `Inner` (the entire
    // point of #21807's dedup pipeline).
    //
    // Without the `proto_converter` parameter on `try_encode_expr` /
    // `try_decode_expr`, the wrapper codec would route its nested
    // expression through `DefaultPhysicalProtoConverter` and skip dedup
    // entirely, so `decoded_left` and `decoded_wrapper.inner` would
    // back distinct `Inner` allocations and an update would not
    // propagate across the extension boundary.
    let left_dyn = decoded_left
        .downcast_ref::<DynamicFilterPhysicalExpr>()
        .expect("decoded bare expr must still be DynamicFilterPhysicalExpr");
    let right_dyn = decoded_wrapper
        .inner
        .downcast_ref::<DynamicFilterPhysicalExpr>()
        .expect("decoded wrapped inner must still be DynamicFilterPhysicalExpr");

    // Sanity: matching expression_id is necessary for dedup to fire.
    assert_eq!(
        left_dyn.expression_id(),
        right_dyn.expression_id(),
        "shared `Inner` must report the same expression_id"
    );

    // The load-bearing check: prove `Inner` really is shared by updating
    // one side and reading from the other.
    let original_generation = left_dyn.snapshot_generation();
    let new_expr: Arc<dyn PhysicalExpr> = lit(false);
    left_dyn.update(Arc::clone(&new_expr))?;
    assert_eq!(
        right_dyn.snapshot_generation(),
        original_generation + 1,
        "update on the bare ref must be visible from the wrapped ref \
         (same Inner)"
    );
    let observed = right_dyn.current()?;
    assert_eq!(
        format!("{observed}"),
        format!("{new_expr}"),
        "wrapped ref's current() must reflect the update applied to the \
         bare ref"
    );

    Ok(())
}
Loading