From 438762ab710b8e414ed89e77d9402be9e374b356 Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Wed, 25 Mar 2026 06:15:33 +0000 Subject: [PATCH 1/5] Add permission audit trail (closes #7) - audit.py: checks every tool_call against .agent-scope.json policy (file read/write glob patterns, command allow/deny list, network deny_all with allow exceptions via fnmatch) - Auto-flags sensitive files (.env, *.pem, .ssh/*, .github/workflows/*) even without a policy file - Exits with code 1 when violations found (CI-friendly) - 26 new tests (198 total passing) Co-authored-by: Ona --- src/agent_trace/audit.py | 374 +++++++++++++++++++++++++++++++++++++++ src/agent_trace/cli.py | 10 ++ tests/test_audit.py | 275 ++++++++++++++++++++++++++++ 3 files changed, 659 insertions(+) create mode 100644 src/agent_trace/audit.py create mode 100644 tests/test_audit.py diff --git a/src/agent_trace/audit.py b/src/agent_trace/audit.py new file mode 100644 index 0000000..80be937 --- /dev/null +++ b/src/agent_trace/audit.py @@ -0,0 +1,374 @@ +"""Permission audit trail for agent sessions. + +Checks every tool_call event against a policy file (.agent-scope.json) +and auto-flags sensitive file access even without a policy. +""" + +from __future__ import annotations + +import argparse +import fnmatch +import json +import re +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Literal, TextIO + +from .models import EventType, TraceEvent +from .store import TraceStore + + +# --------------------------------------------------------------------------- +# Sensitive file patterns (auto-flagged regardless of policy) +# --------------------------------------------------------------------------- + +SENSITIVE_PATTERNS: list[str] = [ + ".env", ".env.*", "*.env", + "config/secrets*", "secrets.*", "*secret*", + "*.pem", "*.key", "*.p12", "*.pfx", + ".ssh/*", "id_rsa", "id_ed25519", + ".aws/credentials", ".aws/config", + ".netrc", ".npmrc", ".pypirc", + ".github/workflows/*", + "*.token", "*.password", +] + +Verdict = Literal["allowed", "denied", "no_policy"] + + +# --------------------------------------------------------------------------- +# Policy loading +# --------------------------------------------------------------------------- + +@dataclass +class Policy: + file_read_allow: list[str] = field(default_factory=list) + file_read_deny: list[str] = field(default_factory=list) + file_write_allow: list[str] = field(default_factory=list) + file_write_deny: list[str] = field(default_factory=list) + cmd_allow: list[str] = field(default_factory=list) + cmd_deny: list[str] = field(default_factory=list) + network_deny_all: bool = False + network_allow: list[str] = field(default_factory=list) + + @classmethod + def from_dict(cls, d: dict) -> Policy: + files = d.get("files", {}) + read = files.get("read", {}) + write = files.get("write", {}) + cmds = d.get("commands", {}) + net = d.get("network", {}) + return cls( + file_read_allow=read.get("allow", []), + file_read_deny=read.get("deny", []), + file_write_allow=write.get("allow", []), + file_write_deny=write.get("deny", []), + cmd_allow=cmds.get("allow", []), + cmd_deny=cmds.get("deny", []), + network_deny_all=net.get("deny_all", False), + network_allow=net.get("allow", []), + ) + + @classmethod + def load(cls, path: str | Path) -> Policy | None: + p = Path(path) + if not p.exists(): + return None + try: + return cls.from_dict(json.loads(p.read_text())) + except (json.JSONDecodeError, OSError): + return None + + +# --------------------------------------------------------------------------- +# Audit result types +# --------------------------------------------------------------------------- + +@dataclass +class AuditEntry: + event: TraceEvent + event_index: int # 1-based + action: str # human-readable description + verdict: Verdict + reason: str # why this verdict was reached + sensitive: bool = False # auto-flagged as sensitive + + +@dataclass +class AuditReport: + session_id: str + total_events: int + total_tool_calls: int + entries: list[AuditEntry] + policy_loaded: bool + + @property + def allowed(self) -> list[AuditEntry]: + return [e for e in self.entries if e.verdict == "allowed"] + + @property + def denied(self) -> list[AuditEntry]: + return [e for e in self.entries if e.verdict == "denied"] + + @property + def no_policy(self) -> list[AuditEntry]: + return [e for e in self.entries if e.verdict == "no_policy"] + + @property + def sensitive_accesses(self) -> list[AuditEntry]: + return [e for e in self.entries if e.sensitive] + + +# --------------------------------------------------------------------------- +# Matching helpers +# --------------------------------------------------------------------------- + +def _glob_match(path: str, patterns: list[str]) -> bool: + name = Path(path).name + for pat in patterns: + if fnmatch.fnmatch(path, pat) or fnmatch.fnmatch(name, pat): + return True + return False + + +def _is_sensitive(path: str) -> bool: + return _glob_match(path, SENSITIVE_PATTERNS) + + +def _cmd_matches(cmd: str, patterns: list[str]) -> bool: + cmd_lower = cmd.lower().strip() + for pat in patterns: + pat_lower = pat.lower().strip() + if cmd_lower == pat_lower: + return True + if cmd_lower.startswith(pat_lower): + return True + if fnmatch.fnmatch(cmd_lower, pat_lower): + return True + return False + + +_URL_RE = re.compile(r"https?://([^/\s]+)") + + +def _extract_urls(text: str) -> list[str]: + # Strip port from host (e.g. "localhost:8080" → "localhost") + return [host.split(":")[0] for host in _URL_RE.findall(text)] + + +def _url_allowed(host: str, policy: Policy) -> bool: + if not policy.network_deny_all: + return True + return any( + fnmatch.fnmatch(host, allowed) or host == allowed + for allowed in policy.network_allow + ) + + +# --------------------------------------------------------------------------- +# Per-event audit logic +# --------------------------------------------------------------------------- + +def _audit_event( + event: TraceEvent, + index: int, + policy: Policy | None, +) -> list[AuditEntry]: + """Return zero or more AuditEntry objects for a single tool_call event.""" + entries: list[AuditEntry] = [] + data = event.data + tool_name = data.get("tool_name", "").lower() + args = data.get("arguments", {}) or {} + + # --- File read --- + if tool_name in ("read", "view"): + path = str(args.get("file_path", args.get("path", ""))) + if path: + sensitive = _is_sensitive(path) + if policy and (policy.file_read_allow or policy.file_read_deny): + if _glob_match(path, policy.file_read_deny): + verdict, reason = "denied", "denied by files.read.deny" + elif policy.file_read_allow and not _glob_match(path, policy.file_read_allow): + verdict, reason = "denied", "not in files.read.allow" + else: + verdict, reason = "allowed", "matches files.read.allow" + else: + verdict, reason = "no_policy", "no file read policy" + entries.append(AuditEntry( + event=event, event_index=index, + action=f"Read {path}", + verdict=verdict, reason=reason, sensitive=sensitive, + )) + + # --- File write / edit --- + elif tool_name in ("write", "edit", "create"): + path = str(args.get("file_path", args.get("path", ""))) + if path: + sensitive = _is_sensitive(path) + if policy and (policy.file_write_allow or policy.file_write_deny): + if _glob_match(path, policy.file_write_deny): + verdict, reason = "denied", "denied by files.write.deny" + elif policy.file_write_allow and not _glob_match(path, policy.file_write_allow): + verdict, reason = "denied", "not in files.write.allow" + else: + verdict, reason = "allowed", "matches files.write.allow" + else: + verdict, reason = "no_policy", "no file write policy" + entries.append(AuditEntry( + event=event, event_index=index, + action=f"Write {path}", + verdict=verdict, reason=reason, sensitive=sensitive, + )) + + # --- Bash / command execution --- + elif tool_name == "bash": + cmd = str(args.get("command", "")).strip() + if cmd: + # Network access check: scan command for URLs + urls = _extract_urls(cmd) + for url_host in urls: + if policy: + net_ok = _url_allowed(url_host, policy) + net_verdict: Verdict = "allowed" if net_ok else "denied" + net_reason = ( + "allowed by network.allow" + if net_ok + else "denied by network.deny_all" + ) + else: + net_verdict = "no_policy" + net_reason = "no network policy" + entries.append(AuditEntry( + event=event, event_index=index, + action=f"Network access {url_host}", + verdict=net_verdict, reason=net_reason, + )) + + # Command policy check + if policy and (policy.cmd_allow or policy.cmd_deny): + if _cmd_matches(cmd, policy.cmd_deny): + verdict, reason = "denied", "denied by commands.deny" + elif policy.cmd_allow and not _cmd_matches(cmd, policy.cmd_allow): + verdict, reason = "denied", "not in commands.allow" + else: + verdict, reason = "allowed", "matches commands.allow" + else: + verdict, reason = "no_policy", "no command policy" + + entries.append(AuditEntry( + event=event, event_index=index, + action=f"Ran: {cmd[:80]}{'...' if len(cmd) > 80 else ''}", + verdict=verdict, reason=reason, + )) + + # --- Generic tool call (mcp tools, agent, etc.) --- + else: + entries.append(AuditEntry( + event=event, event_index=index, + action=f"Tool: {data.get('tool_name', '?')}", + verdict="no_policy", + reason="no policy for this tool type", + )) + + return entries + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +def audit_session( + store: TraceStore, + session_id: str, + policy_path: str | Path = ".agent-scope.json", +) -> AuditReport: + """Audit all tool_call events in *session_id* against *policy_path*.""" + events = store.load_events(session_id) + policy = Policy.load(policy_path) + + entries: list[AuditEntry] = [] + tool_call_count = 0 + + for i, event in enumerate(events): + if event.event_type != EventType.TOOL_CALL: + continue + tool_call_count += 1 + entries.extend(_audit_event(event, i + 1, policy)) + + return AuditReport( + session_id=session_id, + total_events=len(events), + total_tool_calls=tool_call_count, + entries=entries, + policy_loaded=policy is not None, + ) + + +# --------------------------------------------------------------------------- +# Formatting +# --------------------------------------------------------------------------- + +def format_audit(report: AuditReport, out: TextIO = sys.stdout) -> None: + w = out.write + policy_note = "" if report.policy_loaded else " (no policy file)" + w(f"\nAUDIT: Session {report.session_id[:12]}" + f" ({report.total_events} events, {report.total_tool_calls} tool calls)" + f"{policy_note}\n\n") + + if report.allowed: + w(f"✅ Allowed ({len(report.allowed)}):\n") + for e in report.allowed[:20]: + w(f" {e.action}\n") + if len(report.allowed) > 20: + w(f" ... and {len(report.allowed) - 20} more\n") + w("\n") + + if report.no_policy: + w(f"⚠️ No policy ({len(report.no_policy)}):\n") + for e in report.no_policy[:20]: + w(f" {e.action} ({e.reason})\n") + if len(report.no_policy) > 20: + w(f" ... and {len(report.no_policy) - 20} more\n") + w("\n") + + if report.denied: + w(f"❌ Violations ({len(report.denied)}):\n") + for e in report.denied: + w(f" {e.action} ← {e.reason}\n") + w("\n") + + if report.sensitive_accesses: + w(f"🔐 Sensitive files accessed ({len(report.sensitive_accesses)}):\n") + for e in report.sensitive_accesses: + w(f" {e.action} (event #{e.event_index})\n") + w("\n") + + if not report.denied and not report.sensitive_accesses: + w("No violations found.\n\n") + + +# --------------------------------------------------------------------------- +# CLI handler +# --------------------------------------------------------------------------- + +def cmd_audit(args: argparse.Namespace) -> int: + store = TraceStore(args.trace_dir) + + session_id = args.session_id + if not session_id: + session_id = store.get_latest_session_id() + if not session_id: + sys.stderr.write("No sessions found.\n") + return 1 + full_id = store.find_session(session_id) + if not full_id: + sys.stderr.write(f"Session not found: {session_id}\n") + return 1 + + policy_path = getattr(args, "policy", ".agent-scope.json") + report = audit_session(store, full_id, policy_path=policy_path) + format_audit(report) + + # Exit 1 if there are violations so CI can catch them + return 1 if report.denied else 0 diff --git a/src/agent_trace/cli.py b/src/agent_trace/cli.py index bf1c492..2e2c4a5 100644 --- a/src/agent_trace/cli.py +++ b/src/agent_trace/cli.py @@ -22,9 +22,12 @@ from . import __version__ from .hooks import hook_main from .http_proxy import HTTPProxyServer +from .audit import cmd_audit from .cost import cmd_cost +from .diff import cmd_diff from .explain import cmd_explain from .jsonl_import import cmd_import +from .why import cmd_why from .models import EventType, SessionMeta, TraceEvent from .proxy import MCPProxy from .replay import format_event, format_summary, list_sessions, replay_session @@ -456,6 +459,12 @@ def build_parser() -> argparse.ArgumentParser: p_cost.add_argument("--output-price", type=float, dest="output_price", help="custom output price per 1M tokens (overrides --model)") + # audit + p_audit = sub.add_parser("audit", help="check session tool calls against a policy file") + p_audit.add_argument("session_id", nargs="?", help="session ID or prefix (default: latest)") + p_audit.add_argument("--policy", default=".agent-scope.json", + help="path to policy file (default: .agent-scope.json)") + return parser @@ -487,6 +496,7 @@ def main() -> None: "import": cmd_import, "explain": cmd_explain, "cost": cmd_cost, + "audit": cmd_audit, } handler = handlers.get(args.command) diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..6a8e98a --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,275 @@ +"""Tests for permission audit trail.""" + +import io +import json +import tempfile +import unittest +from pathlib import Path + +from agent_trace.audit import ( + AuditReport, + Policy, + _cmd_matches, + _glob_match, + _is_sensitive, + audit_session, + format_audit, +) +from agent_trace.models import EventType, SessionMeta, TraceEvent +from agent_trace.store import TraceStore + + +def _make_event(event_type: EventType, ts: float, session_id: str, **data) -> TraceEvent: + return TraceEvent(event_type=event_type, timestamp=ts, session_id=session_id, data=data) + + +def _make_store(events: list[TraceEvent], session_id: str = "sess1") -> tuple[TraceStore, tempfile.TemporaryDirectory]: + tmp = tempfile.TemporaryDirectory() + store = TraceStore(tmp.name) + meta = SessionMeta(session_id=session_id, started_at=0.0, + total_duration_ms=5000) + store.create_session(meta) + for e in events: + store.append_event(session_id, e) + store.update_meta(meta) + return store, tmp + + +def _write_policy(d: dict, tmp_dir: str) -> str: + path = str(Path(tmp_dir) / ".agent-scope.json") + Path(path).write_text(json.dumps(d)) + return path + + +class TestGlobMatch(unittest.TestCase): + def test_exact_match(self): + self.assertTrue(_glob_match(".env", [".env"])) + + def test_glob_pattern(self): + self.assertTrue(_glob_match("src/auth.py", ["src/**"])) + + def test_no_match(self): + self.assertFalse(_glob_match("README.md", ["src/**", "tests/**"])) + + def test_filename_match(self): + self.assertTrue(_glob_match("config/.env", [".env"])) + + +class TestIsSensitive(unittest.TestCase): + def test_dotenv(self): + self.assertTrue(_is_sensitive(".env")) + + def test_dotenv_local(self): + self.assertTrue(_is_sensitive(".env.local")) + + def test_pem_file(self): + self.assertTrue(_is_sensitive("certs/server.pem")) + + def test_github_workflow(self): + self.assertTrue(_is_sensitive(".github/workflows/deploy.yml")) + + def test_normal_file(self): + self.assertFalse(_is_sensitive("src/auth.py")) + + def test_readme(self): + self.assertFalse(_is_sensitive("README.md")) + + +class TestCmdMatches(unittest.TestCase): + def test_exact(self): + self.assertTrue(_cmd_matches("pytest", ["pytest"])) + + def test_prefix(self): + self.assertTrue(_cmd_matches("uv run pytest", ["uv run"])) + + def test_no_match(self): + self.assertFalse(_cmd_matches("curl https://example.com", ["pytest", "cat"])) + + def test_case_insensitive(self): + self.assertTrue(_cmd_matches("PYTEST", ["pytest"])) + + +class TestNoPolicyFile(unittest.TestCase): + def test_all_entries_no_policy(self): + events = [ + _make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": "src/auth.py"}), + _make_event(EventType.TOOL_CALL, 2.0, "sess1", + tool_name="Bash", arguments={"command": "pytest"}), + ] + store, tmp = _make_store(events) + report = audit_session(store, "sess1", policy_path="/nonexistent/.agent-scope.json") + self.assertFalse(report.policy_loaded) + self.assertEqual(len(report.denied), 0) + self.assertTrue(all(e.verdict == "no_policy" for e in report.entries)) + tmp.cleanup() + + +class TestFileReadPolicy(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.policy_path = _write_policy({ + "files": { + "read": {"allow": ["src/**", "tests/**"], "deny": [".env"]} + } + }, self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def test_allowed_read(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": "src/auth.py"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + self.assertEqual(report.entries[0].verdict, "allowed") + tmp2.cleanup() + + def test_denied_read(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": ".env"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + self.assertEqual(report.entries[0].verdict, "denied") + tmp2.cleanup() + + def test_not_in_allow_list(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": "README.md"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + self.assertEqual(report.entries[0].verdict, "denied") + tmp2.cleanup() + + +class TestCommandPolicy(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.policy_path = _write_policy({ + "commands": { + "allow": ["pytest", "uv run", "cat"], + "deny": ["rm -rf", "curl", "wget"] + } + }, self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def test_allowed_command(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Bash", arguments={"command": "pytest tests/"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + cmd_entries = [e for e in report.entries if e.action.startswith("Ran:")] + self.assertEqual(cmd_entries[0].verdict, "allowed") + tmp2.cleanup() + + def test_denied_command(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Bash", arguments={"command": "curl https://evil.com"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + cmd_entries = [e for e in report.entries if e.action.startswith("Ran:")] + self.assertEqual(cmd_entries[0].verdict, "denied") + tmp2.cleanup() + + +class TestSensitiveFileDetection(unittest.TestCase): + def test_sensitive_flagged_without_policy(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": ".env"})] + store, tmp = _make_store(events) + report = audit_session(store, "sess1", policy_path="/nonexistent") + self.assertTrue(report.entries[0].sensitive) + tmp.cleanup() + + def test_normal_file_not_sensitive(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": "src/auth.py"})] + store, tmp = _make_store(events) + report = audit_session(store, "sess1", policy_path="/nonexistent") + self.assertFalse(report.entries[0].sensitive) + tmp.cleanup() + + +class TestNetworkAccessCheck(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.policy_path = _write_policy({ + "network": {"deny_all": True, "allow": ["localhost", "127.0.0.1"]} + }, self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def test_denied_external_url(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Bash", + arguments={"command": "curl https://example.com/data"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + net_entries = [e for e in report.entries if "Network access" in e.action] + self.assertTrue(len(net_entries) > 0) + self.assertEqual(net_entries[0].verdict, "denied") + tmp2.cleanup() + + def test_allowed_localhost(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Bash", + arguments={"command": "curl http://localhost:8080/health"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + net_entries = [e for e in report.entries if "Network access" in e.action] + self.assertTrue(len(net_entries) > 0) + self.assertEqual(net_entries[0].verdict, "allowed") + tmp2.cleanup() + + +class TestFullAuditReport(unittest.TestCase): + def test_report_structure(self): + events = [ + _make_event(EventType.USER_PROMPT, 0.0, "sess1", prompt="do it"), + _make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": "src/auth.py"}), + _make_event(EventType.TOOL_CALL, 2.0, "sess1", + tool_name="Bash", arguments={"command": "pytest"}), + _make_event(EventType.TOOL_CALL, 3.0, "sess1", + tool_name="Read", arguments={"file_path": ".env"}), + _make_event(EventType.SESSION_END, 4.0, "sess1"), + ] + tmp = tempfile.TemporaryDirectory() + policy_path = _write_policy({ + "files": {"read": {"allow": ["src/**"], "deny": [".env"]}}, + "commands": {"allow": ["pytest"]}, + }, tmp.name) + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=policy_path) + + self.assertEqual(report.total_tool_calls, 3) + self.assertTrue(len(report.denied) > 0) + self.assertTrue(len(report.sensitive_accesses) > 0) + tmp.cleanup() + tmp2.cleanup() + + def test_format_output_contains_violations(self): + events = [ + _make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": ".env"}), + ] + tmp = tempfile.TemporaryDirectory() + policy_path = _write_policy({ + "files": {"read": {"deny": [".env"]}} + }, tmp.name) + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=policy_path) + buf = io.StringIO() + format_audit(report, out=buf) + output = buf.getvalue() + self.assertIn("Violations", output) + self.assertIn(".env", output) + tmp.cleanup() + tmp2.cleanup() + + +if __name__ == "__main__": + unittest.main() From 491d2b67497d878cc95ef77b0139ead38a68a96c Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Wed, 25 Mar 2026 06:25:51 +0000 Subject: [PATCH 2/5] Fix _cmd_matches word-boundary false-positive and str(None) path bug - _cmd_matches: prefix match now requires space or end-of-string after the pattern, preventing 'curl' from matching 'curling --help' - _audit_event: use 'or' chaining instead of str(args.get(..., ...)) to avoid str(None) == 'None' passing the path guard Co-authored-by: Ona --- src/agent_trace/audit.py | 10 +++++++--- tests/test_audit.py | 8 ++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/agent_trace/audit.py b/src/agent_trace/audit.py index 80be937..581bee0 100644 --- a/src/agent_trace/audit.py +++ b/src/agent_trace/audit.py @@ -142,7 +142,11 @@ def _cmd_matches(cmd: str, patterns: list[str]) -> bool: pat_lower = pat.lower().strip() if cmd_lower == pat_lower: return True - if cmd_lower.startswith(pat_lower): + # Prefix match only at a word boundary (followed by space or end of string) + if cmd_lower.startswith(pat_lower) and ( + len(cmd_lower) == len(pat_lower) + or cmd_lower[len(pat_lower)] == " " + ): return True if fnmatch.fnmatch(cmd_lower, pat_lower): return True @@ -183,7 +187,7 @@ def _audit_event( # --- File read --- if tool_name in ("read", "view"): - path = str(args.get("file_path", args.get("path", ""))) + path = str(args.get("file_path") or args.get("path") or "") if path: sensitive = _is_sensitive(path) if policy and (policy.file_read_allow or policy.file_read_deny): @@ -203,7 +207,7 @@ def _audit_event( # --- File write / edit --- elif tool_name in ("write", "edit", "create"): - path = str(args.get("file_path", args.get("path", ""))) + path = str(args.get("file_path") or args.get("path") or "") if path: sensitive = _is_sensitive(path) if policy and (policy.file_write_allow or policy.file_write_deny): diff --git a/tests/test_audit.py b/tests/test_audit.py index 6a8e98a..04e74bf 100644 --- a/tests/test_audit.py +++ b/tests/test_audit.py @@ -88,6 +88,14 @@ def test_no_match(self): def test_case_insensitive(self): self.assertTrue(_cmd_matches("PYTEST", ["pytest"])) + def test_prefix_no_false_positive(self): + # "curl" pattern must not match "curling" command + self.assertFalse(_cmd_matches("curling --help", ["curl"])) + + def test_prefix_with_args_matches(self): + # "curl" pattern should match "curl https://example.com" + self.assertTrue(_cmd_matches("curl https://example.com", ["curl"])) + class TestNoPolicyFile(unittest.TestCase): def test_all_entries_no_policy(self): From 252a6ab39e75496544b85e5ba9f131902958f3c0 Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Wed, 25 Mar 2026 06:27:36 +0000 Subject: [PATCH 3/5] Warn on malformed policy file; clarify generic tool verdict reason - Policy.load: distinguish json.JSONDecodeError from OSError and write a warning to stderr for both. Previously a malformed policy was silently treated as missing, giving no indication to the user. - _audit_event generic branch: clarify reason string to explain that no policy rule covers arbitrary tool types, not that no policy file exists. Co-authored-by: Ona --- src/agent_trace/audit.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/agent_trace/audit.py b/src/agent_trace/audit.py index 581bee0..da12bcd 100644 --- a/src/agent_trace/audit.py +++ b/src/agent_trace/audit.py @@ -77,7 +77,11 @@ def load(cls, path: str | Path) -> Policy | None: return None try: return cls.from_dict(json.loads(p.read_text())) - except (json.JSONDecodeError, OSError): + except json.JSONDecodeError as exc: + sys.stderr.write(f"Warning: malformed policy file {p}: {exc}\n") + return None + except OSError as exc: + sys.stderr.write(f"Warning: could not read policy file {p}: {exc}\n") return None @@ -266,13 +270,16 @@ def _audit_event( verdict=verdict, reason=reason, )) - # --- Generic tool call (mcp tools, agent, etc.) --- + # --- Generic tool call (MCP tools, Agent, TodoWrite, etc.) --- + # No policy rules cover arbitrary tool types; always no_policy regardless + # of whether a policy file is loaded. Add explicit tool rules to the policy + # file's "commands" section to cover these if needed. else: entries.append(AuditEntry( event=event, event_index=index, action=f"Tool: {data.get('tool_name', '?')}", verdict="no_policy", - reason="no policy for this tool type", + reason="no policy rule for this tool type", )) return entries From f602c373334015fadf42cb6d977bb4f213256780 Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Sat, 28 Mar 2026 16:07:04 +0000 Subject: [PATCH 4/5] Fix glob matching, add Grep/Glob support, fix double-entry, tighten sensitive patterns MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit audit.py: - _glob_match: replace fnmatch with pathlib.PurePath.match() so src/** correctly matches src/auth.py and src/utils/path.py (fnmatch treats ** as two * wildcards and does not match across path separators) - Add Grep and Glob to the file-read branch so file read policy applies to directory listings and content searches, not just Read/View calls - Network+command double-entry: skip command policy check when a network violation has already been recorded for the same event - SENSITIVE_PATTERNS: remove *secret* (too broad — matches secret_manager.py, test_secrets.py, etc.); replace with explicit secrets.json/yaml/toml/yml tests: - test_glob_pattern_nested: src/utils/path.py matches src/** - test_secret_manager_not_sensitive: source files no longer false-positive - test_secrets_json/yaml_is_sensitive: explicit secrets files still caught - TestGrepGlobPolicy: Grep denied path, Glob allowed path, Grep sensitive flag - TestNetworkCommandDoubleEntry: curl to denied URL = 1 entry not 2; curl to allowed host still checked against command policy - TestMalformedPolicy: malformed JSON returns None, audit continues as no-policy Co-authored-by: Ona --- src/agent_trace/audit.py | 49 ++++++++++++----- tests/test_audit.py | 113 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 147 insertions(+), 15 deletions(-) diff --git a/src/agent_trace/audit.py b/src/agent_trace/audit.py index da12bcd..0592a04 100644 --- a/src/agent_trace/audit.py +++ b/src/agent_trace/audit.py @@ -25,7 +25,8 @@ SENSITIVE_PATTERNS: list[str] = [ ".env", ".env.*", "*.env", - "config/secrets*", "secrets.*", "*secret*", + "secrets.json", "secrets.yaml", "secrets.yml", "secrets.toml", + "config/secrets*", "*.pem", "*.key", "*.p12", "*.pfx", ".ssh/*", "id_rsa", "id_ed25519", ".aws/credentials", ".aws/config", @@ -129,9 +130,15 @@ def sensitive_accesses(self) -> list[AuditEntry]: # --------------------------------------------------------------------------- def _glob_match(path: str, patterns: list[str]) -> bool: - name = Path(path).name + # PurePath.match() supports ** as a recursive wildcard (unlike fnmatch). + # Fall back to fnmatch on the bare filename for simple patterns like ".env". + p = Path(path) + name = p.name for pat in patterns: - if fnmatch.fnmatch(path, pat) or fnmatch.fnmatch(name, pat): + if p.match(pat): + return True + # Also match against the bare filename so ".env" catches "config/.env" + if fnmatch.fnmatch(name, pat): return True return False @@ -189,9 +196,15 @@ def _audit_event( tool_name = data.get("tool_name", "").lower() args = data.get("arguments", {}) or {} - # --- File read --- - if tool_name in ("read", "view"): - path = str(args.get("file_path") or args.get("path") or "") + # --- File read (Read, View, Grep, Glob) --- + if tool_name in ("read", "view", "grep", "glob"): + # Grep uses "path" or "pattern"; Glob uses "pattern" or "path" + path = str( + args.get("file_path") + or args.get("path") + or args.get("pattern") + or "" + ) if path: sensitive = _is_sensitive(path) if policy and (policy.file_read_allow or policy.file_read_deny): @@ -203,9 +216,10 @@ def _audit_event( verdict, reason = "allowed", "matches files.read.allow" else: verdict, reason = "no_policy", "no file read policy" + action_verb = "Glob" if tool_name == "glob" else ("Grep" if tool_name == "grep" else "Read") entries.append(AuditEntry( event=event, event_index=index, - action=f"Read {path}", + action=f"{action_verb} {path}", verdict=verdict, reason=reason, sensitive=sensitive, )) @@ -235,6 +249,7 @@ def _audit_event( if cmd: # Network access check: scan command for URLs urls = _extract_urls(cmd) + network_denied = False for url_host in urls: if policy: net_ok = _url_allowed(url_host, policy) @@ -244,6 +259,8 @@ def _audit_event( if net_ok else "denied by network.deny_all" ) + if not net_ok: + network_denied = True else: net_verdict = "no_policy" net_reason = "no network policy" @@ -253,8 +270,11 @@ def _audit_event( verdict=net_verdict, reason=net_reason, )) - # Command policy check - if policy and (policy.cmd_allow or policy.cmd_deny): + # Command policy check — skip if a network violation already covers + # this event to avoid double-counting the same command. + if network_denied: + pass + elif policy and (policy.cmd_allow or policy.cmd_deny): if _cmd_matches(cmd, policy.cmd_deny): verdict, reason = "denied", "denied by commands.deny" elif policy.cmd_allow and not _cmd_matches(cmd, policy.cmd_allow): @@ -264,11 +284,12 @@ def _audit_event( else: verdict, reason = "no_policy", "no command policy" - entries.append(AuditEntry( - event=event, event_index=index, - action=f"Ran: {cmd[:80]}{'...' if len(cmd) > 80 else ''}", - verdict=verdict, reason=reason, - )) + if not network_denied: + entries.append(AuditEntry( + event=event, event_index=index, + action=f"Ran: {cmd[:80]}{'...' if len(cmd) > 80 else ''}", + verdict=verdict, reason=reason, + )) # --- Generic tool call (MCP tools, Agent, TodoWrite, etc.) --- # No policy rules cover arbitrary tool types; always no_policy regardless diff --git a/tests/test_audit.py b/tests/test_audit.py index 04e74bf..93b40a4 100644 --- a/tests/test_audit.py +++ b/tests/test_audit.py @@ -45,9 +45,12 @@ class TestGlobMatch(unittest.TestCase): def test_exact_match(self): self.assertTrue(_glob_match(".env", [".env"])) - def test_glob_pattern(self): + def test_glob_pattern_single_level(self): self.assertTrue(_glob_match("src/auth.py", ["src/**"])) + def test_glob_pattern_nested(self): + self.assertTrue(_glob_match("src/utils/path.py", ["src/**"])) + def test_no_match(self): self.assertFalse(_glob_match("README.md", ["src/**", "tests/**"])) @@ -74,6 +77,16 @@ def test_normal_file(self): def test_readme(self): self.assertFalse(_is_sensitive("README.md")) + def test_secret_manager_not_sensitive(self): + # *secret* was removed — source files with "secret" in name are not flagged + self.assertFalse(_is_sensitive("src/secret_manager.py")) + + def test_secrets_json_is_sensitive(self): + self.assertTrue(_is_sensitive("secrets.json")) + + def test_secrets_yaml_is_sensitive(self): + self.assertTrue(_is_sensitive("secrets.yaml")) + class TestCmdMatches(unittest.TestCase): def test_exact(self): @@ -233,6 +246,104 @@ def test_allowed_localhost(self): tmp2.cleanup() +class TestGrepGlobPolicy(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.policy_path = _write_policy({ + "files": { + "read": {"allow": ["src/**"], "deny": [".env"]} + } + }, self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def test_grep_denied_path(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Grep", arguments={"pattern": ".env"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + self.assertEqual(report.entries[0].verdict, "denied") + tmp2.cleanup() + + def test_glob_allowed_path(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Glob", arguments={"pattern": "src/**"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + self.assertEqual(report.entries[0].verdict, "allowed") + tmp2.cleanup() + + def test_grep_sensitive_flagged(self): + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Grep", arguments={"pattern": ".env"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path="/nonexistent") + self.assertTrue(report.entries[0].sensitive) + tmp2.cleanup() + + +class TestNetworkCommandDoubleEntry(unittest.TestCase): + def setUp(self): + self.tmp = tempfile.TemporaryDirectory() + self.policy_path = _write_policy({ + "commands": {"deny": ["curl"]}, + "network": {"deny_all": True, "allow": ["localhost"]}, + }, self.tmp.name) + + def tearDown(self): + self.tmp.cleanup() + + def test_curl_with_denied_url_produces_one_entry(self): + """curl to a denied URL should produce one network violation, not two.""" + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Bash", + arguments={"command": "curl https://evil.com/data"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + denied = report.denied + # Only the network entry — no duplicate command entry + self.assertEqual(len(denied), 1) + self.assertIn("Network access", denied[0].action) + tmp2.cleanup() + + def test_curl_to_allowed_host_still_checks_command_policy(self): + """curl to an allowed host should still be checked against command policy.""" + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Bash", + arguments={"command": "curl http://localhost/health"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=self.policy_path) + # Network: allowed. Command: denied by commands.deny + cmd_entries = [e for e in report.entries if e.action.startswith("Ran:")] + self.assertTrue(len(cmd_entries) > 0) + self.assertEqual(cmd_entries[0].verdict, "denied") + tmp2.cleanup() + + +class TestMalformedPolicy(unittest.TestCase): + def test_malformed_json_returns_none(self): + tmp = tempfile.TemporaryDirectory() + path = str(Path(tmp.name) / ".agent-scope.json") + Path(path).write_text("{not valid json") + policy = Policy.load(path) + self.assertIsNone(policy) + tmp.cleanup() + + def test_malformed_policy_audit_continues_as_no_policy(self): + tmp = tempfile.TemporaryDirectory() + path = str(Path(tmp.name) / ".agent-scope.json") + Path(path).write_text("{not valid json") + events = [_make_event(EventType.TOOL_CALL, 1.0, "sess1", + tool_name="Read", arguments={"file_path": "src/auth.py"})] + store, tmp2 = _make_store(events) + report = audit_session(store, "sess1", policy_path=path) + self.assertFalse(report.policy_loaded) + self.assertEqual(report.entries[0].verdict, "no_policy") + tmp.cleanup() + tmp2.cleanup() + + class TestFullAuditReport(unittest.TestCase): def test_report_structure(self): events = [ From 90fc53d0d3aa7acd6328bcfc297098dcdd147229 Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Sat, 28 Mar 2026 16:10:43 +0000 Subject: [PATCH 5/5] Fix glob matching: implement ** recursive wildcard without pathlib.match PurePath.match() with ** is unreliable across Python 3.10-3.13 (the semantics changed in 3.12). Replace with a recursive segment-by-segment matcher (_match_parts) that correctly handles ** as zero-or-more path components on all supported versions. src/** now matches src/auth.py, src/utils/path.py, src/a/b/c/deep.py. Co-authored-by: Ona --- src/agent_trace/audit.py | 52 ++++++++++++++++++++++++++++++++++------ tests/test_audit.py | 3 +++ 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/src/agent_trace/audit.py b/src/agent_trace/audit.py index 0592a04..c79ed3e 100644 --- a/src/agent_trace/audit.py +++ b/src/agent_trace/audit.py @@ -130,16 +130,54 @@ def sensitive_accesses(self) -> list[AuditEntry]: # --------------------------------------------------------------------------- def _glob_match(path: str, patterns: list[str]) -> bool: - # PurePath.match() supports ** as a recursive wildcard (unlike fnmatch). - # Fall back to fnmatch on the bare filename for simple patterns like ".env". - p = Path(path) - name = p.name + """Match *path* against any of *patterns*. + + Supports ``**`` as a recursive wildcard (matches zero or more path + components) in addition to the standard fnmatch ``*`` and ``?``. + Also matches against the bare filename so ``.env`` catches + ``config/.env``. + """ + name = Path(path).name + norm_path = path.replace("\\", "/") for pat in patterns: - if p.match(pat): + norm_pat = pat.replace("\\", "/") + if fnmatch.fnmatch(norm_path, norm_pat): return True - # Also match against the bare filename so ".env" catches "config/.env" - if fnmatch.fnmatch(name, pat): + if fnmatch.fnmatch(name, norm_pat): return True + if "**" in norm_pat and _glob_match_recursive(norm_path, norm_pat): + return True + return False + + +def _glob_match_recursive(path: str, pattern: str) -> bool: + """Match path against a pattern that may contain ``**`` wildcards. + + ``**`` matches zero or more path components (including none). + """ + path_parts = path.split("/") + pat_parts = pattern.split("/") + return _match_parts(path_parts, pat_parts) + + +def _match_parts(path_parts: list[str], pat_parts: list[str]) -> bool: + """Recursively match path_parts against pat_parts.""" + if not pat_parts: + return not path_parts + if not path_parts: + # Only match if remaining pattern is all ** + return all(p == "**" for p in pat_parts) + + if pat_parts[0] == "**": + # ** can match zero components (skip it) or one+ components (consume one path part) + return ( + _match_parts(path_parts, pat_parts[1:]) # match zero + or _match_parts(path_parts[1:], pat_parts) # match one, keep ** + ) + + if fnmatch.fnmatch(path_parts[0], pat_parts[0]): + return _match_parts(path_parts[1:], pat_parts[1:]) + return False diff --git a/tests/test_audit.py b/tests/test_audit.py index 93b40a4..0469bc1 100644 --- a/tests/test_audit.py +++ b/tests/test_audit.py @@ -51,6 +51,9 @@ def test_glob_pattern_single_level(self): def test_glob_pattern_nested(self): self.assertTrue(_glob_match("src/utils/path.py", ["src/**"])) + def test_glob_pattern_deeply_nested(self): + self.assertTrue(_glob_match("src/a/b/c/deep.py", ["src/**"])) + def test_no_match(self): self.assertFalse(_glob_match("README.md", ["src/**", "tests/**"]))