diff --git a/.claude/commands/map-efficient.md b/.claude/commands/map-efficient.md
index 6b5d48b..75b8d10 100644
--- a/.claude/commands/map-efficient.md
+++ b/.claude/commands/map-efficient.md
@@ -390,6 +390,10 @@ pytest --tb=short 2>&1 || true
When TDD mode is active, Actor receives `code_only` and must NOT modify test files. When TDD is off, standard behavior.
```python
+# Context assembly: use build_context_block() from map_step_runner.py
+# to generate when called programmatically.
+# For manual invocation, construct the block from blueprint.json + step_state.json.
+
Task(
subagent_type="actor",
description="Implement subtask [ID]",
@@ -399,16 +403,40 @@ Task(
[AAG contract from decomposition: Actor -> Action -> Goal]
-Subtask: [ID] [title]
+
+# Goal
+[Goal from task_plan.md — one sentence]
+
+# Current Subtask: [ID] — [title]
+AAG Contract: [contract from blueprint]
Affected files: [from blueprint]
-Validation criteria: [from blueprint]
+Validation criteria:
+- [criteria from blueprint]
+
+# Plan Overview ([N] subtasks):
+[For each subtask in blueprint, show one-liner with status:]
+- [x] ST-001: Title (complete)
+- [ ] ST-002: Title (pending)
+- [>>] ST-003: Title (IN PROGRESS) <- current
+
+# Upstream Results (dependencies of current subtask):
+[Only for subtasks that current depends on, from step_state.json subtask_results:]
+ST-001: files=[a.py, b.py], status=valid
+
+# Repo Delta (files changed since last subtask):
+[From compute_differential_insight(), if last_subtask_commit_sha available]
+[Omit this section entirely if no previous SHA (first subtask)]
+
Protocol:
-1. Parse MAP_Contract — this is your compilation target
-2. Read affected files to understand current state
-3. Implement: translate MAP_Contract into code
-4. Apply code with Edit/Write tools
-5. Output: approach + files_changed + trade-offs"""
+1. SCOPE: Implement ONLY the Current Subtask. Do NOT modify files belonging to other subtasks.
+2. Plan Overview is for orientation — do NOT implement other subtasks.
+3. Upstream Results show what dependencies produced — use as input context.
+4. Parse MAP_Contract — this is your compilation target.
+5. Read affected files to understand current state.
+6. Implement: translate MAP_Contract into code.
+7. Apply code with Edit/Write tools.
+8. Output: approach + files_changed + trade-offs"""
)
```
@@ -452,6 +480,16 @@ if echo "$TEST_GATE" | python3 -c "import sys,json; d=json.load(sys.stdin); sys.
# Tests passed — snapshot code state for artifact verification
SNAPSHOT=$(python3 .map/scripts/map_step_runner.py snapshot_code_state)
# Append git ref to review artifact header (if code-review file exists)
+
+ # Record subtask result for context-aware injection (Upstream Results + Repo Delta)
+ # Uses record_subtask_result CLI dispatch via stdin JSON (injection-safe, single source of truth).
+ FILES_JSON=$(echo "$SNAPSHOT" | python3 -c "import sys,json; print(json.dumps(json.load(sys.stdin).get('files_changed',[])))")
+ CURRENT_SHA=$(git rev-parse HEAD 2>/dev/null || echo "")
+ if [ -n "$CURRENT_SHA" ]; then
+ echo "{\"files\": ${FILES_JSON}, \"status\": \"valid\", \"summary\": \"Monitor passed + tests passed\", \"commit_sha\": \"${CURRENT_SHA}\"}" | python3 .map/scripts/map_step_runner.py record_subtask_result
+ else
+ echo "{\"files\": ${FILES_JSON}, \"status\": \"valid\", \"summary\": \"Monitor passed + tests passed\"}" | python3 .map/scripts/map_step_runner.py record_subtask_result
+ fi
fi
# After Monitor returns:
diff --git a/.claude/hooks/workflow-context-injector.py b/.claude/hooks/workflow-context-injector.py
index 359c625..c5a8420 100755
--- a/.claude/hooks/workflow-context-injector.py
+++ b/.claude/hooks/workflow-context-injector.py
@@ -18,6 +18,9 @@
import os
import re
import sys
+
+# Keep in sync with map_step_runner.py GOAL_HEADING_RE
+GOAL_HEADING_RE = r"## (?:Goal|Overview)\n(.*?)(?=\n##|\Z)"
from pathlib import Path
# Bash commands that don't need workflow reminders
@@ -161,8 +164,61 @@ def required_action_for_step(step_id: str, step_phase: str, state: dict) -> str
return None
+def load_goal_and_title(branch: str, subtask_id: str) -> tuple[str, str]:
+ """Load goal from task_plan and subtask title from blueprint.
+
+ Returns (truncated_goal, subtask_title) or ("", "") on any error.
+ Fast: single json.load + single regex — target <20ms.
+ """
+ project_dir = Path(os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd()))
+ goal = ""
+ title = ""
+
+ # Goal from task_plan.md — matches ## Goal or ## Overview headings
+ plan_file = project_dir / ".map" / branch / f"task_plan_{branch}.md"
+ try:
+ if plan_file.exists():
+ content = plan_file.read_text(encoding="utf-8")
+ match = re.search(GOAL_HEADING_RE, content, re.DOTALL)
+ if match:
+ goal = match.group(1).strip()
+ # Truncate to first sentence
+ if ". " in goal:
+ goal = goal[: goal.index(". ") + 1]
+ if len(goal) > 80:
+ goal = goal[:77] + "..."
+ except OSError:
+ pass
+
+ # Title from blueprint.json
+ blueprint_file = project_dir / ".map" / branch / "blueprint.json"
+ try:
+ if blueprint_file.exists():
+ bp = json.loads(blueprint_file.read_text(encoding="utf-8"))
+ for st in bp.get("subtasks", []):
+ if st.get("id") == subtask_id:
+ title = st.get("title", "")
+ break
+ except (json.JSONDecodeError, OSError):
+ pass
+
+ return (goal, title)
+
+
+def _truncate_at_word(text: str, limit: int) -> str:
+ """Truncate text at word boundary, appending '...' within limit."""
+ if len(text) <= limit:
+ return text
+ cut = text[: limit - 3]
+ # Find last space to avoid cutting mid-word
+ last_space = cut.rfind(" ")
+ if last_space > limit // 2:
+ cut = cut[:last_space]
+ return cut + "..."
+
+
def format_reminder(state: dict, branch: str) -> str | None:
- """Format terse workflow reminder (aim: ~150-200 chars)."""
+ """Format terse workflow reminder (aim: ≤500 chars)."""
if not state:
return None
@@ -216,9 +272,30 @@ def format_reminder(state: dict, branch: str) -> str | None:
if not step_id and not step_phase:
return None
- base = f"[MAP] {step_id} {step_phase} | ST: {subtask_id} ({progress}) | plan:{plan_ok} mode:{mode}{wave_hint}{diag_hint}{files_hint}"
+ # Context-aware: add goal and subtask title
+ goal_hint = ""
+ title_hint = ""
+ if subtask_id != "-":
+ goal, title = load_goal_and_title(branch, subtask_id)
+ if goal:
+ goal_hint = f" | Goal: {goal}"
+ if title:
+ title_hint = f" {title}"
+
+ base = f"[MAP] {step_id} {step_phase}{goal_hint} | ST: {subtask_id}{title_hint} ({progress}) | plan:{plan_ok} mode:{mode}{wave_hint}{diag_hint}{files_hint}"
+
+ # Enforce 500-char limit: trim goal first, then word-boundary truncate
+ if len(base) > 500:
+ goal_hint = ""
+ base = f"[MAP] {step_id} {step_phase} | ST: {subtask_id}{title_hint} ({progress}) | plan:{plan_ok} mode:{mode}{wave_hint}{diag_hint}{files_hint}"
+ if len(base) > 500:
+ base = _truncate_at_word(base, 500)
+
if required:
- return f"{base} | REQUIRED: {required}"
+ result = f"{base} | REQUIRED: {required}"
+ if len(result) > 500:
+ result = _truncate_at_word(result, 500)
+ return result
return base
diff --git a/.map/scripts/map_orchestrator.py b/.map/scripts/map_orchestrator.py
index 39960e5..bcc9424 100755
--- a/.map/scripts/map_orchestrator.py
+++ b/.map/scripts/map_orchestrator.py
@@ -298,6 +298,25 @@ class StepState:
subtask_files_changed: dict[str, list[str]] = field(default_factory=dict)
guard_rework_counts: dict[str, int] = field(default_factory=dict)
constraints: Optional[dict] = None
+ subtask_results: dict[str, dict] = field(default_factory=dict)
+ last_subtask_commit_sha: Optional[str] = None
+
+ def record_subtask_result(
+ self,
+ subtask_id: str,
+ files_changed: list[str],
+ status: str,
+ summary: str = "",
+ commit_sha: Optional[str] = None,
+ ) -> None:
+ """Record result of a completed subtask for context injection."""
+ self.subtask_results[subtask_id] = {
+ "files_changed": files_changed,
+ "status": status,
+ "summary": summary,
+ }
+ if commit_sha:
+ self.last_subtask_commit_sha = commit_sha
def to_dict(self) -> dict:
"""Serialize to dictionary."""
@@ -325,6 +344,8 @@ def to_dict(self) -> dict:
"subtask_files_changed": self.subtask_files_changed,
"guard_rework_counts": self.guard_rework_counts,
"constraints": self.constraints,
+ "subtask_results": self.subtask_results,
+ "last_subtask_commit_sha": self.last_subtask_commit_sha,
}
@classmethod
@@ -354,6 +375,8 @@ def from_dict(cls, data: dict) -> "StepState":
subtask_files_changed=data.get("subtask_files_changed", {}),
guard_rework_counts=data.get("guard_rework_counts", {}),
constraints=data.get("constraints"),
+ subtask_results=data.get("subtask_results", {}),
+ last_subtask_commit_sha=data.get("last_subtask_commit_sha"),
)
@classmethod
diff --git a/.map/scripts/map_step_runner.py b/.map/scripts/map_step_runner.py
index 0a44ccb..a51ef52 100755
--- a/.map/scripts/map_step_runner.py
+++ b/.map/scripts/map_step_runner.py
@@ -28,11 +28,16 @@
"""
import json
+import os
import re
+import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional
+# Keep in sync with workflow-context-injector.py GOAL_HEADING_RE
+GOAL_HEADING_RE = r"## (?:Goal|Overview)\n(.*?)(?=\n##|\Z)"
+
HUMAN_ARTIFACT_DEFAULTS = {
"qa-001.md": "# QA 001\n\n",
@@ -791,7 +796,7 @@ def read_current_goal(branch: Optional[str] = None) -> Optional[str]:
try:
content = plan_file.read_text(encoding="utf-8")
- match = re.search(r"## Goal\n(.*?)(?=\n##|\Z)", content, re.DOTALL)
+ match = re.search(GOAL_HEADING_RE, content, re.DOTALL)
if match:
return match.group(1).strip()
except OSError:
@@ -833,7 +838,6 @@ def run_test_gate() -> dict:
Returns structured result with pass/fail, output, and exit code.
Called AFTER Monitor returns valid=true, BEFORE validate_step advances state.
"""
- import subprocess
# Detect test runner
runners = [
@@ -913,7 +917,6 @@ def snapshot_code_state(branch: Optional[str] = None) -> dict:
Records git ref, changed files, and diff stat so review artifacts
can be tied to actual code state. Populates subtask_files_changed.
"""
- import subprocess
branch_name = branch or get_branch_name()
@@ -943,6 +946,196 @@ def _run_git(args: list[str]) -> str:
}
+def load_blueprint(
+ branch: Optional[str] = None, project_dir: Optional[Path] = None
+) -> Optional[dict]:
+ """Load blueprint.json for current branch."""
+ if branch is None:
+ branch = get_branch_name()
+ base = project_dir or Path(".")
+ blueprint_path = base / ".map" / branch / "blueprint.json"
+ if not blueprint_path.exists():
+ return None
+ try:
+ return json.loads(blueprint_path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ return None
+
+
+def get_subtask_from_blueprint(blueprint: dict, subtask_id: str) -> Optional[dict]:
+ """Extract single subtask from blueprint by ID."""
+ for subtask in blueprint.get("subtasks", []):
+ if subtask.get("id") == subtask_id:
+ return subtask
+ return None
+
+
+def get_upstream_ids(blueprint: dict, subtask_id: str) -> list[str]:
+ """Get dependency subtask IDs for a given subtask."""
+ subtask = get_subtask_from_blueprint(blueprint, subtask_id)
+ if not subtask:
+ return []
+ return subtask.get("dependencies", [])
+
+
+def _sanitize_branch(branch: str) -> str:
+ """Sanitize branch name for safe filesystem paths.
+
+ Keep in sync with sanitize_branch_name() in workflow-context-injector.py.
+ """
+ sanitized = branch.replace("/", "-")
+ sanitized = re.sub(r"[^a-zA-Z0-9_.-]", "-", sanitized)
+ sanitized = re.sub(r"-+", "-", sanitized).strip("-")
+ if ".." in sanitized or sanitized.startswith("."):
+ return "default"
+ return sanitized or "default"
+
+
+def build_context_block(branch: str, current_subtask_id: str) -> str:
+ """Build structured context block for Actor prompt.
+
+ Returns formatted string with:
+ - Goal (from task_plan.md)
+ - Current subtask full details (from blueprint)
+ - Plan overview (all subtasks as ID + title + status one-liners)
+ - Upstream results (from step_state.json subtask_results)
+ - Repo delta (differential insight, if last_subtask_commit_sha available)
+
+ Returns empty string if blueprint not found (graceful fallback).
+ """
+ branch = _sanitize_branch(branch)
+ project_dir = Path(os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd()))
+
+ blueprint = load_blueprint(branch, project_dir=project_dir)
+ if not blueprint:
+ return ""
+
+ # Goal — read directly via project_dir for consistency
+ goal = None
+ plan_file = project_dir / ".map" / branch / f"task_plan_{branch}.md"
+ try:
+ if plan_file.exists():
+ content = plan_file.read_text(encoding="utf-8")
+ match = re.search(GOAL_HEADING_RE, content, re.DOTALL)
+ if match:
+ goal = match.group(1).strip()
+ except OSError:
+ pass
+ goal = goal or "No goal found"
+ # Truncate to first sentence
+ if ". " in goal:
+ goal = goal[: goal.index(". ") + 1]
+ if len(goal) > 200:
+ goal = goal[:197] + "..."
+
+ # Current subtask full details
+ current = get_subtask_from_blueprint(blueprint, current_subtask_id)
+ if not current:
+ return ""
+
+ current_details = []
+ current_details.append(f"AAG Contract: {current.get('aag_contract', 'N/A')}")
+ files = current.get("affected_files", [])
+ if files:
+ current_details.append(f"Affected files: {', '.join(files)}")
+ criteria = current.get("validation_criteria", [])
+ if criteria:
+ current_details.append("Validation criteria:")
+ for c in criteria:
+ current_details.append(f" - {c}")
+
+ # Plan overview with statuses from step_state.json
+ state_path = project_dir / ".map" / branch / "step_state.json"
+ subtask_phases: dict = {}
+ subtask_results: dict = {}
+ last_sha: Optional[str] = None
+ try:
+ if state_path.exists():
+ state = json.loads(state_path.read_text(encoding="utf-8"))
+ subtask_phases = state.get("subtask_phases", {})
+ subtask_results = state.get("subtask_results", {})
+ last_sha = state.get("last_subtask_commit_sha")
+ except (json.JSONDecodeError, OSError):
+ pass
+
+ overview_lines = []
+ for st in blueprint.get("subtasks", []):
+ st_id = st.get("id", "?")
+ st_title = st.get("title", "Untitled")
+ if st_id == current_subtask_id:
+ overview_lines.append(
+ f" [>>] {st_id}: {st_title} (IN PROGRESS) <- current"
+ )
+ elif st_id in subtask_results:
+ status = subtask_results[st_id].get("status", "done")
+ overview_lines.append(f" [x] {st_id}: {st_title} ({status})")
+ else:
+ phase = subtask_phases.get(st_id, "pending")
+ overview_lines.append(f" [ ] {st_id}: {st_title} ({phase})")
+
+ # Upstream results (only for dependencies)
+ upstream_ids = get_upstream_ids(blueprint, current_subtask_id)
+ upstream_lines = []
+ for up_id in upstream_ids:
+ if up_id in subtask_results:
+ result = subtask_results[up_id]
+ fc = result.get("files_changed", [])
+ status = result.get("status", "unknown")
+ summary = result.get("summary", "")
+ line = f" {up_id}: files={fc}, status={status}"
+ if summary:
+ line += f", summary={summary}"
+ upstream_lines.append(line)
+ else:
+ upstream_lines.append(f" {up_id}: (not yet completed)")
+
+ # Assemble block
+ parts = [
+ "",
+ f"# Goal: {goal}",
+ "",
+ f"# Current Subtask: {current_subtask_id} — {current.get('title', 'Untitled')}",
+ ]
+ parts.extend(current_details)
+ parts.append("")
+ parts.append(f"# Plan Overview ({len(blueprint.get('subtasks', []))} subtasks):")
+ parts.extend(overview_lines)
+
+ if upstream_lines:
+ parts.append("")
+ parts.append(f"# Upstream Results (dependencies of {current_subtask_id}):")
+ parts.extend(upstream_lines)
+
+ # Repo Delta (via compute_differential_insight from repo_insight)
+ if last_sha:
+ try:
+ from mapify_cli.repo_insight import compute_differential_insight
+
+ insight = compute_differential_insight(project_dir, last_sha)
+ changed = insight.get("changed_files", [])
+ deleted = insight.get("deleted_files", [])
+ if changed or deleted:
+ parts.append("")
+ parts.append("# Repo Delta (files changed since last subtask):")
+ for f in changed[:20]:
+ parts.append(f" {f}")
+ if len(changed) > 20:
+ parts.append(f" ... +{len(changed) - 20} more")
+ if deleted:
+ parts.append("# Deleted since last subtask:")
+ for f in deleted[:10]:
+ parts.append(f" (deleted) {f}")
+ if len(deleted) > 10:
+ parts.append(f" ... +{len(deleted) - 10} more")
+ except ImportError:
+ # Fallback: repo_insight not available in standalone .map/ context
+ pass
+
+ parts.append("")
+
+ return "\n".join(parts)
+
+
if __name__ == "__main__":
# Simple CLI interface for testing
import sys
@@ -1074,6 +1267,39 @@ def _run_git(args: list[str]) -> str:
result = snapshot_code_state()
print(json.dumps(result, indent=2))
+ elif func_name == "record_subtask_result":
+ # Read JSON from stdin to avoid shell injection: {"files": [...], "status": "...", "summary": "...", "commit_sha": "..."}
+ import sys as _sys
+ try:
+ data = json.loads(_sys.stdin.read())
+ except json.JSONDecodeError as e:
+ print(json.dumps({"status": "error", "message": f"Invalid JSON on stdin: {e}"}))
+ _sys.exit(1)
+ branch_name = get_branch_name()
+ state_path = Path(f".map/{branch_name}/step_state.json")
+ if not state_path.exists():
+ print(json.dumps({"status": "error", "message": "step_state.json not found"}))
+ _sys.exit(1)
+ from map_orchestrator import StepState
+ st = StepState.load(state_path)
+ subtask_id = data.get("subtask_id") or st.current_subtask_id or ""
+ if not subtask_id:
+ print(json.dumps({"status": "skipped", "message": "No subtask_id"}))
+ _sys.exit(0)
+ st.record_subtask_result(
+ subtask_id=subtask_id,
+ files_changed=data.get("files", []),
+ status=data.get("status", "valid"),
+ summary=data.get("summary", ""),
+ commit_sha=data.get("commit_sha"),
+ )
+ st.save(state_path)
+ print(json.dumps({"status": "success", "subtask_id": subtask_id}))
+
+ elif func_name == "build_context_block" and len(sys.argv) >= 4:
+ result = build_context_block(sys.argv[2], sys.argv[3])
+ print(result)
+
else:
print(f"Unknown function: {func_name}")
sys.exit(1)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2817181..7c60c7c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
+### Added
+- **Context-aware step injection**: Two-layer "active window" context system that replaces full plan injection with focused current-subtask context
+ - Hook layer: `workflow-context-injector.py` now includes goal + subtask title in ≤500 char reminders
+ - Actor prompt layer: structured `` block with goal, current subtask details, sibling summaries, upstream results, and repo delta
+ - New helpers in `map_step_runner.py`: `load_blueprint()`, `get_subtask_from_blueprint()`, `get_upstream_ids()`, `build_context_block()`
+ - New `StepState` fields: `subtask_results` (per-subtask outcome tracking), `last_subtask_commit_sha` (differential insight baseline)
+ - New function `compute_differential_insight()` in `repo_insight.py` for git-diff-based file change tracking between subtasks
+
## [3.6.0] - 2026-03-26
### Changed
diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md
index 19360b3..30867ea 100644
--- a/docs/ARCHITECTURE.md
+++ b/docs/ARCHITECTURE.md
@@ -1606,6 +1606,38 @@ Workflow state is managed through file-based persistence in `.map/` directory:
- ✅ +50% observability (clear progress tracking)
- ✅ Error context persistence (retry loops retain error history)
+### Context-Aware Step Injection (Phase 1.2)
+
+**Problem:** When a plan has 10+ subtasks, injecting the entire plan and all logs wastes tokens and dilutes attention on the current step.
+
+**Solution:** Two-layer "active window" injection that shows only relevant context:
+
+1. **Hook layer** (`workflow-context-injector.py` PreToolUse hook):
+ - Fires on every Edit/Write/significant Bash command
+ - Injects ≤500 char reminder: goal + current subtask title + progress
+ - Uses `load_goal_and_title()` to extract goal from `task_plan.md` and title from `blueprint.json`
+ - Graceful fallback to original format when blueprint missing
+
+2. **Actor prompt layer** (`map-efficient.md` ACTOR phase):
+ - Fires once per subtask when Actor agent is spawned
+ - Injects structured `` block (≤4000 tokens) containing:
+ - `# Goal` — one sentence from task_plan.md
+ - `# Current Subtask` — full AAG contract, affected files, validation criteria
+ - `# Plan Overview` — all subtasks as one-liners with `[x]/[ ]/[>>]` status markers
+ - `# Upstream Results` — only results from dependency subtasks (from `step_state.json subtask_results`)
+ - `# Repo Delta` — files changed since last subtask (via `git diff` from `last_subtask_commit_sha`)
+ - Built by `build_context_block()` in `map_step_runner.py`
+
+**Key data sources:**
+- `blueprint.json` — subtask metadata (deps, files, criteria). Single source of truth.
+- `step_state.json` — `subtask_results` dict (per-subtask files_changed + status), `last_subtask_commit_sha`
+- `task_plan.md` — goal text only (never parsed for structured data)
+
+**Benefits:**
+- 30-60% fewer tokens in system prompt on long workflows
+- Actor focuses on current subtask criteria, not future steps
+- Dependency results passed explicitly — no re-reading completed files
+
### Compaction Resilience
**Problem:** Context compaction (conversation history clearing) would normally lose workflow state, forcing restart from scratch.
diff --git a/src/mapify_cli/repo_insight.py b/src/mapify_cli/repo_insight.py
index 10a47db..bb4c311 100644
--- a/src/mapify_cli/repo_insight.py
+++ b/src/mapify_cli/repo_insight.py
@@ -7,6 +7,7 @@
from pathlib import Path
from typing import List
import json
+import subprocess
def detect_language(project_root: Path) -> str:
@@ -201,3 +202,74 @@ def _validate_repo_insight_schema(data: dict) -> None:
for dir_path in data["key_dirs"]:
if dir_path.startswith("/"):
raise ValueError(f"key_dirs must be relative paths: {dir_path}")
+
+
+def compute_differential_insight(
+ project_root: Path, since_sha: str | None
+) -> dict:
+ """Compute file changes since a given git SHA.
+
+ Used for context-aware injection: shows Actor only files
+ that changed since the last subtask completed.
+
+ Args:
+ project_root: Path to project root
+ since_sha: Git SHA to diff against (None = no baseline)
+
+ Returns:
+ Dict with changed_files, deleted_files. On success also includes
+ since_sha and current_sha. On error: empty lists and error key.
+ When since_sha is None: empty lists and note key.
+ """
+ if since_sha is None:
+ return {"changed_files": [], "deleted_files": [], "note": "no baseline SHA"}
+
+ try:
+ # Get changed/added/modified/renamed files
+ result = subprocess.run(
+ ["git", "diff", "--name-only", "--diff-filter=ACMR", since_sha, "HEAD"],
+ capture_output=True,
+ text=True,
+ cwd=project_root,
+ timeout=2,
+ )
+ if result.returncode != 0:
+ return {
+ "changed_files": [],
+ "deleted_files": [],
+ "error": f"git diff failed: {result.stderr.strip()}",
+ }
+ changed = [f for f in result.stdout.strip().split("\n") if f]
+
+ # Get deleted files
+ result_del = subprocess.run(
+ ["git", "diff", "--name-only", "--diff-filter=D", since_sha, "HEAD"],
+ capture_output=True,
+ text=True,
+ cwd=project_root,
+ timeout=2,
+ )
+ deleted = [f for f in result_del.stdout.strip().split("\n") if f] if result_del.returncode == 0 else []
+
+ # Get current HEAD SHA
+ head_result = subprocess.run(
+ ["git", "rev-parse", "HEAD"],
+ capture_output=True,
+ text=True,
+ cwd=project_root,
+ timeout=2,
+ )
+ current_sha = head_result.stdout.strip() if head_result.returncode == 0 else "unknown"
+
+ return {
+ "changed_files": changed,
+ "deleted_files": deleted,
+ "since_sha": since_sha,
+ "current_sha": current_sha,
+ }
+ except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e:
+ return {
+ "changed_files": [],
+ "deleted_files": [],
+ "error": str(e),
+ }
diff --git a/src/mapify_cli/templates/commands/map-efficient.md b/src/mapify_cli/templates/commands/map-efficient.md
index 6b5d48b..75b8d10 100644
--- a/src/mapify_cli/templates/commands/map-efficient.md
+++ b/src/mapify_cli/templates/commands/map-efficient.md
@@ -390,6 +390,10 @@ pytest --tb=short 2>&1 || true
When TDD mode is active, Actor receives `code_only` and must NOT modify test files. When TDD is off, standard behavior.
```python
+# Context assembly: use build_context_block() from map_step_runner.py
+# to generate when called programmatically.
+# For manual invocation, construct the block from blueprint.json + step_state.json.
+
Task(
subagent_type="actor",
description="Implement subtask [ID]",
@@ -399,16 +403,40 @@ Task(
[AAG contract from decomposition: Actor -> Action -> Goal]
-Subtask: [ID] [title]
+
+# Goal
+[Goal from task_plan.md — one sentence]
+
+# Current Subtask: [ID] — [title]
+AAG Contract: [contract from blueprint]
Affected files: [from blueprint]
-Validation criteria: [from blueprint]
+Validation criteria:
+- [criteria from blueprint]
+
+# Plan Overview ([N] subtasks):
+[For each subtask in blueprint, show one-liner with status:]
+- [x] ST-001: Title (complete)
+- [ ] ST-002: Title (pending)
+- [>>] ST-003: Title (IN PROGRESS) <- current
+
+# Upstream Results (dependencies of current subtask):
+[Only for subtasks that current depends on, from step_state.json subtask_results:]
+ST-001: files=[a.py, b.py], status=valid
+
+# Repo Delta (files changed since last subtask):
+[From compute_differential_insight(), if last_subtask_commit_sha available]
+[Omit this section entirely if no previous SHA (first subtask)]
+
Protocol:
-1. Parse MAP_Contract — this is your compilation target
-2. Read affected files to understand current state
-3. Implement: translate MAP_Contract into code
-4. Apply code with Edit/Write tools
-5. Output: approach + files_changed + trade-offs"""
+1. SCOPE: Implement ONLY the Current Subtask. Do NOT modify files belonging to other subtasks.
+2. Plan Overview is for orientation — do NOT implement other subtasks.
+3. Upstream Results show what dependencies produced — use as input context.
+4. Parse MAP_Contract — this is your compilation target.
+5. Read affected files to understand current state.
+6. Implement: translate MAP_Contract into code.
+7. Apply code with Edit/Write tools.
+8. Output: approach + files_changed + trade-offs"""
)
```
@@ -452,6 +480,16 @@ if echo "$TEST_GATE" | python3 -c "import sys,json; d=json.load(sys.stdin); sys.
# Tests passed — snapshot code state for artifact verification
SNAPSHOT=$(python3 .map/scripts/map_step_runner.py snapshot_code_state)
# Append git ref to review artifact header (if code-review file exists)
+
+ # Record subtask result for context-aware injection (Upstream Results + Repo Delta)
+ # Uses record_subtask_result CLI dispatch via stdin JSON (injection-safe, single source of truth).
+ FILES_JSON=$(echo "$SNAPSHOT" | python3 -c "import sys,json; print(json.dumps(json.load(sys.stdin).get('files_changed',[])))")
+ CURRENT_SHA=$(git rev-parse HEAD 2>/dev/null || echo "")
+ if [ -n "$CURRENT_SHA" ]; then
+ echo "{\"files\": ${FILES_JSON}, \"status\": \"valid\", \"summary\": \"Monitor passed + tests passed\", \"commit_sha\": \"${CURRENT_SHA}\"}" | python3 .map/scripts/map_step_runner.py record_subtask_result
+ else
+ echo "{\"files\": ${FILES_JSON}, \"status\": \"valid\", \"summary\": \"Monitor passed + tests passed\"}" | python3 .map/scripts/map_step_runner.py record_subtask_result
+ fi
fi
# After Monitor returns:
diff --git a/src/mapify_cli/templates/hooks/workflow-context-injector.py b/src/mapify_cli/templates/hooks/workflow-context-injector.py
index 359c625..c5a8420 100755
--- a/src/mapify_cli/templates/hooks/workflow-context-injector.py
+++ b/src/mapify_cli/templates/hooks/workflow-context-injector.py
@@ -18,6 +18,9 @@
import os
import re
import sys
+
+# Keep in sync with map_step_runner.py GOAL_HEADING_RE
+GOAL_HEADING_RE = r"## (?:Goal|Overview)\n(.*?)(?=\n##|\Z)"
from pathlib import Path
# Bash commands that don't need workflow reminders
@@ -161,8 +164,61 @@ def required_action_for_step(step_id: str, step_phase: str, state: dict) -> str
return None
+def load_goal_and_title(branch: str, subtask_id: str) -> tuple[str, str]:
+ """Load goal from task_plan and subtask title from blueprint.
+
+ Returns (truncated_goal, subtask_title) or ("", "") on any error.
+ Fast: single json.load + single regex — target <20ms.
+ """
+ project_dir = Path(os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd()))
+ goal = ""
+ title = ""
+
+ # Goal from task_plan.md — matches ## Goal or ## Overview headings
+ plan_file = project_dir / ".map" / branch / f"task_plan_{branch}.md"
+ try:
+ if plan_file.exists():
+ content = plan_file.read_text(encoding="utf-8")
+ match = re.search(GOAL_HEADING_RE, content, re.DOTALL)
+ if match:
+ goal = match.group(1).strip()
+ # Truncate to first sentence
+ if ". " in goal:
+ goal = goal[: goal.index(". ") + 1]
+ if len(goal) > 80:
+ goal = goal[:77] + "..."
+ except OSError:
+ pass
+
+ # Title from blueprint.json
+ blueprint_file = project_dir / ".map" / branch / "blueprint.json"
+ try:
+ if blueprint_file.exists():
+ bp = json.loads(blueprint_file.read_text(encoding="utf-8"))
+ for st in bp.get("subtasks", []):
+ if st.get("id") == subtask_id:
+ title = st.get("title", "")
+ break
+ except (json.JSONDecodeError, OSError):
+ pass
+
+ return (goal, title)
+
+
+def _truncate_at_word(text: str, limit: int) -> str:
+ """Truncate text at word boundary, appending '...' within limit."""
+ if len(text) <= limit:
+ return text
+ cut = text[: limit - 3]
+ # Find last space to avoid cutting mid-word
+ last_space = cut.rfind(" ")
+ if last_space > limit // 2:
+ cut = cut[:last_space]
+ return cut + "..."
+
+
def format_reminder(state: dict, branch: str) -> str | None:
- """Format terse workflow reminder (aim: ~150-200 chars)."""
+ """Format terse workflow reminder (aim: ≤500 chars)."""
if not state:
return None
@@ -216,9 +272,30 @@ def format_reminder(state: dict, branch: str) -> str | None:
if not step_id and not step_phase:
return None
- base = f"[MAP] {step_id} {step_phase} | ST: {subtask_id} ({progress}) | plan:{plan_ok} mode:{mode}{wave_hint}{diag_hint}{files_hint}"
+ # Context-aware: add goal and subtask title
+ goal_hint = ""
+ title_hint = ""
+ if subtask_id != "-":
+ goal, title = load_goal_and_title(branch, subtask_id)
+ if goal:
+ goal_hint = f" | Goal: {goal}"
+ if title:
+ title_hint = f" {title}"
+
+ base = f"[MAP] {step_id} {step_phase}{goal_hint} | ST: {subtask_id}{title_hint} ({progress}) | plan:{plan_ok} mode:{mode}{wave_hint}{diag_hint}{files_hint}"
+
+ # Enforce 500-char limit: trim goal first, then word-boundary truncate
+ if len(base) > 500:
+ goal_hint = ""
+ base = f"[MAP] {step_id} {step_phase} | ST: {subtask_id}{title_hint} ({progress}) | plan:{plan_ok} mode:{mode}{wave_hint}{diag_hint}{files_hint}"
+ if len(base) > 500:
+ base = _truncate_at_word(base, 500)
+
if required:
- return f"{base} | REQUIRED: {required}"
+ result = f"{base} | REQUIRED: {required}"
+ if len(result) > 500:
+ result = _truncate_at_word(result, 500)
+ return result
return base
diff --git a/src/mapify_cli/templates/map/scripts/map_orchestrator.py b/src/mapify_cli/templates/map/scripts/map_orchestrator.py
index 39960e5..bcc9424 100755
--- a/src/mapify_cli/templates/map/scripts/map_orchestrator.py
+++ b/src/mapify_cli/templates/map/scripts/map_orchestrator.py
@@ -298,6 +298,25 @@ class StepState:
subtask_files_changed: dict[str, list[str]] = field(default_factory=dict)
guard_rework_counts: dict[str, int] = field(default_factory=dict)
constraints: Optional[dict] = None
+ subtask_results: dict[str, dict] = field(default_factory=dict)
+ last_subtask_commit_sha: Optional[str] = None
+
+ def record_subtask_result(
+ self,
+ subtask_id: str,
+ files_changed: list[str],
+ status: str,
+ summary: str = "",
+ commit_sha: Optional[str] = None,
+ ) -> None:
+ """Record result of a completed subtask for context injection."""
+ self.subtask_results[subtask_id] = {
+ "files_changed": files_changed,
+ "status": status,
+ "summary": summary,
+ }
+ if commit_sha:
+ self.last_subtask_commit_sha = commit_sha
def to_dict(self) -> dict:
"""Serialize to dictionary."""
@@ -325,6 +344,8 @@ def to_dict(self) -> dict:
"subtask_files_changed": self.subtask_files_changed,
"guard_rework_counts": self.guard_rework_counts,
"constraints": self.constraints,
+ "subtask_results": self.subtask_results,
+ "last_subtask_commit_sha": self.last_subtask_commit_sha,
}
@classmethod
@@ -354,6 +375,8 @@ def from_dict(cls, data: dict) -> "StepState":
subtask_files_changed=data.get("subtask_files_changed", {}),
guard_rework_counts=data.get("guard_rework_counts", {}),
constraints=data.get("constraints"),
+ subtask_results=data.get("subtask_results", {}),
+ last_subtask_commit_sha=data.get("last_subtask_commit_sha"),
)
@classmethod
diff --git a/src/mapify_cli/templates/map/scripts/map_step_runner.py b/src/mapify_cli/templates/map/scripts/map_step_runner.py
index 0a44ccb..a51ef52 100755
--- a/src/mapify_cli/templates/map/scripts/map_step_runner.py
+++ b/src/mapify_cli/templates/map/scripts/map_step_runner.py
@@ -28,11 +28,16 @@
"""
import json
+import os
import re
+import subprocess
from datetime import datetime
from pathlib import Path
from typing import Optional
+# Keep in sync with workflow-context-injector.py GOAL_HEADING_RE
+GOAL_HEADING_RE = r"## (?:Goal|Overview)\n(.*?)(?=\n##|\Z)"
+
HUMAN_ARTIFACT_DEFAULTS = {
"qa-001.md": "# QA 001\n\n",
@@ -791,7 +796,7 @@ def read_current_goal(branch: Optional[str] = None) -> Optional[str]:
try:
content = plan_file.read_text(encoding="utf-8")
- match = re.search(r"## Goal\n(.*?)(?=\n##|\Z)", content, re.DOTALL)
+ match = re.search(GOAL_HEADING_RE, content, re.DOTALL)
if match:
return match.group(1).strip()
except OSError:
@@ -833,7 +838,6 @@ def run_test_gate() -> dict:
Returns structured result with pass/fail, output, and exit code.
Called AFTER Monitor returns valid=true, BEFORE validate_step advances state.
"""
- import subprocess
# Detect test runner
runners = [
@@ -913,7 +917,6 @@ def snapshot_code_state(branch: Optional[str] = None) -> dict:
Records git ref, changed files, and diff stat so review artifacts
can be tied to actual code state. Populates subtask_files_changed.
"""
- import subprocess
branch_name = branch or get_branch_name()
@@ -943,6 +946,196 @@ def _run_git(args: list[str]) -> str:
}
+def load_blueprint(
+ branch: Optional[str] = None, project_dir: Optional[Path] = None
+) -> Optional[dict]:
+ """Load blueprint.json for current branch."""
+ if branch is None:
+ branch = get_branch_name()
+ base = project_dir or Path(".")
+ blueprint_path = base / ".map" / branch / "blueprint.json"
+ if not blueprint_path.exists():
+ return None
+ try:
+ return json.loads(blueprint_path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ return None
+
+
+def get_subtask_from_blueprint(blueprint: dict, subtask_id: str) -> Optional[dict]:
+ """Extract single subtask from blueprint by ID."""
+ for subtask in blueprint.get("subtasks", []):
+ if subtask.get("id") == subtask_id:
+ return subtask
+ return None
+
+
+def get_upstream_ids(blueprint: dict, subtask_id: str) -> list[str]:
+ """Get dependency subtask IDs for a given subtask."""
+ subtask = get_subtask_from_blueprint(blueprint, subtask_id)
+ if not subtask:
+ return []
+ return subtask.get("dependencies", [])
+
+
+def _sanitize_branch(branch: str) -> str:
+ """Sanitize branch name for safe filesystem paths.
+
+ Keep in sync with sanitize_branch_name() in workflow-context-injector.py.
+ """
+ sanitized = branch.replace("/", "-")
+ sanitized = re.sub(r"[^a-zA-Z0-9_.-]", "-", sanitized)
+ sanitized = re.sub(r"-+", "-", sanitized).strip("-")
+ if ".." in sanitized or sanitized.startswith("."):
+ return "default"
+ return sanitized or "default"
+
+
+def build_context_block(branch: str, current_subtask_id: str) -> str:
+ """Build structured context block for Actor prompt.
+
+ Returns formatted string with:
+ - Goal (from task_plan.md)
+ - Current subtask full details (from blueprint)
+ - Plan overview (all subtasks as ID + title + status one-liners)
+ - Upstream results (from step_state.json subtask_results)
+ - Repo delta (differential insight, if last_subtask_commit_sha available)
+
+ Returns empty string if blueprint not found (graceful fallback).
+ """
+ branch = _sanitize_branch(branch)
+ project_dir = Path(os.environ.get("CLAUDE_PROJECT_DIR", os.getcwd()))
+
+ blueprint = load_blueprint(branch, project_dir=project_dir)
+ if not blueprint:
+ return ""
+
+ # Goal — read directly via project_dir for consistency
+ goal = None
+ plan_file = project_dir / ".map" / branch / f"task_plan_{branch}.md"
+ try:
+ if plan_file.exists():
+ content = plan_file.read_text(encoding="utf-8")
+ match = re.search(GOAL_HEADING_RE, content, re.DOTALL)
+ if match:
+ goal = match.group(1).strip()
+ except OSError:
+ pass
+ goal = goal or "No goal found"
+ # Truncate to first sentence
+ if ". " in goal:
+ goal = goal[: goal.index(". ") + 1]
+ if len(goal) > 200:
+ goal = goal[:197] + "..."
+
+ # Current subtask full details
+ current = get_subtask_from_blueprint(blueprint, current_subtask_id)
+ if not current:
+ return ""
+
+ current_details = []
+ current_details.append(f"AAG Contract: {current.get('aag_contract', 'N/A')}")
+ files = current.get("affected_files", [])
+ if files:
+ current_details.append(f"Affected files: {', '.join(files)}")
+ criteria = current.get("validation_criteria", [])
+ if criteria:
+ current_details.append("Validation criteria:")
+ for c in criteria:
+ current_details.append(f" - {c}")
+
+ # Plan overview with statuses from step_state.json
+ state_path = project_dir / ".map" / branch / "step_state.json"
+ subtask_phases: dict = {}
+ subtask_results: dict = {}
+ last_sha: Optional[str] = None
+ try:
+ if state_path.exists():
+ state = json.loads(state_path.read_text(encoding="utf-8"))
+ subtask_phases = state.get("subtask_phases", {})
+ subtask_results = state.get("subtask_results", {})
+ last_sha = state.get("last_subtask_commit_sha")
+ except (json.JSONDecodeError, OSError):
+ pass
+
+ overview_lines = []
+ for st in blueprint.get("subtasks", []):
+ st_id = st.get("id", "?")
+ st_title = st.get("title", "Untitled")
+ if st_id == current_subtask_id:
+ overview_lines.append(
+ f" [>>] {st_id}: {st_title} (IN PROGRESS) <- current"
+ )
+ elif st_id in subtask_results:
+ status = subtask_results[st_id].get("status", "done")
+ overview_lines.append(f" [x] {st_id}: {st_title} ({status})")
+ else:
+ phase = subtask_phases.get(st_id, "pending")
+ overview_lines.append(f" [ ] {st_id}: {st_title} ({phase})")
+
+ # Upstream results (only for dependencies)
+ upstream_ids = get_upstream_ids(blueprint, current_subtask_id)
+ upstream_lines = []
+ for up_id in upstream_ids:
+ if up_id in subtask_results:
+ result = subtask_results[up_id]
+ fc = result.get("files_changed", [])
+ status = result.get("status", "unknown")
+ summary = result.get("summary", "")
+ line = f" {up_id}: files={fc}, status={status}"
+ if summary:
+ line += f", summary={summary}"
+ upstream_lines.append(line)
+ else:
+ upstream_lines.append(f" {up_id}: (not yet completed)")
+
+ # Assemble block
+ parts = [
+ "",
+ f"# Goal: {goal}",
+ "",
+ f"# Current Subtask: {current_subtask_id} — {current.get('title', 'Untitled')}",
+ ]
+ parts.extend(current_details)
+ parts.append("")
+ parts.append(f"# Plan Overview ({len(blueprint.get('subtasks', []))} subtasks):")
+ parts.extend(overview_lines)
+
+ if upstream_lines:
+ parts.append("")
+ parts.append(f"# Upstream Results (dependencies of {current_subtask_id}):")
+ parts.extend(upstream_lines)
+
+ # Repo Delta (via compute_differential_insight from repo_insight)
+ if last_sha:
+ try:
+ from mapify_cli.repo_insight import compute_differential_insight
+
+ insight = compute_differential_insight(project_dir, last_sha)
+ changed = insight.get("changed_files", [])
+ deleted = insight.get("deleted_files", [])
+ if changed or deleted:
+ parts.append("")
+ parts.append("# Repo Delta (files changed since last subtask):")
+ for f in changed[:20]:
+ parts.append(f" {f}")
+ if len(changed) > 20:
+ parts.append(f" ... +{len(changed) - 20} more")
+ if deleted:
+ parts.append("# Deleted since last subtask:")
+ for f in deleted[:10]:
+ parts.append(f" (deleted) {f}")
+ if len(deleted) > 10:
+ parts.append(f" ... +{len(deleted) - 10} more")
+ except ImportError:
+ # Fallback: repo_insight not available in standalone .map/ context
+ pass
+
+ parts.append("")
+
+ return "\n".join(parts)
+
+
if __name__ == "__main__":
# Simple CLI interface for testing
import sys
@@ -1074,6 +1267,39 @@ def _run_git(args: list[str]) -> str:
result = snapshot_code_state()
print(json.dumps(result, indent=2))
+ elif func_name == "record_subtask_result":
+ # Read JSON from stdin to avoid shell injection: {"files": [...], "status": "...", "summary": "...", "commit_sha": "..."}
+ import sys as _sys
+ try:
+ data = json.loads(_sys.stdin.read())
+ except json.JSONDecodeError as e:
+ print(json.dumps({"status": "error", "message": f"Invalid JSON on stdin: {e}"}))
+ _sys.exit(1)
+ branch_name = get_branch_name()
+ state_path = Path(f".map/{branch_name}/step_state.json")
+ if not state_path.exists():
+ print(json.dumps({"status": "error", "message": "step_state.json not found"}))
+ _sys.exit(1)
+ from map_orchestrator import StepState
+ st = StepState.load(state_path)
+ subtask_id = data.get("subtask_id") or st.current_subtask_id or ""
+ if not subtask_id:
+ print(json.dumps({"status": "skipped", "message": "No subtask_id"}))
+ _sys.exit(0)
+ st.record_subtask_result(
+ subtask_id=subtask_id,
+ files_changed=data.get("files", []),
+ status=data.get("status", "valid"),
+ summary=data.get("summary", ""),
+ commit_sha=data.get("commit_sha"),
+ )
+ st.save(state_path)
+ print(json.dumps({"status": "success", "subtask_id": subtask_id}))
+
+ elif func_name == "build_context_block" and len(sys.argv) >= 4:
+ result = build_context_block(sys.argv[2], sys.argv[3])
+ print(result)
+
else:
print(f"Unknown function: {func_name}")
sys.exit(1)
diff --git a/tests/test_map_orchestrator.py b/tests/test_map_orchestrator.py
index ee40198..9c997aa 100644
--- a/tests/test_map_orchestrator.py
+++ b/tests/test_map_orchestrator.py
@@ -1487,5 +1487,79 @@ def test_reopen_then_get_next_step(self, branch_dir, tmp_path):
assert result["step_id"] == "2.3"
+class TestSubtaskResults:
+ """Tests for StepState subtask_results and last_subtask_commit_sha fields."""
+
+ def test_subtask_results_default_empty(self):
+ state = map_orchestrator.StepState()
+ assert state.subtask_results == {}
+ assert state.last_subtask_commit_sha is None
+
+ def test_record_subtask_result(self):
+ state = map_orchestrator.StepState()
+ state.record_subtask_result("ST-001", ["a.py", "b.py"], "valid", "All tests pass")
+ assert "ST-001" in state.subtask_results
+ assert state.subtask_results["ST-001"]["files_changed"] == ["a.py", "b.py"]
+ assert state.subtask_results["ST-001"]["status"] == "valid"
+ assert state.subtask_results["ST-001"]["summary"] == "All tests pass"
+
+ def test_record_subtask_result_with_commit_sha(self):
+ state = map_orchestrator.StepState()
+ state.record_subtask_result("ST-001", ["a.py"], "valid", commit_sha="abc123")
+ assert state.subtask_results["ST-001"]["status"] == "valid"
+ assert state.last_subtask_commit_sha == "abc123"
+
+ def test_record_subtask_result_without_commit_sha_preserves_existing(self):
+ state = map_orchestrator.StepState()
+ state.last_subtask_commit_sha = "old_sha"
+ state.record_subtask_result("ST-002", ["b.py"], "valid")
+ assert state.last_subtask_commit_sha == "old_sha"
+
+ def test_serialize_deserialize_roundtrip(self):
+ state = map_orchestrator.StepState()
+ state.record_subtask_result("ST-001", ["x.py"], "valid")
+ state.last_subtask_commit_sha = "abc123def"
+
+ data = state.to_dict()
+ assert data["subtask_results"]["ST-001"]["status"] == "valid"
+ assert data["last_subtask_commit_sha"] == "abc123def"
+
+ restored = map_orchestrator.StepState.from_dict(data)
+ assert restored.subtask_results["ST-001"]["files_changed"] == ["x.py"]
+ assert restored.last_subtask_commit_sha == "abc123def"
+
+ def test_save_load_roundtrip(self, tmp_path):
+ state_file = tmp_path / "step_state.json"
+ state = map_orchestrator.StepState()
+ state.record_subtask_result("ST-002", ["c.py"], "invalid", "Tests failed")
+ state.last_subtask_commit_sha = "deadbeef"
+ state.save(state_file)
+
+ loaded = map_orchestrator.StepState.load(state_file)
+ assert loaded.subtask_results["ST-002"]["status"] == "invalid"
+ assert loaded.last_subtask_commit_sha == "deadbeef"
+
+ def test_backward_compat_missing_fields(self):
+ """Old step_state.json without new fields should load safely."""
+ old_data = {"workflow": "map-efficient", "started_at": "2026-01-01"}
+ restored = map_orchestrator.StepState.from_dict(old_data)
+ assert restored.subtask_results == {}
+ assert restored.last_subtask_commit_sha is None
+
+ def test_record_subtask_result_empty_files(self):
+ """record_subtask_result with empty files_changed list."""
+ state = map_orchestrator.StepState()
+ state.record_subtask_result("ST-003", [], "valid", "No files changed")
+ assert state.subtask_results["ST-003"]["files_changed"] == []
+ assert state.subtask_results["ST-003"]["status"] == "valid"
+ assert state.subtask_results["ST-003"]["summary"] == "No files changed"
+
+ def test_record_subtask_result_empty_summary(self):
+ """record_subtask_result with empty summary string."""
+ state = map_orchestrator.StepState()
+ state.record_subtask_result("ST-004", ["x.py"], "valid")
+ assert state.subtask_results["ST-004"]["summary"] == ""
+
+
if __name__ == "__main__":
pytest.main([__file__, "-v"])
diff --git a/tests/test_map_step_runner.py b/tests/test_map_step_runner.py
index bbaadbb..17587a8 100644
--- a/tests/test_map_step_runner.py
+++ b/tests/test_map_step_runner.py
@@ -3,6 +3,7 @@
import json
import sys
from pathlib import Path
+from unittest.mock import patch, MagicMock
import pytest
@@ -791,3 +792,345 @@ def test_git_ref_is_truncated(self, branch_workspace):
result = map_step_runner.snapshot_code_state()
assert len(result["git_ref"]) <= 12
+
+
+class TestLoadBlueprint:
+ """Tests for load_blueprint function."""
+
+ def test_returns_dict_for_valid_file(self, branch_workspace):
+ blueprint = {"summary": "test", "subtasks": [{"id": "ST-001", "title": "T1"}]}
+ (branch_workspace / "blueprint.json").write_text(json.dumps(blueprint))
+ result = map_step_runner.load_blueprint("test-branch")
+ assert result == blueprint
+
+ def test_returns_none_for_missing_file(self, branch_workspace):
+ result = map_step_runner.load_blueprint("test-branch")
+ assert result is None
+
+ def test_returns_none_for_invalid_json(self, branch_workspace):
+ (branch_workspace / "blueprint.json").write_text("not json")
+ result = map_step_runner.load_blueprint("test-branch")
+ assert result is None
+
+
+class TestGetSubtaskFromBlueprint:
+ """Tests for get_subtask_from_blueprint function."""
+
+ def test_finds_subtask_by_id(self):
+ bp = {"subtasks": [{"id": "ST-001", "title": "A"}, {"id": "ST-002", "title": "B"}]}
+ result = map_step_runner.get_subtask_from_blueprint(bp, "ST-002")
+ assert result is not None
+ assert result["title"] == "B"
+
+ def test_returns_none_for_missing_id(self):
+ bp = {"subtasks": [{"id": "ST-001", "title": "A"}]}
+ result = map_step_runner.get_subtask_from_blueprint(bp, "ST-999")
+ assert result is None
+
+ def test_returns_none_for_empty_subtasks(self):
+ result = map_step_runner.get_subtask_from_blueprint({}, "ST-001")
+ assert result is None
+
+
+class TestGetUpstreamIds:
+ """Tests for get_upstream_ids function."""
+
+ def test_returns_dependencies(self):
+ bp = {"subtasks": [{"id": "ST-002", "dependencies": ["ST-001"]}]}
+ result = map_step_runner.get_upstream_ids(bp, "ST-002")
+ assert result == ["ST-001"]
+
+ def test_returns_empty_for_no_deps(self):
+ bp = {"subtasks": [{"id": "ST-001", "dependencies": []}]}
+ result = map_step_runner.get_upstream_ids(bp, "ST-001")
+ assert result == []
+
+ def test_returns_empty_for_missing_subtask(self):
+ bp = {"subtasks": []}
+ result = map_step_runner.get_upstream_ids(bp, "ST-999")
+ assert result == []
+
+
+class TestBuildContextBlock:
+ """Tests for build_context_block function."""
+
+ def test_returns_empty_when_no_blueprint(self, branch_workspace):
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+ assert result == ""
+
+ def test_returns_empty_when_subtask_not_found(self, branch_workspace):
+ bp = {"summary": "test", "subtasks": [{"id": "ST-001", "title": "A"}]}
+ (branch_workspace / "blueprint.json").write_text(json.dumps(bp))
+ result = map_step_runner.build_context_block("test-branch", "ST-999")
+ assert result == ""
+
+ def test_builds_full_context_block(self, branch_workspace):
+ bp = {
+ "summary": "test goal",
+ "subtasks": [
+ {
+ "id": "ST-001",
+ "title": "First task",
+ "aag_contract": "Actor -> do() -> done",
+ "affected_files": ["a.py"],
+ "validation_criteria": ["VC1: check"],
+ "dependencies": [],
+ },
+ {
+ "id": "ST-002",
+ "title": "Second task",
+ "aag_contract": "Actor -> do2() -> done2",
+ "affected_files": ["b.py"],
+ "validation_criteria": ["VC2: check"],
+ "dependencies": ["ST-001"],
+ },
+ ],
+ }
+ (branch_workspace / "blueprint.json").write_text(json.dumps(bp))
+
+ plan = "## Goal\nImplement the feature.\n\n## Subtasks\n..."
+ (branch_workspace / "task_plan_test-branch.md").write_text(plan)
+
+ state = {
+ "subtask_phases": {"ST-001": "COMPLETE"},
+ "subtask_results": {
+ "ST-001": {"files_changed": ["a.py"], "status": "valid", "summary": "done"}
+ },
+ }
+ (branch_workspace / "step_state.json").write_text(json.dumps(state))
+
+ result = map_step_runner.build_context_block("test-branch", "ST-002")
+
+ assert "" in result
+ assert "" in result
+ assert "# Goal:" in result
+ assert "Implement the feature." in result
+ assert "ST-002" in result
+ assert "Second task" in result
+ assert "Actor -> do2() -> done2" in result
+ assert "[>>] ST-002" in result
+ assert "[x] ST-001" in result
+ assert "# Upstream Results" in result
+ assert "ST-001: files=" in result
+
+ def test_upstream_results_omitted_when_no_deps(self, branch_workspace):
+ bp = {
+ "summary": "test",
+ "subtasks": [
+ {
+ "id": "ST-001",
+ "title": "Only task",
+ "aag_contract": "A -> B -> C",
+ "affected_files": [],
+ "validation_criteria": [],
+ "dependencies": [],
+ },
+ ],
+ }
+ (branch_workspace / "blueprint.json").write_text(json.dumps(bp))
+ plan = "## Goal\nDo thing.\n\n## Done"
+ (branch_workspace / "task_plan_test-branch.md").write_text(plan)
+
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+
+ assert "" in result
+ assert "# Upstream Results" not in result
+
+
+class TestBuildContextBlockRepoDelta:
+ """Tests for Repo Delta path in build_context_block (requires mocked compute_differential_insight)."""
+
+ def _setup_blueprint_and_state(self, branch_workspace, last_sha=None):
+ """Helper to set up blueprint + state with optional last_subtask_commit_sha."""
+ bp = {
+ "summary": "test",
+ "subtasks": [
+ {
+ "id": "ST-001",
+ "title": "First task",
+ "aag_contract": "A -> B -> C",
+ "affected_files": ["a.py"],
+ "validation_criteria": ["VC1"],
+ "dependencies": [],
+ },
+ ],
+ }
+ (branch_workspace / "blueprint.json").write_text(json.dumps(bp))
+ plan = "## Goal\nDo thing.\n\n## Done"
+ (branch_workspace / "task_plan_test-branch.md").write_text(plan)
+
+ state = {"subtask_phases": {}, "subtask_results": {}}
+ if last_sha is not None:
+ state["last_subtask_commit_sha"] = last_sha
+ (branch_workspace / "step_state.json").write_text(json.dumps(state))
+
+ def test_includes_repo_delta_when_sha_available(self, branch_workspace):
+ self._setup_blueprint_and_state(branch_workspace, last_sha="abc123")
+ mock_insight = {
+ "changed_files": ["src/foo.py", "src/bar.py"],
+ "deleted_files": [],
+ "since_sha": "abc123",
+ "current_sha": "def456",
+ }
+ with patch.dict("sys.modules", {"mapify_cli": MagicMock(), "mapify_cli.repo_insight": MagicMock()}):
+ with patch(
+ "mapify_cli.repo_insight.compute_differential_insight",
+ return_value=mock_insight,
+ ):
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+
+ assert "# Repo Delta" in result
+ assert "src/foo.py" in result
+ assert "src/bar.py" in result
+
+ def test_repo_delta_capped_at_20_files(self, branch_workspace):
+ self._setup_blueprint_and_state(branch_workspace, last_sha="abc123")
+ many_files = [f"file_{i}.py" for i in range(25)]
+ mock_insight = {
+ "changed_files": many_files,
+ "deleted_files": [],
+ "since_sha": "abc123",
+ "current_sha": "def456",
+ }
+ with patch.dict("sys.modules", {"mapify_cli": MagicMock(), "mapify_cli.repo_insight": MagicMock()}):
+ with patch(
+ "mapify_cli.repo_insight.compute_differential_insight",
+ return_value=mock_insight,
+ ):
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+
+ assert "# Repo Delta" in result
+ assert "file_19.py" in result
+ assert "file_20.py" not in result
+ assert "... +5 more" in result
+
+ def test_repo_delta_omitted_on_error(self, branch_workspace):
+ self._setup_blueprint_and_state(branch_workspace, last_sha="abc123")
+ mock_insight = {
+ "changed_files": [],
+ "deleted_files": [],
+ "error": "git diff failed",
+ }
+ with patch.dict("sys.modules", {"mapify_cli": MagicMock(), "mapify_cli.repo_insight": MagicMock()}):
+ with patch(
+ "mapify_cli.repo_insight.compute_differential_insight",
+ return_value=mock_insight,
+ ):
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+
+ assert "" in result
+ assert "# Repo Delta" not in result
+
+ def test_repo_delta_omitted_when_no_sha(self, branch_workspace):
+ self._setup_blueprint_and_state(branch_workspace, last_sha=None)
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+ assert "" in result
+ assert "# Repo Delta" not in result
+
+ def test_repo_delta_fallback_on_import_error(self, branch_workspace):
+ """When mapify_cli.repo_insight is not importable, Repo Delta is silently skipped."""
+ self._setup_blueprint_and_state(branch_workspace, last_sha="abc123")
+ with patch.dict("sys.modules", {"mapify_cli": None, "mapify_cli.repo_insight": None}):
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+
+ assert "" in result
+ assert "# Repo Delta" not in result
+
+ def test_repo_delta_includes_deleted_files(self, branch_workspace):
+ """Deleted files from compute_differential_insight are shown in context block."""
+ self._setup_blueprint_and_state(branch_workspace, last_sha="abc123")
+ mock_insight = {
+ "changed_files": ["src/new.py"],
+ "deleted_files": ["src/old.py", "src/removed.py"],
+ "since_sha": "abc123",
+ "current_sha": "def456",
+ }
+ with patch.dict("sys.modules", {"mapify_cli": MagicMock(), "mapify_cli.repo_insight": MagicMock()}):
+ with patch(
+ "mapify_cli.repo_insight.compute_differential_insight",
+ return_value=mock_insight,
+ ):
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+
+ assert "# Repo Delta" in result
+ assert "src/new.py" in result
+ assert "# Deleted since last subtask:" in result
+ assert "(deleted) src/old.py" in result
+ assert "(deleted) src/removed.py" in result
+
+ def test_repo_delta_only_deleted_no_changed(self, branch_workspace):
+ """When only deletions occurred, Repo Delta still appears."""
+ self._setup_blueprint_and_state(branch_workspace, last_sha="abc123")
+ mock_insight = {
+ "changed_files": [],
+ "deleted_files": ["src/gone.py"],
+ "since_sha": "abc123",
+ "current_sha": "def456",
+ }
+ with patch.dict("sys.modules", {"mapify_cli": MagicMock(), "mapify_cli.repo_insight": MagicMock()}):
+ with patch(
+ "mapify_cli.repo_insight.compute_differential_insight",
+ return_value=mock_insight,
+ ):
+ result = map_step_runner.build_context_block("test-branch", "ST-001")
+
+ assert "# Repo Delta" in result
+ assert "# Deleted since last subtask:" in result
+ assert "(deleted) src/gone.py" in result
+
+
+class TestBuildContextBlockIntegration:
+ """Integration test: record_subtask_result → build_context_block → upstream results."""
+
+ def test_upstream_results_flow(self, branch_workspace):
+ """Subtask results recorded in step_state appear as upstream results in context block."""
+ branch = "test-branch"
+
+ # Set up blueprint with two subtasks, ST-002 depends on ST-001
+ bp = {
+ "subtasks": [
+ {
+ "id": "ST-001",
+ "title": "First task",
+ "aag_contract": "A -> B -> C",
+ "affected_files": ["a.py"],
+ "validation_criteria": ["VC1"],
+ "dependencies": [],
+ },
+ {
+ "id": "ST-002",
+ "title": "Second task",
+ "aag_contract": "D -> E -> F",
+ "affected_files": ["b.py"],
+ "validation_criteria": ["VC2"],
+ "dependencies": ["ST-001"],
+ },
+ ],
+ }
+ (branch_workspace / "blueprint.json").write_text(json.dumps(bp))
+
+ plan = "## Goal\nBuild the feature.\n\n## Done"
+ (branch_workspace / f"task_plan_{branch}.md").write_text(plan)
+
+ # Simulate ST-001 completed with results via StepState
+ sys.path.insert(0, str(SCRIPTS_PATH))
+ import map_orchestrator # noqa: E402
+
+ state = map_orchestrator.StepState()
+ state.current_subtask_id = "ST-002"
+ state.record_subtask_result(
+ "ST-001", ["a.py"], "valid", "All tests pass", commit_sha="abc123"
+ )
+ state_file = branch_workspace / "step_state.json"
+ state.save(state_file)
+
+ # Now build context for ST-002 — should see ST-001 upstream results
+ result = map_step_runner.build_context_block(branch, "ST-002")
+
+ assert "" in result
+ assert "# Current Subtask: ST-002" in result
+ assert "# Upstream Results (dependencies of ST-002):" in result
+ assert "ST-001: files=['a.py'], status=valid" in result
+ assert "All tests pass" in result
+ assert "[x] ST-001: First task (valid)" in result
+ assert "[>>] ST-002: Second task (IN PROGRESS)" in result
diff --git a/tests/test_repo_insight.py b/tests/test_repo_insight.py
index 4ac8cb2..65f1d3b 100644
--- a/tests/test_repo_insight.py
+++ b/tests/test_repo_insight.py
@@ -4,11 +4,14 @@
import tempfile
from pathlib import Path
+from unittest.mock import patch, MagicMock
+
from mapify_cli.repo_insight import (
detect_language,
generate_suggested_checks,
generate_key_dirs,
create_repo_insight,
+ compute_differential_insight,
)
@@ -366,3 +369,57 @@ def test_unknown_language_still_produces_valid_json(self):
# Unknown language should still pass validation
assert data["language"] == "unknown"
+
+
+class TestComputeDifferentialInsight:
+ """Tests for compute_differential_insight function."""
+
+ def test_none_sha_returns_note(self):
+ """Should return empty lists with note when since_sha is None."""
+ result = compute_differential_insight(Path("/tmp"), None)
+ assert result["changed_files"] == []
+ assert result["deleted_files"] == []
+ assert "note" in result
+
+ def test_valid_diff_returns_files(self):
+ """Should return changed and deleted files on successful git diff."""
+ mock_changed = MagicMock(returncode=0, stdout="a.py\nb.py\n")
+ mock_deleted = MagicMock(returncode=0, stdout="old.py\n")
+ mock_head = MagicMock(returncode=0, stdout="abc123\n")
+
+ with patch("subprocess.run", side_effect=[mock_changed, mock_deleted, mock_head]):
+ result = compute_differential_insight(Path("/tmp"), "def456")
+
+ assert result["changed_files"] == ["a.py", "b.py"]
+ assert result["deleted_files"] == ["old.py"]
+ assert result["since_sha"] == "def456"
+ assert result["current_sha"] == "abc123"
+
+ def test_git_failure_returns_error(self):
+ """Should return error dict when git diff fails."""
+ mock_fail = MagicMock(returncode=1, stderr="fatal: bad object")
+
+ with patch("subprocess.run", return_value=mock_fail):
+ result = compute_differential_insight(Path("/tmp"), "badsha")
+
+ assert result["changed_files"] == []
+ assert result["deleted_files"] == []
+ assert "error" in result
+
+ def test_timeout_returns_error(self):
+ """Should return error dict on subprocess timeout."""
+ import subprocess
+
+ with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("git", 2)):
+ result = compute_differential_insight(Path("/tmp"), "abc123")
+
+ assert result["changed_files"] == []
+ assert "error" in result
+
+ def test_file_not_found_returns_error(self):
+ """Should return error dict when git is not available."""
+ with patch("subprocess.run", side_effect=FileNotFoundError("git")):
+ result = compute_differential_insight(Path("/tmp"), "abc123")
+
+ assert result["changed_files"] == []
+ assert "error" in result
diff --git a/tests/test_workflow_context_injector.py b/tests/test_workflow_context_injector.py
index e6d6798..336094d 100644
--- a/tests/test_workflow_context_injector.py
+++ b/tests/test_workflow_context_injector.py
@@ -1,8 +1,11 @@
import json
+import importlib.util
import os
import subprocess
from pathlib import Path
+import pytest
+
def _run_hook(tmp_project_dir: Path, stdin_payload: dict) -> tuple[int, str, str]:
hook_path = Path(".claude/hooks/workflow-context-injector.py")
@@ -20,8 +23,23 @@ def _run_hook(tmp_project_dir: Path, stdin_payload: dict) -> tuple[int, str, str
return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
-def test_injects_for_edit_when_step_state_exists(tmp_path: Path) -> None:
- branch = (
+def _import_hook():
+ """Import the hook module dynamically for direct function testing."""
+ hook_path = Path(".claude/hooks/workflow-context-injector.py").resolve()
+ spec = importlib.util.spec_from_file_location("workflow_context_injector", hook_path)
+ mod = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(mod)
+ return mod
+
+
+@pytest.fixture
+def hook_mod():
+ return _import_hook()
+
+
+@pytest.fixture
+def branch_name():
+ return (
subprocess.run(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
capture_output=True,
@@ -33,6 +51,10 @@ def test_injects_for_edit_when_step_state_exists(tmp_path: Path) -> None:
.replace("/", "-")
)
+
+def test_injects_for_edit_when_step_state_exists(tmp_path: Path, branch_name: str) -> None:
+ branch = branch_name
+
state_dir = tmp_path / ".map" / branch
state_dir.mkdir(parents=True, exist_ok=True)
(state_dir / "step_state.json").write_text(
@@ -74,18 +96,8 @@ def test_skips_for_readonly_bash(tmp_path: Path) -> None:
assert out == "{}"
-def test_injects_for_pytest_bash_when_step_state_exists(tmp_path: Path) -> None:
- branch = (
- subprocess.run(
- ["git", "rev-parse", "--abbrev-ref", "HEAD"],
- capture_output=True,
- text=True,
- check=True,
- timeout=2,
- )
- .stdout.strip()
- .replace("/", "-")
- )
+def test_injects_for_pytest_bash_when_step_state_exists(tmp_path: Path, branch_name: str) -> None:
+ branch = branch_name
state_dir = tmp_path / ".map" / branch
state_dir.mkdir(parents=True, exist_ok=True)
@@ -115,3 +127,273 @@ def test_injects_for_pytest_bash_when_step_state_exists(tmp_path: Path) -> None:
assert "2.8" in additional
assert "TESTS_GATE" in additional
assert "ST-002" in additional
+
+
+class TestLoadGoalAndTitle:
+ """Tests for load_goal_and_title function."""
+
+ def test_returns_goal_and_title(self, tmp_path, hook_mod, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ plan = "## Goal\nImplement the feature. More details here.\n\n## Subtasks\n..."
+ (state_dir / f"task_plan_{branch}.md").write_text(plan)
+
+ bp = {"subtasks": [{"id": "ST-001", "title": "First task"}]}
+ (state_dir / "blueprint.json").write_text(json.dumps(bp))
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ goal, title = hook_mod.load_goal_and_title(branch, "ST-001")
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert goal == "Implement the feature."
+ assert title == "First task"
+
+ def test_returns_empty_when_no_files(self, tmp_path, hook_mod, branch_name):
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ goal, title = hook_mod.load_goal_and_title(branch_name, "ST-001")
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert goal == ""
+ assert title == ""
+
+ def test_truncates_goal_at_80_chars(self, tmp_path, hook_mod, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ long_goal = "A" * 100
+ plan = f"## Goal\n{long_goal}\n\n## Done"
+ (state_dir / f"task_plan_{branch}.md").write_text(plan)
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ goal, _ = hook_mod.load_goal_and_title(branch, "ST-001")
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert len(goal) == 80
+ assert goal.endswith("...")
+
+ def test_truncates_goal_at_first_sentence(self, tmp_path, hook_mod, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ plan = "## Goal\nFirst sentence. Second sentence. Third.\n\n## Done"
+ (state_dir / f"task_plan_{branch}.md").write_text(plan)
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ goal, _ = hook_mod.load_goal_and_title(branch, "ST-001")
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert goal == "First sentence."
+
+ def test_returns_empty_title_for_missing_subtask(self, tmp_path, hook_mod, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ bp = {"subtasks": [{"id": "ST-001", "title": "Only task"}]}
+ (state_dir / "blueprint.json").write_text(json.dumps(bp))
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ _, title = hook_mod.load_goal_and_title(branch, "ST-999")
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert title == ""
+
+ def test_handles_invalid_json_blueprint(self, tmp_path, hook_mod, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ (state_dir / "blueprint.json").write_text("not json")
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ _, title = hook_mod.load_goal_and_title(branch, "ST-001")
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert title == ""
+
+ def test_matches_overview_heading(self, tmp_path, hook_mod, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ plan = "## Overview\nThe overview text.\n\n## Details"
+ (state_dir / f"task_plan_{branch}.md").write_text(plan)
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ goal, _ = hook_mod.load_goal_and_title(branch, "ST-001")
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert goal == "The overview text."
+
+
+class TestFormatReminderTruncation:
+ """Tests for format_reminder progressive 500-char truncation."""
+
+ def _make_state(self, **overrides):
+ base = {
+ "current_step_id": "2.3",
+ "current_step_phase": "ACTOR",
+ "current_subtask_id": "ST-001",
+ "subtask_index": 0,
+ "subtask_sequence": ["ST-001"],
+ "plan_approved": True,
+ "execution_mode": "batch",
+ }
+ base.update(overrides)
+ return base
+
+ def test_result_within_500_chars(self, hook_mod, tmp_path, branch_name):
+ """Basic reminder should be well under 500 chars."""
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ state = self._make_state()
+ result = hook_mod.format_reminder(state, branch_name)
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert result is not None
+ assert len(result) <= 500
+
+ def test_includes_goal_when_plan_exists(self, hook_mod, tmp_path, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ plan = "## Goal\nShort goal.\n\n## Done"
+ (state_dir / f"task_plan_{branch}.md").write_text(plan)
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ state = self._make_state()
+ result = hook_mod.format_reminder(state, branch)
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert "Goal: Short goal." in result
+
+ def test_includes_subtask_title(self, hook_mod, tmp_path, branch_name):
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ bp = {"subtasks": [{"id": "ST-001", "title": "My task title"}]}
+ (state_dir / "blueprint.json").write_text(json.dumps(bp))
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ state = self._make_state()
+ result = hook_mod.format_reminder(state, branch)
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert "My task title" in result
+
+ def test_hard_truncates_at_500(self, hook_mod, tmp_path, branch_name):
+ """When base string exceeds 500 chars even after dropping goal, hard-truncate."""
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ # Create a title long enough to push past 500 chars even without goal
+ bp = {"subtasks": [{"id": "ST-001", "title": "X" * 480}]}
+ (state_dir / "blueprint.json").write_text(json.dumps(bp))
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ state = self._make_state()
+ result = hook_mod.format_reminder(state, branch)
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert result is not None
+ assert len(result) <= 500
+ assert result.endswith("...")
+
+ def test_drops_goal_first_when_over_500(self, hook_mod, tmp_path, branch_name):
+ """Goal hint is dropped first before hard truncation."""
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ # Title that takes ~430 chars, goal that would push it past 500
+ plan = "## Goal\nSome goal text.\n\n## Done"
+ (state_dir / f"task_plan_{branch}.md").write_text(plan)
+ bp = {"subtasks": [{"id": "ST-001", "title": "Y" * 430}]}
+ (state_dir / "blueprint.json").write_text(json.dumps(bp))
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ state = self._make_state()
+ result = hook_mod.format_reminder(state, branch)
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert result is not None
+ assert len(result) <= 500
+ # Goal should have been dropped
+ assert "Goal:" not in result
+
+ def test_no_goal_or_title_when_subtask_is_dash(self, hook_mod, tmp_path, branch_name):
+ """When subtask_id is '-', skip goal/title loading entirely."""
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ state = self._make_state(current_subtask_id="-")
+ result = hook_mod.format_reminder(state, branch_name)
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert result is not None
+ assert "Goal:" not in result
+
+ def test_required_suffix_truncated(self, hook_mod, tmp_path, branch_name):
+ """REQUIRED suffix should also be truncated to 500 chars total at word boundary."""
+ branch = branch_name
+ state_dir = tmp_path / ".map" / branch
+ state_dir.mkdir(parents=True, exist_ok=True)
+
+ # Use word-spaced title so truncation can find a word boundary
+ # "word " * 90 = 450 chars, plus prefix + REQUIRED pushes well past 500
+ long_title = ("word " * 90).strip()
+ bp = {"subtasks": [{"id": "ST-001", "title": long_title}]}
+ (state_dir / "blueprint.json").write_text(json.dumps(bp))
+
+ os.environ["CLAUDE_PROJECT_DIR"] = str(tmp_path)
+ try:
+ # Use step_id "1.55" which triggers "Review and approve plan" required action
+ state = self._make_state(
+ current_step_id="1.55",
+ current_step_phase="REVIEW_PLAN",
+ )
+ result = hook_mod.format_reminder(state, branch)
+ finally:
+ os.environ.pop("CLAUDE_PROJECT_DIR", None)
+
+ assert result is not None
+ assert len(result) <= 500
+ assert result.endswith("...")
+ # Word-boundary truncation: should not cut mid-word.
+ # The text before "..." ends at a space (rfind finds last space),
+ # so the remaining text should end with a non-alphanumeric char
+ # (space, pipe, paren) rather than cutting "wor..." mid-word.
+ before_ellipsis = result[:-3]
+ assert not before_ellipsis[-1].isalpha(), (
+ f"Truncation cut mid-word; last char before '...': {before_ellipsis[-1]!r}"
+ )