From 80d241842e7f992a4223376c0bcae14723aefc85 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 18 Jun 2026 10:19:27 +0000 Subject: [PATCH] refactor: reduce complexity of analyze_all in src/audit/analyzers/mcp.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract EventAccumulators struct with per-event handler methods (on_tool_call, on_tool_error, on_server_error, on_server_lifecycle) to replace a deeply-nested for+for+match inside analyze_all. Also extract four helpers: - validate_logs_dir_exists — replaces the inline metadata guard - process_log_contents — handles the per-line parse/dispatch loop - build_tool_summaries — builds and sorts the MCPToolSummary list - build_server_stats — builds and sorts the MCPServerStats list analyze_all is now a linear 20-line orchestration function. Cognitive complexity: 19 → below 10 (zero clippy::cognitive_complexity warnings at threshold=10). All 2062 tests pass. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/audit/analyzers/mcp.rs | 278 +++++++++++++++++++++---------------- 1 file changed, 155 insertions(+), 123 deletions(-) diff --git a/src/audit/analyzers/mcp.rs b/src/audit/analyzers/mcp.rs index 9a08767a..369c6235 100644 --- a/src/audit/analyzers/mcp.rs +++ b/src/audit/analyzers/mcp.rs @@ -46,136 +46,166 @@ pub async fn extract_mcp_failures( Ok(analyze_all(mcpg_logs_dir).await?.failures) } -async fn analyze_all(mcpg_logs_dir: &Path) -> Result { - match tokio::fs::metadata(mcpg_logs_dir).await { - Ok(metadata) => { - anyhow::ensure!( - metadata.is_dir(), - "MCPG logs path is not a directory: {}", - mcpg_logs_dir.display() - ); +/// Tracks all state accumulated while scanning gateway log files. +#[derive(Default)] +struct EventAccumulators { + saw_recognizable_event: bool, + per_tool: BTreeMap<(String, String), ToolAccumulator>, + observed_servers: BTreeSet, + server_error_counts: BTreeMap, + failures: Vec, +} + +impl EventAccumulators { + fn process_event(&mut self, event_kind: &str, value: Value) { + match event_kind { + "tool_call" => self.on_tool_call(value), + "tool_error" => self.on_tool_error(value), + "server_error" => self.on_server_error(value), + "server_start" | "server_stop" => self.on_server_lifecycle(value), + _ => return, + } + self.saw_recognizable_event = true; + } + + fn on_tool_call(&mut self, value: Value) { + let server = extract_string_field(&value, &["server", "mcp_server", "provider"]) + .unwrap_or_default(); + let tool = extract_string_field(&value, &["tool", "name"]); + + if !server.is_empty() { + self.observed_servers.insert(server.clone()); + } + + if let Some(tool) = tool.filter(|tool| !tool.is_empty()) { + let entry = self.per_tool.entry((server, tool)).or_default(); + update_tool_sizes(entry, &value); + entry.call_count += 1; + } + } + + fn on_tool_error(&mut self, value: Value) { + let server = extract_string_field(&value, &["server", "mcp_server", "provider"]) + .unwrap_or_default(); + let tool = extract_string_field(&value, &["tool", "name"]); + + if !server.is_empty() { + self.observed_servers.insert(server.clone()); + } + + if let Some(tool_name) = tool.clone().filter(|tool| !tool.is_empty()) { + let entry = self.per_tool.entry((server, tool_name)).or_default(); + update_tool_sizes(entry, &value); + entry.error_count += 1; + } + + self.failures.push(MCPFailureReport { + tool: tool.filter(|tool| !tool.is_empty()), + context: None, + reason: extract_stringish_field(&value, &["error"]), + timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]), + extra: value, + }); + } + + fn on_server_error(&mut self, value: Value) { + let server = extract_string_field(&value, &["server", "mcp_server", "provider"]) + .unwrap_or_default(); + + if !server.is_empty() { + self.observed_servers.insert(server.clone()); + *self.server_error_counts.entry(server).or_default() += 1; } - Err(error) if error.kind() == ErrorKind::NotFound => return Ok(AnalyzeAllResult::default()), - Err(error) => { - return Err(error) - .with_context(|| format!("Failed to stat {}", mcpg_logs_dir.display())); + + self.failures.push(MCPFailureReport { + tool: None, + context: None, + reason: extract_stringish_field(&value, &["error"]), + timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]), + extra: value, + }); + } + + fn on_server_lifecycle(&mut self, value: Value) { + if let Some(server) = + extract_string_field(&value, &["server", "mcp_server", "provider"]) + { + self.observed_servers.insert(server); } } +} + +async fn analyze_all(mcpg_logs_dir: &Path) -> Result { + if !validate_logs_dir_exists(mcpg_logs_dir).await? { + return Ok(AnalyzeAllResult::default()); + } let file_paths = read_log_file_paths(mcpg_logs_dir).await?; - let mut saw_recognizable_event = false; - let mut per_tool = BTreeMap::<(String, String), ToolAccumulator>::new(); - let mut observed_servers = BTreeSet::::new(); - let mut server_error_counts = BTreeMap::::new(); - let mut failures = Vec::new(); + let mut acc = EventAccumulators::default(); for path in file_paths { let contents = tokio::fs::read_to_string(&path) .await .with_context(|| format!("Failed to read MCP gateway log {}", path.display()))?; + process_log_contents(&mut acc, &contents); + } - for line in contents.lines() { - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } + if !acc.saw_recognizable_event { + return Ok(AnalyzeAllResult::default()); + } - let value: Value = match serde_json::from_str(trimmed) { - Ok(value) => value, - Err(_) => continue, - }; + let tools = build_tool_summaries(&acc.per_tool); + let servers = build_server_stats(acc.observed_servers, &acc.per_tool, acc.server_error_counts); - let Some(event_kind) = extract_string_field(&value, &["event", "kind", "type"]) - .map(|kind| kind.to_ascii_lowercase()) - else { - continue; - }; + Ok(AnalyzeAllResult { + tool_usage: Some(MCPToolUsageData { tools }), + server_health: Some(MCPServerHealth { servers }), + failures: acc.failures, + }) +} - match event_kind.as_str() { - "tool_call" => { - saw_recognizable_event = true; - let server = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - .unwrap_or_default(); - let tool = extract_string_field(&value, &["tool", "name"]); - - if !server.is_empty() { - observed_servers.insert(server.clone()); - } - - if let Some(tool) = tool.filter(|tool| !tool.is_empty()) { - let entry = per_tool.entry((server, tool)).or_default(); - update_tool_sizes(entry, &value); - entry.call_count += 1; - } - } - "tool_error" => { - saw_recognizable_event = true; - let server = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - .unwrap_or_default(); - let tool = extract_string_field(&value, &["tool", "name"]); - - if !server.is_empty() { - observed_servers.insert(server.clone()); - } - - if let Some(tool_name) = tool.clone().filter(|tool| !tool.is_empty()) { - let entry = per_tool.entry((server, tool_name)).or_default(); - update_tool_sizes(entry, &value); - entry.error_count += 1; - } - - failures.push(MCPFailureReport { - tool: tool.filter(|tool| !tool.is_empty()), - context: None, - reason: extract_stringish_field(&value, &["error"]), - timestamp: extract_string_field( - &value, - &["ts", "time", "timestamp", "@timestamp"], - ), - extra: value, - }); - } - "server_error" => { - saw_recognizable_event = true; - let server = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - .unwrap_or_default(); - - if !server.is_empty() { - observed_servers.insert(server.clone()); - *server_error_counts.entry(server).or_default() += 1; - } - - failures.push(MCPFailureReport { - tool: None, - context: None, - reason: extract_stringish_field(&value, &["error"]), - timestamp: extract_string_field( - &value, - &["ts", "time", "timestamp", "@timestamp"], - ), - extra: value, - }); - } - "server_start" | "server_stop" => { - saw_recognizable_event = true; - if let Some(server) = - extract_string_field(&value, &["server", "mcp_server", "provider"]) - { - observed_servers.insert(server); - } - } - _ => {} - } +/// Returns `true` if the directory exists and is valid, `false` if not found. +async fn validate_logs_dir_exists(path: &Path) -> Result { + match tokio::fs::metadata(path).await { + Ok(metadata) => { + anyhow::ensure!( + metadata.is_dir(), + "MCPG logs path is not a directory: {}", + path.display() + ); + Ok(true) } + Err(error) if error.kind() == ErrorKind::NotFound => Ok(false), + Err(error) => Err(error).with_context(|| format!("Failed to stat {}", path.display())), } +} - if !saw_recognizable_event { - return Ok(AnalyzeAllResult::default()); +/// Parses every non-empty line of `contents` as JSON and dispatches to `acc`. +fn process_log_contents(acc: &mut EventAccumulators, contents: &str) { + for line in contents.lines() { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let value: Value = match serde_json::from_str(trimmed) { + Ok(value) => value, + Err(_) => continue, + }; + + let Some(event_kind) = extract_string_field(&value, &["event", "kind", "type"]) + .map(|kind| kind.to_ascii_lowercase()) + else { + continue; + }; + + acc.process_event(&event_kind, value); } +} +fn build_tool_summaries( + per_tool: &BTreeMap<(String, String), ToolAccumulator>, +) -> Vec { let mut tools: Vec = per_tool .iter() .map(|((server, tool), stats)| MCPToolSummary { @@ -192,7 +222,14 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result { .cmp(&left.call_count) .then_with(|| left.name.cmp(&right.name)) }); + tools +} +fn build_server_stats( + observed_servers: BTreeSet, + per_tool: &BTreeMap<(String, String), ToolAccumulator>, + server_error_counts: BTreeMap, +) -> Vec { let mut server_rollups = BTreeMap::::new(); for server in observed_servers { server_rollups.insert( @@ -204,28 +241,28 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result { ); } - for ((server, _tool), stats) in &per_tool { + for ((server, _tool), stats) in per_tool { if server.is_empty() { continue; } - let server_entry = server_rollups + let entry = server_rollups .entry(server.clone()) .or_insert_with(|| MCPServerStats { name: server.clone(), ..MCPServerStats::default() }); - server_entry.total_calls += stats.call_count; - server_entry.error_count += stats.error_count; + entry.total_calls += stats.call_count; + entry.error_count += stats.error_count; } for (server, error_count) in server_error_counts { - let server_entry = server_rollups + let entry = server_rollups .entry(server.clone()) .or_insert_with(|| MCPServerStats { name: server, ..MCPServerStats::default() }); - server_entry.error_count += error_count; + entry.error_count += error_count; } let mut servers: Vec = server_rollups @@ -246,12 +283,7 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result { .cmp(&left.total_calls) .then_with(|| left.name.cmp(&right.name)) }); - - Ok(AnalyzeAllResult { - tool_usage: Some(MCPToolUsageData { tools }), - server_health: Some(MCPServerHealth { servers }), - failures, - }) + servers } async fn read_log_file_paths(dir: &Path) -> Result> {