Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,15 @@
import java.util.Map;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.analytics.exec.QueryPlanExecutor;
import org.opensearch.core.action.ActionListener;
import org.opensearch.sql.ast.statement.ExplainMode;
Expand Down Expand Up @@ -71,10 +78,7 @@ public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listene
@Override
public void execute(
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
// QueryPlanExecutor became asynchronous in analytics-framework 3.7 — execution is dispatched
// to a worker pool and results arrive on the listener. Record the execute metric in the
// listener callback, before delegating to the user-supplied listener, so the metric snapshot
// taken by SimpleJsonResponseFormatter sees the correct value.
// Record EXECUTE inside the callback so the formatter's metric snapshot sees the final value.
ProfileMetric execMetric = QueryProfiling.current().getOrCreateMetric(MetricName.EXECUTE);
long execStart = System.nanoTime();

Expand All @@ -87,7 +91,7 @@ public void onResponse(Iterable<Object[]> rows) {
try {
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
List<ExprValue> results = convertRows(rows, fields);
Schema schema = buildSchema(fields);
Schema schema = buildSchema(plan);
execMetric.set(System.nanoTime() - execStart);
listener.onResponse(new QueryResponse(schema, results, Cursor.None));
} catch (Exception e) {
Expand Down Expand Up @@ -132,15 +136,91 @@ private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeFie
return results;
}

private Schema buildSchema(List<RelDataTypeField> fields) {
/**
* Recovers pre-cast datetime types stripped by {@code DatetimeOutputCastRule}. The wire format
* keeps {@code timestamp}/{@code date}/{@code time} labels even though values render as VARCHAR.
*
* <p>Detection is structural: the rule's output Project has only identity {@link RexInputRef}
* slots and {@code CAST(<datetime InputRef> AS VARCHAR)} slots — no user-authored expressions.
* That shape uniquely identifies the rule's emit and avoids unwrapping user CASTs (which can
* legitimately VARCHAR-wrap a datetime expression and should round-trip as VARCHAR).
*/
private Schema buildSchema(RelNode plan) {
List<RelDataTypeField> fields = plan.getRowType().getFieldList();
Project castProject = findOutputCastProject(plan);
List<RexNode> projects = castProject == null ? null : castProject.getProjects();
List<Schema.Column> columns = new ArrayList<>();
for (RelDataTypeField field : fields) {
ExprType exprType = convertType(field.getType());
columns.add(new Schema.Column(field.getName(), null, exprType));
for (int i = 0; i < fields.size(); i++) {
RelDataTypeField field = fields.get(i);
RelDataType labelType = field.getType();
if (projects != null && i < projects.size()) {
labelType = unwrapDatetimeCast(projects.get(i), labelType);
}
columns.add(new Schema.Column(field.getName(), null, convertType(labelType)));
}
return new Schema(columns);
}

/**
* Returns the Project emitted by {@code DatetimeOutputCastRule} — root, or sitting under a single
* {@code Sort} (system query-size limit). Detection is by structural shape: every slot is either
* an identity {@link RexInputRef} or {@code CAST(<datetime InputRef> AS VARCHAR)}, and at least
* one slot is the cast form. Anything else (user-authored expression, function call) means this
* is not the rule's emit.
*/
private static Project findOutputCastProject(RelNode plan) {
RelNode current = plan;
while (current instanceof Sort) {
current = current.getInput(0);
}
if (!(current instanceof Project project)) {
return null;
}
boolean sawDatetimeCast = false;
for (RexNode slot : project.getProjects()) {
if (slot instanceof RexInputRef) {
continue;
}
if (isDatetimeToVarcharCast(slot)) {
sawDatetimeCast = true;
continue;
}
return null;
}
return sawDatetimeCast ? project : null;
}

private static boolean isDatetimeToVarcharCast(RexNode expr) {
if (!(expr instanceof RexCall call) || call.getKind() != SqlKind.CAST) {
return false;
}
if (call.getOperands().size() != 1) {
return false;
}
RexNode source = call.getOperands().get(0);
if (!(source instanceof RexInputRef)) {
return false;
}
if (call.getType().getSqlTypeName() != SqlTypeName.VARCHAR) {
return false;
}
return isDatetime(source.getType().getSqlTypeName());
}

private static boolean isDatetime(SqlTypeName type) {
return type == SqlTypeName.DATE
|| type == SqlTypeName.TIME
|| type == SqlTypeName.TIMESTAMP
|| type == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
}

private static RelDataType unwrapDatetimeCast(RexNode projection, RelDataType fallback) {
if (!isDatetimeToVarcharCast(projection)) {
return fallback;
}
return ((RexCall) projection).getOperands().get(0).getType();
}

private ExprType convertType(RelDataType type) {
try {
return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,26 @@
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.hep.HepPlanner;
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalValues;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -276,8 +288,141 @@ void physicalPlanExplain_callsOnFailure() {
+ errorRef.get().getMessage());
}

// --- schema recovery: structural detection of CAST(<datetime> AS VARCHAR) projects ---

@Test
void buildSchema_recoversDatetimeLabelsFromOutputCastProject() {
RelNode plan =
buildOutputCastPlan(
new String[] {"ts", "d", "t"},
new SqlTypeName[] {SqlTypeName.TIMESTAMP, SqlTypeName.DATE, SqlTypeName.TIME});
Iterable<Object[]> rows =
Collections.singletonList(new Object[] {"2024-01-15 10:30:00", "2024-01-15", "10:30:00"});
stubExecutorWith(plan, rows);

QueryResponse response = executeAndCapture(plan);
String dump = dumpResponse(response);

assertEquals(
ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(0).getExprType(), dump);
assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(1).getExprType(), dump);
assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(2).getExprType(), dump);
}

@Test
void buildSchema_walksThroughLogicalSortWrapper() {
RelNode castProject =
buildOutputCastPlan(new String[] {"ts"}, new SqlTypeName[] {SqlTypeName.TIMESTAMP});
// Mimic the LogicalSystemLimit wrapper that wraps the rule-emitted Project at the root.
RexBuilder rexBuilder = castProject.getCluster().getRexBuilder();
RexNode fetch =
rexBuilder.makeLiteral(
10000, castProject.getCluster().getTypeFactory().createSqlType(SqlTypeName.INTEGER));
RelNode wrapped = LogicalSort.create(castProject, RelCollations.EMPTY, null, fetch);
stubExecutorWith(wrapped, Collections.emptyList());

QueryResponse response = executeAndCapture(wrapped);
String dump = dumpResponse(response);

assertEquals(
ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(0).getExprType(), dump);
}

@Test
void buildSchema_projectWithUserExpressionDoesNotRecover() {
// A Project that mixes a CAST(<datetime> AS VARCHAR) slot with a user-authored
// expression slot (here: ts + INTERVAL — an unrelated function call) is NOT the
// rule's emit shape. Recovery must NOT happen; the wire schema reflects the
// Project's row type as-is (the cast slot stays VARCHAR/STRING).
RelNode plan =
buildMixedProject(
new String[] {"ts_str", "calc"},
new SqlTypeName[] {SqlTypeName.TIMESTAMP, SqlTypeName.INTEGER});
stubExecutorWith(plan, Collections.emptyList());

QueryResponse response = executeAndCapture(plan);
String dump = dumpResponse(response);

// ts_str slot is CAST(<TIMESTAMP> AS VARCHAR) but sits next to a user expression,
// so the structural shape doesn't match — schema stays STRING (VARCHAR).
assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump);
}

@Test
void buildSchema_nonProjectRootKeepsFieldType() {
// When the rule didn't fire (no datetime fields), the root is whatever the
// planner produced — the recovery path must fall back to the field type.
RelNode plan = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
stubExecutorWith(plan, Collections.emptyList());

QueryResponse response = executeAndCapture(plan);
String dump = dumpResponse(response);

assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump);
assertEquals(
ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump);
}

// --- helpers ---

/**
* Builds a {@code LogicalProject(CAST(<typed> AS VARCHAR))} over a {@link LogicalValues} input —
* mirrors what {@code DatetimeOutputCastRule} emits at the root.
*/
private RelNode buildOutputCastPlan(String[] names, SqlTypeName[] types) {
SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
RexBuilder rexBuilder = new RexBuilder(typeFactory);
HepPlanner planner = new HepPlanner(new HepProgramBuilder().build());
RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);

RelDataTypeFactory.Builder rowBuilder = typeFactory.builder();
for (int i = 0; i < names.length; i++) {
rowBuilder.add(names[i], types[i]).nullable(true);
}
RelDataType rowType = rowBuilder.build();
LogicalValues input = LogicalValues.createEmpty(cluster, rowType);

RelDataType varchar =
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true);
List<RexNode> projects = new ArrayList<>(names.length);
List<String> projectNames = new ArrayList<>(names.length);
for (int i = 0; i < names.length; i++) {
RexNode ref = rexBuilder.makeInputRef(input, i);
projects.add(rexBuilder.makeCast(varchar, ref));
projectNames.add(names[i]);
}
return LogicalProject.create(input, List.of(), projects, projectNames);
}

