From 5309b5c78f8b5852935afc3b9f3a8c0f8f767af6 Mon Sep 17 00:00:00 2001 From: Imran Siddique Date: Thu, 25 Jun 2026 13:46:47 -0700 Subject: [PATCH] feat(shadow-ai): add Shadow AI Discovery integration Detects unregistered agents making MCP tool calls not declared in a cMCP catalog.json, emitting DiscoveryEvents with suggested Agent Manifest IDs for operator remediation or quarantine. Closes #9 Co-Authored-By: Claude Sonnet 4.6 --- integrations/shadow-ai/README.md | 55 +++++++ integrations/shadow-ai/integration.yaml | 17 ++ integrations/shadow-ai/src/__init__.py | 0 .../shadow-ai/src/shadow_ai_discovery.py | 105 +++++++++++++ integrations/shadow-ai/tests/__init__.py | 0 .../tests/test_shadow_ai_discovery.py | 145 ++++++++++++++++++ 6 files changed, 322 insertions(+) create mode 100644 integrations/shadow-ai/README.md create mode 100644 integrations/shadow-ai/integration.yaml create mode 100644 integrations/shadow-ai/src/__init__.py create mode 100644 integrations/shadow-ai/src/shadow_ai_discovery.py create mode 100644 integrations/shadow-ai/tests/__init__.py create mode 100644 integrations/shadow-ai/tests/test_shadow_ai_discovery.py diff --git a/integrations/shadow-ai/README.md b/integrations/shadow-ai/README.md new file mode 100644 index 0000000..1a4f932 --- /dev/null +++ b/integrations/shadow-ai/README.md @@ -0,0 +1,55 @@ +# Shadow AI Discovery + +Detects unregistered agents making MCP tool calls not declared in a cMCP `catalog.json`, and maps findings to Agent Manifest records for remediation or quarantine. + +## What it does + +Shadow AI Discovery watches a cMCP audit log (JSONL) and compares every `tool_call` event against the set of tool names declared in a `catalog.json`. Any call from an agent not registered in the catalog, or using a tool not listed for that agent, is emitted as a `DiscoveryEvent`. Each event includes the agent ID, tool name, timestamp, and a suggested Agent Manifest `agent_id` field to help operators register or quarantine the offending agent. + +## Integration points + +| Stack component | How | +|---|---| +| **cMCP** | Reads audit log; compares tool names against `catalog.json` entries | +| **Agent Manifest** | Emits `agent_id` and tool schema fields ready for manifest registration | + +## Install + +```bash +pip install pyyaml +``` + +No other runtime dependencies. Designed to run as a sidecar or post-processor alongside the cMCP gateway. + +## Usage + +```python +from shadow_ai_discovery import ShadowAIScanner + +scanner = ShadowAIScanner(catalog_path="catalog.json") + +# Scan a cMCP audit log +events = scanner.scan_audit_log("cmcp-audit.jsonl") +for event in events: + print(event.agent_id, event.tool_name, event.reason) +``` + +## DiscoveryEvent fields + +| Field | Type | Description | +|---|---|---| +| `agent_id` | `str` | Agent that made the call | +| `tool_name` | `str` | Tool name called | +| `timestamp` | `str` | ISO-8601 timestamp from the audit log | +| `reason` | `str` | `"unregistered_agent"` or `"undeclared_tool"` | +| `suggested_manifest_id` | `str` | Sanitized ID suitable for an Agent Manifest `agent_id` field | + +## Running tests + +```bash +python -m pytest tests/ -v +``` + +## License + +Apache 2.0 diff --git a/integrations/shadow-ai/integration.yaml b/integrations/shadow-ai/integration.yaml new file mode 100644 index 0000000..66b21e1 --- /dev/null +++ b/integrations/shadow-ai/integration.yaml @@ -0,0 +1,17 @@ +name: Shadow AI Discovery +vendor: agentrust-io +integrates_with: + - cmcp + - agent-manifest +description: >- + Detects unregistered agents making MCP tool calls not declared in a cmcp + catalog.json, emitting DiscoveryEvents that map to Agent Manifest records + for remediation or quarantine. +maintainer: + github: imransiddique-opaque +repository: https://github.com/agentrust-io/integrations +license: Apache-2.0 +tier: community +tested_against: + cmcp-runtime: "0.2.0" + agent-manifest: "1.0.0" diff --git a/integrations/shadow-ai/src/__init__.py b/integrations/shadow-ai/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/integrations/shadow-ai/src/shadow_ai_discovery.py b/integrations/shadow-ai/src/shadow_ai_discovery.py new file mode 100644 index 0000000..b79c9f4 --- /dev/null +++ b/integrations/shadow-ai/src/shadow_ai_discovery.py @@ -0,0 +1,105 @@ +""" +Shadow AI Discovery — detect unregistered agents and undeclared tool calls +against a cMCP catalog.json. +""" +from __future__ import annotations + +import json +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any + + +@dataclass +class DiscoveryEvent: + agent_id: str + tool_name: str + timestamp: str + reason: str # "unregistered_agent" | "undeclared_tool" + suggested_manifest_id: str = field(init=False) + + def __post_init__(self) -> None: + self.suggested_manifest_id = re.sub(r"[^a-z0-9-]", "-", self.agent_id.lower()).strip("-") + + def to_dict(self) -> dict[str, str]: + return { + "agent_id": self.agent_id, + "tool_name": self.tool_name, + "timestamp": self.timestamp, + "reason": self.reason, + "suggested_manifest_id": self.suggested_manifest_id, + } + + +class ShadowAIScanner: + """ + Compares a cMCP audit log against a catalog.json and emits DiscoveryEvents + for any agent or tool call not declared in the catalog. + """ + + def __init__(self, catalog_path: str | Path) -> None: + self._catalog: dict[str, set[str]] = {} + self._load_catalog(Path(catalog_path)) + + def _load_catalog(self, path: Path) -> None: + raw: Any = json.loads(path.read_text()) + # catalog.json: {"agents": [{"id": "...", "tools": ["tool1", ...]}]} + # also accept flat {"agent-id": ["tool1", ...]} map + if isinstance(raw, dict) and "agents" in raw: + for entry in raw["agents"]: + self._catalog[entry["id"]] = set(entry.get("tools", [])) + elif isinstance(raw, dict): + for agent_id, tools in raw.items(): + self._catalog[agent_id] = set(tools) + else: + raise ValueError(f"Unrecognized catalog format in {path}") + + def is_registered(self, agent_id: str) -> bool: + return agent_id in self._catalog + + def is_tool_declared(self, agent_id: str, tool_name: str) -> bool: + if agent_id not in self._catalog: + return False + return tool_name in self._catalog[agent_id] + + def scan_audit_log(self, log_path: str | Path) -> list[DiscoveryEvent]: + """ + Read a cMCP audit log (newline-delimited JSON) and return one + DiscoveryEvent per violation. Each line must be a JSON object with + at minimum: {"agent_id": "...", "tool_name": "...", "timestamp": "..."}. + Lines that are not tool_call events (no tool_name key) are skipped. + """ + events: list[DiscoveryEvent] = [] + for line in Path(log_path).read_text().splitlines(): + line = line.strip() + if not line: + continue + record: dict[str, Any] = json.loads(line) + tool_name = record.get("tool_name") + if not tool_name: + continue + agent_id: str = record.get("agent_id", "unknown") + timestamp: str = record.get("timestamp", "") + + if not self.is_registered(agent_id): + events.append(DiscoveryEvent(agent_id, tool_name, timestamp, "unregistered_agent")) + elif not self.is_tool_declared(agent_id, tool_name): + events.append(DiscoveryEvent(agent_id, tool_name, timestamp, "undeclared_tool")) + return events + + def scan_records(self, records: list[dict[str, Any]]) -> list[DiscoveryEvent]: + """Scan an in-memory list of audit records (same schema as scan_audit_log).""" + events: list[DiscoveryEvent] = [] + for record in records: + tool_name = record.get("tool_name") + if not tool_name: + continue + agent_id: str = record.get("agent_id", "unknown") + timestamp: str = record.get("timestamp", "") + + if not self.is_registered(agent_id): + events.append(DiscoveryEvent(agent_id, tool_name, timestamp, "unregistered_agent")) + elif not self.is_tool_declared(agent_id, tool_name): + events.append(DiscoveryEvent(agent_id, tool_name, timestamp, "undeclared_tool")) + return events diff --git a/integrations/shadow-ai/tests/__init__.py b/integrations/shadow-ai/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/integrations/shadow-ai/tests/test_shadow_ai_discovery.py b/integrations/shadow-ai/tests/test_shadow_ai_discovery.py new file mode 100644 index 0000000..63acf22 --- /dev/null +++ b/integrations/shadow-ai/tests/test_shadow_ai_discovery.py @@ -0,0 +1,145 @@ +"""Tests for Shadow AI Discovery integration.""" +import json +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) +from shadow_ai_discovery import DiscoveryEvent, ShadowAIScanner + + +# ── fixtures ────────────────────────────────────────────────────────────────── + +CATALOG_AGENTS = { + "agents": [ + {"id": "billing-agent", "tools": ["get_invoice", "list_invoices"]}, + {"id": "support-agent", "tools": ["open_ticket", "close_ticket"]}, + ] +} + +CATALOG_FLAT = { + "billing-agent": ["get_invoice", "list_invoices"], +} + + +@pytest.fixture() +def catalog_file(tmp_path): + p = tmp_path / "catalog.json" + p.write_text(json.dumps(CATALOG_AGENTS)) + return p + + +@pytest.fixture() +def flat_catalog_file(tmp_path): + p = tmp_path / "catalog.json" + p.write_text(json.dumps(CATALOG_FLAT)) + return p + + +@pytest.fixture() +def scanner(catalog_file): + return ShadowAIScanner(catalog_file) + + +def make_log(tmp_path, records): + p = tmp_path / "audit.jsonl" + p.write_text("\n".join(json.dumps(r) for r in records)) + return p + + +# ── 1. registered agent, declared tool → no event ──────────────────────────── + +def test_clean_call_produces_no_event(scanner): + records = [{"agent_id": "billing-agent", "tool_name": "get_invoice", "timestamp": "2026-06-25T10:00:00Z"}] + assert scanner.scan_records(records) == [] + + +# ── 2. unregistered agent → reason = unregistered_agent ────────────────────── + +def test_unregistered_agent_detected(scanner): + records = [{"agent_id": "rogue-agent", "tool_name": "delete_all", "timestamp": "2026-06-25T10:01:00Z"}] + events = scanner.scan_records(records) + assert len(events) == 1 + assert events[0].reason == "unregistered_agent" + assert events[0].agent_id == "rogue-agent" + + +# ── 3. registered agent, undeclared tool → reason = undeclared_tool ─────────── + +def test_undeclared_tool_detected(scanner): + records = [{"agent_id": "billing-agent", "tool_name": "delete_all", "timestamp": "2026-06-25T10:02:00Z"}] + events = scanner.scan_records(records) + assert len(events) == 1 + assert events[0].reason == "undeclared_tool" + assert events[0].tool_name == "delete_all" + + +# ── 4. suggested_manifest_id sanitises the agent_id ────────────────────────── + +def test_suggested_manifest_id_sanitized(scanner): + records = [{"agent_id": "My Agent/v2 (prod)", "tool_name": "x", "timestamp": "t"}] + events = scanner.scan_records(records) + assert events[0].suggested_manifest_id == "my-agent-v2--prod" + # must only contain lowercase alphanum and hyphens, no leading/trailing dash + import re + assert re.match(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$", events[0].suggested_manifest_id) + + +# ── 5. records without tool_name are skipped ────────────────────────────────── + +def test_non_tool_call_records_skipped(scanner): + records = [ + {"agent_id": "rogue-agent", "event": "session_start", "timestamp": "t"}, + {"agent_id": "rogue-agent", "tool_name": "bad_tool", "timestamp": "t"}, + ] + events = scanner.scan_records(records) + assert len(events) == 1 # only the tool_call record + + +# ── 6. scan_audit_log reads a JSONL file ───────────────────────────────────── + +def test_scan_audit_log_file(scanner, tmp_path): + records = [ + {"agent_id": "rogue-agent", "tool_name": "exfil", "timestamp": "2026-06-25T10:05:00Z"}, + {"agent_id": "billing-agent", "tool_name": "get_invoice", "timestamp": "2026-06-25T10:06:00Z"}, + ] + log = make_log(tmp_path, records) + events = scanner.scan_audit_log(log) + assert len(events) == 1 + assert events[0].agent_id == "rogue-agent" + + +# ── 7. flat catalog format is parsed correctly ──────────────────────────────── + +def test_flat_catalog_format(flat_catalog_file): + scanner = ShadowAIScanner(flat_catalog_file) + assert scanner.is_registered("billing-agent") + assert scanner.is_tool_declared("billing-agent", "get_invoice") + assert not scanner.is_tool_declared("billing-agent", "delete_all") + + +# ── 8. multiple violations in one log ──────────────────────────────────────── + +def test_multiple_violations(scanner): + records = [ + {"agent_id": "rogue-1", "tool_name": "tool_a", "timestamp": "t1"}, + {"agent_id": "rogue-2", "tool_name": "tool_b", "timestamp": "t2"}, + {"agent_id": "billing-agent", "tool_name": "hack", "timestamp": "t3"}, + ] + events = scanner.scan_records(records) + assert len(events) == 3 + reasons = {e.reason for e in events} + assert "unregistered_agent" in reasons + assert "undeclared_tool" in reasons + + +# ── 9. to_dict returns all required fields ──────────────────────────────────── + +def test_discovery_event_to_dict_fields(): + e = DiscoveryEvent("billing-agent", "bad_tool", "2026-06-25T00:00:00Z", "undeclared_tool") + d = e.to_dict() + for key in ("agent_id", "tool_name", "timestamp", "reason", "suggested_manifest_id"): + assert key in d, f"Missing key: {key}" + assert d["agent_id"] == "billing-agent" + assert d["suggested_manifest_id"] == "billing-agent"