Skip to content

[analytics engine] Add /_plugins/_analytics/stats endpoint#21796

Draft
OVI3D0 wants to merge 16 commits into
opensearch-project:mainfrom
OVI3D0:stats-api
Draft

[analytics engine] Add /_plugins/_analytics/stats endpoint#21796
OVI3D0 wants to merge 16 commits into
opensearch-project:mainfrom
OVI3D0:stats-api

Conversation

@OVI3D0
Copy link
Copy Markdown
Member

@OVI3D0 OVI3D0 commented May 21, 2026

Description

Builds ontop of #21660 and adds per stage timing but rolled up so users won't need to hit _explain for every request

Example:

  $ curl -s
  'http://localhost:9200/_plugins/_analytics/stats' | jq
  {
    "analytics": {
      "queries": {
        "total": 46,
        "succeeded": 46,
        "failed": 0,
        "in_flight": 0,
        "elapsed_ms":  { "count": 46, "sum_ms": 633, "max_ms": 419, "p50_ms": 4, "p95_ms": 14, "p99_ms": 419 },
        "planning_ms": { "count": 46, "sum_ms": 437, "max_ms": 223, "p50_ms": 2, "p95_ms": 46, "p99_ms": 223 }
      },
      "stages_by_type": {
        "SHARD_FRAGMENT": {
          "started": 46, "succeeded": 46, "failed": 0, "cancelled": 0,
          "rows_processed_total": 334,
          "elapsed_ms": { "count": 46, "sum_ms": 649, "max_ms": 415, "p50_ms": 4, "p95_ms": 15, "p99_ms": 415 }
        }
      },
      "fragments": {
        "total": 90, "succeeded": 90, "failed": 0,
        "elapsed_ms": { "count": 90, "sum_ms": 507, "max_ms": 291, "p50_ms": 2, "p95_ms": 5, "p99_ms": 291 }
      }
    }
  }

Some follow ups that can be added:

  • TransportNodesAction to broadcast/merge results, since right now this API is per-node only
  • Planning-time failures don't increment queries.failed — query lifecycle events fire from QueryScheduler.execute(), but parse / table-resolution / planning failures fail upstream of that

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

