Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
83a42c5
feat(cubestore): EXPLAIN ANALYZE DETAILED trace skeleton
waralexrom Jun 4, 2026
6eee4f8
feat(cubestore): trace metastore RPC calls during detailed analyze
waralexrom Jun 4, 2026
1a3e157
feat(cubestore): trace metastore calls via generated TracedMetaStore …
waralexrom Jun 4, 2026
b15d611
feat(cubestore): trace router planning phases in detailed analyze
waralexrom Jun 4, 2026
2f9c0d9
feat(cubestore): route detailed analyze through the real router-selec…
waralexrom Jun 4, 2026
75d20b7
feat(cubestore): per-query MemoryPool peak on the detailed-analyze main
waralexrom Jun 4, 2026
3ab788d
feat(cubestore): per-query MemoryPool peak on the select subprocess
waralexrom Jun 4, 2026
d0f4a5f
feat(cubestore): harvest final-stage DataFusion node metrics on the main
waralexrom Jun 4, 2026
fcb8d9b
feat(cubestore): harvest worker subplan DataFusion node metrics
waralexrom Jun 4, 2026
8efe826
feat(cubestore): record serialized byte volume on serialize ops
waralexrom Jun 4, 2026
afbb23a
feat(cubestore): add output_rows to node metrics
waralexrom Jun 4, 2026
6ec146a
feat(cubestore): capture executed physical plan text in detailed analyze
waralexrom Jun 4, 2026
d9d5fcf
feat(cubestore): measure per-region wall time and main->worker round-…
waralexrom Jun 4, 2026
ae36601
feat(cubestore): render explicit transport per boundary in detailed a…
waralexrom Jun 4, 2026
f1c46e5
feat(cubestore): render detailed analyze as a compact tree
waralexrom Jun 5, 2026
4537290
feat(cubestore): add category summary, per-line kind, human-readable …
waralexrom Jun 5, 2026
fc07213
feat(cubestore): per-node totals in tree, move summary to bottom with…
waralexrom Jun 5, 2026
a75d6b9
fix(cubestore): mark wrapper spans with a flag, exclude choose_index …
waralexrom Jun 5, 2026
f6cab8d
test(cubestore): smoke test for EXPLAIN ANALYZE DETAILED
waralexrom Jun 5, 2026
4f84a78
feat(cubestore): remove EXPLAIN ANALYZE EXTENDED, superseded by DETAILED
waralexrom Jun 5, 2026
be0f17f
refactor(cubestore): extract EXPLAIN ANALYZE DETAILED into its own mo…
waralexrom Jun 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 127 additions & 2 deletions rust/cubestore/cuberpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,47 @@ use syn::spanned::Spanned;
use syn::{parse_macro_input, FnArg, Item, Pat, ReturnType, TraitItem};

