Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions auron-flink-extension/auron-flink-planner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@
<configuration>
<includes>
<include>**/*ITCase.java</include>
<include>**/*Test.java</include>
</includes>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.table.planner.converter;

import java.util.EnumSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.auron.flink.utils.SchemaConverters;
import org.apache.auron.protobuf.PhysicalBinaryExprNode;
import org.apache.auron.protobuf.PhysicalExprNode;
import org.apache.auron.protobuf.PhysicalNegativeNode;
import org.apache.auron.protobuf.PhysicalTryCastNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.logical.LogicalType;

/**
* Converts a Calcite {@link RexCall} (operator expression) to an Auron native
* {@link PhysicalExprNode}.
*
* <p>Handles arithmetic operators ({@code +}, {@code -}, {@code *}, {@code /},
* {@code %}), unary minus/plus, and {@code CAST}. Binary arithmetic operands
* are promoted to a common type before conversion, and the result is cast to
* the output type if it differs from the common type.
*/
public class RexCallConverter implements FlinkRexNodeConverter {

/** Shared Calcite type factory used to create arithmetic promotion target types. */
private static final RelDataTypeFactory TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

/** Binary arithmetic kinds that require numeric result type. */
private static final Set<SqlKind> BINARY_ARITHMETIC_KINDS =
EnumSet.of(SqlKind.PLUS, SqlKind.MINUS, SqlKind.TIMES, SqlKind.DIVIDE, SqlKind.MOD);

/** All supported SqlKinds including unary and cast. */
private static final Set<SqlKind> SUPPORTED_KINDS = EnumSet.of(
SqlKind.PLUS,
SqlKind.MINUS,
SqlKind.TIMES,
SqlKind.DIVIDE,
SqlKind.MOD,
SqlKind.MINUS_PREFIX,
SqlKind.PLUS_PREFIX,
SqlKind.CAST);

/** Factory used to recursively convert operand sub-expressions. */
private final FlinkNodeConverterFactory factory;

/**
 * Creates a new converter that delegates operand conversion to the given
 * factory.
 *
 * @param factory the factory used for recursive operand conversion; must not be {@code null}
 * @throws NullPointerException if {@code factory} is {@code null}
 */
public RexCallConverter(FlinkNodeConverterFactory factory) {
    // Fail fast: a null factory would otherwise only surface later as an
    // NPE deep inside convertOperand(), far from the construction site.
    this.factory = Objects.requireNonNull(factory, "factory");
}

/**
 * {@inheritDoc}
 *
 * @return {@link RexCall}, the Calcite node type this converter handles
 */
@Override
public Class<? extends RexNode> getNodeClass() {
return RexCall.class;
}

/**
 * Returns {@code true} if the call's {@link SqlKind} is supported.
 *
 * <p>For binary arithmetic kinds, the call's result type must also be
 * numeric to reject non-arithmetic uses (e.g., TIMESTAMP + INTERVAL).
 */
@Override
public boolean isSupported(RexNode node, ConverterContext context) {
    RexCall call = (RexCall) node;
    SqlKind kind = call.getKind();
    if (BINARY_ARITHMETIC_KINDS.contains(kind)) {
        // Arithmetic kinds also cover things like datetime +/- interval;
        // only accept the call when its declared result is truly numeric.
        return SqlTypeUtil.isNumeric(call.getType());
    }
    return SUPPORTED_KINDS.contains(kind);
}

/**
 * Converts the given {@link RexCall} to a native {@link PhysicalExprNode}.
 *
 * <p>Dispatches by {@link SqlKind}:
 * <ul>
 *   <li>Binary arithmetic &rarr; {@link PhysicalBinaryExprNode} with type promotion
 *   <li>{@code MINUS_PREFIX} &rarr; {@link PhysicalNegativeNode}
 *   <li>{@code PLUS_PREFIX} &rarr; identity (passthrough to operand)
 *   <li>{@code CAST} &rarr; {@link PhysicalTryCastNode}
 * </ul>
 *
 * @throws IllegalArgumentException if the SqlKind is not supported
 */
@Override
public PhysicalExprNode convert(RexNode node, ConverterContext context) {
    RexCall call = (RexCall) node;
    SqlKind kind = call.getKind();
    if (BINARY_ARITHMETIC_KINDS.contains(kind)) {
        return buildBinaryExpr(call, nativeBinaryOp(kind), context);
    }
    switch (kind) {
        case MINUS_PREFIX:
            return buildNegative(call, context);
        case PLUS_PREFIX:
            // Unary plus is the identity: just convert the operand.
            return convertOperand(call.getOperands().get(0), context);
        case CAST:
            return buildTryCast(call, context);
        default:
            throw new IllegalArgumentException("Unsupported SqlKind: " + kind);
    }
}

/** Maps a binary arithmetic {@link SqlKind} to its Auron native operator name. */
private static String nativeBinaryOp(SqlKind kind) {
    switch (kind) {
        case PLUS:
            return "Plus";
        case MINUS:
            return "Minus";
        case TIMES:
            return "Multiply";
        case DIVIDE:
            return "Divide";
        case MOD:
            return "Modulo";
        default:
            throw new IllegalArgumentException("Unsupported SqlKind: " + kind);
    }
}

/**
 * Builds a binary expression with type promotion between operands.
 *
 * <p>Both operands are first promoted to a common type; when the call's
 * declared output type differs from that common type, the result is
 * additionally wrapped in a TryCast back to the output type.
 */
private PhysicalExprNode buildBinaryExpr(RexCall call, String op, ConverterContext context) {
    RexNode lhs = call.getOperands().get(0);
    RexNode rhs = call.getOperands().get(1);

    RelDataType commonType = getCommonTypeForComparison(lhs.getType(), rhs.getType(), TYPE_FACTORY);
    if (commonType == null) {
        throw new IllegalStateException("Incompatible types: "
                + lhs.getType().getSqlTypeName()
                + " and "
                + rhs.getType().getSqlTypeName());
    }

    PhysicalExprNode result = PhysicalExprNode.newBuilder()
            .setBinaryExpr(PhysicalBinaryExprNode.newBuilder()
                    .setL(castIfNecessary(lhs, commonType, context))
                    .setR(castIfNecessary(rhs, commonType, context))
                    .setOp(op))
            .build();

    // Calcite's computed result type may be wider than the promoted operand
    // type (e.g. INT + INT declared as BIGINT); align it explicitly.
    RelDataType outputType = call.getType();
    if (outputType.getSqlTypeName().equals(commonType.getSqlTypeName())) {
        return result;
    }
    return wrapInTryCast(result, outputType);
}

/**
* Computes the common type for two operand types during arithmetic
* promotion.
*
* <p>Rules:
* <ul>
* <li>Same type → return as-is
* <li>Both numeric: DECIMAL wins; BIGINT wins over smaller integers
* (when neither is approximate); otherwise DOUBLE
* <li>Both character → VARCHAR
* <li>Otherwise → {@code null} (incompatible)
* </ul>
*
* @param type1 left operand type
* @param type2 right operand type
* @param typeFactory factory for creating result types
* @return the common type, or {@code null} if incompatible
*/
static RelDataType getCommonTypeForComparison(
RelDataType type1, RelDataType type2, RelDataTypeFactory typeFactory) {
if (type1.getSqlTypeName().equals(type2.getSqlTypeName())) {
return type1;
}
if (SqlTypeUtil.isNumeric(type1) && SqlTypeUtil.isNumeric(type2)) {
SqlTypeName t1 = type1.getSqlTypeName();
SqlTypeName t2 = type2.getSqlTypeName();
if (t1 == SqlTypeName.DECIMAL || t2 == SqlTypeName.DECIMAL) {
return typeFactory.createSqlType(SqlTypeName.DECIMAL);
}
Comment on lines +203 to +207
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getCommonTypeForComparison() returns typeFactory.createSqlType(DECIMAL) when either operand is DECIMAL, which drops the original precision/scale. That can lead to casts (and Arrow types) that don't match the operands' declared decimal types, potentially changing rounding/overflow behavior. Consider constructing a DECIMAL common type that preserves or derives precision/scale (e.g., max scale and sufficient precision) instead of using the default DECIMAL type.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default-precision DECIMAL returned by createSqlType(SqlTypeName.DECIMAL) is used as the promotion target, but crucially:

  1. When both operands are DECIMAL (different precision), getCommonTypeForComparison returns type1 directly (line 199 — SqlTypeName match short-circuits), so neither operand gets a default-precision cast.
  2. For cross-family promotion (e.g., INTEGER + DECIMAL), only the INTEGER operand gets cast to DECIMAL, which is lossless regardless of precision.
  3. The final output is always cast to call.getType(), which carries Calcite's correctly computed precision/scale.

The intermediate compatibleType controls which operands get promoted, not the final result precision. This pattern follows the reviewer's PoC code.

if (notApproxType(t1) && notApproxType(t2)) {
return typeFactory.createSqlType(SqlTypeName.BIGINT);
}
return typeFactory.createSqlType(SqlTypeName.DOUBLE);
}
Comment on lines +202 to +212
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For exact integer mixes like TINYINT/SMALLINT/INTEGER (where neither side is BIGINT/DECIMAL and neither is FLOAT/DOUBLE), getCommonTypeForComparison() currently falls back to DOUBLE. This changes arithmetic semantics (compute in floating point, then potentially TryCast back) and is unlikely to match Flink/Calcite's least-restrictive integer promotion rules. Consider promoting exact integer combinations to the widest exact integer type (e.g., INTEGER or BIGINT) rather than DOUBLE.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

@weiqingy weiqingy Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved.

The observation is technically correct - mixed exact integer types (e.g., TINYINT + SMALLINT) would fall through to DOUBLE. In practice, this path is unreachable because Calcite/Flink's type coercion inserts explicit CAST nodes to align operands before the arithmetic RexCall reaches our converter.

That said, since the fix is trivial and makes the safety net semantically correct, I've tightened the condition in b1749a8: any two exact integer types now promote to BIGINT instead of falling through to DOUBLE. The change only affects the previously-unreachable mixed-exact-integer path — all other promotion behavior is unchanged.

if (SqlTypeUtil.inCharFamily(type1) && SqlTypeUtil.inCharFamily(type2)) {
return typeFactory.createSqlType(SqlTypeName.VARCHAR);
}
return null;
}

/** Returns {@code true} unless the type is an approximate numeric (FLOAT or DOUBLE). */
private static boolean notApproxType(SqlTypeName typeName) {
    return !(typeName == SqlTypeName.FLOAT || typeName == SqlTypeName.DOUBLE);
}

/**
 * Converts the operand and wraps it in a TryCast when its SqlTypeName
 * differs from the target type's.
 *
 * <p>The comparison is intentionally at SqlTypeName granularity: this
 * method performs cross-family promotion (e.g. INT &rarr; BIGINT), while
 * intra-family differences such as DECIMAL precision/scale are left to
 * the native engine's own alignment.
 */
private PhysicalExprNode castIfNecessary(RexNode expr, RelDataType targetType, ConverterContext context) {
    PhysicalExprNode converted = convertOperand(expr, context);
    boolean sameTypeName = expr.getType().getSqlTypeName().equals(targetType.getSqlTypeName());
    return sameTypeName ? converted : wrapInTryCast(converted, targetType);
}

Comment on lines +229 to +234
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

castIfNecessary() decides whether to wrap a TryCast by comparing only SqlTypeName. This will treat all DECIMALs as the same type even when precision/scale differ, and can also miss other type attributes (nullability, charset/collation). For arithmetic, this can yield a binary expression whose operands have incompatible Arrow types at runtime. Consider comparing full RelDataType equality (or at least DECIMAL precision/scale) and casting when they differ.

Suggested change
if (expr.getType().getSqlTypeName().equals(targetType.getSqlTypeName())) {
return converted;
}
return wrapInTryCast(converted, targetType);
}
if (hasSameType(expr.getType(), targetType)) {
return converted;
}
return wrapInTryCast(converted, targetType);
}
private static boolean hasSameType(RelDataType sourceType, RelDataType targetType) {
if (sourceType == targetType) {
return true;
}
if (sourceType == null || targetType == null) {
return false;
}
return sourceType.getFullTypeString().equals(targetType.getFullTypeString());
}

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SqlTypeName-only comparison is intentional — castIfNecessary() handles cross-family type promotion (e.g., INT→BIGINT, INT→DECIMAL), not intra-family precision alignment.

