
[FLINK-39324][table-planner] Allow MultiJoin to respect IS_NOT_DISTINCT_FROM when extracting join keys #27827

Open
SteveStevenpoor wants to merge 1 commit into apache:master from SteveStevenpoor:FLINK-39324

Conversation

@SteveStevenpoor
Contributor

What is the purpose of the change

This change lets the MultiJoin optimization treat IS NOT DISTINCT FROM predicates as equi-join predicates when extracting a common join key, so that adjacent joins can be flattened into a single MultiJoin node with hash distribution.

Example

Consider the following query:

INSERT INTO ResultSink
SELECT u.user_id, u.name, o.order_id, p.payment_id, p.price
FROM Users u
  JOIN Orders o 
    ON u.user_id IS NOT DISTINCT FROM o.user_id
  JOIN Payments p 
    ON o.user_id IS NOT DISTINCT FROM p.user_id
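For context, `IS NOT DISTINCT FROM` is SQL's null-safe equality: unlike `=`, it treats two NULLs as equal and always returns TRUE or FALSE, never NULL. A minimal sketch of the semantics in Python (the helper name is illustrative, not from Flink):

```python
def is_not_distinct_from(a, b):
    """SQL null-safe equality: NULL IS NOT DISTINCT FROM NULL is TRUE."""
    if a is None and b is None:
        return True   # two NULLs compare equal
    if a is None or b is None:
        return False  # NULL vs non-NULL is not equal
    return a == b     # plain equality otherwise

# Plain SQL '=' would yield NULL (unknown) for any NULL operand,
# which is why the two predicates are not interchangeable in general,
# even though both define a deterministic join key.
print(is_not_distinct_from(None, None))  # True
print(is_not_distinct_from(None, 1))     # False
```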

Before fix:

Without the fix, MultiJoin fails to extract a common join key, so the optimizer produces two nested MultiJoin nodes whose inputs use singleton (parallelism-1) distribution:

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[single])
      :  +- MultiJoin(commonJoinKey=[noCommonJoinKey], joinTypes=[INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0)], select=[user_id,name,order_id,user_id0], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0)])
      :     :- Exchange(distribution=[single])
      :     :  +- Calc(select=[user_id, name])
      :     :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :     :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :     +- Exchange(distribution=[single])
      :        +- Calc(select=[order_id, user_id])
      :           +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :              +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[single])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id])

After fix:

With the fix, both joins are flattened into a single MultiJoin node with common join key user_id, and all three inputs are hash-distributed on it:

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.ResultSink], fields=[user_id, name, order_id, payment_id, price])
+- Calc(select=[user_id, name, order_id, payment_id, price])
   +- MultiJoin(commonJoinKey=[user_id], joinTypes=[INNER, INNER], inputUniqueKeys=[noUniqueKey, noUniqueKey, noUniqueKey], joinConditions=[IS NOT DISTINCT FROM(user_id, user_id0), IS NOT DISTINCT FROM(user_id0, user_id1)], select=[user_id,name,order_id,user_id0,payment_id,price,user_id1], rowType=[RecordType(VARCHAR(2147483647) user_id, VARCHAR(2147483647) name, VARCHAR(2147483647) order_id, VARCHAR(2147483647) user_id0, VARCHAR(2147483647) payment_id, INTEGER price, VARCHAR(2147483647) user_id1)])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[user_id, name])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Users]], fields=[user_id, name, cash])
      :- Exchange(distribution=[hash[user_id]])
      :  +- Calc(select=[order_id, user_id])
      :     +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
      :        +- TableSourceScan(table=[[default_catalog, default_database, Orders]], fields=[order_id, user_id, product])
      +- Exchange(distribution=[hash[user_id]])
         +- MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])
            +- TableSourceScan(table=[[default_catalog, default_database, Payments]], fields=[payment_id, price, user_id])
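The practical effect of extracting `user_id` as the common join key is that all three inputs can be hash-partitioned on it, so rows with matching keys land on the same parallel task instead of being funneled through a single-parallelism exchange. A simplified illustration of key-based partitioning (the scheme below is illustrative, not Flink's actual key-group hashing):

```python
def hash_partition(rows, key_index, parallelism):
    """Route each row to a task index by hashing its join-key field."""
    tasks = [[] for _ in range(parallelism)]
    for row in rows:
        task = hash(row[key_index]) % parallelism
        tasks[task].append(row)
    return tasks

users = [("u1", "Alice"), ("u2", "Bob")]      # (user_id, name)
orders = [("o1", "u1"), ("o2", "u2")]         # (order_id, user_id)

# Partition both inputs on user_id; rows sharing a key always end up
# on the same task index, so each task can join its slice locally.
user_tasks = hash_partition(users, 0, 4)
order_tasks = hash_partition(orders, 1, 4)
```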

Brief change log

  • Extended join attribute map creation to recognize IS NOT DISTINCT FROM predicates in addition to plain equality
  • Added tests covering IS NOT DISTINCT FROM join conditions
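The change is conceptually small: when collecting equi-join attributes from a conjunction of predicates, an IS NOT DISTINCT FROM call qualifies as a join key just like EQUALS. A hypothetical sketch of the idea (the names below do not match Flink's planner code, which works on Calcite RexNode trees):

```python
# Hypothetical predicate representation: (operator, left_field, right_field).
EQUALS = "="
IS_NOT_DISTINCT_FROM = "IS NOT DISTINCT FROM"

def extract_join_keys(predicates):
    """Return (left_field, right_field) pairs usable as equi-join keys."""
    keys = []
    for op, left, right in predicates:
        # Before the fix only EQUALS qualified; the fix also accepts
        # IS NOT DISTINCT FROM as a valid equi-join predicate.
        if op in (EQUALS, IS_NOT_DISTINCT_FROM):
            keys.append((left, right))
    return keys

conds = [
    (IS_NOT_DISTINCT_FROM, "u.user_id", "o.user_id"),
    (IS_NOT_DISTINCT_FROM, "o.user_id", "p.user_id"),
]
print(extract_join_keys(conds))
```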

Verifying this change

This change added tests and can be verified as follows:

  • Added test with IS_NOT_DISTINCT_FROM to MultiJoinTest
  • Added test with IS_NOT_DISTINCT_FROM to MultiJoinSemanticTests

Additional tests with IS_NOT_DISTINCT_FROM will become available once FLINK-38551 is merged.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Collaborator

flinkbot commented Mar 25, 2026

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

