feat(cubestore): EXPLAIN ANALYZE DETAILED per-query trace#11016
feat(cubestore): EXPLAIN ANALYZE DETAILED per-query trace#11016waralexrom wants to merge 21 commits into
Conversation
End-to-end per-query trace plumbing across the network and IPC boundaries: - trace module: OpKind/OpSample aggregatable measurements + typed exec fields, per-query TraceCtx behind a task-local gate (OpGuard captures Instant + Weak<TraceCtx> only when a detailed query is in flight). - IPC: WorkerMessage::Select carries a detailed flag; the subprocess Response returns an Option<SubprocessTrace>; stage guards on the single select code path are no-ops without an active ctx. - network: NetworkMessage::AnalyzeDetailed/Result + Cluster::run_analyze_detailed, executed for real through the select subprocess. - command: parse EXPLAIN ANALYZE DETAILED, assemble QueryTrace on the entry node and render it to a DataFrame. Measurement points (metastore, memory, physical plan) land in follow-ups.
- cuberpc generates variant_name() on the *RpcMethodCall enum for cheap per-method labels. - ClusterMetaStoreClient::invoke_method records an OpKind::Metastore sample keyed by method name. - explain_detailed scopes the planning phase (logical_plan) so choose_index metastore calls are captured.
…decorator - cuberpc #[service(trace_guard = path)] generates a Traced<Service> decorator that wraps Arc<dyn Service> and holds <path>(method) across each async call, keeping cuberpc decoupled from the host crate's tracing module. - MetaStore is wrapped at both DI sites (remote MetaStoreRpcClient and local RocksMetaStore), so metastore calls are traced by method name under an active trace ctx on any node, RPC or local. The concrete RocksMetaStore stays retrievable by type. - Drops the now-redundant invoke_method guard (the decorator supersedes it). variant_name() is generated for the upcoming transport/access split.
Add OpKind::Planning and phase guards inside logical_plan (session_context, statement_to_plan, optimize, choose_index) and explain_detailed (plan.serialize, router_physical_plan). Under an active trace ctx the router planning breaks down into phases, with metastore calls interleaved as separate Metastore samples; no-ops on the normal query path.
…t path
EXPLAIN ANALYZE DETAILED now executes the full query on a real main worker
(random, like prod) instead of a fan-out, so the final stages run where they
actually happen.
- new message variants RouterSelectDetailed/SelectDetailed (+ *Result); hot
Select/SelectResult untouched (flexbuffers tolerates a trailing field only
new->old, so modifying a hot message would break rolling deploys).
- main runs the real router plan via execute_router_plan_detailed; a
WorkerTraceCollector rides the DataFusion TaskContext so ClusterSendExec
records per-worker traces while rows still flow into the merge (no new plan
node, task-local would not survive DF task spawning).
- workers run their part for real and return rows + WorkerTrace.
- MainTrace + QueryTrace { router, main }; render gains main level.
- main.router_physical_plan guard; obsolete AnalyzeDetailed path removed.
MemoryPool peak and per-node DataFusion metrics on the main are the next step.
Add TrackingMemoryPool (an unbounded MemoryPool that records the peak of operator reservations) and run the detailed router-plan execution under it via a per-query RuntimeEnv. The peak is isolated from concurrent queries and lands in MainTrace.exec_memory_peak_bytes — the finalization memory of the query. - queryplanner: make_execution_context_with_runtime / minimal_session_state_*_with_runtime let a caller inject a RuntimeEnv (default unchanged for existing callers). - covers operator-reserved memory (sort/aggregate/join), not every allocation. Subprocess (worker) memory peak and per-node DF metrics are the next steps.
execute_worker_plan takes an optional TrackingMemoryPool; when a detailed query runs, the subprocess builds its RuntimeEnv with it and reports the peak into SubprocessTrace.exec_memory_peak_bytes. None on the normal path (default runtime, no overhead).
After the detailed router-plan execution, walk the physical plan and record each node's elapsed_compute into MainTrace.ops as OpKind::Execution samples keyed by node type (aggregated: summed time + node count). Surfaces where time goes in the final stages (aggregation/sort/merge) beyond the single main.execute bucket. Adds trace::record_op for pre-measured samples.
execute_worker_plan records per-node elapsed_compute of the worker subplan into the subprocess trace (OpKind::Execution) when running detailed, mirroring the main side. Gives per-worker execution breakdown (scan/filter/partial-agg).
SerializedRecordBatchStream::byte_size + OpGuard::set_bytes on result.serialize and chunks.serialize, so the trace shows transport data volume (bytes) alongside time for the serialize stages.
OpSample gains a rows field (aggregated like bytes); record_plan_node_metrics records DataFusion output_rows per node on both main and workers, rendered as a rows column. Surfaces nodes processing unexpectedly many rows.
The main (router plan) and each worker (subplan) stash their pp_phys_plan text through the trace ctx into MainTrace/SubprocessTrace.physical_plan, rendered as a Plan row. Surfaces the plan tree shape for understanding heavy customer queries. Routed through the ctx, so no execute_worker_plan return-type change.
…trip Each region records its total wall time (SubprocessTrace/WorkerTrace/MainTrace .total_us); ClusterSendExec times the main->worker round-trip into WorkerTrace.net_roundtrip_us. These let transport be derived as round-trip minus the child's wall (the missing main->worker leg is now covered).
…nalyze Compute transport (wire + queue) = round-trip - child wall for each boundary and emit Transport rows: transport.entry_to_main, transport.main_to_worker (per worker), transport.ipc (per worker). saturating_sub guards measurement skew.
Replace the wide flat table with a single-column nested tree (router -> main -> worker -> subprocess): per-node measurements as aligned label/value lines, plan as an indented block. Far more readable for manual inspection.
…durations Detailed-analyze tree now opens with a per-category summary (sum of elapsed by kind, sorted desc, wrappers excluded to avoid double counting), tags every line with its category, and formats durations as us/ms/s and bytes as B/KB/MB.
… per-node breakdown Each node header now shows its total wall time; the category summary moves below the tree and gains an overall block plus per-node (router/main/workers) category breakdowns.
…from summary OpSample gains is_wrapper, set via OpGuard::start_wrapper at the span that contains other measured ops (round-trips, *.execute, plan.choose_index). The summary filters on the flag instead of matching labels by string across files, and choose_index is now a wrapper so its nested metastore calls are no longer double-counted in Planning. Categories now form a non-overlapping partition.
Runs the command on a router+worker test cluster and asserts the single trace cell contains every level plus the category summary (router/main/worker, Planning/Metastore/Execution/Transport, transport.* and plan:). The test harness has no select subprocess pool, so the subprocess section is not asserted.
DETAILED is a superset of EXTENDED (per-node DataFusion metrics on main and workers, plus metastore/transport/planning/memory). Drops the EXTENDED command, its parser arm/dispatch, and the execute flag threaded through ExplainAnalyze / run_explain_analyze / pp_worker_plan; plain EXPLAIN ANALYZE returns to non-executing (plan-only) as before. parse test repurposed to cover DETAILED + plain EXPLAIN.
4297c0e to
4f84a78
Compare
|
Claude finished @waralexrom's task in 5m 15s —— View job Code review — EXPLAIN ANALYZE DETAILEDOverall this is a solid piece of plumbing: the gating via task-local Verdict: 0 high, 1 medium, 4 low. Findings & checklist
Findings (linked to inline comments):
Things that look right and were worth checking:
|
|
|
||
| /// Detailed-trace mirror of [RouterSelect]: the entry node asks a main worker to | ||
| /// run the full router plan for real and return the assembled `MainTrace`. | ||
| RouterSelectDetailed(SerializedPlan), | ||
| RouterSelectDetailedResult(Result<MainTrace, CubeError>), | ||
|
|
||
| /// Detailed-trace mirror of [Select]: a worker runs its part for real and returns | ||
| /// both the result rows (for the main to merge) and its `WorkerTrace`. | ||
| SelectDetailed(SerializedPlan, WorkerPlanningParams), | ||
| SelectDetailedResult( | ||
| Result<(SchemaRef, Vec<SerializedRecordBatchStream>, WorkerTrace), CubeError>, | ||
| ), |
There was a problem hiding this comment.
NETWORK_MESSAGE_VERSION is still 1 even though NetworkMessage gained four new variants. Serde's default externally‑tagged repr keys by variant name, so existing variants keep working across a rolling deploy — but an old binary that receives one of the new variants will fail deserialization with a generic "unknown variant" error rather than the friendlier protocol-version mismatch on line 137. Since EXPLAIN ANALYZE DETAILED is user-initiated and opt-in this is unlikely to bite in practice, but worth either (a) bumping the version to make the failure mode clear, or (b) leaving a comment here noting that adding variants is wire‑compatible only because the repr is by name.
There was a problem hiding this comment.
@claude We have never bumped the protocol version before. In this case, we preserve both compatibility on the production path and consistency during a rolling deployment across the cluster (which would actually be disrupted by a version bump). So it seems that bumping the version is not justified here.
There was a problem hiding this comment.
Claude finished @waralexrom's task in 39s —— View job
Re: bumping NETWORK_MESSAGE_VERSION
You're right, and I'll withdraw the suggestion to bump it. Your reasoning is actually stronger than my original note gave it credit for.
The check at message.rs:136 is a strict equality:
let ver = socket.read_u32().await?;
if ver != NETWORK_MESSAGE_VERSION {
return Err(CubeError::user(format!("Network protocol version mismatch. ...")));
}So bumping 1 → 2 doesn't just make the new *Detailed variants fail more clearly — it makes every message between a v1 node and a v2 node fail, including the hot Select / RouterSelect / MetaStoreCall traffic. During a rolling deploy the cluster is mixed-version by definition, so a bump would take the whole cluster down for the rollout window. That's the opposite of what the version field should protect.
Staying at 1 is correct here:
- Existing variants stay wire-compatible because serde's flexbuffers repr is externally-tagged by name, not by index — a mixed-version cluster keeps talking on the production path.
- New
*Detailedvariants only ever cross the wire when a user explicitly runsEXPLAIN ANALYZE DETAILED, which is opt-in and not part of steady-state or rolling-deploy traffic. The only degenerate case (an old binary receiving a new variant) yields a generic "unknown variant" deser error instead of the friendlier mismatch message — an acceptable trade for not breaking rolling deploys.
So: no version bump. The only thing I'd still gently suggest — purely optional — is a one-line comment near the version constant or the new variants recording why it's safe to add variants without bumping (the by-name repr + opt-in path), so the next person doesn't re-derive this or "helpfully" bump it. Happy to add that if you want it; otherwise this is resolved on my end.
· branch cubestore-datailed-analyze
| async fn run_local_select_detailed_worker( | ||
| &self, | ||
| plan_node: SerializedPlan, | ||
| worker_planning_params: WorkerPlanningParams, | ||
| ) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>, WorkerTrace), CubeError> { | ||
| let ctx = crate::trace::TraceCtx::new(); | ||
| let node_name = self.server_name.clone(); | ||
| let started = std::time::Instant::now(); | ||
| let (schema, records, subprocess) = crate::trace::scoped(Some(ctx.clone()), async { | ||
| self.run_local_select_worker_impl(plan_node, worker_planning_params, true) | ||
| .await | ||
| .map(|(schema, records, _size, subtrace)| (schema, records, subtrace)) | ||
| }) | ||
| .await?; | ||
| let worker_trace = WorkerTrace { | ||
| node_name, | ||
| total_us: started.elapsed().as_micros() as u64, | ||
| net_roundtrip_us: None, | ||
| ops: ctx.take_ops(), | ||
| subprocess, | ||
| }; | ||
| Ok((schema, records, worker_trace)) |
There was a problem hiding this comment.
When the select subprocess pool is disabled (non-Linux / unconfigured), run_local_select_worker_impl silently drops the detailed flag in its fallback branch: it calls execute_worker_plan(..., None) and returns subprocess = None. The user gets a tree missing the subprocess · region with no indication the trace was degraded — distinguishing "no subprocess pool" from "subprocess didn't run" requires reading the source.
Reasonable fixes:
- Pass a
TrackingMemoryPoolthrough the fallback branch too —execute_worker_planalready records per-node metrics + plan text whenmemory_pool.is_some(), so the subprocess section can be populated even without the pool. - Or, when degraded, surface a one-line note in the trace (e.g. a synthetic op or a header) explaining the subprocess pool wasn't available.
The PR's smoke test runs in this exact path and doesn't assert the subprocess · marker, which currently masks the gap.
| } | ||
| // Harvest per-node DataFusion metrics of the final stages (router-level nodes | ||
| // above ClusterSend), aggregated by node type into the active trace. | ||
| record_plan_node_metrics(&physical_plan); |
There was a problem hiding this comment.
record_plan_node_metrics walks the entire main physical plan, including ClusterSendExec itself. ClusterSendExec's elapsed_compute includes the wall time spent waiting on workers (the run_select_detailed future runs inside its execute), so it gets folded into the main's Execution category — while that same wait time is also accounted for via the per-worker net_roundtrip_us → Transport derivation in the summary.
That's a soft double-count for the overall total. Consider either marking ClusterSendExec so record_plan_node_metrics skips it, or flagging the recorded Execution/ClusterSendExec sample as a wrapper so it's tree-visible but excluded from the summary.
| fn push(&self, sample: OpSample) { | ||
| let mut ops = self.ops.lock().unwrap(); | ||
| if let Some(existing) = ops | ||
| .iter_mut() | ||
| .find(|o| o.kind == sample.kind && o.label == sample.label) | ||
| { | ||
| existing.elapsed_us += sample.elapsed_us; | ||
| existing.count += sample.count; | ||
| existing.bytes = match (existing.bytes, sample.bytes) { | ||
| (Some(a), Some(b)) => Some(a + b), | ||
| (a, b) => a.or(b), | ||
| }; | ||
| existing.rows = match (existing.rows, sample.rows) { | ||
| (Some(a), Some(b)) => Some(a + b), | ||
| (a, b) => a.or(b), | ||
| }; | ||
| } else { | ||
| ops.push(sample); | ||
| } | ||
| } |
There was a problem hiding this comment.
Linear scan-and-merge inside the mutex. For typical traces (~tens of ops) this is fine, but a query that records many distinct (kind, label) keys (e.g. a wide physical plan with many node types via record_plan_node_metrics) becomes quadratic and holds the mutex across the whole scan. A small HashMap<(OpKind, &str), usize> index built lazily on first push would keep this O(1) per insert and remove the cost concern for larger plans.
Not blocking — flagging in case detailed traces ever land in a path that records hundreds of samples.
| // Smoke check: the whole path produced the levels + the summary. | ||
| // (The test harness runs the worker in-process, without the select | ||
| // subprocess pool, so no `subprocess ·` section here.) | ||
| for marker in [ | ||
| "summary by category", | ||
| "router", | ||
| "Metastore", | ||
| "main \u{b7}", | ||
| "worker \u{b7}", | ||
| "Planning", | ||
| "Execution", | ||
| "transport.", | ||
| "plan:", | ||
| ] { | ||
| assert!(trace.contains(marker), "trace missing '{}':\n{}", marker, trace); | ||
| } |
There was a problem hiding this comment.
The smoke test only asserts marker presence — it never checks that the numbers are sensible. A regression that leaves MainTrace.total_us == 0, an empty workers vector, or a malformed transport line would still pass as long as the literal substrings appear.
Consider adding a couple of cheap structural assertions: e.g. parse out at least one total <duration> and assert it's >0, or grep for main · 127.0.0.1:14016 and worker · 127.0.0.1:14016 rather than the generic main \u{b7} / worker \u{b7} substrings. It's still a smoke test, but it would catch silently-empty traces.
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #11016 +/- ##
=======================================
Coverage 58.57% 58.57%
=======================================
Files 216 216
Lines 17181 17181
Branches 3502 3502
=======================================
Hits 10064 10064
Misses 6609 6609
Partials 508 508
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
…dule Move explain_detailed (orchestration) and render_query_trace (pure QueryTrace -> DataFrame rendering) out of the oversized sql/mod.rs into sql/explain_detailed.rs. render stays out of trace.rs to keep that a DataFrame-free serde leaf.
|
Claude finished @waralexrom's task in 1m 12s —— View job Code review — incremental (synchronize)Only one new commit since the prior review: Verdict on the new commit: 0 findings. The split is clean — Status of prior threads
|
Summary
Adds
EXPLAIN ANALYZE DETAILED <select>to CubeStore: a per-query trace that decomposes a real query execution across the whole topology (entry → main → workers → select subprocess) into planning, metastore, transport, execution, serialization and memory — so prod bottlenecks (where local ≠ prod) can be found without blind per-query tracing in steady state.The trace is gated: it only runs for the explicit
DETAILEDcommand and is a no-op on the normal query path.Changes
trace.rs):OpSample(aggregated by kind/label, with bytes/rows/is_wrapper) + typed per-region structs (RouterTrace/MainTrace/WorkerTrace/SubprocessTrace/QueryTrace); per-queryTraceCtxbehind a task-local gate;OpGuardcaptures time only when a detailed query is in flight.NetworkMessagevariantsRouterSelectDetailed/SelectDetailed(hotSelect/SelectResultuntouched — flexbuffers tolerates a trailing field only new→old, so changing a hot message would break rolling deploys). The main runs the real router plan; aWorkerTraceCollectorrides the DataFusionTaskContextsoClusterSendExecrecords per-worker traces while rows still flow into the merge (no new plan node).#[cuberpc::service(trace_guard=…)]-generatedTracedMetaStoredecorator, both DI sites); router planning phases; per-node DataFusion metrics (elapsed_compute/output_rows) on main and workers; per-queryMemoryPoolpeak on main and subprocess; serialized byte volume; executed physical plan text; per-boundary transport (wire+queue) = round-trip − child wall.*.execute, round-trips,choose_index) are excluded from the summary so categories don't double-count.Testing
cargo test -p cubestore --lib explain_analyze_detailed— smoke test on a router+worker cluster asserting the trace contains every level + the category summary.getActivePartitionsAndChunksByIndexIdForSelect/getMultiPartitionSubtreeappear under the entry node's planning).🤖 Generated with Claude Code