/// Entry point of the `#[cuberpc::service]` attribute macro.
///
/// Parses the attribute arguments as `ServiceArgs` and the annotated item as
/// `RpcService`, copies the optional `trace_guard` path onto the service, and
/// returns the tokens the service expands to.
#[proc_macro_attribute]
pub fn service(_attr: TokenStream, input: TokenStream) -> proc_macro::TokenStream {
let svc = parse_macro_input!(input as RpcService);
pub fn service(attr: TokenStream, input: TokenStream) -> proc_macro::TokenStream {
let args = parse_macro_input!(attr as ServiceArgs);
let mut svc = parse_macro_input!(input as RpcService);
// `RpcService::parse` leaves `trace_guard` unset; it only comes from the attribute arguments.
svc.trace_guard = args.trace_guard;

proc_macro::TokenStream::from(svc.into_token_stream())
}

/// Arguments accepted inside `#[cuberpc::service(...)]`; all of them are optional.
///
/// * `trace_guard = <path>` — additionally generate a `Traced<Service>` decorator
///   around an `Arc<dyn Service>`. Every async method of the decorator calls
///   `<path>(..)` with a label naming the method and keeps the returned guard
///   alive across the inner call. Taking the guard constructor as a path keeps
///   cuberpc decoupled from the host crate's tracing module.
struct ServiceArgs {
    /// Path of the guard constructor; `None` when the attribute has no arguments.
    trace_guard: Option<syn::Path>,
}

impl Parse for ServiceArgs {
    /// Parses the attribute argument list of `#[cuberpc::service]`.
    ///
    /// Accepted forms:
    /// * empty input — `trace_guard` is `None`;
    /// * `trace_guard = <path>` — `trace_guard` is `Some(<path>)`.
    ///
    /// # Errors
    ///
    /// Returns "unknown cuberpc::service argument", spanned at the key, for any
    /// other key, and a regular syn parse error when `=` or the path is missing
    /// or malformed.
    fn parse(input: ParseStream) -> syn::Result<Self> {
        if input.is_empty() {
            return Ok(ServiceArgs { trace_guard: None });
        }

        let key: Ident = input.parse()?;
        // Validate the key before consuming `= <path>`: otherwise a misspelled or
        // value-less argument is reported as a confusing "expected `=`" / path
        // error instead of being called out as an unknown argument.
        if key != "trace_guard" {
            return Err(syn::Error::new(
                key.span(),
                "unknown cuberpc::service argument",
            ));
        }

        input.parse::<syn::Token![=]>()?;
        let path: syn::Path = input.parse()?;
        Ok(ServiceArgs {
            trace_guard: Some(path),
        })
    }
}

/// Parsed description of a service trait, from which the macro output is generated.
struct RpcService {
    /// Identifier of the service trait.
    ident: Ident,
    /// The RPC methods the code generators iterate over.
    methods: Vec<RpcMethod>,
    /// Guard constructor path taken from the `trace_guard` attribute argument.
    /// When `None`, no `Traced<Service>` decorator is generated.
    trace_guard: Option<syn::Path>,
}

struct RpcMethod {
Expand Down Expand Up @@ -46,6 +78,7 @@ impl Parse for RpcService {
RpcService {
ident: trait_item.ident.clone(),
methods,
trace_guard: None,
}
}
x => {
Expand Down Expand Up @@ -106,6 +139,62 @@ impl RpcService {
}
}

/// Generates an inherent `variant_name(&self) -> &'static str` on the method-call
/// enum.
///
/// Each arm returns the stringified variant identifier, i.e. the camel-cased
/// method identifier produced by `variant_ident`. Methods with at least one typed
/// (non-receiver) argument are matched as `Variant(..)`, the rest as a bare
/// `Variant`.
fn method_call_variant_name_impl(&self) -> proc_macro2::TokenStream {
    let enum_ident = self.method_call_ident();

    let mut match_arms = Vec::with_capacity(self.methods.len());
    for method in &self.methods {
        let variant = method.variant_ident();
        let carries_payload = method
            .args
            .iter()
            .any(|arg| matches!(arg, FnArg::Typed(_)));
        let arm = if carries_payload {
            quote! { #enum_ident::#variant(..) => stringify!(#variant) }
        } else {
            quote! { #enum_ident::#variant => stringify!(#variant) }
        };
        match_arms.push(arm);
    }

    quote! {
        impl #enum_ident {
            /// The trait method name behind this call, for tracing/metrics labels.
            pub fn variant_name(&self) -> &'static str {
                match self {
                    #( #match_arms ),*
                }
            }
        }
    }
}

/// Generates the `Traced<Service>` decorator, or nothing when no `trace_guard`
/// path was configured.
///
/// The output consists of a struct holding an `Arc<dyn Service>`, a `new`
/// constructor returning the wrapper inside an `Arc`, and an `#[async_trait]`
/// implementation of the service trait whose methods come from
/// `RpcMethod::traced_method`.
fn traced_decorator(&self) -> proc_macro2::TokenStream {
    let guard_path = match self.trace_guard.as_ref() {
        Some(path) => path,
        // Tracing was not requested: contribute no tokens at all.
        None => return proc_macro2::TokenStream::new(),
    };

    let trait_ident = &self.ident;
    let wrapper_ident = format_ident!("Traced{}", trait_ident);
    let forwarded: Vec<proc_macro2::TokenStream> = self
        .methods
        .iter()
        .map(|method| method.traced_method(guard_path))
        .collect();

    quote! {
        pub struct #wrapper_ident {
            inner: std::sync::Arc<dyn #trait_ident>,
        }

        impl #wrapper_ident {
            pub fn new(inner: std::sync::Arc<dyn #trait_ident>) -> std::sync::Arc<Self> {
                std::sync::Arc::new(Self { inner })
            }
        }

        #[async_trait]
        impl #trait_ident for #wrapper_ident {
            #( #forwarded )*
        }
    }
}

