Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -137,18 +138,44 @@ private static RelDataType inferRowType(SqlOperatorBinding opBinding) {
final RelDataTypeField timeAttributeField = inputRowType.getField(timeField, false, false);
assert timeAttributeField != null;
return inferRowType(
callBinding.getTypeFactory(), inputRowType, timeAttributeField.getType());
callBinding.getTypeFactory(),
inputRowType,
timeAttributeField.getType(),
timeAttributeField.getIndex());
}

/**
* Infers the row type of a window TVF call.
*
* <p>The output is the row type of the input table plus the {@code window_start}, {@code
* window_end} and {@code window_time} columns. Following FLIP-145, the original rowtime
* attribute column (the one referenced by the time descriptor at {@code timeColumnIndex}) is
* materialized to a regular timestamp, so that {@code window_time} becomes the only rowtime
* attribute after applying the window TVF (see FLINK-39899). Processing-time descriptor columns
* are kept as-is.
*/
public static RelDataType inferRowType(
RelDataTypeFactory typeFactory,
RelDataType inputRowType,
RelDataType timeAttributeType) {
return typeFactory
.builder()
.kind(inputRowType.getStructKind())
.addAll(inputRowType.getFieldList())
.add("window_start", SqlTypeName.TIMESTAMP, 3)
RelDataType timeAttributeType,
int timeColumnIndex) {
final RelDataTypeFactory.Builder builder =
typeFactory.builder().kind(inputRowType.getStructKind());
final List<RelDataTypeField> inputFields = inputRowType.getFieldList();
for (int i = 0; i < inputFields.size(); i++) {
final RelDataTypeField field = inputFields.get(i);
if (i == timeColumnIndex && FlinkTypeFactory.isRowtimeIndicatorType(field.getType())) {
// materialize the original rowtime attribute to a regular timestamp
builder.add(
field.getName(),
typeFactory.createTypeWithNullability(
((TimeIndicatorRelDataType) field.getType()).originalType(),
field.getType().isNullable()));
} else {
builder.add(field);
}
}
return builder.add("window_start", SqlTypeName.TIMESTAMP, 3)
.add("window_end", SqlTypeName.TIMESTAMP, 3)
.add("window_time", typeFactory.createTypeWithNullability(timeAttributeType, false))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ public void onMatch(RelOptRuleCall call) {
scan.getRowType().getFieldCount(), scanInputFieldCount, toPushFields);

// 3. create new window table function scan
// the original time attribute column is shifted to a new position on the pushed-down input
int newTimeColumnIndex = mapping.get(windowingStrategy.getTimeAttributeIndex());
LogicalTableFunctionScan newScan =
createNewTableFunctionScan(
relBuilder,
scan,
windowingStrategy.getTimeAttributeType(),
newTimeColumnIndex,
newScanInput,
mapping,
toPushFields);
Expand Down Expand Up @@ -155,6 +158,7 @@ private LogicalTableFunctionScan createNewTableFunctionScan(
RelBuilder relBuilder,
LogicalTableFunctionScan oldScan,
LogicalType timeAttributeType,
int timeColumnIndex,
RelNode newInput,
Map<Integer, Integer> mapping,
ImmutableBitSet toPushFields) {
Expand All @@ -165,7 +169,8 @@ private LogicalTableFunctionScan createNewTableFunctionScan(
SqlWindowTableFunction.inferRowType(
typeFactory,
newInput.getRowType(),
typeFactory.createFieldTypeFromLogicalType(timeAttributeType));
typeFactory.createFieldTypeFromLogicalType(timeAttributeType),
timeColumnIndex);
RexNode newCall =
rewriteWindowCall(
(RexCall) oldScan.getCall(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,16 @@ public void onMatch(RelOptRuleCall call) {
// -------------------------------------------------------------------------
// 3. Apply WindowTVF on the new Expand node
// -------------------------------------------------------------------------
// the time attribute ref is appended
int timeAttributeOnExpand =
timeFieldAdded ? newExpand.getRowType().getFieldCount() - 1 : newTimeField;
RelDataType newOutputType =
SqlWindowTableFunction.inferRowType(
typeFactory,
newExpand.getRowType(),
typeFactory.createFieldTypeFromLogicalType(
windowTVF.windowing().getTimeAttributeType()));
// the time attribute ref is appended
int timeAttributeOnExpand =
timeFieldAdded ? newExpand.getRowType().getFieldCount() - 1 : newTimeField;
windowTVF.windowing().getTimeAttributeType()),
timeAttributeOnExpand);
TimeAttributeWindowingStrategy newWindowing =
new TimeAttributeWindowingStrategy(
windowTVF.windowing().getWindow(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ public class RelWindowProperties {
private final ImmutableBitSet windowStartColumns;
private final ImmutableBitSet windowEndColumns;
private final ImmutableBitSet windowTimeColumns;

/**
* The columns that hold the window's original time attribute (the column referenced by the
* window TVF time descriptor). After applying the window TVF this column is materialized to a
* regular timestamp (see FLINK-39899), so it can no longer be recognized as a time attribute by
* its type; we track its position here so that window operators (e.g. window deduplicate) can
* still identify it.
*/
private final ImmutableBitSet timeAttributeColumns;

private final WindowSpec windowSpec;
private final LogicalType timeAttributeType;

Expand All @@ -49,6 +59,7 @@ public class RelWindowProperties {
ImmutableBitSet windowStartColumns,
ImmutableBitSet windowEndColumns,
ImmutableBitSet windowTimeColumns,
ImmutableBitSet timeAttributeColumns,
WindowSpec windowSpec,
LogicalType timeAttributeType) {
if (windowStartColumns.isEmpty() || windowEndColumns.isEmpty()) {
Expand All @@ -59,6 +70,7 @@ public class RelWindowProperties {
windowStartColumns,
windowEndColumns,
windowTimeColumns,
timeAttributeColumns,
windowSpec,
timeAttributeType);
}
Expand All @@ -68,6 +80,7 @@ private RelWindowProperties(
ImmutableBitSet windowStartColumns,
ImmutableBitSet windowEndColumns,
ImmutableBitSet windowTimeColumns,
ImmutableBitSet timeAttributeColumns,
WindowSpec windowSpec,
LogicalType timeAttributeType) {
checkArgument(
Expand All @@ -77,18 +90,21 @@ private RelWindowProperties(
this.windowStartColumns = checkNotNull(windowStartColumns);
this.windowEndColumns = checkNotNull(windowEndColumns);
this.windowTimeColumns = checkNotNull(windowTimeColumns);
this.timeAttributeColumns = checkNotNull(timeAttributeColumns);
this.windowSpec = checkNotNull(windowSpec);
this.timeAttributeType = checkNotNull(timeAttributeType);
}

public @Nullable RelWindowProperties copy(
ImmutableBitSet windowStartColumns,
ImmutableBitSet windowEndColumns,
ImmutableBitSet windowTimeColumns) {
ImmutableBitSet windowTimeColumns,
ImmutableBitSet timeAttributeColumns) {
return create(
windowStartColumns,
windowEndColumns,
windowTimeColumns,
timeAttributeColumns,
windowSpec,
this.timeAttributeType);
}
Expand All @@ -97,11 +113,13 @@ private RelWindowProperties(
ImmutableBitSet windowStartColumns,
ImmutableBitSet windowEndColumns,
ImmutableBitSet windowTimeColumns,
ImmutableBitSet timeAttributeColumns,
WindowSpec windowSpec) {
return create(
windowStartColumns,
windowEndColumns,
windowTimeColumns,
timeAttributeColumns,
windowSpec,
this.timeAttributeType);
}
Expand All @@ -118,6 +136,10 @@ public ImmutableBitSet getWindowTimeColumns() {
return windowTimeColumns;
}

/**
 * Returns the positions of the window's original time attribute column(s), i.e. the bit set
 * that was supplied as {@code timeAttributeColumns} when this instance was created.
 */
public ImmutableBitSet getTimeAttributeColumns() {
return timeAttributeColumns;
}

/**
 * Returns the union of the window start, window end and window time column sets. The original
 * time attribute columns are not part of the result.
 */
public ImmutableBitSet getWindowColumns() {
    final ImmutableBitSet startAndEndColumns = windowStartColumns.union(windowEndColumns);
    return startAndEndColumns.union(windowTimeColumns);
}
Expand Down Expand Up @@ -151,6 +173,8 @@ public boolean equals(Object o) {

/** Hashes the window start, end and time column sets together with the window spec. */
@Override
public int hashCode() {
// timeAttributeColumns and timeAttributeType are deliberately left out of the hash. Omitting a
// field here is always compatible with equals(): equal objects still produce equal hashes.
// NOTE(review): the metadata handler appears to remap each column set separately, so two
// instances could agree on the start/end/time columns yet differ in timeAttributeColumns (for
// example when only one of them still carries the original time column) - confirm that
// equals() treats such instances the way callers expect.
return Objects.hash(windowStartColumns, windowEndColumns, windowTimeColumns, windowSpec);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
transformColumnIndex(childProps.getWindowStartColumns, mapInToOutPos),
transformColumnIndex(childProps.getWindowEndColumns, mapInToOutPos),
transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos),
transformColumnIndex(childProps.getTimeAttributeColumns, mapInToOutPos),
updateWindowSpec(windowSpec, mapInToOutPos)
)
}
Expand Down Expand Up @@ -209,7 +210,9 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
val starts = inferWindowPropertyAfterExpand(inputWindowProperties.getWindowStartColumns)
val ends = inferWindowPropertyAfterExpand(inputWindowProperties.getWindowEndColumns)
val times = inferWindowPropertyAfterExpand(inputWindowProperties.getWindowTimeColumns)
inputWindowProperties.copy(starts, ends, times)
val timeAttributes = inferWindowPropertyAfterExpand(
inputWindowProperties.getTimeAttributeColumns)
inputWindowProperties.copy(starts, ends, times, timeAttributes)
}

def getWindowProperties(rel: Exchange, mq: RelMetadataQuery): RelWindowProperties = {
Expand All @@ -229,7 +232,9 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
val starts = properties.map(_.getWindowStartColumns).reduce((l, r) => l.intersect(r))
val ends = properties.map(_.getWindowEndColumns).reduce((l, r) => l.intersect(r))
val times = properties.map(_.getWindowTimeColumns).reduce((l, r) => l.intersect(r))
properties.head.copy(starts, ends, times)
val timeAttributes =
properties.map(_.getTimeAttributeColumns).reduce((l, r) => l.intersect(r))
properties.head.copy(starts, ends, times, timeAttributes)
} else {
// window properties is lost if windows are not equal
null
Expand All @@ -246,6 +251,7 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
ImmutableBitSet.of(fieldCount - 3),
ImmutableBitSet.of(fieldCount - 2),
ImmutableBitSet.of(fieldCount - 1),
ImmutableBitSet.of(windowingStrategy.getTimeAttributeIndex),
windowingStrategy.getWindow,
windowingStrategy.getTimeAttributeType
)
Expand Down Expand Up @@ -274,11 +280,16 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
.intersect(grouping)
.map(grouping.indexOf(_))
.toList
val timeAttributeColumns = windowProperties.getTimeAttributeColumns
.intersect(grouping)
.map(grouping.indexOf(_))
.toList

RelWindowProperties.create(
ImmutableBitSet.of(startColumns: _*),
ImmutableBitSet.of(endColumns: _*),
ImmutableBitSet.of(timeColumns: _*),
ImmutableBitSet.of(timeAttributeColumns: _*),
windowProperties.getWindowSpec,
windowProperties.getTimeAttributeType
)
Expand All @@ -292,6 +303,7 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
ImmutableBitSet.of(fieldCount - 3),
ImmutableBitSet.of(fieldCount - 2),
ImmutableBitSet.of(fieldCount - 1),
ImmutableBitSet.of(rel.windowing.getTimeAttributeIndex),
rel.windowing.getWindow,
rel.windowing.getTimeAttributeType
)
Expand Down Expand Up @@ -344,6 +356,9 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
ImmutableBitSet.of(starts: _*),
ImmutableBitSet.of(ends: _*),
ImmutableBitSet.of(times: _*),
// the original time attribute column is consumed by the window aggregate; only window_time
// remains as the time attribute of the aggregate output
ImmutableBitSet.of(),
windowSpec,
timeAttributeType
)
Expand Down Expand Up @@ -454,7 +469,10 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W
val timeColumns = inferWindowPropertyAfterWindowJoin(
leftWindowProperties.getWindowTimeColumns,
rightWindowProperties.getWindowTimeColumns)
leftWindowProperties.copy(startColumns, endColumns, timeColumns)
val timeAttributeColumns = inferWindowPropertyAfterWindowJoin(
leftWindowProperties.getTimeAttributeColumns,
rightWindowProperties.getTimeAttributeColumns)
leftWindowProperties.copy(startColumns, endColumns, timeColumns, timeAttributeColumns)
}

def getWindowProperties(hepRelVertex: HepRelVertex, mq: RelMetadataQuery): RelWindowProperties = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StreamPhysicalWindowDeduplicateRule(config: Config) extends ConverterRule(
val windowProperties = fmq.getRelWindowProperties(rank.getInput)
val partitionKey = rank.partitionKey
WindowUtil.groupingContainsWindowStartEnd(partitionKey, windowProperties) &&
RankUtil.canConvertToDeduplicate(rank)
RankUtil.canConvertToDeduplicate(rank, windowProperties)
}

override def convert(rel: RelNode): RelNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class StreamPhysicalWindowRankRule(config: Config) extends ConverterRule(config)
val windowProperties = fmq.getRelWindowProperties(rank.getInput)
val partitionKey = rank.partitionKey
WindowUtil.groupingContainsWindowStartEnd(partitionKey, windowProperties) &&
!RankUtil.canConvertToDeduplicate(rank)
!RankUtil.canConvertToDeduplicate(rank, windowProperties)
}

override def convert(rel: RelNode): RelNode = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.ExpressionReducer
import org.apache.flink.table.planner.plan.`trait`.RelWindowProperties
import org.apache.flink.table.planner.plan.nodes.calcite.Rank
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalRank
import org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalRank, StreamPhysicalWindowDeduplicate}
Expand Down Expand Up @@ -339,7 +340,9 @@ object RankUtil {
* @return
* True if the input rank could be converted to [[StreamPhysicalWindowDeduplicate]]
*/
def canConvertToDeduplicate(rank: FlinkLogicalRank): Boolean = {
def canConvertToDeduplicate(
rank: FlinkLogicalRank,
windowProperties: RelWindowProperties): Boolean = {
val sortCollation = rank.orderKey
val rankRange = rank.rankRange

Expand All @@ -351,10 +354,28 @@ object RankUtil {
case _ => false
}

val inputRowType = rank.getInput.getRowType
val isSortOnTimeAttribute = sortOnTimeAttributeOnly(sortCollation, inputRowType)
val isSortOnWindowTimeAttribute =
sortOnWindowTimeAttributeColumn(sortCollation, windowProperties)

!rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute && isRowNumberType
!rank.outputRankNumber && isLimit1 && isSortOnWindowTimeAttribute && isRowNumberType
}

/**
 * Checks whether the rank sorts on exactly one field and that field is the window's original
 * time attribute column, i.e. the column referenced by the window TVF time descriptor (tracked
 * in [[RelWindowProperties.getTimeAttributeColumns]]).
 *
 * Sorting on window_time does not qualify: window_time is constant within a window and cannot
 * distinguish the first/last row. After FLINK-39899 the original time attribute is materialized
 * to a regular timestamp once it leaves the window TVF, so the column is recognized by its
 * position rather than by a time-indicator type.
 *
 * Returns false when no window properties are available.
 */
private def sortOnWindowTimeAttributeColumn(
    sortCollation: RelCollation,
    windowProperties: RelWindowProperties): Boolean = {
  val fieldCollations = sortCollation.getFieldCollations
  windowProperties != null &&
  fieldCollations.size() == 1 &&
  windowProperties.getTimeAttributeColumns.get(fieldCollations.get(0).getFieldIndex)
}

private def sortOnTimeAttributeOnly(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ GROUP BY window_start, window_end]]>
LogicalProject(EXPR$0=[$2], window_start=[$0])
+- LogicalAggregate(group=[{0, 1}], EXPR$0=[SUM($2)])
+- LogicalProject(window_start=[$2], window_end=[$3], i=[$0])
+- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), DESCRIPTOR(_UTF-16LE'ts'), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER i, TIMESTAMP_LTZ(3) *ROWTIME* ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *ROWTIME* window_time)])
+- LogicalTableFunctionScan(invocation=[TUMBLE(TABLE(#0), DESCRIPTOR(_UTF-16LE'ts'), 600000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER i, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) ts, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *ROWTIME* window_time)])
+- LogicalProject(i=[$0], ts=[$1])
+- LogicalProject(i=[$0], ts=[$1])
+- LogicalWatermarkAssigner(rowtime=[ts], watermark=[$1])
Expand Down
Loading