For the DECIMAL case: when both operands are DECIMAL with different precision/scale, castIfNecessary correctly skips the cast (both are SqlTypeName.DECIMAL), and DataFusion's binary arithmetic kernels handle precision alignment natively at the Arrow level.

Using full RelDataType equality (e.g., getFullTypeString()) would trigger unnecessary TryCast wrapping for cases like DECIMAL(10,2) vs DECIMAL(10,3), and would also fire on nullability differences (INTEGER NOT NULL vs INTEGER), which is undesirable for arithmetic operand promotion.

This pattern follows the reviewer's PoC code (see issue comment), which uses the same SqlTypeName-level comparison.

/**
 * Delegates operand conversion to the factory.
 *
 * @throws IllegalStateException if no converter is registered for
 *     the operand
 */
private PhysicalExprNode convertOperand(RexNode operand, ConverterContext context) {
    return factory.convertRexNode(operand, context)
            .orElseThrow(() -> new IllegalStateException(
                    "Failed to convert operand: " + operand + " (no converter registered)"));
}

/** Wraps an already-converted expression in a TryCast to the given Calcite type. */
private static PhysicalExprNode wrapInTryCast(PhysicalExprNode expr, RelDataType targetType) {
    // Translation chain: Calcite type -> Flink logical type -> Auron Arrow type.
    LogicalType flinkType = FlinkTypeFactory.toLogicalType(targetType);
    org.apache.auron.protobuf.ArrowType targetArrowType = SchemaConverters.convertToAuronArrowType(flinkType);
    PhysicalTryCastNode.Builder cast = PhysicalTryCastNode.newBuilder()
            .setExpr(expr)
            .setArrowType(targetArrowType);
    return PhysicalExprNode.newBuilder().setTryCast(cast).build();
}

