Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
import org.apache.flink.table.planner.plan.utils.FunctionCallUtil.AsyncOptions;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil;
Expand All @@ -60,9 +61,12 @@
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.deltajoin.AsyncDeltaJoinRunner;
import org.apache.flink.table.runtime.operators.join.deltajoin.BinaryLookupHandler;
import org.apache.flink.table.runtime.operators.join.deltajoin.CascadedLookupHandler;
import org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinHandlerBase;
import org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinHandlerChain;
import org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinRuntimeTree;
import org.apache.flink.table.runtime.operators.join.deltajoin.LookupHandlerBase;
import org.apache.flink.table.runtime.operators.join.deltajoin.TailOutputDataHandler;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -499,6 +503,7 @@ protected Transformation<RowData> translateToPlanInternal(

private static LookupHandlerBase generateLookupHandler(
boolean isBinaryLookup,
@Nullable Integer id, // used for debug. `null` if it is a binary lookup
DeltaJoinLookupChain.Node node,
Map<Integer, GeneratedFunction<AsyncFunction<RowData, Object>>>
generatedFetcherCollector,
Expand All @@ -507,6 +512,9 @@ private static LookupHandlerBase generateLookupHandler(
FlinkTypeFactory typeFactory,
ClassLoader classLoader,
ExecNodeConfig config) {
Preconditions.checkArgument(
isBinaryLookup == (id == null), "Id should be null if it is binary lookup");

final int[] sourceInputOrdinals = node.inputTableBinaryInputOrdinals;
final int lookupTableOrdinal = node.lookupTableBinaryInputOrdinal;
final RowType sourceStreamType =
Expand Down Expand Up @@ -584,8 +592,64 @@ private static LookupHandlerBase generateLookupHandler(
node.lookupTableBinaryInputOrdinal);
}

// TODO FLINK-39233 Support cascaded delta join in runtime
throw new IllegalStateException("Support later");
final RowType lookupResultPassThroughCalcRowType;
if (node.isLeftLookupRight()) {
lookupResultPassThroughCalcRowType =
combineOutputRowType(
sourceStreamType,
lookupSidePassThroughCalcRowType,
node.joinType,
typeFactory);
} else {
lookupResultPassThroughCalcRowType =
combineOutputRowType(
lookupSidePassThroughCalcRowType,
sourceStreamType,
swapJoinType(node.joinType),
typeFactory);
}

GeneratedFilterCondition generatedRemainingCondition =
node.deltaJoinSpec
.getRemainingCondition()
.map(
remainCond ->
FilterCodeGenerator.generateFilterCondition(
config,
planner.getFlinkContext().getClassLoader(),
remainCond,
lookupResultPassThroughCalcRowType,
GENERATED_JOIN_CONDITION_CLASS_NAME))
.orElse(null);

final RowDataKeySelector streamSideLookupKeySelector =
KeySelectorUtil.getRowDataSelector(
classLoader,
lookupKeysOnInputSide.stream()
.mapToInt(
key -> {
Preconditions.checkState(
key instanceof FunctionCallUtil.FieldRef,
"Currently, delta join only supports to use field "
+ "reference as lookup key, but found %s",
key.getClass().getName());
return ((FunctionCallUtil.FieldRef) key).index;
})
.toArray(),
InternalTypeInfo.of(sourceStreamType));

return new CascadedLookupHandler(
id,
TypeConversions.fromLogicalToDataType(sourceStreamType),
lookupSideGeneratedFetcherWithType.dataType(),
TypeConversions.fromLogicalToDataType(lookupSidePassThroughCalcRowType),
InternalSerializers.create(lookupSidePassThroughCalcRowType),
lookupSideGeneratedCalc,
generatedRemainingCondition,
streamSideLookupKeySelector,
node.inputTableBinaryInputOrdinals,
node.lookupTableBinaryInputOrdinal,
node.isLeftLookupRight());
}

private static RowDataKeySelector getUpsertKeySelector(
Expand All @@ -600,23 +664,6 @@ private static RowDataKeySelector getUpsertKeySelector(
classLoader, finalUpsertKeys, InternalTypeInfo.of(rowType));
}

private boolean enableCache(ReadableConfig config) {
return config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED);
}

/** Get the left cache size and right size. */
private Tuple2<Long, Long> getCacheSize(ReadableConfig config) {
long leftCacheSize =
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE);
long rightCacheSize =
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE);
if ((leftCacheSize <= 0 || rightCacheSize <= 0) && enableCache(config)) {
throw new IllegalArgumentException(
"Cache size in delta join must be positive when enabling cache.");
}
return Tuple2.of(leftCacheSize, rightCacheSize);
}

private abstract static class DeltaJoinOperatorFactoryBuilder {
protected final PlannerBase planner;
protected final ExecNodeConfig config;
Expand Down Expand Up @@ -651,6 +698,23 @@ public DeltaJoinOperatorFactoryBuilder(
}

protected abstract StreamOperatorFactory<RowData> build();

/** Get the left cache size and right size. */
protected Tuple2<Long, Long> getCacheSize(ReadableConfig config) {
long leftCacheSize =
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE);
long rightCacheSize =
config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE);
if ((leftCacheSize <= 0 || rightCacheSize <= 0) && enableCache(config)) {
throw new IllegalArgumentException(
"Cache size in delta join must be positive when enabling cache.");
}
return Tuple2.of(leftCacheSize, rightCacheSize);
}

protected boolean enableCache(ReadableConfig config) {
return config.get(ExecutionConfigOptions.TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED);
}
}

private class DeltaJoinOperatorFactoryBuilderV1 extends DeltaJoinOperatorFactoryBuilder {
Expand Down Expand Up @@ -798,6 +862,7 @@ private DeltaJoinHandlerChain buildBinaryLookupHandlerChain(
Collections.singletonList(
generateLookupHandler(
true, // isBinaryLookup
null,
node,
generatedFetcherCollector,
deltaJoinTree,
Expand Down Expand Up @@ -926,9 +991,10 @@ public StreamOperatorFactory<RowData> build() {
Map<Integer, GeneratedFunction<AsyncFunction<RowData, Object>>>
generatedFetcherCollector = new HashMap<>();
DeltaJoinHandlerChain left2RightHandlerChain =
generateDeltaJoinHandlerChain(true, generatedFetcherCollector);
generateDeltaJoinHandlerChain(true, leftStreamType, generatedFetcherCollector);
DeltaJoinHandlerChain right2LeftHandlerChain =
generateDeltaJoinHandlerChain(false, generatedFetcherCollector);
generateDeltaJoinHandlerChain(
false, rightStreamType, generatedFetcherCollector);
Preconditions.checkState(
generatedFetcherCollector.size()
== leftAllBinaryInputOrdinals.size()
Expand Down Expand Up @@ -1008,6 +1074,7 @@ public StreamOperatorFactory<RowData> build() {

private DeltaJoinHandlerChain generateDeltaJoinHandlerChain(
boolean lookupRight,
RowType streamRowType,
Map<Integer, GeneratedFunction<AsyncFunction<RowData, Object>>>
generatedFetcherCollector) {
int[] streamOwnedSourceOrdinals =
Expand All @@ -1029,6 +1096,7 @@ private DeltaJoinHandlerChain generateDeltaJoinHandlerChain(
Collections.singletonList(
generateLookupHandler(
true, // isBinaryLookup
null, // debug id
nodes.get(0),
generatedFetcherCollector,
deltaJoinTree,
Expand All @@ -1039,7 +1107,40 @@ private DeltaJoinHandlerChain generateDeltaJoinHandlerChain(
streamOwnedSourceOrdinals);
}

throw new UnsupportedOperationException("Support cascaded delta join operator later");
final List<DeltaJoinHandlerBase> lookupJoinHandlers = new ArrayList<>();

// build delta join handler chain
for (int i = 0; i < nodes.size(); i++) {
DeltaJoinLookupChain.Node node = nodes.get(i);
LookupHandlerBase lookupHandler =
generateLookupHandler(
false, // isBinaryLookup
i + 1, // debug id
node,
generatedFetcherCollector,
deltaJoinTree,
planner,
typeFactory,
classLoader,
config);
lookupJoinHandlers.add(lookupHandler);
}
List<Integer> lookupSideAllBinaryInputOrdinals =
lookupRight ? rightAllBinaryInputOrdinals : leftAllBinaryInputOrdinals;
int lookupSideTableOffset = lookupRight ? leftAllBinaryInputOrdinals.size() : 0;
lookupJoinHandlers.add(
new TailOutputDataHandler(
lookupSideAllBinaryInputOrdinals.stream()
.mapToInt(i -> i + lookupSideTableOffset)
.toArray()));

Preconditions.checkArgument(
streamRowType.getFieldCount()
== deltaJoinTree
.getOutputRowTypeOnNode(streamOwnedSourceOrdinals, typeFactory)
.getFieldCount());

return DeltaJoinHandlerChain.build(lookupJoinHandlers, streamOwnedSourceOrdinals);
}

private Set<Set<Integer>> getAllDrivenInputsWhenLookup(boolean lookupRight) {
Expand Down
Loading