finnegancarroll and others added 16 commits May 20, 2026 22:44
Mirror the SQL plugin's /_explain URL-based routing pattern in the
test-ppl-frontend harness. Add explain flag to PPLRequest and register
the POST /_analytics/ppl/_explain route in RestPPLQueryAction.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Rebase of the explain API feature onto the new scheduler architecture
(post PR opensearch-project#21699). Key changes from the previous version:

- Profile types (QueryProfile, StageProfile, TaskProfile, ProfiledResult)
  in analytics-api with executeWithProfile on QueryPlanExecutor interface
- QueryProfileBuilder rewritten to use StageExecution.tasks() directly
  (no TaskTracker needed — tasks are accessible from the stage)
- Scheduler.execute() returns QueryExecution for post-execution inspection
- QueryExecution.getGraph() accessor added for profile snapshot
- DefaultPlanExecutor: unified executeInternal with profile boolean,
  captures planning_time_ms and execution_time_ms
- PPL frontend: /_analytics/ppl/_explain endpoint with full wiring
- Integration tests (ExplainApiIT)

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Fixes missingJavadoc check for the QueryProfileBuilder package.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Remove the dual execute/executeWithProfile paths. Always call
executeWithProfile and conditionally include the profile in the
response based on the explain flag. Eliminates code duplication
in the test frontend.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn Carroll <carrofin@amazon.com>
Local IDE setting accidentally committed. Should not be in the repo.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn Carroll <carrofin@amazon.com>
- Remove partition_id (only relevant for future shuffle exchanges)
- Remove start_ms/end_ms (redundant with elapsed_ms)
- describeTarget returns node ID for all task types instead of '(local)'

Task output now contains only: node, state, elapsed_ms.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
The dsl-query-executor tests use QueryPlanExecutor as a lambda (single
abstract method). Making executeWithProfile abstract broke this. Restore
the default implementation that throws UnsupportedOperationException —
real implementations (DefaultPlanExecutor) still override it, but test
lambdas that only need execute() continue to work.

Signed-off-by: Finn Carroll <carrofin@amazon.com>
Signed-off-by: Finn Carroll <carrofin@amazon.com>
Adds a default no-op callback that fires once per query immediately after
the planning pipeline (Calcite optimization + DAG construction) completes,
before any stage starts. This gives consumers a hook for distinguishing
planner-bound queries from execution-bound queries — useful for observability
rollups and diagnostic tooling that wants to attribute time spent in CBO
versus time spent in stages.

Adds matching dispatch in CompositeListener so the new event fans out to
delegates with the same exception-isolation semantics as the existing
callbacks.

Existing implementations are unaffected because the method has a default
no-op body.

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
executeFragmentStreamingAsync was firing onPreFragmentExecution and
onFragmentFailure but never onFragmentSuccess, leaving the success branch
of the data-node-side fragment instrumentation silent.

Wraps the stream-draining loop with a nanoTime timer and a row counter,
then fires onFragmentSuccess once the stream completes successfully. This
makes any AnalyticsOperationListener that consumes fragment events able
to record wall-clock latency and rows produced per task.

No behaviour change for the existing failure path — failures continue to
route through executeFragmentStreaming's catch blocks.

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Adds a node-local rollup of analytics-engine activity, exposed at
GET /_plugins/_analytics/stats. Aggregates query, stage, and fragment
lifecycle events from AnalyticsOperationListener into counters surfaced
as JSON. Designed for oncall triage: when the cluster is slow, hit the
endpoint to see whether time is going into planning, into a particular
stage type, or into individual fragments — rather than chasing per-query
profiles for every request.

Output (per-node, since stats are recorded on whichever node handled the
event):

  analytics:
    queries:        total / succeeded / failed / in_flight,
                    elapsed_ms_sum / max,
                    planning_ms_sum / max
    stages_by_type: <ExecutionClassName>:
                    started / succeeded / failed / cancelled,
                    rows_processed_total,
                    elapsed_ms_sum / max
    fragments:      total / succeeded / failed,
                    elapsed_ms_sum / max

The listener-list machinery existed but had no producers wired in — both
QueryContext and AnalyticsSearchService were constructed with List.of()
in production. AnalyticsPlugin now owns a singleton listener registry,
registers the collector into it, and threads the list into both
QueryContext (coordinator-side) and AnalyticsSearchService (data-node-
side). DefaultPlanExecutor injects the registry via Guice and forwards
the list through to QueryContext.

DefaultPlanExecutor's planning-time timer is lifted out of the if(profile)
branch so it runs unconditionally, then fires onQueryPlanned on every
query. The existing _explain payload is unchanged — it still uses the
same timer.

Recording is wait-free: counters are LongAdder, max-update is a CAS loop,
the stage-type bucket map is a ConcurrentHashMap. snapshot() reads each
counter once into an immutable AnalyticsStats record and renders to JSON
via toXContent.

Per-node only for v1; cluster-wide aggregation via TransportNodesAction is
a follow-up. Counters + sum + max only; HdrHistogram percentiles are a
follow-up. The output types are marked @experimentalapi to signal that
field shapes can evolve.

Coverage: AnalyticsStatsCollectorTests exercises every callback path
including a concurrent-recording stress test; AnalyticsStatsApiIT
provisions a dataset, fires PPL queries, and verifies the endpoint
reflects the activity.

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Latency fields under queries / stages_by_type / fragments now serialize as
a LatencyStats object with count, sum_ms, max_ms, p50_ms, p95_ms, p99_ms
instead of separate elapsed_ms_sum / elapsed_ms_max scalars. Same for the
new planning_ms field.

Backed by a small LatencyHistogram helper that wraps HdrHistogram's
Recorder for wait-free recording across many writer threads. Each
snapshot folds the latest interval into a long-lived cumulative
accumulator, preserving the cumulative-since-startup semantics the rest
of the rollup uses. count and sum_ms are tracked alongside via
LongAdders since HdrHistogram only stores bucket midpoints and would
lose exact totals.

HdrHistogram is configured for 1ms..10min range at 3 significant digits
(~0.1% relative precision). Values outside that range clamp.

The dependency is already on the analytics-engine compile classpath
(transitively from server), so no new license/notice paperwork.

This makes oncall triage qualitatively better: a slow tail (p99 >> p50)
now jumps out of the same response that already shows the totals, instead
of needing a separate per-query profile to discover it.

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
AbstractStageExecution.fireOperationListeners was passing
getClass().getSimpleName() as the stageType to onStageStart and friends —
that's the execution class name (e.g. "ShardFragmentStageExecution"),
which is the wrong stable surface for an external rollup. The Stage's
StageExecutionType enum (SHARD_FRAGMENT, COORDINATOR_REDUCE,
LOCAL_PASSTHROUGH, LOCAL_COMPUTE) is the canonical name and is what
PR opensearch-project#21660's StageProfile.execution_type already uses.

Pass stage.getExecutionType().name() instead. The stats endpoint's
stages_by_type buckets now key on the enum name, matching the explain
API's field naming and surviving any future class-name refactor.

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
The original test fired 6 identical queries and asserted on whichever
node the REST client happened to land on. With only 2 of those queries
landing on the same node, percentile spread was binary and fragments.total
showed 0 when the snapshot landed on the coordinator-only node.

Fire 90 queries across three shapes (project, filter, aggregate) so the
per-stage-type buckets see varied work, then pick the busiest node's
snapshot for assertions. All three buckets (queries, stages_by_type,
fragments) reliably populate this way and percentile spread (p50 vs p99)
is meaningful.

Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The executeInternal method always calls executeWithProfile regardless of the profile parameter value. When profile is false, the profile data is discarded after execution, but the profiling overhead (capturing stage timings, building the profile snapshot) is still incurred. This contradicts the intent of having separate code paths and wastes resources on non-profiling queries.

ActionListener<Iterable<Object[]>> rowsListener = profile
    ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
    : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
Possible Issue

The executeFragmentStreamingAsync method increments rows by batch.getRowCount() but never calls responseHandler.onBatch(batch) inside the loop. The batch is read but not sent to the handler, so the streaming response will be empty. The handler only receives onComplete() with no data.

while (it.hasNext()) {
    EngineResultBatch batch = it.next();
    rows += batch.getRowCount();
    responseHandler.onBatch(batch);
Possible Issue

The stageTypeByStageId map is used to route terminal stage events to the correct bucket, but it is never cleaned up when a stage completes. The map will grow unbounded over the lifetime of the node, leaking memory proportional to the number of stages executed. The comment claims entries are pruned on terminal states, but currentTypeForStage only removes the entry—it does not prevent accumulation if the same stageId is reused across queries.

private final ConcurrentMap<Integer, String> stageTypeByStageId = new ConcurrentHashMap<>();

private String currentTypeForStage(int stageId) {
    String t = stageTypeByStageId.remove(stageId);
    return t != null ? t : "unknown";
}

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent potential race condition NPE

The failure path accesses execRef.get() twice without null-guarding the second
access. If execRef.get() returns non-null but getGraph() returns null between the
two calls (race condition), this could throw NullPointerException. Cache the result
in a local variable to ensure consistent null checks.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [248-252]

 } catch (Exception e) {
-    QueryProfile qp = execRef.get() != null && execRef.get().getGraph() != null
-        ? QueryProfileBuilder.snapshot(execRef.get().getGraph(), context, fullPlan, planningTimeMs)
+    QueryExecution exec = execRef.get();
+    QueryProfile qp = exec != null && exec.getGraph() != null
+        ? QueryProfileBuilder.snapshot(exec.getGraph(), context, fullPlan, planningTimeMs)
         : new QueryProfile(context.queryId(), java.util.List.of(), planningTimeMs, 0L, java.util.List.of());
     listener.onResponse(new ProfiledResult(null, e, qp));
 });