/** Builds a unary-minus node around the converted single operand. */
private PhysicalExprNode buildNegative(RexCall call, ConverterContext context) {
    PhysicalExprNode inner = convertOperand(call.getOperands().get(0), context);
    PhysicalNegativeNode.Builder negative = PhysicalNegativeNode.newBuilder().setExpr(inner);
    return PhysicalExprNode.newBuilder().setNegative(negative).build();
}

/** Builds a TryCast of the single operand to the call's declared result type. */
private PhysicalExprNode buildTryCast(RexCall call, ConverterContext context) {
    return wrapInTryCast(convertOperand(call.getOperands().get(0), context), call.getType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.auron.flink.table.planner.converter;

import org.apache.auron.protobuf.PhysicalColumn;
import org.apache.auron.protobuf.PhysicalExprNode;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;

/**
* Converts a Calcite {@link RexInputRef} (column reference) to an Auron native {@link
* PhysicalExprNode} containing a {@link PhysicalColumn}.
*
* <p>Column references are supported when the index is within the input schema bounds. Every valid
* {@code RexInputRef} maps directly to a named, indexed column in the input schema provided by the
* {@link ConverterContext}.
*/
public class RexInputRefConverter implements FlinkRexNodeConverter {

/**
 * {@inheritDoc}
 *
 * @return {@link RexInputRef}, the Calcite node type this converter handles
 */
@Override
public Class<? extends RexNode> getNodeClass() {
return RexInputRef.class;
}

/**
 * Returns {@code true} if the column index is within the input schema bounds.
 *
 * <p>Guards {@code convert()}, which indexes into the input schema's field
 * list; an out-of-range reference is reported as unsupported instead of
 * failing with an {@code IndexOutOfBoundsException}.
 *
 * @param node the RexNode to check (must be a {@link RexInputRef})
 * @param context shared conversion state
 * @return {@code true} if the index is valid
 */
@Override
public boolean isSupported(RexNode node, ConverterContext context) {
    int fieldCount = context.getInputType().getFieldCount();
    return ((RexInputRef) node).getIndex() < fieldCount;
}
Comment on lines +47 to +51
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isSupported() always returns true, but convert() blindly indexes into context.getInputType().getFieldNames() using inputRef.getIndex(). If a RexInputRef index is out of range, this will throw IndexOutOfBoundsException and bypass the factory's "unsupported" path. Consider validating the index against context.getInputType().getFieldCount() in isSupported() (and/or in convert() with a clear exception).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

@weiqingy weiqingy Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved in b1749a8: isSupported() now validates inputRef.getIndex() < context.getInputType().getFieldCount(). Added a test for the out-of-range case.

Note: the factory's catch-all in convertRexNode() (line 116-119) would have prevented a crash, but having the check in isSupported() makes the intent explicit and avoids relying on exception-based flow control.


/**
* Converts the given {@link RexInputRef} to a {@link PhysicalExprNode} with a {@link
* PhysicalColumn}.
*
* <p>Resolves the column name from the input schema via {@link
* ConverterContext#getInputType()}.
*
* @param node the RexNode to convert (must be a {@link RexInputRef})
* @param context shared conversion state containing the input schema
* @return a {@link PhysicalExprNode} wrapping a {@link PhysicalColumn} with name and index
* @throws IllegalArgumentException if the node is not a {@link RexInputRef}
*/
@Override
public PhysicalExprNode convert(RexNode node, ConverterContext context) {
Copy link

Copilot AI Apr 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc says convert() throws IllegalArgumentException when the node is not a RexInputRef, but the implementation performs an unchecked cast and would throw ClassCastException instead. Either align the Javadoc with the framework guarantee, or add an explicit type check and throw IllegalArgumentException to match the FlinkNodeConverter contract.

Suggested change
public PhysicalExprNode convert(RexNode node, ConverterContext context) {
public PhysicalExprNode convert(RexNode node, ConverterContext context) {
if (!(node instanceof RexInputRef)) {
throw new IllegalArgumentException(
"RexInputRefConverter can only convert RexInputRef nodes, but got: "
+ (node == null ? "null" : node.getClass().getName()));
}

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The factory dispatches by node.getClass() at FlinkNodeConverterFactory.java:109 — only RexInputRef instances reach this converter. The cast is guaranteed safe by the framework's dispatch contract.

The @throws IllegalArgumentException in the Javadoc inherits from the FlinkNodeConverter interface (line 62), which documents the general contract for all converter implementations. In practice, this exception path is unreachable through normal factory dispatch.

Adding an instanceof check would validate an internal framework invariant on every call — per our project conventions, we validate at system boundaries rather than internal API boundaries. The factory itself is the boundary.

RexInputRef inputRef = (RexInputRef) node;
int index = inputRef.getIndex();
String name = context.getInputType().getFieldNames().get(index);

return PhysicalExprNode.newBuilder()
.setColumn(PhysicalColumn.newBuilder().setName(name).setIndex(index))
.build();
}
}
Loading
Loading