Skip to content

Commit d213820

Browse files
committed
feat: make execution transfomation application rule generic
Currently, it depends on the particular execution transformation that resolves placeholders. However, it would be more convenient to be able to specify the rule which is required to be applied to the plan.
1 parent f44a1c7 commit d213820

11 files changed

Lines changed: 125 additions & 138 deletions

File tree

datafusion/core/tests/physical_optimizer/physical_expr_resolver.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Integration tests for [`PhysicalExprResolver`] optimizer rule.
18+
//! Integration tests for [`ExecutionTransformationApplier`] optimizer rule.
1919
2020
use std::{collections::HashMap, sync::Arc};
2121

@@ -29,11 +29,14 @@ use datafusion_physical_expr::{
2929
expressions::{BinaryExpr, col, lit, placeholder},
3030
};
3131
use datafusion_physical_optimizer::{
32-
PhysicalOptimizerRule, physical_expr_resolver::PhysicalExprResolver,
32+
PhysicalOptimizerRule, exec_transform_apply::ExecutionTransformationApplier,
3333
};
3434
use datafusion_physical_plan::{
35-
ExecutionPlan, filter::FilterExec, get_plan_string,
36-
plan_transformer::TransformPlanExec, repartition::RepartitionExec,
35+
ExecutionPlan,
36+
filter::FilterExec,
37+
get_plan_string,
38+
plan_transformer::{ResolvePlaceholdersRule, TransformPlanExec},
39+
repartition::RepartitionExec,
3740
};
3841

3942
use crate::physical_optimizer::test_utils::{
@@ -85,6 +88,12 @@ fn repartition_exec(
8588
)?))
8689
}
8790

