Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions integrations/shadow-ai/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Shadow AI Discovery

Detects unregistered agents making MCP tool calls not declared in a cMCP `catalog.json`, and maps findings to Agent Manifest records for remediation or quarantine.

## What it does

Shadow AI Discovery watches a cMCP audit log (JSONL) and compares every `tool_call` event against the set of tool names declared in a `catalog.json`. Any call from an agent not registered in the catalog, or using a tool not listed for that agent, is emitted as a `DiscoveryEvent`. Each event includes the agent ID, tool name, timestamp, and a suggested Agent Manifest `agent_id` field to help operators register or quarantine the offending agent.

## Integration points

| Stack component | How |
|---|---|
| **cMCP** | Reads audit log; compares tool names against `catalog.json` entries |
| **Agent Manifest** | Emits `agent_id` and tool schema fields ready for manifest registration |

## Install

```bash
pip install pyyaml
```

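The scanner module itself (`shadow_ai_discovery.py`) imports only the Python standard library; `pytest` is additionally required to run the tests. No other runtime dependencies. Designed to run as a sidecar or post-processor alongside the cMCP gateway.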

## Usage

```python
from shadow_ai_discovery import ShadowAIScanner

scanner = ShadowAIScanner(catalog_path="catalog.json")

# Scan a cMCP audit log
events = scanner.scan_audit_log("cmcp-audit.jsonl")
for event in events:
    print(event.agent_id, event.tool_name, event.reason)
```

## DiscoveryEvent fields

| Field | Type | Description |
|---|---|---|
| `agent_id` | `str` | Agent that made the call |
| `tool_name` | `str` | Tool name called |
| `timestamp` | `str` | Timestamp copied verbatim from the audit log record (expected to be ISO-8601; empty string if the record has none) |
| `reason` | `str` | `"unregistered_agent"` or `"undeclared_tool"` |
| `suggested_manifest_id` | `str` | Sanitized ID suitable for an Agent Manifest `agent_id` field |

## Running tests

```bash
python -m pytest tests/ -v
```

## License

Apache 2.0
17 changes: 17 additions & 0 deletions integrations/shadow-ai/integration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: Shadow AI Discovery
vendor: agentrust-io
integrates_with:
- cmcp
- agent-manifest
description: >-
  Detects unregistered agents making MCP tool calls not declared in a cmcp
  catalog.json, emitting DiscoveryEvents that map to Agent Manifest records
  for remediation or quarantine.
maintainer:
  github: imransiddique-opaque
repository: https://github.com/agentrust-io/integrations
license: Apache-2.0
tier: community
tested_against:
  cmcp-runtime: "0.2.0"
  agent-manifest: "1.0.0"
Empty file.
105 changes: 105 additions & 0 deletions integrations/shadow-ai/src/shadow_ai_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
"""
Shadow AI Discovery — detect unregistered agents and undeclared tool calls
against a cMCP catalog.json.
"""
from __future__ import annotations

import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any


@dataclass
class DiscoveryEvent:
    """A single catalog violation found in an audit record.

    ``suggested_manifest_id`` is computed from ``agent_id`` right after
    construction, so it is not a constructor argument.
    """

    agent_id: str
    tool_name: str
    timestamp: str
    reason: str  # "unregistered_agent" | "undeclared_tool"
    suggested_manifest_id: str = field(init=False)

    def __post_init__(self) -> None:
        # Lowercase the id, turn every character outside [a-z0-9-] into a
        # hyphen, then trim hyphens from both ends.
        lowered = self.agent_id.lower()
        hyphenated = re.sub(r"[^a-z0-9-]", "-", lowered)
        self.suggested_manifest_id = hyphenated.strip("-")

    def to_dict(self) -> dict[str, str]:
        """Return the five event fields as a plain dictionary."""
        names = (
            "agent_id",
            "tool_name",
            "timestamp",
            "reason",
            "suggested_manifest_id",
        )
        return {name: getattr(self, name) for name in names}


class ShadowAIScanner:
    """
    Compares a cMCP audit log against a catalog.json and emits DiscoveryEvents
    for any agent or tool call not declared in the catalog.
    """

    def __init__(self, catalog_path: str | Path) -> None:
        """Load the catalog stored at *catalog_path*.

        Raises:
            ValueError: if the catalog's top-level JSON value is not an
                object, or an agent's tool list is not a JSON array.
        """
        # Maps agent id -> set of tool names declared for that agent.
        self._catalog: dict[str, set[str]] = {}
        self._load_catalog(Path(catalog_path))

    def _load_catalog(self, path: Path) -> None:
        """Parse the catalog file and populate ``self._catalog``.

        Two layouts are accepted:

        * ``{"agents": [{"id": "...", "tools": ["tool1", ...]}]}``
          (``tools`` is optional and defaults to an empty list)
        * a flat ``{"agent-id": ["tool1", ...]}`` map
        """
        # JSON is UTF-8 by specification; do not depend on the locale default.
        raw: Any = json.loads(path.read_text(encoding="utf-8"))
        if not isinstance(raw, dict):
            raise ValueError(f"Unrecognized catalog format in {path}")

        if "agents" in raw:
            declared = [(entry["id"], entry.get("tools", [])) for entry in raw["agents"]]
        else:
            declared = list(raw.items())

        for agent_id, tools in declared:
            # set("abc") would silently yield {"a", "b", "c"}; reject anything
            # that is not a real list instead of registering garbage tools.
            if not isinstance(tools, list):
                raise ValueError(
                    f"Tools for agent {agent_id!r} in {path} must be a list, "
                    f"got {type(tools).__name__}"
                )
            self._catalog[agent_id] = set(tools)

    def is_registered(self, agent_id: str) -> bool:
        """Return True if *agent_id* appears in the catalog."""
        return agent_id in self._catalog

    def is_tool_declared(self, agent_id: str, tool_name: str) -> bool:
        """Return True if *agent_id* is registered and declares *tool_name*."""
        return tool_name in self._catalog.get(agent_id, ())

    def _classify(self, record: Any) -> DiscoveryEvent | None:
        """Return the DiscoveryEvent for one audit record, or None.

        None is returned when the record is not a JSON object, carries no
        truthy ``tool_name`` (i.e. is not a tool call), or is a declared call.
        """
        if not isinstance(record, dict):
            return None
        tool_name = record.get("tool_name")
        if not tool_name:
            return None

        # A missing key and an explicit JSON null are treated the same way, so
        # a null agent_id cannot reach DiscoveryEvent and crash in .lower().
        agent_id = record.get("agent_id")
        if agent_id is None:
            agent_id = "unknown"
        timestamp = record.get("timestamp")
        if timestamp is None:
            timestamp = ""

        if not self.is_registered(agent_id):
            reason = "unregistered_agent"
        elif not self.is_tool_declared(agent_id, tool_name):
            reason = "undeclared_tool"
        else:
            return None
        return DiscoveryEvent(agent_id, tool_name, timestamp, reason)

    def scan_audit_log(self, log_path: str | Path) -> list[DiscoveryEvent]:
        """
        Read a cMCP audit log (newline-delimited JSON) and return one
        DiscoveryEvent per violation, in log order. Each line is expected to
        be a JSON object with at minimum:
        {"agent_id": "...", "tool_name": "...", "timestamp": "..."}.

        Blank lines, lines whose JSON value is not an object, and records
        without a tool_name are skipped. A line that is not valid JSON raises
        ``json.JSONDecodeError``.
        """
        events: list[DiscoveryEvent] = []
        # Stream the file rather than loading it whole; iterating the handle
        # splits only on real newlines, unlike str.splitlines().
        with Path(log_path).open(encoding="utf-8") as handle:
            for raw_line in handle:
                line = raw_line.strip()
                if not line:
                    continue
                event = self._classify(json.loads(line))
                if event is not None:
                    events.append(event)
        return events

    def scan_records(self, records: list[dict[str, Any]]) -> list[DiscoveryEvent]:
        """Scan an in-memory list of audit records (same schema as scan_audit_log)."""
        events: list[DiscoveryEvent] = []
        for record in records:
            event = self._classify(record)
            if event is not None:
                events.append(event)
        return events
Empty file.
145 changes: 145 additions & 0 deletions integrations/shadow-ai/tests/test_shadow_ai_discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
"""Tests for Shadow AI Discovery integration."""
import json
import sys
from pathlib import Path

import pytest

sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from shadow_ai_discovery import DiscoveryEvent, ShadowAIScanner


# ── fixtures ──────────────────────────────────────────────────────────────────

# Catalog in the {"agents": [...]} layout: two registered agents, each with
# two declared tools.
CATALOG_AGENTS = {
    "agents": [
        {"id": "billing-agent", "tools": ["get_invoice", "list_invoices"]},
        {"id": "support-agent", "tools": ["open_ticket", "close_ticket"]},
    ]
}

# Catalog in the flat {agent_id: [tool, ...]} layout with a single agent.
CATALOG_FLAT = {
    "billing-agent": ["get_invoice", "list_invoices"],
}


@pytest.fixture()
def catalog_file(tmp_path):
    """Write CATALOG_AGENTS to a temporary catalog.json and return its path."""
    target = tmp_path / "catalog.json"
    serialized = json.dumps(CATALOG_AGENTS)
    target.write_text(serialized)
    return target


@pytest.fixture()
def flat_catalog_file(tmp_path):
    """Write CATALOG_FLAT to a temporary catalog.json and return its path."""
    target = tmp_path / "catalog.json"
    serialized = json.dumps(CATALOG_FLAT)
    target.write_text(serialized)
    return target


@pytest.fixture()
def scanner(catalog_file):
    """Provide a ShadowAIScanner loaded from the catalog_file fixture."""
    loaded = ShadowAIScanner(catalog_file)
    return loaded


def make_log(tmp_path, records):
    """Write *records* as newline-delimited JSON to tmp_path/audit.jsonl.

    Returns the path of the written file. No trailing newline is emitted.
    """
    lines = [json.dumps(record) for record in records]
    log_file = tmp_path / "audit.jsonl"
    log_file.write_text("\n".join(lines))
    return log_file


# ── 1. registered agent, declared tool → no event ────────────────────────────

def test_clean_call_produces_no_event(scanner):
    """A registered agent calling one of its declared tools yields no event."""
    clean_call = {
        "agent_id": "billing-agent",
        "tool_name": "get_invoice",
        "timestamp": "2026-06-25T10:00:00Z",
    }
    assert scanner.scan_records([clean_call]) == []


# ── 2. unregistered agent → reason = unregistered_agent ──────────────────────

def test_unregistered_agent_detected(scanner):
    """A call from an agent absent from the catalog is flagged as unregistered."""
    rogue_call = {
        "agent_id": "rogue-agent",
        "tool_name": "delete_all",
        "timestamp": "2026-06-25T10:01:00Z",
    }
    found = scanner.scan_records([rogue_call])
    assert len(found) == 1
    only = found[0]
    assert only.reason == "unregistered_agent"
    assert only.agent_id == "rogue-agent"


# ── 3. registered agent, undeclared tool → reason = undeclared_tool ───────────

def test_undeclared_tool_detected(scanner):
    """A registered agent using a tool it did not declare is flagged."""
    stray_call = {
        "agent_id": "billing-agent",
        "tool_name": "delete_all",
        "timestamp": "2026-06-25T10:02:00Z",
    }
    found = scanner.scan_records([stray_call])
    assert len(found) == 1
    only = found[0]
    assert only.reason == "undeclared_tool"
    assert only.tool_name == "delete_all"


# ── 4. suggested_manifest_id sanitises the agent_id ──────────────────────────

def test_suggested_manifest_id_sanitized(scanner):
    """The suggested manifest id is the lowercased, hyphen-sanitised agent id."""
    import re

    messy_call = {"agent_id": "My Agent/v2 (prod)", "tool_name": "x", "timestamp": "t"}
    suggested = scanner.scan_records([messy_call])[0].suggested_manifest_id
    assert suggested == "my-agent-v2--prod"
    # must only contain lowercase alphanum and hyphens, no leading/trailing dash
    assert re.match(r"^[a-z0-9][a-z0-9-]*[a-z0-9]$", suggested)


# ── 5. records without tool_name are skipped ──────────────────────────────────

def test_non_tool_call_records_skipped(scanner):
    """Records lacking a tool_name key are ignored by the scanner."""
    session_start = {"agent_id": "rogue-agent", "event": "session_start", "timestamp": "t"}
    tool_call = {"agent_id": "rogue-agent", "tool_name": "bad_tool", "timestamp": "t"}
    found = scanner.scan_records([session_start, tool_call])
    # only the tool_call record produces an event
    assert len(found) == 1


# ── 6. scan_audit_log reads a JSONL file ─────────────────────────────────────

def test_scan_audit_log_file(scanner, tmp_path):
    """scan_audit_log reads a JSONL file and reports only the violating line."""
    rogue = {"agent_id": "rogue-agent", "tool_name": "exfil", "timestamp": "2026-06-25T10:05:00Z"}
    clean = {"agent_id": "billing-agent", "tool_name": "get_invoice", "timestamp": "2026-06-25T10:06:00Z"}
    log_path = make_log(tmp_path, [rogue, clean])
    found = scanner.scan_audit_log(log_path)
    assert len(found) == 1
    assert found[0].agent_id == "rogue-agent"


# ── 7. flat catalog format is parsed correctly ────────────────────────────────

def test_flat_catalog_format(flat_catalog_file):
    """The flat {agent_id: [tools]} catalog layout loads correctly."""
    flat_scanner = ShadowAIScanner(flat_catalog_file)
    agent = "billing-agent"
    assert flat_scanner.is_registered(agent)
    assert flat_scanner.is_tool_declared(agent, "get_invoice")
    assert not flat_scanner.is_tool_declared(agent, "delete_all")


# ── 8. multiple violations in one log ────────────────────────────────────────

def test_multiple_violations(scanner):
    """Several violating records each produce their own event."""
    calls = [
        {"agent_id": agent, "tool_name": tool, "timestamp": stamp}
        for agent, tool, stamp in (
            ("rogue-1", "tool_a", "t1"),
            ("rogue-2", "tool_b", "t2"),
            ("billing-agent", "hack", "t3"),
        )
    ]
    found = scanner.scan_records(calls)
    assert len(found) == 3
    seen_reasons = set()
    for event in found:
        seen_reasons.add(event.reason)
    assert "unregistered_agent" in seen_reasons
    assert "undeclared_tool" in seen_reasons


# ── 9. to_dict returns all required fields ────────────────────────────────────

def test_discovery_event_to_dict_fields():
    """to_dict exposes every DiscoveryEvent field, including the derived id."""
    event = DiscoveryEvent("billing-agent", "bad_tool", "2026-06-25T00:00:00Z", "undeclared_tool")
    as_dict = event.to_dict()
    required = ("agent_id", "tool_name", "timestamp", "reason", "suggested_manifest_id")
    for key in required:
        assert key in as_dict, f"Missing key: {key}"
    assert as_dict["agent_id"] == "billing-agent"
    assert as_dict["suggested_manifest_id"] == "billing-agent"
Loading