
feat: support SQL aggregate FILTER (WHERE ...) clause in native execution#3835

Open
viirya wants to merge 11 commits into apache:main from viirya:feat-aggregate-filter

Conversation


viirya (Member) commented Mar 29, 2026

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

Previously, Comet fell back to Spark for any aggregation containing a FILTER (WHERE ...) clause (e.g. SUM(x) FILTER (WHERE y > 0)).

This patch wires the full pipeline:

Proto (expr.proto):

  • Add optional Expr filter = 89 to AggExpr message

Scala serialization (QueryPlanSerde.scala):

  • In aggExprToProto, serialize aggExpr.filter into the proto when aggExpr.mode == Partial (filters are only meaningful in partial mode)
  • If the filter expression cannot be serialized, fall back gracefully

Native planner (planner.rs):

  • Build per-aggregate filter PhysicalExpr from agg_expr.filter
  • Pass to AggregateExec::try_new instead of vec![None; num_agg]

Comet planner (operators.scala):

  • Remove the blanket fallback guard for aggregate expressions with filter

SumInt and SumDecimal group accumulators:

  • Implement opt_filter support
  • Unit tests added for each affected accumulator, covering true/false filter values across groups, null filter entries treated as exclude, and the no-filter (None) path

Tests (aggregate_filter.sql):

  • Update queries from expect_fallback to plain query mode now that native execution is supported; tests verify results match Spark

How are these changes tested?

Unit tests

viirya and others added 4 commits March 28, 2026 17:11
…umulators

Previously, update_batch() in SumIntGroupsAccumulatorLegacy,
SumIntGroupsAccumulatorAnsi, SumIntGroupsAccumulatorTry, and
SumDecimalGroupsAccumulator had debug_assert!/assert! that would panic
in debug mode if opt_filter was non-None, preventing use of SQL
FILTER (WHERE ...) clauses with SUM aggregations.

Each update_batch() inner loop now checks the filter per-row:
- null filter entries are treated as exclude (consistent with SQL semantics)
- false filter entries skip the row
- true filter entries include the row as before

merge_batch() retains debug_assert!(opt_filter.is_none()) since filters
are not meaningful when merging partial aggregate states.

Unit tests added for each affected accumulator covering:
- filter with true/false values across groups
- null filter entries treated as exclude
- no filter (None) still works correctly

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
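The per-row filter check described in this commit message can be sketched in plain Rust. This is a simplified model, not the actual accumulator code: the arrow BooleanArray opt_filter is represented as a slice of Option<bool> (None modeling a null filter entry), and group state as a Vec<i64>.

```rust
// Simplified sketch of the per-row opt_filter handling described above.
// Assumptions: the real code operates on arrow arrays inside a
// GroupsAccumulator; here the filter is modeled as &[Option<bool>]
// and per-group sums as a Vec<i64>.
fn update_batch_sketch(
    sums: &mut Vec<i64>,
    values: &[i64],
    group_indices: &[usize],
    opt_filter: Option<&[Option<bool>]>,
) {
    for (row, (&v, &g)) in values.iter().zip(group_indices).enumerate() {
        if let Some(filter) = opt_filter {
            match filter[row] {
                Some(true) => {}                // true entry: include the row as before
                Some(false) | None => continue, // false or null entry: exclude the row
            }
        }
        sums[g] += v;
    }
}

fn main() {
    let mut sums = vec![0i64; 2];
    let values = [1, 2, 3, 4];
    let groups = [0, 1, 0, 1];
    let filter = [Some(true), Some(false), None, Some(true)];
    update_batch_sketch(&mut sums, &values, &groups, Some(&filter));
    // rows 1 (false) and 2 (null) are excluded
    assert_eq!(sums, vec![1, 4]);
    println!("{:?}", sums);
}
```

Passing `None` for opt_filter takes the pre-existing unfiltered path, which is why accumulators without FILTER clauses are unaffected.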
The tests use expect_fallback mode to verify that:
1. Comet correctly falls back to Spark (with message "Aggregate
   expression with filter is not supported") rather than executing
   natively with wrong results
2. Results match Spark's output (correctness guaranteed via fallback)

Tests cover SUM(int), SUM(long), SUM(decimal), COUNT(*) with FILTER,
both with and without GROUP BY, and with NULL values in the data.

Once the Scala-side support is implemented (serializing aggExpr.filter
through proto to the native planner), these tests should be updated
from expect_fallback to plain query mode to verify native execution.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tion

Previously, Comet fell back to Spark for any aggregation containing
a FILTER (WHERE ...) clause (e.g. SUM(x) FILTER (WHERE y > 0)).
The native SumInt/SumDecimal accumulators already received opt_filter
support in the previous commit. This commit wires the full pipeline:

Proto (expr.proto):
- Add optional Expr filter = 89 to AggExpr message

Scala serialization (QueryPlanSerde.scala):
- In aggExprToProto, serialize aggExpr.filter into the proto when
  aggExpr.mode == Partial (filters are only meaningful in partial mode)
- If the filter expression cannot be serialized, fall back gracefully

Native planner (planner.rs):
- Build per-aggregate filter PhysicalExpr from agg_expr.filter
- Pass to AggregateExec::try_new instead of vec![None; num_agg]

Comet planner (operators.scala):
- Remove the blanket fallback guard for aggregate expressions with filter

Tests (aggregate_filter.sql):
- Update queries from expect_fallback to plain query mode now that
  native execution is supported; tests verify results match Spark

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
viirya force-pushed the feat-aggregate-filter branch from 038691b to 523c6f3 on March 29, 2026 00:12
viirya and others added 3 commits March 28, 2026 17:16
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…fference

DataFusion and JVM produce slightly different floating-point results for
sum(1/ten) FILTER (WHERE ten > 0) due to different summation order.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
-- Test aggregate operator with codegen on and off.
+
+-- Floating-point precision difference between DataFusion and JVM for FILTER aggregates
+--SET spark.comet.enabled = false
viirya (Member, Author):

The float precision difference issue is:

[info] - postgreSQL/aggregates_part3.sql *** FAILED *** (381 milliseconds)
[info]   postgreSQL/aggregates_part3.sql
[info]   Expected "2828.9682539682[954]", but got "2828.9682539682[517]" Result did not match for query #2
[info]   select sum(1/ten) filter (where ten > 0) from tenk1 (SQLQueryTestSuite.scala:683)
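The mismatch above comes from floating-point summation order, which can be demonstrated with a tiny stdlib-only Rust example (illustrative values, not the actual tenk1 data): f64 addition is not associative, so engines that sum in different orders can legitimately produce different last digits.

```rust
// Demonstrates the root cause of the aggregates_part3.sql mismatch:
// summing the same f64 values in a different order can change the result.
fn main() {
    let left_to_right = (0.1_f64 + 0.2) + 0.3;
    let right_to_left = 0.1_f64 + (0.2 + 0.3);
    // f64 addition is not associative
    assert_ne!(left_to_right, right_to_left);
    println!("{left_to_right:.17} vs {right_to_left:.17}");
}
```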

Comment on lines -1366 to -1369
if (aggregateExpressions.exists(_.filter.isDefined)) {
withInfo(aggregate, "Aggregate expression with filter is not supported")
return None
}
A reviewer (Member):

This removes the guard for all aggregates with FILTER, but the PR only modifies SUM to accept the filter. What happens for other aggregates like AVG?

viirya (Member, Author):

Missed it. Modified it now.

A reviewer (Member):

Do we need to support this for the other aggregate expressions that Comet supports, or is this limited to SUM and AVG?

viirya (Member, Author):

We update native/core/src/execution/planner.rs to construct aggregate filter expressions and pass them to datafusion::physical_plan::aggregates::AggregateExec. AggregateExec uses these filter expressions at runtime to produce the filtering boolean array (i.e., the Option<&arrow::array::BooleanArray> parameter) that is passed to each update_batch call.

So DataFusion has this logic internally in AggregateExec.

We only need to make sure the aggregate expressions implemented by Comet (SumDecimal, SumInt, Avg, AvgDecimal) apply opt_filter in their update_batch functions. opt_filter was ignored previously because Comet knew it would never pass aggregate filter expressions to AggregateExec.
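The engine-side flow described above can be modeled in plain Rust: the engine evaluates the aggregate's filter expression against the batch to produce a nullable boolean mask, then hands that mask to the aggregate as opt_filter. The function names here are hypothetical; the real code uses PhysicalExpr evaluation and arrow's BooleanArray inside AggregateExec.

```rust
// Hypothetical model of AggregateExec's filter flow: evaluate the filter
// expression per batch, then pass the resulting mask to the aggregate.
fn evaluate_filter(
    predicate: impl Fn(i64) -> Option<bool>,
    batch: &[i64],
) -> Vec<Option<bool>> {
    batch.iter().map(|&v| predicate(v)).collect()
}

fn filtered_sum(batch: &[i64], opt_filter: Option<&[Option<bool>]>) -> i64 {
    batch
        .iter()
        .enumerate()
        // only rows whose filter entry is Some(true) contribute
        .filter(|(i, _)| opt_filter.map_or(true, |f| f[*i] == Some(true)))
        .map(|(_, &v)| v)
        .sum()
}

fn main() {
    let batch = [5, -1, 3, -2];
    // models SUM(x) FILTER (WHERE x > 0)
    let mask = evaluate_filter(|v| Some(v > 0), &batch);
    assert_eq!(filtered_sum(&batch, Some(&mask)), 8);
    println!("{}", filtered_sum(&batch, Some(&mask)));
}
```

The division of labor matches the comment: the engine owns mask production, so each aggregate implementation only needs to honor opt_filter in its update path.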

viirya and others added 4 commits March 29, 2026 08:30
AvgGroupsAccumulator and AvgDecimalGroupsAccumulator implement
GroupsAccumulator directly and must apply opt_filter in update_batch.
Add filter handling matching the pattern in SumDecimal/SumInt.
Add AVG FILTER tests to aggregate_filter.sql.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
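The Avg pattern this commit describes differs from SUM in one important way: both the running sum and the row count must skip filtered-out rows, or the final sum/count division is wrong. A minimal stdlib-only sketch (f64 values and an Option<bool> mask rather than the actual AvgGroupsAccumulator code):

```rust
// Sketch of filter handling for an average accumulator: excluded rows
// must be omitted from BOTH the sum and the count.
fn update_avg(
    sum: &mut f64,
    count: &mut u64,
    values: &[f64],
    opt_filter: Option<&[Option<bool>]>,
) {
    for (i, &v) in values.iter().enumerate() {
        if let Some(f) = opt_filter {
            if f[i] != Some(true) {
                continue; // false or null filter entry: excluded from sum AND count
            }
        }
        *sum += v;
        *count += 1;
    }
}

fn main() {
    let (mut sum, mut count) = (0.0_f64, 0_u64);
    let values = [2.0, 4.0, 100.0];
    let filter = [Some(true), Some(true), Some(false)];
    update_avg(&mut sum, &mut count, &values, Some(&filter));
    assert_eq!(count, 2);
    assert_eq!(sum / count as f64, 3.0); // 100.0 excluded from both terms
    println!("avg = {}", sum / count as f64);
}
```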
Decimal AVG in Comet falls back to Spark for the final HashAggregate
due to rounding differences in the cast back to decimal type.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Tests AvgDecimalGroupsAccumulator filter support. Requires
spark.comet.expression.Cast.allowIncompatible=true to allow the
final cast back to decimal to run through Comet natively.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…t fallback

Decimal AVG requires a final cast back to decimal type that differs from
Spark's implementation, causing the final HashAggregate to fall back to
Spark. Use spark_answer_only mode to validate correctness without asserting
full Comet operator coverage.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>