diff --git a/auron-flink-extension/auron-flink-planner/pom.xml b/auron-flink-extension/auron-flink-planner/pom.xml
index bce3ae821..4f2a9b18b 100644
--- a/auron-flink-extension/auron-flink-planner/pom.xml
+++ b/auron-flink-extension/auron-flink-planner/pom.xml
@@ -303,6 +303,7 @@
Handles arithmetic operators ({@code +}, {@code -}, {@code *}, {@code /},
+ * {@code %}), unary minus/plus, and {@code CAST}. Binary arithmetic operands
+ * are promoted to a common type before conversion, and the result is cast to
+ * the output type if it differs from the common type.
+ */
+public class RexCallConverter implements FlinkRexNodeConverter {
+
+ private static final RelDataTypeFactory TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
+
+ /** Binary arithmetic kinds that require numeric result type. */
+ private static final Set<SqlKind> BINARY_ARITHMETIC_KINDS =
+ EnumSet.of(SqlKind.PLUS, SqlKind.MINUS, SqlKind.TIMES, SqlKind.DIVIDE, SqlKind.MOD);
+
+ /**
+ * Returns {@code true} if the given node is a supported arithmetic call.
+ *
+ * <p>For binary arithmetic kinds, the call's result type must also be
+ * numeric to reject non-arithmetic uses (e.g., TIMESTAMP + INTERVAL).
+ */
+ @Override
+ public boolean isSupported(RexNode node, ConverterContext context) {
+ RexCall call = (RexCall) node;
+ SqlKind kind = call.getKind();
+ // Only the arithmetic kinds this converter knows how to translate.
+ if (!SUPPORTED_KINDS.contains(kind)) {
+ return false;
+ }
+ // Binary arithmetic must yield a numeric result; this rejects
+ // non-arithmetic uses of the same SqlKind (e.g., TIMESTAMP + INTERVAL).
+ if (BINARY_ARITHMETIC_KINDS.contains(kind)) {
+ return SqlTypeUtil.isNumeric(call.getType());
+ }
+ return true;
+ }
+
+ /**
+ * Builds a binary arithmetic {@link PhysicalExprNode} for the given {@link RexCall}.
+ *
+ * <p>Operands are promoted to a common type. If the call's output type
+ * differs from the common type, the result is wrapped in a TryCast.
+ */
+ private PhysicalExprNode buildBinaryExpr(RexCall call, String op, ConverterContext context) {
+ RexNode left = call.getOperands().get(0);
+ RexNode right = call.getOperands().get(1);
+ RelDataType outputType = call.getType();
+
+ // Promote both operands to a single common type before conversion.
+ RelDataType compatibleType = getCommonTypeForComparison(left.getType(), right.getType(), TYPE_FACTORY);
+ if (compatibleType == null) {
+ throw new IllegalStateException("Incompatible types: "
+ + left.getType().getSqlTypeName()
+ + " and "
+ + right.getType().getSqlTypeName());
+ }
+
+ // Each operand is wrapped in a cast only when its type differs from the common type.
+ PhysicalExprNode leftExpr = castIfNecessary(left, compatibleType, context);
+ PhysicalExprNode rightExpr = castIfNecessary(right, compatibleType, context);
+
+ PhysicalExprNode binaryExpr = PhysicalExprNode.newBuilder()
+ .setBinaryExpr(PhysicalBinaryExprNode.newBuilder()
+ .setL(leftExpr)
+ .setR(rightExpr)
+ .setOp(op))
+ .build();
+
+ // Compare by SqlTypeName (not the full RelDataType) so that precision and
+ // nullability differences alone do not force an extra outer cast.
+ if (!outputType.getSqlTypeName().equals(compatibleType.getSqlTypeName())) {
+ return wrapInTryCast(binaryExpr, outputType);
+ }
+ return binaryExpr;
+ }
+
+ /**
+ * Computes the common type for two operand types during arithmetic
+ * promotion.
+ *
+ * Rules:
+ * Column references are supported when the index is within the input schema bounds. Every valid
+ * {@code RexInputRef} maps directly to a named, indexed column in the input schema provided by the
+ * {@link ConverterContext}.
+ */
+public class RexInputRefConverter implements FlinkRexNodeConverter {
+
+ /** {@inheritDoc} */
+ @Override
+ public Class extends RexNode> getNodeClass() {
+ return RexInputRef.class;
+ }
+
+ /**
+ * Returns {@code true} if the column index is within the input schema bounds.
+ *
+ * @param node the RexNode to check (must be a {@link RexInputRef})
+ * @param context shared conversion state
+ * @return {@code true} if the index is valid
+ */
+ @Override
+ public boolean isSupported(RexNode node, ConverterContext context) {
+ RexInputRef inputRef = (RexInputRef) node;
+ // NOTE(review): assumes RexInputRef indexes are never negative — confirm upstream.
+ return inputRef.getIndex() < context.getInputType().getFieldCount();
+ }
+
+ /**
+ * Converts the given {@link RexInputRef} to a {@link PhysicalExprNode} with a {@link
+ * PhysicalColumn}.
+ *
+ * Resolves the column name from the input schema via {@link
+ * ConverterContext#getInputType()}.
+ *
+ * @param node the RexNode to convert (must be a {@link RexInputRef})
+ * @param context shared conversion state containing the input schema
+ * @return a {@link PhysicalExprNode} wrapping a {@link PhysicalColumn} with name and index
+ * @throws IllegalArgumentException if the node is not a {@link RexInputRef}
+ */
+ @Override
+ public PhysicalExprNode convert(RexNode node, ConverterContext context) {
+ RexInputRef inputRef = (RexInputRef) node;
+ int index = inputRef.getIndex();
+ // The column name is resolved positionally from the input row schema.
+ String name = context.getInputType().getFieldNames().get(index);
+
+ return PhysicalExprNode.newBuilder()
+ .setColumn(PhysicalColumn.newBuilder().setName(name).setIndex(index))
+ .build();
+ }
+}
diff --git a/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverter.java b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverter.java
new file mode 100644
index 000000000..bb80b3e16
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/main/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverter.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import com.google.protobuf.ByteString;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.auron.protobuf.ScalarValue;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+
+/**
+ * Converts a Calcite {@link RexLiteral} to an Auron native {@link PhysicalExprNode}
+ * containing a {@link ScalarValue} with Arrow IPC bytes.
+ *
+ * The literal value is serialized as a single-element Arrow vector in IPC stream format,
+ * following the same pattern as the Spark implementation in {@code NativeConverters}.
+ *
+ * Supported types: {@code TINYINT}, {@code SMALLINT}, {@code INTEGER}, {@code BIGINT},
+ * {@code FLOAT}, {@code DOUBLE}, {@code DECIMAL}, {@code BOOLEAN}, {@code CHAR},
+ * {@code VARCHAR}, and {@code NULL} (of a supported type).
+ */
+public class RexLiteralConverter implements FlinkRexNodeConverter {
+
+ private static final Set<SqlTypeName> SUPPORTED_TYPES = EnumSet.of(
+ SqlTypeName.TINYINT,
+ SqlTypeName.SMALLINT,
+ SqlTypeName.INTEGER,
+ SqlTypeName.BIGINT,
+ SqlTypeName.FLOAT,
+ SqlTypeName.DOUBLE,
+ SqlTypeName.DECIMAL,
+ SqlTypeName.BOOLEAN,
+ SqlTypeName.CHAR,
+ SqlTypeName.VARCHAR);
+
+ /** {@inheritDoc} */
+ @Override
+ public Class<? extends RexNode> getNodeClass() {
+ return RexLiteral.class;
+ }
+
+ /**
+ * Returns {@code true} if the literal's type is supported.
+ *
+ * <p>For null literals, the underlying type is still checked — a null of an unsupported
+ * type (e.g., TIMESTAMP) returns {@code false}.
+ */
+ @Override
+ public boolean isSupported(RexNode node, ConverterContext context) {
+ RexLiteral literal = (RexLiteral) node;
+ // Decide by the declared type, not the value: a NULL literal of an
+ // unsupported type (e.g., TIMESTAMP) must also be rejected.
+ SqlTypeName typeName = literal.getType().getSqlTypeName();
+ return isSupportedType(typeName);
+ }
+
+ /**
+ * Converts the given {@link RexLiteral} to a {@link PhysicalExprNode} with Arrow IPC bytes.
+ *
+ * @throws IllegalArgumentException if the literal type is not supported
+ */
+ @Override
+ public PhysicalExprNode convert(RexNode node, ConverterContext context) {
+ RexLiteral literal = (RexLiteral) node;
+ // The value travels to the native side as a one-row Arrow IPC stream payload.
+ byte[] ipcBytes = serializeToIpc(literal);
+ return PhysicalExprNode.newBuilder()
+ .setLiteral(ScalarValue.newBuilder().setIpcBytes(ByteString.copyFrom(ipcBytes)))
+ .build();
+ }
+
+ /** Returns whether the given SQL type has an Arrow serialization in this converter. */
+ private static boolean isSupportedType(SqlTypeName typeName) {
+ return SUPPORTED_TYPES.contains(typeName);
+ }
+
+ /**
+ * Serializes the literal value as a single-element Arrow vector in IPC stream format.
+ */
+ private static byte[] serializeToIpc(RexLiteral literal) {
+ Field field = arrowFieldForType(literal);
+ Schema schema = new Schema(Collections.singletonList(field));
+
+ // NOTE(review): a fresh RootAllocator per literal is correct but allocator
+ // creation is not free — worth pooling if this shows up in profiles.
+ try (BufferAllocator allocator = new RootAllocator();
+ VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+
+ root.allocateNew();
+ FieldVector vector = root.getVector(0);
+
+ if (literal.isNull()) {
+ vector.setNull(0);
+ } else {
+ setVectorValue(literal, vector);
+ }
+
+ // Exactly one row: the literal itself.
+ vector.setValueCount(1);
+ root.setRowCount(1);
+
+ return writeIpcBytes(root);
+ } catch (IOException e) {
+ throw new IllegalStateException("Failed to serialize literal to Arrow IPC", e);
+ }
+ }
+
+ /**
+ * Returns the Arrow {@link Field} corresponding to the literal's Calcite type.
+ *
+ * @throws IllegalArgumentException if the type is not supported
+ */
+ private static Field arrowFieldForType(RexLiteral literal) {
+ SqlTypeName typeName = literal.getType().getSqlTypeName();
+ switch (typeName) {
+ case TINYINT:
+ return Field.nullable("v", new ArrowType.Int(8, true));
+ case SMALLINT:
+ return Field.nullable("v", new ArrowType.Int(16, true));
+ case INTEGER:
+ return Field.nullable("v", new ArrowType.Int(32, true));
+ case BIGINT:
+ return Field.nullable("v", new ArrowType.Int(64, true));
+ case FLOAT:
+ return Field.nullable("v", new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE));
+ case DOUBLE:
+ return Field.nullable("v", new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE));
+ case DECIMAL:
+ int precision = literal.getType().getPrecision();
+ int scale = literal.getType().getScale();
+ // 128-bit width: Arrow's Decimal128 representation.
+ return Field.nullable("v", new ArrowType.Decimal(precision, scale, 128));
+ case BOOLEAN:
+ return Field.nullable("v", ArrowType.Bool.INSTANCE);
+ case CHAR:
+ case VARCHAR:
+ // Both fixed and variable length strings map to UTF-8.
+ return Field.nullable("v", ArrowType.Utf8.INSTANCE);
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + typeName);
+ }
+ }
+
+ /**
+ * Sets the value at index 0 of the given vector based on the literal's type.
+ *
+ * <p>Callers must only pass non-null literals; null handling is done by the caller.
+ *
+ * @throws IllegalArgumentException if the type is not supported
+ */
+ private static void setVectorValue(RexLiteral literal, FieldVector vector) {
+ SqlTypeName typeName = literal.getType().getSqlTypeName();
+ switch (typeName) {
+ case TINYINT:
+ ((TinyIntVector) vector).set(0, literal.getValueAs(Byte.class));
+ break;
+ case SMALLINT:
+ ((SmallIntVector) vector).set(0, literal.getValueAs(Short.class));
+ break;
+ case INTEGER:
+ ((IntVector) vector).set(0, literal.getValueAs(Integer.class));
+ break;
+ case BIGINT:
+ ((BigIntVector) vector).set(0, literal.getValueAs(Long.class));
+ break;
+ case FLOAT:
+ ((Float4Vector) vector).set(0, literal.getValueAs(Float.class));
+ break;
+ case DOUBLE:
+ ((Float8Vector) vector).set(0, literal.getValueAs(Double.class));
+ break;
+ case DECIMAL:
+ ((DecimalVector) vector).set(0, literal.getValueAs(BigDecimal.class));
+ break;
+ case BOOLEAN:
+ // BitVector stores booleans as 0/1 bits.
+ ((BitVector) vector).set(0, literal.getValueAs(Boolean.class) ? 1 : 0);
+ break;
+ case CHAR:
+ case VARCHAR:
+ // Strings are stored as UTF-8 bytes, matching the Utf8 field type.
+ ((VarCharVector) vector).set(0, literal.getValueAs(String.class).getBytes(StandardCharsets.UTF_8));
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported type: " + typeName);
+ }
+ }
+
+ /**
+ * Writes the given {@link VectorSchemaRoot} to Arrow IPC stream format.
+ *
+ * @return the serialized IPC stream (schema message plus one record batch)
+ * @throws IOException if writing to the in-memory stream fails
+ */
+ private static byte[] writeIpcBytes(VectorSchemaRoot root) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null, out)) {
+ writer.start();
+ writer.writeBatch();
+ writer.end();
+ }
+ return out.toByteArray();
+ }
+}
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexCallConverterTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexCallConverterTest.java
new file mode 100644
index 000000000..9a4c8a8e8
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexCallConverterTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link RexCallConverter}. */
+class RexCallConverterTest {
+
+ private static final RelDataTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl();
+ private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+ private FlinkNodeConverterFactory factory;
+ private RexCallConverter converter;
+ private ConverterContext context;
+
+ @BeforeEach
+ void setUp() {
+ // All three converters are registered so nested operands (refs, literals,
+ // inner calls) can be resolved recursively through the factory.
+ factory = new FlinkNodeConverterFactory();
+ converter = new RexCallConverter(factory);
+ factory.registerRexConverter(new RexInputRefConverter());
+ factory.registerRexConverter(new RexLiteralConverter());
+ factory.registerRexConverter(converter);
+
+ RowType inputType = RowType.of(new LogicalType[] {new IntType(), new BigIntType()}, new String[] {"f0", "f1"});
+ context = new ConverterContext(new Configuration(), null, getClass().getClassLoader(), inputType);
+ }
+
+ @Test
+ void testGetNodeClass() {
+ assertEquals(RexCall.class, converter.getNodeClass());
+ }
+
+ @Test
+ void testConvertPlus() {
+ RexNode plus = makeCall(intType(), SqlStdOperatorTable.PLUS, makeIntRef(0), makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(plus, context);
+
+ assertTrue(result.hasBinaryExpr());
+ assertEquals("Plus", result.getBinaryExpr().getOp());
+ }
+
+ @Test
+ void testConvertMinus() {
+ RexNode minus = makeCall(intType(), SqlStdOperatorTable.MINUS, makeIntRef(0), makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(minus, context);
+
+ assertTrue(result.hasBinaryExpr());
+ assertEquals("Minus", result.getBinaryExpr().getOp());
+ }
+
+ @Test
+ void testConvertTimes() {
+ RexNode times = makeCall(intType(), SqlStdOperatorTable.MULTIPLY, makeIntRef(0), makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(times, context);
+
+ assertTrue(result.hasBinaryExpr());
+ assertEquals("Multiply", result.getBinaryExpr().getOp());
+ }
+
+ @Test
+ void testConvertDivide() {
+ RexNode divide = makeCall(intType(), SqlStdOperatorTable.DIVIDE, makeIntRef(0), makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(divide, context);
+
+ assertTrue(result.hasBinaryExpr());
+ assertEquals("Divide", result.getBinaryExpr().getOp());
+ }
+
+ @Test
+ void testConvertMod() {
+ RexNode mod = makeCall(intType(), SqlStdOperatorTable.MOD, makeIntRef(0), makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(mod, context);
+
+ assertTrue(result.hasBinaryExpr());
+ assertEquals("Modulo", result.getBinaryExpr().getOp());
+ }
+
+ @Test
+ void testConvertUnaryMinus() {
+ RexNode neg = REX_BUILDER.makeCall(SqlStdOperatorTable.UNARY_MINUS, makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(neg, context);
+
+ assertTrue(result.hasNegative());
+ assertTrue(result.getNegative().getExpr().hasColumn());
+ }
+
+ @Test
+ void testConvertUnaryPlus() {
+ RexNode pos = REX_BUILDER.makeCall(SqlStdOperatorTable.UNARY_PLUS, makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(pos, context);
+
+ // Unary plus is identity — passthrough to operand
+ assertTrue(result.hasColumn());
+ assertEquals("f0", result.getColumn().getName());
+ }
+
+ @Test
+ void testConvertCast() {
+ RexNode cast = makeCall(bigintType(), SqlStdOperatorTable.CAST, makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(cast, context);
+
+ assertTrue(result.hasTryCast());
+ assertTrue(result.getTryCast().getExpr().hasColumn());
+ assertTrue(result.getTryCast().hasArrowType());
+ }
+
+ @Test
+ void testConvertMixedTypePromotion() {
+ // INT (f0) + BIGINT (f1) — left operand should be promoted
+ RexNode intRef = makeIntRef(0);
+ RexNode bigintRef = REX_BUILDER.makeInputRef(bigintType(), 1);
+ RexNode mixedPlus = makeCall(bigintType(), SqlStdOperatorTable.PLUS, intRef, bigintRef);
+
+ PhysicalExprNode result = converter.convert(mixedPlus, context);
+
+ assertTrue(result.hasBinaryExpr());
+ assertEquals("Plus", result.getBinaryExpr().getOp());
+ // Left operand (INT) should be wrapped in TryCast to BIGINT
+ PhysicalExprNode left = result.getBinaryExpr().getL();
+ assertTrue(left.hasTryCast(), "Left operand should be cast from INT to BIGINT");
+ // Right operand (BIGINT) should be plain column
+ PhysicalExprNode right = result.getBinaryExpr().getR();
+ assertTrue(right.hasColumn(), "Right operand should be plain column (already BIGINT)");
+ }
+
+ @Test
+ void testConvertOutputTypeCast() {
+ // INT + INT where output type is BIGINT → result wrapped in TryCast
+ RexNode plus = makeCall(bigintType(), SqlStdOperatorTable.PLUS, makeIntRef(0), makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(plus, context);
+
+ // Both operands are INT, compatible type is INT,
+ // but output type is BIGINT → outer TryCast
+ assertTrue(result.hasTryCast(), "Result should be wrapped in TryCast when output " + "!= compatible type");
+ assertTrue(result.getTryCast().getExpr().hasBinaryExpr());
+ }
+
+ @Test
+ void testConvertNestedExpr() {
+ // (f0 + 1) * f0
+ RexNode f0 = makeIntRef(0);
+ RexNode one = REX_BUILDER.makeExactLiteral(BigDecimal.ONE, intType());
+ RexNode innerPlus = makeCall(intType(), SqlStdOperatorTable.PLUS, f0, one);
+ RexNode outer = makeCall(intType(), SqlStdOperatorTable.MULTIPLY, innerPlus, makeIntRef(0));
+
+ PhysicalExprNode result = converter.convert(outer, context);
+
+ assertTrue(result.hasBinaryExpr());
+ assertEquals("Multiply", result.getBinaryExpr().getOp());
+ // Left child is the inner (f0 + 1)
+ PhysicalExprNode leftChild = result.getBinaryExpr().getL();
+ assertTrue(leftChild.hasBinaryExpr());
+ assertEquals("Plus", leftChild.getBinaryExpr().getOp());
+ }
+
+ @Test
+ void testIsSupportedNumericArithmetic() {
+ RexNode plus = makeCall(intType(), SqlStdOperatorTable.PLUS, makeIntRef(0), makeIntRef(0));
+
+ assertTrue(converter.isSupported(plus, context));
+ }
+
+ @Test
+ void testIsNotSupportedNonNumericKind() {
+ // EQUALS is not in the supported set
+ RexNode eq = REX_BUILDER.makeCall(SqlStdOperatorTable.EQUALS, makeIntRef(0), makeIntRef(0));
+
+ assertFalse(converter.isSupported(eq, context));
+ }
+
+ // ---- getCommonTypeForComparison direct tests ----
+
+ @Test
+ void testCommonTypeDecimalWins() {
+ RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+ RelDataType decType = TYPE_FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 2);
+ RelDataType result = RexCallConverter.getCommonTypeForComparison(intType, decType, TYPE_FACTORY);
+
+ assertEquals(SqlTypeName.DECIMAL, result.getSqlTypeName());
+ }
+
+ @Test
+ void testCommonTypeExactIntegerPromotesToBigint() {
+ // TINYINT + INTEGER → BIGINT (both exact, promoted to widest exact type)
+ RelDataType tinyintType = TYPE_FACTORY.createSqlType(SqlTypeName.TINYINT);
+ RelDataType integerType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+ RelDataType result = RexCallConverter.getCommonTypeForComparison(tinyintType, integerType, TYPE_FACTORY);
+
+ assertEquals(SqlTypeName.BIGINT, result.getSqlTypeName());
+ }
+
+ @Test
+ void testCommonTypeApproxFallbackToDouble() {
+ // INT + FLOAT → DOUBLE (FLOAT is approx, so exact integer rule skipped)
+ RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+ RelDataType floatType = TYPE_FACTORY.createSqlType(SqlTypeName.FLOAT);
+ RelDataType result = RexCallConverter.getCommonTypeForComparison(intType, floatType, TYPE_FACTORY);
+
+ assertEquals(SqlTypeName.DOUBLE, result.getSqlTypeName());
+ }
+
+ @Test
+ void testCommonTypeIncompatible() {
+ // BOOLEAN + INTEGER → null (incompatible)
+ RelDataType boolType = TYPE_FACTORY.createSqlType(SqlTypeName.BOOLEAN);
+ RelDataType intType = TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+ RelDataType result = RexCallConverter.getCommonTypeForComparison(boolType, intType, TYPE_FACTORY);
+
+ assertNull(result, "Incompatible types should return null");
+ }
+
+ // ---- Helpers ----
+
+ private static RexNode makeIntRef(int index) {
+ return REX_BUILDER.makeInputRef(intType(), index);
+ }
+
+ private static RelDataType intType() {
+ return TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER);
+ }
+
+ private static RelDataType bigintType() {
+ return TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT);
+ }
+
+ /**
+ * Creates a {@link org.apache.calcite.rex.RexCall} with an explicit
+ * return type using the List-based {@code makeCall} overload.
+ */
+ private static RexNode makeCall(
+ RelDataType returnType, org.apache.calcite.sql.SqlOperator op, RexNode... operands) {
+ return REX_BUILDER.makeCall(returnType, op, Arrays.asList(operands));
+ }
+}
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexInputRefConverterTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexInputRefConverterTest.java
new file mode 100644
index 000000000..db22cf5c4
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexInputRefConverterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link RexInputRefConverter}. */
+class RexInputRefConverterTest {
+
+ private static final RelDataTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl();
+ private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+ private RexInputRefConverter converter;
+ private ConverterContext context;
+
+ @BeforeEach
+ void setUp() {
+ converter = new RexInputRefConverter();
+ // Two-column input schema: f0 INT, f1 BIGINT.
+ RowType inputType = RowType.of(new LogicalType[] {new IntType(), new BigIntType()}, new String[] {"f0", "f1"});
+ context = new ConverterContext(new Configuration(), null, getClass().getClassLoader(), inputType);
+ }
+
+ @Test
+ void testGetNodeClass() {
+ assertEquals(RexInputRef.class, converter.getNodeClass());
+ }
+
+ @Test
+ void testIsSupportedValidIndex() {
+ RexNode inputRef = REX_BUILDER.makeInputRef(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER), 0);
+ assertTrue(converter.isSupported(inputRef, context));
+ }
+
+ @Test
+ void testIsNotSupportedOutOfRangeIndex() {
+ // Schema has 2 fields (f0, f1) — index 5 is out of range
+ RexNode inputRef = REX_BUILDER.makeInputRef(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER), 5);
+ assertFalse(converter.isSupported(inputRef, context));
+ }
+
+ @Test
+ void testConvertFirstColumn() {
+ RexNode inputRef = REX_BUILDER.makeInputRef(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER), 0);
+
+ PhysicalExprNode result = converter.convert(inputRef, context);
+
+ assertTrue(result.hasColumn());
+ assertEquals("f0", result.getColumn().getName());
+ assertEquals(0, result.getColumn().getIndex());
+ }
+
+ @Test
+ void testConvertSecondColumn() {
+ RexNode inputRef = REX_BUILDER.makeInputRef(TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT), 1);
+
+ PhysicalExprNode result = converter.convert(inputRef, context);
+
+ assertTrue(result.hasColumn());
+ assertEquals("f1", result.getColumn().getName());
+ assertEquals(1, result.getColumn().getIndex());
+ }
+}
diff --git a/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverterTest.java b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverterTest.java
new file mode 100644
index 000000000..b408272ba
--- /dev/null
+++ b/auron-flink-extension/auron-flink-planner/src/test/java/org/apache/auron/flink/table/planner/converter/RexLiteralConverterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.flink.table.planner.converter;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.math.BigDecimal;
+import org.apache.auron.protobuf.PhysicalExprNode;
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link RexLiteralConverter}. */
+class RexLiteralConverterTest {
+
+ private static final RelDataTypeFactory TYPE_FACTORY = new JavaTypeFactoryImpl();
+ private static final RexBuilder REX_BUILDER = new RexBuilder(TYPE_FACTORY);
+
+ private RexLiteralConverter converter;
+ private ConverterContext context;
+
+ @BeforeEach
+ void setUp() {
+ converter = new RexLiteralConverter();
+ // Literal conversion does not read the input schema; a minimal one-column
+ // row type is enough to build a valid context.
+ context =
+ new ConverterContext(new Configuration(), null, getClass().getClassLoader(), RowType.of(new IntType()));
+ }
+
+ @Test
+ void testGetNodeClass() {
+ assertEquals(RexLiteral.class, converter.getNodeClass());
+ }
+
+ @Test
+ void testConvertTinyIntLiteral() {
+ RexLiteral lit = (RexLiteral)
+ REX_BUILDER.makeExactLiteral(BigDecimal.valueOf(7), TYPE_FACTORY.createSqlType(SqlTypeName.TINYINT));
+
+ assertTrue(converter.isSupported(lit, context));
+ PhysicalExprNode result = converter.convert(lit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertSmallIntLiteral() {
+ RexLiteral lit = (RexLiteral)
+ REX_BUILDER.makeExactLiteral(BigDecimal.valueOf(256), TYPE_FACTORY.createSqlType(SqlTypeName.SMALLINT));
+
+ assertTrue(converter.isSupported(lit, context));
+ PhysicalExprNode result = converter.convert(lit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertIntLiteral() {
+ RexLiteral intLit = (RexLiteral)
+ REX_BUILDER.makeExactLiteral(BigDecimal.valueOf(42), TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+
+ assertTrue(converter.isSupported(intLit, context));
+ PhysicalExprNode result = converter.convert(intLit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertLongLiteral() {
+ RexLiteral longLit = (RexLiteral) REX_BUILDER.makeExactLiteral(
+ BigDecimal.valueOf(123456789L), TYPE_FACTORY.createSqlType(SqlTypeName.BIGINT));
+
+ assertTrue(converter.isSupported(longLit, context));
+ PhysicalExprNode result = converter.convert(longLit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertFloatLiteral() {
+ RexLiteral lit = (RexLiteral)
+ REX_BUILDER.makeApproxLiteral(BigDecimal.valueOf(2.5f), TYPE_FACTORY.createSqlType(SqlTypeName.FLOAT));
+
+ assertTrue(converter.isSupported(lit, context));
+ PhysicalExprNode result = converter.convert(lit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertDoubleLiteral() {
+ RexLiteral doubleLit = (RexLiteral)
+ REX_BUILDER.makeApproxLiteral(BigDecimal.valueOf(3.14), TYPE_FACTORY.createSqlType(SqlTypeName.DOUBLE));
+
+ assertTrue(converter.isSupported(doubleLit, context));
+ PhysicalExprNode result = converter.convert(doubleLit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertBooleanTrue() {
+ RexLiteral boolLit = (RexLiteral) REX_BUILDER.makeLiteral(true);
+
+ assertTrue(converter.isSupported(boolLit, context));
+ PhysicalExprNode result = converter.convert(boolLit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertStringLiteral() {
+ RexLiteral strLit = (RexLiteral) REX_BUILDER.makeLiteral("hello");
+
+ assertTrue(converter.isSupported(strLit, context));
+ PhysicalExprNode result = converter.convert(strLit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertDecimalLiteral() {
+ RexLiteral decLit = (RexLiteral) REX_BUILDER.makeExactLiteral(
+ new BigDecimal("123.45"), TYPE_FACTORY.createSqlType(SqlTypeName.DECIMAL, 10, 2));
+
+ assertTrue(converter.isSupported(decLit, context));
+ PhysicalExprNode result = converter.convert(decLit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testConvertNullLiteral() {
+ // NULL of a supported type (INTEGER) must still serialize to a valid IPC payload.
+ RexLiteral nullLit = (RexLiteral) REX_BUILDER.makeNullLiteral(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER));
+
+ assertTrue(converter.isSupported(nullLit, context));
+ PhysicalExprNode result = converter.convert(nullLit, context);
+ assertTrue(result.hasLiteral());
+ assertTrue(result.getLiteral().getIpcBytes().size() > 0);
+ }
+
+ @Test
+ void testUnsupportedTypeNotSupported() {
+ RexNode tsLit = REX_BUILDER.makeNullLiteral(TYPE_FACTORY.createSqlType(SqlTypeName.TIMESTAMP));
+
+ assertFalse(converter.isSupported(tsLit, context));
+ }
+}
diff --git a/docs/PR-AURON-1859/AURON-1859-DESIGN.md b/docs/PR-AURON-1859/AURON-1859-DESIGN.md
new file mode 100644
index 000000000..d3c3a1278
--- /dev/null
+++ b/docs/PR-AURON-1859/AURON-1859-DESIGN.md
@@ -0,0 +1,503 @@
+# Design — AURON-1859: Convert Math Operators to Auron Native Operators
+
+**Rev 1** — 2026-04-01
+**Issue**: https://github.com/apache/auron/issues/1859
+**Prerequisite**: #1856 (Flink Node Converter Tools) — PR #2146 merged
+
+---
+
+## 1. Problem Statement
+
+The Flink integration track needs concrete expression converters that translate Flink/Calcite `RexNode` expressions into Auron native `PhysicalExprNode` protobuf representations. Issue #1859 targets math operators (`+`, `-`, `*`, `/`, `%`) as the first converters, unlocking #1857 (FlinkAuronCalcOperator) and #1853 (StreamExecCalc rewrite).
+
+The converter framework from #1856 provides the dispatch infrastructure (`FlinkNodeConverterFactory`, `FlinkRexNodeConverter`, `ConverterContext`) but has zero concrete converter implementations. This PR delivers the first three.
+
+---
+
+## 2. Approach Candidates
+
+### Candidate A: Single RexCallConverter with SqlKind Switch
+
+One `RexCallConverter` registered for `RexCall.class`, dispatching by `SqlKind` internally.
+
+**Pros**: Matches the factory's class-based dispatch design. Simple, one registration call.
+**Cons**: Class will grow as more operators are added in future PRs (#1860, #1861, #1864).
+
+### Candidate B: Per-Operator Converter Classes (Gluten-style)
+
+Separate converter class for each operator (e.g., `PlusConverter`, `MinusConverter`).
+
+**Pros**: Small focused classes.
+**Cons**: Cannot work with our factory — it keys by `RexNode` subclass (`RexCall.class`), so only ONE converter can be registered for `RexCall`. Would require redesigning the factory.
+
+**Evidence**: The factory throws `IllegalArgumentException` on duplicate registration:
+```java
+// FlinkNodeConverterFactory.java:77-79
+if (rexConverterMap.containsKey(nodeClass)) {
+ throw new IllegalArgumentException("Duplicate RexNode converter for " + nodeClass.getName());
+}
+```
+
+### Candidate C: Sub-dispatch Registry Inside RexCallConverter (Gluten hybrid)
+
+One `RexCallConverter` for the factory, but internally uses a `Map
+ *
+ *
+ * @throws IllegalArgumentException if the SqlKind is not supported
+ */
+    @Override
+    public PhysicalExprNode convert(RexNode node, ConverterContext context) {
+        // Callers are expected to gate on isSupported(), so node is a RexCall here.
+        RexCall call = (RexCall) node;
+        SqlKind kind = call.getKind();
+        switch (kind) {
+            // Binary arithmetic: delegate with the native operator name;
+            // buildBinaryExpr handles operand type promotion.
+            case PLUS:
+                return buildBinaryExpr(call, "Plus", context);
+            case MINUS:
+                return buildBinaryExpr(call, "Minus", context);
+            case TIMES:
+                return buildBinaryExpr(call, "Multiply", context);
+            case DIVIDE:
+                return buildBinaryExpr(call, "Divide", context);
+            case MOD:
+                return buildBinaryExpr(call, "Modulo", context);
+            case MINUS_PREFIX:
+                // Unary minus gets a dedicated negative expression.
+                return buildNegative(call, context);
+            case PLUS_PREFIX:
+                // Unary plus is the identity: just convert the operand.
+                return convertOperand(call.getOperands().get(0), context);
+            case CAST:
+                // CAST is translated to a TryCast expression.
+                return buildTryCast(call, context);
+            default:
+                // isSupported() should have filtered this; fail loudly if not.
+                throw new IllegalArgumentException("Unsupported SqlKind: " + kind);
+        }
+    }
+
+ /**
+ * Builds a binary expression with type promotion between operands.
+ *
+ *
+ *
+ *
+ * @param type1 left operand type
+ * @param type2 right operand type
+ * @param typeFactory factory for creating result types
+ * @return the common type, or {@code null} if incompatible
+ */
+    static RelDataType getCommonTypeForComparison(
+            RelDataType type1, RelDataType type2, RelDataTypeFactory typeFactory) {
+        // Identical SqlTypeName: reuse the left type verbatim.
+        // NOTE(review): this ignores precision/scale/nullability differences
+        // (e.g. DECIMAL(10,2) vs DECIMAL(5,0) returns type1 unchanged) —
+        // confirm downstream tolerates that, or consider
+        // typeFactory.leastRestrictive(...) instead.
+        if (type1.getSqlTypeName().equals(type2.getSqlTypeName())) {
+            return type1;
+        }
+        if (SqlTypeUtil.isNumeric(type1) && SqlTypeUtil.isNumeric(type2)) {
+            SqlTypeName t1 = type1.getSqlTypeName();
+            SqlTypeName t2 = type2.getSqlTypeName();
+            if (t1 == SqlTypeName.DECIMAL || t2 == SqlTypeName.DECIMAL) {
+                // NOTE(review): createSqlType(DECIMAL) uses the factory's default
+                // precision/scale (scale 0 under Calcite's default type system),
+                // which could drop fractional digits — verify this is intended.
+                return typeFactory.createSqlType(SqlTypeName.DECIMAL);
+            }
+            // Both exact numerics: widen to the widest exact type.
+            if (notApproxType(t1) && notApproxType(t2)) {
+                return typeFactory.createSqlType(SqlTypeName.BIGINT);
+            }
+            // At least one approximate numeric: widen to DOUBLE.
+            return typeFactory.createSqlType(SqlTypeName.DOUBLE);
+        }
+        // Two character-family types unify as unbounded VARCHAR.
+        if (SqlTypeUtil.inCharFamily(type1) && SqlTypeUtil.inCharFamily(type2)) {
+            return typeFactory.createSqlType(SqlTypeName.VARCHAR);
+        }
+        // No promotion rule applies: signal incompatibility to the caller.
+        return null;
+    }
+
+    /**
+     * Returns {@code true} when {@code typeName} is an exact numeric type,
+     * i.e. not one of Calcite's approximate types (FLOAT, REAL, DOUBLE).
+     *
+     * <p>Uses {@link SqlTypeName#APPROX_TYPES} instead of a hand-written check:
+     * the previous comparison against only FLOAT and DOUBLE missed REAL, which
+     * would make e.g. INT + REAL promote to BIGINT (truncating the fractional
+     * part) instead of DOUBLE.
+     */
+    private static boolean notApproxType(SqlTypeName typeName) {
+        return !SqlTypeName.APPROX_TYPES.contains(typeName);
+    }
+
+ /**
+ * Wraps the converted operand in a TryCast if its type differs from the
+ * target type.
+ */
+    private PhysicalExprNode castIfNecessary(RexNode expr, RelDataType targetType, ConverterContext context) {
+        PhysicalExprNode converted = convertOperand(expr, context);
+        // Same SqlTypeName: hand back the converted node without a cast.
+        // NOTE(review): name-level comparison ignores precision/scale (e.g.
+        // DECIMAL(10,2) vs DECIMAL(5,0)) — confirm that skipping the cast is
+        // acceptable in that case.
+        if (expr.getType().getSqlTypeName().equals(targetType.getSqlTypeName())) {
+            return converted;
+        }
+        // Types differ: wrap in a TryCast to the target type.
+        return wrapInTryCast(converted, targetType);
+    }
+
+ /**
+ * Delegates operand conversion to the factory.
+ *
+ * @throws IllegalStateException if no converter is registered for
+ * the operand
+ */
+ private PhysicalExprNode convertOperand(RexNode operand, ConverterContext context) {
+ Optional