Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 73 additions & 7 deletions core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexOver;
import org.apache.calcite.rex.RexShuttle;
import org.apache.calcite.rex.RexSubQuery;
import org.apache.calcite.rex.RexUtil;
Expand Down Expand Up @@ -1619,15 +1620,26 @@
new TreeMap<>(inputFrame.corDefOutputs);

final Collection<CorRef> corVarList = cm.mapRefRelToCorRef.get(rel);
// Track only correlation variables that are not already produced by the
// input frame. Existing outputs still need to be carried forward, but they
// should not force an extra value generator.
final List<CorRef> missingCorVarList = new ArrayList<>();
for (CorRef correlation : corVarList) {
if (!corDefOutputs.containsKey(correlation.def())) {
missingCorVarList.add(correlation);
}
}

// Try to populate correlation variables using local fields.
// Try to populate missing correlation variables using local fields.
// This means that we do not need a value generator.
if (rel instanceof Filter) {
NavigableMap<CorDef, Integer> map = new TreeMap<>();
List<RexNode> projects = new ArrayList<>();
for (CorRef correlation : corVarList) {
for (CorRef correlation : missingCorVarList) {
final CorDef def = correlation.def();
if (corDefOutputs.containsKey(def) || map.containsKey(def)) {
if (map.containsKey(def)) {
// The same correlation definition may be referenced more than once;

Check warning on line 1641 in core/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

This block of commented-out lines of code should be removed.

See more on https://sonarcloud.io/project/issues?id=apache_calcite&issues=AZ6bgVCjiIVtV7kfoqM1&open=AZ6bgVCjiIVtV7kfoqM1&pullRequest=4997
// one output slot is enough.
continue;
}
try {
Expand All @@ -1651,9 +1663,9 @@
}
}
}
// If all correlation variables are now satisfied, skip creating a value
// generator.
if (map.size() == corVarList.size()) {
// If all missing correlation variables are now satisfied, skip creating a
// value generator.
if (map.size() == missingCorVarList.size()) {
map.putAll(inputFrame.corDefOutputs);
final RelNode r;
if (!projects.isEmpty()) {
Expand All @@ -1667,7 +1679,10 @@
}
}

return createFrameWithValueGenerator(rel.getInput(0), inputFrame, corVarList, corDefOutputs);
// Fall back to a value generator for correlation variables that could not
// be derived from local fields.
return createFrameWithValueGenerator(rel.getInput(0), inputFrame,
missingCorVarList, corDefOutputs);
}

/**
Expand Down Expand Up @@ -2487,6 +2502,57 @@
return fieldAccess;
}

/**
* Window operators are decorrelated similarly to aggregates. A correlated
* window expression is evaluated once per outer-row binding before
* decorrelation; after decorrelation, outer references are represented as
* ordinary input fields. Therefore, add those fields to the window partition
* keys so that the window function is still evaluated independently for
* each outer-row binding.
*
* <p>Implementation based on: Improving Unnesting of Complex Queries
*
* <p>3.3 Unnesting Rules
* (https://dl.gi.de/server/api/core/bitstreams/c1918e8c-6a87-4da2-930a-bfed289f2388/content)
*
* @param over the windowed expression being visited
* @return {@code over} itself when the superclass visit returned the same
* instance and no partition key had to be added; otherwise a new expression
* built with the extended partition-key list
*/
@Override public RexNode visitOver(RexOver over) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to conflict with #4998, so let's merge the other one first if it turns out to be right, so we can see the final shape of this function

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the two fixes are related but apply to different shuttles. This PR updates visitOver in DecorrelateRexShuttle, while #4998 updates visitOver in RemoveCorrelationRexShuttle.

I also verified that if RemoveCorrelationForScalarAggregateRule is disabled in removeCorrelationViaRule(), the #4998 fix does not seem to be needed. So #4998 appears to address the rule-based decorrelation path, whereas this PR addresses the DecorrelateRexShuttle path.

// Delegate to the superclass visitor first; the identity comparison below
// detects whether it produced a different expression.
final RexOver newOver = (RexOver) super.visitOver(over);
// Work on a mutable copy so that extra keys can be appended.
final List<RexNode> partitionKeys = new ArrayList<>(newOver.getWindow().partitionKeys);
boolean update = newOver != over;
// Sum of the field counts of the inputs processed so far; added to a
// position within one input to obtain an index across all inputs.
int newInputOutputOffset = 0;
for (RelNode input : currentRel.getInputs()) {
final Frame frame = map.get(input);
if (frame == null) {
// Inputs without a decorrelation frame keep their original field layout.
newInputOutputOffset += input.getRowType().getFieldCount();
continue;
}
for (Integer newInputPos : frame.corDefOutputs.values()) {
// Correlation variables become regular input fields after decorrelation.
// Add them to the window partition keys to preserve the original
// per-correlate evaluation scope.
final RexInputRef ref =
new RexInputRef(newInputOutputOffset + newInputPos,
frame.r.getRowType().getFieldList()
.get(newInputPos).getType());
// Skip references that are already partition keys to avoid duplicates.
if (!partitionKeys.contains(ref)) {
partitionKeys.add(ref);
update = true;
}
}
// Advance by the width of the frame's rewritten relational expression.
newInputOutputOffset += frame.r.getRowType().getFieldCount();
}
if (!update) {
// Nothing changed: hand back the original instance.
return over;
}
// Rebuild the expression from newOver's components, substituting only the
// partition keys.
// NOTE(review): the literal "true, false" arguments are presumably
// nullWhenCountZero and allowPartial — confirm against RexBuilder.makeOver.
return currentRel.getCluster().getRexBuilder().makeOver(
newOver.getParserPosition(), newOver.getType(), newOver.getAggOperator(),
newOver.getOperands(), partitionKeys, newOver.getWindow().orderKeys,
newOver.getWindow().getLowerBound(), newOver.getWindow().getUpperBound(),
newOver.getWindow().getExclude(), newOver.getWindow().isRows(), true, false,
newOver.isDistinct(), newOver.ignoreNulls());
}

@Override public RexNode visitInputRef(RexInputRef inputRef) {
final RexInputRef ref = getNewForOldInputRef(currentRel, map, inputRef);
if (ref.getIndex() == inputRef.getIndex()
Expand Down
15 changes: 3 additions & 12 deletions core/src/test/java/org/apache/calcite/test/JdbcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2583,18 +2583,9 @@ void checkMultisetQueryWithSingleColumn() {
CalciteAssert.that()
.with(CalciteAssert.Config.REGULAR)
.query(sql)
.returnsUnordered("name=Bill; deptno=10; M=190",
"name=Bill; deptno=30; M=190",

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"name=Bill; deptno=40; M=190",
"name=Eric; deptno=10; M=240",
"name=Eric; deptno=30; M=240",
"name=Eric; deptno=40; M=240",
"name=Sebastian; deptno=10; M=190",
"name=Sebastian; deptno=30; M=190",
"name=Sebastian; deptno=40; M=190",
"name=Theodore; deptno=10; M=190",
"name=Theodore; deptno=30; M=190",
"name=Theodore; deptno=40; M=190");
.returnsUnordered("name=Bill; deptno=10; M=110",
"name=Sebastian; deptno=10; M=160",
"name=Theodore; deptno=10; M=120");
}

/** Per SQL std, UNNEST is implicitly LATERAL. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7050,7 +7050,7 @@ LogicalProject(EMPNO=[$0], ENAME=[$1], JOB=[$2], MGR=[$3], HIREDATE=[$4], SAL=[$
LogicalAggregate(group=[{0}], agg#0=[MIN($1)])
LogicalProject(DEPTNO0=[$2], $f0=[true])
LogicalFilter(condition=[$1])
LogicalProject(NAME=[$1], QualifyExpression=[=(RANK() OVER (PARTITION BY $1 ORDER BY $0 DESC), $2)], DEPTNO0=[$2])
LogicalProject(NAME=[$1], QualifyExpression=[=(RANK() OVER (PARTITION BY $1, $2 ORDER BY $0 DESC), $2)], DEPTNO0=[$2])
LogicalJoin(condition=[true], joinType=[inner])
LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
LogicalAggregate(group=[{0}])
Expand Down
Loading
Loading