From b4fa547a45011e334b3d179f282d62d4f4b6ebcb Mon Sep 17 00:00:00 2001 From: Your Name Date: Wed, 22 Apr 2026 17:02:05 -0400 Subject: [PATCH] feat(llm-obs): add spans analytics command with multi-dimension group-by MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `pup llm-obs spans analytics` — aggregates LLM Observability spans by one or more dimensions using the llm-obs-query-rewriter timeseries endpoint. Supports `--group-by`, `--compute`, `--from`/`--to`, `--ml-app`, `--query`, and all output formats via formatter::output. Co-Authored-By: Claude Sonnet 4.6 (1M context) --- src/commands/llm_obs.rs | 291 ++++++++++++++++++++++++++++++++++++++++ src/main.rs | 46 +++++++ src/test_commands.rs | 174 ++++++++++++++++++++++++ 3 files changed, 511 insertions(+) diff --git a/src/commands/llm_obs.rs b/src/commands/llm_obs.rs index 250168b8..d15ba74c 100644 --- a/src/commands/llm_obs.rs +++ b/src/commands/llm_obs.rs @@ -356,3 +356,294 @@ pub async fn spans_search( .map_err(|e| anyhow::anyhow!("failed to search spans: {e:?}"))?; formatter::output(cfg, &resp) } + +fn build_analytics_filter(query: Option, ml_app: Option) -> String { + match (query, ml_app) { + (Some(q), Some(app)) => format!("{q} @ml_app:{app}"), + (Some(q), None) => q, + (None, Some(app)) => format!("@ml_app:{app}"), + (None, None) => String::new(), + } +} + +fn parse_group_by_facets(group_by: Option, limit: u32) -> Vec { + group_by + .map(|g| { + g.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .map(|facet| serde_json::json!({ "facet": facet, "limit": limit })) + .collect() + }) + .unwrap_or_default() +} + +fn flatten_analytics_response( + resp: &serde_json::Value, + facets: &[String], + compute: &str, +) -> Result> { + let buckets = resp + .get("buckets") + .and_then(|b| b.as_array()) + .ok_or_else(|| anyhow::anyhow!("unexpected response: missing 'buckets' array"))?; + + let rows = buckets + .iter() + .map(|bucket| { + let by = bucket.get("by").and_then(|b| b.as_object()); + let computes = bucket.get("computes").and_then(|c| c.as_object()); + + let mut row = serde_json::Map::new(); + for facet in facets { + let val = by + .and_then(|b| b.get(facet)) + .cloned() + .unwrap_or(serde_json::Value::Null); + row.insert(facet.clone(), val); + } + let val = computes + .and_then(|c| c.get("c0")) + .cloned() + .unwrap_or(serde_json::Value::Null); + row.insert(compute.to_string(), val); + + serde_json::Value::Object(row) + }) + .collect(); + + Ok(rows) +} + +#[allow(clippy::too_many_arguments)] +pub async fn spans_analytics( + cfg: &Config, + query: Option, + from: String, + to: String, + group_by: Option, + compute: String, + limit: u32, + ml_app: Option, +) -> Result<()> { + let filter_query = build_analytics_filter(query, ml_app); + let from_ms = crate::util::parse_time_to_unix_millis(&from) + .map_err(|e| anyhow::anyhow!("invalid --from value: {e}"))?; + let to_ms = crate::util::parse_time_to_unix_millis(&to) + .map_err(|e| anyhow::anyhow!("invalid --to value: {e}"))?; + let facets: Vec = group_by + .as_deref() + .map(|g| { + g.split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect() + }) + .unwrap_or_default(); + let group_by_arr: Vec = facets + .iter() + .map(|f| serde_json::json!({ "facet": f, "limit": limit })) + .collect(); + let body = serde_json::json!({ + "search": { "query": filter_query }, + "time": { "from": from_ms.to_string(), "to": to_ms.to_string() }, + "indexes": ["llmobs"], + "type": "llmobs", + "computes": [{ "aggregation": compute, "name": "c0" }], + "groupBy": group_by_arr, + }); + let resp = client::raw_post(cfg, "/api/unstable/llm-obs-query-rewriter/timeseries", body) + .await + .map_err(|e| anyhow::anyhow!("failed to run spans analytics: {e:?}"))?; + let rows = flatten_analytics_response(&resp, &facets, &compute)?; + formatter::output(cfg, &rows) +} + +#[cfg(test)] +mod tests { + use super::*; + + // --- build_analytics_filter --- + + #[test] + fn test_filter_query_only() { + assert_eq!( + build_analytics_filter(Some("span.kind:llm".into()), None), + "span.kind:llm" + ); + } + + #[test] + fn test_filter_ml_app_only() { + assert_eq!( + build_analytics_filter(None, Some("my-app".into())), + "@ml_app:my-app" + ); + } + + #[test] + fn test_filter_query_and_ml_app() { + assert_eq!( + build_analytics_filter(Some("span.kind:llm".into()), Some("my-app".into())), + "span.kind:llm @ml_app:my-app" + ); + } + + #[test] + fn test_filter_neither() { + assert_eq!(build_analytics_filter(None, None), ""); + } + + // --- parse_group_by_facets --- + + #[test] + fn test_group_by_single() { + let result = parse_group_by_facets(Some("span_name".into()), 10); + assert_eq!( + result, + vec![serde_json::json!({"facet": "span_name", "limit": 10})] + ); + } + + #[test] + fn test_group_by_multiple() { + let result = parse_group_by_facets( + Some("span_name,@meta.error.type,@meta.error.message".into()), + 10, + ); + assert_eq!( + result, + vec![ + serde_json::json!({"facet": "span_name", "limit": 10}), + serde_json::json!({"facet": "@meta.error.type", "limit": 10}), + serde_json::json!({"facet": "@meta.error.message", "limit": 10}), + ] + ); + } + + #[test] + fn test_group_by_trims_whitespace() { + let result = parse_group_by_facets(Some(" span_name , @meta.error.type ".into()), 5); + assert_eq!( + result, + vec![ + serde_json::json!({"facet": "span_name", "limit": 5}), + serde_json::json!({"facet": "@meta.error.type", "limit": 5}), + ] + ); + } + + #[test] + fn test_group_by_none() { + let result = parse_group_by_facets(None, 10); + assert!(result.is_empty()); + } + + #[test] + fn test_group_by_filters_empty_segments() { + let result = parse_group_by_facets(Some("span_name,,@meta.error.type".into()), 10); + assert_eq!( + result, + vec![ + serde_json::json!({"facet": "span_name", "limit": 10}), + serde_json::json!({"facet": "@meta.error.type", "limit": 10}), + ] + ); + } + + #[test] + fn test_group_by_limit_applied_to_all() { + let result = parse_group_by_facets(Some("a,b,c".into()), 25); + assert!(result.iter().all(|v| v["limit"] == 25)); + } + + // --- flatten_analytics_response --- + + #[test] + fn test_flatten_analytics_single_facet() { + let resp = serde_json::json!({ + "buckets": [ + { "by": { "span_name": "llm.call" }, "computes": { "c0": 42 } }, + { "by": { "span_name": "tool.run" }, "computes": { "c0": 7 } }, + ] + }); + let rows = flatten_analytics_response(&resp, &["span_name".into()], "count").unwrap(); + assert_eq!(rows.len(), 2); + assert_eq!(rows[0]["span_name"], "llm.call"); + assert_eq!(rows[0]["count"], 42); + assert_eq!(rows[1]["span_name"], "tool.run"); + assert_eq!(rows[1]["count"], 7); + } + + #[test] + fn test_flatten_analytics_multiple_facets() { + let resp = serde_json::json!({ + "buckets": [ + { + "by": { "span_name": "llm.call", "@ml_app": "my-app" }, + "computes": { "c0": 10 } + } + ] + }); + let rows = + flatten_analytics_response(&resp, &["span_name".into(), "@ml_app".into()], "count") + .unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0]["span_name"], "llm.call"); + assert_eq!(rows[0]["@ml_app"], "my-app"); + assert_eq!(rows[0]["count"], 10); + } + + #[test] + fn test_flatten_analytics_no_facets() { + // Total aggregate with no group-by: one bucket, no "by" fields, just the compute. + let resp = serde_json::json!({ + "buckets": [ + { "by": {}, "computes": { "c0": 99 } } + ] + }); + let rows = flatten_analytics_response(&resp, &[], "count").unwrap(); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0]["count"], 99); + // No extra keys beyond the compute label. + assert_eq!(rows[0].as_object().unwrap().len(), 1); + } + + #[test] + fn test_flatten_analytics_empty_buckets() { + let resp = serde_json::json!({ "buckets": [] }); + let rows = flatten_analytics_response(&resp, &["span_name".into()], "count").unwrap(); + assert!(rows.is_empty()); + } + + #[test] + fn test_flatten_analytics_missing_buckets_key() { + let resp = serde_json::json!({ "data": [] }); + let err = flatten_analytics_response(&resp, &[], "count").unwrap_err(); + assert!(err.to_string().contains("missing 'buckets' array")); + } + + #[test] + fn test_flatten_analytics_missing_facet_value_is_null() { + // If a bucket's "by" object is missing a facet, the cell should be null. + let resp = serde_json::json!({ + "buckets": [ + { "by": {}, "computes": { "c0": 5 } } + ] + }); + let rows = flatten_analytics_response(&resp, &["span_name".into()], "count").unwrap(); + assert_eq!(rows[0]["span_name"], serde_json::Value::Null); + } + + #[test] + fn test_flatten_analytics_missing_compute_is_null() { + // If a bucket has no "computes" object, the compute cell should be null. + let resp = serde_json::json!({ + "buckets": [ + { "by": { "span_name": "llm.call" } } + ] + }); + let rows = flatten_analytics_response(&resp, &["span_name".into()], "count").unwrap(); + assert_eq!(rows[0]["count"], serde_json::Value::Null); + } +} diff --git a/src/main.rs b/src/main.rs index d174736f..6ce030a1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7996,6 +7996,38 @@ enum LlmObsSpansActions { #[arg(long, help = "Pagination cursor from a previous response")] cursor: Option, }, + /// Aggregate LLM Observability spans grouped by one or more dimensions + Analytics { + #[arg(long, help = "Search filter query")] + query: Option, + #[arg( + long, + default_value = "1h", + help = "Start time (relative like '1h' or RFC3339)" + )] + from: String, + #[arg( + long, + default_value = "now", + help = "End time (relative like 'now' or RFC3339)" + )] + to: String, + #[arg( + long, + help = "Dimensions to group by, comma-separated (e.g. \"span_name,@meta.error.type\")" + )] + group_by: Option, + #[arg( + long, + default_value = "count", + help = "Aggregation to compute (e.g. count, avg(@meta.span.duration))" + )] + compute: String, + #[arg(long, default_value = "10", help = "Max results per group dimension")] + limit: u32, + #[arg(long, help = "Filter by ML app name")] + ml_app: Option, + }, } #[derive(Subcommand)] @@ -13282,6 +13314,20 @@ async fn main_inner() -> anyhow::Result<()> { ) .await?; } + LlmObsSpansActions::Analytics { + query, + from, + to, + group_by, + compute, + limit, + ml_app, + } => { + commands::llm_obs::spans_analytics( + &cfg, query, from, to, group_by, compute, limit, ml_app, + ) + .await?; + } }, LlmObsActions::AnnotationQueues { action } => match action { LlmObsAnnotationQueuesActions::Create { file } => { diff --git a/src/test_commands.rs b/src/test_commands.rs index 0cc840f0..fe2a646c 100644 --- a/src/test_commands.rs +++ b/src/test_commands.rs @@ -4053,6 +4053,180 @@ async fn test_llm_obs_spans_search_no_auth() { cleanup_env(); } +// ---- spans analytics ---- + +#[tokio::test] +async fn test_llm_obs_spans_analytics() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + + let body = r#"{"buckets":[{"by":{"span_name":"llm.call","@meta.error.type":"timeout"},"computes":{"c0":42}}]}"#; + let _mock = mock_post( + &mut server, + "/api/unstable/llm-obs-query-rewriter/timeseries", + 200, + body, + ) + .await; + + let result = crate::commands::llm_obs::spans_analytics( + &cfg, + Some("span.kind:llm".into()), + "1h".into(), + "now".into(), + Some("span_name,@meta.error.type".into()), + "count".into(), + 10, + None, + ) + .await; + assert!(result.is_ok(), "spans_analytics failed: {:?}", result.err()); + cleanup_env(); +} + +#[tokio::test] +async fn test_llm_obs_spans_analytics_401() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + + let _mock = mock_post( + &mut server, + "/api/unstable/llm-obs-query-rewriter/timeseries", + 401, + r#"{"errors":["Unauthorized"]}"#, + ) + .await; + + let result = crate::commands::llm_obs::spans_analytics( + &cfg, + None, + "1h".into(), + "now".into(), + Some("span_name".into()), + "count".into(), + 10, + None, + ) + .await; + assert!(result.is_err(), "should fail on 401"); + assert!(result.unwrap_err().to_string().contains("401")); + cleanup_env(); +} + +#[tokio::test] +async fn test_llm_obs_spans_analytics_403() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + + let _mock = mock_post( + &mut server, + "/api/unstable/llm-obs-query-rewriter/timeseries", + 403, + r#"{"errors":["Forbidden"]}"#, + ) + .await; + + let result = crate::commands::llm_obs::spans_analytics( + &cfg, + None, + "1h".into(), + "now".into(), + Some("span_name".into()), + "count".into(), + 10, + None, + ) + .await; + assert!(result.is_err(), "should fail on 403"); + assert!(result.unwrap_err().to_string().contains("403")); + cleanup_env(); +} + +#[tokio::test] +async fn test_llm_obs_spans_analytics_500() { + let _lock = lock_env(); + let mut server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + + let _mock = mock_post( + &mut server, + "/api/unstable/llm-obs-query-rewriter/timeseries", + 500, + r#"{"errors":["internal server error"]}"#, + ) + .await; + + let result = crate::commands::llm_obs::spans_analytics( + &cfg, + None, + "1h".into(), + "now".into(), + Some("span_name".into()), + "count".into(), + 10, + None, + ) + .await; + assert!(result.is_err(), "should fail on 500"); + assert!(result.unwrap_err().to_string().contains("500")); + cleanup_env(); +} + +#[tokio::test] +async fn test_llm_obs_spans_analytics_invalid_from() { + let _lock = lock_env(); + let server = mockito::Server::new_async().await; + let cfg = test_config(&server.url()); + + // No mock — should error before any network call + let result = crate::commands::llm_obs::spans_analytics( + &cfg, + None, + "not-a-valid-time".into(), + "now".into(), + Some("span_name".into()), + "count".into(), + 10, + None, + ) + .await; + assert!(result.is_err(), "should fail with invalid --from"); + cleanup_env(); +} + +#[tokio::test] +async fn test_llm_obs_spans_analytics_no_auth() { + let _lock = lock_env(); + let cfg = Config { + api_key: None, + app_key: None, + access_token: None, + site: "datadoghq.com".into(), + org: None, + output_format: OutputFormat::Json, + auto_approve: false, + agent_mode: false, + read_only: false, + }; + + let result = crate::commands::llm_obs::spans_analytics( + &cfg, + None, + "1h".into(), + "now".into(), + Some("span_name".into()), + "count".into(), + 10, + None, + ) + .await; + assert!(result.is_err(), "should fail without auth"); + cleanup_env(); +} + // ------------------------------------------------------------------------- // Auth status --site flag // -------------------------------------------------------------------------