diff --git a/tests/test_plan_mode.py b/tests/test_plan_mode.py new file mode 100644 index 0000000..8f12b43 --- /dev/null +++ b/tests/test_plan_mode.py @@ -0,0 +1,82 @@ +"""Unit tests for the plan-mode tools. + +Exercise `tools.plan_mode._enter_plan_mode` and `_exit_plan_mode` in +isolation: the permission-mode transitions, plan-file lifecycle and the +"empty plan" guard. E2E coverage (through agent.run + a mocked LLM stream ++ the Write tool) lives in test_plan_mode_e2e.py. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +import runtime +import tools as _tools_init # noqa: F401 — register tools including plan_mode +from tools.plan_mode import _enter_plan_mode, _exit_plan_mode + + +@pytest.fixture(autouse=True) +def _isolated_ctx(): + """Ensure plan-mode state is not leaked between tests (same session_id).""" + yield + for sid in ("default", "unit_sess"): + ctx = runtime.get_session_ctx(sid) + ctx.plan_file = None + ctx.prev_permission_mode = None + + +def _mk_config(cwd): + return { + "_session_id": "unit_sess", + "_worktree_cwd": str(cwd), + "permission_mode": "auto", + } + + +class TestEnterPlanMode: + def test_creates_plan_file_with_header(self, tmp_path): + config = _mk_config(tmp_path) + result = _enter_plan_mode({"task_description": "Refactor X"}, config) + + plan_path = tmp_path / ".nano_claude" / "plans" / "unit_sess.md" + assert plan_path.exists() + assert plan_path.read_text(encoding="utf-8").startswith("# Plan: Refactor X") + assert "Plan mode activated" in result + + def test_flips_permission_mode_to_plan(self, tmp_path): + config = _mk_config(tmp_path) + _enter_plan_mode({}, config) + assert config["permission_mode"] == "plan" + + def test_is_idempotent_if_already_in_plan_mode(self, tmp_path): + config = _mk_config(tmp_path) + _enter_plan_mode({}, config) + second = _enter_plan_mode({}, config) + assert "Already in plan mode" in second + + +class TestExitPlanMode: + def test_rejects_empty_plan(self, tmp_path): + config = _mk_config(tmp_path) + _enter_plan_mode({}, config) # writes only the "# Plan" header + result = _exit_plan_mode({}, config) + assert "empty" in result.lower() + # Still in plan mode, since exit was refused. + assert config["permission_mode"] == "plan" + + def test_accepts_plan_with_real_content_and_restores_permission(self, tmp_path): + config = _mk_config(tmp_path) + _enter_plan_mode({}, config) + plan_path = tmp_path / ".nano_claude" / "plans" / "unit_sess.md" + plan_path.write_text("# Plan\n\n## Steps\n1. read\n2. write\n", encoding="utf-8") + + result = _exit_plan_mode({}, config) + assert "Plan mode exited" in result + assert "## Steps" in result + assert config["permission_mode"] == "auto" + + def test_noop_when_not_in_plan_mode(self, tmp_path): + config = _mk_config(tmp_path) # permission_mode = "auto" + result = _exit_plan_mode({}, config) + assert "Not in plan mode" in result diff --git a/tests/test_plan_mode_e2e.py b/tests/test_plan_mode_e2e.py new file mode 100644 index 0000000..23ee26a --- /dev/null +++ b/tests/test_plan_mode_e2e.py @@ -0,0 +1,113 @@ +"""End-to-end: LLM drives the plan-mode workflow via agent.run + mocked stream. + +The plan file is written using the regular `Write` tool, whose permission +check only allows writes to the current plan_file while in plan mode -- so +this test also exercises the agent._check_permission plan-mode branch. + +Only `providers.stream` is mocked. Plan tools, registry dispatch, Write tool +and the per-session RuntimeContext all run for real against tmp_path. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest + +import tools as _tools_init # noqa: F401 - register built-ins + plan_mode +import runtime +from agent import AgentState, run +from providers import AssistantTurn + + +def _scripted_stream(turns): + cursor = iter(turns) + + def fake_stream(**_kwargs): + spec = next(cursor) + yield AssistantTurn( + text=spec.get("text", ""), + tool_calls=spec.get("tool_calls") or [], + in_tokens=1, out_tokens=1, + ) + + return fake_stream + + +@pytest.fixture(autouse=True) +def _reset_plan_ctx(): + yield + for sid in ("default", "plan_e2e", "plan_rogue"): + ctx = runtime.get_session_ctx(sid) + ctx.plan_file = None + ctx.prev_permission_mode = None + + +def test_full_plan_mode_flow_through_agent_loop(monkeypatch, tmp_path): + """EnterPlanMode → Write(plan_file) → ExitPlanMode, all via the real agent loop.""" + plan_file = str(tmp_path / ".nano_claude" / "plans" / "plan_e2e.md") + plan_body = "# Plan: Refactor X\n\n## Steps\n1. explore\n2. implement\n" + turns = [ + {"tool_calls": [{ + "id": "t1", "name": "EnterPlanMode", + "input": {"task_description": "Refactor X"}, + }]}, + {"tool_calls": [{ + "id": "t2", "name": "Write", + "input": {"file_path": plan_file, "content": plan_body}, + }]}, + {"tool_calls": [{ + "id": "t3", "name": "ExitPlanMode", "input": {}, + }]}, + {"text": "all done"}, + ] + monkeypatch.setattr("agent.stream", _scripted_stream(turns)) + + state = AgentState() + config = { + "model": "test", + "permission_mode": "auto", # plan mode will flip it to "plan" + "_session_id": "plan_e2e", + "_worktree_cwd": str(tmp_path), + } + list(run("plan a refactor", state, config, "sys")) + + # Plan file ended up on disk with the Write-tool content. + assert Path(plan_file).read_text(encoding="utf-8") == plan_body + + # ExitPlanMode restored the previous permission mode. + assert config["permission_mode"] == "auto" + + +def test_write_outside_plan_file_is_rejected_in_plan_mode(monkeypatch, tmp_path): + """The permission-mode 'plan' branch must deny Writes to any file != plan_file.""" + plan_file = str(tmp_path / ".nano_claude" / "plans" / "plan_rogue.md") + unrelated = str(tmp_path / "src" / "config.py") + turns = [ + {"tool_calls": [{ + "id": "t1", "name": "EnterPlanMode", + "input": {"task_description": "secure"}, + }]}, + {"tool_calls": [{ + "id": "t2", "name": "Write", + "input": {"file_path": unrelated, "content": "print('pwned')"}, + }]}, + {"text": "stopped"}, + ] + monkeypatch.setattr("agent.stream", _scripted_stream(turns)) + + state = AgentState() + config = { + "model": "test", + "permission_mode": "auto", + "_session_id": "plan_rogue", + "_worktree_cwd": str(tmp_path), + } + list(run("try a rogue write", state, config, "sys")) + + # The unrelated file was NEVER created. + assert not Path(unrelated).exists() + + # The Write tool_result for t2 carries the rejection message. + t2_result = next(m for m in state.messages + if m.get("role") == "tool" and m.get("tool_call_id") == "t2") + assert "Denied" in t2_result["content"] or "plan" in t2_result["content"].lower() diff --git a/tools/__init__.py b/tools/__init__.py index 8731a8c..d04b037 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -495,13 +495,21 @@ def _register_builtins() -> None: "skill.tools", "cc_mcp.tools", "task.tools", + "tools.plan_mode", ] for _mod_name in _EXTENSION_MODULES: try: __import__(_mod_name) - except Exception: - pass # Extension loading is best-effort; never crash startup + except Exception as _ext_err: + # Best-effort loading — a missing optional extension must not crash startup, + # but the cause should still be visible on stderr for diagnosis. + import sys as _sys + print( + f"[tools] extension {_mod_name!r} failed to load: " + f"{type(_ext_err).__name__}: {_ext_err}", + file=_sys.stderr, + ) from multi_agent.tools import get_agent_manager as _get_agent_manager # noqa: F401 @@ -525,95 +533,6 @@ def _register_builtins() -> None: except Exception: pass -# ── Plan mode tools (EnterPlanMode / ExitPlanMode) ──────────────────────── - -from pathlib import Path as _Path - - -def _enter_plan_mode(params: dict, config: dict) -> str: - if config.get("permission_mode") == "plan": - return "Already in plan mode. Write your plan to the plan file, then call ExitPlanMode." - - session_id = config.get("_session_id", "default") - plans_dir = _Path(config.get("_worktree_cwd") or _Path.cwd()) / ".nano_claude" / "plans" - plans_dir.mkdir(parents=True, exist_ok=True) - plan_path = plans_dir / f"{session_id}.md" - - task_desc = params.get("task_description", "") - if not plan_path.exists() or plan_path.stat().st_size == 0: - header = f"# Plan: {task_desc}\n\n" if task_desc else "# Plan\n\n" - plan_path.write_text(header, encoding="utf-8") - - import runtime - sctx = runtime.get_ctx(config) - sctx.prev_permission_mode = config.get("permission_mode", "auto") - config["permission_mode"] = "plan" - sctx.plan_file = str(plan_path) - return ( - f"Plan mode activated. Plan file: {plan_path}\n" - "Write your step-by-step plan to the plan file, then call ExitPlanMode when ready to implement." - ) - - -def _exit_plan_mode(params: dict, config: dict) -> str: - if config.get("permission_mode") != "plan": - return "Not in plan mode." - import runtime - sctx = runtime.get_ctx(config) - plan_file = sctx.plan_file or "" - plan_content = "" - if plan_file: - try: - plan_content = _Path(plan_file).read_text(encoding="utf-8").strip() - except Exception: - plan_content = "" - - # Reject if plan file is effectively empty (only whitespace / top-level title) - # A top-level title is exactly "# ..." (single #). ## sections count as content. - non_trivial_lines = [ - l for l in plan_content.splitlines() - if l.strip() and not (l.strip().startswith("# ") and not l.strip().startswith("## ")) - ] - if not non_trivial_lines: - return ( - "Plan is empty — please write your step-by-step plan to the plan file " - f"({plan_file}) before exiting plan mode." - ) - - config["permission_mode"] = sctx.prev_permission_mode or "auto" - sctx.prev_permission_mode = None - sctx.plan_file = None - return ( - f"Plan mode exited. Resuming normal permissions.\n\n" - f"Plan content:\n{plan_content}\n\n" - "Wait for the user to approve the plan before executing any steps." - ) - - -_plan_schema_enter = { - "name": "EnterPlanMode", - "description": ( - "Switch to plan mode: read-only except for writing the plan file. " - "Use this to analyze a task and write a step-by-step plan before executing." - ), - "input_schema": { - "type": "object", - "properties": { - "task_description": { - "type": "string", - "description": "Brief description of what you plan to do", - }, - }, - "required": [], - }, -} -_plan_schema_exit = { - "name": "ExitPlanMode", - "description": "Exit plan mode and return to normal permissions to begin executing the plan.", - "input_schema": {"type": "object", "properties": {}, "required": []}, -} - -register_tool(ToolDef("EnterPlanMode", _plan_schema_enter, _enter_plan_mode, - read_only=True, concurrent_safe=False)) -register_tool(ToolDef("ExitPlanMode", _plan_schema_exit, _exit_plan_mode, - read_only=False, concurrent_safe=False)) +# Plan mode tools (EnterPlanMode / ExitPlanMode) are registered by +# tools/plan_mode.py via the extension loader above; the old inline block +# that used to live here is removed so there is a single source of truth. diff --git a/tools/plan_mode.py b/tools/plan_mode.py new file mode 100644 index 0000000..06e4ac4 --- /dev/null +++ b/tools/plan_mode.py @@ -0,0 +1,148 @@ +"""Plan mode tools — EnterPlanMode / ExitPlanMode. + +Extracted from tools/__init__.py so plan-mode logic lives in a single focused +module rather than scattered inline at the bottom of the tools package. + +Model flow +---------- +1. `EnterPlanMode` is called; a per-session plan file is (re-)created under + `/.nano_claude/plans/.md` with a Markdown header, and + `config["permission_mode"]` flips to "plan". In that mode, `Write` is only + allowed against the plan file (see agent._check_permission). +2. The model writes the plan by calling the regular `Write` tool with + `file_path=`. No dedicated WritePlan tool — `Write` already + exists and the permission gate takes care of scoping. +3. `ExitPlanMode` reads the plan file, refuses to exit if it is empty / + only-header, and restores the previous permission mode. The plan content + is embedded in the tool_result so it is visible to the user on approval. +""" +from __future__ import annotations + +from pathlib import Path + +import runtime +from tool_registry import register_tool, ToolDef + + +def _plan_file_for(config: dict) -> Path: + session_id = config.get("_session_id", "default") + cwd = Path(config.get("_worktree_cwd") or Path.cwd()) + plans_dir = cwd / ".nano_claude" / "plans" + plans_dir.mkdir(parents=True, exist_ok=True) + return plans_dir / f"{session_id}.md" + + +def _enter_plan_mode(params: dict, config: dict) -> str: + """Enter plan mode: create plan file, flip permission_mode, remember previous.""" + if config.get("permission_mode") == "plan": + return ( + "Already in plan mode. Write your plan to the plan file, " + "then call ExitPlanMode." + ) + + plan_path = _plan_file_for(config) + if not plan_path.exists() or plan_path.stat().st_size == 0: + task_desc = params.get("task_description", "") + header = f"# Plan: {task_desc}\n\n" if task_desc else "# Plan\n\n" + plan_path.write_text(header, encoding="utf-8") + + sctx = runtime.get_ctx(config) + sctx.prev_permission_mode = config.get("permission_mode", "auto") + config["permission_mode"] = "plan" + sctx.plan_file = str(plan_path) + + return ( + f"Plan mode activated. Plan file: {plan_path}\n" + "Write your step-by-step plan to the plan file, then call ExitPlanMode " + "when ready to implement." + ) + + +def _exit_plan_mode(_params: dict, config: dict) -> str: + """Exit plan mode: read plan file, reject if empty, restore permissions.""" + if config.get("permission_mode") != "plan": + return "Not in plan mode." + + sctx = runtime.get_ctx(config) + plan_file = sctx.plan_file or "" + plan_content = _read_plan_content(plan_file) + + if not _plan_has_substance(plan_content): + return ( + "Plan is empty -- please write your step-by-step plan to the plan " + f"file ({plan_file}) before exiting plan mode." + ) + + config["permission_mode"] = sctx.prev_permission_mode or "auto" + sctx.prev_permission_mode = None + sctx.plan_file = None + + return ( + "Plan mode exited. Resuming normal permissions.\n\n" + f"Plan content:\n{plan_content}\n\n" + "Wait for the user to approve the plan before executing any steps." + ) + + +def _read_plan_content(plan_file: str) -> str: + if not plan_file: + return "" + path = Path(plan_file) + if not path.exists(): + return "" + return path.read_text(encoding="utf-8").strip() + + +def _plan_has_substance(content: str) -> bool: + """Accept the plan only if it has real content beyond a single top-level title. + + A lone `# Title` line counts as empty so the model is forced to actually + write steps; `## Section` and below count as real content. + """ + if not content: + return False + for line in content.splitlines(): + stripped = line.strip() + if not stripped: + continue + is_top_level_title = stripped.startswith("# ") and not stripped.startswith("## ") + if not is_top_level_title: + return True + return False + + +_ENTER_SCHEMA = { + "name": "EnterPlanMode", + "description": ( + "Switch to plan mode: read-only except for writing the plan file. " + "Use this to analyze a task and write a step-by-step plan before executing." + ), + "input_schema": { + "type": "object", + "properties": { + "task_description": { + "type": "string", + "description": "Brief description of what you plan to do", + }, + }, + "required": [], + }, +} + +_EXIT_SCHEMA = { + "name": "ExitPlanMode", + "description": ( + "Exit plan mode and return to normal permissions to begin executing the plan." + ), + "input_schema": {"type": "object", "properties": {}, "required": []}, +} + + +register_tool(ToolDef( + name="EnterPlanMode", schema=_ENTER_SCHEMA, func=_enter_plan_mode, + read_only=True, concurrent_safe=False, +)) +register_tool(ToolDef( + name="ExitPlanMode", schema=_EXIT_SCHEMA, func=_exit_plan_mode, + read_only=False, concurrent_safe=False, +))