4949import org .apache .flink .table .planner .plan .nodes .exec .utils .ExecNodeUtil ;
5050import org .apache .flink .table .planner .plan .schema .TableSourceTable ;
5151import org .apache .flink .table .planner .plan .utils .DeltaJoinUtil ;
52+ import org .apache .flink .table .planner .plan .utils .FunctionCallUtil ;
5253import org .apache .flink .table .planner .plan .utils .FunctionCallUtil .AsyncOptions ;
5354import org .apache .flink .table .planner .plan .utils .KeySelectorUtil ;
5455import org .apache .flink .table .planner .plan .utils .LookupJoinUtil ;
6061import org .apache .flink .table .runtime .operators .join .FlinkJoinType ;
6162import org .apache .flink .table .runtime .operators .join .deltajoin .AsyncDeltaJoinRunner ;
6263import org .apache .flink .table .runtime .operators .join .deltajoin .BinaryLookupHandler ;
64+ import org .apache .flink .table .runtime .operators .join .deltajoin .CascadedLookupHandler ;
65+ import org .apache .flink .table .runtime .operators .join .deltajoin .DeltaJoinHandlerBase ;
6366import org .apache .flink .table .runtime .operators .join .deltajoin .DeltaJoinHandlerChain ;
6467import org .apache .flink .table .runtime .operators .join .deltajoin .DeltaJoinRuntimeTree ;
6568import org .apache .flink .table .runtime .operators .join .deltajoin .LookupHandlerBase ;
69+ import org .apache .flink .table .runtime .operators .join .deltajoin .TailOutputDataHandler ;
6670import org .apache .flink .table .runtime .typeutils .InternalSerializers ;
6771import org .apache .flink .table .runtime .typeutils .InternalTypeInfo ;
6872import org .apache .flink .table .types .logical .RowType ;
@@ -499,6 +503,7 @@ protected Transformation<RowData> translateToPlanInternal(
499503
500504 private static LookupHandlerBase generateLookupHandler (
501505 boolean isBinaryLookup ,
506+ @ Nullable Integer id , // used for debug. `null` if it is a binary lookup
502507 DeltaJoinLookupChain .Node node ,
503508 Map <Integer , GeneratedFunction <AsyncFunction <RowData , Object >>>
504509 generatedFetcherCollector ,
@@ -507,6 +512,9 @@ private static LookupHandlerBase generateLookupHandler(
507512 FlinkTypeFactory typeFactory ,
508513 ClassLoader classLoader ,
509514 ExecNodeConfig config ) {
515+ Preconditions .checkArgument (
516+ isBinaryLookup == (id == null ), "Id should be null if it is binary lookup" );
517+
510518 final int [] sourceInputOrdinals = node .inputTableBinaryInputOrdinals ;
511519 final int lookupTableOrdinal = node .lookupTableBinaryInputOrdinal ;
512520 final RowType sourceStreamType =
@@ -584,8 +592,61 @@ private static LookupHandlerBase generateLookupHandler(
584592 node .lookupTableBinaryInputOrdinal );
585593 }
586594
587- // TODO FLINK-39233 Support cascaded delta join in runtime
588- throw new IllegalStateException ("Support later" );
595+ final RowType lookupResultPassThroughCalcRowType ;
596+ if (node .isLeftLookupRight ()) {
597+ lookupResultPassThroughCalcRowType =
598+ combineOutputRowType (
599+ sourceStreamType ,
600+ lookupSidePassThroughCalcRowType ,
601+ node .joinType ,
602+ typeFactory );
603+ } else {
604+ lookupResultPassThroughCalcRowType =
605+ combineOutputRowType (
606+ lookupSidePassThroughCalcRowType ,
607+ sourceStreamType ,
608+ swapJoinType (node .joinType ),
609+ typeFactory );
610+ }
611+
612+ GeneratedFilterCondition generatedRemainingCondition =
613+ node .deltaJoinSpec
614+ .getRemainingCondition ()
615+ .map (
616+ remainCond ->
617+ FilterCodeGenerator .generateFilterCondition (
618+ config ,
619+ planner .getFlinkContext ().getClassLoader (),
620+ remainCond ,
621+ lookupResultPassThroughCalcRowType ,
622+ GENERATED_JOIN_CONDITION_CLASS_NAME ))
623+ .orElse (null );
624+
625+ final RowDataKeySelector streamSideLookupKeySelector =
626+ KeySelectorUtil .getRowDataSelector (
627+ classLoader ,
628+ lookupKeysOnInputSide .stream ()
629+ .mapToInt (
630+ key -> {
631+ Preconditions .checkState (
632+ key instanceof FunctionCallUtil .FieldRef );
633+ return ((FunctionCallUtil .FieldRef ) key ).index ;
634+ })
635+ .toArray (),
636+ InternalTypeInfo .of (sourceStreamType ));
637+
638+ return new CascadedLookupHandler (
639+ id ,
640+ TypeConversions .fromLogicalToDataType (sourceStreamType ),
641+ lookupSideGeneratedFetcherWithType .dataType (),
642+ TypeConversions .fromLogicalToDataType (lookupSidePassThroughCalcRowType ),
643+ InternalSerializers .create (lookupSidePassThroughCalcRowType ),
644+ lookupSideGeneratedCalc ,
645+ generatedRemainingCondition ,
646+ streamSideLookupKeySelector ,
647+ node .inputTableBinaryInputOrdinals ,
648+ node .lookupTableBinaryInputOrdinal ,
649+ node .isLeftLookupRight ());
589650 }
590651
591652 private static RowDataKeySelector getUpsertKeySelector (
@@ -600,23 +661,6 @@ private static RowDataKeySelector getUpsertKeySelector(
600661 classLoader , finalUpsertKeys , InternalTypeInfo .of (rowType ));
601662 }
602663
603- private boolean enableCache (ReadableConfig config ) {
604- return config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED );
605- }
606-
607- /** Get the left cache size and right size. */
608- private Tuple2 <Long , Long > getCacheSize (ReadableConfig config ) {
609- long leftCacheSize =
610- config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE );
611- long rightCacheSize =
612- config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE );
613- if ((leftCacheSize <= 0 || rightCacheSize <= 0 ) && enableCache (config )) {
614- throw new IllegalArgumentException (
615- "Cache size in delta join must be positive when enabling cache." );
616- }
617- return Tuple2 .of (leftCacheSize , rightCacheSize );
618- }
619-
620664 private abstract static class DeltaJoinOperatorFactoryBuilder {
621665 protected final PlannerBase planner ;
622666 protected final ExecNodeConfig config ;
@@ -651,6 +695,23 @@ public DeltaJoinOperatorFactoryBuilder(
651695 }
652696
653697 protected abstract StreamOperatorFactory <RowData > build ();
698+
699+ /** Get the left cache size and right size. */
700+ protected Tuple2 <Long , Long > getCacheSize (ReadableConfig config ) {
701+ long leftCacheSize =
702+ config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE );
703+ long rightCacheSize =
704+ config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE );
705+ if ((leftCacheSize <= 0 || rightCacheSize <= 0 ) && enableCache (config )) {
706+ throw new IllegalArgumentException (
707+ "Cache size in delta join must be positive when enabling cache." );
708+ }
709+ return Tuple2 .of (leftCacheSize , rightCacheSize );
710+ }
711+
712+ protected boolean enableCache (ReadableConfig config ) {
713+ return config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED );
714+ }
654715 }
655716
656717 private class DeltaJoinOperatorFactoryBuilderV1 extends DeltaJoinOperatorFactoryBuilder {
@@ -798,6 +859,7 @@ private DeltaJoinHandlerChain buildBinaryLookupHandlerChain(
798859 Collections .singletonList (
799860 generateLookupHandler (
800861 true , // isBinaryLookup
862+ null ,
801863 node ,
802864 generatedFetcherCollector ,
803865 deltaJoinTree ,
@@ -926,9 +988,10 @@ public StreamOperatorFactory<RowData> build() {
926988 Map <Integer , GeneratedFunction <AsyncFunction <RowData , Object >>>
927989 generatedFetcherCollector = new HashMap <>();
928990 DeltaJoinHandlerChain left2RightHandlerChain =
929- generateDeltaJoinHandlerChain (true , generatedFetcherCollector );
991+ generateDeltaJoinHandlerChain (true , leftStreamType , generatedFetcherCollector );
930992 DeltaJoinHandlerChain right2LeftHandlerChain =
931- generateDeltaJoinHandlerChain (false , generatedFetcherCollector );
993+ generateDeltaJoinHandlerChain (
994+ false , rightStreamType , generatedFetcherCollector );
932995 Preconditions .checkState (
933996 generatedFetcherCollector .size ()
934997 == leftAllBinaryInputOrdinals .size ()
@@ -1008,6 +1071,7 @@ public StreamOperatorFactory<RowData> build() {
10081071
10091072 private DeltaJoinHandlerChain generateDeltaJoinHandlerChain (
10101073 boolean lookupRight ,
1074+ RowType streamRowType ,
10111075 Map <Integer , GeneratedFunction <AsyncFunction <RowData , Object >>>
10121076 generatedFetcherCollector ) {
10131077 int [] streamOwnedSourceOrdinals =
@@ -1029,6 +1093,7 @@ private DeltaJoinHandlerChain generateDeltaJoinHandlerChain(
10291093 Collections .singletonList (
10301094 generateLookupHandler (
10311095 true , // isBinaryLookup
1096+ null , // debug id
10321097 nodes .get (0 ),
10331098 generatedFetcherCollector ,
10341099 deltaJoinTree ,
@@ -1039,7 +1104,40 @@ private DeltaJoinHandlerChain generateDeltaJoinHandlerChain(
10391104 streamOwnedSourceOrdinals );
10401105 }
10411106
1042- throw new UnsupportedOperationException ("Support cascaded delta join operator later" );
1107+ final List <DeltaJoinHandlerBase > lookupJoinHandlers = new ArrayList <>();
1108+
1109+ // build delta join handler chain
1110+ for (int i = 0 ; i < nodes .size (); i ++) {
1111+ DeltaJoinLookupChain .Node node = nodes .get (i );
1112+ LookupHandlerBase lookupHandler =
1113+ generateLookupHandler (
1114+ false , // isBinaryLookup
1115+ i + 1 , // debug id
1116+ node ,
1117+ generatedFetcherCollector ,
1118+ deltaJoinTree ,
1119+ planner ,
1120+ typeFactory ,
1121+ classLoader ,
1122+ config );
1123+ lookupJoinHandlers .add (lookupHandler );
1124+ }
1125+ List <Integer > lookupSideAllBinaryInputOrdinals =
1126+ lookupRight ? rightAllBinaryInputOrdinals : leftAllBinaryInputOrdinals ;
1127+ int lookupSideTableOffset = lookupRight ? leftAllBinaryInputOrdinals .size () : 0 ;
1128+ lookupJoinHandlers .add (
1129+ new TailOutputDataHandler (
1130+ lookupSideAllBinaryInputOrdinals .stream ()
1131+ .mapToInt (i -> i + lookupSideTableOffset )
1132+ .toArray ()));
1133+
1134+ Preconditions .checkArgument (
1135+ streamRowType .getFieldCount ()
1136+ == deltaJoinTree
1137+ .getOutputRowTypeOnNode (streamOwnedSourceOrdinals , typeFactory )
1138+ .getFieldCount ());
1139+
1140+ return DeltaJoinHandlerChain .build (lookupJoinHandlers , streamOwnedSourceOrdinals );
10431141 }
10441142
10451143 private Set <Set <Integer >> getAllDrivenInputsWhenLookup (boolean lookupRight ) {
0 commit comments