Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
278 changes: 155 additions & 123 deletions src/audit/analyzers/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,136 +46,166 @@ pub async fn extract_mcp_failures(
Ok(analyze_all(mcpg_logs_dir).await?.failures)
}

async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
match tokio::fs::metadata(mcpg_logs_dir).await {
Ok(metadata) => {
anyhow::ensure!(
metadata.is_dir(),
"MCPG logs path is not a directory: {}",
mcpg_logs_dir.display()
);
/// Tracks all state accumulated while scanning gateway log files.
#[derive(Default)]
struct EventAccumulators {
saw_recognizable_event: bool,
per_tool: BTreeMap<(String, String), ToolAccumulator>,
observed_servers: BTreeSet<String>,
server_error_counts: BTreeMap<String, u64>,
failures: Vec<MCPFailureReport>,
}

impl EventAccumulators {
fn process_event(&mut self, event_kind: &str, value: Value) {
match event_kind {
"tool_call" => self.on_tool_call(value),
"tool_error" => self.on_tool_error(value),
"server_error" => self.on_server_error(value),
"server_start" | "server_stop" => self.on_server_lifecycle(value),
_ => return,
}
self.saw_recognizable_event = true;
}

fn on_tool_call(&mut self, value: Value) {
let server = extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(&value, &["tool", "name"]);

if !server.is_empty() {
self.observed_servers.insert(server.clone());
}

if let Some(tool) = tool.filter(|tool| !tool.is_empty()) {
let entry = self.per_tool.entry((server, tool)).or_default();
update_tool_sizes(entry, &value);
entry.call_count += 1;
}
}

fn on_tool_error(&mut self, value: Value) {
let server = extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(&value, &["tool", "name"]);

if !server.is_empty() {
self.observed_servers.insert(server.clone());
}

if let Some(tool_name) = tool.clone().filter(|tool| !tool.is_empty()) {
let entry = self.per_tool.entry((server, tool_name)).or_default();
update_tool_sizes(entry, &value);
entry.error_count += 1;
}

self.failures.push(MCPFailureReport {
tool: tool.filter(|tool| !tool.is_empty()),
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]),
extra: value,
});
}

fn on_server_error(&mut self, value: Value) {
let server = extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();

if !server.is_empty() {
self.observed_servers.insert(server.clone());
*self.server_error_counts.entry(server).or_default() += 1;
}
Err(error) if error.kind() == ErrorKind::NotFound => return Ok(AnalyzeAllResult::default()),
Err(error) => {
return Err(error)
.with_context(|| format!("Failed to stat {}", mcpg_logs_dir.display()));

self.failures.push(MCPFailureReport {
tool: None,
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(&value, &["ts", "time", "timestamp", "@timestamp"]),
extra: value,
});
}

fn on_server_lifecycle(&mut self, value: Value) {
if let Some(server) =
extract_string_field(&value, &["server", "mcp_server", "provider"])
{
self.observed_servers.insert(server);
}
}
}

async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
if !validate_logs_dir_exists(mcpg_logs_dir).await? {
return Ok(AnalyzeAllResult::default());
}

let file_paths = read_log_file_paths(mcpg_logs_dir).await?;
let mut saw_recognizable_event = false;
let mut per_tool = BTreeMap::<(String, String), ToolAccumulator>::new();
let mut observed_servers = BTreeSet::<String>::new();
let mut server_error_counts = BTreeMap::<String, u64>::new();
let mut failures = Vec::new();
let mut acc = EventAccumulators::default();

for path in file_paths {
let contents = tokio::fs::read_to_string(&path)
.await
.with_context(|| format!("Failed to read MCP gateway log {}", path.display()))?;
process_log_contents(&mut acc, &contents);
}

for line in contents.lines() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if !acc.saw_recognizable_event {
return Ok(AnalyzeAllResult::default());
}

let value: Value = match serde_json::from_str(trimmed) {
Ok(value) => value,
Err(_) => continue,
};
let tools = build_tool_summaries(&acc.per_tool);
let servers = build_server_stats(acc.observed_servers, &acc.per_tool, acc.server_error_counts);

let Some(event_kind) = extract_string_field(&value, &["event", "kind", "type"])
.map(|kind| kind.to_ascii_lowercase())
else {
continue;
};
Ok(AnalyzeAllResult {
tool_usage: Some(MCPToolUsageData { tools }),
server_health: Some(MCPServerHealth { servers }),
failures: acc.failures,
})
}

