From 60000f37580e2d27d79fa3b62b29da2bca3076db Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Tue, 9 Jun 2026 18:19:50 +0800 Subject: [PATCH 1/2] [CALCITE-7595] Support FILTER clause with window functions --- .../adapter/enumerable/EnumerableWindow.java | 10 +- .../adapter/enumerable/RexImpTable.java | 16 ++ .../apache/calcite/sql/SqlOverOperator.java | 10 +- .../apache/calcite/test/SqlValidatorTest.java | 26 +++- core/src/test/resources/sql/winagg.iq | 139 ++++++++++++++++++ 5 files changed, 194 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java index 6197420b05ea..78ecce8821d7 100644 --- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java +++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableWindow.java @@ -451,6 +451,7 @@ private static void sampleOfTheGeneratedWindowedAggregate() { hasRows, frameRowCount, partitionRowCount, jDecl, inputPhysTypeFinal); + final RelDataType inputRowType = inputPhysType.getRowType(); final Function> rexArguments = agg -> { List argList = agg.call.getArgList(); List inputTypes = @@ -464,7 +465,7 @@ private static void sampleOfTheGeneratedWindowedAggregate() { return args; }; - implementAdd(aggs, builder7, resultContextBuilder, rexArguments, jDecl); + implementAdd(aggs, builder7, resultContextBuilder, rexArguments, jDecl, inputRowType); BlockStatement forBlock = builder7.toBlock(); // Don't run the aggregate function if current row is excluded @@ -866,7 +867,8 @@ private static void implementAdd(List aggs, final BlockBuilder builder7, final Function frame, final Function> rexArguments, - final DeclarationStatement jDecl) { + final DeclarationStatement jDecl, + final RelDataType inputRowType) { for (final AggImpState agg : aggs) { final WinAggAddContext addContext = new WinAggAddContextImpl(builder7, requireNonNull(agg.state, "agg.state"), frame) { @@ -879,7 +881,9 @@ private static void implementAdd(List aggs, } @Override public @Nullable RexNode rexFilterArgument() { - return null; // REVIEW + return agg.call.filterArg < 0 + ? null + : RexInputRef.of(agg.call.filterArg, inputRowType); } }; agg.implementor.implementAdd(requireNonNull(agg.context, "agg.context"), addContext); diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java index 549bddbf724d..a71fcf36d0a1 100644 --- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java +++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexImpTable.java @@ -417,6 +417,7 @@ import static org.apache.calcite.sql.fun.SqlStdOperatorTable.EVERY; import static org.apache.calcite.sql.fun.SqlStdOperatorTable.EXP; import static org.apache.calcite.sql.fun.SqlStdOperatorTable.EXTRACT; +import static org.apache.calcite.sql.fun.SqlStdOperatorTable.FILTER; import static org.apache.calcite.sql.fun.SqlStdOperatorTable.FIRST_VALUE; import static org.apache.calcite.sql.fun.SqlStdOperatorTable.FLOOR; import static org.apache.calcite.sql.fun.SqlStdOperatorTable.FUSION; @@ -1250,6 +1251,7 @@ void populate2() { NotJsonImplementor.of( new MethodImplementor(BuiltInMethod.IS_JSON_SCALAR.method, NullPolicy.NONE, false))); + define(FILTER, new FilterImplementor()); } /** Third step of population. */ @@ -5111,4 +5113,18 @@ private static class ReplaceImplementor extends AbstractRexCallImplementor { operand0, operand1, operand2, Expressions.constant(isCaseSensitive)); } } + + /** Implementor for the FILTER operator. */ + private static class FilterImplementor extends AbstractRexCallImplementor { + FilterImplementor() { + super("filter", NullPolicy.NONE, false); + } + + @Override Expression implementSafe(RexToLixTranslator translator, RexCall call, + List argValueList) { + final Expression value = argValueList.get(0); + final Expression condition = argValueList.get(1); + return Expressions.condition(condition, value, NULL_EXPR); + } + } } diff --git a/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java b/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java index cc42a9ad0c41..9f49f6402778 100644 --- a/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java +++ b/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java @@ -67,6 +67,7 @@ public SqlOverOperator() { switch (aggCall.getKind()) { case RESPECT_NULLS: case IGNORE_NULLS: + case FILTER: validator.validateCall(aggCall, scope); aggCall = aggCall.operand(0); break; @@ -102,7 +103,14 @@ public SqlOverOperator() { SqlNode window = call.operand(1); SqlWindow w = validator.resolveWindow(window, scope); - final SqlCall aggCall = (SqlCall) agg; + SqlCall aggCall = (SqlCall) agg; + // Unwrap FILTER, RESPECT_NULLS, or IGNORE_NULLS to get the actual aggregate call + while (aggCall != null + && (aggCall.getKind() == SqlKind.FILTER + || aggCall.getKind() == SqlKind.RESPECT_NULLS + || aggCall.getKind() == SqlKind.IGNORE_NULLS)) { + aggCall = aggCall.operand(0); + } SqlCallBinding opBinding = new SqlCallBinding(validator, scope, aggCall) { @Override public boolean hasEmptyGroup() { diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java index 5198762c106b..a753d74e299b 100644 --- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java +++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java @@ -3513,11 +3513,31 @@ void testWinPartClause() { * Validator rejects FILTER in OVER windows. */ @Test void testOverFilter() { winSql("SELECT deptno,\n" - + " ^COUNT(DISTINCT deptno) FILTER (WHERE deptno > 10)^\n" + + " COUNT(DISTINCT deptno) FILTER (WHERE deptno > 10)\n" + "OVER win AS agg\n" + "FROM emp\n" - + "WINDOW win AS (PARTITION BY empno)") - .fails("OVER must be applied to aggregate function"); + + "WINDOW win AS (PARTITION BY empno)") + .ok(); + } + + /** Test case for [CALCITE-7595] + * Support FILTER clause with window functions. */ + @Test void testFilterWithOver() { + winSql("SELECT SUM(sal) FILTER (WHERE sal > 100) OVER (PARTITION BY deptno) FROM emp") + .ok(); + } + + @Test void testFilterWithOverAndDistinct() { + winSql("SELECT SUM(DISTINCT sal) FILTER (WHERE sal > 100) OVER (ORDER BY deptno) FROM emp") + .ok(); + } + + @Test void testMultipleFiltersWithOver() { + winSql("SELECT " + + "COUNT(*) FILTER (WHERE empno > 100) OVER (PARTITION BY deptno), " + + "SUM(sal) FILTER (WHERE sal > 0) OVER (PARTITION BY deptno) " + + "FROM emp") + .ok(); } @Test void testOverInOrderBy() { diff --git a/core/src/test/resources/sql/winagg.iq b/core/src/test/resources/sql/winagg.iq index 6a32b3b3f7b0..ecdafb62411f 100644 --- a/core/src/test/resources/sql/winagg.iq +++ b/core/src/test/resources/sql/winagg.iq @@ -1173,4 +1173,143 @@ order by 1; (14 rows) !ok + +# [CALCITE-6442] Support FILTER clause with window functions + +# Test 1: FILTER with OVER on COUNT +select empno, deptno, + count(*) filter (where sal > 1500) over (partition by deptno) as filtered_count +from emp +order by empno; ++-------+--------+----------------+ +| EMPNO | DEPTNO | FILTERED_COUNT | ++-------+--------+----------------+ +| 7369 | 20 | 0 | +| 7566 | 20 | 5 | +| 7788 | 20 | 5 | +| 7876 | 20 | 0 | +| 7902 | 20 | 5 | +| 7782 | 10 | 3 | +| 7839 | 10 | 3 | +| 7934 | 10 | 0 | +| 7499 | 30 | 6 | +| 7521 | 30 | 0 | +| 7654 | 30 | 0 | +| 7698 | 30 | 6 | +| 7844 | 30 | 0 | +| 7900 | 30 | 0 | ++-------+--------+----------------+ +(14 rows) + +!ok + +# Test 2: FILTER with OVER on SUM +select empno, deptno, + sum(sal) filter (where comm is not null) over (partition by deptno) as filtered_sum +from emp +order by empno; ++-------+--------+--------------+ +| EMPNO | DEPTNO | FILTERED_SUM | ++-------+--------+--------------+ +| 7369 | 20 | | +| 7566 | 20 | | +| 7788 | 20 | | +| 7876 | 20 | | +| 7902 | 20 | | +| 7782 | 10 | | +| 7839 | 10 | | +| 7934 | 10 | | +| 7499 | 30 | 9400.00 | +| 7521 | 30 | 9400.00 | +| 7654 | 30 | 9400.00 | +| 7698 | 30 | | +| 7844 | 30 | 9400.00 | +| 7900 | 30 | | ++-------+--------+--------------+ +(14 rows) + +!ok + +# Test 3: FILTER with OVER and DISTINCT +select empno, deptno, + count(distinct sal) filter (where sal > 1000) over (partition by deptno) as filtered_count_distinct +from emp +order by empno; ++-------+--------+-------------------------+ +| EMPNO | DEPTNO | FILTERED_COUNT_DISTINCT | ++-------+--------+-------------------------+ +| 7369 | 20 | 0 | +| 7566 | 20 | 5 | +| 7788 | 20 | 5 | +| 7876 | 20 | 5 | +| 7902 | 20 | 5 | +| 7782 | 10 | 3 | +| 7839 | 10 | 3 | +| 7934 | 10 | 3 | +| 7499 | 30 | 6 | +| 7521 | 30 | 6 | +| 7654 | 30 | 6 | +| 7698 | 30 | 6 | +| 7844 | 30 | 6 | +| 7900 | 30 | 0 | ++-------+--------+-------------------------+ +(14 rows) + +!ok + +# Test 4: Multiple FILTER with OVER on different aggregates +select empno, deptno, + count(*) filter (where sal > 1500) over (partition by deptno) as high_sal_count, + sum(sal) filter (where sal <= 1500) over (partition by deptno) as low_sal_sum +from emp +order by empno; ++-------+--------+----------------+-------------+ +| EMPNO | DEPTNO | HIGH_SAL_COUNT | LOW_SAL_SUM | ++-------+--------+----------------+-------------+ +| 7369 | 20 | 0 | 10875.00 | +| 7566 | 20 | 5 | | +| 7788 | 20 | 5 | | +| 7876 | 20 | 0 | 10875.00 | +| 7902 | 20 | 5 | | +| 7782 | 10 | 3 | | +| 7839 | 10 | 3 | | +| 7934 | 10 | 0 | 8750.00 | +| 7499 | 30 | 6 | | +| 7521 | 30 | 0 | 9400.00 | +| 7654 | 30 | 0 | 9400.00 | +| 7698 | 30 | 6 | | +| 7844 | 30 | 0 | 9400.00 | +| 7900 | 30 | 0 | 9400.00 | ++-------+--------+----------------+-------------+ +(14 rows) + +!ok + +# Test 5: FILTER with OVER and ORDER BY (running window) +select empno, deptno, sal, + sum(sal) filter (where sal > 1000) over (partition by deptno order by empno rows between unbounded preceding and current row) as running_sum +from emp +order by empno; ++-------+--------+---------+-------------+ +| EMPNO | DEPTNO | SAL | RUNNING_SUM | ++-------+--------+---------+-------------+ +| 7369 | 20 | 800.00 | | +| 7566 | 20 | 2975.00 | 3775.00 | +| 7788 | 20 | 3000.00 | 6775.00 | +| 7876 | 20 | 1100.00 | 7875.00 | +| 7902 | 20 | 3000.00 | 10875.00 | +| 7782 | 10 | 2450.00 | 2450.00 | +| 7839 | 10 | 5000.00 | 7450.00 | +| 7934 | 10 | 1300.00 | 8750.00 | +| 7499 | 30 | 1600.00 | 1600.00 | +| 7521 | 30 | 1250.00 | 2850.00 | +| 7654 | 30 | 1250.00 | 4100.00 | +| 7698 | 30 | 2850.00 | 6950.00 | +| 7844 | 30 | 1500.00 | 8450.00 | +| 7900 | 30 | 950.00 | | ++-------+--------+---------+-------------+ +(14 rows) + +!ok + # End winagg.iq From 72f48c2662aac5ac2d61315320621687048e719f Mon Sep 17 00:00:00 2001 From: xuzifu666 <1206332514@qq.com> Date: Mon, 15 Jun 2026 11:12:13 +0800 Subject: [PATCH 2/2] Addressed --- .../calcite/runtime/CalciteResource.java | 3 ++ .../apache/calcite/sql/SqlOverOperator.java | 5 +++ .../apache/calcite/test/SqlValidatorTest.java | 8 ++--- core/src/test/resources/sql/winagg.iq | 31 ++----------------- 4 files changed, 14 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java index 40c680e2ee6c..9f1b18fddafd 100644 --- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java +++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java @@ -428,6 +428,9 @@ ExInst naturalOrUsingColumnNotCompatible(String a0, @BaseMessage("FILTER must be applied to aggregate function") ExInst filterNonAggregate(); + @BaseMessage("DISTINCT is not allowed in window functions") + ExInst distinctNotAllowedInWindowFunction(); + @BaseMessage("Cannot override window attribute") ExInst cannotOverrideWindowAttribute(); diff --git a/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java b/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java index 9f49f6402778..954471a02bcd 100644 --- a/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java +++ b/core/src/main/java/org/apache/calcite/sql/SqlOverOperator.java @@ -77,6 +77,11 @@ public SqlOverOperator() { if (!aggCall.getOperator().isAggregator()) { throw validator.newValidationError(aggCall, RESOURCE.overNonAggregate()); } + // Check that DISTINCT is not used in window functions + if (aggCall.getFunctionQuantifier() != null) { + throw validator.newValidationError(aggCall, + RESOURCE.distinctNotAllowedInWindowFunction()); + } final SqlNode window = call.operand(1); validator.validateWindow(window, scope, aggCall); } diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java index a753d74e299b..1d6f8bbf7f17 100644 --- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java +++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java @@ -3513,11 +3513,11 @@ void testWinPartClause() { * Validator rejects FILTER in OVER windows. */ @Test void testOverFilter() { winSql("SELECT deptno,\n" - + " COUNT(DISTINCT deptno) FILTER (WHERE deptno > 10)\n" + + " ^COUNT(DISTINCT deptno)^ FILTER (WHERE deptno > 10)\n" + "OVER win AS agg\n" + "FROM emp\n" + "WINDOW win AS (PARTITION BY empno)") - .ok(); + .fails("DISTINCT is not allowed in window functions"); } /** Test case for [CALCITE-7595] @@ -3528,8 +3528,8 @@ void testWinPartClause() { } @Test void testFilterWithOverAndDistinct() { - winSql("SELECT SUM(DISTINCT sal) FILTER (WHERE sal > 100) OVER (ORDER BY deptno) FROM emp") - .ok(); + winSql("SELECT ^SUM(DISTINCT sal)^ FILTER (WHERE sal > 100) OVER (ORDER BY deptno) FROM emp") + .fails("DISTINCT is not allowed in window functions"); } @Test void testMultipleFiltersWithOver() { diff --git a/core/src/test/resources/sql/winagg.iq b/core/src/test/resources/sql/winagg.iq index ecdafb62411f..a277ff7b993c 100644 --- a/core/src/test/resources/sql/winagg.iq +++ b/core/src/test/resources/sql/winagg.iq @@ -1230,34 +1230,7 @@ order by empno; !ok -# Test 3: FILTER with OVER and DISTINCT -select empno, deptno, - count(distinct sal) filter (where sal > 1000) over (partition by deptno) as filtered_count_distinct -from emp -order by empno; -+-------+--------+-------------------------+ -| EMPNO | DEPTNO | FILTERED_COUNT_DISTINCT | -+-------+--------+-------------------------+ -| 7369 | 20 | 0 | -| 7566 | 20 | 5 | -| 7788 | 20 | 5 | -| 7876 | 20 | 5 | -| 7902 | 20 | 5 | -| 7782 | 10 | 3 | -| 7839 | 10 | 3 | -| 7934 | 10 | 3 | -| 7499 | 30 | 6 | -| 7521 | 30 | 6 | -| 7654 | 30 | 6 | -| 7698 | 30 | 6 | -| 7844 | 30 | 6 | -| 7900 | 30 | 0 | -+-------+--------+-------------------------+ -(14 rows) - -!ok - -# Test 4: Multiple FILTER with OVER on different aggregates +# Test 3: Multiple FILTER with OVER on different aggregates select empno, deptno, count(*) filter (where sal > 1500) over (partition by deptno) as high_sal_count, sum(sal) filter (where sal <= 1500) over (partition by deptno) as low_sal_sum @@ -1285,7 +1258,7 @@ order by empno; !ok -# Test 5: FILTER with OVER and ORDER BY (running window) +# Test 4: FILTER with OVER and ORDER BY (running window) select empno, deptno, sal, sum(sal) filter (where sal > 1000) over (partition by deptno order by empno rows between unbounded preceding and current row) as running_sum from emp