From c9a8fa5f9e92b434495d0472626bf0c51b259942 Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Sat, 23 May 2026 08:21:12 +0000 Subject: [PATCH 1/2] fix: use extra_paths instead of undefined extra in config_watch check Co-authored-by: Ona --- src/agent_trace/config_watch.py | 475 ++++++++++++++++++++++++++++++++ 1 file changed, 475 insertions(+) create mode 100644 src/agent_trace/config_watch.py diff --git a/src/agent_trace/config_watch.py b/src/agent_trace/config_watch.py new file mode 100644 index 0000000..63e72c8 --- /dev/null +++ b/src/agent_trace/config_watch.py @@ -0,0 +1,475 @@ +"""AGENTS.md change detector — auto-flag drift after config changes. + +Snapshots a set of config files (AGENTS.md, system prompts, tool configs) +at session boundaries and detects when those files change between sessions. +Sessions after a config change are flagged as "potentially affected" so +teams know to re-evaluate agent behaviour. + +Usage: + # Snapshot current config state + agent-strace config-watch snapshot + + # Check whether config has changed since last snapshot + agent-strace config-watch check + + # Show full change history + agent-strace config-watch history + + # Show which sessions ran after each config change + agent-strace config-watch affected [--since 7d] + +Config files tracked by default: + AGENTS.md, .agent-strace-policy.json, .agent-strace-lint.json, + .agent-watch.json, system_prompt.txt, system_prompt.md + +Additional paths can be added via --watch or .agent-strace-watch.json. +""" + +from __future__ import annotations + +import argparse +import hashlib +import json +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import TextIO + +from .store import TraceStore + + +# --------------------------------------------------------------------------- +# Default watched paths (relative to workspace root) +# --------------------------------------------------------------------------- + +DEFAULT_WATCH_PATHS: list[str] = [ + "AGENTS.md", + "CLAUDE.md", + ".agent-strace-policy.json", + ".agent-strace-lint.json", + ".agent-watch.json", + "system_prompt.txt", + "system_prompt.md", + ".cursorrules", + ".github/copilot-instructions.md", +] + +_SNAPSHOT_FILE = ".agent-traces/.config-snapshots.json" +_WATCH_CONFIG_FILE = ".agent-strace-watch.json" + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +@dataclass +class FileSnapshot: + path: str + sha256: str # "" if file does not exist + mtime: float # 0.0 if file does not exist + exists: bool + + +@dataclass +class ConfigSnapshot: + snapshot_id: str + timestamp: float + files: list[FileSnapshot] + session_id: str = "" # session that triggered this snapshot (if any) + label: str = "" # human label (e.g. "before deploy") + + def to_dict(self) -> dict: + return { + "snapshot_id": self.snapshot_id, + "timestamp": self.timestamp, + "session_id": self.session_id, + "label": self.label, + "files": [ + {"path": f.path, "sha256": f.sha256, + "mtime": f.mtime, "exists": f.exists} + for f in self.files + ], + } + + @classmethod + def from_dict(cls, d: dict) -> "ConfigSnapshot": + files = [ + FileSnapshot( + path=f["path"], + sha256=f.get("sha256", ""), + mtime=f.get("mtime", 0.0), + exists=f.get("exists", False), + ) + for f in d.get("files", []) + ] + return cls( + snapshot_id=d["snapshot_id"], + timestamp=d["timestamp"], + session_id=d.get("session_id", ""), + label=d.get("label", ""), + files=files, + ) + + +@dataclass +class FileDiff: + path: str + change: str # "added" | "removed" | "modified" | "unchanged" + old_sha: str = "" + new_sha: str = "" + + +@dataclass +class SnapshotDiff: + snapshot_a_id: str + snapshot_b_id: str + timestamp_a: float + timestamp_b: float + changes: list[FileDiff] + + @property + def has_changes(self) -> bool: + return any(c.change != "unchanged" for c in self.changes) + + @property + def changed_paths(self) -> list[str]: + return [c.path for c in self.changes if c.change != "unchanged"] + + +# --------------------------------------------------------------------------- +# File hashing +# --------------------------------------------------------------------------- + +def _hash_file(path: Path) -> str: + """SHA-256 of file contents, or '' if file does not exist.""" + try: + data = path.read_bytes() + return hashlib.sha256(data).hexdigest() + except (OSError, IOError): + return "" + + +def _snapshot_file(root: Path, rel_path: str) -> FileSnapshot: + p = root / rel_path + exists = p.exists() + if not exists: + return FileSnapshot(path=rel_path, sha256="", mtime=0.0, exists=False) + return FileSnapshot( + path=rel_path, + sha256=_hash_file(p), + mtime=p.stat().st_mtime, + exists=True, + ) + + +# --------------------------------------------------------------------------- +# Snapshot storage +# --------------------------------------------------------------------------- + +def _snapshot_path(workspace_root: Path) -> Path: + return workspace_root / _SNAPSHOT_FILE + + +def _load_snapshots(workspace_root: Path) -> list[ConfigSnapshot]: + p = _snapshot_path(workspace_root) + if not p.exists(): + return [] + try: + data = json.loads(p.read_text()) + return [ConfigSnapshot.from_dict(d) for d in data] + except Exception: + return [] + + +def _save_snapshots(workspace_root: Path, snapshots: list[ConfigSnapshot]) -> None: + p = _snapshot_path(workspace_root) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(json.dumps([s.to_dict() for s in snapshots], indent=2)) + + +# --------------------------------------------------------------------------- +# Watch path resolution +# --------------------------------------------------------------------------- + +def _load_watch_paths(workspace_root: Path, extra: list[str] | None = None) -> list[str]: + """Merge default paths, .agent-strace-watch.json, and CLI extras.""" + paths = list(DEFAULT_WATCH_PATHS) + + cfg_file = workspace_root / _WATCH_CONFIG_FILE + if cfg_file.exists(): + try: + cfg = json.loads(cfg_file.read_text()) + paths.extend(cfg.get("watch", [])) + except Exception: + pass + + if extra: + paths.extend(extra) + + # Deduplicate preserving order + seen: set[str] = set() + result: list[str] = [] + for p in paths: + if p not in seen: + seen.add(p) + result.append(p) + return result + + +# --------------------------------------------------------------------------- +# Core operations +# --------------------------------------------------------------------------- + +def take_snapshot( + workspace_root: Path, + watch_paths: list[str], + session_id: str = "", + label: str = "", +) -> ConfigSnapshot: + """Hash all watched files and append a new snapshot.""" + import uuid + snap_id = uuid.uuid4().hex[:12] + files = [_snapshot_file(workspace_root, p) for p in watch_paths] + snapshot = ConfigSnapshot( + snapshot_id=snap_id, + timestamp=time.time(), + session_id=session_id, + label=label, + files=files, + ) + existing = _load_snapshots(workspace_root) + existing.append(snapshot) + _save_snapshots(workspace_root, existing) + return snapshot + + +def diff_snapshots(a: ConfigSnapshot, b: ConfigSnapshot) -> SnapshotDiff: + """Compare two snapshots and return per-file changes.""" + a_map = {f.path: f for f in a.files} + b_map = {f.path: f for f in b.files} + all_paths = sorted(set(a_map) | set(b_map)) + + changes: list[FileDiff] = [] + for path in all_paths: + fa = a_map.get(path) + fb = b_map.get(path) + + if fa is None: + changes.append(FileDiff(path=path, change="added", + old_sha="", new_sha=fb.sha256 if fb else "")) + elif fb is None: + changes.append(FileDiff(path=path, change="removed", + old_sha=fa.sha256, new_sha="")) + elif not fa.exists and not fb.exists: + pass # both absent — skip + elif not fa.exists and fb.exists: + changes.append(FileDiff(path=path, change="added", + old_sha="", new_sha=fb.sha256)) + elif fa.exists and not fb.exists: + changes.append(FileDiff(path=path, change="removed", + old_sha=fa.sha256, new_sha="")) + elif fa.sha256 != fb.sha256: + changes.append(FileDiff(path=path, change="modified", + old_sha=fa.sha256, new_sha=fb.sha256)) + else: + changes.append(FileDiff(path=path, change="unchanged", + old_sha=fa.sha256, new_sha=fb.sha256)) + + return SnapshotDiff( + snapshot_a_id=a.snapshot_id, + snapshot_b_id=b.snapshot_id, + timestamp_a=a.timestamp, + timestamp_b=b.timestamp, + changes=changes, + ) + + +def find_affected_sessions( + store: TraceStore, + workspace_root: Path, + since: float | None = None, +) -> list[tuple[str, str, list[str]]]: + """Return (session_id, started_at_str, changed_paths) for sessions that + ran after a config change relative to the previous snapshot.""" + snapshots = _load_snapshots(workspace_root) + if len(snapshots) < 2: + return [] + + # Build a timeline of config changes: (timestamp, changed_paths) + change_events: list[tuple[float, list[str]]] = [] + for i in range(1, len(snapshots)): + diff = diff_snapshots(snapshots[i - 1], snapshots[i]) + if diff.has_changes: + change_events.append((snapshots[i].timestamp, diff.changed_paths)) + + if not change_events: + return [] + + all_sessions = store.list_sessions() + if since: + all_sessions = [s for s in all_sessions if s.started_at >= since] + + results: list[tuple[str, str, list[str]]] = [] + for meta in all_sessions: + # Find the most recent config change before this session + relevant: list[str] = [] + for change_ts, changed_paths in change_events: + if change_ts <= meta.started_at: + relevant = changed_paths # keep the most recent + if relevant: + import datetime + dt = datetime.datetime.fromtimestamp(meta.started_at) + results.append((meta.session_id, dt.strftime("%Y-%m-%d %H:%M"), relevant)) + + return results + + +# --------------------------------------------------------------------------- +# Formatting +# --------------------------------------------------------------------------- + +def _fmt_ts(ts: float) -> str: + import datetime + return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S") + + +def format_check(diff: SnapshotDiff, out: TextIO = sys.stdout) -> None: + if not diff.has_changes: + out.write("✓ No config changes since last snapshot.\n") + return + + out.write(f"Config changed since snapshot {diff.snapshot_a_id[:8]} " + f"({_fmt_ts(diff.timestamp_a)}):\n\n") + for c in diff.changes: + if c.change == "unchanged": + continue + symbol = {"added": "+", "removed": "-", "modified": "~"}.get(c.change, "?") + out.write(f" {symbol} {c.path} ({c.change})\n") + out.write(f"\n{len(diff.changed_paths)} file(s) changed.\n") + + +def format_history(snapshots: list[ConfigSnapshot], out: TextIO = sys.stdout) -> None: + if not snapshots: + out.write("No snapshots recorded yet. Run: agent-strace config-watch snapshot\n") + return + + out.write(f"Config snapshot history ({len(snapshots)} snapshot(s)):\n\n") + for i, snap in enumerate(snapshots): + label = f" [{snap.label}]" if snap.label else "" + session = f" session={snap.session_id[:12]}" if snap.session_id else "" + out.write(f" {snap.snapshot_id[:8]} {_fmt_ts(snap.timestamp)}{label}{session}\n") + if i > 0: + diff = diff_snapshots(snapshots[i - 1], snap) + for c in diff.changes: + if c.change != "unchanged": + sym = {"added": "+", "removed": "-", "modified": "~"}.get(c.change, "?") + out.write(f" {sym} {c.path}\n") + out.write("\n") + + +def format_affected( + affected: list[tuple[str, str, list[str]]], + out: TextIO = sys.stdout, +) -> None: + if not affected: + out.write("No sessions found that ran after a config change.\n") + return + + out.write(f"{len(affected)} session(s) ran after a config change:\n\n") + for session_id, started_at, changed_paths in affected: + out.write(f" {session_id[:12]} {started_at} " + f"(after change to: {', '.join(changed_paths)})\n") + out.write( + "\nRun `agent-strace drift` to compare behaviour before and after the change.\n" + ) + + +# --------------------------------------------------------------------------- +# CLI handler +# --------------------------------------------------------------------------- + +def cmd_config_watch(args: argparse.Namespace) -> int: + workspace_root = Path(args.trace_dir).parent # .agent-traces/ → workspace root + store = TraceStore(Path(args.trace_dir)) + subcommand = getattr(args, "config_watch_command", None) + extra_paths = getattr(args, "watch", None) or [] + watch_paths = _load_watch_paths(workspace_root, extra_paths) + fmt = getattr(args, "format", "text") + + if subcommand == "snapshot" or subcommand is None: + label = getattr(args, "label", "") or "" + snap = take_snapshot(workspace_root, watch_paths, label=label) + existing = _load_snapshots(workspace_root) + sys.stdout.write( + f"Snapshot {snap.snapshot_id[:8]} recorded " + f"({sum(1 for f in snap.files if f.exists)} file(s) hashed).\n" + ) + # Show diff vs previous if one exists + if len(existing) >= 2: + diff = diff_snapshots(existing[-2], existing[-1]) + if diff.has_changes: + sys.stdout.write("\nChanges from previous snapshot:\n") + format_check(diff, sys.stdout) + return 0 + + elif subcommand == "check": + snapshots = _load_snapshots(workspace_root) + if not snapshots: + sys.stderr.write("No snapshots yet. Run: agent-strace config-watch snapshot\n") + return 1 + # Compare current state against latest snapshot. + # Use the union of: paths in the latest snapshot + any CLI --watch extras. + latest = snapshots[-1] + snapshot_paths = [f.path for f in latest.files] + check_paths = list(dict.fromkeys(snapshot_paths + (extra_paths or []))) + current_files = [_snapshot_file(workspace_root, p) for p in check_paths] + import uuid + current_snap = ConfigSnapshot( + snapshot_id=uuid.uuid4().hex[:12], + timestamp=time.time(), + files=current_files, + ) + diff = diff_snapshots(latest, current_snap) + if fmt == "json": + sys.stdout.write(json.dumps({ + "has_changes": diff.has_changes, + "changed_paths": diff.changed_paths, + "since_snapshot": latest.snapshot_id, + "since_timestamp": latest.timestamp, + "changes": [ + {"path": c.path, "change": c.change} + for c in diff.changes if c.change != "unchanged" + ], + }, indent=2) + "\n") + else: + format_check(diff, sys.stdout) + return 1 if diff.has_changes else 0 + + elif subcommand == "history": + snapshots = _load_snapshots(workspace_root) + if fmt == "json": + sys.stdout.write(json.dumps([s.to_dict() for s in snapshots], indent=2) + "\n") + else: + format_history(snapshots, sys.stdout) + return 0 + + elif subcommand == "affected": + since_str = getattr(args, "since", None) + since_ts: float | None = None + if since_str: + from .lint import _parse_since + since_ts = _parse_since(since_str) + affected = find_affected_sessions(store, workspace_root, since=since_ts) + if fmt == "json": + sys.stdout.write(json.dumps([ + {"session_id": sid, "started_at": ts, "changed_paths": paths} + for sid, ts, paths in affected + ], indent=2) + "\n") + else: + format_affected(affected, sys.stdout) + return 0 + + else: + sys.stderr.write(f"Unknown config-watch subcommand: {subcommand!r}\n") + return 1 From 7b96abd2ae94d77bc9dd6ca44fe4428f8af3b39e Mon Sep 17 00:00:00 2001 From: Siddhant Khare Date: Sat, 23 May 2026 08:25:02 +0000 Subject: [PATCH 2/2] feat(config-watch): AGENTS.md change detector (#86) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - config_watch.py: take_snapshot, diff_snapshots, find_affected_sessions, cmd_config_watch - Subcommands: snapshot, check, history, affected - SHA-256 hashing of watched config files; snapshots stored in .agent-traces/config-snapshots.json - Default watch list: AGENTS.md, CLAUDE.md, system_prompt.md/txt, .cursorrules, .github/copilot-instructions.md - check exits 1 when config has changed (CI gate) - 42 tests in tests/test_config_watch.py - README: new config-watch section and command reference entries - Version bump 0.49.0 → 0.50.0 Closes #86 Co-authored-by: Ona --- README.md | 61 +++++ src/agent_trace/__init__.py | 2 +- src/agent_trace/cli.py | 28 +++ tests/test_config_watch.py | 480 ++++++++++++++++++++++++++++++++++++ 4 files changed, 570 insertions(+), 1 deletion(-) create mode 100644 tests/test_config_watch.py diff --git a/README.md b/README.md index c67daf7..cbb72f4 100644 --- a/README.md +++ b/README.md @@ -186,6 +186,7 @@ print(f"Replay with: agent-strace replay {meta.session_id}") | `curve` | Personal cost-efficiency curve | | `a2a-tree` | Cross-agent trace correlation (A2A protocol) | | `mcp` | MCP server: expose traces as queryable tools for a debugging agent | +| `config-watch` | Snapshot and diff AGENTS.md and other config files; find affected sessions | ``` agent-strace setup [--redact] [--global] Generate Claude Code hooks config @@ -237,6 +238,14 @@ agent-strace oncall --rotation-start DATE On-call readiness for agent-modi agent-strace curve [--export csv] Personal agent cost-efficiency curve agent-strace inflation [--compare m1,m2] Token inflation calculator across model versions agent-strace a2a-tree [session-id] Visualise A2A agent call graph +agent-strace config-watch snapshot [--label TEXT] [--watch PATH] + Snapshot current config file state +agent-strace config-watch check [--format text|json] [--watch PATH] + Diff current state vs last snapshot (exit 1 if changed) +agent-strace config-watch history [--format text|json] + List all snapshots +agent-strace config-watch affected [--since DURATION] [--format text|json] + Sessions that ran after a config change ``` ### Import existing Claude Code sessions @@ -437,6 +446,58 @@ Rules are configurable via `.agent-strace-lint.json`: | `error-retry-loop` | WARN | Same tool errored and was retried 3+ times | | `no-output` | WARN | Session completed with no write or file-modifying tool calls | +### Config change detector + +Track changes to AGENTS.md and other agent configuration files. Snapshot the current state before a change, then check what drifted and which sessions ran after it. + +```bash +# Record a snapshot of all watched config files +agent-strace config-watch snapshot + +# Add a label to identify the snapshot +agent-strace config-watch snapshot --label "before-prompt-refactor" + +# Check whether anything changed since the last snapshot (exit 1 if yes) +agent-strace config-watch check + +# Machine-readable diff +agent-strace config-watch check --format json + +# List all snapshots +agent-strace config-watch history + +# Find sessions that ran after a config change +agent-strace config-watch affected + +# Limit to sessions from the last 7 days +agent-strace config-watch affected --since 7d +``` + +Example output: + +``` +$ agent-strace config-watch check +CHANGED AGENTS.md (sha256: a1b2c3d4 → e5f6a7b8) +ADDED .claude/settings.json +No changes to: CLAUDE.md, system_prompt.md + +$ agent-strace config-watch affected +2 session(s) ran after a config change: + + abc123def456 2026-05-20T14:32:01 (after change to: AGENTS.md) + 789xyz012abc 2026-05-20T15:10:44 (after change to: AGENTS.md) + +Run `agent-strace drift` to compare behaviour before and after the change. +``` + +Watched files by default: `AGENTS.md`, `CLAUDE.md`, `system_prompt.md`, `system_prompt.txt`, `.cursorrules`, `.github/copilot-instructions.md`. Add extra paths with `--watch`: + +```bash +agent-strace config-watch snapshot --watch .claude/settings.json --watch custom_prompt.txt +``` + +Snapshots are stored in `.agent-traces/config-snapshots.json`. Use `check` as a CI gate — it exits 1 when config has changed since the last snapshot. + ### Data retention Enforce configurable retention policies to automatically delete old session data — required for GDPR, SOC 2, and internal data policies. diff --git a/src/agent_trace/__init__.py b/src/agent_trace/__init__.py index 957031c..101e0ea 100644 --- a/src/agent_trace/__init__.py +++ b/src/agent_trace/__init__.py @@ -1,3 +1,3 @@ """agent-trace: strace for AI agents.""" -__version__ = "0.49.0" +__version__ = "0.50.0" diff --git a/src/agent_trace/cli.py b/src/agent_trace/cli.py index 63d3812..9371fac 100644 --- a/src/agent_trace/cli.py +++ b/src/agent_trace/cli.py @@ -49,6 +49,7 @@ from .integrations import detect_and_instrument, _INTEGRATIONS from .budget_report import cmd_budget_report from .compare import cmd_compare +from .config_watch import cmd_config_watch from .lint import cmd_lint from .retention import cmd_retention from .sample import cmd_sample @@ -883,6 +884,32 @@ def build_parser() -> argparse.ArgumentParser: p_sample.add_argument("--seed", type=int, default=None, help="random seed for reproducible random sampling") + # config-watch + p_cw = sub.add_parser("config-watch", + help="detect AGENTS.md and config file changes between sessions") + cw_sub = p_cw.add_subparsers(dest="config_watch_command") + + cw_snap = cw_sub.add_parser("snapshot", help="record a snapshot of current config files") + cw_snap.add_argument("--label", metavar="TEXT", + help="human-readable label for this snapshot") + cw_snap.add_argument("--watch", metavar="PATH", action="append", + help="additional file to watch (repeatable)") + + cw_check = cw_sub.add_parser("check", + help="check whether config has changed since last snapshot") + cw_check.add_argument("--watch", metavar="PATH", action="append", + help="additional file to watch (repeatable)") + cw_check.add_argument("--format", choices=["text", "json"], default="text") + + cw_hist = cw_sub.add_parser("history", help="show full snapshot history") + cw_hist.add_argument("--format", choices=["text", "json"], default="text") + + cw_aff = cw_sub.add_parser("affected", + help="list sessions that ran after a config change") + cw_aff.add_argument("--since", metavar="DURATION", + help="only sessions newer than this (e.g. 7d, 24h)") + cw_aff.add_argument("--format", choices=["text", "json"], default="text") + # compare p_compare = sub.add_parser("compare", help="session-to-session regression report") p_compare.add_argument("session_id_a", nargs="?", @@ -1053,6 +1080,7 @@ def main() -> None: "auto": cmd_auto, "budget-report": cmd_budget_report, "compare": cmd_compare, + "config-watch": cmd_config_watch, "lint": cmd_lint, "retention": cmd_retention, "sample": cmd_sample, diff --git a/tests/test_config_watch.py b/tests/test_config_watch.py new file mode 100644 index 0000000..1555302 --- /dev/null +++ b/tests/test_config_watch.py @@ -0,0 +1,480 @@ +"""Tests for agent-strace config-watch (Issue #86).""" + +from __future__ import annotations + +import json +import tempfile +import time +import unittest +from pathlib import Path + +from agent_trace.config_watch import ( + ConfigSnapshot, + FileDiff, + FileSnapshot, + SnapshotDiff, + _hash_file, + _load_snapshots, + _load_watch_paths, + _save_snapshots, + _snapshot_file, + cmd_config_watch, + diff_snapshots, + find_affected_sessions, + format_affected, + format_check, + format_history, + take_snapshot, + DEFAULT_WATCH_PATHS, +) +from agent_trace.models import EventType, SessionMeta, TraceEvent +from agent_trace.store import TraceStore + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_workspace() -> tuple[Path, TraceStore]: + tmp = Path(tempfile.mkdtemp()) + trace_dir = tmp / ".agent-traces" + trace_dir.mkdir() + store = TraceStore(trace_dir) + return tmp, store + + +def _add_session(store: TraceStore, started_at: float) -> str: + meta = SessionMeta(agent_name="test", command="test") + sp = store.create_session(meta) + sid = sp.name + meta2 = store.load_meta(sid) + meta2.started_at = started_at + store.update_meta(meta2) + e = TraceEvent(event_type=EventType.SESSION_END, timestamp=started_at + 60, + session_id=sid, data={}) + store.append_event(sid, e) + return sid + + +# --------------------------------------------------------------------------- +# _hash_file +# --------------------------------------------------------------------------- + +class TestHashFile(unittest.TestCase): + def test_returns_hex_string(self): + with tempfile.NamedTemporaryFile(mode="w", suffix=".md", delete=False) as f: + f.write("# AGENTS\nDo stuff.") + path = Path(f.name) + h = _hash_file(path) + self.assertEqual(len(h), 64) + self.assertTrue(all(c in "0123456789abcdef" for c in h)) + + def test_returns_empty_for_missing_file(self): + h = _hash_file(Path("/nonexistent/path.md")) + self.assertEqual(h, "") + + def test_different_content_different_hash(self): + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write("version 1") + p = Path(f.name) + h1 = _hash_file(p) + p.write_text("version 2") + h2 = _hash_file(p) + self.assertNotEqual(h1, h2) + + def test_same_content_same_hash(self): + with tempfile.NamedTemporaryFile(mode="w", delete=False) as f: + f.write("same content") + p = Path(f.name) + self.assertEqual(_hash_file(p), _hash_file(p)) + + +# --------------------------------------------------------------------------- +# _snapshot_file +# --------------------------------------------------------------------------- + +class TestSnapshotFile(unittest.TestCase): + def test_existing_file(self): + root = Path(tempfile.mkdtemp()) + (root / "AGENTS.md").write_text("# AGENTS") + snap = _snapshot_file(root, "AGENTS.md") + self.assertTrue(snap.exists) + self.assertEqual(len(snap.sha256), 64) + self.assertGreater(snap.mtime, 0) + + def test_missing_file(self): + root = Path(tempfile.mkdtemp()) + snap = _snapshot_file(root, "AGENTS.md") + self.assertFalse(snap.exists) + self.assertEqual(snap.sha256, "") + self.assertEqual(snap.mtime, 0.0) + + +# --------------------------------------------------------------------------- +# take_snapshot / _load_snapshots / _save_snapshots +# --------------------------------------------------------------------------- + +class TestSnapshotPersistence(unittest.TestCase): + def test_snapshot_saved_and_loaded(self): + root = Path(tempfile.mkdtemp()) + (root / ".agent-traces").mkdir() + (root / "AGENTS.md").write_text("# AGENTS") + snap = take_snapshot(root, ["AGENTS.md"], label="test") + loaded = _load_snapshots(root) + self.assertEqual(len(loaded), 1) + self.assertEqual(loaded[0].snapshot_id, snap.snapshot_id) + self.assertEqual(loaded[0].label, "test") + + def test_multiple_snapshots_appended(self): + root = Path(tempfile.mkdtemp()) + (root / ".agent-traces").mkdir() + take_snapshot(root, ["AGENTS.md"]) + take_snapshot(root, ["AGENTS.md"]) + loaded = _load_snapshots(root) + self.assertEqual(len(loaded), 2) + + def test_empty_store_returns_empty_list(self): + root = Path(tempfile.mkdtemp()) + self.assertEqual(_load_snapshots(root), []) + + def test_snapshot_with_session_id(self): + root = Path(tempfile.mkdtemp()) + (root / ".agent-traces").mkdir() + snap = take_snapshot(root, [], session_id="abc123") + loaded = _load_snapshots(root) + self.assertEqual(loaded[0].session_id, "abc123") + + def test_serialisation_roundtrip(self): + root = Path(tempfile.mkdtemp()) + (root / ".agent-traces").mkdir() + (root / "AGENTS.md").write_text("hello") + snap = take_snapshot(root, ["AGENTS.md"]) + loaded = _load_snapshots(root) + self.assertEqual(loaded[0].files[0].path, "AGENTS.md") + self.assertTrue(loaded[0].files[0].exists) + + +# --------------------------------------------------------------------------- +# diff_snapshots +# --------------------------------------------------------------------------- + +class TestDiffSnapshots(unittest.TestCase): + def _make_snap(self, files: dict[str, str]) -> ConfigSnapshot: + """files: {path: sha256 or '' for absent}""" + file_snaps = [ + FileSnapshot(path=p, sha256=h, mtime=1.0 if h else 0.0, exists=bool(h)) + for p, h in files.items() + ] + return ConfigSnapshot( + snapshot_id="test", + timestamp=time.time(), + files=file_snaps, + ) + + def test_no_changes(self): + a = self._make_snap({"AGENTS.md": "abc123"}) + b = self._make_snap({"AGENTS.md": "abc123"}) + diff = diff_snapshots(a, b) + self.assertFalse(diff.has_changes) + + def test_modified_file(self): + a = self._make_snap({"AGENTS.md": "abc123"}) + b = self._make_snap({"AGENTS.md": "def456"}) + diff = diff_snapshots(a, b) + self.assertTrue(diff.has_changes) + self.assertEqual(diff.changed_paths, ["AGENTS.md"]) + self.assertEqual(diff.changes[0].change, "modified") + + def test_added_file(self): + a = self._make_snap({}) + b = self._make_snap({"AGENTS.md": "abc123"}) + diff = diff_snapshots(a, b) + self.assertTrue(diff.has_changes) + self.assertEqual(diff.changes[0].change, "added") + + def test_removed_file(self): + a = self._make_snap({"AGENTS.md": "abc123"}) + b = self._make_snap({"AGENTS.md": ""}) + diff = diff_snapshots(a, b) + self.assertTrue(diff.has_changes) + self.assertEqual(diff.changes[0].change, "removed") + + def test_multiple_changes(self): + a = self._make_snap({"AGENTS.md": "aaa", "policy.json": "bbb"}) + b = self._make_snap({"AGENTS.md": "ccc", "policy.json": "bbb"}) + diff = diff_snapshots(a, b) + self.assertEqual(len(diff.changed_paths), 1) + self.assertIn("AGENTS.md", diff.changed_paths) + + def test_both_absent_not_reported(self): + a = self._make_snap({"AGENTS.md": ""}) + b = self._make_snap({"AGENTS.md": ""}) + diff = diff_snapshots(a, b) + self.assertFalse(diff.has_changes) + + +# --------------------------------------------------------------------------- +# _load_watch_paths +# --------------------------------------------------------------------------- + +class TestLoadWatchPaths(unittest.TestCase): + def test_defaults_returned(self): + root = Path(tempfile.mkdtemp()) + paths = _load_watch_paths(root) + self.assertIn("AGENTS.md", paths) + + def test_extra_paths_added(self): + root = Path(tempfile.mkdtemp()) + paths = _load_watch_paths(root, extra=["custom/prompt.txt"]) + self.assertIn("custom/prompt.txt", paths) + + def test_config_file_merged(self): + root = Path(tempfile.mkdtemp()) + (root / ".agent-strace-watch.json").write_text( + json.dumps({"watch": ["my_prompt.md"]}) + ) + paths = _load_watch_paths(root) + self.assertIn("my_prompt.md", paths) + + def test_no_duplicates(self): + root = Path(tempfile.mkdtemp()) + paths = _load_watch_paths(root, extra=["AGENTS.md"]) + self.assertEqual(paths.count("AGENTS.md"), 1) + + +# --------------------------------------------------------------------------- +# find_affected_sessions +# --------------------------------------------------------------------------- + +class TestFindAffectedSessions(unittest.TestCase): + def test_no_snapshots_returns_empty(self): + ws, store = _make_workspace() + result = find_affected_sessions(store, ws) + self.assertEqual(result, []) + + def test_one_snapshot_returns_empty(self): + ws, store = _make_workspace() + take_snapshot(ws, []) + result = find_affected_sessions(store, ws) + self.assertEqual(result, []) + + def test_session_after_change_flagged(self): + ws, store = _make_workspace() + agents_md = ws / "AGENTS.md" + + # Snapshot 1: AGENTS.md = "v1" + agents_md.write_text("v1") + snap1 = take_snapshot(ws, ["AGENTS.md"]) + + # Change AGENTS.md + agents_md.write_text("v2") + snap2 = take_snapshot(ws, ["AGENTS.md"]) + + # Session runs after the change + now = time.time() + sid = _add_session(store, started_at=snap2.timestamp + 10) + + affected = find_affected_sessions(store, ws) + session_ids = [a[0] for a in affected] + self.assertIn(sid, session_ids) + + def test_session_before_change_not_flagged(self): + ws, store = _make_workspace() + agents_md = ws / "AGENTS.md" + + # Session runs first + now = time.time() + sid = _add_session(store, started_at=now - 100) + + # Then config changes + agents_md.write_text("v1") + take_snapshot(ws, ["AGENTS.md"]) + agents_md.write_text("v2") + take_snapshot(ws, ["AGENTS.md"]) + + affected = find_affected_sessions(store, ws) + session_ids = [a[0] for a in affected] + self.assertNotIn(sid, session_ids) + + def test_no_change_between_snapshots_no_affected(self): + ws, store = _make_workspace() + agents_md = ws / "AGENTS.md" + agents_md.write_text("same") + take_snapshot(ws, ["AGENTS.md"]) + take_snapshot(ws, ["AGENTS.md"]) # no change + + sid = _add_session(store, started_at=time.time()) + affected = find_affected_sessions(store, ws) + self.assertEqual(affected, []) + + +# --------------------------------------------------------------------------- +# Formatting +# --------------------------------------------------------------------------- + +class TestFormatting(unittest.TestCase): + def _make_diff(self, has_changes: bool) -> SnapshotDiff: + changes = [ + FileDiff(path="AGENTS.md", change="modified" if has_changes else "unchanged", + old_sha="aaa", new_sha="bbb" if has_changes else "aaa") + ] + return SnapshotDiff( + snapshot_a_id="snap1", + snapshot_b_id="snap2", + timestamp_a=time.time() - 3600, + timestamp_b=time.time(), + changes=changes, + ) + + def test_format_check_no_changes(self): + import io + diff = self._make_diff(has_changes=False) + out = io.StringIO() + format_check(diff, out) + self.assertIn("No config changes", out.getvalue()) + + def test_format_check_with_changes(self): + import io + diff = self._make_diff(has_changes=True) + out = io.StringIO() + format_check(diff, out) + self.assertIn("AGENTS.md", out.getvalue()) + self.assertIn("modified", out.getvalue()) + + def test_format_history_empty(self): + import io + out = io.StringIO() + format_history([], out) + self.assertIn("No snapshots", out.getvalue()) + + def test_format_history_with_snapshots(self): + import io + root = Path(tempfile.mkdtemp()) + (root / ".agent-traces").mkdir() + (root / "AGENTS.md").write_text("v1") + take_snapshot(root, ["AGENTS.md"]) + (root / "AGENTS.md").write_text("v2") + take_snapshot(root, ["AGENTS.md"]) + snaps = _load_snapshots(root) + out = io.StringIO() + format_history(snaps, out) + text = out.getvalue() + self.assertIn("snapshot(s)", text) + + def test_format_affected_empty(self): + import io + out = io.StringIO() + format_affected([], out) + self.assertIn("No sessions", out.getvalue()) + + def test_format_affected_with_results(self): + import io + out = io.StringIO() + format_affected([("abc123def456", "2026-05-23 10:00", ["AGENTS.md"])], out) + self.assertIn("abc123def456", out.getvalue()) + self.assertIn("AGENTS.md", out.getvalue()) + + +# --------------------------------------------------------------------------- +# cmd_config_watch CLI +# --------------------------------------------------------------------------- + +class TestCmdConfigWatch(unittest.TestCase): + def _args(self, trace_dir, subcommand=None, label=None, watch=None, + fmt="text", since=None): + import argparse + args = argparse.Namespace() + args.trace_dir = str(trace_dir) + args.config_watch_command = subcommand + args.label = label + args.watch = watch + args.format = fmt + args.since = since + return args + + def test_snapshot_returns_0(self): + ws, store = _make_workspace() + args = self._args(ws / ".agent-traces", subcommand="snapshot") + result = cmd_config_watch(args) + self.assertEqual(result, 0) + + def test_snapshot_creates_file(self): + ws, store = _make_workspace() + args = self._args(ws / ".agent-traces", subcommand="snapshot") + cmd_config_watch(args) + snaps = _load_snapshots(ws) + self.assertEqual(len(snaps), 1) + + def test_check_no_snapshot_returns_1(self): + ws, store = _make_workspace() + args = self._args(ws / ".agent-traces", subcommand="check") + result = cmd_config_watch(args) + self.assertEqual(result, 1) + + def test_check_no_changes_returns_0(self): + ws, store = _make_workspace() + (ws / "AGENTS.md").write_text("v1") + take_snapshot(ws, ["AGENTS.md"]) + args = self._args(ws / ".agent-traces", subcommand="check", + watch=["AGENTS.md"]) + result = cmd_config_watch(args) + self.assertEqual(result, 0) + + def test_check_with_changes_returns_1(self): + ws, store = _make_workspace() + (ws / "AGENTS.md").write_text("v1") + take_snapshot(ws, ["AGENTS.md"]) + (ws / "AGENTS.md").write_text("v2 — changed!") + args = self._args(ws / ".agent-traces", subcommand="check", + watch=["AGENTS.md"]) + result = cmd_config_watch(args) + self.assertEqual(result, 1) + + def test_check_json_format(self): + import io + from unittest.mock import patch + ws, store = _make_workspace() + (ws / "AGENTS.md").write_text("v1") + take_snapshot(ws, ["AGENTS.md"]) + (ws / "AGENTS.md").write_text("v2") + args = self._args(ws / ".agent-traces", subcommand="check", + watch=["AGENTS.md"], fmt="json") + captured = io.StringIO() + with patch("sys.stdout", captured): + cmd_config_watch(args) + data = json.loads(captured.getvalue()) + self.assertIn("has_changes", data) + self.assertTrue(data["has_changes"]) + + def test_history_returns_0(self): + ws, store = _make_workspace() + take_snapshot(ws, []) + args = self._args(ws / ".agent-traces", subcommand="history") + result = cmd_config_watch(args) + self.assertEqual(result, 0) + + def test_affected_returns_0(self): + ws, store = _make_workspace() + args = self._args(ws / ".agent-traces", subcommand="affected") + result = cmd_config_watch(args) + self.assertEqual(result, 0) + + def test_snapshot_with_label(self): + ws, store = _make_workspace() + args = self._args(ws / ".agent-traces", subcommand="snapshot", + label="before-deploy") + cmd_config_watch(args) + snaps = _load_snapshots(ws) + self.assertEqual(snaps[0].label, "before-deploy") + + def test_none_subcommand_defaults_to_snapshot(self): + ws, store = _make_workspace() + args = self._args(ws / ".agent-traces", subcommand=None) + result = cmd_config_watch(args) + self.assertEqual(result, 0) + snaps = _load_snapshots(ws) + self.assertEqual(len(snaps), 1) + + +if __name__ == "__main__": + unittest.main()