Suggestion importance[1-10]: 7

__

Why: Valid concern about accessing execRef.get() twice without caching. While the race condition is unlikely in practice (the AtomicReference is set once and never cleared), caching the result in a local variable improves code clarity and eliminates any theoretical race condition risk.

Medium
General
Prevent unbounded map growth

The stageTypeByStageId map grows unbounded if stages never reach terminal states
(e.g., stuck queries, cancellation failures). This creates a memory leak. Consider
using a bounded cache with eviction policy or adding explicit cleanup on query-level
terminal events to prevent unbounded growth.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/stats/AnalyticsStatsCollector.java [152-156]

-private final ConcurrentMap<Integer, String> stageTypeByStageId = new ConcurrentHashMap<>();
+// Use a bounded cache with size limit and LRU eviction
+private final Map<Integer, String> stageTypeByStageId = Collections.synchronizedMap(
+    new LinkedHashMap<Integer, String>(128, 0.75f, true) {
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
+            return size() > 1024; // Limit to 1024 in-flight stages
+        }
+    }
+);
 
 private String currentTypeForStage(int stageId) {
     String t = stageTypeByStageId.remove(stageId);
     return t != null ? t : "unknown";
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion identifies a legitimate memory leak concern where stageTypeByStageId could grow unbounded if stages never reach terminal states. The proposed bounded cache solution is reasonable, though the specific implementation using LinkedHashMap with synchronization may not be ideal for concurrent access patterns already using ConcurrentHashMap.

Low
Prevent stack overflow on deep DAGs

The recursive findStageById traversal has no depth limit and could cause stack
overflow on deeply nested DAGs (e.g., pathological query plans with hundreds of
stages). Convert to iterative traversal using a queue to prevent stack exhaustion on
malformed or adversarial queries.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/profile/QueryProfileBuilder.java [116-123]

 private static Stage findStageById(Stage root, int stageId) {
-    if (root.getStageId() == stageId) return root;
-    for (Stage child : root.getChildStages()) {
-        Stage found = findStageById(child, stageId);
-        if (found != null) return found;
+    java.util.Queue<Stage> queue = new java.util.LinkedList<>();
+    queue.add(root);
+    while (!queue.isEmpty()) {
+        Stage current = queue.poll();
+        if (current.getStageId() == stageId) return current;
+        queue.addAll(current.getChildStages());
     }
     return null;
 }
Suggestion importance[1-10]: 4

__

Why: While converting to iterative traversal is a valid defensive programming practice, the concern about stack overflow on deeply nested DAGs is theoretical. Real-world query plans rarely exceed a few dozen stages, making stack overflow extremely unlikely. The iterative solution adds complexity without addressing a practical problem.

Low
Avoid unnecessary wrapper allocation

When profiling is disabled, the non-profile path still wraps rows in a
ProfiledResult with null profile. This creates an unnecessary object allocation for
every non-profiled query. Consider passing listener directly to avoid the wrapper
allocation overhead.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/DefaultPlanExecutor.java [210-212]

 ActionListener<Iterable<Object[]>> rowsListener = profile
     ? buildProfilingRowsListener(execRef, context, fullPlan, planningTimeMs, listener)
     : ActionListener.wrap(rows -> listener.onResponse(new ProfiledResult(rows, null, null)), listener::onFailure);
 
+ActionListener<Iterable<VectorSchemaRoot>> batchesListener = ActionListener.runAfter(
+    ActionListener.wrap(batches -> {
+        Iterable<Object[]> rows = batchesToRows(batches);
+        if (profile) {
+            rowsListener.onResponse(rows);
+        } else {
+            listener.onResponse(new ProfiledResult(rows, null, null));
+        }
+    }, profile ? rowsListener::onFailure : listener::onFailure),
+    () -> taskManager.unregister(queryTask)
+);
+
Suggestion importance[1-10]: 3

__

Why: The suggestion correctly identifies that the non-profile path creates a ProfiledResult wrapper, but the proposed solution is more complex and doesn't eliminate the allocation—it just moves it. The current code is clearer and the allocation overhead is negligible for the non-profile path.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 74d5d76: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants