From e6a104d4963f3ae97a90b65963d186151686ae47 Mon Sep 17 00:00:00 2001 From: John Swanson Date: Thu, 13 Nov 2025 11:46:33 -0800 Subject: [PATCH] refactor(monitoring): consolidate runtime contexts and instrumentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit consolidates the monitoring infrastructure into a cleaner, more unified API: **Runtime Context Consolidation:** - Removed QueryRuntimeContext in favor of generic TaskRuntimeContext - Moved TaskMetrics and performance monitoring methods to task_runtime_context.rs - TaskRuntimeContext now supports all task types via TaskType enum **Instrumentation Consolidation:** - Removed InstrumentedQueryExecution in favor of generic InstrumentedTaskExecution - Added convenience methods: for_query() and for_query_with_task_id() - InstrumentedTaskExecution handles all task types (queries, dumps, compactions, etc.) **Module Renaming:** - Renamed task_context to task_type for clarity - Avoids confusion with task_runtime_context - Better reflects that the module contains the TaskType enum **Performance Monitoring:** - All metrics methods consolidated in TaskMetrics: - busy_duration_ms(), idle_duration_ms() - avg_poll_duration_us() - cpu_utilization(), cpu_utilization_f64() - Thread tracking and parallelism metrics **API Improvements:** - Single, consistent API for all instrumentation needs - Query-specific shortcuts for common use cases - Better separation of concerns between modules 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .gitignore | 1 + Cargo.lock | 16 + Cargo.toml | 1 + crates/core/common/src/planning_context.rs | 8 + crates/core/dump/src/core/raw_dataset.rs | 126 ++-- crates/core/monitoring/Cargo.toml | 4 + .../core/monitoring/src/instrumented_task.rs | 673 ++++++++++++++++++ crates/core/monitoring/src/lib.rs | 12 + crates/core/monitoring/src/plan_histogram.rs | 204 ++++++ crates/core/monitoring/src/runtime_metrics.rs | 151 ++++ crates/core/monitoring/src/task_id.rs | 84 +++ .../monitoring/src/task_runtime_context.rs | 399 +++++++++++ crates/core/monitoring/src/task_type.rs | 149 ++++ crates/services/server/src/flight.rs | 50 +- crates/services/server/src/metrics.rs | 64 ++ grafana-dashboards/amp-dump-operations.json | 258 +++++++ grafana-dashboards/amp-query-performance.json | 263 +++++++ grafana-dashboards/amp-system-overview.json | 348 +++++++++ 18 files changed, 2748 insertions(+), 63 deletions(-) create mode 100644 crates/core/monitoring/src/instrumented_task.rs create mode 100644 crates/core/monitoring/src/plan_histogram.rs create mode 100644 crates/core/monitoring/src/runtime_metrics.rs create mode 100644 crates/core/monitoring/src/task_id.rs create mode 100644 crates/core/monitoring/src/task_runtime_context.rs create mode 100644 crates/core/monitoring/src/task_type.rs create mode 100644 grafana-dashboards/amp-dump-operations.json create mode 100644 grafana-dashboards/amp-query-performance.json create mode 100644 grafana-dashboards/amp-system-overview.json diff --git a/.gitignore b/.gitignore index b225bbefc..0270e126b 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ build/ .vscode/ config.toml .amp/ +**/ampdbc/drivers/ \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index fffdaf7c7..7e9b4f3a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5901,13 +5901,17 @@ version = "0.1.0" dependencies = [ "cargo_metadata", "common", + "datafusion", "opentelemetry", "opentelemetry-otlp", "opentelemetry_sdk", "serde_json", + "tokio", + "tokio-metrics", "tracing", "tracing-opentelemetry", "tracing-subscriber", + "uuid", ] [[package]] @@ -8935,6 +8939,18 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "tokio-metrics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eace09241d62c98b7eeb1107d4c5c64ca3bd7da92e8c218c153ab3a78f9be112" +dependencies = [ + "futures-util", + "pin-project-lite", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-native-tls" version = "0.3.1" diff --git a/Cargo.toml b/Cargo.toml index 61d288197..cb8869abe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -109,6 +109,7 @@ tokio = { version = "1.36.0", features = [ "rt-multi-thread", "parking_lot", ] } +tokio-metrics = "0.3" tokio-stream = "0.1.17" tokio-util = { version = "0.7.15", features = ["rt"] } toml = "0.8.10" diff --git a/crates/core/common/src/planning_context.rs b/crates/core/common/src/planning_context.rs index 1f880cecc..316cb1cd6 100644 --- a/crates/core/common/src/planning_context.rs +++ b/crates/core/common/src/planning_context.rs @@ -167,6 +167,14 @@ impl DetachedLogicalPlan { self.0.schema().clone() } + /// Get a reference to the inner LogicalPlan for analysis purposes. + /// + /// Note: This plan still contains `PlanningTable` providers and cannot be executed + /// until it is attached to a `QueryContext` via `attach_to()`. + pub fn as_inner(&self) -> &LogicalPlan { + &self.0 + } + pub fn propagate_block_num(self) -> Result { Ok(Self(propagate_block_num(self.0)?)) } diff --git a/crates/core/dump/src/core/raw_dataset.rs b/crates/core/dump/src/core/raw_dataset.rs index d3157f2bb..9e4533e0a 100644 --- a/crates/core/dump/src/core/raw_dataset.rs +++ b/crates/core/dump/src/core/raw_dataset.rs @@ -405,65 +405,97 @@ impl DumpPartition { } async fn run_range(&self, range: RangeInclusive) -> Result<(), BoxError> { - let stream = { - let block_streamer = self.block_streamer.clone(); - block_streamer - .block_stream(*range.start(), *range.end()) - .await + // Get dataset name for instrumentation + let dataset = self.catalog.tables()[0] + .job_labels() + .dataset_name + .to_string(); + + // Track records extracted + let records_extracted = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + + // Create task type for instrumentation + let task_type = monitoring::TaskType::Dump { + dataset: dataset.clone(), + block_range: (*range.start(), *range.end()), + records_extracted: None, // Will be set later }; - // limit the missing table ranges to the partition range - let mut missing_ranges_by_table: BTreeMap>> = - Default::default(); - for (table, ranges) in &self.missing_ranges_by_table { - let entry = missing_ranges_by_table.entry(table.clone()).or_default(); - for missing in ranges { - let start = BlockNum::max(*missing.start(), *range.start()); - let end = BlockNum::min(*missing.end(), *range.end()); - if start <= end { - entry.push(start..=end); + let instrumentation = monitoring::InstrumentedTaskExecution::new(task_type); + let records_counter = records_extracted.clone(); + + let dump_fut = async move { + let stream = { + let block_streamer = self.block_streamer.clone(); + block_streamer + .block_stream(*range.start(), *range.end()) + .await + }; + + // limit the missing table ranges to the partition range + let mut missing_ranges_by_table: BTreeMap>> = + Default::default(); + for (table, ranges) in &self.missing_ranges_by_table { + let entry = missing_ranges_by_table.entry(table.clone()).or_default(); + for missing in ranges { + let start = BlockNum::max(*missing.start(), *range.start()); + let end = BlockNum::min(*missing.end(), *range.end()); + if start <= end { + entry.push(start..=end); + } } } - } - let mut writer = RawDatasetWriter::new( - self.catalog.clone(), - self.metadata_db.clone(), - self.parquet_opts.clone(), - missing_ranges_by_table, - self.metrics.clone(), - )?; - - let mut stream = std::pin::pin!(stream); - while let Some(dataset_rows) = stream.try_next().await? { - for table_rows in dataset_rows { - if let Some(ref metrics) = self.metrics { + let mut writer = RawDatasetWriter::new( + self.catalog.clone(), + self.metadata_db.clone(), + self.parquet_opts.clone(), + missing_ranges_by_table, + self.metrics.clone(), + )?; + + let mut stream = std::pin::pin!(stream); + while let Some(dataset_rows) = stream.try_next().await? { + for table_rows in dataset_rows { let num_rows: u64 = table_rows.rows.num_rows().try_into().unwrap(); - let table_name = table_rows.table.name(); - let block_num = table_rows.block_num(); - let physical_table = self - .catalog - .tables() - .iter() - .find(|t| t.table_name() == table_name) - .expect("table should exist"); - let location_id = *physical_table.location_id(); - // Record rows only (bytes tracked separately in writer) - metrics.record_ingestion_rows(num_rows, table_name.to_string(), location_id); - // Update latest block gauge - metrics.set_latest_block(block_num, table_name.to_string(), location_id); + records_counter + .fetch_add(num_rows as usize, std::sync::atomic::Ordering::Relaxed); + + if let Some(ref metrics) = self.metrics { + let table_name = table_rows.table.name(); + let block_num = table_rows.block_num(); + let physical_table = self + .catalog + .tables() + .iter() + .find(|t| t.table_name() == table_name) + .expect("table should exist"); + let location_id = *physical_table.location_id(); + // Record rows only (bytes tracked separately in writer) + metrics.record_ingestion_rows( + num_rows, + table_name.to_string(), + location_id, + ); + // Update latest block gauge + metrics.set_latest_block(block_num, table_name.to_string(), location_id); + } + + writer.write(table_rows).await?; } - writer.write(table_rows).await?; + self.progress_reporter.block_covered(); } - self.progress_reporter.block_covered(); - } + // Close the last part file for each table, checking for any errors. + writer.close().await?; - // Close the last part file for each table, checking for any errors. - writer.close().await?; + Ok(()) + }; - Ok(()) + // Execute with instrumentation + let (result, _metrics) = instrumentation.execute(dump_fut).await; + result } } diff --git a/crates/core/monitoring/Cargo.toml b/crates/core/monitoring/Cargo.toml index a6476bfa3..12ac64b80 100644 --- a/crates/core/monitoring/Cargo.toml +++ b/crates/core/monitoring/Cargo.toml @@ -6,13 +6,17 @@ license-file.workspace = true [dependencies] common = { path = "../common" } +datafusion.workspace = true opentelemetry.workspace = true opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true +tokio.workspace = true +tokio-metrics.workspace = true serde_json.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true tracing-subscriber.workspace = true +uuid.workspace = true [dev-dependencies] diff --git a/crates/core/monitoring/src/instrumented_task.rs b/crates/core/monitoring/src/instrumented_task.rs new file mode 100644 index 000000000..4a82bff7d --- /dev/null +++ b/crates/core/monitoring/src/instrumented_task.rs @@ -0,0 +1,673 @@ +//! Generic task execution instrumentation with runtime metrics and tracing. +//! +//! This module provides [`InstrumentedTaskExecution`] which wraps any task execution +//! with comprehensive instrumentation including: +//! - Distributed tracing spans with structured fields +//! - Per-task tokio runtime metrics collection +//! - Automatic metrics finalization and recording +//! - Task-local context propagation +//! - Task-specific metadata (plan histograms, block ranges, etc.) + +use std::future::Future; + +use tracing::{Span, field}; + +use crate::{ + task_id::TaskId, + task_runtime_context::{TaskMetrics, TaskRuntimeContext}, + task_type::TaskType, +}; + +/// Wrapper that instruments task execution with metrics and tracing. +/// +/// This is the generic version that works with any `TaskType`, providing +/// comprehensive observability for any operation (queries, dumps, compactions, etc). +/// +/// # Example +/// +/// ```ignore +/// use monitoring::{InstrumentedTaskExecution, TaskType}; +/// +/// let task_type = TaskType::from_query("SELECT * FROM blocks", &logical_plan); +/// let instrumentation = InstrumentedTaskExecution::new(task_type); +/// +/// let (result, metrics) = instrumentation.execute(async { +/// // Your task execution code here +/// query_engine.execute(sql).await +/// }).await; +/// ``` +pub struct InstrumentedTaskExecution { + /// Runtime context for metrics collection + context: TaskRuntimeContext, + + /// Tracing span for this task execution + span: Span, +} + +impl InstrumentedTaskExecution { + /// Create a new instrumented task execution. + /// + /// This creates a new task context and tracing span with the task ID + /// and task-specific fields. + pub fn new(task_type: TaskType) -> Self { + let context = TaskRuntimeContext::new(task_type.clone()); + + let span = create_span_for_task(&context.task_id, &task_type); + + Self { context, span } + } + + /// Create instrumentation for a SQL query execution (convenience method). + /// + /// This is a shorthand for creating a `TaskType::QueryExecution` and + /// instrumenting it. The plan histogram will be empty initially. + /// + /// # Example + /// + /// ```ignore + /// let instrumentation = InstrumentedTaskExecution::for_query("SELECT * FROM blocks"); + /// let (result, metrics) = instrumentation.execute(async { + /// query_engine.execute(sql).await + /// }).await; + /// ``` + pub fn for_query(sql: impl Into) -> Self { + use std::collections::HashMap; + let task_type = TaskType::QueryExecution { + sql: sql.into(), + plan_node_histogram: HashMap::new(), + }; + Self::new(task_type) + } + + /// Create instrumentation with a specific task ID. + /// + /// Useful when you want to correlate with an existing task ID from + /// another part of the system. + pub fn with_task_id(task_id: TaskId, task_type: TaskType) -> Self { + let context = TaskRuntimeContext::with_task_id(task_id, task_type.clone()); + + let span = create_span_for_task(&context.task_id, &task_type); + + Self { context, span } + } + + /// Create instrumentation for a SQL query with a specific task ID (convenience method). + /// + /// Useful when you want to correlate with an existing task ID from + /// another part of the system. + pub fn for_query_with_task_id(task_id: TaskId, sql: impl Into) -> Self { + use std::collections::HashMap; + let task_type = TaskType::QueryExecution { + sql: sql.into(), + plan_node_histogram: HashMap::new(), + }; + Self::with_task_id(task_id, task_type) + } + + /// Execute a future with full instrumentation. + /// + /// This: + /// 1. Enters the tracing span + /// 2. Wraps the future with task monitoring + /// 3. Sets up task-local context + /// 4. Automatically records metrics to the span on completion + /// + /// Returns the result of the future along with the final metrics. + pub async fn execute(self, future: F) -> (R, TaskMetrics) + where + F: Future, + { + let _enter = self.span.enter(); + + // Execute within monitored scope + let result = self.context.scope(future).await; + + // Finalize and record metrics + self.context.finalize_metrics(); + let metrics = self + .context + .final_metrics() + .expect("Metrics should be finalized"); + + // Record metrics to the span + record_metrics_to_span(&self.span, &metrics, &self.context); + + (result, metrics) + } + + /// Execute a future and return only the result (metrics are recorded but discarded). + /// + /// Use this when you only care about the tracing side effects, not the metrics themselves. + pub async fn execute_and_discard_metrics(self, future: F) -> R + where + F: Future, + { + let (result, _metrics) = self.execute(future).await; + result + } + + /// Get access to the task ID. + pub fn task_id(&self) -> TaskId { + self.context.task_id + } + + /// Get access to the underlying context (for advanced usage). + pub fn context(&self) -> &TaskRuntimeContext { + &self.context + } + + /// Get access to the span (for advanced usage). + pub fn span(&self) -> &Span { + &self.span + } +} + +/// Create a tracing span appropriate for the given task type. +fn create_span_for_task(task_id: &TaskId, task_type: &TaskType) -> Span { + match task_type { + TaskType::QueryExecution { sql, .. } => { + tracing::info_span!( + "query_execution", + task_id = %task_id, + task_type = task_type.name(), + sql = %sql, + // Plan complexity fields + plan.total_nodes = field::Empty, + plan.node_summary = field::Empty, + // Runtime metrics fields (will be recorded at the end) + runtime.busy_ms = field::Empty, + runtime.idle_ms = field::Empty, + runtime.poll_count = field::Empty, + runtime.scheduled_count = field::Empty, + runtime.cpu_utilization = field::Empty, + runtime.avg_poll_us = field::Empty, + // Parallelism metrics + runtime.unique_threads = field::Empty, + runtime.thread_ids = field::Empty, + // Wall-clock time + elapsed_ms = field::Empty, + ) + } + TaskType::PlanExecution { is_streaming, .. } => { + tracing::info_span!( + "plan_execution", + task_id = %task_id, + task_type = task_type.name(), + is_streaming = is_streaming, + // Plan complexity fields + plan.total_nodes = field::Empty, + plan.node_summary = field::Empty, + // Runtime metrics fields + runtime.busy_ms = field::Empty, + runtime.idle_ms = field::Empty, + runtime.poll_count = field::Empty, + runtime.scheduled_count = field::Empty, + runtime.cpu_utilization = field::Empty, + runtime.avg_poll_us = field::Empty, + // Parallelism metrics + runtime.unique_threads = field::Empty, + runtime.thread_ids = field::Empty, + elapsed_ms = field::Empty, + ) + } + TaskType::Dump { + dataset, + block_range, + records_extracted, + } => { + tracing::info_span!( + "dump", + task_id = %task_id, + task_type = task_type.name(), + dataset = %dataset, + block_range.start = block_range.0, + block_range.end = block_range.1, + records_extracted = records_extracted, + // Runtime metrics fields + runtime.busy_ms = field::Empty, + runtime.idle_ms = field::Empty, + runtime.poll_count = field::Empty, + runtime.scheduled_count = field::Empty, + runtime.cpu_utilization = field::Empty, + // Parallelism metrics + runtime.unique_threads = field::Empty, + runtime.thread_ids = field::Empty, + elapsed_ms = field::Empty, + ) + } + TaskType::Compact { + dataset, + input_files, + output_files, + } => { + tracing::info_span!( + "compact", + task_id = %task_id, + task_type = task_type.name(), + dataset = %dataset, + input_files = input_files, + output_files = output_files, + // Runtime metrics fields + runtime.busy_ms = field::Empty, + runtime.idle_ms = field::Empty, + runtime.poll_count = field::Empty, + runtime.scheduled_count = field::Empty, + runtime.cpu_utilization = field::Empty, + // Parallelism metrics + runtime.unique_threads = field::Empty, + runtime.thread_ids = field::Empty, + elapsed_ms = field::Empty, + ) + } + TaskType::Collect { + dataset, + files_collected, + } => { + tracing::info_span!( + "collect", + task_id = %task_id, + task_type = task_type.name(), + dataset = %dataset, + files_collected = files_collected, + // Runtime metrics fields + runtime.busy_ms = field::Empty, + runtime.idle_ms = field::Empty, + runtime.poll_count = field::Empty, + runtime.scheduled_count = field::Empty, + runtime.cpu_utilization = field::Empty, + // Parallelism metrics + runtime.unique_threads = field::Empty, + runtime.thread_ids = field::Empty, + elapsed_ms = field::Empty, + ) + } + TaskType::Generic { + operation, + metadata, + } => { + let span = tracing::info_span!( + "generic_task", + task_id = %task_id, + task_type = task_type.name(), + operation = %operation, + // Runtime metrics fields + runtime.busy_ms = field::Empty, + runtime.idle_ms = field::Empty, + runtime.poll_count = field::Empty, + runtime.scheduled_count = field::Empty, + runtime.cpu_utilization = field::Empty, + // Parallelism metrics + runtime.unique_threads = field::Empty, + runtime.thread_ids = field::Empty, + elapsed_ms = field::Empty, + ); + + // Record custom metadata as span attributes + for (key, value) in metadata { + span.record(key.as_str(), value.as_str()); + } + + span + } + } +} + +/// Record metrics to the tracing span, including task-specific fields. +fn record_metrics_to_span(span: &Span, metrics: &TaskMetrics, context: &TaskRuntimeContext) { + // Record standard runtime metrics + span.record("runtime.busy_ms", metrics.busy_duration_ms()); + span.record("runtime.idle_ms", metrics.idle_duration_ms()); + span.record("runtime.poll_count", metrics.total_poll_count); + span.record("runtime.scheduled_count", metrics.total_scheduled_count); + span.record("runtime.cpu_utilization", metrics.cpu_utilization()); + + if let Some(avg_poll_us) = metrics.avg_poll_duration_us() { + span.record("runtime.avg_poll_us", avg_poll_us); + } + + // Record parallelism metrics + span.record("runtime.unique_threads", metrics.unique_thread_count); + let thread_ids_str = format!("{:?}", metrics.thread_ids); + span.record("runtime.thread_ids", thread_ids_str.as_str()); + + let elapsed_ms = context.elapsed().as_secs_f64() * 1000.0; + span.record("elapsed_ms", elapsed_ms); + + // Record task-specific fields and log completion + match &context.task_type { + TaskType::QueryExecution { + sql, + plan_node_histogram, + } => { + // Record plan complexity metrics + let total_nodes = crate::plan_histogram::total_nodes(plan_node_histogram); + let node_summary = crate::plan_histogram::summarize_histogram(plan_node_histogram, 5); + + span.record("plan.total_nodes", total_nodes); + span.record("plan.node_summary", node_summary.as_str()); + + // Log query completion with plan complexity and parallelism + tracing::info!( + task_id = %context.task_id, + elapsed_ms = elapsed_ms, + busy_ms = metrics.busy_duration_ms(), + idle_ms = metrics.idle_duration_ms(), + cpu_utilization = metrics.cpu_utilization(), + unique_threads = metrics.unique_thread_count, + plan_nodes = total_nodes, + plan_summary = %node_summary, + sql = %crate::task_type::truncate_sql(sql, 100), + "Query execution completed" + ); + } + TaskType::PlanExecution { + plan_node_histogram, + is_streaming, + } => { + // Record plan complexity metrics + let total_nodes = crate::plan_histogram::total_nodes(plan_node_histogram); + let node_summary = crate::plan_histogram::summarize_histogram(plan_node_histogram, 5); + + span.record("plan.total_nodes", total_nodes); + span.record("plan.node_summary", node_summary.as_str()); + + // Log plan execution completion with parallelism + tracing::info!( + task_id = %context.task_id, + elapsed_ms = elapsed_ms, + cpu_utilization = metrics.cpu_utilization(), + unique_threads = metrics.unique_thread_count, + plan_nodes = total_nodes, + plan_summary = %node_summary, + is_streaming = is_streaming, + "Plan execution completed" + ); + } + TaskType::Dump { + dataset, + block_range, + records_extracted, + } => { + tracing::info!( + task_id = %context.task_id, + elapsed_ms = elapsed_ms, + busy_ms = metrics.busy_duration_ms(), + idle_ms = metrics.idle_duration_ms(), + cpu_utilization = metrics.cpu_utilization(), + unique_threads = metrics.unique_thread_count, + dataset = %dataset, + blocks = format!("{}-{}", block_range.0, block_range.1), + records = ?records_extracted, + "Dump completed" + ); + } + TaskType::Compact { + dataset, + input_files, + output_files, + } => { + tracing::info!( + task_id = %context.task_id, + elapsed_ms = elapsed_ms, + cpu_utilization = metrics.cpu_utilization(), + unique_threads = metrics.unique_thread_count, + dataset = %dataset, + input_files = input_files, + output_files = output_files, + "Compaction completed" + ); + } + TaskType::Collect { + dataset, + files_collected, + } => { + tracing::info!( + task_id = %context.task_id, + elapsed_ms = elapsed_ms, + cpu_utilization = metrics.cpu_utilization(), + unique_threads = metrics.unique_thread_count, + dataset = %dataset, + files_collected = files_collected, + "Garbage collection completed" + ); + } + TaskType::Generic { operation, .. } => { + tracing::info!( + task_id = %context.task_id, + elapsed_ms = elapsed_ms, + cpu_utilization = metrics.cpu_utilization(), + unique_threads = metrics.unique_thread_count, + operation = %operation, + "Task completed" + ); + } + } +} + +/// RAII guard for automatic metrics recording. +/// +/// This is an alternative API that uses Drop to automatically record metrics. +/// Useful when you want to instrument a block of code without explicitly +/// calling `execute()`. +/// +/// # Example +/// +/// ```ignore +/// use monitoring::{InstrumentedTaskGuard, TaskType}; +/// +/// let task_type = TaskType::from_query("SELECT * FROM blocks", &plan); +/// let _guard = InstrumentedTaskGuard::new(task_type); +/// +/// // Your code here... +/// // Metrics will be automatically finalized and recorded on drop +/// ``` +pub struct InstrumentedTaskGuard { + context: TaskRuntimeContext, + span: Span, + _entered: tracing::span::EnteredSpan, +} + +impl InstrumentedTaskGuard { + /// Create a new guard and enter the span. + pub fn new(task_type: TaskType) -> Self { + let instrumentation = InstrumentedTaskExecution::new(task_type); + let entered = instrumentation.span.clone().entered(); + + Self { + context: instrumentation.context, + span: instrumentation.span, + _entered: entered, + } + } + + /// Create a guard for a SQL query (convenience method). + /// + /// This is a shorthand for creating a `TaskType::QueryExecution` guard. + pub fn for_query(sql: impl Into) -> Self { + let instrumentation = InstrumentedTaskExecution::for_query(sql); + let entered = instrumentation.span.clone().entered(); + + Self { + context: instrumentation.context, + span: instrumentation.span, + _entered: entered, + } + } + + /// Get the task ID. + pub fn task_id(&self) -> TaskId { + self.context.task_id + } + + /// Get the runtime context. + pub fn context(&self) -> &TaskRuntimeContext { + &self.context + } + + /// Manually finalize and get metrics (otherwise happens on Drop). + pub fn finalize(self) -> TaskMetrics { + self.context.finalize_metrics(); + self.context + .final_metrics() + .expect("Metrics should be finalized") + } +} + +impl Drop for InstrumentedTaskGuard { + fn drop(&mut self) { + // Finalize metrics and record to span + self.context.finalize_metrics(); + if let Some(metrics) = self.context.final_metrics() { + record_metrics_to_span(&self.span, &metrics, &self.context); + } + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + + #[tokio::test] + async fn test_instrumented_task_execution() { + let task_type = TaskType::Generic { + operation: "test_operation".to_string(), + metadata: HashMap::new(), + }; + let instrumentation = InstrumentedTaskExecution::new(task_type); + + let (result, metrics) = instrumentation + .execute(async { + // Simulate some work + for _ in 0..10 { + tokio::task::yield_now().await; + } + 42 + }) + .await; + + assert_eq!(result, 42); + assert!(metrics.total_poll_count > 0); + assert!(metrics.busy_duration_ms() > 0.0); + } + + #[tokio::test] + async fn test_dump_task_instrumentation() { + let task_type = TaskType::Dump { + dataset: "eth_blocks".to_string(), + block_range: (0, 1000), + records_extracted: None, + }; + let instrumentation = InstrumentedTaskExecution::new(task_type); + + let (result, metrics) = instrumentation + .execute(async { + // Simulate dump work + tokio::time::sleep(tokio::time::Duration::from_millis(5)).await; + "dump_completed" + }) + .await; + + assert_eq!(result, "dump_completed"); + assert!(metrics.total_poll_count > 0); + } + + #[tokio::test] + async fn test_for_query_convenience_method() { + let instrumentation = InstrumentedTaskExecution::for_query("SELECT * FROM blocks"); + + let (result, metrics) = instrumentation + .execute(async { + // Simulate query work + for _ in 0..10 { + tokio::task::yield_now().await; + } + 42 + }) + .await; + + assert_eq!(result, 42); + assert!(metrics.total_poll_count > 0); + assert!(metrics.busy_duration_ms() > 0.0); + } + + #[tokio::test] + async fn test_for_query_with_task_id() { + let task_id = TaskId::new(); + let instrumentation = + InstrumentedTaskExecution::for_query_with_task_id(task_id, "SELECT 1"); + + assert_eq!(instrumentation.task_id(), task_id); + + let (result, metrics) = instrumentation + .execute(async { + tokio::task::yield_now().await; + "done" + }) + .await; + + assert_eq!(result, "done"); + assert!(metrics.total_poll_count > 0); + } + + #[tokio::test] + async fn test_instrumented_task_guard() { + let task_type = TaskType::Generic { + operation: "test_guard".to_string(), + metadata: HashMap::new(), + }; + let guard = InstrumentedTaskGuard::new(task_type); + let task_id = guard.task_id(); + + // Simulate work + for _ in 0..5 { + tokio::task::yield_now().await; + } + + // Guard will automatically finalize on drop + drop(guard); + + // Verify task ID was valid + assert_ne!(task_id.as_str(), ""); + } + + #[tokio::test] + async fn test_instrumented_task_guard_for_query() { + let guard = InstrumentedTaskGuard::for_query("SELECT * FROM logs"); + let task_id = guard.task_id(); + + // Simulate query work + for _ in 0..3 { + tokio::task::yield_now().await; + } + + // Note: Guard doesn't use scope(), so it won't track tokio task metrics + // The guard is primarily for tracing spans with automatic finalization + drop(guard); + + assert_ne!(task_id.as_str(), ""); + } + + #[tokio::test] + async fn test_context_propagation_with_task_guard() { + let task_type = TaskType::Generic { + operation: "test_propagation".to_string(), + metadata: HashMap::new(), + }; + let guard = InstrumentedTaskGuard::new(task_type); + let expected_id = guard.task_id(); + + // Verify context is accessible + let ctx = TaskRuntimeContext::try_current(); + // Note: Guard doesn't use scope(), so context won't be available via task-local + // This test documents the current behavior difference + assert!( + ctx.is_none(), + "Guard doesn't propagate context via task-local (use execute() for that)" + ); + + drop(guard); + assert_ne!(expected_id.as_str(), ""); + } +} diff --git a/crates/core/monitoring/src/lib.rs b/crates/core/monitoring/src/lib.rs index dd6144d05..ab26d0ea4 100644 --- a/crates/core/monitoring/src/lib.rs +++ b/crates/core/monitoring/src/lib.rs @@ -1,6 +1,18 @@ +pub mod instrumented_task; pub mod logging; +pub mod plan_histogram; +pub mod runtime_metrics; +pub mod task_id; +pub mod task_runtime_context; +pub mod task_type; pub mod telemetry; +// Re-export commonly used types +pub use instrumented_task::{InstrumentedTaskExecution, InstrumentedTaskGuard}; +pub use task_id::TaskId; +pub use task_runtime_context::{TASK_CONTEXT, TaskMetrics, TaskRuntimeContext}; +pub use task_type::TaskType; + pub type TelemetryKit = ( Option, Option, diff --git a/crates/core/monitoring/src/plan_histogram.rs b/crates/core/monitoring/src/plan_histogram.rs new file mode 100644 index 000000000..c471cfb7b --- /dev/null +++ b/crates/core/monitoring/src/plan_histogram.rs @@ -0,0 +1,204 @@ +//! Logical plan histogram collection for query analysis. +//! +//! This module provides utilities for walking DataFusion logical plans and +//! collecting statistics about the types of nodes present. This is useful for +//! understanding query complexity and for metrics/monitoring. + +use std::collections::HashMap; + +use datafusion::{ + common::tree_node::{TreeNode, TreeNodeRecursion}, + logical_expr::LogicalPlan, +}; + +/// Collects a histogram of logical plan node types. +/// +/// This walks a DataFusion logical plan tree and counts how many of each +/// node type appears. The histogram maps node type names to counts. +/// +/// # Example +/// +/// ```ignore +/// use datafusion::logical_expr::LogicalPlan; +/// use monitoring::plan_histogram::collect_plan_histogram; +/// +/// let plan: LogicalPlan = ...; // your plan +/// let histogram = collect_plan_histogram(&plan); +/// +/// // histogram might contain: +/// // {"TableScan": 2, "Filter": 3, "Projection": 1, "Join": 1} +/// ``` +pub fn collect_plan_histogram(plan: &LogicalPlan) -> HashMap { + let mut histogram = HashMap::new(); + + // Use DataFusion's TreeNode apply() to traverse the plan + let _ = plan.apply(|node| { + // Get the node type name + let node_type = match node { + LogicalPlan::Projection(_) => "Projection", + LogicalPlan::Filter(_) => "Filter", + LogicalPlan::Window(_) => "Window", + LogicalPlan::Aggregate(_) => "Aggregate", + LogicalPlan::Sort(_) => "Sort", + LogicalPlan::Join(_) => "Join", + LogicalPlan::Repartition(_) => "Repartition", + LogicalPlan::Union(_) => "Union", + LogicalPlan::TableScan(_) => "TableScan", + LogicalPlan::EmptyRelation(_) => "EmptyRelation", + LogicalPlan::Subquery(_) => "Subquery", + LogicalPlan::SubqueryAlias(_) => "SubqueryAlias", + LogicalPlan::Limit(_) => "Limit", + LogicalPlan::Statement(_) => "Statement", + LogicalPlan::Values(_) => "Values", + LogicalPlan::Explain(_) => "Explain", + LogicalPlan::Analyze(_) => "Analyze", + LogicalPlan::Extension(_) => "Extension", + LogicalPlan::Distinct(_) => "Distinct", + LogicalPlan::Dml(_) => "Dml", + LogicalPlan::Ddl(_) => "Ddl", + LogicalPlan::Copy(_) => "Copy", + LogicalPlan::DescribeTable(_) => "DescribeTable", + LogicalPlan::Unnest(_) => "Unnest", + LogicalPlan::RecursiveQuery(_) => "RecursiveQuery", + }; + + // Record this node type + *histogram.entry(node_type.to_owned()).or_insert(0) += 1; + + // Continue visiting children + Ok(TreeNodeRecursion::Continue) + }); + + histogram +} + +/// Get a summary string of the most common node types. +/// +/// Returns a formatted string showing the top N most common node types. +/// Useful for logging/display purposes. +/// +/// # Example +/// +/// ```ignore +/// let histogram = collect_plan_histogram(&plan); +/// let summary = summarize_histogram(&histogram, 3); +/// // Returns something like: "TableScan:2, Filter:3, Join:1" +/// ``` +pub fn summarize_histogram(histogram: &HashMap, top_n: usize) -> String { + let mut items: Vec<_> = histogram.iter().collect(); + items.sort_by(|a, b| b.1.cmp(a.1).then_with(|| a.0.cmp(b.0))); + + items + .iter() + .take(top_n) + .map(|(k, v)| format!("{}:{}", k, v)) + .collect::>() + .join(", ") +} + +/// Get the total number of nodes in the plan. +pub fn total_nodes(histogram: &HashMap) -> usize { + histogram.values().sum() +} + +#[cfg(test)] +mod tests { + use datafusion::prelude::*; + + use super::*; + + #[tokio::test] + async fn test_simple_plan_histogram() { + let ctx = SessionContext::new(); + + // Create a simple plan: SELECT * FROM table WHERE x > 10 + let plan = ctx + .sql("SELECT * FROM (VALUES (1, 2), (3, 4)) AS t(a, b) WHERE a > 1") + .await + .unwrap() + .into_unoptimized_plan(); + + let histogram = collect_plan_histogram(&plan); + + // Should have at least a Projection, Filter, and Values node + assert!(histogram.contains_key("Projection")); + assert!(histogram.contains_key("Filter")); + assert!(histogram.contains_key("Values")); + + let total = total_nodes(&histogram); + assert!(total >= 3, "Expected at least 3 nodes, got {}", total); + } + + #[tokio::test] + async fn test_join_plan_histogram() { + let ctx = SessionContext::new(); + + // Create a join plan + let plan = ctx + .sql( + "SELECT t1.a, t2.b + FROM (VALUES (1, 2)) AS t1(a, b) + JOIN (VALUES (1, 3)) AS t2(a, b) + ON t1.a = t2.a", + ) + .await + .unwrap() + .into_unoptimized_plan(); + + let histogram = collect_plan_histogram(&plan); + + // Should have a Join node + assert!( + histogram.contains_key("Join"), + "Expected Join node in histogram" + ); + + let summary = summarize_histogram(&histogram, 3); + assert!(!summary.is_empty(), "Summary should not be empty"); + } + + #[tokio::test] + async fn test_aggregate_plan_histogram() { + let ctx = SessionContext::new(); + + // Create an aggregate plan + let plan = ctx + .sql("SELECT a, COUNT(*) FROM (VALUES (1, 2), (1, 3), (2, 4)) AS t(a, b) GROUP BY a") + .await + .unwrap() + .into_unoptimized_plan(); + + let histogram = collect_plan_histogram(&plan); + + // Should have an Aggregate node + assert!( + histogram.contains_key("Aggregate"), + "Expected Aggregate node in histogram" + ); + } + + #[test] + fn test_summarize_histogram() { + let mut histogram = HashMap::new(); + histogram.insert("TableScan".to_string(), 5); + histogram.insert("Filter".to_string(), 3); + histogram.insert("Projection".to_string(), 2); + histogram.insert("Join".to_string(), 1); + + let summary = summarize_histogram(&histogram, 2); + // Should show the top 2 most common + assert!(summary.contains("TableScan:5")); + assert!(summary.contains("Filter:3")); + assert!(!summary.contains("Join")); + } + + #[test] + fn test_total_nodes() { + let mut histogram = HashMap::new(); + histogram.insert("TableScan".to_string(), 2); + histogram.insert("Filter".to_string(), 3); + histogram.insert("Join".to_string(), 1); + + assert_eq!(total_nodes(&histogram), 6); + } +} diff --git a/crates/core/monitoring/src/runtime_metrics.rs b/crates/core/monitoring/src/runtime_metrics.rs new file mode 100644 index 000000000..4a43f5911 --- /dev/null +++ b/crates/core/monitoring/src/runtime_metrics.rs @@ -0,0 +1,151 @@ +//! Tokio runtime metrics collection and tracking. +//! +//! This module provides utilities for capturing and analyzing tokio runtime metrics +//! at both the global level and per-query level. It enables observability into +//! worker thread utilization, task scheduling, and resource consumption. + +use std::time::Duration; + +use tokio::runtime::Handle; + +/// Snapshot of tokio runtime metrics at a specific point in time. +/// +/// This captures the state of the entire tokio runtime, including all worker threads +/// and task scheduling statistics. Snapshots can be compared to calculate deltas +/// for per-query attribution. +#[derive(Debug, Clone)] +pub struct RuntimeMetricsSnapshot { + /// Number of worker threads in the runtime + pub workers_count: usize, + + /// Total number of times worker threads parked (aggregate across all workers) + pub total_park_count: u64, + + /// Total busy duration across all workers (in nanoseconds) + pub total_busy_duration_ns: u64, +} + +impl RuntimeMetricsSnapshot { + /// Capture current runtime metrics from the tokio runtime handle. + /// + /// This aggregates metrics from all worker threads to provide a complete + /// picture of the runtime's state at this moment. + pub fn capture(runtime_handle: &Handle) -> Self { + let metrics = runtime_handle.metrics(); + let workers_count = metrics.num_workers(); + + // Aggregate metrics across all workers + let mut total_busy_duration_ns = 0u64; + let mut total_park_count = 0u64; + + for worker_id in 0..workers_count { + total_busy_duration_ns += + metrics.worker_total_busy_duration(worker_id).as_nanos() as u64; + total_park_count += metrics.worker_park_count(worker_id); + } + + Self { + workers_count, + total_park_count, + total_busy_duration_ns, + } + } + + /// Calculate the delta between this snapshot and a previous one. + /// + /// This is useful for attributing resource consumption to a specific time window, + /// such as a single query execution. Returns `None` if this snapshot appears to be + /// older than the previous one (which would indicate incorrect usage). + pub fn delta(&self, previous: &Self) -> Option { + // Ensure we're calculating delta in the right direction + if self.total_park_count < previous.total_park_count { + return None; + } + + Some(RuntimeMetricsDelta { + park_count: self + .total_park_count + .saturating_sub(previous.total_park_count), + busy_duration: Duration::from_nanos( + self.total_busy_duration_ns + .saturating_sub(previous.total_busy_duration_ns), + ), + }) + } +} + +/// Delta between two runtime metric snapshots. +/// +/// Represents the change in runtime metrics over a specific time period, +/// such as a single query execution. All values are differences (end - start). +#[derive(Debug, Clone)] +pub struct RuntimeMetricsDelta { + /// Number of times worker threads parked during this period + pub park_count: u64, + + /// Total worker busy time during this period + pub busy_duration: Duration, +} + +impl RuntimeMetricsDelta { + /// Get the busy duration as milliseconds (f64) for metrics export. + pub fn busy_duration_ms(&self) -> f64 { + self.busy_duration.as_secs_f64() * 1000.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_runtime_metrics_snapshot() { + let handle = Handle::current(); + let snapshot = RuntimeMetricsSnapshot::capture(&handle); + + assert!(snapshot.workers_count > 0); + // Park count is captured successfully (u64 is always >= 0, just verify capture worked) + let _ = snapshot.total_park_count; + } + + #[tokio::test] + async fn test_runtime_metrics_delta() { + let handle = Handle::current(); + + let snapshot1 = RuntimeMetricsSnapshot::capture(&handle); + + // Do some work to change metrics + for _ in 0..10 { + tokio::task::yield_now().await; + } + + let snapshot2 = RuntimeMetricsSnapshot::capture(&handle); + + let delta = snapshot2.delta(&snapshot1).expect("Delta should be valid"); + + // After yielding, busy time should have increased + assert!(delta.busy_duration.as_nanos() > 0); + } + + #[tokio::test] + async fn test_delta_direction() { + let handle = Handle::current(); + + let snapshot1 = RuntimeMetricsSnapshot::capture(&handle); + + // Do enough work to ensure metrics change + for _ in 0..100 { + tokio::task::yield_now().await; + } + + let snapshot2 = RuntimeMetricsSnapshot::capture(&handle); + + // Correct direction should work + assert!(snapshot2.delta(&snapshot1).is_some()); + + // Incorrect direction should return None (only if metrics actually changed) + if snapshot2.total_park_count > snapshot1.total_park_count { + assert!(snapshot1.delta(&snapshot2).is_none()); + } + } +} diff --git a/crates/core/monitoring/src/task_id.rs b/crates/core/monitoring/src/task_id.rs new file mode 100644 index 000000000..32308f308 --- /dev/null +++ b/crates/core/monitoring/src/task_id.rs @@ -0,0 +1,84 @@ +//! Unique identifier for task executions. +//! +//! This module provides the [`TaskId`] type used to track individual task executions +//! (queries, dumps, compactions, etc.) through the system, enabling per-task metrics and tracing. + +use std::fmt; + +/// Unique identifier for a task execution. +/// +/// Uses UUIDv7 which incorporates a timestamp, providing both uniqueness +/// and chronological ordering of task IDs. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct TaskId(uuid::Uuid); + +impl TaskId { + /// Create a new task ID with the current timestamp. + /// + /// Uses UUIDv7 which embeds a millisecond-precision timestamp in the first 48 bits, + /// followed by random bits for uniqueness. + pub fn new() -> Self { + Self(uuid::Uuid::now_v7()) + } + + /// Get the task ID as a string. + /// + /// Returns the standard hyphenated UUID format: + /// `xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx` + pub fn as_str(&self) -> String { + self.0.to_string() + } + + /// Get the inner UUID. + pub fn as_uuid(&self) -> &uuid::Uuid { + &self.0 + } +} + +impl Default for TaskId { + fn default() -> Self { + Self::new() + } +} + +impl fmt::Display for TaskId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for TaskId { + fn from(uuid: uuid::Uuid) -> Self { + Self(uuid) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_id_uniqueness() { + let id1 = TaskId::new(); + let id2 = TaskId::new(); + assert_ne!(id1, id2, "Task IDs should be unique"); + } + + #[test] + fn test_task_id_display() { + let id = TaskId::new(); + let display_str = format!("{}", id); + let as_str = id.as_str(); + assert_eq!(display_str, as_str); + } + + #[test] + fn test_task_id_chronological() { + let id1 = TaskId::new(); + std::thread::sleep(std::time::Duration::from_millis(2)); + let id2 = TaskId::new(); + + // UUIDv7 embeds timestamp, so newer IDs should be "greater" + assert!(id2.as_uuid() > id1.as_uuid()); + } +} diff --git a/crates/core/monitoring/src/task_runtime_context.rs b/crates/core/monitoring/src/task_runtime_context.rs new file mode 100644 index 000000000..5aba18fb4 --- /dev/null +++ b/crates/core/monitoring/src/task_runtime_context.rs @@ -0,0 +1,399 @@ +//! Generic task runtime metrics tracking context. +//! +//! This module provides [`TaskRuntimeContext`] which tracks tokio task metrics +//! for any type of operation using task-local storage, enabling accurate +//! per-task resource attribution even in concurrent scenarios. + +use std::{ + collections::HashSet, + future::Future, + sync::{Arc, Mutex}, + time::Instant, +}; + +use tokio_metrics::TaskMonitor; + +use crate::{task_id::TaskId, task_type::TaskType}; + +/// Get the current OS thread ID as a u64. +/// +/// This uses platform-specific methods to get a unique identifier for the current thread. +pub(crate) fn current_thread_id() -> u64 { + // std::thread::current().id() gives us a ThreadId, but we need to extract the numeric value. + // We use a hash of the thread ID as a stable numeric identifier. + use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + }; + + let thread_id = std::thread::current().id(); + let mut hasher = DefaultHasher::new(); + thread_id.hash(&mut hasher); + hasher.finish() +} + +tokio::task_local! { + /// Task-local storage for the current task context. + /// + /// This allows any code within a task execution to access the task's + /// runtime context without explicit parameter passing. The context is + /// automatically propagated to spawned tasks. + pub static TASK_CONTEXT: Arc; +} + +/// Snapshot of task metrics at a point in time. +/// +/// These metrics are specific to the task tree being monitored and are not +/// affected by other concurrent tasks in the runtime. +#[derive(Debug, Clone)] +pub struct TaskMetrics { + /// Total time the task spent executing (active work) + pub total_busy_duration: std::time::Duration, + + /// Total time the task spent waiting/idle + pub total_idle_duration: std::time::Duration, + + /// Number of times the task was polled + pub total_poll_count: u64, + + /// Number of times the task was scheduled for execution + pub total_scheduled_count: u64, + + /// Number of unique OS threads that executed this task + pub unique_thread_count: usize, + + /// Set of OS thread IDs that executed this task + pub thread_ids: HashSet, +} + +impl TaskMetrics { + /// Get the total busy duration in milliseconds for metrics export. + pub fn busy_duration_ms(&self) -> f64 { + self.total_busy_duration.as_secs_f64() * 1000.0 + } + + /// Get the total idle duration in milliseconds for metrics export. + pub fn idle_duration_ms(&self) -> f64 { + self.total_idle_duration.as_secs_f64() * 1000.0 + } + + /// Calculate the average time per poll in microseconds. + pub fn avg_poll_duration_us(&self) -> Option { + if self.total_poll_count == 0 { + None + } else { + let total_duration = self.total_busy_duration + self.total_idle_duration; + Some(total_duration.as_micros() as f64 / self.total_poll_count as f64) + } + } + + /// Calculate CPU utilization as a percentage (0-100). + /// + /// Returns an integer percentage where 100 means 100% CPU busy. + pub fn cpu_utilization(&self) -> u8 { + let total = self.total_busy_duration + self.total_idle_duration; + if total.is_zero() { + 0 + } else { + let ratio = self.total_busy_duration.as_secs_f64() / total.as_secs_f64(); + (ratio * 100.0).round() as u8 + } + } + + /// Calculate CPU utilization as a floating point percentage (0.0-100.0). + /// + /// Returns a float where 100.0 means 100% CPU busy. + pub fn cpu_utilization_f64(&self) -> f64 { + let total = self.total_busy_duration + self.total_idle_duration; + if total.is_zero() { + 0.0 + } else { + let ratio = self.total_busy_duration.as_secs_f64() / total.as_secs_f64(); + ratio * 100.0 + } + } +} + +/// Context for tracking runtime metrics during a single task execution. +/// +/// This uses `tokio-metrics::TaskMonitor` to track actual per-task metrics, +/// providing accurate resource attribution even when multiple tasks run +/// concurrently. The context is propagated via task-local storage. +/// +/// Unlike the older `QueryRuntimeContext`, this is generic and can track +/// any type of operation (queries, dumps, compactions, etc). +/// +/// # Concurrent Task Support +/// +/// `TaskMonitor` tracks metrics for the specific task tree spawned for this +/// operation, isolating it from other concurrent tasks. +/// +/// # Thread Safety +/// +/// This type is `Clone` and uses `Arc` internally, making it safe to share +/// across async tasks. +#[derive(Clone)] +pub struct TaskRuntimeContext { + /// Unique identifier for this task execution + pub task_id: TaskId, + + /// Type of task being executed with metadata + pub task_type: TaskType, + + /// Task monitor for collecting per-task metrics + task_monitor: TaskMonitor, + + /// Wall-clock time when context was created + start_time: Instant, + + /// Captured task metrics at task completion + /// + /// TODO: Consider using RwLock if read contention becomes an issue + /// OR consider atomic snapshot approach + final_metrics: Arc>>, + + /// Thread IDs observed during task execution (for parallelism tracking) + thread_ids: Arc>>, +} + +impl TaskRuntimeContext { + /// Create a new runtime context for a task execution. + /// + /// This automatically generates a new task ID and creates a fresh + /// `TaskMonitor` for tracking this task's metrics. + pub fn new(task_type: TaskType) -> Self { + Self { + task_id: TaskId::new(), + task_type, + task_monitor: TaskMonitor::new(), + start_time: Instant::now(), + final_metrics: Arc::new(Mutex::new(None)), + thread_ids: Arc::new(Mutex::new(HashSet::new())), + } + } + + /// Create a runtime context with a specific task ID. + /// + /// Useful when you want to use an existing task ID from another part of the system. + pub fn with_task_id(task_id: TaskId, task_type: TaskType) -> Self { + Self { + task_id, + task_type, + task_monitor: TaskMonitor::new(), + start_time: Instant::now(), + final_metrics: Arc::new(Mutex::new(None)), + thread_ids: Arc::new(Mutex::new(HashSet::new())), + } + } + + /// Record that the current OS thread is executing this task. + /// + /// Call this periodically during task execution to track which threads + /// are being used. This helps identify parallelism and thread migration. + fn record_current_thread(&self) { + let thread_id = current_thread_id(); + self.thread_ids.lock().unwrap().insert(thread_id); + } + + /// Execute a future within this task's monitoring context. + /// + /// This instruments the future with the task monitor and sets up task-local + /// storage so that any code within the future (or tasks it spawns) can access + /// this context. + /// + /// # Example + /// + /// ```ignore + /// let ctx = TaskRuntimeContext::new(TaskType::Dump { + /// dataset: "eth_blocks".to_string(), + /// block_range: (0, 1000), + /// records_extracted: None, + /// }); + /// let result = ctx.scope(async { + /// // This code and any spawned tasks can access TASK_CONTEXT + /// extract_blocks().await + /// }).await; + /// ``` + pub async fn scope(&self, future: F) -> R + where + F: Future, + { + let ctx = Arc::new(self.clone()); + let monitored = self.task_monitor.instrument(future); + + TASK_CONTEXT.scope(ctx, monitored).await + } + + /// Get the current task context from task-local storage. + /// + /// Returns `None` if called outside of a `scope()` context. + /// + /// # Example + /// + /// ```ignore + /// if let Some(ctx) = TaskRuntimeContext::try_current() { + /// tracing::info!(task_id = %ctx.task_id, "Processing task"); + /// } + /// ``` + pub fn try_current() -> Option> { + TASK_CONTEXT.try_with(|ctx| ctx.clone()).ok() + } + + /// Get current task metrics from the monitor. + /// + /// Returns metrics accumulated since the context was created. + /// Can be called multiple times to track progress. + pub fn current_metrics(&self) -> TaskMetrics { + // Record current thread before capturing metrics + self.record_current_thread(); + + // Use cumulative() to get a snapshot without blocking + let cumulative = self.task_monitor.cumulative(); + + // Capture thread tracking information + let thread_ids = self.thread_ids.lock().unwrap().clone(); + let unique_thread_count = thread_ids.len(); + + TaskMetrics { + total_busy_duration: cumulative.total_poll_duration, + total_idle_duration: cumulative.total_idle_duration, + total_poll_count: cumulative.total_poll_count, + total_scheduled_count: cumulative.total_scheduled_count, + unique_thread_count, + thread_ids, + } + } + + /// Finalize metrics collection and store final snapshot. + /// + /// This should be called after task execution completes. Subsequent calls + /// to `final_metrics()` will return the captured snapshot. + pub fn finalize_metrics(&self) { + let metrics = self.current_metrics(); + *self.final_metrics.lock().unwrap() = Some(metrics); + } + + /// Get the final metrics snapshot captured at task completion. + /// + /// Returns `None` if `finalize_metrics()` hasn't been called yet. + pub fn final_metrics(&self) -> Option { + self.final_metrics.lock().unwrap().clone() + } + + /// Get the wall-clock elapsed time since context creation. + pub fn elapsed(&self) -> std::time::Duration { + self.start_time.elapsed() + } + + /// Get a reference to the task monitor for advanced usage. + pub fn task_monitor(&self) -> &TaskMonitor { + &self.task_monitor + } +} + +impl std::fmt::Debug for TaskRuntimeContext { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TaskRuntimeContext") + .field("task_id", &self.task_id) + .field("task_type", &self.task_type.name()) + .field("elapsed", &self.elapsed()) + .field( + "has_final_metrics", + &self.final_metrics.lock().unwrap().is_some(), + ) + .finish() + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + + #[tokio::test] + async fn test_task_runtime_context_lifecycle() { + let ctx = TaskRuntimeContext::new(TaskType::Generic { + operation: "test".to_string(), + metadata: HashMap::new(), + }); + + // Execute work within the context + ctx.scope(async { + // Access context from within + let current = TaskRuntimeContext::try_current().expect("Should have context"); + assert_eq!(current.task_id, ctx.task_id); + + // Do some work + for _ in 0..10 { + tokio::task::yield_now().await; + } + }) + .await; + + // Check metrics were collected + let metrics = ctx.current_metrics(); + assert!(metrics.total_poll_count > 0, "Should have recorded polls"); + } + + #[tokio::test] + async fn test_concurrent_tasks() { + let ctx1 = Arc::new(TaskRuntimeContext::new(TaskType::Dump { + dataset: "test1".to_string(), + block_range: (0, 100), + records_extracted: None, + })); + let ctx2 = Arc::new(TaskRuntimeContext::new(TaskType::Dump { + dataset: "test2".to_string(), + block_range: (100, 200), + records_extracted: None, + })); + + let id1 = ctx1.task_id; + let id2 = ctx2.task_id; + + // Run two tasks concurrently + let (_, _) = tokio::join!( + ctx1.scope(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + let current = TaskRuntimeContext::try_current().unwrap(); + assert_eq!(current.task_id, id1); + }), + ctx2.scope(async move { + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + let current = TaskRuntimeContext::try_current().unwrap(); + assert_eq!(current.task_id, id2); + }) + ); + + // Each should have independent metrics + let metrics1 = ctx1.current_metrics(); + let metrics2 = ctx2.current_metrics(); + + assert!(metrics1.total_poll_count > 0); + assert!(metrics2.total_poll_count > 0); + } + + #[tokio::test] + async fn test_finalize_metrics() { + let ctx = TaskRuntimeContext::new(TaskType::Compact { + dataset: "test".to_string(), + input_files: 10, + output_files: 1, + }); + + assert!(ctx.final_metrics().is_none()); + + ctx.scope(async { + for _ in 0..5 { + tokio::task::yield_now().await; + } + }) + .await; + + ctx.finalize_metrics(); + + let final_metrics = ctx.final_metrics().expect("Should have final metrics"); + assert!(final_metrics.total_poll_count > 0); + } +} diff --git a/crates/core/monitoring/src/task_type.rs b/crates/core/monitoring/src/task_type.rs new file mode 100644 index 000000000..83222f10e --- /dev/null +++ b/crates/core/monitoring/src/task_type.rs @@ -0,0 +1,149 @@ +//! Task type definitions and metadata. +//! +//! This module provides the [`TaskType`] enum which categorizes different types of +//! operations (queries, dumps, compactions, etc.) and includes task-specific metadata. + +use std::collections::HashMap; + +use datafusion::logical_expr::LogicalPlan; + +use crate::plan_histogram; + +/// Type of task being executed with associated metadata. +/// +/// This enum captures both the category of work being done and relevant +/// contextual information for observability. +#[derive(Debug, Clone)] +pub enum TaskType { + /// SQL query execution with plan analysis + QueryExecution { + /// The SQL query text + sql: String, + /// Histogram of logical plan node types + plan_node_histogram: HashMap, + }, + /// Logical plan execution (may or may not have originated from SQL) + PlanExecution { + /// Histogram of logical plan node types + plan_node_histogram: HashMap, + /// Whether this is a streaming query + is_streaming: bool, + }, + /// Data dump/extraction operation + Dump { + /// Dataset being dumped + dataset: String, + /// Block range being extracted (start, end) + block_range: (u64, u64), + /// Number of records extracted (if known) + records_extracted: Option, + }, + /// File compaction operation + Compact { + /// Dataset being compacted + dataset: String, + /// Number of input files + input_files: usize, + /// Number of output files + output_files: usize, + }, + /// Garbage collection operation + Collect { + /// Dataset being collected + dataset: String, + /// Number of files collected + files_collected: usize, + }, + /// Generic operation with custom metadata + Generic { + /// Operation name + operation: String, + /// Custom key-value metadata + metadata: HashMap, + }, +} + +impl TaskType { + /// Get a human-readable name for this task type. + pub fn name(&self) -> &'static str { + match self { + TaskType::QueryExecution { .. } => "query_execution", + TaskType::PlanExecution { .. } => "plan_execution", + TaskType::Dump { .. } => "dump", + TaskType::Compact { .. } => "compact", + TaskType::Collect { .. } => "collect", + TaskType::Generic { .. } => "generic", + } + } + + /// Create a QueryExecution task type from SQL and a logical plan. + /// + /// This is a convenience method that automatically collects the plan histogram. + pub fn from_query(sql: impl Into, plan: &LogicalPlan) -> Self { + let plan_node_histogram = plan_histogram::collect_plan_histogram(plan); + TaskType::QueryExecution { + sql: sql.into(), + plan_node_histogram, + } + } + + /// Create a PlanExecution task type from a logical plan. + /// + /// This is a convenience method that automatically collects the plan histogram. + pub fn from_plan(plan: &LogicalPlan, is_streaming: bool) -> Self { + let plan_node_histogram = plan_histogram::collect_plan_histogram(plan); + TaskType::PlanExecution { + plan_node_histogram, + is_streaming, + } + } +} + +/// Truncate a SQL string for display purposes. +pub fn truncate_sql(sql: &str, max_len: usize) -> String { + let trimmed = sql.trim(); + if trimmed.len() <= max_len { + trimmed.to_string() + } else { + format!("{}...", &trimmed[..max_len]) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_type_names() { + let query = TaskType::QueryExecution { + sql: "SELECT 1".to_string(), + plan_node_histogram: HashMap::new(), + }; + assert_eq!(query.name(), "query_execution"); + + let dump = TaskType::Dump { + dataset: "test".to_string(), + block_range: (0, 100), + records_extracted: None, + }; + assert_eq!(dump.name(), "dump"); + + let compact = TaskType::Compact { + dataset: "test".to_string(), + input_files: 10, + output_files: 1, + }; + assert_eq!(compact.name(), "compact"); + } + + #[test] + fn test_truncate_sql() { + let short_sql = "SELECT * FROM blocks"; + assert_eq!(truncate_sql(short_sql, 100), short_sql); + + let long_sql = "SELECT * FROM blocks WHERE block_number > 1000 AND timestamp < 2000"; + let truncated = truncate_sql(long_sql, 20); + assert_eq!(truncated, "SELECT * FROM blocks..."); + assert_eq!(truncated.len(), 23); // 20 chars + "..." + } +} diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 10d8872d2..f5fdcf04a 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -110,6 +110,7 @@ impl Service { is_streaming: Option, resume_watermark: Option, ) -> Result { + // Parse and plan the query first to get the logical plan for instrumentation let query = parse_sql(sql).map_err(Error::from)?; let dataset_store = self.dataset_store.clone(); let catalog = catalog_for_sql( @@ -124,25 +125,42 @@ impl Service { let ctx = PlanningContext::new(catalog.logical().clone()); let plan = ctx.plan_sql(query.clone()).await.map_err(Error::from)?; - let is_streaming = + let is_streaming_mode = is_streaming.unwrap_or_else(|| common::stream_helpers::is_streaming(&query)); - let result = self - .execute_plan(catalog, dataset_store, plan, is_streaming, resume_watermark) - .await; - // Record execution error - if result.is_err() - && let Some(metrics) = &self.metrics - { - let error_code = result - .as_ref() - .err() - .map(|e| e.error_code()) - .unwrap_or("UNKNOWN_ERROR"); - metrics.record_query_error(error_code); - } + if let Some(metrics) = &self.metrics { + // Use generic task instrumentation with QueryExecution task type + let task_type = monitoring::TaskType::from_query(sql, plan.as_inner()); + let instrumentation = monitoring::InstrumentedTaskExecution::new(task_type); + + let execution_fut = self.execute_plan( + catalog, + dataset_store, + plan, + is_streaming_mode, + resume_watermark, + ); + + let (result, task_metrics) = instrumentation.execute(execution_fut).await; - result + metrics.record_query_runtime_metrics(&task_metrics); + + // Record execution error if present + if let Err(e) = &result { + metrics.record_query_error(e.error_code()); + } + + result + } else { + self.execute_plan( + catalog, + dataset_store, + plan, + is_streaming_mode, + resume_watermark, + ) + .await + } } #[tracing::instrument(skip_all, err)] diff --git a/crates/services/server/src/metrics.rs b/crates/services/server/src/metrics.rs index 0b7722065..f5b5683cd 100644 --- a/crates/services/server/src/metrics.rs +++ b/crates/services/server/src/metrics.rs @@ -40,6 +40,22 @@ pub struct MetricsRegistry { /// Total bytes sent incrementally via streaming queries pub streaming_bytes_sent: telemetry::metrics::Counter, + + // Runtime metrics (per-query tokio task metrics) + /// Query runtime busy duration (CPU time) in milliseconds + pub query_runtime_busy_duration: telemetry::metrics::Histogram, + + /// Query runtime idle duration (waiting time) in milliseconds + pub query_runtime_idle_duration: telemetry::metrics::Histogram, + + /// Number of times query task was polled + pub query_runtime_poll_count: telemetry::metrics::Histogram, + + /// Number of times query task was scheduled + pub query_runtime_scheduled_count: telemetry::metrics::Histogram, + + /// Query CPU utilization ratio (0.0 to 1.0) + pub query_runtime_cpu_utilization: telemetry::metrics::Histogram, } impl MetricsRegistry { @@ -115,6 +131,36 @@ impl MetricsRegistry { "streaming_bytes_sent_total", "Total bytes sent incrementally via streaming queries", ), + query_runtime_busy_duration: telemetry::metrics::Histogram::new_f64( + meter, + "query_runtime_busy_duration_milliseconds", + "Per-query tokio task busy duration (actual CPU time)", + "milliseconds", + ), + query_runtime_idle_duration: telemetry::metrics::Histogram::new_f64( + meter, + "query_runtime_idle_duration_milliseconds", + "Per-query tokio task idle duration (waiting time)", + "milliseconds", + ), + query_runtime_poll_count: telemetry::metrics::Histogram::new_u64( + meter, + "query_runtime_poll_count", + "Number of times query task was polled by tokio runtime", + "polls", + ), + query_runtime_scheduled_count: telemetry::metrics::Histogram::new_u64( + meter, + "query_runtime_scheduled_count", + "Number of times query task was scheduled for execution", + "schedules", + ), + query_runtime_cpu_utilization: telemetry::metrics::Histogram::new_f64( + meter, + "query_runtime_cpu_utilization", + "Query CPU utilization ratio (busy / total time)", + "ratio", + ), } } @@ -157,4 +203,22 @@ impl MetricsRegistry { pub fn record_streaming_lifetime(&self, duration_millis: f64) { self.streaming_query_lifetime.record(duration_millis); } + + /// Record runtime metrics from a query's TaskMetrics. + /// + /// This captures per-query tokio task metrics including busy/idle time, + /// poll counts, and CPU utilization. These metrics provide insights into + /// query resource consumption at the runtime level. + pub fn record_query_runtime_metrics(&self, metrics: &monitoring::TaskMetrics) { + self.query_runtime_busy_duration + .record(metrics.busy_duration_ms()); + self.query_runtime_idle_duration + .record(metrics.idle_duration_ms()); + self.query_runtime_poll_count + .record(metrics.total_poll_count); + self.query_runtime_scheduled_count + .record(metrics.total_scheduled_count); + self.query_runtime_cpu_utilization + .record(metrics.cpu_utilization_f64()); + } } diff --git a/grafana-dashboards/amp-dump-operations.json b/grafana-dashboards/amp-dump-operations.json new file mode 100644 index 000000000..04c05cbcf --- /dev/null +++ b/grafana-dashboards/amp-dump-operations.json @@ -0,0 +1,258 @@ +{ + "dashboard": { + "title": "Amp Dump Operations", + "tags": ["amp", "dump", "ingestion"], + "timezone": "browser", + "schemaVersion": 38, + "version": 1, + "refresh": "30s", + "panels": [ + { + "id": 1, + "title": "Dump Throughput (records/sec)", + "type": "timeseries", + "gridPos": { "x": 0, "y": 0, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "sum(rate(records_extracted[5m])) by (dataset)", + "legendFormat": "{{dataset}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "rps", + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } + } + } + }, + { + "id": 2, + "title": "Active Dump Operations", + "type": "stat", + "gridPos": { "x": 12, "y": 0, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "count(task_type{task_type=\"dump\"})", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "value": 0, "color": "green" }, + { "value": 5, "color": "yellow" }, + { "value": 10, "color": "red" } + ] + } + } + } + }, + { + "id": 3, + "title": "Total Records Extracted", + "type": "stat", + "gridPos": { "x": 18, "y": 0, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "sum(increase(records_extracted[1h]))", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 0, + "color": { + "mode": "palette-classic" + } + } + } + }, + { + "id": 4, + "title": "Dump Duration by Dataset", + "type": "timeseries", + "gridPos": { "x": 0, "y": 8, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "avg(elapsed_ms{task_type=\"dump\"}) by (dataset)", + "legendFormat": "{{dataset}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ms", + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } + } + } + }, + { + "id": 5, + "title": "Dump CPU Utilization", + "type": "timeseries", + "gridPos": { "x": 12, "y": 8, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "avg(runtime_cpu_utilization{task_type=\"dump\"}) by (dataset)", + "legendFormat": "{{dataset}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100, + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } + } + } + }, + { + "id": 6, + "title": "Block Range Coverage", + "type": "timeseries", + "gridPos": { "x": 0, "y": 16, "w": 24, "h": 8 }, + "targets": [ + { + "expr": "block_range_end - block_range_start", + "legendFormat": "{{dataset}} - {{task_id}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "custom": { + "lineWidth": 1, + "fillOpacity": 5 + } + } + } + }, + { + "id": 7, + "title": "Dump Parallelism (Threads Used)", + "type": "timeseries", + "gridPos": { "x": 0, "y": 24, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "avg(runtime_unique_threads{task_type=\"dump\"}) by (dataset)", + "legendFormat": "{{dataset}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 1, + "custom": { + "lineWidth": 2 + } + } + } + }, + { + "id": 8, + "title": "Dump Errors (Last 24h)", + "type": "stat", + "gridPos": { "x": 12, "y": 24, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "sum(increase(dump_errors_total[24h])) by (dataset)", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "value": 0, "color": "green" }, + { "value": 1, "color": "red" } + ] + } + } + } + }, + { + "id": 9, + "title": "Recent Dump Operations", + "type": "table", + "gridPos": { "x": 0, "y": 32, "w": 24, "h": 10 }, + "targets": [ + { + "expr": "task_type{task_type=\"dump\"}", + "format": "table", + "instant": true, + "refId": "A" + } + ], + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "__name__": true, + "job": true, + "instance": true, + "task_type": true + }, + "indexByName": { + "task_id": 0, + "dataset": 1, + "block_range_start": 2, + "block_range_end": 3, + "records_extracted": 4, + "elapsed_ms": 5, + "runtime_cpu_utilization": 6, + "runtime_unique_threads": 7 + }, + "renameByName": { + "task_id": "Task ID", + "dataset": "Dataset", + "block_range_start": "Start Block", + "block_range_end": "End Block", + "records_extracted": "Records", + "elapsed_ms": "Duration (ms)", + "runtime_cpu_utilization": "CPU %", + "runtime_unique_threads": "Threads" + } + } + } + ], + "fieldConfig": { + "overrides": [ + { + "matcher": { "id": "byName", "options": "Duration (ms)" }, + "properties": [ + { "id": "unit", "value": "ms" }, + { "id": "custom.width", "value": 120 } + ] + } + ] + } + } + ] + } +} diff --git a/grafana-dashboards/amp-query-performance.json b/grafana-dashboards/amp-query-performance.json new file mode 100644 index 000000000..8d3ede944 --- /dev/null +++ b/grafana-dashboards/amp-query-performance.json @@ -0,0 +1,263 @@ +{ + "dashboard": { + "title": "Amp Query Performance", + "tags": ["amp", "queries", "performance"], + "timezone": "browser", + "schemaVersion": 38, + "version": 1, + "refresh": "30s", + "panels": [ + { + "id": 1, + "title": "Query Latency (P50, P95, P99)", + "type": "timeseries", + "gridPos": { "x": 0, "y": 0, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(elapsed_ms_bucket[5m])) by (le, task_type))", + "legendFormat": "P50 - {{task_type}}", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(elapsed_ms_bucket[5m])) by (le, task_type))", + "legendFormat": "P95 - {{task_type}}", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(elapsed_ms_bucket[5m])) by (le, task_type))", + "legendFormat": "P99 - {{task_type}}", + "refId": "C" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ms", + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } + } + } + }, + { + "id": 2, + "title": "Query Rate (queries/sec)", + "type": "timeseries", + "gridPos": { "x": 12, "y": 0, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "sum(rate(query_execution_total[5m])) by (task_type)", + "legendFormat": "{{task_type}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "reqps", + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } + } + } + }, + { + "id": 3, + "title": "CPU Utilization", + "type": "gauge", + "gridPos": { "x": 0, "y": 8, "w": 6, "h": 6 }, + "targets": [ + { + "expr": "avg(runtime_cpu_utilization{task_type=\"query_execution\"})", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100, + "thresholds": { + "mode": "absolute", + "steps": [ + { "value": 0, "color": "red" }, + { "value": 50, "color": "yellow" }, + { "value": 80, "color": "green" } + ] + } + } + } + }, + { + "id": 4, + "title": "Average Parallelism (Threads Used)", + "type": "stat", + "gridPos": { "x": 6, "y": 8, "w": 6, "h": 6 }, + "targets": [ + { + "expr": "avg(runtime_unique_threads{task_type=\"query_execution\"})", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 1, + "thresholds": { + "mode": "absolute", + "steps": [ + { "value": 0, "color": "red" }, + { "value": 2, "color": "yellow" }, + { "value": 4, "color": "green" } + ] + } + } + } + }, + { + "id": 5, + "title": "Busy vs Idle Time", + "type": "timeseries", + "gridPos": { "x": 12, "y": 8, "w": 12, "h": 6 }, + "targets": [ + { + "expr": "avg(runtime_busy_ms{task_type=\"query_execution\"})", + "legendFormat": "Busy Time", + "refId": "A" + }, + { + "expr": "avg(runtime_idle_ms{task_type=\"query_execution\"})", + "legendFormat": "Idle Time", + "refId": "B" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ms", + "custom": { + "lineWidth": 2, + "fillOpacity": 10, + "stacking": { "mode": "normal" } + } + } + } + }, + { + "id": 6, + "title": "Query Plan Complexity Distribution", + "type": "barchart", + "gridPos": { "x": 0, "y": 14, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "sum by (plan_node_summary) (count_over_time(plan_total_nodes[1h]))", + "legendFormat": "{{plan_node_summary}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short" + } + }, + "options": { + "orientation": "horizontal", + "xTickLabelRotation": -45 + } + }, + { + "id": 7, + "title": "Average Plan Nodes per Query", + "type": "timeseries", + "gridPos": { "x": 12, "y": 14, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "avg(plan_total_nodes) by (task_type)", + "legendFormat": "{{task_type}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "custom": { + "lineWidth": 2 + } + } + } + }, + { + "id": 8, + "title": "Top 10 Slowest Queries", + "type": "table", + "gridPos": { "x": 0, "y": 22, "w": 24, "h": 8 }, + "targets": [ + { + "expr": "topk(10, elapsed_ms{task_type=\"query_execution\"})", + "format": "table", + "instant": true, + "refId": "A" + } + ], + "fieldConfig": { + "overrides": [ + { + "matcher": { "id": "byName", "options": "elapsed_ms" }, + "properties": [ + { "id": "unit", "value": "ms" }, + { "id": "custom.width", "value": 120 } + ] + }, + { + "matcher": { "id": "byName", "options": "sql" }, + "properties": [ + { "id": "custom.width", "value": 400 } + ] + } + ] + }, + "transformations": [ + { + "id": "organize", + "options": { + "excludeByName": { + "__name__": true, + "job": true, + "instance": true + }, + "indexByName": { + "task_id": 0, + "sql": 1, + "elapsed_ms": 2, + "runtime_cpu_utilization": 3, + "runtime_unique_threads": 4, + "plan_total_nodes": 5 + } + } + } + ] + }, + { + "id": 9, + "title": "Poll Count Distribution", + "type": "heatmap", + "gridPos": { "x": 0, "y": 30, "w": 24, "h": 8 }, + "targets": [ + { + "expr": "sum(rate(runtime_poll_count_bucket[5m])) by (le)", + "format": "heatmap", + "refId": "A" + } + ], + "options": { + "calculate": true, + "cellGap": 2, + "color": { + "mode": "scheme", + "scheme": "Spectral", + "steps": 128 + } + } + } + ] + } +} diff --git a/grafana-dashboards/amp-system-overview.json b/grafana-dashboards/amp-system-overview.json new file mode 100644 index 000000000..e71edb7b3 --- /dev/null +++ b/grafana-dashboards/amp-system-overview.json @@ -0,0 +1,348 @@ +{ + "dashboard": { + "title": "Amp System Overview", + "tags": ["amp", "overview", "system"], + "timezone": "browser", + "schemaVersion": 38, + "version": 1, + "refresh": "30s", + "panels": [ + { + "id": 1, + "title": "Operations Overview", + "type": "row", + "gridPos": { "x": 0, "y": 0, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 2, + "title": "Total Operations (24h)", + "type": "stat", + "gridPos": { "x": 0, "y": 1, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "sum(increase(task_executions_total[24h]))", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 0 + } + } + }, + { + "id": 3, + "title": "Query Operations", + "type": "stat", + "gridPos": { "x": 6, "y": 1, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "sum(increase(task_executions_total{task_type=\"query_execution\"}[24h]))", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 0, + "color": { + "mode": "fixed", + "fixedColor": "blue" + } + } + } + }, + { + "id": 4, + "title": "Dump Operations", + "type": "stat", + "gridPos": { "x": 12, "y": 1, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "sum(increase(task_executions_total{task_type=\"dump\"}[24h]))", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 0, + "color": { + "mode": "fixed", + "fixedColor": "green" + } + } + } + }, + { + "id": 5, + "title": "Error Rate", + "type": "stat", + "gridPos": { "x": 18, "y": 1, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "sum(rate(task_errors_total[5m]))", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops", + "decimals": 2, + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { "value": 0, "color": "green" }, + { "value": 0.01, "color": "yellow" }, + { "value": 0.1, "color": "red" } + ] + } + } + } + }, + { + "id": 6, + "title": "Operation Rate by Type", + "type": "timeseries", + "gridPos": { "x": 0, "y": 5, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "sum(rate(task_executions_total[5m])) by (task_type)", + "legendFormat": "{{task_type}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ops", + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } + } + } + }, + { + "id": 7, + "title": "Average Latency by Type", + "type": "timeseries", + "gridPos": { "x": 12, "y": 5, "w": 12, "h": 8 }, + "targets": [ + { + "expr": "avg(elapsed_ms) by (task_type)", + "legendFormat": "{{task_type}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ms", + "custom": { + "lineWidth": 2, + "fillOpacity": 10 + } + } + } + }, + { + "id": 8, + "title": "Resource Utilization", + "type": "row", + "gridPos": { "x": 0, "y": 13, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 9, + "title": "Overall CPU Utilization", + "type": "gauge", + "gridPos": { "x": 0, "y": 14, "w": 8, "h": 8 }, + "targets": [ + { + "expr": "avg(runtime_cpu_utilization)", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "percent", + "min": 0, + "max": 100, + "thresholds": { + "mode": "absolute", + "steps": [ + { "value": 0, "color": "red" }, + { "value": 50, "color": "yellow" }, + { "value": 80, "color": "green" } + ] + } + } + }, + "options": { + "showThresholdLabels": true, + "showThresholdMarkers": true + } + }, + { + "id": 10, + "title": "System-Wide Parallelism", + "type": "timeseries", + "gridPos": { "x": 8, "y": 14, "w": 8, "h": 8 }, + "targets": [ + { + "expr": "sum(runtime_unique_threads) by (task_type)", + "legendFormat": "{{task_type}}", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "custom": { + "lineWidth": 2, + "fillOpacity": 10, + "stacking": { "mode": "normal" } + } + } + } + }, + { + "id": 11, + "title": "Busy vs Idle Ratio", + "type": "piechart", + "gridPos": { "x": 16, "y": 14, "w": 8, "h": 8 }, + "targets": [ + { + "expr": "sum(runtime_busy_ms)", + "legendFormat": "Busy", + "refId": "A" + }, + { + "expr": "sum(runtime_idle_ms)", + "legendFormat": "Idle", + "refId": "B" + } + ], + "options": { + "pieType": "pie", + "displayLabels": ["percent"], + "legend": { + "displayMode": "table", + "placement": "right", + "values": ["value", "percent"] + } + } + }, + { + "id": 12, + "title": "Performance Trends", + "type": "row", + "gridPos": { "x": 0, "y": 22, "w": 24, "h": 1 }, + "collapsed": false + }, + { + "id": 13, + "title": "Latency Percentiles (All Operations)", + "type": "timeseries", + "gridPos": { "x": 0, "y": 23, "w": 24, "h": 8 }, + "targets": [ + { + "expr": "histogram_quantile(0.50, sum(rate(elapsed_ms_bucket[5m])) by (le))", + "legendFormat": "P50", + "refId": "A" + }, + { + "expr": "histogram_quantile(0.90, sum(rate(elapsed_ms_bucket[5m])) by (le))", + "legendFormat": "P90", + "refId": "B" + }, + { + "expr": "histogram_quantile(0.95, sum(rate(elapsed_ms_bucket[5m])) by (le))", + "legendFormat": "P95", + "refId": "C" + }, + { + "expr": "histogram_quantile(0.99, sum(rate(elapsed_ms_bucket[5m])) by (le))", + "legendFormat": "P99", + "refId": "D" + } + ], + "fieldConfig": { + "defaults": { + "unit": "ms", + "custom": { + "lineWidth": 2, + "fillOpacity": 5 + } + }, + "overrides": [ + { + "matcher": { "id": "byName", "options": "P99" }, + "properties": [ + { "id": "custom.lineWidth", "value": 3 } + ] + } + ] + } + }, + { + "id": 14, + "title": "Task Efficiency Score", + "type": "stat", + "gridPos": { "x": 0, "y": 31, "w": 24, "h": 4 }, + "targets": [ + { + "expr": "(avg(runtime_cpu_utilization) * avg(runtime_unique_threads)) / 100", + "refId": "A" + } + ], + "fieldConfig": { + "defaults": { + "unit": "short", + "decimals": 2, + "mappings": [ + { + "type": "range", + "options": { + "from": 0, + "to": 2, + "result": { + "text": "Low - Underutilized", + "color": "red" + } + } + }, + { + "type": "range", + "options": { + "from": 2, + "to": 4, + "result": { + "text": "Good - Balanced", + "color": "yellow" + } + } + }, + { + "type": "range", + "options": { + "from": 4, + "to": 100, + "result": { + "text": "Excellent - High Utilization", + "color": "green" + } + } + } + ] + } + }, + "description": "Efficiency score = (CPU Utilization × Avg Threads Used) / 100. Higher is better." + } + ] + } +}