fn client_transport_trait(&self) -> proc_macro2::TokenStream {
let method_call = self.method_call_ident();
let method_result = self.method_result_ident();
Expand Down Expand Up @@ -350,17 +439,53 @@ impl RpcMethod {
/// Identifier of the enum variant for this method: the method identifier
/// converted to camel case.
fn variant_ident(&self) -> Ident {
    let camel_cased = self.ident.to_string().to_camel_case();
    format_ident!("{}", camel_cased)
}

/// Generates this method's forwarding implementation for the `Traced<Service>`
/// decorator.
///
/// * Async methods bind `guard_path(stringify!(<Variant>))` to `_g` and then
///   await the same method on `self.inner`, so the guard stays alive until the
///   inner call has completed.
/// * Non-async methods forward to `self.inner` directly, without a guard.
///
/// # Panics
///
/// Panics if a typed argument uses a pattern other than a plain identifier,
/// because the argument could not be forwarded by name.
fn traced_method(&self, guard_path: &syn::Path) -> proc_macro2::TokenStream {
    let Self {
        ident,
        asyncness,
        args,
        output,
    } = self;

    // Names of the non-receiver arguments, in declaration order.
    let mut arg_names = Vec::new();
    for arg in args.iter() {
        match arg {
            FnArg::Receiver(_) => {}
            FnArg::Typed(typed) => match typed.pat.as_ref() {
                Pat::Ident(pat_ident) => arg_names.push(pat_ident.ident.clone()),
                other => panic!("Unexpected pattern: {:?}", other),
            },
        }
    }

    let label = self.variant_ident();
    // The call on the wrapped service is the same for both method flavours.
    let forward = quote! { self.inner.#ident(#( #arg_names ),*) };

    if !*asyncness {
        return quote! {
            fn #ident(#( #args ),*) #output {
                #forward
            }
        };
    }

    quote! {
        async fn #ident(#( #args ),*) #output {
            let _g = #guard_path(stringify!(#label));
            #forward.await
        }
    }
}
}

