Skip to content

Commit 019cd26

Browse files
authored
LAST/FIRST/TAKE aggregation should support TEXT type and Scripts (opensearch-project#5091)
* Use fetchSource API instead of fetchFields for LAST/FIRST/TAKE aggregation
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* Fix IT
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* Merge remote-tracking branch 'upstream/main' into issues/5086
* LAST/FIRST/TAKE aggregation should support TEXT type and Scripts
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* Add more tests
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* Merge remote-tracking branch 'upstream/main' into issues/5086
* Fix IT
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* Fix IT
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
* fix explain text
  Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 7313be5 commit 019cd26

55 files changed

Lines changed: 539 additions & 197 deletions

File tree

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteAliasFieldAggregationIT.java

Lines changed: 52 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
89
import static org.opensearch.sql.util.MatcherUtils.rows;
910
import static org.opensearch.sql.util.MatcherUtils.schema;
1011
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
@@ -25,13 +26,14 @@
2526
*/
2627
public class CalciteAliasFieldAggregationIT extends PPLIntegTestCase {
2728

28-
private static final String TEST_INDEX_ALIAS = "test_alias_bug";
29+
private static final String TEST_ALIAS_BUG = "test_alias_bug";
2930

3031
@Override
3132
public void init() throws Exception {
3233
super.init();
3334
enableCalcite();
3435
createTestIndexWithAliasFields();
36+
loadIndex(Index.DATA_TYPE_ALIAS);
3537
}
3638

3739
/**
@@ -41,14 +43,14 @@ public void init() throws Exception {
4143
private void createTestIndexWithAliasFields() throws IOException {
4244
// Delete the index if it exists (for test isolation)
4345
try {
44-
Request deleteIndex = new Request("DELETE", "/" + TEST_INDEX_ALIAS);
46+
Request deleteIndex = new Request("DELETE", "/" + TEST_ALIAS_BUG);
4547
client().performRequest(deleteIndex);
4648
} catch (ResponseException e) {
4749
// Index doesn't exist, which is fine
4850
}
4951

5052
// Create index with alias fields
51-
Request createIndex = new Request("PUT", "/" + TEST_INDEX_ALIAS);
53+
Request createIndex = new Request("PUT", "/" + TEST_ALIAS_BUG);
5254
createIndex.setJsonEntity(
5355
"{\n"
5456
+ " \"mappings\": {\n"
@@ -63,7 +65,7 @@ private void createTestIndexWithAliasFields() throws IOException {
6365
client().performRequest(createIndex);
6466

6567
// Insert test documents
66-
Request bulkRequest = new Request("POST", "/" + TEST_INDEX_ALIAS + "/_bulk?refresh=true");
68+
Request bulkRequest = new Request("POST", "/" + TEST_ALIAS_BUG + "/_bulk?refresh=true");
6769
bulkRequest.setJsonEntity(
6870
"{\"index\":{}}\n"
6971
+ "{\"created_at\": \"2024-01-01T10:00:00Z\", \"value\": 100}\n"
@@ -77,15 +79,15 @@ private void createTestIndexWithAliasFields() throws IOException {
7779
@Test
7880
public void testMinWithDateAliasField() throws IOException {
7981
JSONObject actual =
80-
executeQuery(String.format("source=%s | stats MIN(@timestamp)", TEST_INDEX_ALIAS));
82+
executeQuery(String.format("source=%s | stats MIN(@timestamp)", TEST_ALIAS_BUG));
8183
verifySchema(actual, schema("MIN(@timestamp)", "timestamp"));
8284
verifyDataRows(actual, rows("2024-01-01 10:00:00"));
8385
}
8486

8587
@Test
8688
public void testMaxWithDateAliasField() throws IOException {
8789
JSONObject actual =
88-
executeQuery(String.format("source=%s | stats MAX(@timestamp)", TEST_INDEX_ALIAS));
90+
executeQuery(String.format("source=%s | stats MAX(@timestamp)", TEST_ALIAS_BUG));
8991
verifySchema(actual, schema("MAX(@timestamp)", "timestamp"));
9092
verifyDataRows(actual, rows("2024-01-03 10:00:00"));
9193
}
@@ -94,8 +96,7 @@ public void testMaxWithDateAliasField() throws IOException {
9496
public void testMinMaxWithNumericAliasField() throws IOException {
9597
JSONObject actual =
9698
executeQuery(
97-
String.format(
98-
"source=%s | stats MIN(value_alias), MAX(value_alias)", TEST_INDEX_ALIAS));
99+
String.format("source=%s | stats MIN(value_alias), MAX(value_alias)", TEST_ALIAS_BUG));
99100
verifySchemaInOrder(
100101
actual, schema("MIN(value_alias)", "int"), schema("MAX(value_alias)", "int"));
101102
verifyDataRows(actual, rows(100, 300));
@@ -105,8 +106,7 @@ public void testMinMaxWithNumericAliasField() throws IOException {
105106
public void testFirstWithAliasField() throws IOException {
106107
JSONObject actual =
107108
executeQuery(
108-
String.format(
109-
"source=%s | sort @timestamp | stats FIRST(@timestamp)", TEST_INDEX_ALIAS));
109+
String.format("source=%s | sort @timestamp | stats FIRST(@timestamp)", TEST_ALIAS_BUG));
110110
verifySchema(actual, schema("FIRST(@timestamp)", "timestamp"));
111111
verifyDataRows(actual, rows("2024-01-01 10:00:00"));
112112
}
@@ -115,8 +115,7 @@ public void testFirstWithAliasField() throws IOException {
115115
public void testLastWithAliasField() throws IOException {
116116
JSONObject actual =
117117
executeQuery(
118-
String.format(
119-
"source=%s | sort @timestamp | stats LAST(@timestamp)", TEST_INDEX_ALIAS));
118+
String.format("source=%s | sort @timestamp | stats LAST(@timestamp)", TEST_ALIAS_BUG));
120119
verifySchema(actual, schema("LAST(@timestamp)", "timestamp"));
121120
verifyDataRows(actual, rows("2024-01-03 10:00:00"));
122121
}
@@ -126,7 +125,7 @@ public void testTakeWithAliasField() throws IOException {
126125
JSONObject actual =
127126
executeQuery(
128127
String.format(
129-
"source=%s | sort @timestamp | stats TAKE(@timestamp, 2)", TEST_INDEX_ALIAS));
128+
"source=%s | sort @timestamp | stats TAKE(@timestamp, 2)", TEST_ALIAS_BUG));
130129
verifySchema(actual, schema("TAKE(@timestamp, 2)", "array"));
131130
verifyDataRows(actual, rows(List.of("2024-01-01T10:00:00.000Z", "2024-01-02T10:00:00.000Z")));
132131
}
@@ -135,7 +134,7 @@ public void testTakeWithAliasField() throws IOException {
135134
public void testAggregationsWithOriginalFieldsStillWork() throws IOException {
136135
JSONObject actual =
137136
executeQuery(
138-
String.format("source=%s | stats MIN(created_at), MAX(value)", TEST_INDEX_ALIAS));
137+
String.format("source=%s | stats MIN(created_at), MAX(value)", TEST_ALIAS_BUG));
139138
verifySchemaInOrder(
140139
actual, schema("MIN(created_at)", "timestamp"), schema("MAX(value)", "int"));
141140
verifyDataRows(actual, rows("2024-01-01 10:00:00", 300));
@@ -147,12 +146,50 @@ public void testUnaffectedAggregationsWithAliasFields() throws IOException {
147146
executeQuery(
148147
String.format(
149148
"source=%s | stats SUM(value_alias), AVG(value_alias), COUNT(value_alias)",
150-
TEST_INDEX_ALIAS));
149+
TEST_ALIAS_BUG));
151150
verifySchemaInOrder(
152151
actual,
153152
schema("SUM(value_alias)", "bigint"),
154153
schema("AVG(value_alias)", "double"),
155154
schema("COUNT(value_alias)", "bigint"));
156155
verifyDataRows(actual, rows(600, 200.0, 3));
157156
}
157+
158+
@Test
159+
public void testAliasTypeWithLastFirstTakeLatestEarliestAggregation() throws IOException {
160+
JSONObject actual =
161+
executeQuery(
162+
String.format(
163+
"source=%s | stats take(original_text, 2), last(original_text),"
164+
+ " first(original_text), take(alias_text, 2), last(alias_text),"
165+
+ " first(alias_text), take(original_col, 2), last(original_col),"
166+
+ " first(original_col), take(alias_col, 2), last(alias_col), first(alias_col),"
167+
+ " latest(original_col), earliest(original_col), latest(alias_col),"
168+
+ " earliest(alias_col),latest(original_text), earliest(original_text),"
169+
+ " latest(alias_text), earliest(alias_text)",
170+
TEST_INDEX_ALIAS));
171+
verifyDataRows(
172+
actual,
173+
rows(
174+
List.of("a b c", "d e f"),
175+
"x y z",
176+
"a b c",
177+
List.of("a b c", "d e f"),
178+
"x y z",
179+
"a b c",
180+
List.of(1, 2),
181+
3,
182+
1,
183+
List.of(1, 2),
184+
3,
185+
1,
186+
3,
187+
1,
188+
3,
189+
1,
190+
"x y z",
191+
"a b c",
192+
"x y z",
193+
"a b c"));
194+
}
158195
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,31 @@ public void testExplainOnFirstLast() throws IOException {
840840
TEST_INDEX_BANK)));
841841
}
842842

843+
@Test
844+
public void testExplainOnTextFirstLast() throws IOException {
845+
String expected = loadExpectedPlan("explain_first_last_text.yaml");
846+
assertYamlEqualsIgnoreId(
847+
expected,
848+
explainQueryYaml(
849+
String.format(
850+
"source=%s | stats first(employer) as first_employer, last(employer) as"
851+
+ " last_employer by gender",
852+
TEST_INDEX_BANK)));
853+
}
854+
855+
@Test
856+
public void testExplainTakeAggregationWithNegative() throws IOException {
857+
enabledOnlyWhenPushdownIsEnabled();
858+
// without agg pushdown
859+
String expected = loadExpectedPlan("explain_take_negative.yaml");
860+
assertYamlEqualsIgnoreId(
861+
expected,
862+
explainQueryYaml(
863+
String.format(
864+
"source=%s | stats take(employer, 0), take(balance, -2) by gender",
865+
TEST_INDEX_BANK)));
866+
}
867+
843868
// Only for Calcite
844869
@Test
845870
public void testExplainOnEventstatsEarliestLatest() throws IOException {

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAggregationIT.java

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,75 @@ public void testFirstLastDifferentFields() throws IOException {
311311
verifyDataRows(actual, rows(1L, 48086L, 32L));
312312
}
313313

314+
@Test
315+
public void testFirstAggregationOnTextField() throws IOException {
316+
JSONObject actual =
317+
executeQuery(
318+
String.format("source=%s | stats first(employer), first(email)", TEST_INDEX_BANK));
319+
verifySchema(actual, schema("first(employer)", "string"), schema("first(email)", "string"));
320+
verifyDataRows(actual, rows("Pyrami", "amberduke@pyrami.com"));
321+
}
322+
323+
@Test
324+
public void testLastAggregationOnTextField() throws IOException {
325+
JSONObject actual =
326+
executeQuery(
327+
String.format("source=%s | stats last(employer), first(email)", TEST_INDEX_BANK));
328+
verifySchema(actual, schema("last(employer)", "string"), schema("first(email)", "string"));
329+
verifyDataRows(actual, rows("Quailcom", "amberduke@pyrami.com"));
330+
}
331+
332+
@Test
333+
public void testFirstLastByGroupOnTextField() throws IOException {
334+
JSONObject actual =
335+
executeQuery(
336+
String.format(
337+
"source=%s | stats first(employer), last(email) by gender", TEST_INDEX_BANK));
338+
verifySchema(
339+
actual,
340+
schema("first(employer)", "string"),
341+
schema("last(email)", "string"),
342+
schema("gender", "string"));
343+
verifyDataRows(
344+
actual,
345+
rows("Quility", "dillardmcpherson@quailcom.com", "F"),
346+
rows("Pyrami", "elinorratliff@scentric.com", "M"));
347+
}
348+
349+
@Test
350+
public void testFirstLastWithOtherAggregationsOnTextField() throws IOException {
351+
JSONObject actual =
352+
executeQuery(
353+
String.format(
354+
"source=%s | stats first(employer), last(email), count(), avg(age) by gender",
355+
TEST_INDEX_BANK));
356+
verifySchema(
357+
actual,
358+
schema("first(employer)", "string"),
359+
schema("last(email)", "string"),
360+
schema("count()", "bigint"),
361+
schema("avg(age)", "double"),
362+
schema("gender", "string"));
363+
verifyDataRows(
364+
actual,
365+
rows("Quility", "dillardmcpherson@quailcom.com", 3, 33.666666666666664, "F"),
366+
rows("Pyrami", "elinorratliff@scentric.com", 4, 34.25, "M"));
367+
}
368+
369+
@Test
370+
public void testFirstLastMixedFields() throws IOException {
371+
JSONObject actual =
372+
executeQuery(
373+
String.format(
374+
"source=%s | stats first(employer), last(balance), first(age)", TEST_INDEX_BANK));
375+
verifySchema(
376+
actual,
377+
schema("first(employer)", "string"),
378+
schema("last(balance)", "bigint"),
379+
schema("first(age)", "int"));
380+
verifyDataRows(actual, rows("Pyrami", 48086L, 32));
381+
}
382+
314383
@Test
315384
public void testFirstLastWithBirthdate() throws IOException {
316385
JSONObject actual =
@@ -1533,4 +1602,62 @@ public void testMixedTypesNestedFieldAggregations() throws IOException {
15331602
schema("first_lang", "string"));
15341603
verifyDataRows(actual, rows(10, 14, false, true, "java"));
15351604
}
1605+
1606+
@Test
1607+
public void testTextTypeWithLastFirstTakeAggregation() throws IOException {
1608+
JSONObject actual =
1609+
executeQuery(
1610+
String.format(
1611+
"source=%s | stats take(address, 2), last(address), first(address), "
1612+
+ "take(state, 2), last(state), first(state), "
1613+
+ "take(balance, 2), last(balance), first(balance)",
1614+
TEST_INDEX_BANK));
1615+
verifyDataRows(
1616+
actual,
1617+
rows(
1618+
List.of("880 Holmes Lane", "671 Bristol Street"),
1619+
"702 Quentin Street",
1620+
"880 Holmes Lane",
1621+
List.of("IL", "TN"),
1622+
"IN",
1623+
"IL",
1624+
List.of(39225, 5686),
1625+
48086,
1626+
39225));
1627+
}
1628+
1629+
@Test
1630+
public void testScriptWithLastFirstTakeAggregation() throws IOException {
1631+
JSONObject actual =
1632+
executeQuery(
1633+
String.format(
1634+
"source=%s | eval new_address = upper(address), new_state = lower(state),"
1635+
+ " new_balance = balance * 10 | stats take(new_address, 2), last(new_address),"
1636+
+ " first(new_address), take(new_state, 2), last(new_state), first(new_state),"
1637+
+ " take(new_balance, 2), last(new_balance), first(new_balance)",
1638+
TEST_INDEX_BANK));
1639+
verifyDataRows(
1640+
actual,
1641+
rows(
1642+
List.of("880 HOLMES LANE", "671 BRISTOL STREET"),
1643+
"702 QUENTIN STREET",
1644+
"880 HOLMES LANE",
1645+
List.of("il", "tn"),
1646+
"in",
1647+
"il",
1648+
List.of(392250, 56860),
1649+
480860,
1650+
392250));
1651+
}
1652+
1653+
@Test
1654+
public void testTakeAggregationWithNegative() throws IOException {
1655+
JSONObject actual =
1656+
executeQuery(
1657+
String.format(
1658+
"source=%s | eval new_balance = balance * 10 | stats take(new_balance, 0),"
1659+
+ " take(new_balance, -1), take(balance, 0), take(balance, -1)",
1660+
TEST_INDEX_BANK));
1661+
verifyDataRows(actual, rows(List.of(), List.of(), List.of(), List.of()));
1662+
}
15361663
}
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{"index":{"_id":"1"}}
2-
{"original_col" : 1}
2+
{"original_col" : 1,"original_text" : "a b c","original_date":"2026-01-01T10:00:00Z"}
33
{"index":{"_id":"2"}}
4-
{"original_col" : 2}
4+
{"original_col" : 2,"original_text" : "d e f","original_date":"2026-01-02T10:00:00Z"}
55
{"index":{"_id":"3"}}
6-
{"original_col" : 3}
6+
{"original_col" : 3,"original_text" : "x y z","original_date":"2026-01-03T10:00:00Z"}

integ-test/src/test/resources/expectedOutput/calcite/big5/dedup_metrics_size_field.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@ calcite:
1111
physical: |
1212
EnumerableCalc(expr#0..15=[{inputs}], proj#0..12=[{exprs}], aws=[$t14], event=[$t15])
1313
CalciteEnumerableTopK(sort0=[$7], dir0=[DESC-nulls-last], fetch=[10000])
14-
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"_source":{"includes":["metrics.size","agent","process","log","message","tags","cloud","input","@timestamp","ecs","data_stream","meta","host","metrics","aws","event"],"excludes":[]},"script_fields":{}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
14+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=LogicalProject#,group={0},agg#0=LITERAL_AGG(1))], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"metrics.size":{"terms":{"field":"metrics.size","missing_bucket":false,"order":"asc"}}}]},"aggregations":{"$f1":{"top_hits":{"from":0,"size":1,"version":false,"seq_no_primary_term":false,"explain":false,"fields":[{"field":"metrics.size"},{"field":"message"},{"field":"tags"},{"field":"@timestamp"}]}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)