From 35372107c4f329eac381a0736b8521f07d2e25b9 Mon Sep 17 00:00:00 2001 From: Eric Wei Date: Mon, 18 May 2026 16:31:01 -0700 Subject: [PATCH] fix: recover datetime schema labels on analytics-engine path (#5420) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem: PPL datetime columns on the force-routed analytics-engine path return wire schema `"type": "string"` instead of the documented `"timestamp"`/`"date"`/`"time"`. The cause is `DatetimeOutputCastRule`, which wraps the root output with `CAST(<datetime> AS VARCHAR)` so every backend can serialize datetimes uniformly — but this also strips the pre-cast type from the RelNode's row type that `AnalyticsExecutionEngine` hands to the response builder. Solution: in `AnalyticsExecutionEngine.buildSchema`, detect the rule's output Project structurally — a Project where every slot is either an identity `RexInputRef` or `CAST(<datetime> AS VARCHAR)`, with at least one cast slot. Walk past a single `Sort` wrapper (the unified planner's system query-size limit). For each cast slot, recover the pre-cast operand type for the wire schema; values continue to render as VARCHAR so per-row output is unchanged. Detection is structural rather than hint-based to avoid registering a new hint name in Calcite's strict-mode HintStrategyTable; it mirrors the shape check `DatetimeOutputCastRewriter` (analytics-backend-datafusion) already uses to locate the same Project for value-format rewriting. 
Signed-off-by: Eric Wei --- .../analytics/AnalyticsExecutionEngine.java | 98 ++++++++++-- .../AnalyticsExecutionEngineTest.java | 145 ++++++++++++++++++ 2 files changed, 234 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java index ddfe5fd3556..6bed4722a9b 100644 --- a/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngine.java @@ -11,8 +11,15 @@ import java.util.Map; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.type.SqlTypeName; import org.opensearch.analytics.exec.QueryPlanExecutor; import org.opensearch.core.action.ActionListener; import org.opensearch.sql.ast.statement.ExplainMode; @@ -71,10 +78,7 @@ public void explain(PhysicalPlan plan, ResponseListener listene @Override public void execute( RelNode plan, CalcitePlanContext context, ResponseListener listener) { - // QueryPlanExecutor became asynchronous in analytics-framework 3.7 — execution is dispatched - // to a worker pool and results arrive on the listener. Record the execute metric in the - // listener callback, before delegating to the user-supplied listener, so the metric snapshot - // taken by SimpleJsonResponseFormatter sees the correct value. + // Record EXECUTE inside the callback so the formatter's metric snapshot sees the final value. 
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE); long execStart = System.nanoTime(); @@ -87,7 +91,7 @@ public void onResponse(Iterable rows) { try { List fields = plan.getRowType().getFieldList(); List results = convertRows(rows, fields); - Schema schema = buildSchema(fields); + Schema schema = buildSchema(plan); execMetric.set(System.nanoTime() - execStart); listener.onResponse(new QueryResponse(schema, results, Cursor.None)); } catch (Exception e) { @@ -132,15 +136,91 @@ private List convertRows(Iterable rows, List fields) { + /** + * Recovers pre-cast datetime types stripped by {@code DatetimeOutputCastRule}. The wire format + * keeps {@code timestamp}/{@code date}/{@code time} labels even though values render as VARCHAR. + * + *

Detection is structural: the rule's output Project has only identity {@link RexInputRef} + * slots and {@code CAST( AS VARCHAR)} slots — no user-authored expressions. + * That shape uniquely identifies the rule's emit and avoids unwrapping user CASTs (which can + * legitimately VARCHAR-wrap a datetime expression and should round-trip as VARCHAR). + */ + private Schema buildSchema(RelNode plan) { + List fields = plan.getRowType().getFieldList(); + Project castProject = findOutputCastProject(plan); + List projects = castProject == null ? null : castProject.getProjects(); List columns = new ArrayList<>(); - for (RelDataTypeField field : fields) { - ExprType exprType = convertType(field.getType()); - columns.add(new Schema.Column(field.getName(), null, exprType)); + for (int i = 0; i < fields.size(); i++) { + RelDataTypeField field = fields.get(i); + RelDataType labelType = field.getType(); + if (projects != null && i < projects.size()) { + labelType = unwrapDatetimeCast(projects.get(i), labelType); + } + columns.add(new Schema.Column(field.getName(), null, convertType(labelType))); } return new Schema(columns); } + /** + * Returns the Project emitted by {@code DatetimeOutputCastRule} — root, or sitting under a single + * {@code Sort} (system query-size limit). Detection is by structural shape: every slot is either + * an identity {@link RexInputRef} or {@code CAST( AS VARCHAR)}, and at least + * one slot is the cast form. Anything else (user-authored expression, function call) means this + * is not the rule's emit. 
+ */ + private static Project findOutputCastProject(RelNode plan) { + RelNode current = plan; + while (current instanceof Sort) { + current = current.getInput(0); + } + if (!(current instanceof Project project)) { + return null; + } + boolean sawDatetimeCast = false; + for (RexNode slot : project.getProjects()) { + if (slot instanceof RexInputRef) { + continue; + } + if (isDatetimeToVarcharCast(slot)) { + sawDatetimeCast = true; + continue; + } + return null; + } + return sawDatetimeCast ? project : null; + } + + private static boolean isDatetimeToVarcharCast(RexNode expr) { + if (!(expr instanceof RexCall call) || call.getKind() != SqlKind.CAST) { + return false; + } + if (call.getOperands().size() != 1) { + return false; + } + RexNode source = call.getOperands().get(0); + if (!(source instanceof RexInputRef)) { + return false; + } + if (call.getType().getSqlTypeName() != SqlTypeName.VARCHAR) { + return false; + } + return isDatetime(source.getType().getSqlTypeName()); + } + + private static boolean isDatetime(SqlTypeName type) { + return type == SqlTypeName.DATE + || type == SqlTypeName.TIME + || type == SqlTypeName.TIMESTAMP + || type == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE; + } + + private static RelDataType unwrapDatetimeCast(RexNode projection, RelDataType fallback) { + if (!isDatetimeToVarcharCast(projection)) { + return fallback; + } + return ((RexCall) projection).getOperands().get(0).getType(); + } + private ExprType convertType(RelDataType type) { try { return OpenSearchTypeFactory.convertRelDataTypeToExprType(type); diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java index 4de596fb375..94e77d56f85 100644 --- a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -15,14 
+15,26 @@ import static org.mockito.Mockito.when; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.hep.HepPlanner; +import org.apache.calcite.plan.hep.HepProgramBuilder; +import org.apache.calcite.rel.RelCollations; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.logical.LogicalProject; +import org.apache.calcite.rel.logical.LogicalSort; +import org.apache.calcite.rel.logical.LogicalValues; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.type.SqlTypeName; import org.junit.jupiter.api.BeforeEach; @@ -276,8 +288,141 @@ void physicalPlanExplain_callsOnFailure() { + errorRef.get().getMessage()); } + // --- schema recovery: structural detection of CAST( AS VARCHAR) projects --- + + @Test + void buildSchema_recoversDatetimeLabelsFromOutputCastProject() { + RelNode plan = + buildOutputCastPlan( + new String[] {"ts", "d", "t"}, + new SqlTypeName[] {SqlTypeName.TIMESTAMP, SqlTypeName.DATE, SqlTypeName.TIME}); + Iterable rows = + Collections.singletonList(new Object[] {"2024-01-15 10:30:00", "2024-01-15", "10:30:00"}); + stubExecutorWith(plan, rows); + + QueryResponse response = executeAndCapture(plan); + String dump = dumpResponse(response); + + assertEquals( + ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(1).getExprType(), dump); + 
assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(2).getExprType(), dump); + } + + @Test + void buildSchema_walksThroughLogicalSortWrapper() { + RelNode castProject = + buildOutputCastPlan(new String[] {"ts"}, new SqlTypeName[] {SqlTypeName.TIMESTAMP}); + // Mimic the LogicalSystemLimit wrapper that wraps the rule-emitted Project at the root. + RexBuilder rexBuilder = castProject.getCluster().getRexBuilder(); + RexNode fetch = + rexBuilder.makeLiteral( + 10000, castProject.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER)); + RelNode wrapped = LogicalSort.create(castProject, RelCollations.EMPTY, null, fetch); + stubExecutorWith(wrapped, Collections.emptyList()); + + QueryResponse response = executeAndCapture(wrapped); + String dump = dumpResponse(response); + + assertEquals( + ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(0).getExprType(), dump); + } + + @Test + void buildSchema_projectWithUserExpressionDoesNotRecover() { + // A Project that mixes a CAST( AS VARCHAR) slot with a user-authored + // expression slot (here: ts + INTERVAL — an unrelated function call) is NOT the + // rule's emit shape. Recovery must NOT happen; the wire schema reflects the + // Project's row type as-is (the cast slot stays VARCHAR/STRING). + RelNode plan = + buildMixedProject( + new String[] {"ts_str", "calc"}, + new SqlTypeName[] {SqlTypeName.TIMESTAMP, SqlTypeName.INTEGER}); + stubExecutorWith(plan, Collections.emptyList()); + + QueryResponse response = executeAndCapture(plan); + String dump = dumpResponse(response); + + // ts_str slot is CAST( AS VARCHAR) but sits next to a user expression, + // so the structural shape doesn't match — schema stays STRING (VARCHAR). 
+ assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump); + } + + @Test + void buildSchema_nonProjectRootKeepsFieldType() { + // When the rule didn't fire (no datetime fields), the root is whatever the + // planner produced — the recovery path must fall back to the field type. + RelNode plan = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER); + stubExecutorWith(plan, Collections.emptyList()); + + QueryResponse response = executeAndCapture(plan); + String dump = dumpResponse(response); + + assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump); + assertEquals( + ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump); + } + // --- helpers --- + /** + * Builds a {@code LogicalProject(CAST( AS VARCHAR))} over a {@link LogicalValues} input — + * mirrors what {@code DatetimeOutputCastRule} emits at the root. + */ + private RelNode buildOutputCastPlan(String[] names, SqlTypeName[] types) { + SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RexBuilder rexBuilder = new RexBuilder(typeFactory); + HepPlanner planner = new HepPlanner(new HepProgramBuilder().build()); + RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + + RelDataTypeFactory.Builder rowBuilder = typeFactory.builder(); + for (int i = 0; i < names.length; i++) { + rowBuilder.add(names[i], types[i]).nullable(true); + } + RelDataType rowType = rowBuilder.build(); + LogicalValues input = LogicalValues.createEmpty(cluster, rowType); + + RelDataType varchar = + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true); + List projects = new ArrayList<>(names.length); + List projectNames = new ArrayList<>(names.length); + for (int i = 0; i < names.length; i++) { + RexNode ref = rexBuilder.makeInputRef(input, i); + projects.add(rexBuilder.makeCast(varchar, ref)); + projectNames.add(names[i]); + } + return 
LogicalProject.create(input, List.of(), projects, projectNames); + } + + /** + * Builds a {@code LogicalProject} where the first slot is {@code CAST( AS VARCHAR)} and + * the second slot is a user-authored expression ({@code col + 1}) — i.e. NOT the shape that the + * rule emits. + */ + private RelNode buildMixedProject(String[] names, SqlTypeName[] types) { + SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RexBuilder rexBuilder = new RexBuilder(typeFactory); + HepPlanner planner = new HepPlanner(new HepProgramBuilder().build()); + RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder); + + RelDataTypeFactory.Builder rowBuilder = typeFactory.builder(); + for (int i = 0; i < names.length; i++) { + rowBuilder.add(names[i], types[i]).nullable(true); + } + LogicalValues input = LogicalValues.createEmpty(cluster, rowBuilder.build()); + + RelDataType varchar = + typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true); + RexNode castSlot = rexBuilder.makeCast(varchar, rexBuilder.makeInputRef(input, 0)); + RexNode plusSlot = + rexBuilder.makeCall( + SqlStdOperatorTable.PLUS, + rexBuilder.makeInputRef(input, 1), + rexBuilder.makeLiteral(1, typeFactory.createSqlType(SqlTypeName.INTEGER))); + return LogicalProject.create( + input, List.of(), List.of(castSlot, plusSlot), List.of(names[0], names[1])); + } + private QueryResponse executeAndCapture(RelNode relNode) { AtomicReference ref = new AtomicReference<>(); engine.execute(relNode, mockContext, captureListener(ref));