Skip to content

Commit 6470b8e

Browse files
committed
Support fetch_size API for PPL
Signed-off-by: Kai Huang <ahkcs@amazon.com> # Conflicts: # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java
1 parent 8073b4e commit 6470b8e

19 files changed

Lines changed: 576 additions & 24 deletions

File tree

core/src/main/java/org/opensearch/sql/ast/statement/Query.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
import lombok.EqualsAndHashCode;
99
import lombok.Getter;
10-
import lombok.RequiredArgsConstructor;
1110
import lombok.Setter;
1211
import lombok.ToString;
1312
import org.opensearch.sql.ast.AbstractNodeVisitor;
@@ -19,13 +18,18 @@
1918
@Setter
2019
@ToString
2120
@EqualsAndHashCode(callSuper = false)
22-
@RequiredArgsConstructor
2321
public class Query extends Statement {
2422

2523
protected final UnresolvedPlan plan;
2624
protected final int fetchSize;
2725
private final QueryType queryType;
2826

27+
public Query(UnresolvedPlan plan, int fetchSize, QueryType queryType) {
28+
this.plan = plan;
29+
this.fetchSize = fetchSize;
30+
this.queryType = queryType;
31+
}
32+
2933
@Override
3034
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {
3135
return visitor.visitQuery(this, context);

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,9 @@ public void executeWithCalcite(
100100
QueryProfiling.activate(QueryContext.isProfileEnabled());
101101
ProfileMetric analyzeMetric = profileContext.getOrCreateMetric(MetricName.ANALYZE);
102102
long analyzeStart = System.nanoTime();
103+
SysLimit sysLimit = SysLimit.fromSettings(settings);
103104
CalcitePlanContext context =
104-
CalcitePlanContext.create(
105-
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
105+
CalcitePlanContext.create(buildFrameworkConfig(), sysLimit, queryType);
106106
RelNode relNode = analyze(plan, context);
107107
RelNode calcitePlan = convertToCalcitePlan(relNode, context);
108108
analyzeMetric.set(System.nanoTime() - analyzeStart);
@@ -236,16 +236,18 @@ public void executePlan(
236236
.getSplit()
237237
.ifPresentOrElse(
238238
split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener),
239-
() ->
240-
executionEngine.execute(
241-
plan(plan),
242-
ExecutionContext.querySizeLimit(
243-
// For pagination, querySizeLimit shouldn't take effect.
244-
// See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize}
245-
plan instanceof LogicalPaginate
246-
? null
247-
: SysLimit.fromSettings(settings).querySizeLimit()),
248-
listener));
239+
() -> {
240+
Integer effectiveLimit;
241+
if (plan instanceof LogicalPaginate) {
242+
// For pagination, querySizeLimit shouldn't take effect.
243+
// See {@link PaginationWindowIT::testQuerySizeLimitDoesNotEffectPageSize}
244+
effectiveLimit = null;
245+
} else {
246+
effectiveLimit = SysLimit.fromSettings(settings).querySizeLimit();
247+
}
248+
executionEngine.execute(
249+
plan(plan), ExecutionContext.querySizeLimit(effectiveLimit), listener);
250+
});
249251
} catch (Exception e) {
250252
listener.onFailure(e);
251253
}

core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class QueryPlan extends AbstractPlan {
2929

3030
protected final Optional<Integer> pageSize;
3131

32-
/** Constructor. */
32+
/** Constructor without page size. */
3333
public QueryPlan(
3434
QueryId queryId,
3535
QueryType queryType,
@@ -43,7 +43,7 @@ public QueryPlan(
4343
this.pageSize = Optional.empty();
4444
}
4545

46-
/** Constructor with page size. */
46+
/** Constructor with page size (for pagination). */
4747
public QueryPlan(
4848
QueryId queryId,
4949
QueryType queryType,

docs/user/interfaces/endpoint.rst

Lines changed: 60 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,13 +208,13 @@ Explain::
208208
}
209209
}
210210

211-
Cursor
212-
======
211+
Cursor (SQL)
212+
============
213213

214214
Description
215215
-----------
216216

217-
To get paginated response for a query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size` or a value of `0`, it will fallback to non-paginated response. This feature is only available over `jdbc` format for now.
217+
To get paginated response for a SQL query, user needs to provide `fetch_size` parameter as part of normal query. The value of `fetch_size` should be greater than `0`. In absence of `fetch_size` or a value of `0`, it will fallback to non-paginated response. This feature is only available over `jdbc` format for now.
218218

219219
Example
220220
-------
@@ -266,3 +266,60 @@ Result set::
266266
"size": 5,
267267
"status": 200
268268
}
269+
270+
Fetch Size (PPL)
271+
================
272+
273+
Description
274+
-----------
275+
276+
PPL also supports the ``fetch_size`` parameter, but with different semantics from SQL. In PPL, ``fetch_size`` limits the number of rows returned in a single, complete response. **PPL does not support cursor-based pagination** — no cursor is returned and there is no way to fetch additional pages. The value of ``fetch_size`` should be between ``1`` and ``10000``. In absence of ``fetch_size`` or a value of ``0``, it will use the system default behavior (no limit).
277+
278+
+--------------------+-------------------------------------+------------------------------------+
279+
| Aspect | SQL ``fetch_size`` | PPL ``fetch_size`` |
280+
+====================+=====================================+====================================+
281+
| Purpose | Cursor-based pagination | Response size limiting |
282+
+--------------------+-------------------------------------+------------------------------------+
283+
| Returns cursor? | Yes | No |
284+
+--------------------+-------------------------------------+------------------------------------+
285+
| Can fetch more? | Yes (with cursor) | No (single response) |
286+
+--------------------+-------------------------------------+------------------------------------+
287+
| Maximum value | No hard limit | 10,000 |
288+
+--------------------+-------------------------------------+------------------------------------+
289+
290+
Example
291+
-------
292+
293+
PPL query::
294+
295+
>> curl -H 'Content-Type: application/json' -X POST localhost:9200/_plugins/_ppl -d '{
296+
"fetch_size" : 5,
297+
"query" : "source = accounts | fields firstname, lastname | where age > 20"
298+
}'
299+
300+
Result set::
301+
302+
{
303+
"schema": [
304+
{
305+
"name": "firstname",
306+
"type": "text"
307+
},
308+
{
309+
"name": "lastname",
310+
"type": "text"
311+
}
312+
],
313+
"total": 5,
314+
"datarows": [
315+
["Cherry", "Carey"],
316+
["Lindsey", "Hawkins"],
317+
["Sargent", "Powers"],
318+
["Campos", "Olsen"],
319+
["Savannah", "Kirby"]
320+
],
321+
"size": 5,
322+
"status": 200
323+
}
324+
325+
Note that unlike the SQL response above, there is no ``cursor`` field in the PPL response. The response is complete and final.

docs/user/ppl/limitations/limitations.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ For the following functionalities, the query will be forwarded to the V2 query e
6262
* ML
6363
* Kmeans
6464
* `show datasources` and command
65-
* Commands with `fetch_size` parameter
65+
* SQL commands with `fetch_size` parameter (cursor-based pagination). Note: PPL's `fetch_size` (response size limiting, no cursor) is supported in Calcite Engine.
6666

6767

6868
## Malformed Field Names in Object Fields

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.sql.calcite.remote;
77

8+
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
89
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
910
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ALIAS;
1011
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
@@ -25,8 +26,12 @@
2526

2627
import java.io.IOException;
2728
import java.util.Locale;
29+
import org.junit.Assert;
2830
import org.junit.Ignore;
2931
import org.junit.Test;
32+
import org.opensearch.client.Request;
33+
import org.opensearch.client.RequestOptions;
34+
import org.opensearch.client.Response;
3035
import org.opensearch.sql.ast.statement.ExplainMode;
3136
import org.opensearch.sql.common.setting.Settings;
3237
import org.opensearch.sql.common.setting.Settings.Key;
@@ -2497,4 +2502,68 @@ public void testExplainMvCombine() throws IOException {
24972502
String expected = loadExpectedPlan("explain_mvcombine.yaml");
24982503
assertYamlEqualsIgnoreId(expected, actual);
24992504
}
2505+
2506+
// ==================== fetch_size explain tests ====================
2507+
2508+
@Test
2509+
public void testExplainFetchSizePushDown() throws IOException {
2510+
// fetch_size=5 injects Head(5, 0) on top of the plan
2511+
// Logical plan: LogicalSort(fetch=[5]) wraps the Project
2512+
String expected = loadExpectedPlan("explain_fetch_size_push.yaml");
2513+
assertYamlEqualsIgnoreId(
2514+
expected,
2515+
explainQueryWithFetchSizeYaml(
2516+
String.format("source=%s | fields age", TEST_INDEX_ACCOUNT), 5));
2517+
}
2518+
2519+
@Test
2520+
public void testExplainFetchSizeWithSmallerHead() throws IOException {
2521+
// fetch_size=10 with user's | head 3
2522+
// Two LogicalSort nodes: inner fetch=[3] from user head, outer fetch=[10] from fetch_size
2523+
// Effective limit = min(3, 10) = 3
2524+
String expected = loadExpectedPlan("explain_fetch_size_with_head_push.yaml");
2525+
assertYamlEqualsIgnoreId(
2526+
expected,
2527+
explainQueryWithFetchSizeYaml(
2528+
String.format("source=%s | head 3 | fields age", TEST_INDEX_ACCOUNT), 10));
2529+
}
2530+
2531+
@Test
2532+
public void testExplainFetchSizeSmallerThanHead() throws IOException {
2533+
// fetch_size=5 with user's | head 100
2534+
// Two LogicalSort nodes: inner fetch=[100] from user head, outer fetch=[5] from fetch_size
2535+
// Effective limit = min(100, 5) = 5
2536+
String expected = loadExpectedPlan("explain_fetch_size_smaller_than_head_push.yaml");
2537+
assertYamlEqualsIgnoreId(
2538+
expected,
2539+
explainQueryWithFetchSizeYaml(
2540+
String.format("source=%s | head 100 | fields age", TEST_INDEX_ACCOUNT), 5));
2541+
}
2542+
2543+
/**
2544+
* Send an explain request with fetch_size in the JSON body and return YAML output.
2545+
*
2546+
* @param query the PPL query string
2547+
* @param fetchSize the fetch_size parameter value
2548+
* @return the explain output as YAML string
2549+
*/
2550+
private String explainQueryWithFetchSizeYaml(String query, int fetchSize) throws IOException {
2551+
Request request =
2552+
new Request(
2553+
"POST",
2554+
String.format(
2555+
"/_plugins/_ppl/_explain?format=%s&mode=%s", Format.YAML, ExplainMode.STANDARD));
2556+
String jsonBody =
2557+
String.format(
2558+
Locale.ROOT, "{\n \"query\": \"%s\",\n \"fetch_size\": %d\n}", query, fetchSize);
2559+
request.setJsonEntity(jsonBody);
2560+
2561+
RequestOptions.Builder restOptionsBuilder = RequestOptions.DEFAULT.toBuilder();
2562+
restOptionsBuilder.addHeader("Content-Type", "application/json");
2563+
request.setOptions(restOptionsBuilder);
2564+
2565+
Response response = client().performRequest(request);
2566+
Assert.assertEquals(200, response.getStatusLine().getStatusCode());
2567+
return getResponseBody(response, true);
2568+
}
25002569
}

0 commit comments

Comments
 (0)