[analytics engine] Add /_analytics/ppl/_explain endpoint with stage profiling #21660

Merged
mch2 merged 12 commits into opensearch-project:main from finnegancarroll:feature/explain-api
May 22, 2026

Conversation

@finnegancarroll
Contributor

@finnegancarroll finnegancarroll commented May 14, 2026

Description

Adds an explain/profile API to the analytics engine, accessible via POST /_analytics/ppl/_explain. The endpoint executes the query and returns per-stage timing from the coordinator's perspective alongside normal results.
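
As an illustration of calling the new endpoint, here is a minimal Java sketch that builds the POST request with the JDK's java.net.http client. The base URL http://localhost:9200, the class and method names, and the example PPL text are assumptions; the {"query": ...} body shape follows the ExplainApiIT helper quoted in the code analyzer report later in this thread.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ExplainRequestSketch {

    // Builds (but does not send) a POST to the explain endpoint named in this PR.
    static HttpRequest build(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/_analytics/ppl/_explain"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    }

    // Sends the request and returns the raw JSON response body.
    static String send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Assumed PPL text; the exact query behind the sample response is not given in the PR.
        String body = "{\"query\": \"source = test_parquet | stats avg(score) by name\"}";
        HttpRequest request = build("http://localhost:9200", body);
        System.out.println(request.method() + " " + request.uri());
        // Against a running cluster, send(request) would return rows plus the "profile" object.
    }
}
```

The request is built separately from sending so the URL and body can be inspected without a cluster.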

Follow-up work not included in this PR:

  • Integrate with SQL plugin's RestUnifiedQueryAction.explain() to use executeWithProfile
  • Data-node-level profiling for fragments executed on shards

Response format

Full response format when used with the SQL plugin (see the SQL PR), shown with some test data:

{
    "columns": [
        "avg(score)",
        "name"
    ],
    "rows": [
        [
            92.7,
            "eve"
        ],
        [
            91.0,
            "carol"
        ],
        [
            95.5,
            "alice"
        ],
        [
            88.1,
            "dave"
        ],
        [
            87.3,
            "bob"
        ]
    ],
    "profile": {
        "query_id": "a3009ddf-629b-45bc-ac61-5ab607d38cf5",
        "full_plan": [
            "OpenSearchProject(avg(score)=[$1], name=[$0], viableBackends=[[datafusion]])",
            "  OpenSearchProject(name=[$0], avg(score)=[ANNOTATED_PROJECT_EXPR(id=2, backends=[datafusion], /($1, $2))], viableBackends=[[datafusion]])",
            "    OpenSearchAggregate(group=[{0}], agg#0=[SUM(AGG_CALL_ANNOTATION(id=0, viableBackends=[datafusion]), $1)], agg#1=[COUNT(AGG_CALL_ANNOTATION(id=1, viableBackends=[datafusion]), $1)], mode=[FINAL], viableBackends=[[datafusion]])",
            "      OpenSearchExchangeReducer(viableBackends=[[datafusion]], exchange=[ExchangeInfo[distributionType=SINGLETON, partitionKeyIndices=[]]])",
            "        OpenSearchAggregate(group=[{0}], agg#0=[SUM(AGG_CALL_ANNOTATION(id=0, viableBackends=[datafusion]), $1)], agg#1=[COUNT(AGG_CALL_ANNOTATION(id=1, viableBackends=[datafusion]), $1)], mode=[PARTIAL], viableBackends=[[datafusion]])",
            "          OpenSearchProject(name=[$1], score=[$2], viableBackends=[[datafusion]])",
            "            OpenSearchTableScan(table=[[test_parquet]], viableBackends=[[datafusion]])"
        ],
        "planning_time_ms": 17,
        "execution_time_ms": 12,
        "stages": [
            {
                "stage_id": 0,
                "execution_type": "SHARD_FRAGMENT",
                "distribution": "SINGLETON",
                "state": "SUCCEEDED",
                "start_ms": 1779209122752,
                "end_ms": 1779209122763,
                "elapsed_ms": 11,
                "rows_processed": 5,
                "tasks_completed": 2,
                "tasks_failed": 0,
                "fragment": [
                    "OpenSearchAggregate(group=[{0}], agg#0=[SUM(AGG_CALL_ANNOTATION(id=0, viableBackends=[datafusion]), $1)], agg#1=[COUNT(AGG_CALL_ANNOTATION(id=1, viableBackends=[datafusion]), $1)], mode=[PARTIAL], viableBackends=[[datafusion]])",
                    "  OpenSearchProject(name=[$1], score=[$2], viableBackends=[[datafusion]])",
                    "    OpenSearchTableScan(table=[[test_parquet]], viableBackends=[[datafusion]])"
                ],
                "tasks": [
                    {
                        "partition_id": 0,
                        "node": "8L307qJ0RTmJLo6_vPhl4A/shard[0]",
                        "state": "FINISHED",
                        "start_ms": 1779209122752,
                        "end_ms": 1779209122762,
                        "elapsed_ms": 10
                    },
                    {
                        "partition_id": 1,
                        "node": "8L307qJ0RTmJLo6_vPhl4A/shard[1]",
                        "state": "FINISHED",
                        "start_ms": 1779209122752,
                        "end_ms": 1779209122763,
                        "elapsed_ms": 11
                    }
                ]
            },
            {
                "stage_id": 1,
                "execution_type": "COORDINATOR_REDUCE",
                "state": "SUCCEEDED",
                "start_ms": 1779209122752,
                "end_ms": 1779209122764,
                "elapsed_ms": 12,
                "rows_processed": 0,
                "tasks_completed": 1,
                "tasks_failed": 0,
                "fragment": [
                    "OpenSearchProject(avg(score)=[$1], name=[$0], viableBackends=[[datafusion]])",
                    "  OpenSearchProject(name=[$0], avg(score)=[ANNOTATED_PROJECT_EXPR(id=2, backends=[datafusion], /($1, $2))], viableBackends=[[datafusion]])",
                    "    OpenSearchAggregate(group=[{0}], agg#0=[SUM(AGG_CALL_ANNOTATION(id=0, viableBackends=[datafusion]), $1)], agg#1=[COUNT(AGG_CALL_ANNOTATION(id=1, viableBackends=[datafusion]), $1)], mode=[FINAL], viableBackends=[[datafusion]])",
                    "      OpenSearchExchangeReducer(viableBackends=[[datafusion]], exchange=[ExchangeInfo[distributionType=SINGLETON, partitionKeyIndices=[]]])",
                    "        OpenSearchStageInputScan(childStageId=[0], viableBackends=[[datafusion]])"
                ],
                "tasks": [
                    {
                        "partition_id": 0,
                        "node": "(local)",
                        "state": "FINISHED",
                        "start_ms": 1779209122752,
                        "end_ms": 1779209122764,
                        "elapsed_ms": 12
                    }
                ]
            }
        ]
    }
}
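
To show how the timing fields relate, the following sketch recomputes elapsed_ms from start_ms and end_ms for the two stage-0 tasks in the sample above and picks the slowest one. The TaskTiming record and the method names are hypothetical; a real client would first parse the JSON with a library of its choice.

```java
import java.util.Comparator;
import java.util.List;

public class ProfileTimingSketch {

    // Hypothetical holder for one entry of a stage's "tasks" array.
    record TaskTiming(int partitionId, String node, long startMs, long endMs) {
        long elapsedMs() {
            return endMs - startMs; // matches "elapsed_ms" in the sample
        }
    }

    // The two stage-0 tasks copied from the sample response.
    static List<TaskTiming> sampleTasks() {
        return List.of(
            new TaskTiming(0, "8L307qJ0RTmJLo6_vPhl4A/shard[0]", 1779209122752L, 1779209122762L),
            new TaskTiming(1, "8L307qJ0RTmJLo6_vPhl4A/shard[1]", 1779209122752L, 1779209122763L)
        );
    }

    // Returns the task with the largest elapsed time.
    static TaskTiming slowest(List<TaskTiming> tasks) {
        return tasks.stream()
            .max(Comparator.comparingLong(TaskTiming::elapsedMs))
            .orElseThrow();
    }

    public static void main(String[] args) {
        TaskTiming t = slowest(sampleTasks());
        // prints "partition 1 took 11 ms"
        System.out.println("partition " + t.partitionId() + " took " + t.elapsedMs() + " ms");
    }
}
```

In the sample, the slowest task's 11 ms also equals the stage's own elapsed_ms, since both tasks start at the same timestamp.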

Related Issues

N/A

Check List

  • Functionality includes testing.
    - [ ] API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Contributor

github-actions Bot commented May 14, 2026

PR Reviewer Guide 🔍

(Review updated until commit dd2724f)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Null Pointer Risk

In buildProfilingRowsListener, the failure path checks execRef.get() != null && execRef.get().getGraph() != null before calling QueryProfileBuilder.snapshot, and otherwise falls back to an empty profile built from context.queryId() (context is passed in as a parameter). The success path has no such guard: it calls execRef.get().getGraph() directly and will throw a NullPointerException if the response arrives before execRef has been set.

private static ActionListener<Iterable<Object[]>> buildProfilingRowsListener(
    AtomicReference<QueryExecution> execRef,
    QueryContext context,
    String fullPlan,
    long planningTimeMs,
    ActionListener<ProfiledResult> listener
) {
    return ActionListener.wrap(rows -> {
        QueryProfile qp = QueryProfileBuilder.snapshot(execRef.get().getGraph(), context, fullPlan, planningTimeMs);
        listener.onResponse(new ProfiledResult(rows, null, qp));
    }, e -> {
        QueryProfile qp = execRef.get() != null && execRef.get().getGraph() != null
            ? QueryProfileBuilder.snapshot(execRef.get().getGraph(), context, fullPlan, planningTimeMs)
            : new QueryProfile(context.queryId(), java.util.List.of(), planningTimeMs, 0L, java.util.List.of());
        listener.onResponse(new ProfiledResult(null, e, qp));
    });
}

Race Condition

execRef is populated at line 223 after scheduler.execute returns, but buildProfilingRowsListener (constructed earlier at line 202) may fire immediately if the query fails during scheduling or execution setup. If the listener's failure callback runs before line 223 completes, execRef.get() will be null, causing the fallback profile path to trigger even when a valid QueryExecution exists. This results in incomplete or missing profile data on fast-failing queries.

final AtomicReference<QueryExecution> execRef = new AtomicReference<>();

ActionListener<Iterable<Object[]>> rowsListener = profile
    ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);

ActionListener<Iterable<VectorSchemaRoot>> batchesListener = ActionListener.runAfter(
    ActionListener.wrap(batches -> rowsListener.onResponse(batchesToRows(batches)), rowsListener::onFailure),
    () -> taskManager.unregister(queryTask)
);

TimeValue taskTimeout = queryTask.getCancelAfterTimeInterval();
TimeValue clusterTimeout = clusterService.getClusterSettings().get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING);
if (taskTimeout != null || SearchService.NO_TIMEOUT.equals(clusterTimeout) == false) {
    batchesListener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(
        client,
        queryTask,
        clusterTimeout,
        batchesListener,
        e -> {}
    );
}

execRef.set(scheduler.execute(context, batchesListener)); // execRef read by profile listener after execution completes
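
The ordering problem can be reproduced in isolation. The sketch below uses a hypothetical scheduler stand-in that reports failure synchronously, before its return value is stored, so the failure callback observes an empty reference and takes the fallback branch. Execution, executeFailingFast, and profileOnFailure are illustrative names, not the PR's classes.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class ExecRefRaceSketch {

    // Stand-in for a query execution that owns a stage graph.
    record Execution(String graph) {}

    // Hypothetical scheduler: fails during setup and notifies the callback
    // BEFORE returning the execution handle to the caller.
    static Execution executeFailingFast(Consumer<Exception> onFailure) {
        Execution exec = new Execution("stage-graph");
        onFailure.accept(new IllegalStateException("failed during setup"));
        return exec;
    }

    // Reads the reference once, then picks the full or fallback profile.
    static String profileOnFailure(AtomicReference<Execution> execRef) {
        Execution exec = execRef.get();
        return exec != null && exec.graph() != null
            ? "full profile of " + exec.graph()
            : "empty fallback profile";
    }

    static String run() {
        AtomicReference<Execution> execRef = new AtomicReference<>();
        AtomicReference<String> observed = new AtomicReference<>();
        // The callback runs inside executeFailingFast, so execRef is still empty when it is read.
        execRef.set(executeFailingFast(e -> observed.set(profileOnFailure(execRef))));
        return observed.get();
    }

    public static void main(String[] args) {
        // prints "empty fallback profile"
        System.out.println(run());
    }
}
```

One way to avoid this, under the same assumptions, would be to hand the execution handle to the listener before any callback can fire, rather than storing it from the return value.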

@github-actions
Contributor

github-actions Bot commented May 14, 2026

PR Code Suggestions ✨

Latest suggestions up to dd2724f

Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Fix type mismatch in listener

The non-profile path wraps rows in a ProfiledResult with null profile, but the
executeInternal method's non-profile caller expects Iterable<Object[]> directly. This type
mismatch will cause a ClassCastException at runtime when the listener fires.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [202-204]

 ActionListener<Iterable<Object[]>> rowsListener = profile
     ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
-    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
+    : ActionListener.wrap(listener::onResponse, listener::onFailure);
Suggestion importance[1-10]: 10

__

Why: Critical bug: the non-profile path wraps rows in ProfiledResult(rows, null, null) but rowsListener is typed as ActionListener<Iterable<Object[]>>, causing a guaranteed ClassCastException at runtime. The fix correctly removes the wrapper.

High
General
Optimize task counting with single pass

The code iterates taskProfiles stream twice to count completed and failed tasks. For
large task lists, this creates unnecessary overhead. Combine into a single pass for
better performance.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [57-58]

-long tasksCompleted = taskProfiles.stream().filter(t -> "FINISHED".equals(t.state())).count();
-long tasksFailed = taskProfiles.stream().filter(t -> "FAILED".equals(t.state())).count();
+long tasksCompleted = 0, tasksFailed = 0;
+for (TaskProfile t : taskProfiles) {
+    if ("FINISHED".equals(t.state())) tasksCompleted++;
+    else if ("FAILED".equals(t.state())) tasksFailed++;
+}
Suggestion importance[1-10]: 4

__

Why: Minor optimization that reduces two stream passes to one loop. While correct, the performance gain is negligible for typical task counts (dozens, not thousands), making this a low-impact improvement.

Low
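
For comparison, a single-pass count can also stay in stream form by grouping on the state string. This is a sketch only: TaskProfile here is a two-field stand-in, not the PR's class, and the state names are taken from the sample response.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TaskStateCountSketch {

    // Hypothetical two-field stand-in for the PR's TaskProfile.
    record TaskProfile(int partitionId, String state) {}

    // One traversal: groups tasks by state and counts each group.
    static Map<String, Long> countByState(List<TaskProfile> tasks) {
        return tasks.stream()
            .collect(Collectors.groupingBy(TaskProfile::state, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByState(List.of(
            new TaskProfile(0, "FINISHED"),
            new TaskProfile(1, "FINISHED"),
            new TaskProfile(2, "FAILED")
        ));
        // States that never occur are absent from the map, so default to zero.
        long completed = counts.getOrDefault("FINISHED", 0L);
        long failed = counts.getOrDefault("FAILED", 0L);
        // prints "completed=2 failed=1"
        System.out.println("completed=" + completed + " failed=" + failed);
    }
}
```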

Previous suggestions

Suggestions up to commit 3bf6532
Category | Suggestion | Impact
Possible issue
Fix type mismatch in listener

The non-profile path wraps rows in a ProfiledResult with null profile, but the
execute() method's listener expects Iterable<Object[]> directly. This type mismatch will cause
a ClassCastException at runtime when the listener fires.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [202-204]

 ActionListener<Iterable<Object[]>> rowsListener = profile
     ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
-    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
+    : ActionListener.wrap(listener::onResponse, listener::onFailure);
Suggestion importance[1-10]: 10

__

Why: Critical bug: the non-profile path wraps rows in a ProfiledResult and passes it to a listener expecting Iterable<Object[]>, causing a guaranteed ClassCastException at runtime. The execute() method at line 132 expects Iterable<Object[]>, not ProfiledResult.

High
Add AssertionError handling to both paths

The AssertionError catch block is only present in the executeWithProfile path but
missing from the original execute path. This inconsistency means Calcite assertion
errors will crash the JVM when profiling is disabled but be handled gracefully when
profiling is enabled.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [134-136]

+} catch (Exception e) {
+    listener.onFailure(e);
 } catch (AssertionError e) {
     listener.onFailure(new IllegalStateException("Analytics-engine executor rejected the plan: " + e.getMessage(), e));
 }
Suggestion importance[1-10]: 9

__

Why: The AssertionError catch block exists in executeWithProfile (line 147-148) but is missing from the non-profile execute path. This inconsistency means Calcite assertion errors will crash the JVM when profiling is disabled but be handled gracefully when enabled, creating unpredictable behavior.

High
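
A minimal sketch of the pattern under discussion: route both Exception and AssertionError to the failure callback so that a rejected plan surfaces as an ordinary failure. runPlan and capture are hypothetical helpers; only the error message text is taken from the diff above.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class PlanFailureRoutingSketch {

    // Runs the planning/execution step and reports any failure through the callback.
    static void runPlan(Runnable planAndExecute, Consumer<Exception> onFailure) {
        try {
            planAndExecute.run();
        } catch (Exception e) {
            onFailure.accept(e);
        } catch (AssertionError e) {
            // AssertionError is an Error, not an Exception, so it needs its own catch clause.
            onFailure.accept(
                new IllegalStateException("Analytics-engine executor rejected the plan: " + e.getMessage(), e)
            );
        }
    }

    // Convenience for the demo: returns the failure that was reported, or null.
    static Exception capture(Runnable planAndExecute) {
        AtomicReference<Exception> seen = new AtomicReference<>();
        runPlan(planAndExecute, seen::set);
        return seen.get();
    }

    public static void main(String[] args) {
        Exception failure = capture(() -> { throw new AssertionError("bad plan"); });
        // prints "Analytics-engine executor rejected the plan: bad plan"
        System.out.println(failure.getMessage());
    }
}
```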
General
Use metrics counters instead of filtering

The code counts tasks by filtering on hardcoded state strings "FINISHED" and
"FAILED", but these may not match the actual enum values from StageTask.state(). If
the enum uses different names (e.g., "SUCCEEDED"), the counts will always be zero.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [57-58]

-long tasksCompleted = taskProfiles.stream().filter(t -> "FINISHED".equals(t.state())).count();
-long tasksFailed = taskProfiles.stream().filter(t -> "FAILED".equals(t.state())).count();
+long tasksCompleted = m.getTasksCompleted();
+long tasksFailed = m.getTasksFailed();
Suggestion importance[1-10]: 8

__

Why: The code filters TaskProfile objects by hardcoded state strings "FINISHED" and "FAILED", but StageMetrics already tracks tasksCompleted and tasksFailed counters. Using the existing metrics is more reliable and avoids potential mismatches with actual state enum values.

Medium
Suggestions up to commit 0a13457
Category | Suggestion | Impact
Possible issue
Fix type mismatch in listener

The non-profile path wraps rows in a ProfiledResult with null profile, but the
execute() method's listener expects Iterable<Object[]>, not ProfiledResult. This type mismatch
will cause a ClassCastException at runtime when the non-profile path is used.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [202-204]

 ActionListener<Iterable<Object[]>> rowsListener = profile
     ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
-    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
+    : ActionListener.wrap(listener::onResponse, listener::onFailure);
Suggestion importance[1-10]: 10

__

Why: Critical bug: the non-profile path wraps rows in ProfiledResult but the rowsListener expects Iterable<Object[]>, causing a ClassCastException at runtime. The execute() method's convertingListener expects Iterable<Object[]>, not ProfiledResult.

High
General
Use metrics counters directly

The code filters taskProfiles by state strings "FINISHED" and "FAILED", but these
values are derived from StageTask.state().name(). If the actual enum values differ
(e.g., "SUCCEEDED" instead of "FINISHED"), the counters will always be zero,
producing incorrect metrics.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [57-58]

-long tasksCompleted = taskProfiles.stream().filter(t -> "FINISHED".equals(t.state())).count();
-long tasksFailed = taskProfiles.stream().filter(t -> "FAILED".equals(t.state())).count();
+long tasksCompleted = m.getTasksCompleted();
+long tasksFailed = m.getTasksFailed();
Suggestion importance[1-10]: 8

__

Why: The code filters by hardcoded state strings "FINISHED" and "FAILED", but these may not match actual enum values. Using StageMetrics counters directly (m.getTasksCompleted() and m.getTasksFailed()) is more reliable and avoids potential mismatches.

Medium
Return profile on assertion errors

The AssertionError catch block in executeWithProfile doesn't wrap the error in a
ProfiledResult like the main failure path does. This means profile data is lost when
an assertion fires during profiled execution, breaking the API contract that profile
is always returned.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [147-149]

 } catch (AssertionError e) {
-    listener.onFailure(new IllegalStateException("Analytics-engine executor rejected the plan: " + e.getMessage(), e));
+    QueryProfile emptyProfile = new QueryProfile(context != null ? context.queryId() : "(unknown)", java.util.List.of(), 0L, 0L, java.util.List.of());
+    listener.onResponse(new ProfiledResult(null, new IllegalStateException("Analytics-engine executor rejected the plan: " + e.getMessage(), e), emptyProfile));
 }
Suggestion importance[1-10]: 7

__

Why: The AssertionError catch in executeWithProfile doesn't wrap the error in a ProfiledResult with profile data, breaking the API contract that profile is always returned. This should return a ProfiledResult with an empty profile like the main failure path does.

Medium
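
The contract being discussed, a result that always carries a profile and holds either rows or a failure, can be sketched as follows. The records here are simplified stand-ins (the PR's QueryProfile constructor takes more arguments), and the accessor names are assumptions.

```java
import java.util.List;

public class ProfiledResultSketch {

    // Simplified stand-in; the PR's QueryProfile also carries plan and stage lists.
    record QueryProfile(String queryId, long planningTimeMs, long executionTimeMs) {}

    // Exactly one of rows/failure is expected to be non-null; profile is always set.
    record ProfiledResult(List<Object[]> rows, Exception failure, QueryProfile profile) {
        boolean succeeded() {
            return failure == null;
        }
    }

    // Builds a failed result with an empty profile, mirroring the suggested catch block.
    static ProfiledResult onAssertionError(String queryId, AssertionError e) {
        QueryProfile empty = new QueryProfile(queryId != null ? queryId : "(unknown)", 0L, 0L);
        Exception wrapped = new IllegalStateException(
            "Analytics-engine executor rejected the plan: " + e.getMessage(), e
        );
        return new ProfiledResult(null, wrapped, empty);
    }

    public static void main(String[] args) {
        ProfiledResult r = onAssertionError(null, new AssertionError("bad plan"));
        // prints "succeeded=false queryId=(unknown)"
        System.out.println("succeeded=" + r.succeeded() + " queryId=" + r.profile().queryId());
    }
}
```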
Suggestions up to commit 16d5059
Category | Suggestion | Impact
Possible issue
Fix type mismatch in listener

The non-profile path wraps rows in a ProfiledResult with null profile, but the
execute() method's listener expects Iterable<Object[]>, not ProfiledResult. This type mismatch
will cause a ClassCastException at runtime when the listener fires.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [202-204]

 ActionListener<Iterable<Object[]>> rowsListener = profile
     ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
-    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
+    : ActionListener.wrap(listener::onResponse, listener::onFailure);
Suggestion importance[1-10]: 10

__

Why: Critical bug: the non-profile path wraps rows in ProfiledResult but rowsListener is typed as ActionListener<Iterable<Object[]>>, causing a guaranteed ClassCastException at runtime. The suggested fix correctly passes rows directly to the listener.

High
General
Use metrics counters directly

The code filters task profiles by hardcoded state strings "FINISHED" and "FAILED",
but StageProfile constructor parameters show tasksCompleted and tasksFailed come
from StageMetrics counters. This double-counting from both metrics and profile
filtering may produce inconsistent values if the metrics were already incremented.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [57-58]

-long tasksCompleted = taskProfiles.stream().filter(t -> "FINISHED".equals(t.state())).count();
-long tasksFailed = taskProfiles.stream().filter(t -> "FAILED".equals(t.state())).count();
+stageProfiles.add(
+    new StageProfile(
+        exec.getStageId(),
+        stage != null ? stage.getExecutionType().name() : exec.getClass().getSimpleName(),
+        distribution,
+        exec.getState().name(),
+        start,
+        end,
+        elapsed,
+        m.getRowsProcessed(),
+        m.getTasksCompleted(),
+        m.getTasksFailed(),
+        fragment,
+        taskProfiles
+    )
+);
Suggestion importance[1-10]: 7

__

Why: The code filters taskProfiles by hardcoded state strings to count completed/failed tasks, but StageMetrics already maintains these counters (m.getTasksCompleted(), m.getTasksFailed()). Using metrics directly avoids potential inconsistencies and redundant computation.

Medium
Preserve profile on assertion failures

The AssertionError catch block in executeWithProfile converts to
IllegalStateException, but the listener expects ProfiledResult which should contain
the profile even on failure. This path bypasses profile capture, losing diagnostic
information when planning assertions fail.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [147-149]

 } catch (AssertionError e) {
-    listener.onFailure(new IllegalStateException("Analytics-engine executor rejected the plan: " + e.getMessage(), e));
+    QueryProfile emptyProfile = new QueryProfile(
+        "(unknown)",
+        java.util.List.of(),
+        0L,
+        0L,
+        java.util.List.of()
+    );
+    listener.onResponse(
+        new ProfiledResult(
+            null,
+            new IllegalStateException("Analytics-engine executor rejected the plan: " + e.getMessage(), e),
+            emptyProfile
+        )
+    );
 }
Suggestion importance[1-10]: 6

__

Why: The AssertionError catch in executeWithProfile calls listener.onFailure(), but the listener expects ProfiledResult which should contain profile even on failure. The suggested fix wraps the error in ProfiledResult with an empty profile, maintaining consistency with the profile API contract.

Low
Suggestions up to commit 0603780
Category | Suggestion | Impact
Possible issue
Handle failure path consistently

The non-profile path wraps rows in a ProfiledResult with null profile, but the
failure path doesn't. If rowsListener.onFailure is called, the non-profile branch
will invoke listener.onFailure(e) directly instead of wrapping it in a
ProfiledResult. This breaks the contract that executeWithProfile always returns a
ProfiledResult.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [198-200]

 ActionListener<Iterable<Object[]>> rowsListener = profile
     ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
-    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
+    : ActionListener.wrap(
+        rows -> listener.onResponse(new ProfiledResult(rows, null, null)),
+        e -> listener.onResponse(new ProfiledResult(null, e, null))
+    );
Suggestion importance[1-10]: 9

__

Why: Critical bug: the non-profile path's failure handler calls listener.onFailure(e) directly instead of wrapping in a ProfiledResult, violating the contract that executeWithProfile always returns a ProfiledResult. This breaks the unified listener handling.

High
General
Cache atomic reference read

The failure path calls execRef.get() twice without caching the result. If execRef is
modified concurrently (though unlikely here), this could cause inconsistent null
checks. Cache the result in a local variable.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [237-240]

-QueryProfile qp = execRef.get() != null && execRef.get().getGraph() != null
-    ? QueryProfileBuilder.snapshot(execRef.get().getGraph(), context, fullPlan, planningTimeMs)
+QueryExecution exec = execRef.get();
+QueryProfile qp = exec != null && exec.getGraph() != null
+    ? QueryProfileBuilder.snapshot(exec.getGraph(), context, fullPlan, planningTimeMs)
     : new QueryProfile(context.queryId(), java.util.List.of(), planningTimeMs, 0L, java.util.List.of());
Suggestion importance[1-10]: 5

__

Why: Calling execRef.get() twice without caching could theoretically cause inconsistent null checks if modified concurrently. Caching improves code clarity and eliminates potential race conditions, though concurrency issues are unlikely here.

Low
Optimize task counting loop

The code iterates taskProfiles twice to count completed and failed tasks. For large
task lists, this is inefficient. Use a single pass to compute both counters
simultaneously.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [57-58]

-long tasksCompleted = taskProfiles.stream().filter(t -> "FINISHED".equals(t.state())).count();
-long tasksFailed = taskProfiles.stream().filter(t -> "FAILED".equals(t.state())).count();
+long tasksCompleted = 0;
+long tasksFailed = 0;
+for (TaskProfile t : taskProfiles) {
+    if ("FINISHED".equals(t.state())) tasksCompleted++;
+    else if ("FAILED".equals(t.state())) tasksFailed++;
+}
Suggestion importance[1-10]: 4

__

Why: Minor optimization: iterating taskProfiles twice is inefficient for large task lists. A single-pass approach would improve performance, though the impact is likely small in typical scenarios.

Low
Suggestions up to commit b869a77
Category | Suggestion | Impact
Possible issue
Fix type mismatch in listener

The non-profile path wraps rows in a ProfiledResult with null profile, but the
execute() method expects Iterable<Object[]> directly. This type mismatch will cause a
compilation error since rowsListener is typed as ActionListener<Iterable<Object[]>> but receives a
ProfiledResult in the lambda.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [198-200]

 ActionListener<Iterable<Object[]>> rowsListener = profile
     ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
-    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
+    : ActionListener.wrap(listener::onResponse, listener::onFailure);
Suggestion importance[1-10]: 10

__

Why: Critical bug: the non-profile path wraps rows in ProfiledResult but rowsListener expects Iterable<Object[]>, causing a type mismatch. The execute() method's listener expects Iterable<Object[]> directly, not ProfiledResult.

High
General
Prevent race condition in reference access

The failure path accesses execRef.get() twice without caching, creating a potential
race condition if another thread modifies the reference between checks. Cache the
result in a local variable to ensure consistent null-safety checks.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [236-241]

 }, e -> {
-    QueryProfile qp = execRef.get() != null && execRef.get().getGraph() != null
-        ? QueryProfileBuilder.snapshot(execRef.get().getGraph(), context, fullPlan, planningTimeMs)
+    QueryExecution exec = execRef.get();
+    QueryProfile qp = exec != null && exec.getGraph() != null
+        ? QueryProfileBuilder.snapshot(exec.getGraph(), context, fullPlan, planningTimeMs)
         : new QueryProfile(context.queryId(), java.util.List.of(), planningTimeMs, 0L, java.util.List.of());
     listener.onResponse(new ProfiledResult(null, e, qp));
 });
Suggestion importance[1-10]: 7

__

Why: Valid improvement: caching execRef.get() prevents potential race conditions and improves code clarity. While unlikely in practice due to the execution model, this defensive coding pattern is good practice.

Medium
Optimize task state counting

The code streams taskProfiles twice to count completed and failed tasks. This is
inefficient and could be replaced with a single pass that counts both states
simultaneously, reducing overhead especially for stages with many tasks.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [57-58]

-long tasksCompleted = taskProfiles.stream().filter(t -> "FINISHED".equals(t.state())).count();
-long tasksFailed = taskProfiles.stream().filter(t -> "FAILED".equals(t.state())).count();
+long tasksCompleted = 0, tasksFailed = 0;
+for (TaskProfile t : taskProfiles) {
+    if ("FINISHED".equals(t.state())) tasksCompleted++;
+    else if ("FAILED".equals(t.state())) tasksFailed++;
+}
Suggestion importance[1-10]: 4

__

Why: Minor optimization: replacing two stream passes with a single loop reduces overhead. However, the impact is minimal for typical task counts and the stream-based code is more readable.

Low

@github-actions
Contributor

❌ Gradle check result for ef94fb8: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

github-actions Bot commented May 14, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit a50b194.

Path | Line | Severity | Description
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java | 128 | medium | The describeTarget() method embeds internal node IDs and shard ordinals (e.g. 'node-abc123/shard[2]') into the profile response. The explain endpoint has no visible authorization check in this diff, meaning any user who can POST to /_analytics/ppl/_explain receives a full map of the cluster's data topology (node identities, shard assignments, and execution plan structure), which aids reconnaissance.
sandbox/plugins/test-ppl-frontend/src/main/java/org/opensearch/ppl/action/RestPPLQueryAction.java | 51 | medium | The new /_analytics/ppl/_explain endpoint is wired alongside the existing /_analytics/ppl endpoint with no additional authorization or role check visible in this diff. If the base endpoint is access-controlled, the explain endpoint may inadvertently bypass those controls and expose full CBO query plans, stage timing, and cluster topology to lower-privilege callers.
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/QueryScheduler.java | 115 | low | getStageExecutionBuilder() is a new public method that exposes the singleton StageExecutionBuilder. The comment explicitly mentions its intended use for 'fault-injecting schedulers in resilience tests.' Exposing this in production code via a public accessor allows any component with a QueryScheduler reference to swap in custom stage schedulers at runtime, which could be exploited to subvert normal execution ordering or inject malicious scheduler behavior.
sandbox/qa/analytics-engine-rest/src/test/java/org/opensearch/analytics/qa/ExplainApiIT.java | 170 | low | The executeExplain helper builds the JSON request body via string concatenation ("{\"query\": \"" + escapeJson(ppl) + "\"}") rather than a structured serializer. Correctness depends entirely on the escapeJson implementation in the parent class, which is not shown. If escapeJson is incomplete or absent, crafted PPL strings could inject fields into the request body. Low severity because this is test code, but the pattern is unsafe.

The table above displays the top 10 most important findings.

Total: 4 | Critical: 0 | High: 0 | Medium: 2 | Low: 2
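
To illustrate the last finding above: the concatenated body is only as safe as the escaping applied to the PPL string. The following is a minimal sketch, not the parent class's actual escapeJson (which is not shown in this thread). The class name JsonBodySketch and both method bodies are assumptions written for illustration; only the concatenation pattern itself comes from the finding.

```java
/** Hypothetical sketch; the real escapeJson in the test parent class is not shown. */
final class JsonBodySketch {

    /** Escapes the characters that JSON forbids inside a string literal. */
    static String escapeJson(String s) {
        StringBuilder out = new StringBuilder(s.length() + 16);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break;
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become a six-character hex escape.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /** Same concatenation pattern as the finding describes. */
    static String explainBody(String ppl) {
        return "{\"query\": \"" + escapeJson(ppl) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(explainBody("source=t | stats avg(score) by name"));
        // A crafted string that tries to close the value and add a field.
        System.out.println(explainBody("x\", \"admin\": true, \"y\": \""));
    }
}
```

With an escaper along these lines, the quotes in the crafted input stay inside the "query" value. A structured serializer avoids the question entirely, which is the point the analyzer is making.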


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Contributor

Persistent review updated to latest commit 7584928

@github-actions
Contributor

Persistent review updated to latest commit c5d257f

@finnegancarroll finnegancarroll force-pushed the feature/explain-api branch 2 times, most recently from 5771c1c to 1f3e852 May 14, 2026 15:07
@github-actions
Contributor

Persistent review updated to latest commit 1f3e852

@github-actions
Contributor

Persistent review updated to latest commit a8279c2

@github-actions
Contributor

❌ Gradle check result for a8279c2: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@finnegancarroll finnegancarroll added the skip-diff-analyzer label (Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.) May 14, 2026
@finnegancarroll
Contributor Author

The diff analyzer is complaining that the test-plugin-only REST endpoint _explain has no authorization guards and exposes shard/node IDs. Adding the skip-diff-analyzer label.

@github-actions
Contributor

Persistent review updated to latest commit 827c886

@github-actions
Contributor

❌ Gradle check result for 827c886: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit 0ad0bad

@github-actions
Contributor

❌ Gradle check result for 0ad0bad: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit ead3a6b

@github-actions
Contributor

✅ Gradle check result for ead3a6b: SUCCESS

@codecov

codecov Bot commented May 19, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.42%. Comparing base (ebea88a) to head (ce7c2dd).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff            @@
##               main   #21660   +/-   ##
=========================================
  Coverage     73.42%   73.42%           
+ Complexity    75223    75182   -41     
=========================================
  Files          6023     6023           
  Lines        341475   341475           
  Branches      49141    49141           
=========================================
+ Hits         250717   250728   +11     
+ Misses        70841    70762   -79     
- Partials      19917    19985   +68     


@github-actions
Contributor

Persistent review updated to latest commit 083a81c

@finnegancarroll finnegancarroll marked this pull request as ready for review May 19, 2026 16:47
@finnegancarroll finnegancarroll requested a review from a team as a code owner May 19, 2026 16:47
@github-actions
Contributor

❌ Gradle check result for 083a81c: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Contributor

Persistent review updated to latest commit 3337501

@finnegancarroll finnegancarroll changed the title Add /_analytics/ppl/_explain endpoint with stage profiling [analytics engine] Add /_analytics/ppl/_explain endpoint with stage profiling May 19, 2026
@github-actions
Contributor

Persistent review updated to latest commit 0603780

@github-actions
Contributor

✅ Gradle check result for 0603780: SUCCESS

@github-actions
Contributor

Persistent review updated to latest commit 16d5059

@github-actions
Contributor

Persistent review updated to latest commit ce7c2dd

@github-actions
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
General
Cache execRef to avoid redundant access

The failure path accesses execRef.get() twice without caching the result. If the
reference changes between calls (though unlikely in this single-threaded context),
it could cause inconsistent null checks. Cache the QueryExecution instance to ensure
atomic access.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [240-245]

 }, e -> {
-    QueryProfile qp = execRef.get() != null && execRef.get().getGraph() != null
-        ? QueryProfileBuilder.snapshot(execRef.get().getGraph(), context, fullPlan, planningTimeMs)
+    QueryExecution exec = execRef.get();
+    QueryProfile qp = exec != null && exec.getGraph() != null
+        ? QueryProfileBuilder.snapshot(exec.getGraph(), context, fullPlan, planningTimeMs)
         : new QueryProfile(context.queryId(), java.util.List.of(), planningTimeMs, 0L, java.util.List.of());
     listener.onResponse(new ProfiledResult(null, e, qp));
 });
Suggestion importance[1-10]: 5


Why: Valid suggestion that improves code clarity and eliminates redundant execRef.get() calls. While the thread-safety concern is minimal in this context, caching the reference improves readability and follows best practices.

Low
Optimize task state counting

The code iterates through taskProfiles twice to count completed and failed tasks.
This is inefficient for large task lists. Use a single pass to count both states
simultaneously to reduce computational overhead.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [57-58]

-long tasksCompleted = taskProfiles.stream().filter(t -> "FINISHED".equals(t.state())).count();
-long tasksFailed = taskProfiles.stream().filter(t -> "FAILED".equals(t.state())).count();
+long tasksCompleted = 0;
+long tasksFailed = 0;
+for (TaskProfile t : taskProfiles) {
+    if ("FINISHED".equals(t.state())) tasksCompleted++;
+    else if ("FAILED".equals(t.state())) tasksFailed++;
+}
Suggestion importance[1-10]: 4


Why: Valid optimization that reduces two stream iterations to a single loop. However, the performance impact is minimal since task lists are typically small, and the code readability trade-off is marginal.

Low

OVI3D0 added a commit to OVI3D0/OpenSearch that referenced this pull request May 21, 2026
AbstractStageExecution.fireOperationListeners was passing
getClass().getSimpleName() as the stageType to onStageStart and friends —
that's the execution class name (e.g. "ShardFragmentStageExecution"),
which is the wrong stable surface for an external rollup. The Stage's
StageExecutionType enum (SHARD_FRAGMENT, COORDINATOR_REDUCE,
LOCAL_PASSTHROUGH, LOCAL_COMPUTE) is the canonical name and is what
PR opensearch-project#21660's StageProfile.execution_type already uses.

Pass stage.getExecutionType().name() instead. The stats endpoint's
stages_by_type buckets now key on the enum name, matching the explain
API's field naming and surviving any future class-name refactor.

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
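
The difference between the two keys described in this commit message can be sketched as follows. This is a simplified illustration under stated assumptions: the enum constant names are taken from the message above, but the wrapper class StageTypeKeySketch, the nested stand-in ShardFragmentStageExecution, and the helper methods are hypothetical and do not reproduce the real AbstractStageExecution code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; only the enum constant names come from the commit message. */
final class StageTypeKeySketch {

    enum StageExecutionType { SHARD_FRAGMENT, COORDINATOR_REDUCE, LOCAL_PASSTHROUGH, LOCAL_COMPUTE }

    /** Stand-in for an execution class; its Java name could change in a refactor. */
    static class ShardFragmentStageExecution {
        StageExecutionType getExecutionType() {
            return StageExecutionType.SHARD_FRAGMENT;
        }
    }

    /** Old key: the implementation class name. */
    static String classNameKey(Object execution) {
        return execution.getClass().getSimpleName();
    }

    /** New key: the enum constant name, independent of the class name. */
    static String enumKey(ShardFragmentStageExecution execution) {
        return execution.getExecutionType().name();
    }

    /** Counts occurrences per key, roughly what a stages_by_type rollup would do. */
    static Map<String, Integer> bucket(List<String> keys) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        ShardFragmentStageExecution exec = new ShardFragmentStageExecution();
        System.out.println(classNameKey(exec));
        System.out.println(enumKey(exec));
        System.out.println(bucket(List.of(enumKey(exec), enumKey(exec))));
    }
}
```

Renaming the stand-in class would change the first key but not the second, which is the stability property the commit is after.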
@github-actions
Contributor

✅ Gradle check result for ce7c2dd: SUCCESS

@github-actions
Contributor

Persistent review updated to latest commit 0a13457

@github-actions
Contributor

Persistent review updated to latest commit 3bf6532

Mirror the SQL plugin's /_explain URL-based routing pattern in the
test-ppl-frontend harness. Add explain flag to PPLRequest and register
the POST /_analytics/ppl/_explain route in RestPPLQueryAction.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

Rebase of the explain API feature onto the new scheduler architecture
(post PR opensearch-project#21699). Key changes from the previous version:

- Profile types (QueryProfile, StageProfile, TaskProfile, ProfiledResult)
  in analytics-api with executeWithProfile on QueryPlanExecutor interface
- QueryProfileBuilder rewritten to use StageExecution.tasks() directly
  (no TaskTracker needed — tasks are accessible from the stage)
- Scheduler.execute() returns QueryExecution for post-execution inspection
- QueryExecution.getGraph() accessor added for profile snapshot
- DefaultPlanExecutor: unified executeInternal with profile boolean,
  captures planning_time_ms and execution_time_ms
- PPL frontend: /_analytics/ppl/_explain endpoint with full wiring
- Integration tests (ExplainApiIT)

Signed-off-by: Finn Carroll <carrofin@amazon.com>

Fixes missingJavadoc check for the QueryProfileBuilder package.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

Remove the dual execute/executeWithProfile paths. Always call
executeWithProfile and conditionally include the profile in the
response based on the explain flag. Eliminates code duplication
in the test frontend.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn Carroll <carrofin@amazon.com>

Local IDE setting accidentally committed. Should not be in the repo.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn Carroll <carrofin@amazon.com>

- Remove partition_id (only relevant for future shuffle exchanges)
- Remove start_ms/end_ms (redundant with elapsed_ms)
- describeTarget returns node ID for all task types instead of '(local)'

Task output now contains only: node, state, elapsed_ms.

Signed-off-by: Finn Carroll <carrofin@amazon.com>

The dsl-query-executor tests use QueryPlanExecutor as a lambda (single
abstract method). Making executeWithProfile abstract broke this. Restore
the default implementation that throws UnsupportedOperationException —
real implementations (DefaultPlanExecutor) still override it, but test
lambdas that only need execute() continue to work.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
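
The constraint described in the commit message above follows from how Java treats functional interfaces: a lambda can only target an interface with exactly one abstract method, and default methods do not count toward that limit. A minimal sketch, assuming a simplified synchronous PlanExecutor in place of the real QueryPlanExecutor (whose actual signatures are not shown in this thread); all names in the block are hypothetical.

```java
/** Hypothetical sketch; PlanExecutor is a simplified stand-in for QueryPlanExecutor. */
final class DefaultMethodLambdaSketch {

    interface PlanExecutor {
        /** The single abstract method, so lambdas can still target this interface. */
        String execute(String plan);

        /** Default body keeps the interface lambda-compatible; real executors override it. */
        default String executeWithProfile(String plan) {
            throw new UnsupportedOperationException("executeWithProfile is not supported by this executor");
        }
    }

    /** A full implementation overrides the default. */
    static final class ProfilingExecutor implements PlanExecutor {
        @Override
        public String execute(String plan) {
            return "rows(" + plan + ")";
        }

        @Override
        public String executeWithProfile(String plan) {
            return execute(plan) + "+profile";
        }
    }

    public static void main(String[] args) {
        // A test stub written as a lambda only needs execute().
        PlanExecutor stub = plan -> "stub(" + plan + ")";
        System.out.println(stub.execute("source=t"));
        try {
            stub.executeWithProfile("source=t");
        } catch (UnsupportedOperationException e) {
            System.out.println("unsupported: " + e.getMessage());
        }
        System.out.println(new ProfilingExecutor().executeWithProfile("source=t"));
    }
}
```

Had executeWithProfile been declared abstract, the lambda assignment in main would no longer compile, which matches the breakage the commit describes.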
Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn Carroll <carrofin@amazon.com>

executeWithProfile wraps failures in ProfiledResult, bypassing
DefaultPlanExecutor's convertingListener which converts native
exceptions (e.g. CircuitBreakingException). The non-profile path
must use execute() directly so exception conversion works correctly.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
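
The bypass described in the last commit message can be sketched as follows. This is an illustration under assumptions, not the DefaultPlanExecutor code: the Listener interface, the simplified ProfiledResult, the converting wrapper, and the convertNative rule are all hypothetical stand-ins. The only behavior taken from the message is that the profile path reports a failure through the response callback, so a wrapper that converts exceptions on the failure callback never sees it.

```java
import java.util.function.Function;

/** Hypothetical sketch; none of these types are the real OpenSearch classes. */
final class ConvertingListenerSketch {

    /** Minimal callback pair, loosely modelled on an async listener. */
    interface Listener<T> {
        void onResponse(T value);
        void onFailure(Exception e);
    }

    /** Simplified stand-in for ProfiledResult: rows on success, failure otherwise. */
    static final class ProfiledResult {
        final String rows;
        final Exception failure;

        ProfiledResult(String rows, Exception failure) {
            this.rows = rows;
            this.failure = failure;
        }
    }

    /** Wraps a listener so that every failure passes through convert first. */
    static <T> Listener<T> converting(final Listener<T> delegate, final Function<Exception, Exception> convert) {
        return new Listener<T>() {
            @Override
            public void onResponse(T value) {
                delegate.onResponse(value);
            }

            @Override
            public void onFailure(Exception e) {
                delegate.onFailure(convert.apply(e));
            }
        };
    }

    /** Assumed conversion rule: rewrap a "native" error as a caller-facing one. */
    static Exception convertNative(Exception e) {
        return new IllegalStateException("converted: " + e.getMessage());
    }

    /** Non-profile path: the failure reaches the caller through the converting wrapper. */
    static void execute(Listener<String> caller) {
        Listener<String> wrapped = converting(caller, ConvertingListenerSketch::convertNative);
        wrapped.onFailure(new RuntimeException("native breaker tripped"));
    }

    /** Profile path: the failure travels as a response, so no conversion is applied. */
    static void executeWithProfile(Listener<ProfiledResult> caller) {
        caller.onResponse(new ProfiledResult(null, new RuntimeException("native breaker tripped")));
    }

    public static void main(String[] args) {
        execute(new Listener<String>() {
            @Override public void onResponse(String value) { }
            @Override public void onFailure(Exception e) { System.out.println("plain: " + e.getMessage()); }
        });
        executeWithProfile(new Listener<ProfiledResult>() {
            @Override public void onResponse(ProfiledResult r) { System.out.println("profiled: " + r.failure.getMessage()); }
            @Override public void onFailure(Exception e) { }
        });
    }
}
```

In this sketch the plain path delivers the converted exception while the profiled path delivers the raw one, which is why the non-profile request should go through execute() directly.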
@github-actions
Contributor

Persistent review updated to latest commit dd2724f

@mch2 mch2 merged commit d9a553f into opensearch-project:main May 22, 2026
13 of 14 checks passed

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

2 participants