From 3258bfde922655b5278347dcd1b916f93f931ca4 Mon Sep 17 00:00:00 2001 From: Stepan Stepanishchev Date: Wed, 25 Mar 2026 17:28:38 +0700 Subject: [PATCH] [FLINK-39324][table-planner] Allow MultiJoin to respect IS_NOT_DISTINCT_FROM when extracting join keys --- .../planner/plan/utils/MultiJoinUtil.java | 2 +- .../exec/stream/MultiJoinSemanticTests.java | 3 +- .../exec/stream/MultiJoinTestPrograms.java | 50 +++++++++++++++++++ .../plan/stream/sql/MultiJoinTest.java | 15 ++++++ .../planner/plan/stream/sql/MultiJoinTest.xml | 37 ++++++++++++++ 5 files changed, 105 insertions(+), 2 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java index 59a69599381d9..65aff4ba5d0da 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/MultiJoinUtil.java @@ -70,7 +70,7 @@ private static void extractEqualityConditions( final RexCall call = (RexCall) condition; final SqlKind kind = call.getOperator().getKind(); - if (kind != SqlKind.EQUALS) { + if (kind != SqlKind.EQUALS && kind != SqlKind.IS_NOT_DISTINCT_FROM) { // Only conjunctions (AND) can contain equality conditions that are valid for multijoin. // All other condition types are deferred to the postJoinFilter. if (kind == SqlKind.AND) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java index 706e512a3de3e..f5088a3eb080f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java @@ -55,6 +55,7 @@ public List programs() { MultiJoinTestPrograms.MULTI_JOIN_TWO_WAY_INNER_JOIN_WITH_WHERE_IN, MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_MULTI_KEY_TYPES, MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_MIXED_JOIN_MULTI_KEY_TYPES_SHUFFLED, - MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT); + MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_HINT, + MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_IS_NOT_DISTINCT); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java index 9e3e7fa763139..b8be7eefaa29b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java @@ -133,6 +133,56 @@ public class MultiJoinTestPrograms { + "INNER JOIN Payments p ON u.user_id = p.user_id") .build(); + public static final TableTestProgram MULTI_JOIN_THREE_WAY_INNER_JOIN_WITH_IS_NOT_DISTINCT = + TableTestProgram.of( + "three-way-inner-join-with-is-not-distinct", + "three way inner join with is not distinct") + .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true) + .setupTableSource( + SourceTestStep.newBuilder("Users") + .addSchema("user_id STRING", "name STRING") + .producedValues( + Row.ofKind(RowKind.INSERT, "1", "Gus"), + Row.ofKind(RowKind.INSERT, "2", "Bob"), + Row.ofKind(RowKind.INSERT, null, "Steve")) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("Orders") + .addSchema("user_id STRING", "order_id STRING") + .producedValues( + Row.ofKind(RowKind.INSERT, "1", "order1"), + Row.ofKind(RowKind.INSERT, "2", "order2"), + Row.ofKind(RowKind.INSERT, null, "order3")) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("Payments") + .addSchema("user_id STRING", "payment_id STRING") + .producedValues( + Row.ofKind(RowKind.INSERT, "1", "payment1"), + Row.ofKind(RowKind.INSERT, "2", "payment2"), + Row.ofKind(RowKind.INSERT, null, "payment3")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "user_id STRING", + "name STRING", + "order_id STRING", + "payment_id STRING") + .consumedValues( + "+I[1, Gus, order1, payment1]", + "+I[2, Bob, order2, payment2]", + "+I[null, Steve, order3, payment3]") + .testMaterializedData() + .build()) + .runSql( + "INSERT INTO sink " + + "SELECT u.user_id, u.name, o.order_id, p.payment_id " + + "FROM Users u " + + "INNER JOIN Orders o ON u.user_id IS NOT DISTINCT FROM o.user_id " + + "INNER JOIN Payments p ON o.user_id IS NOT DISTINCT FROM p.user_id") + .build(); + public static final TableTestProgram MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_WHERE = TableTestProgram.of( "three-way-inner-join-with-where", diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java index fc387fddeb01f..d2076b7b42f2b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.java @@ -193,6 +193,21 @@ void testThreeWayInnerJoinRelPlan() { + " ON u.user_id = p.user_id"); } + @Test + void testThreeWayInnerJoinWithIsNotDistinctRelPlan() { + util.verifyRelPlan( + "\nSELECT\n" + + " u.user_id,\n" + + " u.name,\n" + + " o.order_id,\n" + + " p.payment_id\n" + + "FROM Users u\n" + + "INNER JOIN Orders o\n" + + " ON u.user_id IS NOT DISTINCT FROM o.user_id\n" + + "INNER JOIN Payments p\n" + + " ON u.user_id = p.user_id"); + } + @Test @Tag("no-common-join-key") void testThreeWayInnerJoinRelPlanNoCommonJoinKey() { diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml index 3a5e888ca25c4..2bede58dd6efc 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/MultiJoinTest.xml @@ -2097,6 +2097,43 @@ Calc(select=[user_id, name, order_id, payment_id]) : +- TableSourceScan(table=[[default_catalog, default_database, Orders, project=[order_id, user_id], metadata=[]]], fields=[order_id, user_id]) +- Exchange(distribution=[hash[price]]) +- TableSourceScan(table=[[default_catalog, default_database, Payments, project=[payment_id, price], metadata=[]]], fields=[payment_id, price]) +]]> + + + + + + + + + + +