[AURON #1859] Convert math operators to Auron native operators #2167
weiqingy wants to merge 5 commits into apache:master
Conversation
…version Implement RexInputRefConverter, the first concrete FlinkRexNodeConverter that converts Calcite RexInputRef (column reference) into Auron native PhysicalExprNode containing PhysicalColumn with name and index. Also fix surefire configuration to discover *Test.java files alongside existing *ITCase.java in the Flink planner module.
…sion Implement RexLiteralConverter that converts Calcite RexLiteral values into Auron native PhysicalExprNode with ScalarValue containing Arrow IPC bytes. Supports numeric types (TINYINT through DOUBLE, DECIMAL), BOOLEAN, CHAR/VARCHAR, and typed NULL. Serializes each literal as a single-element Arrow vector using ArrowStreamWriter, following the Spark NativeConverters pattern. Null literals of unsupported types correctly return false from isSupported().
…ast ops Implement RexCallConverter that converts Calcite RexCall expressions into Auron native PhysicalExprNode with explicit type promotion. Supports binary arithmetic (+,-,*,/,%), unary minus (PhysicalNegativeNode), unary plus (identity passthrough), and CAST (PhysicalTryCastNode). Binary operands are promoted to a common type via getCommonTypeForComparison() following the reviewer PoC pattern: DECIMAL wins, BIGINT for exact types, DOUBLE fallback. Operands and results wrapped in PhysicalTryCastNode when type promotion is needed.
Hi @Tartarus0zm, the implementation PR for #1859 is ready for review. To help with the review, I also included:
These markdown files are only for review support and will be removed after the review is complete. Could you please take a look when you have a chance? Thanks!
Pull request overview
This PR adds the first concrete Flink→Auron expression converters, translating Calcite RexNode expressions (input refs, literals, and calls) into Auron native PhysicalExprNode protobufs to unblock Flink calc operator integration work (#1857) as part of the broader Flink support track.
Changes:
- Added RexInputRefConverter, RexLiteralConverter, and RexCallConverter for column refs, scalar literals (Arrow IPC), and arithmetic/unary/CAST with type promotion.
- Added unit tests for the three converters and adjusted surefire includes so *Test.java runs in the planner module.
- Added design/reviewhelper documentation for AURON-1859.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/RexInputRefConverter.java | Converts RexInputRef to PhysicalColumn nodes. |
| auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverter.java | Serializes supported RexLiteral values into Arrow IPC bytes stored in ScalarValue. |
| auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/RexCallConverter.java | Converts arithmetic/unary/CAST RexCall into native physical expression nodes with explicit casts. |
| auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexInputRefConverterTest.java | Unit tests for column reference conversion. |
| auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverterTest.java | Unit tests for supported literal types and unsupported rejection. |
| auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexCallConverterTest.java | Unit tests for arithmetic/unary/CAST, promotion, and nested expressions. |
| auron-flink-extension/auron-flink-planner/pom.xml | Ensures **/*Test.java is included in surefire runs. |
| docs/PR-AURON-1859/AURON-1859-DESIGN.md | Design document describing the intended converter behavior and scope. |
| docs/reviewhelper/AURON-1859/01-rex-input-ref-converter.md | Review helper for commit 1 summary. |
| docs/reviewhelper/AURON-1859/02-rex-literal-converter.md | Review helper for commit 2 summary. |
| docs/reviewhelper/AURON-1859/03-rex-call-converter.md | Review helper for commit 3 summary. |
```java
@Override
public boolean isSupported(RexNode node, ConverterContext context) {
    return true;
}
```
isSupported() always returns true, but convert() blindly indexes into context.getInputType().getFieldNames() using inputRef.getIndex(). If a RexInputRef index is out of range, this will throw IndexOutOfBoundsException and bypass the factory's "unsupported" path. Consider validating the index against context.getInputType().getFieldCount() in isSupported() (and/or in convert() with a clear exception).
Resolved in b1749a8: isSupported() now validates inputRef.getIndex() < context.getInputType().getFieldCount(). Added a test for the out-of-range case.
Note: the factory's catch-all in convertRexNode() (line 116-119) would have prevented a crash, but having the check in isSupported() makes the intent explicit and avoids relying on exception-based flow control.
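The resolved check can be sketched in plain Java. `InputRef` and `RowType` below are hypothetical stand-ins for Calcite's `RexInputRef` and the converter context's input type, not the project's actual API:

```java
import java.util.List;

// Minimal sketch of the out-of-range guard described above. InputRef and
// RowType are illustrative stand-ins, not the real Calcite/Auron types.
class InputRefGuard {
    record InputRef(int index) {}

    record RowType(List<String> fieldNames) {
        int fieldCount() { return fieldNames.size(); }
    }

    // isSupported() rejects out-of-range indices up front, so the factory
    // routes the node to its "unsupported" path instead of convert() throwing.
    static boolean isSupported(InputRef ref, RowType inputType) {
        return ref.index() >= 0 && ref.index() < inputType.fieldCount();
    }

    public static void main(String[] args) {
        RowType row = new RowType(List.of("a", "b"));
        System.out.println(isSupported(new InputRef(1), row)); // in range: true
        System.out.println(isSupported(new InputRef(2), row)); // out of range: false
    }
}
```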
```java
 * @throws IllegalArgumentException if the node is not a {@link RexInputRef}
 */
@Override
public PhysicalExprNode convert(RexNode node, ConverterContext context) {
```
The Javadoc says convert() throws IllegalArgumentException when the node is not a RexInputRef, but the implementation performs an unchecked cast and would throw ClassCastException instead. Either align the Javadoc with the framework guarantee, or add an explicit type check and throw IllegalArgumentException to match the FlinkNodeConverter contract.
Suggested change:

```java
public PhysicalExprNode convert(RexNode node, ConverterContext context) {
    if (!(node instanceof RexInputRef)) {
        throw new IllegalArgumentException(
                "RexInputRefConverter can only convert RexInputRef nodes, but got: "
                        + (node == null ? "null" : node.getClass().getName()));
    }
```
The factory dispatches by node.getClass() at FlinkNodeConverterFactory.java:109 — only RexInputRef instances reach this converter. The cast is guaranteed safe by the framework's dispatch contract.
The @throws IllegalArgumentException in the Javadoc inherits from the FlinkNodeConverter interface (line 62), which documents the general contract for all converter implementations. In practice, this exception path is unreachable through normal factory dispatch.
Adding an instanceof check would validate an internal framework invariant on every call — per our project conventions, we validate at system boundaries rather than internal API boundaries. The factory itself is the boundary.
```java
if (expr.getType().getSqlTypeName().equals(targetType.getSqlTypeName())) {
    return converted;
}
return wrapInTryCast(converted, targetType);
}
```
castIfNecessary() decides whether to wrap a TryCast by comparing only SqlTypeName. This will treat all DECIMALs as the same type even when precision/scale differ, and can also miss other type attributes (nullability, charset/collation). For arithmetic, this can yield a binary expression whose operands have incompatible Arrow types at runtime. Consider comparing full RelDataType equality (or at least DECIMAL precision/scale) and casting when they differ.
Suggested change:

```java
if (hasSameType(expr.getType(), targetType)) {
    return converted;
}
return wrapInTryCast(converted, targetType);
}

private static boolean hasSameType(RelDataType sourceType, RelDataType targetType) {
    if (sourceType == targetType) {
        return true;
    }
    if (sourceType == null || targetType == null) {
        return false;
    }
    return sourceType.getFullTypeString().equals(targetType.getFullTypeString());
}
```
The SqlTypeName-only comparison is intentional — castIfNecessary() handles cross-family type promotion (e.g., INT→BIGINT, INT→DECIMAL), not intra-family precision alignment.
For the DECIMAL case: when both operands are DECIMAL with different precision/scale, castIfNecessary correctly skips the cast (both are SqlTypeName.DECIMAL), and DataFusion's binary arithmetic kernels handle precision alignment natively at the Arrow level.
Using full RelDataType equality (e.g., getFullTypeString()) would trigger unnecessary TryCast wrapping for cases like DECIMAL(10,2) vs DECIMAL(10,3), and would also fire on nullability differences (INTEGER NOT NULL vs INTEGER), which is undesirable for arithmetic operand promotion.
This pattern follows the reviewer's PoC code (see issue comment), which uses the same SqlTypeName-level comparison.
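The name-level comparison can be sketched with simplified stand-ins for Calcite's `SqlTypeName`/`RelDataType` (illustrative only, not the project's types):

```java
// Sketch of the SqlTypeName-only comparison in castIfNecessary(). TypeName and
// RelType are simplified stand-ins for Calcite's SqlTypeName and RelDataType.
class CastDecision {
    enum TypeName { INTEGER, BIGINT, DECIMAL }

    record RelType(TypeName name, int precision, int scale) {}

    // Compare only the type name: DECIMAL(10,2) vs DECIMAL(10,3) counts as the
    // same type (no TryCast wrapper, precision alignment is left to the Arrow
    // kernels), while INTEGER vs BIGINT is cross-family and gets a cast.
    static boolean needsCast(RelType source, RelType target) {
        return source.name() != target.name();
    }

    public static void main(String[] args) {
        System.out.println(needsCast(
                new RelType(TypeName.DECIMAL, 10, 2),
                new RelType(TypeName.DECIMAL, 10, 3))); // same family: false
        System.out.println(needsCast(
                new RelType(TypeName.INTEGER, 10, 0),
                new RelType(TypeName.BIGINT, 19, 0)));  // cross-family: true
    }
}
```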
```java
SqlTypeName t1 = type1.getSqlTypeName();
SqlTypeName t2 = type2.getSqlTypeName();
if (t1 == SqlTypeName.DECIMAL || t2 == SqlTypeName.DECIMAL) {
    return typeFactory.createSqlType(SqlTypeName.DECIMAL);
}
```
getCommonTypeForComparison() returns typeFactory.createSqlType(DECIMAL) when either operand is DECIMAL, which drops the original precision/scale. That can lead to casts (and Arrow types) that don't match the operands' declared decimal types, potentially changing rounding/overflow behavior. Consider constructing a DECIMAL common type that preserves or derives precision/scale (e.g., max scale and sufficient precision) instead of using the default DECIMAL type.
The default-precision DECIMAL returned by createSqlType(SqlTypeName.DECIMAL) is used as the promotion target, but crucially:
- When both operands are DECIMAL (different precision), getCommonTypeForComparison returns type1 directly (line 199 — SqlTypeName match short-circuits), so neither operand gets a default-precision cast.
- For cross-family promotion (e.g., INTEGER + DECIMAL), only the INTEGER operand gets cast to DECIMAL, which is lossless regardless of precision.
- The final output is always cast to call.getType(), which carries Calcite's correctly computed precision/scale.

The intermediate compatibleType controls which operands get promoted, not the final result precision. This pattern follows the reviewer's PoC code.
```java
if (SqlTypeUtil.isNumeric(type1) && SqlTypeUtil.isNumeric(type2)) {
    SqlTypeName t1 = type1.getSqlTypeName();
    SqlTypeName t2 = type2.getSqlTypeName();
    if (t1 == SqlTypeName.DECIMAL || t2 == SqlTypeName.DECIMAL) {
        return typeFactory.createSqlType(SqlTypeName.DECIMAL);
    }
    if ((t1 == SqlTypeName.BIGINT || t2 == SqlTypeName.BIGINT) && notApproxType(t1) && notApproxType(t2)) {
        return typeFactory.createSqlType(SqlTypeName.BIGINT);
    }
    return typeFactory.createSqlType(SqlTypeName.DOUBLE);
}
```
For exact integer mixes like TINYINT/SMALLINT/INTEGER (where neither side is BIGINT/DECIMAL and neither is FLOAT/DOUBLE), getCommonTypeForComparison() currently falls back to DOUBLE. This changes arithmetic semantics (compute in floating point, then potentially TryCast back) and is unlikely to match Flink/Calcite's least-restrictive integer promotion rules. Consider promoting exact integer combinations to the widest exact integer type (e.g., INTEGER or BIGINT) rather than DOUBLE.
Resolved.
The observation is technically correct: mixed exact integer types (e.g., TINYINT + SMALLINT) would fall through to DOUBLE. In practice, this path is unreachable because Calcite/Flink's type coercion inserts explicit CAST nodes to align operands before the arithmetic RexCall reaches our converter.
That said, since the fix is trivial and makes the safety net semantically correct, I've tightened the condition in b1749a8: any two exact integer types now promote to BIGINT instead of falling through to DOUBLE. The change only affects the previously-unreachable mixed-exact-integer path — all other promotion behavior is unchanged.
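The tightened rule can be sketched with a simplified stand-in for Calcite's SqlTypeName (illustrative only, not the project's code):

```java
// Sketch of the promotion rule after the fix: DECIMAL wins, any two exact
// integer types promote to BIGINT, anything involving an approximate type
// falls back to DOUBLE. T is a simplified stand-in for SqlTypeName.
class Promotion {
    enum T { TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL }

    static boolean isApprox(T t) { return t == T.FLOAT || t == T.DOUBLE; }

    static T commonType(T a, T b) {
        if (a == T.DECIMAL || b == T.DECIMAL) return T.DECIMAL;
        if (!isApprox(a) && !isApprox(b)) return T.BIGINT; // was DOUBLE pre-fix
        return T.DOUBLE;
    }

    public static void main(String[] args) {
        System.out.println(commonType(T.TINYINT, T.SMALLINT)); // BIGINT
        System.out.println(commonType(T.INTEGER, T.DECIMAL));  // DECIMAL
        System.out.println(commonType(T.INTEGER, T.DOUBLE));   // DOUBLE
    }
}
```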
**Decision**: Include basic numeric CAST in this PR. Use `PhysicalCastNode` (not `PhysicalTryCastNode`).

**Rationale for `PhysicalCastNode` over `PhysicalTryCastNode`**:
- Flink's CAST is strict by default (errors on invalid input)
- Flink has `TRY_CAST` as a separate function for safe casting
- Spark uses `TryCast` because Spark's CAST is lenient by default — different semantics
- For numeric-to-numeric widening (INT→BIGINT), both behave identically anyway
The design doc states CAST should be implemented with PhysicalCastNode (and that type coercion can rely on upstream Calcite CASTs), but the actual implementation uses PhysicalTryCastNode and performs additional type promotion in RexCallConverter. Please update the design section to reflect the current implementation choices so future work (#1857+) doesn’t rely on outdated semantics.
Updated the design doc to reflect the current implementation in b1749a8.
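The strict-CAST vs TRY_CAST distinction discussed in the design section can be sketched in plain Java (hypothetical helpers, not Flink's or Auron's API): for lossless widening both behave identically, and they only diverge on inputs that cannot be represented.

```java
// Illustrative sketch: TRY_CAST-style conversion returns null on failure,
// while strict CAST-style conversion errors. These helpers are hypothetical
// and only demonstrate the semantic difference.
class CastSemantics {
    // TRY_CAST-style narrowing: null instead of an error on overflow
    static Integer tryCastToInt(long v) {
        if (v > Integer.MAX_VALUE || v < Integer.MIN_VALUE) return null;
        return (int) v;
    }

    // Strict CAST-style narrowing: error on overflow
    static int castToInt(long v) {
        if (v > Integer.MAX_VALUE || v < Integer.MIN_VALUE)
            throw new ArithmeticException("CAST overflow: " + v);
        return (int) v;
    }

    public static void main(String[] args) {
        System.out.println(tryCastToInt(42L));            // both styles agree: 42
        System.out.println(tryCastToInt(5_000_000_000L)); // null; strict CAST would throw
    }
}
```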
…design doc
- RexInputRefConverter.isSupported() now validates index < fieldCount instead of unconditionally returning true (aligns with design doc)
- getCommonTypeForComparison() promotes mixed exact integers to BIGINT instead of falling through to DOUBLE (e.g., TINYINT + SMALLINT)
- Updated DESIGN.md section 3.3 to reflect PhysicalTryCastNode usage per reviewer PoC (was incorrectly documenting PhysicalCastNode)
- Added tests for out-of-range index and exact integer promotion
Which issue does this PR close?
Closes #1859
Rationale for this change
Next step in the Flink integration track (#1264). After #1856 established the converter framework (FlinkRexNodeConverter, FlinkNodeConverterFactory, ConverterContext), this PR adds the first concrete converter implementations that translate Flink/Calcite RexNode expressions into Auron native PhysicalExprNode protobuf representations. Required before #1857 (FlinkAuronCalcOperator) can wire them into the execution pipeline.

What changes are included in this PR?

Three new FlinkRexNodeConverter implementations (3 commits, 6 new files, 1 modified):

Commit 1 — RexInputRefConverter
- Converts RexInputRef column references to PhysicalColumn (name + index)
- Resolves field names via ConverterContext

Commit 2 — RexLiteralConverter
- Converts RexLiteral scalar values to ScalarValue with Arrow IPC bytes
- Serializes each literal via ArrowStreamWriter → ipc_bytes field

Commit 3 — RexCallConverter
- Converts RexCall operator expressions dispatched by SqlKind
- +, -, *, /, % → PhysicalBinaryExprNode with explicit type promotion
- Unary - → PhysicalNegativeNode, unary + → identity passthrough
- CAST → PhysicalTryCastNode
- Type promotion via getCommonTypeForComparison() + castIfNecessary() (per reviewer PoC)
- Registered in FlinkNodeConverterFactory

pom.xml — Added **/*Test.java to surefire includes for the planner module

Key design decisions (per @Tartarus0zm PoC in #1859 (comment)):
- PhysicalTryCastNode (not PhysicalCastNode) for all type casts
- … (Flink_NullIfZero)

Are there any user-facing changes?
No.
How was this patch tested?
33 unit tests across 3 test classes:
- RexInputRefConverterTest — 4 tests (node class, isSupported, first/second column)
- RexLiteralConverterTest — 12 tests (all supported types, null literal, unsupported type rejection)
- RexCallConverterTest — 17 tests (all 5 arithmetic ops, unary minus/plus, cast, mixed-type promotion, output-type cast, nested expressions, isSupported checks, getCommonTypeForComparison direct tests)