impl ToTokens for RpcService {
    /// Appends every generated item for the service to `tokens`, in a fixed
    /// order: the original trait, the method-call enum and its `variant_name`
    /// impl, the method-result enum, the client transport trait, the client
    /// impl, the server impl, and finally the optional traced decorator.
    fn to_tokens(&self, tokens: &mut proc_macro2::TokenStream) {
        let sections = [
            self.original_trait(),
            self.method_call_enum(),
            self.method_call_variant_name_impl(),
            self.method_result_enum(),
            self.client_transport_trait(),
            self.client_impl(),
            self.server_impl(),
            // Empty unless a `trace_guard` path was supplied.
            self.traced_decorator(),
        ];
        tokens.extend(sections);
    }
}
16 changes: 14 additions & 2 deletions rust/cubestore/cubestore/src/cluster/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::metastore::{MetaStoreRpcMethodCall, MetaStoreRpcMethodResult};
use crate::queryplanner::query_executor::SerializedRecordBatchStream;
use crate::queryplanner::serialized_plan::SerializedPlan;
use crate::trace::{MainTrace, WorkerTrace};
use crate::CubeError;
use datafusion::arrow::datatypes::SchemaRef;
use serde::{Deserialize, Serialize};
Expand All @@ -20,10 +21,21 @@ pub enum NetworkMessage {
SelectResult(Result<(SchemaRef, Vec<SerializedRecordBatchStream>), CubeError>),

// Perform explain analyze of the worker's part of the query and return its pretty-printed physical plan
/// The boolean flag is whether to execute the plan to collect runtime metrics.
ExplainAnalyze(SerializedPlan, WorkerPlanningParams, bool),
ExplainAnalyze(SerializedPlan, WorkerPlanningParams),
ExplainAnalyzeResult(Result<String, CubeError>),

/// Detailed-trace mirror of [RouterSelect]: the entry node asks a main worker to
/// run the full router plan for real and return the assembled `MainTrace`.
RouterSelectDetailed(SerializedPlan),
RouterSelectDetailedResult(Result<MainTrace, CubeError>),

/// Detailed-trace mirror of [Select]: a worker runs its part for real and returns
/// both the result rows (for the main to merge) and its `WorkerTrace`.
SelectDetailed(SerializedPlan, WorkerPlanningParams),
SelectDetailedResult(
Result<(SchemaRef, Vec<SerializedRecordBatchStream>, WorkerTrace), CubeError>,
),
Comment on lines 26 to +37
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NETWORK_MESSAGE_VERSION is still 1 even though NetworkMessage gained four new variants. Serde's default externally‑tagged repr keys by variant name, so existing variants keep working across a rolling deploy — but an old binary that receives one of the new variants will fail deserialization with a generic "unknown variant" error rather than the friendlier protocol-version mismatch on line 137. Since EXPLAIN ANALYZE DETAILED is user-initiated and opt-in this is unlikely to bite in practice, but worth either (a) bumping the version to make the failure mode clear, or (b) leaving a comment here noting that adding variants is wire‑compatible only because the repr is by name.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@claude We have never bumped the protocol version before. In this case, we preserve both compatibility on the production path and consistency during a rolling deployment across the cluster (which would actually be disrupted by a version bump). So it seems that bumping the version is not justified here.

Copy link
Copy Markdown
Contributor

@claude claude Bot Jun 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude finished @waralexrom's task in 39s —— View job


Re: bumping NETWORK_MESSAGE_VERSION

You're right, and I'll withdraw the suggestion to bump it. Your reasoning is actually stronger than my original note gave it credit for.

The check at message.rs:136 is a strict equality:

let ver = socket.read_u32().await?;
if ver != NETWORK_MESSAGE_VERSION {
    return Err(CubeError::user(format!("Network protocol version mismatch. ...")));
}

So bumping 1 → 2 doesn't just make the new *Detailed variants fail more clearly — it makes every message between a v1 node and a v2 node fail, including the hot Select / RouterSelect / MetaStoreCall traffic. During a rolling deploy the cluster is mixed-version by definition, so a bump would take the whole cluster down for the rollout window. That's the opposite of what the version field should protect.

Staying at 1 is correct here:

  • Existing variants stay wire-compatible because serde's flexbuffers repr is externally-tagged by name, not by index — a mixed-version cluster keeps talking on the production path.
  • New *Detailed variants only ever cross the wire when a user explicitly runs EXPLAIN ANALYZE DETAILED, which is opt-in and not part of steady-state or rolling-deploy traffic. The only degenerate case (an old binary receiving a new variant) yields a generic "unknown variant" deser error instead of the friendlier mismatch message — an acceptable trade for not breaking rolling deploys.

So: no version bump. The only thing I'd still gently suggest — purely optional — is a one-line comment near the version constant or the new variants recording why it's safe to add variants without bumping (the by-name repr + opt-in path), so the next person doesn't re-derive this or "helpfully" bump it. Happy to add that if you want it; otherwise this is resolved on my end.
· branch cubestore-datailed-analyze


/// Select that sends results in batches. The immediate response is [SelectResultSchema],
/// followed by a stream of [SelectResultBatch].
SelectStart(SerializedPlan, WorkerPlanningParams),
Expand Down
Loading
Loading