Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,17 @@ public class OptimizerConfigOptions {
.withDescription(
"Strategy for optimizing the delta-join. Only AUTO, FORCE or NONE can be set. Default it AUTO.");

/**
 * Option {@code table.optimizer.project-filter-transpose-rule}: chooses the {@link
 * ProjectFilterTransposeRule} variant, defaulting to {@code PROJECT_FILTER_TRANSPOSE}. The
 * option is documented for both batch and streaming execution modes.
 */
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<ProjectFilterTransposeRule>
TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE =
key("table.optimizer.project-filter-transpose-rule")
.enumType(ProjectFilterTransposeRule.class)
.defaultValue(ProjectFilterTransposeRule.PROJECT_FILTER_TRANSPOSE)
.withDescription(
"Selects which Calcite ProjectFilterTransposeRule variant the optimizer "
+ "uses when pushing a Project past a Filter in the "
+ "PROJECT_REWRITE phase.");

/** Strategy for handling non-deterministic updates. */
@PublicEvolving
public enum NonDeterministicUpdateStrategy {
Expand Down Expand Up @@ -496,4 +507,37 @@ public InlineElement getDescription() {
return description;
}
}

/**
 * Variant of Calcite's ProjectFilterTransposeRule used when pushing a Project past a Filter.
 *
 * <p>Each constant carries a human-readable description that is exposed through {@link
 * #getDescription()}. The description fragments are concatenated at compile time, so every
 * fragment that is followed by another one must end with a space.
 */
@PublicEvolving
public enum ProjectFilterTransposeRule implements DescribedEnum {
    PROJECT_FILTER_TRANSPOSE(
            text(
                    "Default variant. Pushes a Project past a Filter, splitting expressions "
                            + "so that only the columns referenced by the Filter remain below it.")),
    PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS(
            text(
                    "Pushes the Project past the Filter without splitting expressions. "
                            + "Each top-level Project expression is preserved as a whole and "
                            // Trailing space keeps the NOTE from being glued to the previous sentence.
                            + "either pushed below the Filter or kept above it. "
                            + "NOTE: Required for proper nested projection + filtering")),
    PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS(
            text(
                    "Pushes the Project past the Filter only when the Project as a whole can be "
                            + "moved below the Filter; the Project is treated as atomic and is "
                            + "never split."));

    /** Description of this variant, as passed to the constant's constructor. */
    private final InlineElement description;

    /**
     * Creates a variant with the given description.
     *
     * @param description text returned by {@link #getDescription()}
     */
    ProjectFilterTransposeRule(InlineElement description) {
        this.description = description;
    }

    /**
     * Returns the description attached to this variant.
     *
     * @return the description supplied when the constant was declared
     */
    @Override
    public InlineElement getDescription() {
        return description;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ import org.apache.flink.table.api.config.OptimizerConfigOptions
import org.apache.flink.table.planner.plan.nodes.FlinkConventions
import org.apache.flink.table.planner.plan.rules.FlinkBatchRuleSets

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}

import scala.collection.JavaConverters._

/** Defines a sequence of programs to optimize flink batch table plan. */
object FlinkBatchProgram {
Expand All @@ -45,6 +50,7 @@ object FlinkBatchProgram {

def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = {
val chainedProgram = new FlinkChainedProgram[BatchOptimizeContext]()
val projectFilterTransposeRule = pickProjectFilterTransposeRule(tableConfig)

chainedProgram.addLast(
// rewrite sub-queries to joins
Expand Down Expand Up @@ -239,17 +245,23 @@ object FlinkBatchProgram {
.build()
)

// window rewrite
// PROJECT_REWRITE uses the project-filter-transpose variant selected by the user via
// 'table.optimizer.project-filter-transpose-rule'
chainedProgram.addLast(
PROJECT_REWRITE,
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkBatchRuleSets.PROJECT_RULES)
.add(
substituteRule(
FlinkBatchRuleSets.PROJECT_RULES,
CoreRules.PROJECT_FILTER_TRANSPOSE,
projectFilterTransposeRule))
.build()
)

// optimize the logical plan
// Note: the configured project-filter-transpose variant is intentionally NOT added to the
// Volcano LOGICAL phase, to prevent infinite oscillation with the PROJECT_REWRITE optimization.
chainedProgram.addLast(
LOGICAL,
FlinkVolcanoProgramBuilder.newBuilder
Expand Down Expand Up @@ -298,4 +310,23 @@ object FlinkBatchProgram {

chainedProgram
}

/**
 * Maps the value of `OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE`
 * read from the given config to the Calcite `CoreRules` constant of the same name.
 *
 * @param tableConfig
 *   configuration the option is read from
 * @return
 *   the Calcite rule corresponding to the configured variant
 */
private def pickProjectFilterTransposeRule(tableConfig: ReadableConfig): RelOptRule = {
  // Local alias keeps the case labels short; it refers to the same Java enum.
  import OptimizerConfigOptions.{ProjectFilterTransposeRule => Variant}

  val configured =
    tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE)

  configured match {
    case Variant.PROJECT_FILTER_TRANSPOSE =>
      CoreRules.PROJECT_FILTER_TRANSPOSE
    case Variant.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS =>
      CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS
    case Variant.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS =>
      CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS
  }
}

/**
 * Returns a rule set in which `oldRule` has been swapped for `newRule`.
 *
 * When both arguments are the same instance, `base` is returned untouched. Otherwise every
 * occurrence of `oldRule` (compared by reference) is dropped and `newRule` is appended after
 * the remaining rules.
 *
 * @param base
 *   rule set to start from
 * @param oldRule
 *   rule instance to remove
 * @param newRule
 *   rule instance to append in its place
 * @return
 *   `base` itself, or a new rule set containing the remaining rules followed by `newRule`
 */
private def substituteRule(base: RuleSet, oldRule: RelOptRule, newRule: RelOptRule): RuleSet =
  if (oldRule ne newRule) {
    val retained = base.asScala.filter(rule => rule ne oldRule).toVector
    RuleSets.ofList((retained :+ newRule).asJava)
  } else {
    base
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets
import org.apache.flink.table.planner.plan.rules.logical.EventTimeTemporalJoinRewriteRule
import org.apache.flink.table.planner.plan.rules.physical.stream.{FlinkDuplicateChangesTraitInitProgram, FlinkMarkChangelogNormalizeProgram}

import org.apache.calcite.plan.RelOptRule
import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}

import scala.collection.JavaConverters._

/** Defines a sequence of programs to optimize for stream table plan. */
object FlinkStreamProgram {
Expand All @@ -45,6 +50,7 @@ object FlinkStreamProgram {

def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[StreamOptimizeContext] = {
val chainedProgram = new FlinkChainedProgram[StreamOptimizeContext]()
val projectFilterTransposeRule = pickProjectFilterTransposeRule(tableConfig)

// rewrite sub-queries to joins
chainedProgram.addLast(
Expand Down Expand Up @@ -248,17 +254,23 @@ object FlinkStreamProgram {
.build()
)

// project rewrite
// PROJECT_REWRITE uses the project-filter-transpose variant selected by the user via
// 'table.optimizer.project-filter-transpose-rule'
chainedProgram.addLast(
PROJECT_REWRITE,
FlinkHepRuleSetProgramBuilder.newBuilder
.setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
.setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
.add(FlinkStreamRuleSets.PROJECT_RULES)
.add(
substituteRule(
FlinkStreamRuleSets.PROJECT_RULES,
CoreRules.PROJECT_FILTER_TRANSPOSE,
projectFilterTransposeRule))
.build()
)

// optimize the logical plan
// Note: the configured project-filter-transpose variant is intentionally NOT added to the
// Volcano LOGICAL phase, to prevent infinite oscillation with the PROJECT_REWRITE optimization.
chainedProgram.addLast(
LOGICAL,
FlinkVolcanoProgramBuilder.newBuilder
Expand Down Expand Up @@ -360,4 +372,23 @@ object FlinkStreamProgram {

chainedProgram
}

/**
 * Maps the value of `OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE`
 * read from the given config to the Calcite `CoreRules` constant of the same name.
 *
 * @param tableConfig
 *   configuration the option is read from
 * @return
 *   the Calcite rule corresponding to the configured variant
 */
private def pickProjectFilterTransposeRule(tableConfig: ReadableConfig): RelOptRule = {
  // Local alias keeps the case labels short; it refers to the same Java enum.
  import OptimizerConfigOptions.{ProjectFilterTransposeRule => Variant}

  val configured =
    tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_PROJECT_FILTER_TRANSPOSE_RULE)

  configured match {
    case Variant.PROJECT_FILTER_TRANSPOSE =>
      CoreRules.PROJECT_FILTER_TRANSPOSE
    case Variant.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS =>
      CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_EXPRESSIONS
    case Variant.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS =>
      CoreRules.PROJECT_FILTER_TRANSPOSE_WHOLE_PROJECT_EXPRESSIONS
  }
}

/**
 * Returns a rule set in which `oldRule` has been swapped for `newRule`.
 *
 * When both arguments are the same instance, `base` is returned untouched. Otherwise every
 * occurrence of `oldRule` (compared by reference) is dropped and `newRule` is appended after
 * the remaining rules.
 *
 * @param base
 *   rule set to start from
 * @param oldRule
 *   rule instance to remove
 * @param newRule
 *   rule instance to append in its place
 * @return
 *   `base` itself, or a new rule set containing the remaining rules followed by `newRule`
 */
private def substituteRule(base: RuleSet, oldRule: RelOptRule, newRule: RelOptRule): RuleSet =
  if (oldRule ne newRule) {
    val retained = base.asScala.filter(rule => rule ne oldRule).toVector
    RuleSets.ofList((retained :+ newRule).asJava)
  } else {
    base
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,8 @@ object FlinkBatchRuleSets {
CoreRules.AGGREGATE_VALUES
)

/** RuleSet about project */
val PROJECT_RULES: RuleSet = RuleSets.ofList(
// push a projection past a filter
// push a projection past a filter (safe default; HEP swaps in the configured variant)
CoreRules.PROJECT_FILTER_TRANSPOSE,
// push a projection to the children of a non semi/anti join
// push all expressions to handle the time indicator correctly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,8 @@ object FlinkStreamRuleSets {
CoreRules.AGGREGATE_VALUES
)

/** RuleSet about project */
val PROJECT_RULES: RuleSet = RuleSets.ofList(
// push a projection past a filter
// push a projection past a filter (safe default; HEP swaps in the configured variant)
CoreRules.PROJECT_FILTER_TRANSPOSE,
// push a projection to the children of a non semi/anti join
// push all expressions to handle the time indicator correctly
Expand Down
Loading