From acb335fe665b9fda0a93b46dcfb7906e7117447f Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Thu, 16 Apr 2026 15:54:37 -0400 Subject: [PATCH 1/6] First pass working source-reuse. Tested with BQ connector --- .../api/config/OptimizerConfigOptions.java | 12 ++ .../plan/reuse/ReusableScanVisitor.java | 9 +- .../table/planner/plan/reuse/ScanReuser.java | 190 +++++++++++++++++- .../planner/plan/reuse/ScanReuserUtils.java | 24 ++- .../planner/plan/reuse/SubplanReuser.scala | 6 +- .../planner/plan/optimize/ScanReuseTest.java | 9 + 6 files changed, 236 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index a02915f100293..6d1617c6b51cb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -105,6 +105,18 @@ public class OptimizerConfigOptions { + TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key() + " is true."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption TABLE_OPTIMIZER_REUSE_SOURCE_FILTER = + key("table.optimizer.source.reuse-filter") + .booleanType() + .defaultValue(false) + .withDescription( + "When it is true, the optimizer will try to reuse table sources that differ " + + "only in their pushed-down filters by moving filters into Calc nodes " + + "and sharing a single source reader. This works only when " + + TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED.key() + + " is true."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) public static final ConfigOption TABLE_OPTIMIZER_REUSE_SINK_ENABLED = key("table.optimizer.reuse-sink-enabled") diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java index b0b4050ccb203..afa47950bcad7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java @@ -33,14 +33,21 @@ /** Find reusable sources. */ public class ReusableScanVisitor extends RelVisitor { + private final boolean escapeFilter; + private final Map> digestToReusableScans = new HashMap<>(); + // Todo don't love this, want to see if I can do this so getDigest gets this at constant level + public ReusableScanVisitor(boolean escapeFilter) { + this.escapeFilter = escapeFilter; + } + @Override public void visit(RelNode node, int ordinal, RelNode parent) { if (node instanceof CommonPhysicalTableSourceScan) { CommonPhysicalTableSourceScan scan = (CommonPhysicalTableSourceScan) node; - String digest = getDigest(scan, true); + String digest = getDigest(scan, true, escapeFilter); digestToReusableScans.computeIfAbsent(digest, k -> new ArrayList<>()).add(scan); // If the scan has input such as dpp dynamic scan node, so also need to consider the // input diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 8da14bb06f25f..037e9c9b5c64c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -21,11 +21,15 @@ import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata; import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.connectors.DynamicSourceUtils; +import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; +import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; @@ -38,7 +42,11 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgramBuilder; +import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import java.util.ArrayList; import java.util.Arrays; @@ -56,6 +64,7 @@ import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.concatProjectedFields; import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.createCalcForScan; import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.enforceMetadataKeyOrder; +import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.getAbilitySpec; import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.getAdjustedWatermarkSpec; import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.indexOf; import static org.apache.flink.table.planner.plan.reuse.ScanReuserUtils.metadataKeys; @@ -100,6 +109,13 @@ * : +- TableSourceScan(table=[[MyTable, project=[a, b, c]]], fields=[a, b, c]) * } * + *

When {@code filterReuseEnabled} is set, scans with different pushed-down filters can also be + * merged. The per-scan filters are OR'd into a combined predicate. Before committing to the merge, + * the OR'd predicate is tested against a copy of the source via {@link + * SupportsFilterPushDown#applyFilters}. If the connector accepts it, the merged source pushes the + * OR'd filter and each consumer's Calc applies its original filter. If the connector rejects it, + * the group is skipped and scans remain separate — no full-table read occurs. + * *

This class do not reuse all sources, sources with same digest will be reused by {@link * SubplanReuser}. * @@ -128,14 +144,27 @@ public class ScanReuser { private final FlinkContext flinkContext; private final FlinkTypeFactory flinkTypeFactory; + private final boolean filterReuseEnabled; public ScanReuser(FlinkContext flinkContext, FlinkTypeFactory flinkTypeFactory) { + this(flinkContext, flinkTypeFactory, false); + } + + public ScanReuser( + FlinkContext flinkContext, + FlinkTypeFactory flinkTypeFactory, + boolean filterReuseEnabled) { this.flinkContext = flinkContext; this.flinkTypeFactory = flinkTypeFactory; + this.filterReuseEnabled = filterReuseEnabled; } public List reuseDuplicatedScan(List relNodes) { - ReusableScanVisitor visitor = new ReusableScanVisitor(); + // When filterReuseEnabled, escape filters in digest so scans with different + // filters are grouped together. Per-group, we check SupportsFilteredSourceReuse + // to decide whether to OR filters. If the source doesn't support it, we skip + // groups where filters differ (same-filter groups proceed normally for projection merge). + ReusableScanVisitor visitor = new ReusableScanVisitor(filterReuseEnabled); relNodes.forEach(visitor::go); for (List reusableNodes : @@ -149,6 +178,13 @@ public List reuseDuplicatedScan(List relNodes) { continue; } + // Determine if we should attempt to merge different filters (OR them). + // Requires: config enabled + source supports filter pushdown. + DynamicTableSource tableSource = + reusableNodes.get(0).tableSourceTable().tableSource(); + boolean mergeFilters = + filterReuseEnabled && tableSource instanceof SupportsFilterPushDown; + CommonPhysicalTableSourceScan pickScan = pickScanWithWatermark(reusableNodes); TableSourceTable pickTable = pickScan.tableSourceTable(); RexBuilder rexBuilder = pickScan.getCluster().getRexBuilder(); @@ -165,12 +201,18 @@ public List reuseDuplicatedScan(List relNodes) { allMetaKeySet.addAll(metadataKeys(source)); } + // 1.1 When merging filters, add filter-referenced columns to projection so Calc can filter. + if (mergeFilters) { + collectFilterReferencedColumns(reusableNodes, allProjectFieldSet); + } + int[][] allProjectFields = allProjectFieldSet.toArray(new int[0][]); List allMetaKeys = enforceMetadataKeyOrder(allMetaKeySet, pickTable.tableSource()); // 2. Create new source. - List specs = abilitySpecsWithoutEscaped(pickTable); + List specs = + abilitySpecsWithoutEscaped(pickTable, mergeFilters); // 2.1 Create produced type. // The source produced type is the input type into the runtime. The format looks as: @@ -182,7 +224,7 @@ public List reuseDuplicatedScan(List relNodes) { pickTable.contextResolvedTable().getResolvedSchema(), pickTable.tableSource()); - // 2.2 Apply projections + // 2.2 Apply projections and metadata List newSpecs = new ArrayList<>(); RowType newSourceType = applyPhysicalAndMetadataPushDown( @@ -206,7 +248,13 @@ public List reuseDuplicatedScan(List relNodes) { newSourceType = watermarkSpec.get().getProducedType().get(); } - // 2.4 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. + // 2.4 OR all per-scan filters into combined predicate for the source. + if (mergeFilters) { + buildOrFilterSpec(reusableNodes, newSourceType, rexBuilder) + .ifPresent(specs::add); + } + + // 2.5 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. DynamicTableSourceSpec tableSourceSpec = new DynamicTableSourceSpec(pickTable.contextResolvedTable(), specs); ScanTableSource newTableSource = @@ -221,15 +269,24 @@ public List reuseDuplicatedScan(List relNodes) { RelNode newScan = pickScan.copy(newSourceTable); - // 3. Create projects. + // 3. Create per-consumer Calc nodes. for (CommonPhysicalTableSourceScan scan : reusableNodes) { TableSourceTable source = scan.tableSourceTable(); int[][] projectedFields = projectedFields(source); List metaKeys = metadataKeys(source); + // check to see if we have any merged filters we need to create Calc nodes for + FilterPushDownSpec filterSpec = + getAbilitySpec(source.abilitySpecs(), FilterPushDownSpec.class); + boolean hasFilter = + mergeFilters + && filterSpec != null + && !filterSpec.getPredicates().isEmpty(); + // Don't need add calc if (Arrays.deepEquals(projectedFields, allProjectFields) - && metaKeys.equals(allMetaKeys)) { + && metaKeys.equals(allMetaKeys) + && !hasFilter) { // full project may be pushed into source, update to the new source replaceMap.put(scan, newScan); continue; @@ -247,6 +304,14 @@ public List reuseDuplicatedScan(List relNodes) { builder.addProject(index, newScan.getRowType().getFieldNames().get(index)); } + // Add original filter as Calc condition with remapped indices + if (hasFilter) { + RexShuttle remap = createFieldNameRemap( + physicalFieldNames(source), newScan.getRowType().getFieldNames()); + RexNode condition = andPredicates(filterSpec.getPredicates(), remap, rexBuilder); + builder.addCondition(condition); + } + replaceMap.put(scan, createCalcForScan(newScan, builder.getProgram())); } } @@ -257,6 +322,119 @@ public List reuseDuplicatedScan(List relNodes) { .collect(Collectors.toList()); } + /** Add all columns referenced by the filters to the projection set. */ + private static void collectFilterReferencedColumns( + List scans, + TreeSet allProjectFieldSet) { + for (CommonPhysicalTableSourceScan scan : scans) { + FilterPushDownSpec fs = + getAbilitySpec(scan.tableSourceTable().abilitySpecs(), FilterPushDownSpec.class); + if (fs == null) continue; + for (RexNode pred : fs.getPredicates()) { + for (RexInputRef ref : FlinkRexUtil.findAllInputRefs(pred)) { + allProjectFieldSet.add(new int[] {ref.getIndex()}); + } + } + } + } + + /** + * OR all table scan filter into a combined FilterPushDownSpec for the unified source. Before + * returning, verifies the connector accepts the OR'd predicate by calling applyFilters on a + * throwaway copy. Returns empty if any scan has no filter or the connector rejects the OR. + * TODO, still on the fence if this should return empty or throw exception indicating source + * incompatible with this source reuse + */ + // todo remove extra comments once finalized approach + private Optional buildOrFilterSpec( + List scans, + RowType newSourceType, + RexBuilder rexBuilder) { + List perScanFilters = new ArrayList<>(); + // Get on Node per table source scan + for (CommonPhysicalTableSourceScan scan : scans) { + FilterPushDownSpec fs = + getAbilitySpec(scan.tableSourceTable().abilitySpecs(), FilterPushDownSpec.class); + if (fs == null || fs.getPredicates().isEmpty()) { + // OR with empty filter -> no filter + return Optional.empty(); + } + RexShuttle remap = createFieldNameRemap( + physicalFieldNames(scan.tableSourceTable()), newSourceType.getFieldNames()); + perScanFilters.add(andPredicates(fs.getPredicates(), remap, rexBuilder)); + } + // todo figure out when this could happen + if (perScanFilters.isEmpty()) { + return Optional.empty(); + } + + // more efficient way to write this + RexNode combined = perScanFilters.get(0); + for (int i = 1; i < perScanFilters.size(); i++) { + combined = rexBuilder.makeCall(SqlStdOperatorTable.OR, combined, perScanFilters.get(i)); + } + + if (!connectorAcceptsFilter(scans.get(0).tableSourceTable().tableSource(), combined, newSourceType)) { + // todo decide if exception thrown here instead of no filter being passed + // could be dangerous passing no filter + return Optional.empty(); + } + + return Optional.of(new FilterPushDownSpec(List.of(combined), false)); + } + + /** + * Test whether the connector accepts a filter predicate by calling applyFilters on a + * throwaway copy. No calls to actual data-source is used. + */ + private boolean connectorAcceptsFilter( + DynamicTableSource source, RexNode filter, RowType sourceType) { + DynamicTableSource copy = source.copy(); + SourceAbilityContext context = + new SourceAbilityContext(flinkContext, flinkTypeFactory, sourceType); + // todo triple check the apply here is proper way to do this check is valid for ORs + SupportsFilterPushDown.Result result = + FilterPushDownSpec.apply(List.of(filter), copy, context); + return !result.getAcceptedFilters().isEmpty(); + } + + /** Create a RexShuttle that remaps RexInputRef indices by matching field names. */ + private static RexShuttle createFieldNameRemap( + List oldNames, List newNames) { + return new RexShuttle() { + @Override + public RexNode visitInputRef(RexInputRef ref) { + String fieldName = oldNames.get(ref.getIndex()); + int newIndex = newNames.indexOf(fieldName); + if (newIndex < 0) { + throw new org.apache.flink.table.api.TableException( + String.format( + "Field '%s' not found in target type %s during filter remap.", + fieldName, newNames)); + } + return new RexInputRef(newIndex, ref.getType()); + } + }; + } + + /** Get physical column field names from a TableSourceTable. */ + // needed because want to remap indicies from filter predicates to our unified scan + private static List physicalFieldNames(TableSourceTable source) { + return ((RowType) source.contextResolvedTable().getResolvedSchema() + .toPhysicalRowDataType().getLogicalType()).getFieldNames(); + } + + /** AND a list of predicates together, applying a remap shuttle to each. */ + // used for or'ing all our statements together + private static RexNode andPredicates( + List predicates, RexShuttle remap, RexBuilder rexBuilder) { + RexNode result = predicates.get(0).accept(remap); + for (int i = 1; i < predicates.size(); i++) { + result = rexBuilder.makeCall(SqlStdOperatorTable.AND, result, predicates.get(i).accept(remap)); + } + return result; + } + /** * Generate sourceAbilitySpecs and newProducedType by projected physical fields and metadata * keys. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java index 38209afff8f3e..173470d4b742b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java @@ -91,10 +91,15 @@ public static int indexOf(int[][] projectedFields, int[] fieldIndices) { * ProjectPushDownSpec}. These abilities don't need before do scan reuse. */ public static List abilitySpecsWithoutEscaped(TableSourceTable table) { + return abilitySpecsWithoutEscaped(table, false); + } + + public static List abilitySpecsWithoutEscaped( + TableSourceTable table, boolean escapeFilter) { List ret = new ArrayList<>(); SourceAbilitySpec[] specs = table.abilitySpecs(); for (SourceAbilitySpec spec : specs) { - if (!isEscapeDigest(spec)) { + if (!isEscapeDigest(spec, escapeFilter)) { ret.add(spec); } } @@ -117,21 +122,23 @@ private static boolean isIgnoreDigest(SourceAbilitySpec spec) { return false; } - public static boolean isEscapeDigest(SourceAbilitySpec spec) { + public static boolean isEscapeDigest(SourceAbilitySpec spec, boolean reuseSourceFilter) { // WatermarkPushDownSpec is based on index, which is unstable by projection push down. // We can ignore Watermark, because sources will produce same watermark. return spec instanceof ProjectPushDownSpec || spec instanceof ReadingMetadataSpec - || spec instanceof WatermarkPushDownSpec; + || spec instanceof WatermarkPushDownSpec + || (spec instanceof FilterPushDownSpec && reuseSourceFilter); } - private static List extraDigestsWithoutEscapedAndIgnored(TableSourceTable table) { + private static List extraDigestsWithoutEscapedAndIgnored( + TableSourceTable table, boolean escapeFilter) { List ret = new ArrayList<>(); List digests = table.getSpecDigests(); SourceAbilitySpec[] specs = table.abilitySpecs(); for (int i = 0; i < specs.length; i++) { SourceAbilitySpec spec = specs[i]; - if (!isEscapeDigest(spec) && !isIgnoreDigest(spec)) { + if (!isEscapeDigest(spec, escapeFilter) && !isIgnoreDigest(spec)) { ret.add(digests.get(i)); } } @@ -343,6 +350,11 @@ public static boolean reusableWithoutAdjust(List reusableNode * @return the digest that ignore certain {@link SourceAbilitySpec}. */ public static String getDigest(CommonPhysicalTableSourceScan scan, boolean withoutEscape) { + return getDigest(scan, withoutEscape, false); + } + + public static String getDigest( + CommonPhysicalTableSourceScan scan, boolean withoutEscape, boolean escapeFilter) { TableSourceTable table = scan.tableSourceTable(); List digest = new ArrayList<>(); digest.addAll(table.getNames()); @@ -358,7 +370,7 @@ public static String getDigest(CommonPhysicalTableSourceScan scan, boolean witho } if (withoutEscape) { - digest.addAll(extraDigestsWithoutEscapedAndIgnored(table)); + digest.addAll(extraDigestsWithoutEscapedAndIgnored(table, escapeFilter)); } else { digest.addAll(extraDigestsWithoutIgnored(table)); } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala index db04a9c5a3303..e80a838d0213f 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/reuse/SubplanReuser.scala @@ -66,9 +66,13 @@ object SubplanReuser { val tableSinkReuseEnabled = tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED) + val filterReuseEnabled = + tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER) + var newRels = rels if (tableSourceReuseEnabled) { - newRels = new ScanReuser(flinkContext, flinkTypeFactory).reuseDuplicatedScan(rels) + newRels = new ScanReuser(flinkContext, flinkTypeFactory, filterReuseEnabled) + .reuseDuplicatedScan(rels) } if (tableSinkReuseEnabled) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java index 9a4a777f448de..9307aafc4f569 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java @@ -191,6 +191,15 @@ void testProjectWithMetaAndCompute() { util.verifyExecPlan(sqlQuery); } + @TestTemplate + void testUnionWithDifferentProjections() { + // Leg 1 needs project=[a, b], leg 2 needs project=[a, b, c] due to filter on c. + // ScanReuser should unify to a single source with project=[a, b, c]. + String sqlQuery = + "SELECT a, b FROM MyTable UNION ALL SELECT a, b FROM MyTable WHERE c = 'test'"; + util.verifyExecPlan(sqlQuery); + } + @TestTemplate void testProjectWithHints() { String sqlQuery = From 2d4e31366d7641219d9a264d4e82e4f2beaf43d1 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Tue, 21 Apr 2026 12:34:38 -0400 Subject: [PATCH 2/6] Updated unit tests + Style apply --- .../table/planner/plan/reuse/ScanReuser.java | 70 ++- .../planner/plan/optimize/ScanReuseTest.java | 238 ++++++++ .../planner/plan/optimize/ScanReuseTest.xml | 512 ++++++++++++++++++ 3 files changed, 790 insertions(+), 30 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 037e9c9b5c64c..345be0e5fb24f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -27,17 +27,17 @@ import org.apache.flink.table.planner.calcite.FlinkContext; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.connectors.DynamicSourceUtils; -import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; import org.apache.flink.table.planner.plan.abilities.source.FilterPushDownSpec; -import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.ProjectPushDownSpec; import org.apache.flink.table.planner.plan.abilities.source.ReadingMetadataSpec; +import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext; import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec; import org.apache.flink.table.planner.plan.abilities.source.WatermarkPushDownSpec; import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec; import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalTableSourceScan; import org.apache.flink.table.planner.plan.rules.logical.PushProjectIntoTableSourceScanRule; import org.apache.flink.table.planner.plan.schema.TableSourceTable; +import org.apache.flink.table.planner.plan.utils.FlinkRexUtil; import org.apache.flink.table.types.logical.RowType; import org.apache.calcite.rel.RelNode; @@ -180,8 +180,7 @@ public List reuseDuplicatedScan(List relNodes) { // Determine if we should attempt to merge different filters (OR them). // Requires: config enabled + source supports filter pushdown. - DynamicTableSource tableSource = - reusableNodes.get(0).tableSourceTable().tableSource(); + DynamicTableSource tableSource = reusableNodes.get(0).tableSourceTable().tableSource(); boolean mergeFilters = filterReuseEnabled && tableSource instanceof SupportsFilterPushDown; @@ -201,7 +200,8 @@ public List reuseDuplicatedScan(List relNodes) { allMetaKeySet.addAll(metadataKeys(source)); } - // 1.1 When merging filters, add filter-referenced columns to projection so Calc can filter. + // 1.1 When merging filters, add filter-referenced columns to projection so Calc can + // filter. if (mergeFilters) { collectFilterReferencedColumns(reusableNodes, allProjectFieldSet); } @@ -211,8 +211,7 @@ public List reuseDuplicatedScan(List relNodes) { enforceMetadataKeyOrder(allMetaKeySet, pickTable.tableSource()); // 2. Create new source. - List specs = - abilitySpecsWithoutEscaped(pickTable, mergeFilters); + List specs = abilitySpecsWithoutEscaped(pickTable, mergeFilters); // 2.1 Create produced type. // The source produced type is the input type into the runtime. The format looks as: @@ -250,8 +249,7 @@ public List reuseDuplicatedScan(List relNodes) { // 2.4 OR all per-scan filters into combined predicate for the source. if (mergeFilters) { - buildOrFilterSpec(reusableNodes, newSourceType, rexBuilder) - .ifPresent(specs::add); + buildOrFilterSpec(reusableNodes, newSourceType, rexBuilder).ifPresent(specs::add); } // 2.5 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. @@ -279,9 +277,7 @@ public List reuseDuplicatedScan(List relNodes) { FilterPushDownSpec filterSpec = getAbilitySpec(source.abilitySpecs(), FilterPushDownSpec.class); boolean hasFilter = - mergeFilters - && filterSpec != null - && !filterSpec.getPredicates().isEmpty(); + mergeFilters && filterSpec != null && !filterSpec.getPredicates().isEmpty(); // Don't need add calc if (Arrays.deepEquals(projectedFields, allProjectFields) @@ -306,9 +302,12 @@ public List reuseDuplicatedScan(List relNodes) { // Add original filter as Calc condition with remapped indices if (hasFilter) { - RexShuttle remap = createFieldNameRemap( - physicalFieldNames(source), newScan.getRowType().getFieldNames()); - RexNode condition = andPredicates(filterSpec.getPredicates(), remap, rexBuilder); + RexShuttle remap = + createFieldNameRemap( + physicalFieldNames(source), + newScan.getRowType().getFieldNames()); + RexNode condition = + andPredicates(filterSpec.getPredicates(), remap, rexBuilder); builder.addCondition(condition); } @@ -324,12 +323,14 @@ public List reuseDuplicatedScan(List relNodes) { /** Add all columns referenced by the filters to the projection set. */ private static void collectFilterReferencedColumns( - List scans, - TreeSet allProjectFieldSet) { + List scans, TreeSet allProjectFieldSet) { for (CommonPhysicalTableSourceScan scan : scans) { FilterPushDownSpec fs = - getAbilitySpec(scan.tableSourceTable().abilitySpecs(), FilterPushDownSpec.class); - if (fs == null) continue; + getAbilitySpec( + scan.tableSourceTable().abilitySpecs(), FilterPushDownSpec.class); + if (fs == null) { + continue; + } for (RexNode pred : fs.getPredicates()) { for (RexInputRef ref : FlinkRexUtil.findAllInputRefs(pred)) { allProjectFieldSet.add(new int[] {ref.getIndex()}); @@ -354,13 +355,16 @@ private Optional buildOrFilterSpec( // Get on Node per table source scan for (CommonPhysicalTableSourceScan scan : scans) { FilterPushDownSpec fs = - getAbilitySpec(scan.tableSourceTable().abilitySpecs(), FilterPushDownSpec.class); + getAbilitySpec( + scan.tableSourceTable().abilitySpecs(), FilterPushDownSpec.class); if (fs == null || fs.getPredicates().isEmpty()) { // OR with empty filter -> no filter return Optional.empty(); } - RexShuttle remap = createFieldNameRemap( - physicalFieldNames(scan.tableSourceTable()), newSourceType.getFieldNames()); + RexShuttle remap = + createFieldNameRemap( + physicalFieldNames(scan.tableSourceTable()), + newSourceType.getFieldNames()); perScanFilters.add(andPredicates(fs.getPredicates(), remap, rexBuilder)); } // todo figure out when this could happen @@ -374,7 +378,8 @@ private Optional buildOrFilterSpec( combined = rexBuilder.makeCall(SqlStdOperatorTable.OR, combined, perScanFilters.get(i)); } - if (!connectorAcceptsFilter(scans.get(0).tableSourceTable().tableSource(), combined, newSourceType)) { + if (!connectorAcceptsFilter( + scans.get(0).tableSourceTable().tableSource(), combined, newSourceType)) { // todo decide if exception thrown here instead of no filter being passed // could be dangerous passing no filter return Optional.empty(); @@ -384,8 +389,8 @@ private Optional buildOrFilterSpec( } /** - * Test whether the connector accepts a filter predicate by calling applyFilters on a - * throwaway copy. No calls to actual data-source is used. + * Test whether the connector accepts a filter predicate by calling applyFilters on a throwaway + * copy. No calls to actual data-source is used. */ private boolean connectorAcceptsFilter( DynamicTableSource source, RexNode filter, RowType sourceType) { @@ -399,8 +404,7 @@ private boolean connectorAcceptsFilter( } /** Create a RexShuttle that remaps RexInputRef indices by matching field names. */ - private static RexShuttle createFieldNameRemap( - List oldNames, List newNames) { + private static RexShuttle createFieldNameRemap(List oldNames, List newNames) { return new RexShuttle() { @Override public RexNode visitInputRef(RexInputRef ref) { @@ -420,8 +424,12 @@ public RexNode visitInputRef(RexInputRef ref) { /** Get physical column field names from a TableSourceTable. */ // needed because want to remap indicies from filter predicates to our unified scan private static List physicalFieldNames(TableSourceTable source) { - return ((RowType) source.contextResolvedTable().getResolvedSchema() - .toPhysicalRowDataType().getLogicalType()).getFieldNames(); + return ((RowType) + source.contextResolvedTable() + .getResolvedSchema() + .toPhysicalRowDataType() + .getLogicalType()) + .getFieldNames(); } /** AND a list of predicates together, applying a remap shuttle to each. */ @@ -430,7 +438,9 @@ private static RexNode andPredicates( List predicates, RexShuttle remap, RexBuilder rexBuilder) { RexNode result = predicates.get(0).accept(remap); for (int i = 1; i < predicates.size(); i++) { - result = rexBuilder.makeCall(SqlStdOperatorTable.AND, result, predicates.get(i).accept(remap)); + result = + rexBuilder.makeCall( + SqlStdOperatorTable.AND, result, predicates.get(i).accept(remap)); } return result; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java index 9307aafc4f569..44295a6889958 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.java @@ -20,6 +20,7 @@ import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.table.planner.utils.TableTestBase; import org.apache.flink.table.planner.utils.TableTestUtil; import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; @@ -485,4 +486,241 @@ void testWatermarkPushDownWithTimestampChanged() { stmt.addInsertSql("INSERT INTO MySinkTs SELECT b, ts_ltz FROM MyTableWatermark"); util.verifyExecPlan(stmt); } + + // ========================================================================= + // Filter reuse tests + // + // These tests verify that non-filter abilities are NOT incorrectly merged + // when filter reuse is enabled. Each test enables the config and verifies + // the plan stays correct — no unintended merging of limits, partitions, etc. + // If a future change escapes any of these specs from the digest, these + // tests will catch the regression. + // ========================================================================= + + @TestTemplate + void testFilterReuseDifferentFiltersNotMergedWithoutConfig() { + // Config OFF: two scans with different pushed filters should NOT be merged. + // Verifies baseline behavior is preserved. + String sqlQuery = + "SELECT T1.a, T1.c, T2.c FROM" + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 2) T1," + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + + @TestTemplate + void testFilterReuseSameFilterDifferentProjections() { + // Config ON: same filter, different projections. Should merge via existing + // projection reuse — filter stays pushed as-is on the unified source. + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String sqlQuery = + "SELECT T1.a, T1.b, T2.c FROM" + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T1," + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + + @TestTemplate + void testFilterReuseDifferentLimitsNotMerged() { + // Config ON: different limits on same table. Limits are NOT escaped from + // digest, so scans with different limits must remain separate. + // Guards against LimitPushDownSpec being accidentally escaped. + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String sqlQuery = + "SELECT T1.a, T1.c, T2.c FROM" + + " (SELECT * FROM MyTable LIMIT 10) T1," + + " (SELECT * FROM MyTable LIMIT 20) T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + + @TestTemplate + void testFilterReuseDifferentPartitionsNotMerged() { + // Config ON: different partitions on same table. Partitions are NOT escaped + // from digest, so scans with different partitions must remain separate. + // Guards against PartitionPushDownSpec being accidentally escaped. + if (!isStreaming) { + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String sqlQuery = + "SELECT T1.a, T1.c, T2.c FROM" + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('partition-list'='c:1;c:2') */" + + " WHERE c = '1') T1," + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('partition-list'='c:1;c:2') */" + + " WHERE c = '2') T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + } + + @TestTemplate + void testFilterReuseDifferentHintsNotMerged() { + // Config ON: different OPTIONS hints on same table. Hints are included in + // digest, so scans with different hints must remain separate. + // Guards against hints being stripped when filter reuse is enabled. + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String sqlQuery = + "SELECT T1.a, T1.c, T2.c FROM" + + " MyTable /*+ OPTIONS('source.num-element-to-skip'='1') */ T1," + + " MyTable /*+ OPTIONS('source.num-element-to-skip'='10') */ T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + + @TestTemplate + void testFilterReuseWithWatermarkPushDown() { + // Config ON: same table with watermark pushdown and different filters. + // Verifies watermark spec is correctly adjusted when filter reuse merges + // sources that have watermark pushdown enabled. + if (isStreaming) { + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String ddl = + "CREATE TABLE FilterWatermarkTable (\n" + + " a int,\n" + + " b bigint,\n" + + " c string,\n" + + " rtime timestamp(3),\n" + + " WATERMARK FOR rtime AS rtime - INTERVAL '5' SECOND\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'false',\n" + + " 'enable-watermark-push-down' = 'true',\n" + + " 'filterable-fields' = 'b',\n" + + " 'disable-lookup' = 'true'" + + ")"; + util.tableEnv().executeSql(ddl); + String sqlQuery = + "SELECT T1.a, T1.c, T2.c FROM" + + " (SELECT MIN(a) as a, MIN(c) as c FROM FilterWatermarkTable" + + " WHERE b = 1 GROUP BY" + + " TUMBLE(rtime, INTERVAL '10' SECOND)) T1," + + " (SELECT MIN(a) as a, MIN(c) as c FROM FilterWatermarkTable" + + " WHERE b = 2 GROUP BY" + + " TUMBLE(rtime, INTERVAL '10' SECOND)) T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + } + + @TestTemplate + void testFilterReuseWithMetadata() { + // Config ON: same table with metadata reading and different filters. + // Verifies metadata columns are correctly unified when filter reuse + // merges sources with different metadata projections. + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String sqlQuery = + "SELECT T1.a, T1.metadata_1, T2.c, T2.metadata_2 FROM" + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T1," + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 2) T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + + @TestTemplate + void testFilterReuseConfigOffPreservesExistingBehavior() { + // Config OFF (default): verifies that the existing filter pushdown reuse + // behavior (same filter = reuse) is not affected by the presence of the + // config option. Same-filter scans should still reuse via SubplanReuser. + String sqlQuery = + "SELECT T1.a, T1.b, T2.c FROM" + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T1," + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } + + @TestTemplate + void testFilterReuseThreeWayUnion() { + // Config ON: three-way UNION ALL with different filters. The values connector + // rejects OR, so scans should remain separate. Tests nested OR handling and + // the connector rejection fallback with 3+ legs. + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String sqlQuery = + "SELECT a, b FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1" + + " UNION ALL" + + " SELECT a, b FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 2" + + " UNION ALL" + + " SELECT a, b FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 3"; + util.verifyExecPlan(sqlQuery); + } + + @TestTemplate + void testFilterReuseMultiSinkStatementSet() { + // Config ON: StatementSet with two INSERT statements reading from the same + // table with different filters. Tests cross-statement source reuse. + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + util.tableEnv() + .executeSql( + "CREATE TABLE Sink1 (\n" + + " a int,\n" + + " b bigint\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'table-sink-class' = 'DEFAULT'" + + ")"); + util.tableEnv() + .executeSql( + "CREATE TABLE Sink2 (\n" + + " a int,\n" + + " c string\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'table-sink-class' = 'DEFAULT'" + + ")"); + StatementSet stmt = util.tableEnv().createStatementSet(); + stmt.addInsertSql( + "INSERT INTO Sink1 SELECT a, b FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1"); + stmt.addInsertSql( + "INSERT INTO Sink2 SELECT a, c FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 2"); + util.verifyExecPlan(stmt); + } + + @TestTemplate + void testFilterReuseMixedFilterAndNoFilter() { + // Config ON: one leg has a pushed filter, the other has no filter. + // OR with "everything" = "everything", so buildOrFilterSpec should return empty. + // Scans should remain separate (or merge without filter pushdown). + util.tableEnv() + .getConfig() + .set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_FILTER, true); + String sqlQuery = + "SELECT T1.a, T1.b, T2.c FROM" + + " (SELECT * FROM" + + " MyTable /*+ OPTIONS('filterable-fields'='b') */ WHERE b = 1) T1," + + " MyTable T2" + + " WHERE T1.a = T2.a"; + util.verifyExecPlan(sqlQuery); + } } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml index 0e508ef7691de..9bd0b2f4a2ab2 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml @@ -16,6 +16,468 @@ See the License for the specific language governing permissions and limitations under the License. --> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -1376,6 +1838,56 @@ Sink(table=[default_catalog.default_database.snk1], fields=[origin_ts, partition Sink(table=[default_catalog.default_database.snk2], fields=[id]) +- Calc(select=[id]) +- Reused(reference_id=[1]) +]]> + + + + + + + + + + + + + + + + + + + + + + From caa9f195a7ce67b8e8136d510678a61c137a50e2 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Tue, 21 Apr 2026 16:05:14 -0400 Subject: [PATCH 3/6] Removed TODOs after validation --- .../plan/reuse/ReusableScanVisitor.java | 1 - .../table/planner/plan/reuse/ScanReuser.java | 22 +++++++++---------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java index afa47950bcad7..f35b8c0ebb19b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java @@ -38,7 +38,6 @@ public class ReusableScanVisitor extends RelVisitor { private final Map> digestToReusableScans = new HashMap<>(); - // Todo don't love this, want to see if I can do this so getDigest gets this at constant level public ReusableScanVisitor(boolean escapeFilter) { this.escapeFilter = escapeFilter; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 345be0e5fb24f..865b9598d4263 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -248,8 +248,15 @@ public List reuseDuplicatedScan(List relNodes) { } // 2.4 OR all per-scan filters into combined predicate for the source. + // If the connector rejects the combined filter, skip this group entirely and revert + // source back to non-table reuse behaviour if (mergeFilters) { - buildOrFilterSpec(reusableNodes, newSourceType, rexBuilder).ifPresent(specs::add); + Optional combinedFilter = + buildOrFilterSpec(reusableNodes, newSourceType, rexBuilder); + if (combinedFilter.isEmpty()) { + continue; + } + specs.add(combinedFilter.get()); } // 2.5 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. @@ -340,13 +347,10 @@ private static void collectFilterReferencedColumns( } /** - * OR all table scan filter into a combined FilterPushDownSpec for the unified source. Before - * returning, verifies the connector accepts the OR'd predicate by calling applyFilters on a - * throwaway copy. Returns empty if any scan has no filter or the connector rejects the OR. - * TODO, still on the fence if this should return empty or throw exception indicating source - * incompatible with this source reuse + * OR all table scan filter into a combined FilterPushDownSpec for the unified source. Calls + * applyFilters on a copy of source to verify source connector accepts filter. Returns empty if + * any scan has no filter or the connector rejects the OR. */ - // todo remove extra comments once finalized approach private Optional buildOrFilterSpec( List scans, RowType newSourceType, @@ -367,7 +371,6 @@ private Optional buildOrFilterSpec( newSourceType.getFieldNames()); perScanFilters.add(andPredicates(fs.getPredicates(), remap, rexBuilder)); } - // todo figure out when this could happen if (perScanFilters.isEmpty()) { return Optional.empty(); } @@ -380,8 +383,6 @@ private Optional buildOrFilterSpec( if (!connectorAcceptsFilter( scans.get(0).tableSourceTable().tableSource(), combined, newSourceType)) { - // todo decide if exception thrown here instead of no filter being passed - // could be dangerous passing no filter return Optional.empty(); } @@ -397,7 +398,6 @@ private boolean connectorAcceptsFilter( DynamicTableSource copy = source.copy(); SourceAbilityContext context = new SourceAbilityContext(flinkContext, flinkTypeFactory, sourceType); - // todo triple check the apply here is proper way to do this check is valid for ORs SupportsFilterPushDown.Result result = FilterPushDownSpec.apply(List.of(filter), copy, context); return !result.getAcceptedFilters().isEmpty(); From 2aaf4acdbe65ee271b1a27e723dea35170e1a32f Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Wed, 22 Apr 2026 10:05:02 -0400 Subject: [PATCH 4/6] Updated to remove duplicate filteres + will abort source reuse on empty + test source util support OR --- .../table/planner/plan/reuse/ScanReuser.java | 18 +- .../table/planner/utils/FilterUtils.java | 18 +- .../planner/plan/optimize/ScanReuseTest.xml | 199 +++++++++++++++++- 3 files changed, 223 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 865b9598d4263..54e1980d0c206 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -375,10 +375,20 @@ private Optional buildOrFilterSpec( return Optional.empty(); } - // more efficient way to write this - RexNode combined = perScanFilters.get(0); - for (int i = 1; i < perScanFilters.size(); i++) { - combined = rexBuilder.makeCall(SqlStdOperatorTable.OR, combined, perScanFilters.get(i)); + // Deduplicate filters — e.g. 3 scans where 2 share the same filter + Set seen = new HashSet<>(); + List distinctFilters = new ArrayList<>(); + for (RexNode f : perScanFilters) { + if (seen.add(f.toString())) { + distinctFilters.add(f); + } + } + + RexNode combined = distinctFilters.get(0); + // OR distinct filters (if > 1) together + for (int i = 1; i < distinctFilters.size(); i++) { + combined = + rexBuilder.makeCall(SqlStdOperatorTable.OR, combined, distinctFilters.get(i)); } if (!connectorAcceptsFilter( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java index 32d3afa233a9d..161c52481200c 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/FilterUtils.java @@ -43,11 +43,19 @@ public class FilterUtils { public static boolean shouldPushDown(ResolvedExpression expr, Set filterableFields) { - if (expr instanceof CallExpression && expr.getChildren().size() == 2) { - return shouldPushDownUnaryExpression( - expr.getResolvedChildren().get(0), filterableFields) - && shouldPushDownUnaryExpression( - expr.getResolvedChildren().get(1), filterableFields); + if (expr instanceof CallExpression) { + FunctionDefinition def = ((CallExpression) expr).getFunctionDefinition(); + if (def.equals(BuiltInFunctionDefinitions.OR) + || def.equals(BuiltInFunctionDefinitions.AND)) { + return expr.getResolvedChildren().stream() + .allMatch(child -> shouldPushDown(child, filterableFields)); + } + if (expr.getChildren().size() == 2) { + return shouldPushDownUnaryExpression( + expr.getResolvedChildren().get(0), filterableFields) + && shouldPushDownUnaryExpression( + expr.getResolvedChildren().get(1), filterableFields); + } } return false; } diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml index 9bd0b2f4a2ab2..1fea8630769b5 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ScanReuseTest.xml @@ -303,6 +303,126 @@ Calc(select=[a, c, c0]) : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, partitions=[{c=1}], project=[a], metadata=[]]], fields=[a], hints=[[[OPTIONS options:{partition-list=c:1;c:2}]]]) +- Exchange(distribution=[hash[a]]) +- TableSourceScan(table=[[default_catalog, default_database, MyTable, partitions=[{c=2}], project=[a], metadata=[]]], fields=[a], hints=[[[OPTIONS options:{partition-list=c:1;c:2}]]]) +]]> + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -331,7 +451,7 @@ Calc(select=[a, b, c]) +- MultipleInput(readOrder=[1,0], members=[\nHashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, b, a0, c], build=[left])\n:- Calc(select=[a, CAST(1 AS BIGINT) AS b])\n: +- [#2] Exchange(distribution=[hash[a]])\n+- [#1] Exchange(distribution=[hash[a]])\n]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, c], where=[(b = 1)]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c], metadata=[]]], fields=[a, b, c], hints=[[[OPTIONS options:{filterable-fields=b}]]])(reuse_id=[1]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c], metadata=[], filter=[=(b, 1:BIGINT)]]], fields=[a, b, c], hints=[[[OPTIONS options:{filterable-fields=b}]]])(reuse_id=[1]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a], where=[(b = 1)]) +- Reused(reference_id=[1]) @@ -369,6 +489,79 @@ Calc(select=[a, b, c]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, c]) +- Reused(reference_id=[1]) +]]> + + + + + + + + + + + + + + + + + + + + + + @@ -397,7 +590,7 @@ Calc(select=[a, metadata_1, c, metadata_2]) +- HashJoin(joinType=[InnerJoin], where=[(a = a0)], select=[a, metadata_1, a0, c, metadata_2], build=[left]) :- Exchange(distribution=[hash[a]]) : +- Calc(select=[a, metadata_1], where=[(b = 1)]) - : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, metadata_1, metadata_2], metadata=[]]], fields=[a, b, c, metadata_1, metadata_2], hints=[[[OPTIONS options:{filterable-fields=b}]]])(reuse_id=[1]) + : +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b, c, metadata_1, metadata_2], metadata=[], filter=[OR(=(b, 1:BIGINT), =(b, 2:BIGINT))]]], fields=[a, b, c, metadata_1, metadata_2], hints=[[[OPTIONS options:{filterable-fields=b}]]])(reuse_id=[1]) +- Exchange(distribution=[hash[a]]) +- Calc(select=[a, c, metadata_2], where=[(b = 2)]) +- Reused(reference_id=[1]) @@ -469,7 +662,7 @@ Calc(select=[a, c, c0]) : +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rtime, 10000)], select=[MIN(a) AS a, MIN(c) AS c]) : +- Exchange(distribution=[single]) : +- Calc(select=[rtime, a, c], where=[(b = 1)]) - : +- TableSourceScan(table=[[default_catalog, default_database, FilterWatermarkTable, project=[a, b, c, rtime], metadata=[], watermark=[-(rtime, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[a, b, c, rtime])(reuse_id=[1]) + : +- TableSourceScan(table=[[default_catalog, default_database, FilterWatermarkTable, project=[a, b, c, rtime], metadata=[], watermark=[-(rtime, 5000:INTERVAL SECOND)], watermarkEmitStrategy=[on-periodic], filter=[OR(=(b, 1:BIGINT), =(b, 2:BIGINT))]]], fields=[a, b, c, rtime])(reuse_id=[1]) +- Exchange(distribution=[hash[a]]) +- GroupWindowAggregate(window=[TumblingGroupWindow('w$, rtime, 10000)], select=[MIN(a) AS a, MIN(c) AS c]) +- Exchange(distribution=[single]) From a0959127284657ea307c3181312657b515fbacb9 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Wed, 22 Apr 2026 13:21:40 -0400 Subject: [PATCH 5/6] Naming + comments update --- .../flink/table/planner/plan/reuse/ReusableScanVisitor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java index f35b8c0ebb19b..a45f239a65aeb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ReusableScanVisitor.java @@ -33,20 +33,20 @@ /** Find reusable sources. */ public class ReusableScanVisitor extends RelVisitor { - private final boolean escapeFilter; + private final boolean filterReuseEnabled; private final Map> digestToReusableScans = new HashMap<>(); public ReusableScanVisitor(boolean escapeFilter) { - this.escapeFilter = escapeFilter; + this.filterReuseEnabled = escapeFilter; } @Override public void visit(RelNode node, int ordinal, RelNode parent) { if (node instanceof CommonPhysicalTableSourceScan) { CommonPhysicalTableSourceScan scan = (CommonPhysicalTableSourceScan) node; - String digest = getDigest(scan, true, escapeFilter); + String digest = getDigest(scan, true, filterReuseEnabled); digestToReusableScans.computeIfAbsent(digest, k -> new ArrayList<>()).add(scan); // If the scan has input such as dpp dynamic scan node, so also need to consider the // input From e4e4ff2f06588f2a1058b0c3be825d794f710d96 Mon Sep 17 00:00:00 2001 From: Daniel Rossos Date: Wed, 22 Apr 2026 18:03:29 -0400 Subject: [PATCH 6/6] Updated logic for empty filter + warning about using config option --- .../api/config/OptimizerConfigOptions.java | 14 +++- .../table/planner/plan/reuse/ScanReuser.java | 73 ++++++++----------- 2 files changed, 43 insertions(+), 44 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java index 6d1617c6b51cb..29ec5091f2f08 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java @@ -112,10 +112,18 @@ public class OptimizerConfigOptions { .defaultValue(false) .withDescription( "When it is true, the optimizer will try to reuse table sources that differ " - + "only in their pushed-down filters by moving filters into Calc nodes " - + "and sharing a single source reader. This works only when " + + "only in their pushed-down filters by OR'ing the filters into a " + + "combined predicate and sharing a single source reader. This works " + + "only when " + TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED.key() - + " is true."); + + " is true and the connector supports filter pushdown. Note: it is " + + "not recommended to turn on unless you are aware of possible side " + + "effects. When the connector rejects the combined OR'd predicate, " + + "projection-reuse opportunities within the group may be lost compared " + + "to the default behaviour; no correctness issues arise (baseline reads " + + "continue unchanged), but the optimization's benefit is reduced. " + + "Recommended for connectors that natively accept OR predicates (e.g. " + + "BigQuery, JDBC)."); @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) public static final ConfigOption TABLE_OPTIMIZER_REUSE_SINK_ENABLED = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java index 54e1980d0c206..c8c7d9621a1a0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuser.java @@ -46,6 +46,7 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexProgramBuilder; import org.apache.calcite.rex.RexShuttle; +import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import java.util.ArrayList; @@ -251,12 +252,19 @@ public List reuseDuplicatedScan(List relNodes) { // If the connector rejects the combined filter, skip this group entirely and revert // source back to non-table reuse behaviour if (mergeFilters) { - Optional combinedFilter = - buildOrFilterSpec(reusableNodes, newSourceType, rexBuilder); - if (combinedFilter.isEmpty()) { - continue; + RexNode combinedFilter = + buildOrFilterPredicate(reusableNodes, newSourceType, rexBuilder); + if (!combinedFilter.isAlwaysTrue()) { // Require merged filter, not full table source + if (!connectorAcceptsFilter( + tableSource, combinedFilter, newSourceType)) { + // Connector rejected the predicate, abort merge and leave each source scan + // independent + continue; + } else { + // Add merged OR'd filter to spec + specs.add(new FilterPushDownSpec(List.of(combinedFilter), false)); + } } - specs.add(combinedFilter.get()); } // 2.5 Create a new ScanTableSource. ScanTableSource can not be pushed down twice. @@ -340,30 +348,31 @@ private static void collectFilterReferencedColumns( } for (RexNode pred : fs.getPredicates()) { for (RexInputRef ref : FlinkRexUtil.findAllInputRefs(pred)) { - allProjectFieldSet.add(new int[] {ref.getIndex()}); + allProjectFieldSet.add(new int[]{ref.getIndex()}); } } } } /** - * OR all table scan filter into a combined FilterPushDownSpec for the unified source. Calls - * applyFilters on a copy of source to verify source connector accepts filter. Returns empty if - * any scan has no filter or the connector rejects the OR. + * OR all per-scan filter predicates into a single combined predicate for the unified source. + * Any scan without a filter contributes a TRUE literal; after simplification the OR collapses + * to TRUE, meaning no effective filter. expandSearch rewrites Sarg back into standard OR/IN + * so connectors can parse. The caller is responsible for validating the predicate against the + * connector and deciding whether to push it. */ - private Optional buildOrFilterSpec( + private RexNode buildOrFilterPredicate( List scans, RowType newSourceType, RexBuilder rexBuilder) { List perScanFilters = new ArrayList<>(); - // Get on Node per table source scan for (CommonPhysicalTableSourceScan scan : scans) { FilterPushDownSpec fs = getAbilitySpec( scan.tableSourceTable().abilitySpecs(), FilterPushDownSpec.class); if (fs == null || fs.getPredicates().isEmpty()) { - // OR with empty filter -> no filter - return Optional.empty(); + perScanFilters.add(rexBuilder.makeLiteral(true)); + continue; } RexShuttle remap = createFieldNameRemap( @@ -371,32 +380,14 @@ private Optional buildOrFilterSpec( newSourceType.getFieldNames()); perScanFilters.add(andPredicates(fs.getPredicates(), remap, rexBuilder)); } - if (perScanFilters.isEmpty()) { - return Optional.empty(); - } - - // Deduplicate filters — e.g. 3 scans where 2 share the same filter - Set seen = new HashSet<>(); - List distinctFilters = new ArrayList<>(); - for (RexNode f : perScanFilters) { - if (seen.add(f.toString())) { - distinctFilters.add(f); - } - } - RexNode combined = distinctFilters.get(0); - // OR distinct filters (if > 1) together - for (int i = 1; i < distinctFilters.size(); i++) { - combined = - rexBuilder.makeCall(SqlStdOperatorTable.OR, combined, distinctFilters.get(i)); + RexNode combined = perScanFilters.get(0); + for (int i = 1; i < perScanFilters.size(); i++) { + combined = rexBuilder.makeCall(SqlStdOperatorTable.OR, combined, perScanFilters.get(i)); } - - if (!connectorAcceptsFilter( - scans.get(0).tableSourceTable().tableSource(), combined, newSourceType)) { - return Optional.empty(); - } - - return Optional.of(new FilterPushDownSpec(List.of(combined), false)); + combined = FlinkRexUtil.simplify(rexBuilder, combined, RexUtil.EXECUTOR); + combined = FlinkRexUtil.expandSearch(rexBuilder, combined); + return combined; } /** @@ -435,10 +426,10 @@ public RexNode visitInputRef(RexInputRef ref) { // needed because want to remap indicies from filter predicates to our unified scan private static List physicalFieldNames(TableSourceTable source) { return ((RowType) - source.contextResolvedTable() - .getResolvedSchema() - .toPhysicalRowDataType() - .getLogicalType()) + source.contextResolvedTable() + .getResolvedSchema() + .toPhysicalRowDataType() + .getLogicalType()) .getFieldNames(); }