diff --git a/rust/cubestore/cuberpc/src/lib.rs b/rust/cubestore/cuberpc/src/lib.rs index 9d9a9ca235036..2e47cead63e48 100644 --- a/rust/cubestore/cuberpc/src/lib.rs +++ b/rust/cubestore/cuberpc/src/lib.rs @@ -7,15 +7,47 @@ use syn::spanned::Spanned; use syn::{parse_macro_input, FnArg, Item, Pat, ReturnType, TraitItem}; #[proc_macro_attribute] -pub fn service(_attr: TokenStream, input: TokenStream) -> proc_macro::TokenStream { - let svc = parse_macro_input!(input as RpcService); +pub fn service(attr: TokenStream, input: TokenStream) -> proc_macro::TokenStream { + let args = parse_macro_input!(attr as ServiceArgs); + let mut svc = parse_macro_input!(input as RpcService); + svc.trace_guard = args.trace_guard; proc_macro::TokenStream::from(svc.into_token_stream()) } +/// Optional arguments to `#[cuberpc::service]`. +/// `trace_guard = `: generate a `Traced` decorator that wraps an +/// `Arc` and, for every async method, holds the guard returned by +/// `(method_name)` across the call. Keeps cuberpc decoupled from the host +/// crate's tracing module. +struct ServiceArgs { + trace_guard: Option, +} + +impl Parse for ServiceArgs { + fn parse(input: ParseStream) -> syn::Result { + let mut trace_guard = None; + if !input.is_empty() { + let key: Ident = input.parse()?; + input.parse::()?; + let path: syn::Path = input.parse()?; + if key == "trace_guard" { + trace_guard = Some(path); + } else { + return Err(syn::Error::new( + key.span(), + "unknown cuberpc::service argument", + )); + } + } + Ok(ServiceArgs { trace_guard }) + } +} + struct RpcService { ident: Ident, methods: Vec, + trace_guard: Option, } struct RpcMethod { @@ -46,6 +78,7 @@ impl Parse for RpcService { RpcService { ident: trait_item.ident.clone(), methods, + trace_guard: None, } } x => { @@ -106,6 +139,62 @@ impl RpcService { } } + fn method_call_variant_name_impl(&self) -> proc_macro2::TokenStream { + let method_call = self.method_call_ident(); + let arms = self + .methods + .iter() + .map(|m| { + let variant = m.variant_ident(); + let has_args = m.args.iter().any(|a| matches!(a, FnArg::Typed(_))); + if has_args { + quote! { #method_call::#variant(..) => stringify!(#variant) } + } else { + quote! { #method_call::#variant => stringify!(#variant) } + } + }) + .collect::>(); + quote! { + impl #method_call { + /// The trait method name behind this call, for tracing/metrics labels. + pub fn variant_name(&self) -> &'static str { + match self { + #( #arms ),* + } + } + } + } + } + + fn traced_decorator(&self) -> proc_macro2::TokenStream { + let Some(guard_path) = self.trace_guard.as_ref() else { + return quote! {}; + }; + let service = &self.ident; + let traced = format_ident!("Traced{}", self.ident); + let methods = self + .methods + .iter() + .map(|m| m.traced_method(guard_path)) + .collect::>(); + quote! { + pub struct #traced { + inner: std::sync::Arc, + } + + impl #traced { + pub fn new(inner: std::sync::Arc) -> std::sync::Arc { + std::sync::Arc::new(Self { inner }) + } + } + + #[async_trait] + impl #service for #traced { + #( #methods )* + } + } + } + fn client_transport_trait(&self) -> proc_macro2::TokenStream { let method_call = self.method_call_ident(); let method_result = self.method_result_ident(); @@ -350,6 +439,40 @@ impl RpcMethod { fn variant_ident(&self) -> Ident { format_ident!("{}", self.ident.to_string().to_camel_case()) } + + fn traced_method(&self, guard_path: &syn::Path) -> proc_macro2::TokenStream { + let &Self { + ident, + asyncness, + args, + output, + } = &self; + let arg_names = args + .iter() + .filter_map(|a| match a { + FnArg::Typed(ty) => match ty.pat.as_ref() { + Pat::Ident(id) => Some(id.ident.clone()), + x => panic!("Unexpected pattern: {:?}", x), + }, + FnArg::Receiver(_) => None, + }) + .collect::>(); + let variant = self.variant_ident(); + if *asyncness { + quote! { + async fn #ident(#( #args ),*) #output { + let _g = #guard_path(stringify!(#variant)); + self.inner.#ident(#( #arg_names ),*).await + } + } + } else { + quote! { + fn #ident(#( #args ),*) #output { + self.inner.#ident(#( #arg_names ),*) + } + } + } + } } impl ToTokens for RpcService { @@ -357,10 +480,12 @@ impl ToTokens for RpcService { tokens.extend(vec![ self.original_trait(), self.method_call_enum(), + self.method_call_variant_name_impl(), self.method_result_enum(), self.client_transport_trait(), self.client_impl(), self.server_impl(), + self.traced_decorator(), ]); } } diff --git a/rust/cubestore/cubestore/src/cluster/message.rs b/rust/cubestore/cubestore/src/cluster/message.rs index 3518da1d93534..35514142a967b 100644 --- a/rust/cubestore/cubestore/src/cluster/message.rs +++ b/rust/cubestore/cubestore/src/cluster/message.rs @@ -1,6 +1,7 @@ use crate::metastore::{MetaStoreRpcMethodCall, MetaStoreRpcMethodResult}; use crate::queryplanner::query_executor::SerializedRecordBatchStream; use crate::queryplanner::serialized_plan::SerializedPlan; +use crate::trace::{MainTrace, WorkerTrace}; use crate::CubeError; use datafusion::arrow::datatypes::SchemaRef; use serde::{Deserialize, Serialize}; @@ -20,10 +21,21 @@ pub enum NetworkMessage { SelectResult(Result<(SchemaRef, Vec), CubeError>), //Perform explain analyze of worker query part and return it pretty printed physical plan - /// The boolean flag is whether to execute the plan to collect runtime metrics. - ExplainAnalyze(SerializedPlan, WorkerPlanningParams, bool), + ExplainAnalyze(SerializedPlan, WorkerPlanningParams), ExplainAnalyzeResult(Result), + /// Detailed-trace mirror of [RouterSelect]: the entry node asks a main worker to + /// run the full router plan for real and return the assembled `MainTrace`. + RouterSelectDetailed(SerializedPlan), + RouterSelectDetailedResult(Result), + + /// Detailed-trace mirror of [Select]: a worker runs its part for real and returns + /// both the result rows (for the main to merge) and its `WorkerTrace`. + SelectDetailed(SerializedPlan, WorkerPlanningParams), + SelectDetailedResult( + Result<(SchemaRef, Vec, WorkerTrace), CubeError>, + ), + /// Select that sends results in batches. The immediate response is [SelectResultSchema], /// followed by a stream of [SelectResultBatch]. SelectStart(SerializedPlan, WorkerPlanningParams), diff --git a/rust/cubestore/cubestore/src/cluster/mod.rs b/rust/cubestore/cubestore/src/cluster/mod.rs index 73a79ea9e42ec..74002a019b358 100644 --- a/rust/cubestore/cubestore/src/cluster/mod.rs +++ b/rust/cubestore/cubestore/src/cluster/mod.rs @@ -42,6 +42,7 @@ use crate::queryplanner::serialized_plan::SerializedPlan; use crate::remotefs::RemoteFs; use crate::store::ChunkDataStore; use crate::telemetry::tracing::{TraceIdAndSpanId, TracingHelper}; +use crate::trace::{MainTrace, OpKind, SubprocessTrace, WorkerTrace}; use crate::CubeError; use async_trait::async_trait; use datafusion::arrow::datatypes::SchemaRef; @@ -106,15 +107,31 @@ pub trait Cluster: DIService + Send + Sync { ) -> Result, CubeError>; /// Runs explain analyze on a single worker node to get pretty printed physical plan - /// from that worker. `execute` runs the plan to report runtime metrics. + /// from that worker. async fn run_explain_analyze( &self, node_name: &str, plan: SerializedPlan, worker_planning_params: WorkerPlanningParams, - execute: bool, ) -> Result; + /// Detailed-trace path: ask a main worker to run the full router plan for real + /// and return the assembled `MainTrace` (its ops + collected per-worker traces). + async fn run_router_select_detailed( + &self, + node_name: &str, + plan: SerializedPlan, + ) -> Result; + + /// Detailed-trace mirror of [run_select]: runs the worker part for real and + /// returns both the result rows (for the main to merge) and the worker's trace. + async fn run_select_detailed( + &self, + node_name: &str, + plan: SerializedPlan, + worker_planning_params: WorkerPlanningParams, + ) -> Result<(Vec, WorkerTrace), CubeError>; + /// Like [run_select], but streams results as they are requested. /// This allows to send only a limited number of results, if the caller does not need all. async fn run_select_stream( @@ -244,6 +261,8 @@ pub enum WorkerMessage { HashMap, HashMap>, Option, + /// When true the subprocess collects a `SubprocessTrace` for the run. + bool, ), } @@ -295,7 +314,12 @@ pub struct WorkerProcessor; #[async_trait] impl WorkerProcessing for WorkerProcessor { type Request = WorkerMessage; - type Response = (SchemaRef, Vec, usize); + type Response = ( + SchemaRef, + Vec, + usize, + Option, + ); type Config = Config; fn spawn_background_processes(config: Self::Config) -> Result<(), CubeError> { @@ -313,7 +337,15 @@ impl WorkerProcessing for WorkerProcessor { async fn process( config: &Self::Config, args: WorkerMessage, - ) -> Result<(SchemaRef, Vec, usize), CubeError> { + ) -> Result< + ( + SchemaRef, + Vec, + usize, + Option, + ), + CubeError, + > { let services = config.worker_services().await; match args { WorkerMessage::Select( @@ -322,13 +354,21 @@ impl WorkerProcessing for WorkerProcessor { remote_to_local_names, chunk_id_to_record_batches, trace_id_and_span_id, + detailed, ) => { + let memory_pool = + detailed.then(crate::queryplanner::query_executor::TrackingMemoryPool::new); + let memory_pool_for_exec = memory_pool.clone(); let future = async move { let time = SystemTime::now(); debug!("Running select in worker started"); let plan_node_to_send = plan_node.clone(); let result = tracing::trace_span!("Deserialize in_memory chunks").in_scope( move || { + let _g = crate::trace::OpGuard::start( + OpKind::Deserialize, + "chunks.deserialize", + ); chunk_id_to_record_batches .into_iter() .map(|(id, batches)| -> Result<_, CubeError> { @@ -343,23 +383,36 @@ impl WorkerProcessing for WorkerProcessor { .collect::, _>>() }, )?; - let res = services - .query_executor - .clone() - .execute_worker_plan( - plan_node_to_send, - worker_planning_params, - remote_to_local_names, - result, - ) - .await; + let res = { + let _g = crate::trace::OpGuard::start_wrapper( + OpKind::Other, + "subprocess.execute", + ); + services + .query_executor + .clone() + .execute_worker_plan( + plan_node_to_send, + worker_planning_params, + remote_to_local_names, + result, + memory_pool_for_exec, + ) + .await + }; debug!( "Running select in worker completed ({:?})", time.elapsed().unwrap() ); let (schema, records, data_loaded_size) = res?; - let records = SerializedRecordBatchStream::write(schema.as_ref(), records)?; - Ok((schema, records, data_loaded_size)) + let records = { + let mut g = + crate::trace::OpGuard::start(OpKind::Serialize, "result.serialize"); + let records = SerializedRecordBatchStream::write(schema.as_ref(), records)?; + g.set_bytes(records.iter().map(|r| r.byte_size() as u64).sum()); + records + }; + Ok::<_, CubeError>((schema, records, data_loaded_size)) }; let span = trace_id_and_span_id.map(|(t, s)| { @@ -380,11 +433,26 @@ impl WorkerProcessing for WorkerProcessor { span }); - if let Some(span) = span { - future.instrument(span).await - } else { - future.await - } + let run = async move { + if let Some(span) = span { + future.instrument(span).await + } else { + future.await + } + }; + + let ctx = detailed.then(crate::trace::TraceCtx::new); + let started = std::time::Instant::now(); + let (schema, records, data_loaded_size) = + crate::trace::scoped(ctx.clone(), run).await?; + let total_us = started.elapsed().as_micros() as u64; + let subtrace = ctx.map(|c| SubprocessTrace { + total_us, + physical_plan: c.take_plan_text(), + ops: c.take_ops(), + exec_memory_peak_bytes: memory_pool.as_ref().map(|p| p.peak() as u64), + }); + Ok((schema, records, data_loaded_size, subtrace)) } } } @@ -524,12 +592,11 @@ impl Cluster for ClusterImpl { node_name: &str, plan: SerializedPlan, worker_planning_params: WorkerPlanningParams, - execute: bool, ) -> Result { let response = self .send_or_process_locally( node_name, - NetworkMessage::ExplainAnalyze(plan, worker_planning_params, execute), + NetworkMessage::ExplainAnalyze(plan, worker_planning_params), ) .await?; match response { @@ -538,6 +605,49 @@ impl Cluster for ClusterImpl { } } + async fn run_router_select_detailed( + &self, + node_name: &str, + plan: SerializedPlan, + ) -> Result { + let response = self + .send_or_process_locally(node_name, NetworkMessage::RouterSelectDetailed(plan)) + .await?; + match response { + NetworkMessage::RouterSelectDetailedResult(r) => r, + _ => panic!("unexpected result for router select detailed"), + } + } + + async fn run_select_detailed( + &self, + node_name: &str, + plan: SerializedPlan, + worker_planning_params: WorkerPlanningParams, + ) -> Result<(Vec, WorkerTrace), CubeError> { + let response = self + .send_or_process_locally( + node_name, + NetworkMessage::SelectDetailed(plan, worker_planning_params), + ) + .await?; + match response { + NetworkMessage::SelectDetailedResult(r) => r.and_then(|(_, batches, trace)| { + // NOTE: this main-side deserialization is not traced. It runs inside + // ClusterSendExec on a DataFusion-spawned task where the task-local trace + // ctx is not visible (the worker trace itself rides the collector in the + // task context for that reason). Measuring it needs an explicit channel, + // not an OpGuard — deferred. + let records = batches + .into_iter() + .map(|b| b.read()) + .collect::, _>>()?; + Ok((records, trace)) + }), + _ => panic!("unexpected result for select detailed"), + } + } + async fn run_select_stream( &self, node_name: &str, @@ -725,12 +835,22 @@ impl Cluster for ClusterImpl { let res = self.run_local_select_worker(plan, planning_params).await; NetworkMessage::SelectResult(res) } - NetworkMessage::ExplainAnalyze(plan, planning_params, execute) => { + NetworkMessage::ExplainAnalyze(plan, planning_params) => { let res = self - .run_local_explain_analyze_worker(plan, planning_params, execute) + .run_local_explain_analyze_worker(plan, planning_params) .await; NetworkMessage::ExplainAnalyzeResult(res) } + NetworkMessage::RouterSelectDetailed(plan) => { + let res = self.run_local_router_select_detailed_main(plan).await; + NetworkMessage::RouterSelectDetailedResult(res) + } + NetworkMessage::SelectDetailed(plan, planning_params) => { + let res = self + .run_local_select_detailed_worker(plan, planning_params) + .await; + NetworkMessage::SelectDetailedResult(res) + } NetworkMessage::WarmupDownload(remote_path, expected_file_size) => { let res = self .remote_fs @@ -740,7 +860,9 @@ impl Cluster for ClusterImpl { } NetworkMessage::SelectResult(_) | NetworkMessage::WarmupDownloadResult(_) - | NetworkMessage::ExplainAnalyzeResult(_) => { + | NetworkMessage::ExplainAnalyzeResult(_) + | NetworkMessage::RouterSelectDetailedResult(_) + | NetworkMessage::SelectDetailedResult(_) => { panic!("result sent to worker"); } NetworkMessage::AddMemoryChunk { chunk_name, data } => { @@ -1278,10 +1400,10 @@ impl ClusterImpl { trace_obj: plan_node.trace_obj(), }; let res = self - .run_local_select_worker_impl(plan_node, worker_planning_params) + .run_local_select_worker_impl(plan_node, worker_planning_params, false) .await; match res { - Ok((schema, records, data_loaded_size)) => { + Ok((schema, records, data_loaded_size, _)) => { self.process_rate_limiter .commit_task_usage( TaskType::Select, @@ -1305,16 +1427,29 @@ impl ClusterImpl { &self, plan_node: SerializedPlan, worker_planning_params: WorkerPlanningParams, - ) -> Result<(SchemaRef, Vec, usize), CubeError> { + detailed: bool, + ) -> Result< + ( + SchemaRef, + Vec, + usize, + Option, + ), + CubeError, + > { let start = SystemTime::now(); debug!("Running select"); + let warmup_guard = crate::trace::OpGuard::start(OpKind::WarmupIo, "worker.warmup"); let remote_to_local_names = self.warmup_select_worker_files(&plan_node).await?; + drop(warmup_guard); let warmup = start.elapsed()?; if warmup.as_millis() > 200 { warn!("Warmup download for select ({:?})", warmup); } + let chunk_load_guard = crate::trace::OpGuard::start(OpKind::ChunkLoad, "chunks.load"); let chunk_id_to_record_batches = self.load_in_memory_chunks(&plan_node).await?; + drop(chunk_load_guard); let mut res = None; #[cfg(not(target_os = "windows"))] @@ -1333,36 +1468,56 @@ impl ClusterImpl { tracing::Level::TRACE, "Serialize chunks into SerializedRecordBatchStream" ); - let chunk_id_to_record_batches = span.in_scope(|| { - chunk_id_to_record_batches - .iter() - .map( - |(id, b)| -> Result<(u64, Vec), CubeError> { - Ok(( - *id, - SerializedRecordBatchStream::write( - &b.iter().next().unwrap().schema(), - b.to_vec(), - )?, - )) - }, - ) - .collect::, _>>() - })?; - res = Some( + let chunk_id_to_record_batches = span.in_scope( + || -> Result>, CubeError> { + let mut g = + crate::trace::OpGuard::start(OpKind::Serialize, "chunks.serialize"); + let result = + chunk_id_to_record_batches + .iter() + .map( + |(id, b)| -> Result< + (u64, Vec), + CubeError, + > { + Ok(( + *id, + SerializedRecordBatchStream::write( + &b.iter().next().unwrap().schema(), + b.to_vec(), + )?, + )) + }, + ) + .collect::, _>>()?; + g.set_bytes( + result + .values() + .flatten() + .map(|r| r.byte_size() as u64) + .sum(), + ); + Ok(result) + }, + )?; + let pool_result = { + let _ipc_guard = + crate::trace::OpGuard::start_wrapper(OpKind::Transport, "ipc.select"); pool.process(WorkerMessage::Select( plan_node.clone(), worker_planning_params, remote_to_local_names.clone(), chunk_id_to_record_batches, self.tracing_helper.trace_and_span_id(), + detailed, )) .instrument(tracing::span!( tracing::Level::TRACE, "execute_worker_plan_on_pool" )) - .await, - ) + .await + }; + res = Some(pool_result) } } @@ -1375,45 +1530,86 @@ impl ClusterImpl { worker_planning_params, remote_to_local_names, chunk_id_to_record_batches, + None, ) .await?; let records = SerializedRecordBatchStream::write(schema.as_ref(), records); - res = Some(Ok((schema, records?, data_loaded_size))) + res = Some(Ok((schema, records?, data_loaded_size, None))) } info!("Running select completed ({:?})", start.elapsed()?); res.unwrap() } + async fn run_local_select_detailed_worker( + &self, + plan_node: SerializedPlan, + worker_planning_params: WorkerPlanningParams, + ) -> Result<(SchemaRef, Vec, WorkerTrace), CubeError> { + let ctx = crate::trace::TraceCtx::new(); + let node_name = self.server_name.clone(); + let started = std::time::Instant::now(); + let (schema, records, subprocess) = crate::trace::scoped(Some(ctx.clone()), async { + self.run_local_select_worker_impl(plan_node, worker_planning_params, true) + .await + .map(|(schema, records, _size, subtrace)| (schema, records, subtrace)) + }) + .await?; + let worker_trace = WorkerTrace { + node_name, + total_us: started.elapsed().as_micros() as u64, + net_roundtrip_us: None, + ops: ctx.take_ops(), + subprocess, + }; + Ok((schema, records, worker_trace)) + } + + async fn run_local_router_select_detailed_main( + &self, + plan_node: SerializedPlan, + ) -> Result { + let ctx = crate::trace::TraceCtx::new(); + let collector = crate::trace::WorkerTraceCollector::new(); + let node_name = self.server_name.clone(); + let cluster = self.this.upgrade().unwrap(); + let started = std::time::Instant::now(); + let memory_peak = crate::trace::scoped(Some(ctx.clone()), async { + self.query_executor + .execute_router_plan_detailed(plan_node, cluster, collector.clone()) + .await + }) + .await?; + Ok(MainTrace { + node_name, + total_us: started.elapsed().as_micros() as u64, + physical_plan: ctx.take_plan_text(), + ops: ctx.take_ops(), + exec_memory_peak_bytes: memory_peak, + workers: collector.take(), + }) + } + async fn run_local_explain_analyze_worker( &self, plan_node: SerializedPlan, worker_planning_params: WorkerPlanningParams, - execute: bool, ) -> Result { let remote_to_local_names = self.warmup_select_worker_files(&plan_node).await?; - let chunk_id_to_record_batches = if execute { - self.load_in_memory_chunks(&plan_node).await? - } else { - plan_node - .in_memory_chunks_to_load() - .into_iter() - .map(|(c, _, _)| (c.get_id(), Vec::new())) - .collect() - }; + let chunk_id_to_record_batches = plan_node + .in_memory_chunks_to_load() + .into_iter() + .map(|(c, _, _)| (c.get_id(), Vec::new())) + .collect(); - let res = self - .query_executor + self.query_executor .pp_worker_plan( plan_node, worker_planning_params, remote_to_local_names, chunk_id_to_record_batches, - execute, ) - .await; - - res + .await } async fn load_in_memory_chunks( diff --git a/rust/cubestore/cubestore/src/config/mod.rs b/rust/cubestore/cubestore/src/config/mod.rs index 8c3c78492af64..9ce247a6e47dd 100644 --- a/rust/cubestore/cubestore/src/config/mod.rs +++ b/rust/cubestore/cubestore/src/config/mod.rs @@ -19,6 +19,7 @@ use crate::import::limits::ConcurrencyLimits; use crate::import::{ImportService, ImportServiceImpl, LocationsValidator, LocationsValidatorImpl}; use crate::metastore::{ BaseRocksStoreFs, MetaStore, MetaStoreRpcClient, RocksMetaStore, RocksStoreConfig, + TracedMetaStore, }; use crate::mysql::{MySqlServer, SqlAuthDefaultImpl, SqlAuthService}; use crate::queryplanner::metadata_cache::BasicMetadataCacheFactory; @@ -2170,7 +2171,7 @@ impl Config { self.injector .register_typed::(async move |i| { let transport = ClusterMetaStoreClient::new(i.get_service_typed().await); - Arc::new(MetaStoreRpcClient::new(transport)) + TracedMetaStore::new(Arc::new(MetaStoreRpcClient::new(transport))) }) .await; } else { @@ -2188,29 +2189,35 @@ impl Config { }) .await; let path = self.meta_store_path().to_str().unwrap().to_string(); + // Register the concrete RocksMetaStore (some code fetches it by type), + // then expose `dyn MetaStore` wrapped in TracedMetaStore so local calls + // are traced too. self.injector - .register_typed_with_default::( - async move |i| { - let config = i.get_service_typed::().await; - let metastore_fs = i.get_service("metastore_fs").await; - let meta_store = if let Some(dump_dir) = config.clone().dump_dir() { - RocksMetaStore::load_from_dump( - &Path::new(&path), - dump_dir, - metastore_fs, - config, - ) + .register_typed::(async move |i| { + let config = i.get_service_typed::().await; + let metastore_fs = i.get_service("metastore_fs").await; + let meta_store = if let Some(dump_dir) = config.clone().dump_dir() { + RocksMetaStore::load_from_dump( + &Path::new(&path), + dump_dir, + metastore_fs, + config, + ) + .await + .unwrap() + } else { + RocksMetaStore::load_from_remote(&path, metastore_fs, config) .await .unwrap() - } else { - RocksMetaStore::load_from_remote(&path, metastore_fs, config) - .await - .unwrap() - }; - meta_store.add_listener(metastore_event_sender).await; - meta_store - }, - ) + }; + meta_store.add_listener(metastore_event_sender).await; + meta_store + }) + .await; + self.injector + .register_typed::(async move |i| { + TracedMetaStore::new(i.get_service_typed::().await) + }) .await; }; diff --git a/rust/cubestore/cubestore/src/lib.rs b/rust/cubestore/cubestore/src/lib.rs index 63f7dd9992291..82fb3dcc3d3f4 100644 --- a/rust/cubestore/cubestore/src/lib.rs +++ b/rust/cubestore/cubestore/src/lib.rs @@ -47,6 +47,7 @@ pub mod streaming; pub mod sys; pub mod table; pub mod telemetry; +pub mod trace; pub mod util; pub use datafusion::cube_ext::spawn; diff --git a/rust/cubestore/cubestore/src/metastore/mod.rs b/rust/cubestore/cubestore/src/metastore/mod.rs index 1edda6fb859a3..02af81428a1ea 100644 --- a/rust/cubestore/cubestore/src/metastore/mod.rs +++ b/rust/cubestore/cubestore/src/metastore/mod.rs @@ -803,7 +803,7 @@ pub struct PartitionData { pub chunks: Vec>, } -#[cuberpc::service] +#[cuberpc::service(trace_guard = crate::trace::metastore_trace_guard)] pub trait MetaStore: DIService + Send + Sync { async fn wait_for_current_seq_to_sync(&self) -> Result<(), CubeError>; fn schemas_table(&self) -> SchemaMetaStoreTable; @@ -1192,6 +1192,7 @@ pub trait MetaStore: DIService + Send + Sync { crate::di_service!(RocksMetaStore, [MetaStore]); crate::di_service!(MetaStoreRpcClient, [MetaStore]); +crate::di_service!(TracedMetaStore, [MetaStore]); #[derive(Clone, Debug)] pub enum MetaStoreEvent { diff --git a/rust/cubestore/cubestore/src/queryplanner/mod.rs b/rust/cubestore/cubestore/src/queryplanner/mod.rs index f2a41ac7d3b7c..c4d4742312e4d 100644 --- a/rust/cubestore/cubestore/src/queryplanner/mod.rs +++ b/rust/cubestore/cubestore/src/queryplanner/mod.rs @@ -64,6 +64,7 @@ use crate::queryplanner::udfs::{registerable_aggregate_udfs_iter, registerable_s use crate::sql::cache::SqlResultCache; use crate::sql::InlineTables; use crate::store::DataFrame; +use crate::trace::{OpGuard, OpKind}; use crate::{app_metrics, metastore, CubeError}; use async_trait::async_trait; use core::fmt; @@ -141,7 +142,9 @@ impl QueryPlanner for QueryPlannerImpl { trace_obj: Option, ) -> Result { let pre_execution_context_time = SystemTime::now(); + let ec_guard = OpGuard::start(OpKind::Planning, "plan.session_context"); let ctx = self.execution_context()?; + drop(ec_guard); let post_execution_context_time = SystemTime::now(); app_metrics::DATA_QUERY_LOGICAL_PLAN_EXECUTION_CONTEXT_TIME_US.report( @@ -163,7 +166,9 @@ impl QueryPlanner for QueryPlannerImpl { let query_planner = SqlToRel::new_with_options(&schema_provider, sql_to_rel_options()); let pre_statement_to_plan_time = SystemTime::now(); + let stp_guard = OpGuard::start(OpKind::Planning, "plan.statement_to_plan"); let mut logical_plan = query_planner.statement_to_plan(statement)?; + drop(stp_guard); let post_statement_to_plan_time = SystemTime::now(); app_metrics::DATA_QUERY_LOGICAL_PLAN_QUERY_PLANNER_SETUP_TIME_US.report( pre_statement_to_plan_time @@ -193,7 +198,9 @@ impl QueryPlanner for QueryPlannerImpl { ); let logical_plan_optimize_time = SystemTime::now(); + let opt_guard = OpGuard::start(OpKind::Planning, "plan.optimize"); logical_plan = state.optimize(&logical_plan)?; + drop(opt_guard); let post_optimize_time = SystemTime::now(); app_metrics::DATA_QUERY_LOGICAL_PLAN_OPTIMIZE_TIME_US.report( post_optimize_time @@ -219,6 +226,7 @@ impl QueryPlanner for QueryPlannerImpl { let plan = if SerializedPlan::is_data_select_query(&logical_plan) { let choose_index_ext_start = SystemTime::now(); post_is_data_select_query_time = choose_index_ext_start; + let choose_guard = OpGuard::start_wrapper(OpKind::Planning, "plan.choose_index"); let (logical_plan, meta) = choose_index_ext( logical_plan, &self.meta_store.as_ref(), @@ -230,6 +238,7 @@ impl QueryPlanner for QueryPlannerImpl { &logical_plan, &meta.multi_part_subtree, )?; + drop(choose_guard); app_metrics::DATA_QUERY_CHOOSE_INDEX_AND_WORKERS_TIME_US .report(choose_index_ext_start.elapsed()?.as_micros() as i64); QueryPlan::Select( @@ -289,9 +298,19 @@ impl QueryPlannerImpl { /// optimizer rules or other parameters affecting execution performance. This is used by /// `QueryPlannerImpl::make_execution_context`. pub fn minimal_session_state_from_final_config(config: SessionConfig) -> SessionStateBuilder { + Self::minimal_session_state_from_final_config_with_runtime( + config, + Arc::new(RuntimeEnv::default()), + ) + } + + pub fn minimal_session_state_from_final_config_with_runtime( + config: SessionConfig, + runtime_env: Arc, + ) -> SessionStateBuilder { let mut state_builder = SessionStateBuilder::new() .with_config(config) - .with_runtime_env(Arc::new(RuntimeEnv::default())) + .with_runtime_env(runtime_env) .with_default_features(); state_builder .aggregate_functions() @@ -306,7 +325,14 @@ impl QueryPlannerImpl { const EXECUTION_BATCH_SIZE: usize = 4096; - pub fn make_execution_context(mut config: SessionConfig) -> SessionContext { + pub fn make_execution_context(config: SessionConfig) -> SessionContext { + Self::make_execution_context_with_runtime(config, Arc::new(RuntimeEnv::default())) + } + + pub fn make_execution_context_with_runtime( + mut config: SessionConfig, + runtime_env: Arc, + ) -> SessionContext { // The config parameter is from metadata_cache_factory (which we need to rename) but doesn't // include all necessary configs. config @@ -317,7 +343,7 @@ impl QueryPlannerImpl { config.options_mut().execution.parquet.split_row_group_reads = false; // TODO upgrade DF: build SessionContexts consistently - let state = Self::minimal_session_state_from_final_config(config) + let state = Self::minimal_session_state_from_final_config_with_runtime(config, runtime_env) .with_optimizer_rule(Arc::new(RollingOptimizerRule {})) .with_optimizer_rule(Arc::new(IsNotDistinctFromJoinKeysRule {})) .build(); diff --git a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs index 5943df1be5e0e..a7729d7832f6e 100644 --- a/rust/cubestore/cubestore/src/queryplanner/query_executor.rs +++ b/rust/cubestore/cubestore/src/queryplanner/query_executor.rs @@ -46,6 +46,8 @@ use datafusion::datasource::physical_plan::{ use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::DataFusionError; use datafusion::error::Result as DFResult; +use datafusion::execution::memory_pool::{MemoryPool, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::TaskContext; use datafusion::logical_expr::{Expr, LogicalPlan}; use datafusion::physical_expr; @@ -74,8 +76,8 @@ use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ - collect, execute_stream, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, - PhysicalExpr, PlanProperties, SendableRecordBatchStream, + collect, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PhysicalExpr, + PlanProperties, SendableRecordBatchStream, }; use datafusion::prelude::{and, SessionConfig, SessionContext}; use datafusion_datasource::memory::MemorySourceConfig; @@ -91,6 +93,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::io::Cursor; use std::mem::take; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::SystemTime; use tracing::{instrument, Instrument}; @@ -98,6 +101,71 @@ use tracing::{instrument, Instrument}; use super::serialized_plan::PreSerializedPlan; use super::{try_make_memory_data_source, QueryPlannerImpl}; +/// Unbounded `MemoryPool` that records the peak of all operator reservations for a +/// single query execution. The pool lives in that query's `RuntimeEnv`, so the peak +/// is per-query and isolated from concurrent queries sharing the process. Covers only +/// memory that operators voluntarily reserve (sort/aggregate/join buffers), not every +/// allocation — by design. +#[derive(Debug, Default)] +pub struct TrackingMemoryPool { + used: AtomicUsize, + peak: AtomicUsize, +} + +impl TrackingMemoryPool { + pub fn new() -> Arc { + Arc::new(Self::default()) + } + + pub fn peak(&self) -> usize { + self.peak.load(Ordering::Relaxed) + } +} + +impl MemoryPool for TrackingMemoryPool { + fn grow(&self, _reservation: &MemoryReservation, additional: usize) { + let used = self.used.fetch_add(additional, Ordering::Relaxed) + additional; + self.peak.fetch_max(used, Ordering::Relaxed); + } + + fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) { + self.used.fetch_sub(shrink, Ordering::Relaxed); + } + + fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> DFResult<()> { + // Unbounded: always succeeds; we only measure, never reject. + self.grow(reservation, additional); + Ok(()) + } + + fn reserved(&self) -> usize { + self.used.load(Ordering::Relaxed) + } +} + +/// Walk an executed physical plan and record each node's `elapsed_compute` and +/// `output_rows` into the active trace as `OpKind::Execution` samples keyed by node +/// type. Same-typed nodes aggregate (summed time/rows, node count). +fn record_plan_node_metrics(plan: &Arc) { + if let Some(metrics) = plan.metrics() { + let elapsed_us = metrics.elapsed_compute().map(|ns| (ns / 1000) as u64); + let rows = metrics.output_rows().map(|r| r as u64); + if elapsed_us.is_some() || rows.is_some() { + crate::trace::record_op( + crate::trace::OpKind::Execution, + plan.name(), + elapsed_us.unwrap_or(0), + None, + rows, + 1, + ); + } + } + for child in plan.children() { + record_plan_node_metrics(child); + } +} + #[automock] #[async_trait] pub trait QueryExecutor: DIService + Send + Sync { @@ -107,12 +175,23 @@ pub trait QueryExecutor: DIService + Send + Sync { cluster: Arc, ) -> Result<(SchemaRef, Vec), CubeError>; + /// Like [execute_router_plan], but runs under detailed tracing: the worker-trace + /// collector rides in the session config so ClusterSendExec records per-worker + /// traces. Result rows are discarded; returns the execution memory peak if known. + async fn execute_router_plan_detailed( + &self, + plan: SerializedPlan, + cluster: Arc, + worker_traces: Arc, + ) -> Result, CubeError>; + async fn execute_worker_plan( &self, plan: SerializedPlan, worker_planning_params: WorkerPlanningParams, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, + memory_pool: Option>, ) -> Result<(SchemaRef, Vec, usize), CubeError>; async fn router_plan( @@ -136,7 +215,6 @@ pub trait QueryExecutor: DIService + Send + Sync { worker_planning_params: WorkerPlanningParams, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, - execute: bool, ) -> Result; } @@ -162,6 +240,51 @@ impl QueryExecutorImpl { #[async_trait] impl QueryExecutor for QueryExecutorImpl { + async fn execute_router_plan_detailed( + &self, + plan: SerializedPlan, + cluster: Arc, + worker_traces: Arc, + ) -> Result, CubeError> { + let (physical_plan, _logical_plan) = { + let _g = crate::trace::OpGuard::start( + crate::trace::OpKind::Planning, + "main.router_physical_plan", + ); + self.router_plan(plan, cluster).await? + }; + // The collector rides in the session config so ClusterSendExec can record + // per-worker traces through DataFusion's own task context propagation. + let config = self + .metadata_cache_factory + .make_session_config() + .with_extension(worker_traces); + // Per-query tracking pool in this execution's own RuntimeEnv: the peak is + // isolated from concurrent queries sharing the process. + let memory_pool = TrackingMemoryPool::new(); + let runtime_env = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(memory_pool.clone()) + .build()?, + ); + let session_context = Arc::new(QueryPlannerImpl::make_execution_context_with_runtime( + config, + runtime_env, + )); + { + let _g = crate::trace::OpGuard::start_wrapper( + crate::trace::OpKind::Execution, + "main.execute", + ); + let _results = collect(physical_plan.clone(), session_context.task_ctx()).await?; + } + // Harvest per-node DataFusion metrics of the final stages (router-level nodes + // above ClusterSend), aggregated by node type into the active trace. + record_plan_node_metrics(&physical_plan); + crate::trace::set_plan_text(pp_phys_plan(physical_plan.as_ref())); + Ok(Some(memory_pool.peak() as u64)) + } + #[instrument(level = "trace", skip(self, plan, cluster))] async fn execute_router_plan( &self, @@ -237,6 +360,7 @@ impl QueryExecutor for QueryExecutorImpl { worker_planning_params: WorkerPlanningParams, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, + memory_pool: Option>, ) -> Result<(SchemaRef, Vec, usize), CubeError> { let data_loaded_size = DataLoadedSize::new(); let create_worker_physical_plan_time = SystemTime::now(); @@ -270,7 +394,20 @@ impl QueryExecutor for QueryExecutorImpl { ); let execution_time = SystemTime::now(); - let session_context = self.execution_context()?; + let session_context = match &memory_pool { + Some(pool) => { + let runtime = Arc::new( + RuntimeEnvBuilder::new() + .with_memory_pool(pool.clone()) + .build()?, + ); + Arc::new(QueryPlannerImpl::make_execution_context_with_runtime( + self.metadata_cache_factory.make_session_config(), + runtime, + )) + } + None => self.execution_context()?, + }; let results = collect(worker_plan.clone(), session_context.task_ctx()) .instrument(tracing::span!( tracing::Level::TRACE, @@ -313,6 +450,11 @@ impl QueryExecutor for QueryExecutorImpl { } // TODO: stream results as they become available. let results = regroup_batches(results?, max_batch_rows)?; + // Detailed trace: record per-node metrics + the worker subplan text. + if memory_pool.is_some() { + record_plan_node_metrics(&worker_plan); + crate::trace::set_plan_text(pp_phys_plan(worker_plan.as_ref())); + } Ok((worker_plan.schema(), results, data_loaded_size.get())) } @@ -373,7 +515,6 @@ impl QueryExecutor for QueryExecutorImpl { worker_planning_params: WorkerPlanningParams, remote_to_local_names: HashMap, chunk_id_to_record_batches: HashMap>, - execute: bool, ) -> Result { let (physical_plan, _) = self .worker_plan( @@ -395,33 +536,7 @@ impl QueryExecutor for QueryExecutorImpl { )); } - if !execute { - return Ok(pp_phys_plan(worker_plan.as_ref())); - } - - // Execute the plan to populate the metrics. The results are discarded batch by - // batch to avoid holding the full result set in memory. - let session_context = self.execution_context()?; - let mut stream = execute_stream(worker_plan.clone(), session_context.task_ctx())?; - async { - while let Some(batch) = stream.next().await { - batch?; - } - Ok::<_, DataFusionError>(()) - } - .instrument(tracing::span!( - tracing::Level::TRACE, - "execute_physical_plan_for_explain_analyze" - )) - .await?; - - Ok(pp_phys_plan_ext( - worker_plan.as_ref(), - &PPOptions { - show_metrics: true, - ..PPOptions::default() - }, - )) + Ok(pp_phys_plan(worker_plan.as_ref())) } } @@ -1808,6 +1923,40 @@ impl ExecutionPlan for ClusterSendExec { let schema = self.properties.eq_properties.schema().clone(); let node_name = node_name.to_string(); let worker_planning_params = self.worker_planning_params(); + + // Detailed-trace path: when a worker-trace collector rides in the task context, + // pull rows + the worker's trace and record the trace. Rows still flow into the + // merge so the final stages execute (and can be measured) for real. + // + // FIDELITY CAVEAT: this path always materializes (non-streaming), even where prod + // would stream this ClusterSend. So main-side wall-time and RSS reflect the + // non-streaming variant; the per-query MemoryPool peak (operator reservations) is + // less affected, but a streaming prod path will diverge here. Streaming + trace + // return is a deferred follow-up. + if let Some(collector) = context + .session_config() + .get_extension::() + { + let record_batches = async move { + let started = std::time::Instant::now(); + let (rows, mut trace) = cluster + .run_select_detailed( + &node_name, + plan.to_serialized_plan()?, + worker_planning_params, + ) + .await?; + trace.net_roundtrip_us = Some(started.elapsed().as_micros() as u64); + collector.push(trace); + Ok::<_, CubeError>(rows) + }; + let stream = futures::stream::once(record_batches).flat_map(|r| match r { + Ok(vec) => stream::iter(vec.into_iter().map(|b| Ok(b)).collect::>()), + Err(e) => stream::iter(vec![Err(DataFusionError::Execution(e.to_string()))]), + }); + return Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))); + } + if self.use_streaming { // A future that yields a stream let fut = async move { @@ -2168,6 +2317,10 @@ pub struct SerializedRecordBatchStream { } impl SerializedRecordBatchStream { + pub fn byte_size(&self) -> usize { + self.record_batch_file.len() + } + pub fn write( schema: &Schema, record_batches: Vec, diff --git a/rust/cubestore/cubestore/src/sql/explain_detailed.rs b/rust/cubestore/cubestore/src/sql/explain_detailed.rs new file mode 100644 index 0000000000000..1e388232c51f3 --- /dev/null +++ b/rust/cubestore/cubestore/src/sql/explain_detailed.rs @@ -0,0 +1,338 @@ +//! `EXPLAIN ANALYZE DETAILED`: drives a real query execution under per-query +//! tracing and renders the collected `QueryTrace` as a compact tree + category +//! summary. Split out of `sql/mod.rs` — the rendering is pure presentation and the +//! orchestration is self-contained. + +use std::sync::Arc; + +use datafusion::sql::parser::Statement as DFStatement; +use rand::distributions::Uniform; +use rand::{thread_rng, Rng}; +use sqlparser::ast::Statement; + +use crate::metastore::{Column, ColumnType}; +use crate::queryplanner::QueryPlan; +use crate::sql::InlineTables; +use crate::store::DataFrame; +use crate::table::{Row, TableValue}; +use crate::trace::{OpKind, OpSample, QueryTrace, RouterTrace}; +use crate::CubeError; + +impl super::SqlServiceImpl { + pub(crate) async fn explain_detailed( + &self, + statement: Statement, + ) -> Result, CubeError> { + let ctx = crate::trace::TraceCtx::new(); + let main = crate::trace::scoped(Some(ctx.clone()), async move { + let query_plan = self + .query_planner + .logical_plan( + DFStatement::Statement(Box::new(statement)), + &InlineTables::new(), + None, + ) + .await?; + let (serialized, workers) = + match query_plan { + QueryPlan::Select(serialized, workers) => (serialized, workers), + QueryPlan::Meta(_) => return Err(CubeError::user( + "EXPLAIN ANALYZE DETAILED is not supported for selects from system tables" + .to_string(), + )), + }; + let serialized_plan = { + let _g = crate::trace::OpGuard::start(OpKind::Serialize, "plan.serialize"); + serialized.to_serialized_plan()? + }; + // Run the full router plan on a real main (a random worker, like prod), so + // the final stages execute where they actually happen. + let main_node = if workers.is_empty() { + self.cluster.server_name().to_string() + } else { + workers[thread_rng().sample(Uniform::new(0, workers.len()))].clone() + }; + let _g = + crate::trace::OpGuard::start_wrapper(OpKind::Transport, "route_select_detailed"); + let main_trace = self + .cluster + .run_router_select_detailed(&main_node, serialized_plan) + .await?; + Ok::<_, CubeError>(main_trace) + }) + .await?; + + let trace = QueryTrace { + router: RouterTrace { + ops: ctx.take_ops(), + }, + main: Some(main), + }; + Ok(Arc::new(render_query_trace(&trace))) + } +} + +fn render_query_trace(trace: &QueryTrace) -> DataFrame { + fn fmt_dur(us: u64) -> String { + if us >= 1_000_000 { + format!("{:.2}s", us as f64 / 1_000_000.0) + } else if us >= 1_000 { + format!("{:.2}ms", us as f64 / 1_000.0) + } else { + format!("{}us", us) + } + } + + fn fmt_bytes(b: u64) -> String { + if b >= 1 << 20 { + format!("{:.1}MB", b as f64 / (1u64 << 20) as f64) + } else if b >= 1 << 10 { + format!("{:.1}KB", b as f64 / (1u64 << 10) as f64) + } else { + format!("{}B", b) + } + } + + fn find_elapsed(ops: &[OpSample], label: &str) -> Option { + ops.iter().find(|o| o.label == label).map(|o| o.elapsed_us) + } + + fn bump(totals: &mut Vec<(String, u64)>, key: &str, v: u64) { + match totals.iter_mut().find(|(k, _)| k == key) { + Some(e) => e.1 += v, + None => totals.push((key.to_string(), v)), + } + } + + // Wrapper spans contain other measured ops (round-trips, the execute span + // around node metrics, choose_index around metastore), so they are excluded + // from the category summary to keep categories a non-overlapping partition. + fn add_ops(totals: &mut Vec<(String, u64)>, ops: &[OpSample]) { + for op in ops { + if !op.is_wrapper { + bump(totals, &format!("{:?}", op.kind), op.elapsed_us); + } + } + } + + // Sum elapsed by category over the given op slices, plus derived transport. + fn cat_totals(op_slices: &[&[OpSample]], transport_us: u64) -> Vec<(String, u64)> { + let mut t = Vec::new(); + for ops in op_slices { + add_ops(&mut t, ops); + } + if transport_us > 0 { + bump(&mut t, "Transport", transport_us); + } + t.sort_by(|a, b| b.1.cmp(&a.1)); + t + } + + fn emit_cats(out: &mut String, title: &str, cats: &[(String, u64)]) { + out.push_str(&format!("{}\n", title)); + let w = cats.iter().map(|(k, _)| k.len()).max().unwrap_or(0); + for (k, v) in cats { + out.push_str(&format!(" {:9}\n", k, fmt_dur(*v), w = w)); + } + } + + // Renders one region (a node in the topology) as an indented block: header, + // then `kind label value` lines aligned within the region, then the plan. + fn region( + out: &mut String, + depth: usize, + header: &str, + total: Option, + ops: &[OpSample], + transports: &[(&str, u64)], + memory: Option, + plan: Option<&str>, + ) { + let pad = " ".repeat(depth); + match total { + Some(t) => out.push_str(&format!("{}{} · total {}\n", pad, header, fmt_dur(t))), + None => out.push_str(&format!("{}{}\n", pad, header)), + } + let ipad = format!("{} ", pad); + + let mut entries: Vec<(String, String, String)> = Vec::new(); + for op in ops { + let mut v = format!("{:>9}", fmt_dur(op.elapsed_us)); + if let Some(b) = op.bytes { + v.push_str(&format!(" {:>8}", fmt_bytes(b))); + } + if let Some(r) = op.rows { + v.push_str(&format!(" {:>9} rows", r)); + } + entries.push((format!("{:?}", op.kind), op.label.clone(), v)); + } + for (label, us) in transports { + entries.push(( + "Transport".to_string(), + label.to_string(), + format!("{:>9}", fmt_dur(*us)), + )); + } + if let Some(m) = memory { + entries.push(( + "Memory".to_string(), + "exec.peak".to_string(), + format!("{:>9}", fmt_bytes(m)), + )); + } + + let kw = entries.iter().map(|(k, _, _)| k.len()).max().unwrap_or(0); + let lw = entries.iter().map(|(_, l, _)| l.len()).max().unwrap_or(0); + for (kind, label, value) in &entries { + out.push_str(&format!( + "{}{: = et + .map(|v| vec![("transport.entry_to_main", v)]) + .unwrap_or_default(); + region( + &mut out, + 1, + &format!("main · {}", main.node_name), + Some(main.total_us), + &main.ops, + &t, + main.exec_memory_peak_bytes, + main.physical_plan.as_deref(), + ); + for w in &main.workers { + let mut wt: Vec<(&str, u64)> = Vec::new(); + if let Some(rt) = w.net_roundtrip_us { + wt.push(("transport.main_to_worker", rt.saturating_sub(w.total_us))); + } + region( + &mut out, + 2, + &format!("worker · {}", w.node_name), + Some(w.total_us), + &w.ops, + &wt, + None, + None, + ); + if let Some(sub) = &w.subprocess { + let mut st: Vec<(&str, u64)> = Vec::new(); + if let Some(rt) = find_elapsed(&w.ops, "ipc.select") { + st.push(("transport.ipc", rt.saturating_sub(sub.total_us))); + } + region( + &mut out, + 3, + &format!("subprocess · {}", w.node_name), + Some(sub.total_us), + &sub.ops, + &st, + sub.exec_memory_peak_bytes, + sub.physical_plan.as_deref(), + ); + } + } + } + + // ---- summary by category (overall + per node) ---- + out.push_str("\n────────────────────────────\n"); + out.push_str("summary by category (transport = wire+queue; wrapper spans excluded)\n\n"); + if let Some(main) = &trace.main { + let et = find_elapsed(&trace.router.ops, "route_select_detailed") + .map(|rt| rt.saturating_sub(main.total_us)) + .unwrap_or(0); + let mut overall_slices: Vec<&[OpSample]> = + vec![trace.router.ops.as_slice(), main.ops.as_slice()]; + let mut overall_transport = et; + for w in &main.workers { + overall_slices.push(w.ops.as_slice()); + if let Some(rt) = w.net_roundtrip_us { + overall_transport += rt.saturating_sub(w.total_us); + } + if let Some(sub) = &w.subprocess { + overall_slices.push(sub.ops.as_slice()); + if let Some(rt) = find_elapsed(&w.ops, "ipc.select") { + overall_transport += rt.saturating_sub(sub.total_us); + } + } + } + emit_cats( + &mut out, + "overall", + &cat_totals(&overall_slices, overall_transport), + ); + out.push('\n'); + emit_cats( + &mut out, + "router", + &cat_totals(&[trace.router.ops.as_slice()], 0), + ); + out.push('\n'); + emit_cats( + &mut out, + &format!("main · {}", main.node_name), + &cat_totals(&[main.ops.as_slice()], et), + ); + for w in &main.workers { + out.push('\n'); + let mut slices: Vec<&[OpSample]> = vec![w.ops.as_slice()]; + let mut wtrans = w + .net_roundtrip_us + .map(|rt| rt.saturating_sub(w.total_us)) + .unwrap_or(0); + if let Some(sub) = &w.subprocess { + slices.push(sub.ops.as_slice()); + if let Some(rt) = find_elapsed(&w.ops, "ipc.select") { + wtrans += rt.saturating_sub(sub.total_us); + } + } + emit_cats( + &mut out, + &format!("worker · {}", w.node_name), + &cat_totals(&slices, wtrans), + ); + } + } else { + emit_cats( + &mut out, + "overall", + &cat_totals(&[trace.router.ops.as_slice()], 0), + ); + } + + DataFrame::new( + vec![Column::new("trace".to_string(), ColumnType::String, 0)], + vec![Row::new(vec![TableValue::String(out)])], + ) +} diff --git a/rust/cubestore/cubestore/src/sql/mod.rs b/rust/cubestore/cubestore/src/sql/mod.rs index 96f3a471c3efa..3966ad5e79e0c 100644 --- a/rust/cubestore/cubestore/src/sql/mod.rs +++ b/rust/cubestore/cubestore/src/sql/mod.rs @@ -78,6 +78,7 @@ use deepsize::DeepSizeOf; pub mod cache; pub mod cachestore; +mod explain_detailed; pub mod parser; mod table_creator; @@ -545,12 +546,10 @@ impl SqlServiceImpl { } /// `analyze` builds and shows the physical plans on the router and the workers. - /// `execute` additionally runs the worker plans to report runtime metrics. async fn explain( &self, statement: Statement, analyze: bool, - execute: bool, ) -> Result, CubeError> { fn extract_worker_plans( p: &Arc, @@ -615,7 +614,6 @@ impl SqlServiceImpl { &name, plan.to_serialized_plan()?, worker_planning_params, - execute, ) .await .map(|p| (name, p)) @@ -1380,7 +1378,7 @@ impl SqlService for SqlServiceImpl { .. }) => match *statement { Statement::Query(q) => Ok(self - .explain(Statement::Query(q.clone()), analyze, false) + .explain(Statement::Query(q.clone()), analyze) .await? .into()), _ => Err(CubeError::user(format!( @@ -1388,12 +1386,13 @@ impl SqlService for SqlServiceImpl { query ))), }, - CubeStoreStatement::ExplainAnalyzeExtended(q) => { - Ok(self.explain(Statement::Query(q), true, true).await?.into()) - } CubeStoreStatement::Dump(q) => Ok(self.dump_select_inputs(query, q).await?.into()), + CubeStoreStatement::ExplainAnalyzeDetailed(q) => { + Ok(self.explain_detailed(Statement::Query(q)).await?.into()) + } + _ => Err(CubeError::user(format!("Unsupported SQL: '{}'", query))), } } @@ -5173,34 +5172,69 @@ mod tests { _ => {assert!(false);} }; + Ok::<(), CubeError>(()) + }).await; + Ok::<(), CubeError>(()) + }).await; + Ok(()) + } + + #[tokio::test] + async fn explain_analyze_detailed() -> Result<(), CubeError> { + Config::test("explain_detailed_router").update_config(|mut config| { + config.select_workers = vec!["127.0.0.1:14016".to_string()]; + config.metastore_bind_address = Some("127.0.0.1:15016".to_string()); + config.compaction_chunks_count_threshold = 0; + config + }).start_test(async move |services| { + let service = services.sql_service; + + Config::test("explain_detailed_worker_1").update_config(|mut config| { + config.worker_bind_address = Some("127.0.0.1:14016".to_string()); + config.server_name = "127.0.0.1:14016".to_string(); + config.metastore_remote_address = Some("127.0.0.1:15016".to_string()); + config.store_provider = FileStoreProvider::Filesystem { + remote_dir: Some(env::current_dir() + .unwrap() + .join("explain_detailed_router-upstream".to_string())), + }; + config.compaction_chunks_count_threshold = 0; + config + }).start_test_worker(async move |_| { + service.exec_query("CREATE SCHEMA foo").await?.collect().await?; + service.exec_query("CREATE TABLE foo.orders (id int, platform text, age int, amount int)").await?.collect().await?; + service.exec_query( + "INSERT INTO foo.orders (id, platform, age, amount) VALUES (1, 'android', 18, 4), (2, 'ios', 17, 4), (3, 'ios', 20, 5)" + ).await?.collect().await?; + let result = service.exec_query( - "EXPLAIN ANALYZE EXTENDED SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" - ).await?.collect().await?; + "EXPLAIN ANALYZE DETAILED SELECT platform, sum(amount) from foo.orders where age > 15 group by platform" + ).await?.collect().await?; - assert_eq!(result.len(), 2); - let worker_row = &result.get_rows()[1]; - match &worker_row - .values()[2] { - TableValue::String(pp_plan) => { - let regex = Regex::new( - r"LinearPartialAggregate[^\n]*\s+Filter[^\n]*\s+Scan, index: default:1:\[1\], fields: \[platform, age, amount\][^\n]*\s+ParquetScan, files: \S*\.chunk\.parquet" - ).unwrap(); - let matches = regex.captures_iter(&pp_plan).count(); - assert_eq!(matches, 1, "pp_plan = {}", pp_plan); - // The plan is executed, so the nodes carry runtime metrics. - assert!( - pp_plan.contains("metrics:") && pp_plan.contains("output_rows="), - "pp_plan = {}", - pp_plan - ); - assert!( - pp_plan.contains("row_groups_pruned_statistics="), - "parquet scans must report pruning metrics, pp_plan = {}", - pp_plan - ); - }, - _ => {assert!(false);} - }; + // Single "trace" cell holding the whole report. + assert_eq!(result.get_columns().len(), 1); + assert_eq!(result.get_rows().len(), 1); + let trace = match &result.get_rows()[0].values()[0] { + TableValue::String(s) => s.clone(), + v => panic!("expected string trace, got {:?}", v), + }; + + // Smoke check: the whole path produced the levels + the summary. + // (The test harness runs the worker in-process, without the select + // subprocess pool, so no `subprocess ·` section here.) + for marker in [ + "summary by category", + "router", + "Metastore", + "main \u{b7}", + "worker \u{b7}", + "Planning", + "Execution", + "transport.", + "plan:", + ] { + assert!(trace.contains(marker), "trace missing '{}':\n{}", marker, trace); + } Ok::<(), CubeError>(()) }).await; @@ -5208,6 +5242,7 @@ mod tests { }).await; Ok(()) } + #[tokio::test] async fn create_aggr_index() -> Result<(), CubeError> { assert!(true); diff --git a/rust/cubestore/cubestore/src/sql/parser.rs b/rust/cubestore/cubestore/src/sql/parser.rs index f7c86d30b2db1..3ebc4ef1651a3 100644 --- a/rust/cubestore/cubestore/src/sql/parser.rs +++ b/rust/cubestore/cubestore/src/sql/parser.rs @@ -66,8 +66,7 @@ pub enum Statement { Queue(QueueCommand), System(SystemCommand), Dump(Box), - /// Like EXPLAIN ANALYZE, but executes worker plans to report runtime metrics. - ExplainAnalyzeExtended(Box), + ExplainAnalyzeDetailed(Box), } #[derive(Debug, Clone, PartialEq)] @@ -303,43 +302,13 @@ impl<'a> CubeStoreParser<'a> { }; Ok(Statement::Dump(q)) } - // Plain EXPLAIN and EXPLAIN ANALYZE fall through to the generic parser. - // A bare identifier after EXPLAIN ANALYZE is intercepted: it is either - // EXTENDED or a parse error. Otherwise the generic parser would treat it - // as an EXPLAIN and silently drop the rest of the statement. - Keyword::EXPLAIN - if matches!( - &self.parser.peek_nth_token(1).token, - Token::Word(w) if w.keyword == Keyword::ANALYZE - ) && matches!( - &self.parser.peek_nth_token(2).token, - Token::Word(w) if w.keyword == Keyword::NoKeyword - || w.value.eq_ignore_ascii_case("extended") - ) => - { - self.parser.next_token(); - self.parser.next_token(); - let word = self.parser.next_token(); - if !matches!( - &word.token, - Token::Word(w) if w.value.eq_ignore_ascii_case("extended") - ) { - return Err(ParserError::ParserError(format!( - "Expected EXTENDED or a query after EXPLAIN ANALYZE, found: {}", - word - ))); - } - let s = self.parser.parse_statement()?; - let q = match s { - SQLStatement::Query(q) => q, - _ => { - return Err(ParserError::ParserError( - "Expected select query after 'explain analyze extended'" - .to_string(), - )) - } - }; - Ok(Statement::ExplainAnalyzeExtended(q)) + _ if self.is_explain_analyze_detailed() => { + self.parser.next_token(); // EXPLAIN + self.parser.next_token(); // ANALYZE + self.parser.next_token(); // DETAILED + Ok(Statement::ExplainAnalyzeDetailed( + self.parser.parse_query()?, + )) } _ => Ok(Statement::Statement(self.parser.parse_statement()?)), }, @@ -347,6 +316,15 @@ impl<'a> CubeStoreParser<'a> { } } + fn is_explain_analyze_detailed(&self) -> bool { + fn is_word(token: Token, value: &str) -> bool { + matches!(token, Token::Word(w) if w.value.eq_ignore_ascii_case(value)) + } + is_word(self.parser.peek_token().token, "explain") + && is_word(self.parser.peek_nth_token(1).token, "analyze") + && is_word(self.parser.peek_nth_token(2).token, "detailed") + } + fn parse_queue_key(&mut self) -> Result { match self.parser.peek_token().token { Token::Placeholder(placeholder) => { @@ -1074,11 +1052,12 @@ mod tests { } #[test] - fn parse_explain_analyze_extended() -> Result<(), CubeError> { - match parse_stmt("EXPLAIN ANALYZE EXTENDED SELECT 1")? { - Statement::ExplainAnalyzeExtended(_) => {} - s => panic!("Expected ExplainAnalyzeExtended, got {:?}", s), + fn parse_explain_variants() -> Result<(), CubeError> { + match parse_stmt("EXPLAIN ANALYZE DETAILED SELECT 1")? { + Statement::ExplainAnalyzeDetailed(_) => {} + s => panic!("Expected ExplainAnalyzeDetailed, got {:?}", s), } + // The DETAILED interception must not affect plain EXPLAIN / EXPLAIN ANALYZE. match parse_stmt("EXPLAIN ANALYZE SELECT 1")? { Statement::Statement(SQLStatement::Explain { analyze: true, .. }) => {} s => panic!("Expected Explain with analyze, got {:?}", s), @@ -1087,9 +1066,6 @@ mod tests { Statement::Statement(SQLStatement::Explain { analyze: false, .. }) => {} s => panic!("Expected Explain without analyze, got {:?}", s), } - // A misspelled EXTENDED is a parse error, not an EXPLAIN of the "EXTANDED" table - // silently dropping the rest of the statement. - assert!(parse_stmt("EXPLAIN ANALYZE EXTANDED SELECT 1").is_err()); Ok(()) } diff --git a/rust/cubestore/cubestore/src/trace.rs b/rust/cubestore/cubestore/src/trace.rs new file mode 100644 index 0000000000000..bbe6e1010139d --- /dev/null +++ b/rust/cubestore/cubestore/src/trace.rs @@ -0,0 +1,284 @@ +//! Per-query detailed trace collection for `EXPLAIN ANALYZE DETAILED`. +//! +//! The trace is assembled per process-region (entry node, worker node, select +//! subprocess) into the serializable `*Trace` structs and merged upwards across +//! the network and IPC boundaries. Collection is gated by the presence of a +//! per-query `TraceCtx` in the task-local `TRACE`: when no detailed query is in +//! flight the recording helpers short-circuit before doing any work. + +use serde::{Deserialize, Serialize}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Instant; + +/// Aggregation axis for timed operations. Stays a small, stable taxonomy: a new +/// probe usually reuses an existing kind and only adds a `label`. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +pub enum OpKind { + Transport, + Serialize, + Deserialize, + Metastore, + Planning, + Execution, + WarmupIo, + ChunkLoad, + Other, +} + +/// A single (aggregated) measurement: how long a class of operations took and, +/// optionally, how many bytes it moved / rows it produced. Repeats with the same +/// `(kind, label)` are folded together on insertion. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OpSample { + pub kind: OpKind, + pub label: String, + pub elapsed_us: u64, + pub bytes: Option, + pub rows: Option, + pub count: u32, + /// True for spans that contain other measured ops (round-trips, the execute + /// span around node metrics, choose_index around metastore calls). Shown in the + /// tree but excluded from the category summary so categories don't double-count. + pub is_wrapper: bool, +} + +/// Trace assembled inside the select subprocess and shipped back over IPC. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct SubprocessTrace { + /// Wall time the subprocess spent handling the request, for deriving the IPC + /// transport overhead = parent's round-trip − this total. + pub total_us: u64, + pub ops: Vec, + pub exec_memory_peak_bytes: Option, + pub physical_plan: Option, +} + +/// Trace assembled on a worker node and shipped back over the network. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct WorkerTrace { + pub node_name: String, + /// Wall time the worker node spent on its part (set on the worker). + pub total_us: u64, + /// main→worker round-trip measured by ClusterSendExec on the main (set there, + /// not on the worker); transport = net_roundtrip_us − total_us. + pub net_roundtrip_us: Option, + pub ops: Vec, + pub subprocess: Option, +} + +/// Trace of the entry node that received the query (parse + planning). +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct RouterTrace { + pub ops: Vec, +} + +/// Trace assembled on the execution main (the worker that runs the router plan): +/// its own ops + the per-worker traces it collected through ClusterSend. Shipped +/// back to the entry node. +/// +/// `ops` holds the main's stage guards (`main.router_physical_plan`, `main.execute`) +/// plus per-node DataFusion `elapsed_compute` of the final stages (OpKind::Execution). +/// `exec_memory_peak_bytes` is the peak of operator reservations during execution +/// (sort/aggregate/join buffers — not every allocation). +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct MainTrace { + pub node_name: String, + /// Wall time spent on the main; entry→main transport = round-trip − total_us. + pub total_us: u64, + pub ops: Vec, + pub exec_memory_peak_bytes: Option, + pub physical_plan: Option, + pub workers: Vec, +} + +/// Whole-query trace assembled on the entry node. +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct QueryTrace { + pub router: RouterTrace, + pub main: Option, +} + +/// Per-query sink for one process-region. Interior-mutable so recording helpers +/// can write through a shared `Arc` from anywhere in the query's task. +pub struct TraceCtx { + ops: Mutex>, + plan_text: Mutex>, +} + +impl TraceCtx { + pub fn new() -> Arc { + Arc::new(Self { + ops: Mutex::new(Vec::new()), + plan_text: Mutex::new(None), + }) + } + + pub fn take_plan_text(&self) -> Option { + self.plan_text.lock().unwrap().take() + } + + fn push(&self, sample: OpSample) { + let mut ops = self.ops.lock().unwrap(); + if let Some(existing) = ops + .iter_mut() + .find(|o| o.kind == sample.kind && o.label == sample.label) + { + existing.elapsed_us += sample.elapsed_us; + existing.count += sample.count; + existing.bytes = match (existing.bytes, sample.bytes) { + (Some(a), Some(b)) => Some(a + b), + (a, b) => a.or(b), + }; + existing.rows = match (existing.rows, sample.rows) { + (Some(a), Some(b)) => Some(a + b), + (a, b) => a.or(b), + }; + } else { + ops.push(sample); + } + } + + pub fn take_ops(&self) -> Vec { + std::mem::take(&mut self.ops.lock().unwrap()) + } +} + +/// Collects per-worker traces on the execution main. Passed to `ClusterSendExec` +/// through the `TaskContext` session config so it survives DataFusion's internal +/// task spawning (a task-local would not). +#[derive(Default)] +pub struct WorkerTraceCollector { + traces: Mutex>, +} + +impl WorkerTraceCollector { + pub fn new() -> Arc { + Arc::new(Self::default()) + } + + pub fn push(&self, trace: WorkerTrace) { + self.traces.lock().unwrap().push(trace); + } + + pub fn take(&self) -> Vec { + std::mem::take(&mut self.traces.lock().unwrap()) + } +} + +tokio::task_local! { + pub static TRACE: Option>; +} + +/// The active per-query sink, or `None` when no detailed query is in flight. +pub fn current_trace() -> Option> { + TRACE.try_with(|t| t.clone()).ok().flatten() +} + +/// Guard factory for the `#[cuberpc::service(trace_guard = ...)]`-generated +/// `TracedMetaStore` decorator: one sample per metastore method call. +pub fn metastore_trace_guard(method: &'static str) -> OpGuard { + OpGuard::start(OpKind::Metastore, method) +} + +/// Record an already-measured sample into the active trace (no-op when off). +/// For values not timed by an `OpGuard` — e.g. DataFusion node metrics. +pub fn record_op( + kind: OpKind, + label: &str, + elapsed_us: u64, + bytes: Option, + rows: Option, + count: u32, +) { + if let Some(ctx) = current_trace() { + ctx.push(OpSample { + kind, + label: label.to_string(), + elapsed_us, + bytes, + rows, + count, + is_wrapper: false, + }); + } +} + +/// Stash the executed physical plan text into the active trace (no-op when off). +pub fn set_plan_text(text: String) { + if let Some(ctx) = current_trace() { + *ctx.plan_text.lock().unwrap() = Some(text); + } +} + +/// Run `fut` with `ctx` as the active sink for the current process-region. +pub async fn scoped(ctx: Option>, fut: F) -> F::Output +where + F: std::future::Future, +{ + TRACE.scope(ctx, fut).await +} + +/// RAII timer that records an `OpSample` on drop. Captures nothing (not even the +/// start instant) when tracing is off, so leaving these in hot paths is cheap. +pub struct OpGuard { + kind: OpKind, + label: &'static str, + began: Option, + // Captured at `start` so the sample lands in the ctx that was active then, + // even if the guard outlives the task-local scope. Weak so a dropped ctx + // just discards the sample instead of keeping it alive. + ctx: Weak, + bytes: Option, + is_wrapper: bool, +} + +impl OpGuard { + pub fn start(kind: OpKind, label: &'static str) -> Self { + Self::start_inner(kind, label, false) + } + + /// Start a guard for a span that contains other measured ops (round-trip, + /// execute-around-node-metrics, choose_index-around-metastore). Marked so the + /// category summary skips it and does not double-count its contents. + pub fn start_wrapper(kind: OpKind, label: &'static str) -> Self { + Self::start_inner(kind, label, true) + } + + fn start_inner(kind: OpKind, label: &'static str, is_wrapper: bool) -> Self { + let ctx = current_trace(); + let began = ctx.is_some().then(Instant::now); + Self { + kind, + label, + began, + ctx: ctx.as_ref().map(Arc::downgrade).unwrap_or_default(), + bytes: None, + is_wrapper, + } + } + + pub fn set_bytes(&mut self, bytes: u64) { + if self.began.is_some() { + self.bytes = Some(bytes); + } + } +} + +impl Drop for OpGuard { + fn drop(&mut self) { + let Some(began) = self.began else { + return; + }; + if let Some(ctx) = self.ctx.upgrade() { + ctx.push(OpSample { + kind: self.kind, + label: self.label.to_string(), + elapsed_us: began.elapsed().as_micros() as u64, + bytes: self.bytes, + rows: None, + count: 1, + is_wrapper: self.is_wrapper, + }); + } + } +}