/**
* Builds a {@code LogicalProject} where the first slot is {@code CAST(<datetime> AS VARCHAR)} and
* the second slot is a user-authored expression ({@code col + 1}) — i.e. NOT the shape that the
* rule emits.
*/
private RelNode buildMixedProject(String[] names, SqlTypeName[] types) {
SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
RexBuilder rexBuilder = new RexBuilder(typeFactory);
HepPlanner planner = new HepPlanner(new HepProgramBuilder().build());
RelOptCluster cluster = RelOptCluster.create(planner, rexBuilder);

RelDataTypeFactory.Builder rowBuilder = typeFactory.builder();
for (int i = 0; i < names.length; i++) {
rowBuilder.add(names[i], types[i]).nullable(true);
}
LogicalValues input = LogicalValues.createEmpty(cluster, rowBuilder.build());

RelDataType varchar =
typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.VARCHAR), true);
RexNode castSlot = rexBuilder.makeCast(varchar, rexBuilder.makeInputRef(input, 0));
RexNode plusSlot =
rexBuilder.makeCall(
SqlStdOperatorTable.PLUS,
rexBuilder.makeInputRef(input, 1),
rexBuilder.makeLiteral(1, typeFactory.createSqlType(SqlTypeName.INTEGER)));
return LogicalProject.create(
input, List.of(), List.of(castSlot, plusSlot), List.of(names[0], names[1]));
}

private QueryResponse executeAndCapture(RelNode relNode) {
AtomicReference<QueryResponse> ref = new AtomicReference<>();
engine.execute(relNode, mockContext, captureListener(ref));
Expand Down
Loading