91+
fn placeholder_resolver() -> ExecutionTransformationApplier {
92+
ExecutionTransformationApplier::new_post_optimization(Arc::new(
93+
ResolvePlaceholdersRule::new(),
94+
))
95+
}
96+
8897
#[test]
8998
fn test_noop_if_no_placeholders_found() -> Result<()> {
9099
let schema = create_schema();
@@ -105,8 +114,7 @@ fn test_noop_if_no_placeholders_found() -> Result<()> {
105114

106115
assert_eq!(initial, expected_initial);
107116

108-
let after_optimize = PhysicalExprResolver::new_post_optimization()
109-
.optimize(plan, &ConfigOptions::new())?;
117+
let after_optimize = placeholder_resolver().optimize(plan, &ConfigOptions::new())?;
110118

111119
let optimized_plan_string = get_plan_string(&after_optimize);
112120
assert_eq!(initial, optimized_plan_string);
@@ -134,8 +142,7 @@ fn test_wrap_plan_with_transformer() -> Result<()> {
134142

135143
assert_eq!(initial, expected_initial);
136144

137-
let after_optimize = PhysicalExprResolver::new_post_optimization()
138-
.optimize(plan, &ConfigOptions::new())?;
145+
let after_optimize = placeholder_resolver().optimize(plan, &ConfigOptions::new())?;
139146

140147
let expected_optimized = [
141148
"TransformPlanExec: rules=[ResolvePlaceholders: plans_to_modify=1]",
@@ -201,7 +208,7 @@ fn test_remove_useless_transformers() -> Result<()> {
201208
assert_eq!(initial, expected_initial);
202209

203210
let after_optimize =
204-
PhysicalExprResolver::new().optimize(plan, &ConfigOptions::new())?;
211+
ExecutionTransformationApplier::new().optimize(plan, &ConfigOptions::new())?;
205212

206213
let expected_optimized = [
207214
"GlobalLimitExec: skip=0, fetch=5",
@@ -244,7 +251,7 @@ fn test_combine_transformers() -> Result<()> {
244251
assert_eq!(initial, expected_initial);
245252

246253
let after_pre_optimization =
247-
PhysicalExprResolver::new().optimize(plan, &ConfigOptions::new())?;
254+
ExecutionTransformationApplier::new().optimize(plan, &ConfigOptions::new())?;
248255

249256
let expected_optimized = [
250257
"GlobalLimitExec: skip=0, fetch=5",
@@ -257,8 +264,8 @@ fn test_combine_transformers() -> Result<()> {
257264
let optimized_plan_string = get_plan_string(&after_pre_optimization);
258265
assert_eq!(optimized_plan_string, expected_optimized);
259266

260-
let after_post_optimization = PhysicalExprResolver::new_post_optimization()
261-
.optimize(after_pre_optimization, &ConfigOptions::new())?;
267+
let after_post_optimization =
268+
placeholder_resolver().optimize(after_pre_optimization, &ConfigOptions::new())?;
262269

263270
let expected_optimized = [
264271
"TransformPlanExec: rules=[ResolvePlaceholders: plans_to_modify=1]",

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ pub fn resolve_placeholders_exec(
400400
input: Arc<dyn ExecutionPlan>,
401401
) -> Arc<dyn ExecutionPlan> {
402402
Arc::new(
403-
TransformPlanExec::try_new(input, vec![Box::new(ResolvePlaceholdersRule::new())])
403+
TransformPlanExec::try_new(input, vec![Arc::new(ResolvePlaceholdersRule::new())])
404404
.unwrap(),
405405
)
406406
}

datafusion/datasource/src/values.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,7 @@ mod tests {
437437
// Should be ValuesSource because of placeholder.
438438
assert!(values_exec.data_source().as_any().is::<ValuesSource>());
439439

440-
let rules = vec![Box::new(ResolvePlaceholdersRule::new()) as Box<_>];
440+
let rules = vec![Arc::new(ResolvePlaceholdersRule::new()) as _];
441441
let exec = Arc::new(TransformPlanExec::try_new(values_exec, rules)?);
442442
let task_ctx = Arc::new(TaskContext::default().with_param_values(
443443
ParamValues::List(vec![ScalarValue::Int32(Some(10)).into()]),
@@ -470,7 +470,7 @@ mod tests {
470470
]];
471471

472472
let values_exec = ValuesSource::try_new_exec(Arc::clone(&schema), data)?;
473-
let rules = vec![Box::new(ResolvePlaceholdersRule::new()) as Box<_>];
473+
let rules = vec![Arc::new(ResolvePlaceholdersRule::new()) as _];
474474
let exec = Arc::new(TransformPlanExec::try_new(values_exec, rules)?) as Arc<_>;
475475

476476
let task_ctx = Arc::new(TaskContext::default().with_param_values(
@@ -528,7 +528,7 @@ mod tests {
528528
vec![vec![lit(10), placeholder("$foo", DataType::Int32)]];
529529

530530
let values_exec = ValuesSource::try_new_exec(Arc::clone(&schema), data)?;
531-
let rules = vec![Box::new(ResolvePlaceholdersRule::new()) as Box<_>];
531+
let rules = vec![Arc::new(ResolvePlaceholdersRule::new()) as _];
532532
let exec = Arc::new(TransformPlanExec::try_new(values_exec, rules)?) as Arc<_>;
533533

534534
let task_ctx = Arc::new(TaskContext::default());

datafusion/physical-optimizer/src/physical_expr_resolver.rs renamed to datafusion/physical-optimizer/src/exec_transform_apply.rs

Lines changed: 39 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! [`PhysicalExprResolver`] ensures that the physical plan is prepared for placeholder resolution
19-
//! by wrapping it in a [`TransformPlanExec`] with a [`ResolvePlaceholdersRule`] if the plan
20-
//! contains any unresolved placeholders. The actual resolution happens during execution.
18+
//! [`ExecutionTransformationApplier`] ensures that the required execution transformations
19+
//! are applied to the physical plan.
2120
22-
use std::sync::Arc;
21+
use std::{borrow::Cow, sync::Arc};
2322

2423
use datafusion_common::{
2524
Result,
@@ -28,71 +27,72 @@ use datafusion_common::{
2827
};
2928
use datafusion_physical_plan::{
3029
ExecutionPlan,
31-
plan_transformer::{ResolvePlaceholdersRule, TransformPlanExec},
30+
plan_transformer::{ExecutionTransformationRule, TransformPlanExec},
3231
};
3332

3433
use crate::PhysicalOptimizerRule;
3534

36-
/// The phase in which the [`PhysicalExprResolver`] rule is applied.
37-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38-
pub enum PhysicalExprResolverPhase {
35+
/// The phase in which the [`ExecutionTransformationApplier`] rule is applied.
36+
#[derive(Debug)]
37+
pub enum ExecutionTransformationApplierPhase {
3938
/// Optimization that happens before most other optimizations.
4039
/// This optimization removes all [`TransformPlanExec`] execution plans from the plan
4140
/// tree.
4241
Pre,
4342
/// Optimization that happens after most other optimizations.
44-
/// This optimization checks for the presence of placeholders in the optimized plan, and if
45-
/// they are present, wraps the plan in a [`TransformPlanExec`] with a [`ResolvePlaceholdersRule`].
46-
Post,
43+
/// This optimization checks if `rule` requires to transform the plan and wraps the plan with
44+
/// [`TransformPlanExec`] if it so, or adds rule to the existing transformation node.
45+
Post {
46+
rule: Arc<dyn ExecutionTransformationRule>,
47+
},
4748
}
4849

49-
/// Physical optimizer rule that prepares the plan for placeholder resolution during execution.
50+
/// Physical optimizer rule that wraps the plan with a certain execution-stage transformation.
5051
#[derive(Debug)]
51-
pub struct PhysicalExprResolver {
52-
phase: PhysicalExprResolverPhase,
52+
pub struct ExecutionTransformationApplier {
53+
phase: ExecutionTransformationApplierPhase,
54+
name: Cow<'static, str>,
5355
}
5456

55-
impl PhysicalExprResolver {
56-
/// Creates a new [`PhysicalExprResolver`] optimizer rule that runs in the pre-optimization
57-
/// phase. In this phase, the rule removes any existing [`TransformPlanExec`] from the
58-
/// plan tree.
57+
impl ExecutionTransformationApplier {
58+
/// Creates a new [`ExecutionTransformationApplier`] optimizer rule that runs in the
59+
/// pre-optimization phase.
5960
pub fn new() -> Self {
6061
Self {
61-
phase: PhysicalExprResolverPhase::Pre,
62+
phase: ExecutionTransformationApplierPhase::Pre,
63+
name: Cow::Borrowed("ExecutionTransformationApplier"),
6264
}
6365
}
6466

65-
/// Creates a new [`PhysicalExprResolver`] optimizer rule that runs in the post-optimization
66-
/// phase. In this phase, the rule wraps the physical plan in a [`TransformPlanExec`] with a
67-
/// [`ResolvePlaceholdersRule`] if the plan contains any unresolved placeholders.
68-
pub fn new_post_optimization() -> Self {
67+
/// Creates a new [`ExecutionTransformationApplier`] optimizer rule that runs in the
68+
/// post-optimization phase.
69+
pub fn new_post_optimization(rule: Arc<dyn ExecutionTransformationRule>) -> Self {
70+
let name = format!("ExecutionTransformationApplier({})", rule.name());
6971
Self {
70-
phase: PhysicalExprResolverPhase::Post,
72+
phase: ExecutionTransformationApplierPhase::Post { rule },
73+
name: name.into(),
7174
}
7275
}
7376
}
7477

75-
impl Default for PhysicalExprResolver {
78+
impl Default for ExecutionTransformationApplier {
7679
fn default() -> Self {
7780
Self::new()
7881
}
7982
}
8083

81-
impl PhysicalOptimizerRule for PhysicalExprResolver {
84+
impl PhysicalOptimizerRule for ExecutionTransformationApplier {
8285
fn name(&self) -> &str {
83-
match self.phase {
84-
PhysicalExprResolverPhase::Pre => "PhysicalExprResolver",
85-
PhysicalExprResolverPhase::Post => "PhysicalExprResolver(Post)",
86-
}
86+
&self.name
8787
}
8888

8989
fn optimize(
9090
&self,
9191
plan: Arc<dyn ExecutionPlan>,
9292
_config: &ConfigOptions,
9393
) -> Result<Arc<dyn ExecutionPlan>> {
94-
match self.phase {
95-
PhysicalExprResolverPhase::Pre => plan
94+
match &self.phase {
95+
ExecutionTransformationApplierPhase::Pre => plan
9696
.transform_up(|plan| {
9797
if let Some(plan) = plan.as_any().downcast_ref::<TransformPlanExec>()
9898
{
@@ -102,26 +102,22 @@ impl PhysicalOptimizerRule for PhysicalExprResolver {
102102
}
103103
})
104104
.map(|t| t.data),
105-
PhysicalExprResolverPhase::Post => {
105+
ExecutionTransformationApplierPhase::Post { rule } => {
106106
if let Some(transformer) =
107107
plan.as_any().downcast_ref::<TransformPlanExec>()
108108
{
109-
let resolves_placeholders =
110-
transformer.has_rule::<ResolvePlaceholdersRule>();
111-
112-
if resolves_placeholders {
109+
let has_rule = transformer.has_dyn_rule(rule);
110+
if has_rule {
111+
// Rule is already applied.
113112
Ok(plan)
114113
} else {
115114
transformer
116-
.add_rule(Box::new(ResolvePlaceholdersRule::new()))
115+
.add_rule(Arc::clone(rule))
117116
.map(|r| Arc::new(r) as Arc<_>)
118117
}
119118
} else {
120-
let transformer = TransformPlanExec::try_new(
121-
plan,
122-
vec![Box::new(ResolvePlaceholdersRule::new())],
123-
)?;
124-
119+
let transformer =
120+
TransformPlanExec::try_new(plan, vec![Arc::clone(rule)])?;
125121
if transformer.plans_to_transform() > 0 {
126122
Ok(Arc::new(transformer))
127123
} else {

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ pub mod combine_partial_final_agg;
3030
pub mod enforce_distribution;
3131
pub mod enforce_sorting;
3232
pub mod ensure_coop;
33+
pub mod exec_transform_apply;
3334
pub mod filter_pushdown;
3435
pub mod join_selection;
3536
pub mod limit_pushdown;
3637
pub mod limit_pushdown_past_window;
3738
pub mod limited_distinct_aggregation;
3839
pub mod optimizer;
3940
pub mod output_requirements;
40-
pub mod physical_expr_resolver;
4141
pub mod projection_pushdown;
4242
pub use datafusion_pruning as pruning;
4343
pub mod pushdown_sort;

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
2525
use crate::enforce_distribution::EnforceDistribution;
2626
use crate::enforce_sorting::EnforceSorting;
2727
use crate::ensure_coop::EnsureCooperative;
28+
use crate::exec_transform_apply::ExecutionTransformationApplier;
2829
use crate::filter_pushdown::FilterPushdown;
2930
use crate::join_selection::JoinSelection;
3031
use crate::limit_pushdown::LimitPushdown;
3132
use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
3233
use crate::output_requirements::OutputRequirements;
33-
use crate::physical_expr_resolver::PhysicalExprResolver;
3434
use crate::projection_pushdown::ProjectionPushdown;
3535
use crate::sanity_checker::SanityCheckPlan;
3636
use crate::topk_aggregation::TopKAggregation;
@@ -41,6 +41,7 @@ use crate::pushdown_sort::PushdownSort;
4141
use datafusion_common::Result;
4242
use datafusion_common::config::ConfigOptions;
4343
use datafusion_physical_plan::ExecutionPlan;
44+
use datafusion_physical_plan::plan_transformer::ResolvePlaceholdersRule;
4445

4546
/// `PhysicalOptimizerRule` transforms one ['ExecutionPlan'] into another which
4647
/// computes the same results, but in a potentially more efficient way.
@@ -88,7 +89,7 @@ impl PhysicalOptimizer {
8889
// this information is not lost across different rules during optimization.
8990
Arc::new(OutputRequirements::new_add_mode()),
9091
// This rule removes all existing `TransformPlanExec` nodes from the plan tree.
91-
Arc::new(PhysicalExprResolver::new()),
92+
Arc::new(ExecutionTransformationApplier::new()),
9293
Arc::new(AggregateStatistics::new()),
9394
// Statistics-based join selection will change the Auto mode to a real join implementation,
9495
// like collect left, or hash join, or future sort merge join, which will influence the
@@ -151,7 +152,9 @@ impl PhysicalOptimizer {
151152
// This rule prepares the physical plan for placeholder resolution by wrapping it in a
152153
// `TransformPlanExec` with a `ResolvePlaceholdersRule` if it contains any unresolved
153154
// placeholders.
154-
Arc::new(PhysicalExprResolver::new_post_optimization()),
155+
Arc::new(ExecutionTransformationApplier::new_post_optimization(
156+
Arc::new(ResolvePlaceholdersRule::new()),
157+
)),
155158
// This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan.
156159
// Therefore it should be run at the end of the optimization process since any changes to the plan may break the dynamic filter's references.
157160
// See `FilterPushdownPhase` for more details.

0 commit comments

Comments
 (0)