diff --git a/.claude/hooks/.canonical-sha256 b/.claude/hooks/.canonical-sha256 new file mode 100644 index 0000000..7dbd396 --- /dev/null +++ b/.claude/hooks/.canonical-sha256 @@ -0,0 +1,8 @@ +cfd43f72b3f64bde6cb779703eb13ea6dd2c55ea5ae3dace654bfa95e17345c9 security_guard.py +37ee358245e8be80b00517c32d586449cb669d6d6e02526cc37c0e6728c452d5 format_on_save.py +f06a2180e64db35f96bdb896fbbfa9bf0ebc5090744817f5b87a7f0fbbb7ec61 compact_warning.py +efba0f96c211161f3bf39223177dd833b2a58a62f6c603a134afc6ed8fbb57c8 spec_orient.py +ddc5bbd50cad7df0ee1cacb61d19bbdac5c4495f247f20505fef6ed844c3fd01 _state.py +63293f305ff32aab46d1da8b9d28c71ce39b658d2a8572c64024614abdf7dffe _resume_prompt.py +baa145fb6fac25ae7d03a5b655b04aba25bfb77793dcdcaf44acc151394f030b _transcript_size.py +48674de791f509c539417b29214d9c87a33b7934b985597af79711ddd90ea17a _sdk_gate.py diff --git a/.claude/hooks/_resume_prompt.py b/.claude/hooks/_resume_prompt.py new file mode 100644 index 0000000..d2b5bfc --- /dev/null +++ b/.claude/hooks/_resume_prompt.py @@ -0,0 +1,194 @@ +"""Resume-prompt builder — single source of truth for the format. + +Both the Stop-hook compact warning and the ``/handoff`` slash +command call ``build_resume_prompt`` so the user sees identical +output whether the prompt was triggered automatically or on +demand. + +Format: a Markdown blockquote that the user can paste into a +fresh Claude Code session to pick up where they left off. + +Copyright 2026 Smart-AI-Memory +Licensed under Apache 2.0 +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +# Hooks are invoked as standalone scripts; ensure sibling helpers +# resolve regardless of how this module was loaded (script vs. +# pytest import vs. importlib). +_HOOKS_DIR = str(Path(__file__).resolve().parent) +if _HOOKS_DIR not in sys.path: + sys.path.insert(0, _HOOKS_DIR) + +from _state import GitState, SpecInfo # noqa: E402 — sys.path bootstrap above + +# Hard cap on the rendered prompt — protects against pathological +# uncommitted-file lists or oversized commit subjects. +_MAX_PROMPT_BYTES = 4096 + +# Cap on the number of uncommitted files listed before truncation. +_MAX_UNCOMMITTED = 20 + + +def _format_uncommitted(uncommitted: tuple[str, ...]) -> list[str]: + """Render the uncommitted-file lines with truncation footer.""" + if not uncommitted: + return [] + visible = list(uncommitted[:_MAX_UNCOMMITTED]) + lines = [f"> - {path}" for path in visible] + leftover = len(uncommitted) - len(visible) + if leftover > 0: + lines.append(f"> - +{leftover} more") + return lines + + +def _format_phase(spec: SpecInfo) -> str: + """Human-readable ``Phase X (status)`` blurb for the spec.""" + phase_label = { + "requirements": "Phase 1 (Requirements)", + "design": "Phase 2 (Design)", + "tasks": "Phase 3 (Tasks)", + }.get(spec.phase, spec.phase) + status = spec.status or "unknown" + return f"{phase_label} — Status: {status}" + + +def _truncate(text: str) -> str: + """Truncate to the byte cap with a footer, only if needed.""" + encoded = text.encode("utf-8") + if len(encoded) <= _MAX_PROMPT_BYTES: + return text + footer = "\n> …(truncated)\n" + budget = _MAX_PROMPT_BYTES - len(footer.encode("utf-8")) + if budget <= 0: + return text[:_MAX_PROMPT_BYTES] + truncated = encoded[:budget].decode("utf-8", errors="ignore") + return truncated + footer + + +def _read_current_task(spec: SpecInfo) -> str: + """Pick a one-line ``current task`` derived from tasks.md. + + Strategy: scan ``tasks.md`` for the most-recent + ``Status: completed`` row in the implementation table and + quote the row's task description. Caller may override with a + live TodoWrite snapshot via ``todo_summary`` when available. + + Returns an empty string when no completed task is found + (e.g. spec is at requirements/design phase). + """ + tasks_file = spec.path / "tasks.md" + if not tasks_file.is_file(): + return "" + try: + text = tasks_file.read_text(encoding="utf-8", errors="replace") + except OSError: + return "" + last_completed = "" + for line in text.splitlines(): + if "completed" not in line.lower(): + continue + if not line.lstrip().startswith("|"): + continue + cells = [c.strip() for c in line.strip().strip("|").split("|")] + if len(cells) < 3: + continue + # Expected layout: | # | Task | Status | Notes | + status_cell = cells[2].lower() if len(cells) > 2 else "" + if "completed" not in status_cell and "complete" not in status_cell: + continue + task_cell = cells[1] + # Strip leading bold/markdown formatting for readability. + task_cell = task_cell.replace("**", "").strip() + if task_cell: + last_completed = task_cell + return last_completed + + +def build_resume_prompt( + spec_info: SpecInfo | None, + git_state: GitState, + *, + workspace_path: str = "~/attune", + todo_summary: str | None = None, +) -> str: + """Render the user-facing resume prompt body. + + Args: + spec_info: Most-recent in-flight spec, or ``None`` for a + generic fallback that points at the last commit. + git_state: Branch / last-commit / uncommitted snapshot + from ``_state.git_state``. + workspace_path: Display path for the worktree (purely + cosmetic — what the user pastes into a fresh session). + todo_summary: Optional one-line ``current task`` override. + When provided, takes precedence over the + tasks.md-derived fallback. + + Returns: + Markdown blockquote ending with a ``Pick up …`` line. + Always under 4 kB. + """ + branch = git_state.branch or "" + last_commit = "" + if git_state.last_sha: + subject = git_state.last_subject or "(no subject)" + last_commit = f"`{git_state.last_sha} {subject}`" + + lines: list[str] = ["**Resume prompt for a fresh session:**", ""] + blockquote: list[str] = [ + f"> Resume work in worktree `{workspace_path}` on branch `{branch}`.", + ] + if last_commit: + blockquote.append(f"> Last commit: {last_commit}.") + + if spec_info is not None: + blockquote.append(">") + spec_relative = _spec_display_path(spec_info) + blockquote.append(f"> Active spec: `{spec_relative}` — {_format_phase(spec_info)}.") + current_task = todo_summary or _read_current_task(spec_info) + if current_task: + blockquote.append(f"> Current task: {current_task}") + uncommitted_lines = _format_uncommitted(git_state.uncommitted) + if uncommitted_lines: + blockquote.append(">") + blockquote.append("> Uncommitted:") + blockquote.extend(uncommitted_lines) + blockquote.append(">") + blockquote.append(f"> Pick up where the spec left off in `{spec_relative}tasks.md`.") + else: + if todo_summary: + blockquote.append(">") + blockquote.append(f"> Current task: {todo_summary}") + blockquote.append(">") + blockquote.append("> No active spec; pick up from the last commit's diff.") + + lines.extend(blockquote) + lines.append("") + lines.append("(Copy the block above into a fresh Claude Code session.)") + return _truncate("\n".join(lines)) + + +def _spec_display_path(spec_info: SpecInfo) -> str: + """Pretty-relative display path for a spec. + + Aims for ``specs//`` (workspace) or + ``/specs//`` (layer-scoped). Falls back to the + absolute path if the layout doesn't match. + """ + parts = spec_info.path.parts + try: + idx = parts.index("specs") + except ValueError: + return str(spec_info.path) + relative = "/".join(parts[idx:]) + "/" + if spec_info.layer != "workspace": + relative = f"{spec_info.layer}/{relative}" + # Avoid duplicating the layer if it was already in parts. + if Path(relative).parts.count(spec_info.layer) > 1: + relative = "/".join(parts[idx:]) + "/" + return relative diff --git a/.claude/hooks/_sdk_gate.py b/.claude/hooks/_sdk_gate.py new file mode 100644 index 0000000..3a0c165 --- /dev/null +++ b/.claude/hooks/_sdk_gate.py @@ -0,0 +1,42 @@ +"""Detect SDK-spawned subprocess sessions so hooks can self-gate. + +sdk-subprocess-isolation spec (R1/R2, decision D1: gate everything): +an SDK-spawned ``claude`` subprocess is not an interactive session — +no attune hook applies there, and SessionStart hook stdout poisons +the SDK's stream-json channel (the failure that broke every SDK +workflow for subscription users). Every hook script calls +:func:`exit_if_sdk_subprocess` as its first ``__main__`` statement. + +Two detection signals (spec D3): + +- ``ATTUNE_SDK_SUBPROCESS=1`` — attune's explicit marker, set by + ``agent_sdk_adapter.sdk_isolation_kwargs()`` (Phase 2). +- ``CLAUDE_CODE_ENTRYPOINT`` starting with ``sdk-`` — stamped by the + Agent SDK itself into every subprocess env, so the gate also covers + third-party SDK scripts that never touch attune's adapter. + Interactive sessions carry other values (``claude-desktop``, etc.). + +Twin copy: ``src/attune/hooks/scripts/_sdk_gate.py`` (repo-level +hooks). Keep both in sync — each is imported from its own script dir. + +Copyright 2026 Smart-AI-Memory +Licensed under Apache 2.0 +""" + +from __future__ import annotations + +import os +import sys + + +def is_sdk_subprocess() -> bool: + """True when running inside an SDK-spawned ``claude`` subprocess.""" + if os.environ.get("ATTUNE_SDK_SUBPROCESS") == "1": + return True + return os.environ.get("CLAUDE_CODE_ENTRYPOINT", "").startswith("sdk-") + + +def exit_if_sdk_subprocess() -> None: + """Exit 0 with no output when inside an SDK subprocess session.""" + if is_sdk_subprocess(): + sys.exit(0) diff --git a/.claude/hooks/_state.py b/.claude/hooks/_state.py new file mode 100644 index 0000000..bb2bbaf --- /dev/null +++ b/.claude/hooks/_state.py @@ -0,0 +1,546 @@ +"""Shared state-discovery helpers for session-continuity hooks. + +Pure-Python module — no Claude Code SDK calls, no network I/O. +Used by `spec_orient.py`, `compact_warning.py`, and the +`/handoff` slash command. + +Three responsibilities: + +1. ``discover_specs(roots)`` — walk every ``specs/`` directory + under ``roots`` for in-flight specs, returns most-recently + modified first. +2. ``git_state(cwd)`` — branch + last commit + uncommitted file + list. Tolerates missing git or non-repo paths. +3. ``session_sentinel_path(session_id)`` — the once-per-session + file used by ``compact_warning.py`` so the warning fires + exactly once. + +Copyright 2026 Smart-AI-Memory +Licensed under Apache 2.0 +""" + +from __future__ import annotations + +import os +import re +import subprocess +import time +from dataclasses import dataclass +from pathlib import Path + +# Phases checked, highest-priority first. The first phase file +# present in a spec directory determines the displayed phase +# and status. +_PHASE_FILES: tuple[tuple[str, str], ...] = ( + ("tasks", "tasks.md"), + ("design", "design.md"), + ("requirements", "requirements.md"), +) + +# Robust to markdown bold variants around the label and colon: +# ``Status:``, ``**Status**:``, ``**Status:**`` (colon inside the bold), +# ``*Status*:``. Brittleness here was a real bug — a ``**Status:** +# complete`` header matched none of the old pattern and the spec stayed +# in-flight despite being marked done (see decisions.md DECIDE-2). +_STATUS_LINE = re.compile( + r"^\s*\**\s*Status\**\s*:\s*\**\s*(.+?)\s*$", + re.IGNORECASE | re.MULTILINE, +) + +# Self-truthing additions — match terminal signals anywhere in +# the spec file (not just the header), so a stale "draft" header +# above a closed checklist doesn't keep the spec in-flight. +# +# NB: no ``$`` anchor and a ``\b`` after the verdict, so an +# *informative* terminal line — ``Status: complete (2026-06-09) — +# shipped #694`` — is recognized, not just the bare ``Status: +# complete``. Before this, the natural human format was silently +# ignored and the spec stayed in-flight forever despite being marked +# done (see decisions.md DECIDE-2). +_TERMINAL_LINE = re.compile( + r"^\s*\**\s*(?:Spec\s+)?Status\**\s*:\s*\**\s*" + r"(closed|complete|completed|retired|superseded|shipped|done)\b", + re.IGNORECASE | re.MULTILINE, +) +_CHECKLIST_HEADING = re.compile( + r"^##\s+Completion\s+checklist\s*$", + re.IGNORECASE | re.MULTILINE, +) +_CHECKLIST_LINE = re.compile( + r"^\s*-\s*\[([ xX])\]\s+(.*?)\s*$", + re.MULTILINE, +) +_DEFERRED_MARKERS = re.compile( + r"(~~.*?~~|\bdeferred\b|\bN/A\b|\bwon't\s+do\b)", + re.IGNORECASE, +) +_NEXT_H2 = re.compile(r"^##\s+", re.MULTILINE) +_TERMINAL_VERDICTS = frozenset( + {"closed", "complete", "completed", "retired", "superseded", "shipped", "done"} +) +# Ongoing-by-design statuses: a living roadmap / continuous program is not +# pending work, so it should NOT show as in-flight — but it is also not +# "done". Excluded from the in-flight list, kept distinct from terminal. +_ONGOING_VERDICTS = frozenset({"living", "ongoing"}) + +# Leading alphabetic word of a status value, for first-word tokenization: +# ``complete (2026-06-09) — shipped #694`` -> ``complete``. +_LEADING_WORD = re.compile(r"[a-zA-Z]+") + + +def _leading_verdict(status: str) -> str: + """Return the lowercased leading word of a status value, or ``""``. + + Status headers are written informatively (``complete (date) — + reason``), so terminal/ongoing recognition keys off the FIRST word, + not exact-string membership. This is the fix for the class of bug + where a correctly-marked-``complete`` spec stayed in-flight forever + because ``"complete (date) — ..."`` is not in ``_TERMINAL_VERDICTS``. + """ + # search (not match) so a stray leading ``**``/punctuation doesn't + # swallow the verdict — the first alphabetic run is the word we want. + m = _LEADING_WORD.search(status) + return m.group(0).lower() if m else "" + + +# Sentinel TTL: anything older than this on a SessionStart prune +# sweep is considered orphaned from an ungraceful exit. +_SENTINEL_TTL_SECONDS = 7 * 24 * 60 * 60 + + +@dataclass(frozen=True) +class SpecInfo: + """One in-flight spec discovered under a workspace root.""" + + slug: str + """Directory name, e.g. ``precompact-sessionstart-hooks``.""" + + path: Path + """Absolute path to the spec directory.""" + + layer: str + """Layer slug (``workspace`` for root specs, else + ``attune-rag`` / ``attune-help`` / etc.).""" + + phase: str + """``requirements`` | ``design`` | ``tasks``.""" + + status: str + """Verbatim status line value, lowercased + (e.g. ``approved``, ``in-progress``, ``draft``). + + Raw header value, preserved for back-compat. Consumers that + care about completion-state reconciliation should read + ``effective_status`` instead. + """ + + mtime: float + """Most-recent mtime across spec files (seconds since epoch).""" + + # Self-truthing additions (2026-06-02). All optional with safe + # defaults so existing positional constructors don't break. + effective_status: str = "" + """Reconciled status — overrides ``status`` when a terminal + signal (completed checklist / explicit closed line) is found + deeper in the file. Falls back to ``status`` when no terminal + signal is present. See DECIDE-1 in the spec's decisions.md.""" + + status_source: str = "header" + """Where ``effective_status`` came from — one of + ``"header"`` / ``"checklist"`` / ``"terminal-line"``.""" + + status_conflict: bool = False + """True when ``effective_status`` overrode a non-terminal + ``status`` (i.e. header drifted away from completion state). + spec_orient renders a one-line hint when True.""" + + +@dataclass(frozen=True) +class GitState: + """Snapshot of the worktree's git state at hook fire time.""" + + branch: str + last_sha: str + last_subject: str + uncommitted: tuple[str, ...] + + +def _read_phase(path: Path) -> tuple[str, str]: + """Return ``(status, full_text)`` from a phase file. + + Status is the lowercased value from the first ``**Status:**`` + line, or empty string if absent. The full text is returned so + the reconciler can scan for terminal signals deeper in the file + without re-reading from disk. + """ + try: + text = path.read_text(encoding="utf-8", errors="replace") + except OSError: + return "", "" + match = _STATUS_LINE.search(text) + status = match.group(1).strip().lower() if match else "" + return status, text + + +def _read_status(path: Path) -> str: + """Return the lowercased status from a phase file, or empty. + + Back-compat shim — prefer ``_read_phase`` when you also need the + full text for reconciliation. + """ + status, _ = _read_phase(path) + return status + + +def _completion_signal(text: str) -> tuple[str | None, str]: + """Read terminal markers from a phase file's text. + + Looks for two signals: + + 1. **Terminal-line scan** — explicit ``Spec status: closed`` / + ``Status: complete`` / etc. anywhere in the file. + 2. **Completion-checklist scan** — a ``## Completion checklist`` + section where all non-deferred rows are checked. + + Returns ``(verdict, source)``: + - verdict: ``"closed"`` / ``"complete"`` / ``"retired"`` / + ``"superseded"`` if a terminal signal exists, else None. + - source: ``"terminal-line"`` / ``"checklist"`` when verdict + is non-None, else ``"header"``. + """ + # 1. Terminal-line scan — short-circuit on first hit. + match = _TERMINAL_LINE.search(text) + if match: + return match.group(1).lower(), "terminal-line" + + # 2. Completion-checklist scan. + heading = _CHECKLIST_HEADING.search(text) + if heading is None: + return None, "header" + + section_start = heading.end() + next_heading = _NEXT_H2.search(text, section_start) + section_end = next_heading.start() if next_heading else len(text) + section = text[section_start:section_end] + + items = list(_CHECKLIST_LINE.finditer(section)) + if not items: + return None, "header" + + checked = 0 + outstanding = 0 + for item in items: + box, body = item.group(1), item.group(2) + if _DEFERRED_MARKERS.search(body): + continue # deferred — not outstanding + if box.strip().lower() == "x": + checked += 1 + else: + outstanding += 1 + + # All non-deferred items checked, AND at least one was checked + # (guard against empty / all-deferred sections producing a + # spurious "complete" verdict). + if checked > 0 and outstanding == 0: + return "complete", "checklist" + + return None, "header" + + +def _reconcile_status(header_status: str, phase_text: str) -> tuple[str, str, bool]: + """Reconcile header status against completion signals. + + DECIDE-1 (decisions.md): terminal signal wins over stale + non-terminal headers. Falls back to header when no terminal + signal is present. + + Returns ``(effective_status, status_source, status_conflict)``. + """ + verdict, source = _completion_signal(phase_text) + if verdict is None: + return header_status, "header", False + + # Terminal signal exists. Per DECIDE-1, terminal wins. + # First-word tokenization so an informative header (``complete + # (date) — reason``) is recognized as terminal, not just the bare + # word — otherwise we'd flag a spurious conflict against a header + # that actually agrees with the body signal. + header_is_terminal = _leading_verdict(header_status) in _TERMINAL_VERDICTS + return verdict, source, not header_is_terminal + + +def _phase_for_dir( + spec_dir: Path, +) -> tuple[str, str, str, str, bool, float] | None: + """Pick the highest-priority phase file present in a spec dir. + + Returns ``(phase, raw_status, effective_status, status_source, + status_conflict, mtime)``: + + - ``phase`` — ``requirements`` / ``design`` / ``tasks`` + - ``raw_status`` — verbatim header status, lowercased + - ``effective_status`` — reconciled verdict (terminal signal + wins over a stale header per DECIDE-1) + - ``status_source`` — ``"header"`` / ``"checklist"`` / + ``"terminal-line"`` + - ``status_conflict`` — True when ``effective_status`` overrode + a non-terminal ``raw_status`` + - ``mtime`` — most recent across all phase files (fresh-file + bumps the spec to the top of the list) + + Returns ``None`` when no phase file is readable. + """ + chosen: tuple[str, str, str, str, bool] | None = None + latest_mtime = 0.0 + for phase, fname in _PHASE_FILES: + fpath = spec_dir / fname + if not fpath.is_file(): + continue + try: + file_mtime = fpath.stat().st_mtime + except OSError: + continue + if file_mtime > latest_mtime: + latest_mtime = file_mtime + if chosen is None: + raw_status, phase_text = _read_phase(fpath) + effective, source, conflict = _reconcile_status(raw_status, phase_text) + chosen = (phase, raw_status, effective, source, conflict) + if chosen is None: + return None + return chosen[0], chosen[1], chosen[2], chosen[3], chosen[4], latest_mtime + + +def _is_in_flight(phase: str, effective_status: str) -> bool: + """Decide whether a spec is in-flight per the reconciled verdict. + + Rules (keyed off the status's FIRST WORD, so an informative + header like ``complete (2026-06-09) — shipped #694`` is recognized, + not just the bare verdict — see DECIDE-2): + - First word is terminal (closed / complete / completed / retired + / superseded / shipped / done) → done, exclude — regardless of + phase. + - First word is ongoing-by-design (living / ongoing) → a continuous + program / living roadmap is not pending work, exclude. + - Empty status (malformed) → still in-flight (don't drop a + working spec because the heading was malformed). + - Anything else (draft / approved / in-progress / …) → in-flight. + + The historical "tasks + complete only" rule is subsumed; a + requirements.md with a body ``Status: closed`` is excluded too + (the self-truthing improvement). + """ + lead = _leading_verdict(effective_status) + if lead in _TERMINAL_VERDICTS or lead in _ONGOING_VERDICTS: + return False + return True + + +def _layer_for(roots: list[Path], base: Path) -> str: + """Resolve the layer slug for a spec's base directory. + + ``base`` is the directory that *contains* the spec subdir + (``specs`` or ``docs/specs``) — either the workspace root or a + layer dir. + + Workspace-root specs → ``workspace``. + Layer specs (``/attune-rag/...``) → ``attune-rag``. + """ + if base in roots: + return "workspace" + return base.name or "workspace" + + +# Spec-subdir conventions probed under each root and layer dir. +# attune-gui and workspace-root specs live in ``specs/``; +# attune-rag/author/help keep theirs in ``docs/specs/``. Probing both +# lets one identical hook serve every repo (supersedes a per-repo +# config). Root-level matches are processed before layer matches so a +# root ``docs/specs`` is attributed to ``workspace`` (see dedup below). +_SPEC_SUBDIRS: tuple[str, ...] = ("specs", "docs/specs") + + +def discover_specs(roots: list[Path]) -> list[SpecInfo]: + """Walk ``specs/`` directories under each root for in-flight specs. + + Args: + roots: Workspace roots to scan. Each root is checked for a + top-level ``specs/`` and for ``//specs/`` + directories (one nested level only — no recursive walk). + + Returns: + ``SpecInfo`` list, most-recently modified first. Tolerates + missing dirs and malformed status lines. + """ + found: list[SpecInfo] = [] + seen: set[Path] = set() + for root in roots: + # (base, specs_dir) pairs. ``base`` is the dir that contains the + # spec subdir and decides the layer label. Root-level bases are + # listed first so a ``docs/specs`` at the root is attributed to + # ``workspace`` before the layer walk reaches the same path. + candidate_bases: list[tuple[Path, Path]] = [(root, root / sub) for sub in _SPEC_SUBDIRS] + try: + for entry in sorted(root.iterdir()): + if entry.is_dir(): + candidate_bases.extend((entry, entry / sub) for sub in _SPEC_SUBDIRS) + except OSError: + continue + for base, specs_dir in candidate_bases: + if not specs_dir.is_dir(): + continue + try: + spec_dirs = sorted(p for p in specs_dir.iterdir() if p.is_dir()) + except OSError: + continue + for spec_dir in spec_dirs: + resolved = spec_dir.resolve() + if resolved in seen: + continue + seen.add(resolved) + phase_info = _phase_for_dir(spec_dir) + if phase_info is None: + continue + phase, raw_status, effective, source, conflict, mtime = phase_info + if not _is_in_flight(phase, effective): + continue + found.append( + SpecInfo( + slug=spec_dir.name, + path=spec_dir, + layer=_layer_for(roots, base), + phase=phase, + status=raw_status, + mtime=mtime, + effective_status=effective, + status_source=source, + status_conflict=conflict, + ) + ) + found.sort(key=lambda s: s.mtime, reverse=True) + return found + + +def _run_git(cwd: Path, *args: str) -> str: + """Run a git command and return stdout, or empty on any failure.""" + try: + result = subprocess.run( + ["git", *args], + cwd=str(cwd), + capture_output=True, + text=True, + timeout=2.0, + check=False, + ) + except (OSError, subprocess.TimeoutExpired): + return "" + if result.returncode != 0: + return "" + return result.stdout + + +def git_state(cwd: Path) -> GitState: + """Return branch, last commit, and uncommitted files for ``cwd``. + + Empty fields when ``cwd`` isn't a git repo or git is missing. + """ + branch = _run_git(cwd, "rev-parse", "--abbrev-ref", "HEAD").strip() + if not branch: + return GitState(branch="", last_sha="", last_subject="", uncommitted=()) + log_output = _run_git(cwd, "log", "-1", "--format=%h%x09%s") + last_sha = "" + last_subject = "" + if log_output: + parts = log_output.strip().split("\t", 1) + last_sha = parts[0] + if len(parts) > 1: + last_subject = parts[1] + porcelain = _run_git(cwd, "status", "--porcelain") + uncommitted: list[str] = [] + for line in porcelain.splitlines(): + if len(line) < 4: + continue + # Porcelain format: ``XY `` (status flags + space + path). + path = line[3:].strip() + if path: + uncommitted.append(path) + return GitState( + branch=branch, + last_sha=last_sha, + last_subject=last_subject, + uncommitted=tuple(uncommitted), + ) + + +def _sentinel_dir() -> Path: + """Resolve the directory used for once-per-session sentinels. + + Honors ``ATTUNE_AI_SENTINEL_DIR`` for tests; otherwise uses + ``~/.attune``. The Claude Code session-state directory is + not documented as an env var as of 2026-05, so the + ``~/.attune`` fallback is the primary path. + """ + override = os.environ.get("ATTUNE_AI_SENTINEL_DIR") + if override: + return Path(override) + return Path.home() / ".attune" + + +def session_sentinel_path(session_id: str | None) -> Path: + """Path to the once-per-session compact-warning sentinel. + + Uses a sanitized session id so the path stays inside the + sentinel dir even with weird inputs. + """ + base = _sentinel_dir() + safe = "unknown" + if session_id: + safe = re.sub(r"[^A-Za-z0-9_-]", "_", session_id)[:64] or "unknown" + return base / f".compact-warned-{safe}" + + +def prune_stale_sentinels(now: float | None = None) -> int: + """Delete sentinel files older than the TTL. + + Returns the number of files removed. Safe to call when the + sentinel dir doesn't exist. + """ + sentinel_dir = _sentinel_dir() + if not sentinel_dir.is_dir(): + return 0 + cutoff = (now if now is not None else time.time()) - _SENTINEL_TTL_SECONDS + removed = 0 + try: + entries = list(sentinel_dir.iterdir()) + except OSError: + return 0 + for entry in entries: + if not entry.name.startswith(".compact-warned-"): + continue + try: + if entry.stat().st_mtime < cutoff: + entry.unlink() + removed += 1 + except OSError: + continue + return removed + + +def workspace_roots(cwd: Path | None = None) -> list[Path]: + """Best-effort guess at workspace roots to scan for specs. + + Order: + 1. ``ATTUNE_AI_WORKSPACE_ROOTS`` env var + (``os.pathsep``-separated: ``:`` on POSIX, ``;`` on Windows). + 2. The given ``cwd`` (or the process cwd). + 3. ``~/attune`` if it exists and isn't already in the list. + """ + override = os.environ.get("ATTUNE_AI_WORKSPACE_ROOTS") + if override: + return [Path(p) for p in override.split(os.pathsep) if p] + base = (cwd or Path.cwd()).resolve() + roots: list[Path] = [base] + home_workspace = Path.home() / "attune" + if home_workspace.is_dir() and home_workspace.resolve() not in {r.resolve() for r in roots}: + roots.append(home_workspace) + return roots diff --git a/.claude/hooks/_transcript_size.py b/.claude/hooks/_transcript_size.py new file mode 100644 index 0000000..f8d8e07 --- /dev/null +++ b/.claude/hooks/_transcript_size.py @@ -0,0 +1,142 @@ +"""Transcript-size proxy for context utilization. + +Stop-hook payloads from Claude Code do NOT expose a context- +utilization or token-count field (verified 2026-05-09). The +transcript JSONL on disk grows monotonically as the session +accumulates turns, so we use it as a crude-but-monotonic proxy: + + chars = sum of user/assistant message-body characters + tokens ≈ chars / chars-per-token + util = tokens / context window + +The absolute number can drift ±10–15% from real utilization; +that's fine. The hook needs a single threshold-crossing event +to fire the compact warning, not exact accounting. + +Both factors are tunable via env vars so V4 calibration (after +observing real sessions) can adjust without a code change. + +Copyright 2026 Smart-AI-Memory +Licensed under Apache 2.0 +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path + +# Defaults — V4 calibration may revise these. +_DEFAULT_CHARS_PER_TOKEN = 4.0 +_DEFAULT_CONTEXT_WINDOW_TOKENS = 200_000 # Sonnet 4.6 / Opus 4.7 window + + +def _chars_per_token() -> float: + raw = os.environ.get("ATTUNE_AI_CHARS_PER_TOKEN") + if not raw: + return _DEFAULT_CHARS_PER_TOKEN + try: + value = float(raw) + except ValueError: + return _DEFAULT_CHARS_PER_TOKEN + return value if value > 0 else _DEFAULT_CHARS_PER_TOKEN + + +def _context_window_tokens() -> int: + raw = os.environ.get("ATTUNE_AI_CONTEXT_WINDOW_TOKENS") + if not raw: + return _DEFAULT_CONTEXT_WINDOW_TOKENS + try: + value = int(raw) + except ValueError: + return _DEFAULT_CONTEXT_WINDOW_TOKENS + return value if value > 0 else _DEFAULT_CONTEXT_WINDOW_TOKENS + + +def _content_chars(content: object) -> int: + """Recursively count characters in a transcript message body. + + Anthropic transcripts can encode message ``content`` as a + plain string OR a list of typed parts (text, tool_use, + tool_result, image, …). Walk the structure and sum the + string-valued ``text`` / ``content`` fields. Anything else + (binary blobs, image refs) is ignored — those don't drive + context-window pressure proportionally. + """ + if content is None: + return 0 + if isinstance(content, str): + return len(content) + if isinstance(content, list): + return sum(_content_chars(part) for part in content) + if isinstance(content, dict): + total = 0 + text = content.get("text") + if isinstance(text, str): + total += len(text) + nested = content.get("content") + if nested is not None and not isinstance(nested, str): + total += _content_chars(nested) + elif isinstance(nested, str): + total += len(nested) + return total + return 0 + + +def _sum_message_chars(transcript_path: Path) -> int: + """Sum content characters across user/assistant turns in the JSONL. + + Tolerates malformed lines and missing files: returns whatever + has been counted so far. Never raises. + """ + if not transcript_path.is_file(): + return 0 + total = 0 + try: + with transcript_path.open("r", encoding="utf-8", errors="replace") as fh: + for line in fh: + line = line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + # Claude Code transcript records vary in shape; + # handle both the legacy ``{type, message}`` and + # newer flat ``{role, content}`` envelopes. + message = record.get("message") if isinstance(record, dict) else None + if isinstance(message, dict): + total += _content_chars(message.get("content")) + elif isinstance(record, dict): + role = record.get("role") or record.get("type") + if role in {"user", "assistant", "human"}: + total += _content_chars(record.get("content")) + except OSError: + return total + return total + + +def estimate_utilization(transcript_path: str | Path) -> float: + """Return estimated context utilization in ``[0.0, 1.0]``. + + Args: + transcript_path: Path to the session transcript JSONL. + Missing or unreadable transcripts return ``0.0``. + + Returns: + Utilization fraction. Clamped to ``[0.0, 1.0]``. + """ + if not transcript_path: + return 0.0 + path = Path(transcript_path) + chars = _sum_message_chars(path) + if chars <= 0: + return 0.0 + est_tokens = chars / _chars_per_token() + util = est_tokens / _context_window_tokens() + if util < 0.0: + return 0.0 + if util > 1.0: + return 1.0 + return util diff --git a/.claude/hooks/compact_warning.py b/.claude/hooks/compact_warning.py new file mode 100644 index 0000000..a07a9da --- /dev/null +++ b/.claude/hooks/compact_warning.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python3 +"""Stop-hook compact-warning — fires once per session at threshold. + +Stop-hook payloads from Claude Code do NOT expose a context- +utilization field (verified 2026-05-09). We use a transcript- +size proxy via ``_transcript_size._estimate_utilization`` and +fire a single warning per session when utilization crosses +``ATTUNE_AI_COMPACT_WARNING_THRESHOLD`` (default 0.70). + +The warning recommends finishing the current thought, exiting, +and pasting the resume prompt into a fresh session. + +Copyright 2026 Smart-AI-Memory +Licensed under Apache 2.0 +""" + +from __future__ import annotations + +import json +import os +import sys +import traceback +from pathlib import Path + +# Force utf-8 on stdout and stderr. On Windows the default cp1252 +# encoding can't emit emoji/em-dash and would crash this hook (caught +# by the outer try/except → silent breakage). errors='replace' +# substitutes '?' for any stray non-encodable byte. +for _stream in (sys.stdout, sys.stderr): + if _stream.encoding and _stream.encoding.lower() != "utf-8": + _stream.reconfigure(encoding="utf-8", errors="replace") + +_HOOKS_DIR = str(Path(__file__).resolve().parent) +if _HOOKS_DIR not in sys.path: + sys.path.insert(0, _HOOKS_DIR) + +from _resume_prompt import build_resume_prompt # noqa: E402 +from _state import ( # noqa: E402 + discover_specs, + git_state, + session_sentinel_path, + workspace_roots, +) +from _transcript_size import estimate_utilization # noqa: E402 + +_DEFAULT_THRESHOLD = 0.70 + + +def _threshold() -> float: + raw = os.environ.get("ATTUNE_AI_COMPACT_WARNING_THRESHOLD") + if not raw: + return _DEFAULT_THRESHOLD + try: + value = float(raw) + except ValueError: + return _DEFAULT_THRESHOLD + if value <= 0 or value >= 1: + return _DEFAULT_THRESHOLD + return value + + +def format_warning(util: float, threshold: float, resume_body: str) -> str: + """Compose the user-facing warning + resume prompt.""" + util_pct = int(round(util * 100)) + threshold_pct = int(round(threshold * 100)) + return ( + f"\n⚠️ context at {util_pct}% (threshold {threshold_pct}%) — " + "auto-compact fires near 85%\n\n" + "Recommendation: finish your current thought, then exit. " + "Open a fresh session and paste the prompt below to pick up " + "cleanly:\n\n" + f"{resume_body}\n" + ) + + +def main() -> int: + """Entry point — never raises.""" + try: + try: + payload = json.load(sys.stdin) + except (json.JSONDecodeError, ValueError): + return 0 + transcript_path = payload.get("transcript_path") + if not transcript_path: + return 0 + util = estimate_utilization(transcript_path) + threshold = _threshold() + if util < threshold: + return 0 + sentinel = session_sentinel_path(payload.get("session_id")) + if sentinel.exists(): + return 0 # already warned this session + # Write the sentinel BEFORE the print so a re-fire mid-print + # can't double-emit the warning. + try: + sentinel.parent.mkdir(parents=True, exist_ok=True) + sentinel.write_text(f"{util:.4f}\n", encoding="utf-8") + except OSError: + # If we can't write the sentinel, skip the warning to + # avoid spamming on every Stop event. + return 0 + + cwd = Path(payload.get("cwd") or Path.cwd()) + roots = workspace_roots(cwd=cwd) + specs = discover_specs(roots) + spec = specs[0] if specs else None + git = git_state(cwd) + resume_body = build_resume_prompt(spec, git) + sys.stdout.write(format_warning(util, threshold, resume_body)) + return 0 + except Exception: # noqa: BLE001 — hook must never crash a session + traceback.print_exc(file=sys.stderr) + return 0 + + +if __name__ == "__main__": + from _sdk_gate import exit_if_sdk_subprocess + + exit_if_sdk_subprocess() + raise SystemExit(main()) diff --git a/.claude/hooks/format_on_save.py b/.claude/hooks/format_on_save.py new file mode 100644 index 0000000..53f8759 --- /dev/null +++ b/.claude/hooks/format_on_save.py @@ -0,0 +1,108 @@ +"""PostToolUse hook: auto-format Python files after Write/Edit. + +Runs black + ruff --fix on any .py file that Claude writes or edits. +Prevents CI failures from minor formatting issues. + +Inspired by Boris Cherny's PostToolUse formatting hook pattern. + +Reads tool result from stdin (JSON with tool_name and tool_input). +Exits 0 always (formatting is best-effort, never blocks). + +Copyright 2026 Smart-AI-Memory +Licensed under the Apache License, Version 2.0 +""" + +from __future__ import annotations + +import json +import logging +import subprocess +import sys +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def _get_file_path(data: dict) -> str | None: + """Extract the file path from tool input. + + Args: + data: Parsed JSON from stdin with tool_name and tool_input. + + Returns: + File path string if found, None otherwise. + + """ + tool_input = data.get("tool_input", {}) + return tool_input.get("file_path") or tool_input.get("path") + + +def _is_python_file(path: str) -> bool: + """Check if the path points to a Python file. + + Args: + path: File path to check. + + Returns: + True if the file has a .py extension. + + """ + return Path(path).suffix == ".py" + + +def _run_formatter(cmd: list[str], path: str) -> None: + """Run a formatting command silently. + + Args: + cmd: Command and arguments to run. + path: File path to format. + + """ + try: + subprocess.run( + [*cmd, path], + capture_output=True, + timeout=10, + ) + except (subprocess.TimeoutExpired, FileNotFoundError, OSError): + pass + + +def main() -> None: + """Read tool result from stdin, format Python files.""" + try: + raw = sys.stdin.read() + if not raw.strip(): + return + + data = json.loads(raw) + except (json.JSONDecodeError, ValueError): + return + + tool_name = data.get("tool_name", "") + if tool_name not in ("Write", "Edit"): + return + + file_path = _get_file_path(data) + if not file_path or not _is_python_file(file_path): + return + + try: + from attune.security.path_validation import _validate_file_path + + validated = _validate_file_path(file_path) + except (ValueError, ImportError): + return + + if not validated.exists(): + return + + _run_formatter(["black", "--quiet", "--line-length=100"], str(validated)) + _run_formatter(["ruff", "check", "--fix", "--quiet"], str(validated)) + + +if __name__ == "__main__": + from _sdk_gate import exit_if_sdk_subprocess + + exit_if_sdk_subprocess() + main() diff --git a/.claude/hooks/security_guard.py b/.claude/hooks/security_guard.py new file mode 100644 index 0000000..9d1e045 --- /dev/null +++ b/.claude/hooks/security_guard.py @@ -0,0 +1,241 @@ +"""PreToolUse Security Validation Hook. + +Intercepts tool calls to enforce coding standards at runtime: +1. Blocks eval()/exec() in Bash commands (CWE-95) +2. Validates file paths in Edit/Write operations (CWE-22) +3. Prevents writes to system directories +4. Blocks null byte injection in paths + +Claude Code Protocol: + stdin: JSON with tool_name and tool_input + exit 0: allow tool call + exit 2: block tool call (reason printed to stderr) + +Copyright 2026 Smart-AI-Memory +Licensed under Apache 2.0 +""" + +import json +import logging +import re +import sys +from pathlib import Path +from typing import Any + +# Force utf-8 on stdout and stderr. On Windows the default cp1252 +# encoding can't emit emoji/em-dash and would crash this hook (caught +# by the outer try/except → silent breakage). errors='replace' +# substitutes '?' for any stray non-encodable byte. +for _stream in (sys.stdout, sys.stderr): + if _stream.encoding and _stream.encoding.lower() != "utf-8": + _stream.reconfigure(encoding="utf-8", errors="replace") + +logger = logging.getLogger(__name__) + +# Directories that must never be written to (includes macOS /private/* symlinks) +SYSTEM_DIRECTORIES = frozenset( + { + "/etc", + "/sys", + "/proc", + "/dev", + "/boot", + "/sbin", + "/usr/sbin", + "/private/etc", + "/private/var", + }, +) + +# Dangerous patterns in Bash commands +DANGEROUS_BASH_PATTERNS: list[tuple[re.Pattern[str], str]] = [ + ( + re.compile(r"\beval\s*\("), + "Blocked: eval() is prohibited — use ast.literal_eval() instead (CWE-95)", + ), + ( + re.compile(r"\bexec\s*\("), + "Blocked: exec() is prohibited — use safe alternatives (CWE-95)", + ), + ( + re.compile(r"__import__\s*\("), + "Blocked: __import__() is prohibited — use standard imports (CWE-95)", + ), + ( + re.compile(r"subprocess\.call.*shell\s*=\s*True"), + "Blocked: subprocess with shell=True is a shell injection risk (B602)", + ), + ( + # Matches "rm -rf /" only when / is the final path (not /foo). + # (?!\S) = negative lookahead ensures / is followed by whitespace + # or end-of-string, so "rm -rf /tmp" is allowed but "rm -rf /" + # is blocked. + re.compile(r"\brm\s+-rf\s+/(?!\S)"), + "Blocked: rm -rf / is not allowed", + ), +] + +# Commands that search for dangerous patterns (not executing them) +SEARCH_COMMAND_PREFIXES = frozenset( + { + "grep", + "rg", + "ack", + "ag", + "git grep", + "git log", + "git diff", + }, +) + + +def _is_search_command(command: str) -> bool: + """Check if a command is searching FOR dangerous patterns, not executing them. + + Args: + command: The bash command string. + + Returns: + True if the command is a search/grep operation. + + """ + stripped = command.strip() + # Handle piped commands — check if the base command is a search + base = stripped.split("|")[0].strip() + for prefix in SEARCH_COMMAND_PREFIXES: + if base.startswith(prefix): + return True + return False + + +def validate_bash_command(command: str) -> tuple[bool, str]: + """Validate a Bash command against security policies. + + Args: + command: The command string to validate. + + Returns: + (True, "") if safe, (False, reason) if blocked. + + """ + if not command: + return True, "" + + # Allow search commands that look for dangerous patterns + if _is_search_command(command): + return True, "" + + for pattern, message in DANGEROUS_BASH_PATTERNS: + if pattern.search(command): + return False, message + + return True, "" + + +def validate_file_path(file_path: str) -> tuple[bool, str]: + """Validate a file path against security policies. + + Args: + file_path: The file path to validate. + + Returns: + (True, "") if safe, (False, reason) if blocked. + + """ + if not file_path: + return True, "" + + # Check for null bytes + if "\x00" in file_path: + return False, "Blocked: file path contains null bytes (CWE-22)" + + # Check both raw path and resolved path against system directories + # (on macOS, /etc resolves to /private/etc via symlink) + try: + resolved = str(Path(file_path).resolve()) + except (OSError, RuntimeError) as e: + return False, f"Blocked: invalid file path — {e}" + + raw_abs = str(Path(file_path).expanduser()) if file_path.startswith("~") else file_path + paths_to_check = {resolved, raw_abs} + + for check_path in paths_to_check: + for sys_dir in SYSTEM_DIRECTORIES: + if check_path.startswith(sys_dir): + return False, f"Blocked: cannot write to system directory {sys_dir} (CWE-22)" + + return True, "" + + +def main(context: dict[str, Any]) -> dict[str, Any]: + """Validate a tool call against security policies. + + Args: + context: Hook context with tool_name and tool_input from Claude Code. + + Returns: + {"allowed": True} or {"allowed": False, "reason": "..."}. + + """ + tool_name = context.get("tool_name", "") + tool_input = context.get("tool_input", {}) + + if not tool_name: + # No tool info — fail open to avoid blocking Claude Code + return {"allowed": True} + + if tool_name == "Bash": + command = tool_input.get("command", "") + allowed, reason = validate_bash_command(command) + if not allowed: + return {"allowed": False, "reason": reason} + + elif tool_name in ("Edit", "Write"): + file_path = tool_input.get("file_path", "") + allowed, reason = validate_file_path(file_path) + if not allowed: + return {"allowed": False, "reason": reason} + + return {"allowed": True} + + +def _read_stdin_context() -> dict[str, Any]: + """Read hook context from stdin (Claude Code protocol). + + Returns: + Parsed context dict, or empty dict if stdin is empty/invalid. + + """ + if sys.stdin.isatty(): + return {} + try: + raw = sys.stdin.read().strip() + if raw: + return json.loads(raw) + except (json.JSONDecodeError, ValueError) as e: + logger.warning("Could not parse stdin JSON (fail-closed): %s", e) + return {"_parse_error": True} + + +if __name__ == "__main__": + from _sdk_gate import exit_if_sdk_subprocess + + exit_if_sdk_subprocess() + logging.basicConfig(level=logging.WARNING, format="%(message)s") + context = _read_stdin_context() + + # Fail-open: if we couldn't parse stdin, allow the tool call + if context.get("_parse_error"): + # (fail-closed was blocking all tools in Claude Code) + sys.exit(0) + + result = main(context) + + if not result.get("allowed", False): + # Block: print reason to stderr, exit 2 + reason = result.get("reason", "Blocked by security guard") + print(reason, file=sys.stderr) + sys.exit(2) + + # Allow: exit 0 + sys.exit(0) diff --git a/.claude/hooks/spec_orient.py b/.claude/hooks/spec_orient.py new file mode 100644 index 0000000..0624142 --- /dev/null +++ b/.claude/hooks/spec_orient.py @@ -0,0 +1,174 @@ +#!/usr/bin/env python3 +"""SessionStart spec-orientation hook. + +Fires on every SessionStart event. Branches on the ``source`` +field (verified 2026-05-09 to be one of ``startup``, ``resume``, +``clear``, or ``compact``): + +- ``compact`` → emit the most-recent in-flight spec's tasks.md + (or design / requirements as a fallback) so the model has the + spec body in fresh post-compact context. Replaces the original + PreCompact-injection design (V2 found PreCompact has no + content-injection mechanism). +- everything else → emit a short orientation paragraph naming up + to 3 in-flight specs. + +Output goes to stdout; Claude Code splices it into the model's +initial context. The hook is wrapped in try/except so a crash +never breaks the user's session. + +Copyright 2026 Smart-AI-Memory +Licensed under Apache 2.0 +""" + +from __future__ import annotations + +import json +import sys +import traceback +from pathlib import Path + +# Force utf-8 on stdout and stderr. On Windows the default cp1252 +# encoding can't emit emoji/em-dash and would crash this hook (caught +# by the outer try/except → silent breakage). errors='replace' +# substitutes '?' for any stray non-encodable byte. +for _stream in (sys.stdout, sys.stderr): + if _stream.encoding and _stream.encoding.lower() != "utf-8": + _stream.reconfigure(encoding="utf-8", errors="replace") + +# Hooks are invoked as standalone scripts; ensure sibling helpers +# resolve. +_HOOKS_DIR = str(Path(__file__).resolve().parent) +if _HOOKS_DIR not in sys.path: + sys.path.insert(0, _HOOKS_DIR) + +from _state import ( # noqa: E402 — sys.path bootstrap above + SpecInfo, + discover_specs, + prune_stale_sentinels, + workspace_roots, +) + +# Char budget for the post-compact spec body. Generous so the +# spec survives compaction with full content; the model sees this +# as fresh context immediately after the compact summary. +_POST_COMPACT_CHAR_BUDGET = 8_000 + +# Max specs named in the orientation paragraph. +_ORIENTATION_MAX_SPECS = 3 + + +def _format_phase(spec: SpecInfo) -> str: + """Short ``(phase status)`` blurb for the orientation list. + + Renders the reconciled ``effective_status`` (not the raw header + ``status``) so a stale "draft" header above a closed checklist + doesn't show up as in-flight draft. + + When ``status_conflict`` is True (header drifted from the + completion state), append a one-line hint so the source drift + surfaces and can be fixed. + """ + phase_label = { + "requirements": "requirements", + "design": "design", + "tasks": "tasks", + }.get(spec.phase, spec.phase) + effective = spec.effective_status or spec.status or "no status" + base = f"{phase_label} {effective}" + if spec.status_conflict: + source_label = { + "checklist": "tasks closed per checklist", + "terminal-line": "marked terminal in body", + }.get(spec.status_source, spec.status_source) + raw = spec.status or "no header" + return f'{base} — {source_label}; header says "{raw}", worth fixing' + return base + + +def format_orientation(specs: list[SpecInfo]) -> str: + """Short markdown list of in-flight specs for non-compact starts.""" + if not specs: + return "" + lines = ["attune workspace — in-flight specs:"] + for spec in specs[:_ORIENTATION_MAX_SPECS]: + layer_prefix = "" if spec.layer == "workspace" else f"{spec.layer}/" + lines.append(f"- {layer_prefix}specs/{spec.slug}/ ({_format_phase(spec)})") + leftover = len(specs) - _ORIENTATION_MAX_SPECS + if leftover > 0: + lines.append(f"- (+{leftover} more)") + return "\n".join(lines) + + +def render_spec_pin(spec: SpecInfo, char_budget: int = _POST_COMPACT_CHAR_BUDGET) -> str: + """Render a spec body for post-compact context restoration. + + Picks the highest-priority phase file present in the spec dir + (tasks > design > requirements) and emits its body up to the + char budget. Adds a header naming the spec so the model can + map the body back to its location. + """ + for fname in ("tasks.md", "design.md", "requirements.md"): + fpath = spec.path / fname + if not fpath.is_file(): + continue + try: + body = fpath.read_text(encoding="utf-8", errors="replace") + except OSError: + continue + layer_prefix = "" if spec.layer == "workspace" else f"{spec.layer}/" + spec_path = f"{layer_prefix}specs/{spec.slug}/{fname}" + header = ( + f"# Active spec restored after compact: `{spec_path}`\n" + f"# Phase: {spec.phase} — Status: {spec.status or 'unknown'}\n\n" + ) + truncated = body[:char_budget] + if len(body) > char_budget: + truncated += "\n\n…(spec body truncated for context budget)" + return header + truncated + return "" + + +def main() -> int: + """Entry point — branches on ``source``, never raises.""" + try: + try: + payload = json.load(sys.stdin) + except (json.JSONDecodeError, ValueError): + payload = {} + source = (payload.get("source") or "startup").lower() + cwd = Path(payload.get("cwd") or Path.cwd()) + + # Opportunistic TTL prune — keeps sentinel dir tidy without + # a separate cron. + try: + prune_stale_sentinels() + except Exception: # noqa: BLE001 — never propagate + pass + + roots = workspace_roots(cwd=cwd) + specs = discover_specs(roots) + if not specs: + return 0 + + if source == "compact": + body = render_spec_pin(specs[0]) + if body: + print(body) + else: + orient = format_orientation(specs) + if orient: + print(orient) + return 0 + except Exception: # noqa: BLE001 — hook must never crash a session + # Log the full traceback to stderr so plugin authors can + # diagnose, but exit cleanly so Claude Code keeps going. + traceback.print_exc(file=sys.stderr) + return 0 + + +if __name__ == "__main__": + from _sdk_gate import exit_if_sdk_subprocess + + exit_if_sdk_subprocess() + raise SystemExit(main()) diff --git a/.claude/settings.json b/.claude/settings.json new file mode 100644 index 0000000..6575d41 --- /dev/null +++ b/.claude/settings.json @@ -0,0 +1,60 @@ +{ + "hooks": { + "SessionStart": [ + { + "hooks": [ + { + "type": "command", + "command": "python .claude/hooks/spec_orient.py", + "timeout": 4000 + } + ] + } + ], + "Stop": [ + { + "hooks": [ + { + "type": "command", + "command": "python .claude/hooks/compact_warning.py", + "timeout": 4000 + } + ] + } + ], + "PreToolUse": [ + { + "matcher": "Bash", + "hooks": [ + { + "type": "command", + "command": "python .claude/hooks/security_guard.py", + "timeout": 5000 + } + ] + }, + { + "matcher": "Edit|Write", + "hooks": [ + { + "type": "command", + "command": "python .claude/hooks/security_guard.py", + "timeout": 5000 + } + ] + } + ], + "PostToolUse": [ + { + "matcher": "Edit|Write", + "hooks": [ + { + "type": "command", + "command": "python .claude/hooks/format_on_save.py", + "timeout": 10000 + } + ] + } + ] + } +} diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..749f67c --- /dev/null +++ b/.gitattributes @@ -0,0 +1,5 @@ +# Vendored Claude Code session hooks: byte-identical copies of attune-ai's +# canonical plugin/hooks/. The drift-guard test hashes each file; CRLF +# checkouts on Windows would change the bytes and fail the guard. Pin LF. +.claude/hooks/*.py text eol=lf +.claude/hooks/.canonical-sha256 text eol=lf diff --git a/.gitignore b/.gitignore index bba4b8d..023057f 100644 --- a/.gitignore +++ b/.gitignore @@ -39,8 +39,12 @@ editor-frontend/node_modules/ editor-frontend/.vite/ editor-frontend/*.tsbuildinfo -# Local planning artifacts -.claude/ +# Ignore .claude/ cruft (plans, worktrees, launch.json, settings.local.json) +# but track the vendored session hooks + their wiring (see +# specs/sibling-claude-hooks/). +.claude/* +!.claude/hooks/ +!.claude/settings.json # Legacy React UI artifacts (retired in 0.5.0) ui/dist/ diff --git a/Makefile b/Makefile index 296c464..13708e9 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,13 @@ EDITOR_FRONTEND := editor-frontend EDITOR_OUTPUT := sidecar/attune_gui/static/editor .PHONY: build-editor lint-editor typecheck-editor dev-editor test-editor clean-editor install-editor -.PHONY: regen-templates regen-all +.PHONY: regen-templates regen-all sync-hooks + +# Vendored Claude Code session hooks (see specs/sibling-claude-hooks/ in the +# attune umbrella workspace). Byte-identical copies of attune-ai canonical; +# the drift-guard test enforces it. Re-sync after an upstream change. +ATTUNE_AI_ROOT ?= ../attune-ai +HOOK_FILES = security_guard.py format_on_save.py compact_warning.py spec_orient.py _state.py _resume_prompt.py _transcript_size.py _sdk_gate.py install-editor: cd $(EDITOR_FRONTEND) && npm install @@ -45,3 +51,16 @@ regen-templates: regen-all: regen-templates build-editor @echo "" @echo "regen-all complete. Verify with: git status" + +sync-hooks: ## Re-copy session hooks from attune-ai canonical + refresh checksums. + @if [ ! -d "$(ATTUNE_AI_ROOT)/plugin/hooks" ]; then \ + echo "Error: $(ATTUNE_AI_ROOT)/plugin/hooks not found. Set ATTUNE_AI_ROOT="; \ + exit 1; \ + fi + @mkdir -p .claude/hooks + @for f in $(HOOK_FILES); do \ + cp "$(ATTUNE_AI_ROOT)/plugin/hooks/$$f" ".claude/hooks/$$f"; \ + echo " synced: $$f"; \ + done + @(cd .claude/hooks && shasum -a 256 $(HOOK_FILES) > .canonical-sha256) + @echo "✓ .claude/hooks/.canonical-sha256 refreshed" diff --git a/sidecar/tests/test_claude_hooks_behavior.py b/sidecar/tests/test_claude_hooks_behavior.py new file mode 100644 index 0000000..add59c6 --- /dev/null +++ b/sidecar/tests/test_claude_hooks_behavior.py @@ -0,0 +1,165 @@ +"""Behavior smoke tests for vendored Claude Code session hooks. + +Each hook is a standalone Python script under ``.claude/hooks/``; +tests invoke it via subprocess (stdin = JSON payload, exit code = +contract). We assert exit-code semantics, not stdout content, so +that prose tweaks in canonical attune-ai don't break these tests +without genuine behavior drift. + +Contract (per specs/sibling-claude-hooks/ design): +- ``security_guard``: exit 2 to block, 0 to allow. +- ``spec_orient``: exit 0 always (informational SessionStart hook). +- ``format_on_save``: exit 0 always (best-effort PostToolUse hook). +""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +from pathlib import Path + +# sidecar/tests/ -> parents[2] is the repo root that holds .claude/. +HOOKS_DIR = Path(__file__).resolve().parents[2] / ".claude" / "hooks" + + +def _run_hook( + name: str, + payload: dict | None = None, + cwd: Path | None = None, + env_overrides: dict[str, str] | None = None, + timeout: float = 15.0, +) -> subprocess.CompletedProcess: + """Invoke a hook script with a JSON stdin payload.""" + env = os.environ.copy() + # Neutralize SDK-subprocess markers so hooks exercise their interactive + # behavior. Each hook self-gates via ``_sdk_gate.exit_if_sdk_subprocess`` + # (exit 0, no output) when ``ATTUNE_SDK_SUBPROCESS=1`` or + # ``CLAUDE_CODE_ENTRYPOINT`` starts with ``sdk-``; if the test runner + # carries either, security_guard would exit 0 instead of blocking. + env.pop("ATTUNE_SDK_SUBPROCESS", None) + env["CLAUDE_CODE_ENTRYPOINT"] = "cli" + # Strip pytest-cov's subprocess markers so the spawned hook does not + # auto-start coverage. Under `--cov` (gui CI uses branch=true) the + # subprocess would write a statement-only data file that can't combine + # with the parent's branch data ("Can't combine statement coverage data + # with branch data"). The hooks aren't part of the coverage source anyway. + for _cov_var in ( + "COV_CORE_SOURCE", + "COV_CORE_CONFIG", + "COV_CORE_DATAFILE", + "COVERAGE_PROCESS_START", + ): + env.pop(_cov_var, None) + if env_overrides is not None: + env.update(env_overrides) + return subprocess.run( # noqa: S603 — inputs are sys.executable + a vendored hook path, fully trusted + [sys.executable, str(HOOKS_DIR / name)], + input=json.dumps(payload or {}), + capture_output=True, + text=True, + cwd=str(cwd) if cwd else None, + env=env, + timeout=timeout, + ) + + +# -------------------------------------------------------------------- +# security_guard: PreToolUse Bash + Edit|Write — exit 2 blocks, 0 allows +# -------------------------------------------------------------------- + + +def test_security_guard_blocks_eval_bash() -> None: + payload = { + "tool_name": "Bash", + "tool_input": {"command": "python -c \"eval('1+1')\""}, + } + result = _run_hook("security_guard.py", payload) + assert result.returncode == 2, ( + f"expected exit 2 (block), got {result.returncode}. " + f"stdout={result.stdout!r} stderr={result.stderr!r}" + ) + + +def test_security_guard_blocks_exec_bash() -> None: + payload = { + "tool_name": "Bash", + "tool_input": {"command": "python -c \"exec('print(1)')\""}, + } + result = _run_hook("security_guard.py", payload) + assert result.returncode == 2 + + +def test_security_guard_allows_benign_bash() -> None: + payload = { + "tool_name": "Bash", + "tool_input": {"command": "ls -la"}, + } + result = _run_hook("security_guard.py", payload) + assert result.returncode == 0, ( + f"expected exit 0 (allow), got {result.returncode}. " + f"stdout={result.stdout!r} stderr={result.stderr!r}" + ) + + +def test_security_guard_blocks_path_traversal_write() -> None: + payload = { + "tool_name": "Write", + "tool_input": {"file_path": "/etc/passwd", "content": "x"}, + } + result = _run_hook("security_guard.py", payload) + assert result.returncode == 2, ( + f"expected exit 2 (block) on /etc/passwd write, got {result.returncode}. " + f"stdout={result.stdout!r} stderr={result.stderr!r}" + ) + + +# -------------------------------------------------------------------- +# spec_orient: SessionStart — exit 0 always +# -------------------------------------------------------------------- + + +def test_spec_orient_exit_0_with_populated_specs(tmp_path: Path) -> None: + """When ``docs/specs//`` has files, hook emits + exits 0.""" + (tmp_path / ".git").mkdir() # workspace-root marker + spec_dir = tmp_path / "docs" / "specs" / "demo-feature" + spec_dir.mkdir(parents=True) + (spec_dir / "requirements.md").write_text("# Demo Feature\n\n**Status**: draft\n") + result = _run_hook("spec_orient.py", cwd=tmp_path) + assert result.returncode == 0, ( + f"spec_orient must exit 0 (informational). " + f"stdout={result.stdout!r} stderr={result.stderr!r}" + ) + + +def test_spec_orient_exit_0_with_empty_workspace(tmp_path: Path) -> None: + """When no specs exist, hook still exits 0 (informational hooks never block).""" + (tmp_path / ".git").mkdir() + result = _run_hook("spec_orient.py", cwd=tmp_path) + assert result.returncode == 0 + + +# -------------------------------------------------------------------- +# format_on_save: PostToolUse Edit|Write — exit 0 when formatters absent +# -------------------------------------------------------------------- + + +def test_format_on_save_exit_0_when_formatters_absent(tmp_path: Path) -> None: + """Hook is best-effort; missing black/ruff is not a failure.""" + f = tmp_path / "demo.py" + f.write_text("x = 1\n") + payload = { + "tool_name": "Write", + "tool_input": {"file_path": str(f)}, + } + # Strip PATH so black/ruff lookups all fail. + result = _run_hook( + "format_on_save.py", + payload, + env_overrides={"PATH": "/nonexistent"}, + ) + assert result.returncode == 0, ( + f"format_on_save must exit 0 even when formatters are absent. " + f"stdout={result.stdout!r} stderr={result.stderr!r}" + ) diff --git a/sidecar/tests/test_claude_hooks_drift.py b/sidecar/tests/test_claude_hooks_drift.py new file mode 100644 index 0000000..89ee4fb --- /dev/null +++ b/sidecar/tests/test_claude_hooks_drift.py @@ -0,0 +1,82 @@ +"""Drift-guard test for vendored Claude Code session hooks. + +The 8 hook files under ``.claude/hooks/`` are byte-identical copies +of attune-ai's canonical hooks. ``.canonical-sha256`` carries the +expected sha256 of each. This test fails if any vendored file's +hash diverges, or if the file set itself changes without the +manifest being refreshed. + +To refresh after an upstream change: ``make sync-hooks``. + +See specs/sibling-claude-hooks/ in the attune umbrella workspace. +""" + +from __future__ import annotations + +import hashlib +from pathlib import Path + +import pytest + +# sidecar/tests/ -> parents[2] is the repo root that holds .claude/. +HOOKS_DIR = Path(__file__).resolve().parents[2] / ".claude" / "hooks" +MANIFEST = HOOKS_DIR / ".canonical-sha256" + + +def _parse_manifest() -> dict[str, str]: + """Parse `shasum -a 256` output into ``{filename: hexdigest}``.""" + out: dict[str, str] = {} + if not MANIFEST.exists(): + return out + for line in MANIFEST.read_text().splitlines(): + line = line.strip() + if not line: + continue + hexdigest, _, name = line.partition(" ") + if not name: + continue + out[name.strip()] = hexdigest.strip() + return out + + +def _on_disk_hooks() -> set[str]: + """Python hook files actually present in `.claude/hooks/`.""" + if not HOOKS_DIR.is_dir(): + return set() + return { + f.name + for f in HOOKS_DIR.iterdir() + if f.is_file() and f.suffix == ".py" and not f.name.startswith("__") + } + + +def test_manifest_exists() -> None: + """``.canonical-sha256`` must exist; ``make sync-hooks`` generates it.""" + assert MANIFEST.exists(), f"manifest missing at {MANIFEST}. Run `make sync-hooks` to generate." + + +def test_manifest_covers_all_on_disk_hooks() -> None: + """Manifest must list every Python hook file on disk and vice versa.""" + manifest = _parse_manifest() + on_disk = _on_disk_hooks() + missing_from_manifest = on_disk - set(manifest.keys()) + extra_in_manifest = set(manifest.keys()) - on_disk + assert not missing_from_manifest and not extra_in_manifest, ( + f"manifest/on-disk mismatch — " + f"missing_from_manifest={sorted(missing_from_manifest)}, " + f"extra_in_manifest={sorted(extra_in_manifest)}. " + f"Run `make sync-hooks` to refresh." + ) + + +@pytest.mark.parametrize("name,canonical_hex", sorted(_parse_manifest().items())) +def test_hook_matches_canonical_hash(name: str, canonical_hex: str) -> None: + """Each vendored hook file matches its canonical sha256.""" + f = HOOKS_DIR / name + assert f.exists(), f"vendored hook missing: {name}" + actual = hashlib.sha256(f.read_bytes()).hexdigest() + assert actual == canonical_hex, ( + f"{name} drift detected. " + f"vendored sha256={actual}, canonical={canonical_hex}. " + f"Run `make sync-hooks` to re-copy from attune-ai canonical." + )