match event_kind.as_str() {
"tool_call" => {
saw_recognizable_event = true;
let server =
extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(&value, &["tool", "name"]);

if !server.is_empty() {
observed_servers.insert(server.clone());
}

if let Some(tool) = tool.filter(|tool| !tool.is_empty()) {
let entry = per_tool.entry((server, tool)).or_default();
update_tool_sizes(entry, &value);
entry.call_count += 1;
}
}
"tool_error" => {
saw_recognizable_event = true;
let server =
extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();
let tool = extract_string_field(&value, &["tool", "name"]);

if !server.is_empty() {
observed_servers.insert(server.clone());
}

if let Some(tool_name) = tool.clone().filter(|tool| !tool.is_empty()) {
let entry = per_tool.entry((server, tool_name)).or_default();
update_tool_sizes(entry, &value);
entry.error_count += 1;
}

failures.push(MCPFailureReport {
tool: tool.filter(|tool| !tool.is_empty()),
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(
&value,
&["ts", "time", "timestamp", "@timestamp"],
),
extra: value,
});
}
"server_error" => {
saw_recognizable_event = true;
let server =
extract_string_field(&value, &["server", "mcp_server", "provider"])
.unwrap_or_default();

if !server.is_empty() {
observed_servers.insert(server.clone());
*server_error_counts.entry(server).or_default() += 1;
}

failures.push(MCPFailureReport {
tool: None,
context: None,
reason: extract_stringish_field(&value, &["error"]),
timestamp: extract_string_field(
&value,
&["ts", "time", "timestamp", "@timestamp"],
),
extra: value,
});
}
"server_start" | "server_stop" => {
saw_recognizable_event = true;
if let Some(server) =
extract_string_field(&value, &["server", "mcp_server", "provider"])
{
observed_servers.insert(server);
}
}
_ => {}
}
/// Returns `true` if the directory exists and is valid, `false` if not found.
async fn validate_logs_dir_exists(path: &Path) -> Result<bool> {
match tokio::fs::metadata(path).await {
Ok(metadata) => {
anyhow::ensure!(
metadata.is_dir(),
"MCPG logs path is not a directory: {}",
path.display()
);
Ok(true)
}
Err(error) if error.kind() == ErrorKind::NotFound => Ok(false),
Err(error) => Err(error).with_context(|| format!("Failed to stat {}", path.display())),
}
}

if !saw_recognizable_event {
return Ok(AnalyzeAllResult::default());
/// Parses every non-empty line of `contents` as JSON and dispatches to `acc`.
fn process_log_contents(acc: &mut EventAccumulators, contents: &str) {
for line in contents.lines() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}

let value: Value = match serde_json::from_str(trimmed) {
Ok(value) => value,
Err(_) => continue,
};

let Some(event_kind) = extract_string_field(&value, &["event", "kind", "type"])
.map(|kind| kind.to_ascii_lowercase())
else {
continue;
};

acc.process_event(&event_kind, value);
}
}

fn build_tool_summaries(
per_tool: &BTreeMap<(String, String), ToolAccumulator>,
) -> Vec<MCPToolSummary> {
let mut tools: Vec<MCPToolSummary> = per_tool
.iter()
.map(|((server, tool), stats)| MCPToolSummary {
Expand All @@ -192,7 +222,14 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
.cmp(&left.call_count)
.then_with(|| left.name.cmp(&right.name))
});
tools
}

fn build_server_stats(
observed_servers: BTreeSet<String>,
per_tool: &BTreeMap<(String, String), ToolAccumulator>,
server_error_counts: BTreeMap<String, u64>,
) -> Vec<MCPServerStats> {
let mut server_rollups = BTreeMap::<String, MCPServerStats>::new();
for server in observed_servers {
server_rollups.insert(
Expand All @@ -204,28 +241,28 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
);
}

for ((server, _tool), stats) in &per_tool {
for ((server, _tool), stats) in per_tool {
if server.is_empty() {
continue;
}
let server_entry = server_rollups
let entry = server_rollups
.entry(server.clone())
.or_insert_with(|| MCPServerStats {
name: server.clone(),
..MCPServerStats::default()
});
server_entry.total_calls += stats.call_count;
server_entry.error_count += stats.error_count;
entry.total_calls += stats.call_count;
entry.error_count += stats.error_count;
}

for (server, error_count) in server_error_counts {
let server_entry = server_rollups
let entry = server_rollups
.entry(server.clone())
.or_insert_with(|| MCPServerStats {
name: server,
..MCPServerStats::default()
});
server_entry.error_count += error_count;
entry.error_count += error_count;
}

let mut servers: Vec<MCPServerStats> = server_rollups
Expand All @@ -246,12 +283,7 @@ async fn analyze_all(mcpg_logs_dir: &Path) -> Result<AnalyzeAllResult> {
.cmp(&left.total_calls)
.then_with(|| left.name.cmp(&right.name))
});

Ok(AnalyzeAllResult {
tool_usage: Some(MCPToolUsageData { tools }),
server_health: Some(MCPServerHealth { servers }),
failures,
})
servers
}

async fn read_log_file_paths(dir: &Path) -> Result<Vec<PathBuf>> {
Expand Down