From 5cdc943db3819849f2da51d60f3a30e49e75a7e9 Mon Sep 17 00:00:00 2001 From: Lantao Jin Date: Mon, 26 Jan 2026 17:48:51 +0800 Subject: [PATCH] Support pushdown array/collect aggregation Signed-off-by: Lantao Jin --- .../sql/calcite/CalciteRelNodeVisitor.java | 186 ++++++++++++ .../calcite/utils/OpenSearchTypeFactory.java | 2 +- .../calcite/explain_mvcombine.yaml | 9 + ...lain_patterns_simple_pattern_agg_push.yaml | 2 +- .../expectedOutput/calcite/explain_take.yaml | 2 +- .../explain_mvcombine.yaml | 12 + .../opensearch/client/OpenSearchClient.java | 9 + .../client/OpenSearchNodeClient.java | 16 + .../client/OpenSearchRestClient.java | 18 ++ .../opensearch/request/AggregateAnalyzer.java | 68 +++-- .../response/agg/TopHitsParser.java | 23 +- .../dsl/MetricAggregationBuilder.java | 1 + .../ppl/calcite/CalcitePPLMvCombineTest.java | 273 ++++++++++++++++++ 13 files changed, 588 insertions(+), 33 deletions(-) create mode 100644 integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml create mode 100644 integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvcombine.yaml create mode 100644 ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvCombineTest.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index f1bc5fd6a0d..9482928c615 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -61,6 +61,7 @@ import org.apache.calcite.rex.RexVisitorImpl; import org.apache.calcite.rex.RexWindowBounds; import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.fun.SqlLibraryOperators; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.ArraySqlType; import org.apache.calcite.sql.type.MapSqlType; @@ -3086,6 +3087,191 @@ public RelNode visitExpand(Expand expand, CalcitePlanContext context) { return context.relBuilder.peek(); } + /** + * mvcombine command visitor to collapse rows that are identical on all fields except the target + * field, and combine the target field values into a multivalue (array) field. + * + *

Implementation notes:Groups by all input fields except the target field. Aggregates target + * values using {@code COLLECT} (MULTISET). Casts the aggregation result from MULTISET to ARRAY + * for a stable multivalue output type. Preserves the original output column order. + * + * @param node mvcombine command to be visited + * @param context CalcitePlanContext containing the RelBuilder, RexBuilder, and resolution context + * @return RelNode representing collapsed records with the target combined into a multivalue array + * @throws SemanticCheckException if the mvcombine target is not a direct field reference + */ + @Override + public RelNode visitMvCombine(MvCombine node, CalcitePlanContext context) { + // 1) Lower the child plan first so the RelBuilder has the input schema on the stack. + visitChildren(node, context); + + final RelBuilder relBuilder = context.relBuilder; + final RexBuilder rexBuilder = context.rexBuilder; + + final RelNode input = relBuilder.peek(); + final List inputFieldNames = input.getRowType().getFieldNames(); + + // 2) Resolve the mvcombine target to an input column index (must be a direct field reference). + final Field targetField = node.getField(); + final int targetIndex = resolveTargetIndex(targetField, context); + final String targetName = inputFieldNames.get(targetIndex); + + // 3) Group by all fields except the target. + final List groupExprs = + buildGroupExpressionsExcludingTarget(targetIndex, inputFieldNames, relBuilder); + + // 4) Aggregate target values using COLLECT, filtering out NULLs. + performCollectAggregation(relBuilder, targetIndex, targetName, groupExprs); + + // 5) Restore original output column order, and cast COLLECT's MULTISET output to ARRAY. + restoreColumnOrderWithArrayCast( + relBuilder, rexBuilder, input, inputFieldNames, targetIndex, groupExprs); + + return relBuilder.peek(); + } + + /** + * Resolves the mvcombine target expression to an input field index. + * + *

mvcombine requires the target to be a direct field reference (RexInputRef). This keeps the + * command semantics predictable and avoids accidental grouping on computed expressions. + * + *

The target must also be a scalar-ish field. mvcombine outputs ARRAY<T>, so the input + * target cannot already be an ARRAY or MULTISET. + * + * @param targetField Target field expression from the AST + * @param context Planning context + * @return 0-based input field index for the target + * @throws SemanticCheckException if the target is not a direct field reference or has an array + * type + */ + private int resolveTargetIndex(Field targetField, CalcitePlanContext context) { + final RexNode targetRex = rexVisitor.analyze(targetField, context); + if (!(targetRex instanceof RexInputRef)) { + throw new SemanticCheckException( + "mvcombine target must be a direct field reference, but got: " + targetField); + } + + final int index = ((RexInputRef) targetRex).getIndex(); + + final RelDataType fieldType = + context.relBuilder.peek().getRowType().getFieldList().get(index).getType(); + + if (fieldType.getSqlTypeName() == org.apache.calcite.sql.type.SqlTypeName.ARRAY + || fieldType.getSqlTypeName() == org.apache.calcite.sql.type.SqlTypeName.MULTISET) { + throw new SemanticCheckException( + "mvcombine target cannot be an array/multivalue type, but got: " + fieldType); + } + + return index; + } + + /** + * Builds group-by expressions for mvcombine: all input fields except the target field. + * + * @param targetIndex Input index of the mvcombine target field + * @param inputFieldNames Input schema field names (for sizing/ordering) + * @param relBuilder RelBuilder positioned on the input + * @return Group-by expressions in input order excluding the target + */ + private List buildGroupExpressionsExcludingTarget( + int targetIndex, List inputFieldNames, RelBuilder relBuilder) { + final List groupExprs = new ArrayList<>(Math.max(0, inputFieldNames.size() - 1)); + for (int i = 0; i < inputFieldNames.size(); i++) { + if (i == targetIndex) { + continue; + } + groupExprs.add(relBuilder.field(i)); + } + return groupExprs; + } + + /** + * Applies mvcombine aggregation: + * + *

GROUP BY all non-target fields, and aggregate target values using {@code COLLECT}. {@code + * COLLECT} produces a MULTISET in Calcite, which we later cast to ARRAY for output. + * + *

NULL target values are excluded from the collected multivalue list by applying an aggregate + * filter. This matches typical "combine values" semantics and avoids polluting the result with + * NULL elements. + * + * @param relBuilder RelBuilder positioned on the input + * @param targetIndex Target field input index + * @param targetName Target field output name (preserved) + * @param groupExprs Group-by expressions (all fields except target) + */ + private void performCollectAggregation( + RelBuilder relBuilder, int targetIndex, String targetName, List groupExprs) { + + final RexNode targetRef = relBuilder.field(targetIndex); + final RexNode notNullTarget = relBuilder.isNotNull(targetRef); + + final RelBuilder.AggCall aggCall = + relBuilder + .aggregateCall(SqlLibraryOperators.ARRAY_AGG, targetRef) + .filter(notNullTarget) + .as(targetName); + + relBuilder.aggregate(relBuilder.groupKey(groupExprs), aggCall); + } + + /** + * Restores the original output column order after the aggregate step and converts the collected + * target from MULTISET to ARRAY<T>. + * + *

After aggregation, the schema is: + * + *

+   *   [groupField0, groupField1, ..., groupFieldN, targetAggMultiset]
+   * 
+ * + *

This method projects fields back to the original input order, replacing the original target + * slot with {@code CAST(targetAggMultiset AS ARRAY<T>)}. + * + * @param relBuilder RelBuilder positioned on the post-aggregate node + * @param rexBuilder RexBuilder for explicit casts + * @param input Original input RelNode (used to derive the target element type) + * @param inputFieldNames Original input field names (also output field names) + * @param targetIndex Target field index in the original input + * @param groupExprs Group-by expressions used during aggregation + */ + private void restoreColumnOrderWithArrayCast( + RelBuilder relBuilder, + RexBuilder rexBuilder, + RelNode input, + List inputFieldNames, + int targetIndex, + List groupExprs) { + + // Post-aggregate: group fields come first, and the collected target is appended at the end. + final int collectedTargetPos = groupExprs.size(); + + final RelDataType targetElemType = input.getRowType().getFieldList().get(targetIndex).getType(); + final RelDataType targetArrayType = + relBuilder.getTypeFactory().createArrayType(targetElemType, -1); + + final List projections = new ArrayList<>(inputFieldNames.size()); + final List projectionNames = new ArrayList<>(inputFieldNames.size()); + + int groupPos = 0; + for (int i = 0; i < inputFieldNames.size(); i++) { + projectionNames.add(inputFieldNames.get(i)); + + if (i == targetIndex) { + // COLLECT returns MULTISET; normalize output to ARRAY. + final RexNode multisetRef = relBuilder.field(collectedTargetPos); + projections.add(multisetRef); + } else { + projections.add(relBuilder.field(groupPos)); + groupPos++; + } + } + + // Force projection to avoid Calcite "identity" short-circuit when only names/types change. + relBuilder.project(projections, projectionNames, /* force= */ true); + } + @Override public RelNode visitValues(Values values, CalcitePlanContext context) { if (values.getValues() == null || values.getValues().isEmpty()) { diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java index 17d99fb4fbb..52f2bdde6f4 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java @@ -242,7 +242,7 @@ public static ExprType convertSqlTypeNameToExprType(SqlTypeName sqlTypeName) { INTERVAL_MINUTE_SECOND, INTERVAL_SECOND -> INTERVAL; - case ARRAY -> ARRAY; + case ARRAY, MULTISET -> ARRAY; case MAP -> STRUCT; case GEOMETRY -> GEO_POINT; case NULL, ANY, OTHER -> UNDEFINED; diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml new file mode 100644 index 00000000000..caa09b57d6a --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_mvcombine.yaml @@ -0,0 +1,9 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(state=[$0], city=[$1], age=[$2]) + LogicalAggregate(group=[{0, 1}], age=[ARRAY_AGG($2) FILTER $3]) + LogicalProject(state=[$7], city=[$5], age=[$8], $f3=[IS NOT NULL($8)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},age=ARRAY_AGG($2) FILTER $3), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}},{"city":{"terms":{"field":"city.keyword","missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"age":{"filter":{"exists":{"field":"age","boost":1.0}},"aggregations":{"age":{"top_hits":{"from":0,"size":10000,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["age"],"excludes":[]},"script_fields":{}}}}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_simple_pattern_agg_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_simple_pattern_agg_push.yaml index b2185274499..716269ef149 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_simple_pattern_agg_push.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_patterns_simple_pattern_agg_push.yaml @@ -7,4 +7,4 @@ calcite: CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableCalc(expr#0..2=[{inputs}], expr#3=[PATTERN_PARSER($t0, $t2)], expr#4=['pattern'], expr#5=[ITEM($t3, $t4)], expr#6=[SAFE_CAST($t5)], expr#7=['tokens'], expr#8=[ITEM($t3, $t7)], expr#9=[SAFE_CAST($t8)], patterns_field=[$t6], pattern_count=[$t1], tokens=[$t9], sample_logs=[$t2]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},pattern_count=COUNT($1),sample_logs=TAKE($0, $2)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"patterns_field":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQH2nsKICAib3AiOiB7CiAgICAibmFtZSI6ICJDQVNFIiwKICAgICJraW5kIjogIkNBU0UiLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiT1IiLAogICAgICAgICJraW5kIjogIk9SIiwKICAgICAgICAic3ludGF4IjogIkJJTkFSWSIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiSVMgTlVMTCIsCiAgICAgICAgICAgICJraW5kIjogIklTX05VTEwiLAogICAgICAgICAgICAic3ludGF4IjogIlBPU1RGSVgiCiAgICAgICAgICB9LAogICAgICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDAsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiPSIsCiAgICAgICAgICAgICJraW5kIjogIkVRVUFMUyIsCiAgICAgICAgICAgICJzeW50YXgiOiAiQklOQVJZIgogICAgICAgICAgfSwKICAgICAgICAgICJvcGVyYW5kcyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICJkeW5hbWljUGFyYW0iOiAxLAogICAgICAgICAgICAgICJ0eXBlIjogewogICAgICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDIsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9CiAgICAgIF0KICAgIH0sCiAgICB7CiAgICAgICJkeW5hbWljUGFyYW0iOiAzLAogICAgICAidHlwZSI6IHsKICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9LAogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiUkVHRVhQX1JFUExBQ0UiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDQsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDUsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDYsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgXQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0,0,2,2,0,2,2],"DIGESTS":["email.keyword","email.keyword","","","email.keyword","[a-zA-Z0-9]+","<*>"]}},"missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"pattern_count":{"value_count":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQH2nsKICAib3AiOiB7CiAgICAibmFtZSI6ICJDQVNFIiwKICAgICJraW5kIjogIkNBU0UiLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiT1IiLAogICAgICAgICJraW5kIjogIk9SIiwKICAgICAgICAic3ludGF4IjogIkJJTkFSWSIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiSVMgTlVMTCIsCiAgICAgICAgICAgICJraW5kIjogIklTX05VTEwiLAogICAgICAgICAgICAic3ludGF4IjogIlBPU1RGSVgiCiAgICAgICAgICB9LAogICAgICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDAsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiPSIsCiAgICAgICAgICAgICJraW5kIjogIkVRVUFMUyIsCiAgICAgICAgICAgICJzeW50YXgiOiAiQklOQVJZIgogICAgICAgICAgfSwKICAgICAgICAgICJvcGVyYW5kcyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICJkeW5hbWljUGFyYW0iOiAxLAogICAgICAgICAgICAgICJ0eXBlIjogewogICAgICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDIsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9CiAgICAgIF0KICAgIH0sCiAgICB7CiAgICAgICJkeW5hbWljUGFyYW0iOiAzLAogICAgICAidHlwZSI6IHsKICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9LAogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiUkVHRVhQX1JFUExBQ0UiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDQsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDUsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDYsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgXQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0,0,2,2,0,2,2],"DIGESTS":["email.keyword","email.keyword","","","email.keyword","[a-zA-Z0-9]+","<*>"]}}}},"sample_logs":{"top_hits":{"from":0,"size":10,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"email.keyword"}]}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1},pattern_count=COUNT($1),sample_logs=TAKE($0, $2)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"patterns_field":{"terms":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQH2nsKICAib3AiOiB7CiAgICAibmFtZSI6ICJDQVNFIiwKICAgICJraW5kIjogIkNBU0UiLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiT1IiLAogICAgICAgICJraW5kIjogIk9SIiwKICAgICAgICAic3ludGF4IjogIkJJTkFSWSIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiSVMgTlVMTCIsCiAgICAgICAgICAgICJraW5kIjogIklTX05VTEwiLAogICAgICAgICAgICAic3ludGF4IjogIlBPU1RGSVgiCiAgICAgICAgICB9LAogICAgICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDAsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiPSIsCiAgICAgICAgICAgICJraW5kIjogIkVRVUFMUyIsCiAgICAgICAgICAgICJzeW50YXgiOiAiQklOQVJZIgogICAgICAgICAgfSwKICAgICAgICAgICJvcGVyYW5kcyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICJkeW5hbWljUGFyYW0iOiAxLAogICAgICAgICAgICAgICJ0eXBlIjogewogICAgICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDIsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9CiAgICAgIF0KICAgIH0sCiAgICB7CiAgICAgICJkeW5hbWljUGFyYW0iOiAzLAogICAgICAidHlwZSI6IHsKICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9LAogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiUkVHRVhQX1JFUExBQ0UiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDQsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDUsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDYsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgXQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0,0,2,2,0,2,2],"DIGESTS":["email.keyword","email.keyword","","","email.keyword","[a-zA-Z0-9]+","<*>"]}},"missing_bucket":true,"missing_order":"first","order":"asc"}}}]},"aggregations":{"pattern_count":{"value_count":{"script":{"source":"{\"langType\":\"calcite\",\"script\":\"rO0ABXQH2nsKICAib3AiOiB7CiAgICAibmFtZSI6ICJDQVNFIiwKICAgICJraW5kIjogIkNBU0UiLAogICAgInN5bnRheCI6ICJTUEVDSUFMIgogIH0sCiAgIm9wZXJhbmRzIjogWwogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiT1IiLAogICAgICAgICJraW5kIjogIk9SIiwKICAgICAgICAic3ludGF4IjogIkJJTkFSWSIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiSVMgTlVMTCIsCiAgICAgICAgICAgICJraW5kIjogIklTX05VTEwiLAogICAgICAgICAgICAic3ludGF4IjogIlBPU1RGSVgiCiAgICAgICAgICB9LAogICAgICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDAsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9LAogICAgICAgIHsKICAgICAgICAgICJvcCI6IHsKICAgICAgICAgICAgIm5hbWUiOiAiPSIsCiAgICAgICAgICAgICJraW5kIjogIkVRVUFMUyIsCiAgICAgICAgICAgICJzeW50YXgiOiAiQklOQVJZIgogICAgICAgICAgfSwKICAgICAgICAgICJvcGVyYW5kcyI6IFsKICAgICAgICAgICAgewogICAgICAgICAgICAgICJkeW5hbWljUGFyYW0iOiAxLAogICAgICAgICAgICAgICJ0eXBlIjogewogICAgICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICAgICAgfQogICAgICAgICAgICB9LAogICAgICAgICAgICB7CiAgICAgICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDIsCiAgICAgICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICAgICAicHJlY2lzaW9uIjogLTEKICAgICAgICAgICAgICB9CiAgICAgICAgICAgIH0KICAgICAgICAgIF0KICAgICAgICB9CiAgICAgIF0KICAgIH0sCiAgICB7CiAgICAgICJkeW5hbWljUGFyYW0iOiAzLAogICAgICAidHlwZSI6IHsKICAgICAgICAidHlwZSI6ICJWQVJDSEFSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlLAogICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICB9CiAgICB9LAogICAgewogICAgICAib3AiOiB7CiAgICAgICAgIm5hbWUiOiAiUkVHRVhQX1JFUExBQ0UiLAogICAgICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICAgICAic3ludGF4IjogIkZVTkNUSU9OIgogICAgICB9LAogICAgICAib3BlcmFuZHMiOiBbCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDQsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDUsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0sCiAgICAgICAgewogICAgICAgICAgImR5bmFtaWNQYXJhbSI6IDYsCiAgICAgICAgICAidHlwZSI6IHsKICAgICAgICAgICAgInR5cGUiOiAiVkFSQ0hBUiIsCiAgICAgICAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICAgICAgICJwcmVjaXNpb24iOiAtMQogICAgICAgICAgfQogICAgICAgIH0KICAgICAgXQogICAgfQogIF0KfQ==\"}","lang":"opensearch_compounded_script","params":{"utcTimestamp": 0,"SOURCES":[0,0,2,2,0,2,2],"DIGESTS":["email.keyword","email.keyword","","","email.keyword","[a-zA-Z0-9]+","<*>"]}}}},"sample_logs":{"top_hits":{"from":0,"size":10,"version":false,"seq_no_primary_term":false,"explain":false,"_source":false,"fields":[{"field":"email.keyword"}]}}}}}}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_take.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_take.yaml index e395f0e7485..a88ca72e99f 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_take.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_take.yaml @@ -6,4 +6,4 @@ calcite: CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | EnumerableLimit(fetch=[10000]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},take=TAKE($0, $1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"take":{"top_hits":{"from":0,"size":2,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"firstname.keyword"}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={},take=TAKE($0, $1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"take":{"top_hits":{"from":0,"size":2,"version":false,"seq_no_primary_term":false,"explain":false,"_source":false,"fields":[{"field":"firstname.keyword"}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvcombine.yaml b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvcombine.yaml new file mode 100644 index 00000000000..35f2d79e7c8 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_mvcombine.yaml @@ -0,0 +1,12 @@ +calcite: + logical: | + LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(state=[$0], city=[$1], age=[$2]) + LogicalAggregate(group=[{0, 1}], age=[ARRAY_AGG($2) FILTER $3]) + LogicalProject(state=[$7], city=[$5], age=[$8], $f3=[IS NOT NULL($8)]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableAggregate(group=[{0, 1}], age=[ARRAY_AGG($2) FILTER $3]) + EnumerableCalc(expr#0..16=[{inputs}], expr#17=[IS NOT NULL($t8)], state=[$t7], city=[$t5], age=[$t8], $f3=[$t17]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java index 68350c5a0fd..55d2cfc5bad 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java @@ -55,6 +55,15 @@ public interface OpenSearchClient { */ Map getIndexMaxResultWindows(String... indexExpression); + /** + * Update index.xxx settings to the indices the index expression given. + * + * @param settings index settings (must start with "index.") + * @param indexExpression index expression + * @return true if acknowledged + */ + boolean updateIndexSettings(Map settings, String... indexExpression); + /** * Perform search query in the search request. * diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index dab4b1e8ff1..9280fc8c462 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -25,6 +25,7 @@ import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; import org.opensearch.action.search.*; +import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.common.action.ActionFuture; import org.opensearch.common.settings.Settings; @@ -136,6 +137,21 @@ public Map getIndexMaxResultWindows(String... indexExpression) } } + @Override + public boolean updateIndexSettings(Map settings, String... indexExpression) { + AcknowledgedResponse response = + client + .admin() + .indices() + .prepareUpdateSettings(indexExpression) + .setSettings( + settings.entrySet().stream() + .filter(e -> e.getKey().startsWith("index.")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .get(); + return response.isAcknowledged(); + } + /** TODO: Scroll doesn't work for aggregation. Support aggregation later. */ @Override public OpenSearchResponse search(OpenSearchRequest request) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index 427eb7d6b03..8da2bc5fe4b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -20,7 +20,9 @@ import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest; import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.search.*; +import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; @@ -115,6 +117,22 @@ public Map getIndexMaxResultWindows(String... indexExpression) } } + @Override + public boolean updateIndexSettings(Map settings, String... indexExpression) { + UpdateSettingsRequest request = new UpdateSettingsRequest(indexExpression); + request.settings( + settings.entrySet().stream() + .filter(e -> e.getKey().startsWith("index.")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + try { + AcknowledgedResponse response = client.indices().putSettings(request, RequestOptions.DEFAULT); + return response.isAcknowledged(); + } catch (IOException e) { + throw new IllegalStateException( + String.format("Failed to update index settings %s for %s", settings, indexExpression, e)); + } + } + @Override public OpenSearchResponse search(OpenSearchRequest request) { return request.search( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java index 247f40b3733..9b6311f9570 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java @@ -109,6 +109,8 @@ public class AggregateAnalyzer { /** metadata field used when there is no argument. Only apply to COUNT. */ private static final String METADATA_FIELD = "_index"; + private static final int MAX_TOP_HITS_RESULT_WINDOW = 10000; + /** Internal exception. */ @SuppressWarnings("serial") public static final class AggregateAnalyzerException extends RuntimeException { @@ -568,6 +570,7 @@ yield switch (functionName) { case TAKE -> Pair.of( AggregationBuilders.topHits(aggName) + .fetchSource(false) .fetchField( helper .inferNamedField(args.getFirst().getKey()) @@ -625,30 +628,15 @@ yield switch (functionName) { String.format("Unsupported push-down aggregator %s", aggCall.getAggregation())); } Integer dedupNumber = literal.getValueAs(Integer.class); - // Disable fetchSource since TopHitsParser only parses fetchField currently. - TopHitsAggregationBuilder topHitsAggregationBuilder = - AggregationBuilders.topHits(aggName).from(0).size(dedupNumber); - List sources = new ArrayList<>(); - List scripts = new ArrayList<>(); - args.forEach( - rex -> { - if (rex.getKey() instanceof RexInputRef) { - sources.add(helper.inferNamedField(rex.getKey()).getReference()); - } else if (rex.getKey() instanceof RexCall || rex.getKey() instanceof RexLiteral) { - scripts.add( - new SearchSourceBuilder.ScriptField( - rex.getValue(), helper.inferScript(rex.getKey()).getScript(), false)); - } else { - throw new AggregateAnalyzer.AggregateAnalyzerException( - String.format( - "Unsupported push-down aggregator %s due to rex type is %s", - aggCall.getAggregation(), rex.getKey().getKind())); - } - }); - topHitsAggregationBuilder.fetchSource( - sources.stream().distinct().toArray(String[]::new), new String[0]); - topHitsAggregationBuilder.scriptFields(scripts); - yield Pair.of(topHitsAggregationBuilder, new TopHitsParser(aggName, false, false)); + TopHitsAggregationBuilder topHitsBuilder = + getTopHitsAggregationBuilder(aggCall, args, aggName, helper, dedupNumber); + yield Pair.of(topHitsBuilder, new TopHitsParser(aggName, false, false)); + } + case COLLECT, ARRAY_AGG -> { + TopHitsAggregationBuilder topHitsBuilder = + getTopHitsAggregationBuilder( + aggCall, args, aggName, helper, MAX_TOP_HITS_RESULT_WINDOW); + yield Pair.of(topHitsBuilder, new TopHitsParser(aggName, false, true)); } default -> throw new AggregateAnalyzer.AggregateAnalyzerException( @@ -656,6 +644,38 @@ yield switch (functionName) { }; } + private static TopHitsAggregationBuilder getTopHitsAggregationBuilder( + AggregateCall aggCall, + List> args, + String aggName, + AggregateBuilderHelper helper, + Integer topHitsSize) { + // Disable fetchSource since TopHitsParser only parses fetchField currently. + TopHitsAggregationBuilder topHitsAggregationBuilder = + AggregationBuilders.topHits(aggName).from(0).size(topHitsSize); + List sources = new ArrayList<>(); + List scripts = new ArrayList<>(); + args.forEach( + rex -> { + if (rex.getKey() instanceof RexInputRef) { + sources.add(helper.inferNamedField(rex.getKey()).getReference()); + } else if (rex.getKey() instanceof RexCall || rex.getKey() instanceof RexLiteral) { + scripts.add( + new SearchSourceBuilder.ScriptField( + rex.getValue(), helper.inferScript(rex.getKey()).getScript(), false)); + } else { + throw new AggregateAnalyzerException( + String.format( + "Unsupported push-down aggregator %s due to rex type is %s", + aggCall.getAggregation(), rex.getKey().getKind())); + } + }); + topHitsAggregationBuilder.fetchSource( + sources.stream().distinct().toArray(String[]::new), new String[0]); + topHitsAggregationBuilder.scriptFields(scripts); + return topHitsAggregationBuilder; + } + private static boolean supportsMaxMinAggregation(ExprType fieldType) { ExprType coreType = (fieldType instanceof OpenSearchDataType) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/TopHitsParser.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/TopHitsParser.java index 0e178220600..44b345073f0 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/TopHitsParser.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/response/agg/TopHitsParser.java @@ -13,6 +13,7 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.Stream; import lombok.EqualsAndHashCode; import lombok.Getter; import org.opensearch.common.document.DocumentField; @@ -54,19 +55,29 @@ public List> parse(Aggregation agg) { return Collections.singletonList( new HashMap<>(Collections.singletonMap(agg.getName(), value))); } else if (returnMergeValue) { - if (hits[0].getFields() == null || hits[0].getFields().isEmpty()) { + if ((hits[0].getFields() == null || hits[0].getFields().isEmpty()) + && (hits[0].getSourceAsMap() == null || hits[0].getSourceAsMap().isEmpty())) { return Collections.singletonList( new HashMap<>(Collections.singletonMap(agg.getName(), Collections.emptyList()))); } - // Return all values as a list from fields (fetchField) + // Return all values as a list from fields (fetchField) and source return Collections.singletonList( Collections.singletonMap( agg.getName(), - Arrays.stream(hits) - .flatMap(h -> h.getFields().values().stream()) - .map(DocumentField::getValue) - .filter(Objects::nonNull) // Filter out null values + Stream.concat( + Arrays.stream(hits) + .map(SearchHit::getFields) + .filter(Objects::nonNull) + .flatMap(map -> map.values().stream()) + .map(DocumentField::getValue) + .filter(Objects::nonNull), + Arrays.stream(hits) + .map(SearchHit::getSourceAsMap) + .filter(Objects::nonNull) + .flatMap(map -> map.values().stream()) + .filter(Objects::nonNull)) .collect(Collectors.toList()))); + } else { // "hits": { // "hits": [ diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java index 0a189584af3..390f2e9423c 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/script/aggregation/dsl/MetricAggregationBuilder.java @@ -216,6 +216,7 @@ private Pair make( MetricParser parser) { String fieldName = ((ReferenceExpression) expression).getAttr(); builder.fetchField(fieldName); + builder.fetchSource(false); builder.size(size.valueOf().integerValue()); builder.from(0); if (condition != null) { diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvCombineTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvCombineTest.java new file mode 100644 index 00000000000..6e6460a2365 --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLMvCombineTest.java @@ -0,0 +1,273 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import static org.junit.Assert.assertThrows; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.RequiredArgsConstructor; +import org.apache.calcite.DataContext; +import org.apache.calcite.config.CalciteConnectionConfig; +import org.apache.calcite.linq4j.Enumerable; +import org.apache.calcite.linq4j.Linq4j; +import org.apache.calcite.plan.RelTraitDef; +import org.apache.calcite.rel.RelCollations; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.calcite.schema.ScannableTable; +import org.apache.calcite.schema.Schema; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Statistic; +import org.apache.calcite.schema.Statistics; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.test.CalciteAssert; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.Programs; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.junit.Test; + +public class CalcitePPLMvCombineTest extends CalcitePPLAbstractTest { + + public CalcitePPLMvCombineTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Override + protected Frameworks.ConfigBuilder config(CalciteAssert.SchemaSpec... schemaSpecs) { + final SchemaPlus rootSchema = Frameworks.createRootSchema(true); + final SchemaPlus schema = CalciteAssert.addSchema(rootSchema, schemaSpecs); + + ImmutableList rows = + ImmutableList.of( + // existing "basic" + new Object[] {"basic", "A", 10}, + new Object[] {"basic", "A", 20}, + new Object[] {"basic", "B", 60}, + new Object[] {"basic", "A", 30}, + + // NULL target values case (Splunk-style: nulls do NOT contribute to mv) + new Object[] {"nulls", "A", null}, + new Object[] {"nulls", "A", 10}, + new Object[] {"nulls", "B", null}, + + // single-row case + new Object[] {"single", "Z", 5}); + + schema.add("MVCOMBINE_DATA", new MvCombineDataTable(rows)); + + return Frameworks.newConfigBuilder() + .parserConfig(SqlParser.Config.DEFAULT) + .defaultSchema(schema) + .traitDefs((List) null) + .programs(Programs.heuristicJoinOrder(Programs.RULE_SET, true, 2)); + } + + @Test + public void testMvCombineBasic() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"basic\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[ARRAY_AGG($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'basic')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `case`, `ip`, ARRAY_AGG(`packets`) FILTER (WHERE `packets` IS NOT NULL) `packets`\n" + + "FROM `scott`.`MVCOMBINE_DATA`\n" + + "WHERE `case` = 'basic'\n" + + "GROUP BY `case`, `ip`\n" + + "ORDER BY `ip`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvCombineWithNullTargetValues() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"nulls\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[ARRAY_AGG($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'nulls')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `case`, `ip`, ARRAY_AGG(`packets`) FILTER (WHERE `packets` IS NOT NULL) `packets`\n" + + "FROM `scott`.`MVCOMBINE_DATA`\n" + + "WHERE `case` = 'nulls'\n" + + "GROUP BY `case`, `ip`\n" + + "ORDER BY `ip`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvCombineWithDelimOption_SplunkSyntaxOrder() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"basic\" " + + "| fields case, ip, packets " + + "| mvcombine packets delim='|' " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[ARRAY_AGG($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'basic')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + + String expectedSparkSql = + "SELECT `case`, `ip`, ARRAY_AGG(`packets`) FILTER (WHERE `packets` IS NOT NULL) `packets`\n" + + "FROM `scott`.`MVCOMBINE_DATA`\n" + + "WHERE `case` = 'basic'\n" + + "GROUP BY `case`, `ip`\n" + + "ORDER BY `ip`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testMvCombineNonExistentField() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"basic\" " + + "| fields case, ip, packets " + + "| mvcombine does_not_exist"; + + Exception ex = assertThrows(Exception.class, () -> getRelNode(ppl)); + + String msg = String.valueOf(ex.getMessage()); + org.junit.Assert.assertTrue( + "Expected error message to mention missing field. Actual: " + msg, + msg.toLowerCase().contains("does_not_exist") || msg.toLowerCase().contains("field")); + } + + @Test + public void testMvCombineSingleRow() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"single\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[ARRAY_AGG($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'single')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + } + + @Test + public void testMvCombineEmptyResult() { + String ppl = + "source=MVCOMBINE_DATA " + + "| where case = \"no_such_case\" " + + "| fields case, ip, packets " + + "| mvcombine packets " + + "| sort ip"; + + RelNode root = getRelNode(ppl); + + String expectedLogical = + "LogicalSort(sort0=[$1], dir0=[ASC-nulls-first])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2])\n" + + " LogicalAggregate(group=[{0, 1}], packets=[ARRAY_AGG($2) FILTER $3])\n" + + " LogicalProject(case=[$0], ip=[$1], packets=[$2], $f3=[IS NOT NULL($2)])\n" + + " LogicalFilter(condition=[=($0, 'no_such_case')])\n" + + " LogicalTableScan(table=[[scott, MVCOMBINE_DATA]])\n"; + verifyLogical(root, expectedLogical); + } + + // ======================================================================== + // Custom ScannableTable for deterministic mvcombine planning tests + // ======================================================================== + + @RequiredArgsConstructor + static class MvCombineDataTable implements ScannableTable { + private final ImmutableList rows; + + protected final RelProtoDataType protoRowType = + factory -> + factory + .builder() + .add("case", SqlTypeName.VARCHAR) + .nullable(true) + .add("ip", SqlTypeName.VARCHAR) + .nullable(true) + .add("packets", SqlTypeName.INTEGER) + .nullable(true) + .build(); + + @Override + public Enumerable<@Nullable Object[]> scan(DataContext root) { + return Linq4j.asEnumerable(rows); + } + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return protoRowType.apply(typeFactory); + } + + @Override + public Statistic getStatistic() { + return Statistics.of(0d, ImmutableList.of(), RelCollations.createSingleton(0)); + } + + @Override + public Schema.TableType getJdbcTableType() { + return Schema.TableType.TABLE; + } + + @Override + public boolean isRolledUp(String column) { + return false; + } + + @Override + public boolean rolledUpColumnValidInsideAgg( + String column, + SqlCall call, + @Nullable SqlNode parent, + @Nullable CalciteConnectionConfig config) { + return false; + } + } +}