diff --git a/agent_cli/dev/agent_state.py b/agent_cli/dev/agent_state.py new file mode 100644 index 00000000..7c5c99e1 --- /dev/null +++ b/agent_cli/dev/agent_state.py @@ -0,0 +1,237 @@ +"""Agent state tracking for orchestration.""" + +from __future__ import annotations + +import json +import os +import re +import time +from dataclasses import asdict, dataclass, field +from hashlib import sha256 +from pathlib import Path +from typing import Literal + +STATE_BASE = Path.home() / ".cache" / "agent-cli" + +AgentStatus = Literal["running", "done", "dead", "quiet"] + + +@dataclass +class TrackedAgent: + """A single tracked agent instance.""" + + name: str + pane_id: str + worktree_path: str + agent_type: str + started_at: float + status: AgentStatus = "running" + last_output_hash: str = "" + last_output_change_at: float = 0.0 + + +@dataclass +class AgentStateFile: + """State file for one repository's tracked agents.""" + + agents: dict[str, TrackedAgent] = field(default_factory=dict) + last_poll_at: float = 0.0 + + +def _repo_slug(repo_root: Path) -> str: + """Convert a repo root path to a filesystem-safe slug. + + Includes a short path hash to avoid collisions between repositories with + the same trailing directory names. + """ + parts = repo_root.parts[-2:] + slug = "_".join(parts) + slug = re.sub(r"[^a-zA-Z0-9_-]", "_", slug) + digest = sha256(str(repo_root.expanduser().resolve()).encode()).hexdigest()[:10] + return f"{slug}_{digest}" + + +def _state_file_path(repo_root: Path) -> Path: + """Return the path to the agents.json state file.""" + return STATE_BASE / _repo_slug(repo_root) / "agents.json" + + +def load_state(repo_root: Path) -> AgentStateFile: + """Load agent state from disk. + + Returns an empty state if the file does not exist or is corrupt. + """ + path = _state_file_path(repo_root) + if not path.exists(): + return AgentStateFile() + + try: + data = json.loads(path.read_text()) + except (OSError, json.JSONDecodeError, TypeError): + return AgentStateFile() + + agents: dict[str, TrackedAgent] = {} + raw_agents = data.get("agents", {}) + if not isinstance(raw_agents, dict): + raw_agents = {} + for name, agent_data in raw_agents.items(): + if not isinstance(agent_data, dict): + continue + status = agent_data.get("status", "running") + if status not in ("running", "done", "dead", "quiet"): + status = "running" + try: + agents[name] = TrackedAgent( + name=str(agent_data["name"]), + pane_id=str(agent_data["pane_id"]), + worktree_path=str(agent_data["worktree_path"]), + agent_type=str(agent_data["agent_type"]), + started_at=float(agent_data["started_at"]), + status=status, + last_output_hash=str(agent_data.get("last_output_hash", "")), + last_output_change_at=float(agent_data.get("last_output_change_at", 0.0)), + ) + except (KeyError, TypeError, ValueError): + continue + + raw_last_poll_at = data.get("last_poll_at", 0.0) + try: + last_poll_at = float(raw_last_poll_at) + except (TypeError, ValueError): + last_poll_at = 0.0 + return AgentStateFile(agents=agents, last_poll_at=last_poll_at) + + +def save_state(repo_root: Path, state: AgentStateFile) -> None: + """Atomically write state to disk.""" + path = _state_file_path(repo_root) + path.parent.mkdir(parents=True, exist_ok=True) + data = { + "agents": {name: asdict(agent) for name, agent in state.agents.items()}, + "last_poll_at": state.last_poll_at, + } + # Write to temp file then rename for atomicity. + # Use PID in suffix to avoid races between concurrent writers. + tmp = path.with_suffix(f".{os.getpid()}.tmp") + tmp.write_text(json.dumps(data, indent=2) + "\n") + tmp.rename(path) + + +def register_agent( + repo_root: Path, + name: str, + pane_id: str, + worktree_path: Path, + agent_type: str, +) -> TrackedAgent: + """Register a new tracked agent in the state file.""" + state = load_state(repo_root) + now = time.time() + agent = TrackedAgent( + name=name, + pane_id=pane_id, + worktree_path=str(worktree_path), + agent_type=agent_type, + started_at=now, + ) + state.agents[name] = agent + save_state(repo_root, state) + return agent + + +def generate_agent_name( + repo_root: Path, + worktree_path: Path, + agent_type: str, + explicit_name: str | None = None, +) -> str: + """Generate a unique agent name. + + If *explicit_name* is given, uses that (raises if it collides with an active + agent). + Otherwise auto-generates from the worktree branch name. + """ + state = load_state(repo_root) + existing = { + name for name, existing_agent in state.agents.items() if existing_agent.status == "running" + } + + if explicit_name: + if explicit_name in existing: + msg = f"Agent name '{explicit_name}' already exists. Use a different --name." + raise ValueError(msg) + return explicit_name + + # Use worktree directory name as base (which is the branch name) + base = worktree_path.name + + # First agent in this worktree: just use the branch name + if base not in existing: + return base + + # Subsequent agents: append agent type + candidate = f"{base}-{agent_type}" + if candidate not in existing: + return candidate + + # Still collides: add numeric suffix + n = 2 + while f"{candidate}-{n}" in existing: + n += 1 + return f"{candidate}-{n}" + + +def sentinel_path(worktree_path: Path, agent_name: str) -> Path: + """Return the completion sentinel path for a tracked agent.""" + return worktree_path / ".claude" / f"DONE-{agent_name}" + + +def inject_completion_hook(worktree_path: Path, agent_type: str, agent_name: str) -> None: + """Inject a Stop hook into .claude/settings.local.json for completion detection. + + Uses settings.local.json (not settings.json) to avoid dirtying tracked files. + The sentinel file is unique per tracked agent name to avoid collisions when + multiple Claude sessions share the same worktree. + Only applies to Claude Code agents. Merges with existing settings. + """ + if agent_type != "claude": + return + + settings_path = worktree_path / ".claude" / "settings.local.json" + settings_path.parent.mkdir(parents=True, exist_ok=True) + + settings: dict = {} + if settings_path.exists(): + try: + raw = json.loads(settings_path.read_text()) + settings = raw if isinstance(raw, dict) else {} + except json.JSONDecodeError: + settings = {} + + # Merge Stop hook — guard against non-dict values from corrupt files + hooks = settings.get("hooks") + if not isinstance(hooks, dict): + hooks = {} + settings["hooks"] = hooks + stop_hooks = hooks.get("Stop") + if not isinstance(stop_hooks, list): + stop_hooks = [] + hooks["Stop"] = stop_hooks + + # Check if our hook is already present + sentinel_cmd = f"touch .claude/DONE-{agent_name}" + for entry in stop_hooks: + if not isinstance(entry, dict): + continue + for hook in entry.get("hooks", []): + cmd = hook.get("command", "") if isinstance(hook, dict) else hook + if isinstance(cmd, str) and sentinel_cmd in cmd: + return # Already injected + + stop_hooks.append( + { + "matcher": "", + "hooks": [{"type": "command", "command": sentinel_cmd}], + }, + ) + settings_path.write_text(json.dumps(settings, indent=2) + "\n") diff --git a/agent_cli/dev/cli.py b/agent_cli/dev/cli.py index 8d4163c6..5b7ac6e1 100644 --- a/agent_cli/dev/cli.py +++ b/agent_cli/dev/cli.py @@ -209,7 +209,7 @@ def _setup_worktree_env( warn("direnv not installed, skipping .envrc setup") -@app.command("new") +@app.command("new", rich_help_panel="Environments") def new( branch: Annotated[ str | None, @@ -452,7 +452,7 @@ def new( console.print(f'[dim]To enter the worktree:[/dim] cd "$(ag dev path {branch})"') -@app.command("list") +@app.command("list", rich_help_panel="Environments") def list_envs( json_output: Annotated[ bool, @@ -562,7 +562,7 @@ def _is_stale(status: worktree.WorktreeStatus, stale_days: int) -> bool: return days_since >= stale_days -@app.command("status") +@app.command("status", rich_help_panel="Environments") def status_cmd( # noqa: PLR0915 stale_days: Annotated[ int, @@ -673,7 +673,7 @@ def status_cmd( # noqa: PLR0915 console.print("\n" + " · ".join(summary_parts)) -@app.command("rm") +@app.command("rm", rich_help_panel="Environments") def remove( name: Annotated[ str, @@ -737,7 +737,7 @@ def remove( error(remove_err or "Failed to remove worktree") -@app.command("path") +@app.command("path", rich_help_panel="Environments") def path_cmd( name: Annotated[ str, @@ -759,7 +759,7 @@ def path_cmd( print(wt.path.as_posix()) -@app.command("editor") +@app.command("editor", rich_help_panel="Launch") def open_editor( name: Annotated[ str, @@ -807,7 +807,7 @@ def open_editor( error(f"Failed to open editor: {e}") -@app.command("agent") +@app.command("agent", rich_help_panel="Launch") def start_agent( name: Annotated[ str, @@ -848,16 +848,34 @@ def start_agent( readable=True, ), ] = None, + tab: Annotated[ + bool, + typer.Option( + "--tab", + help="Launch in a new tmux tab (tracked) instead of the current terminal", + ), + ] = False, + tracked_name: Annotated[ + str | None, + typer.Option( + "--name", + help="Explicit name for tracking (used with --tab). Auto-generated if omitted", + ), + ] = None, ) -> None: """Start an AI coding agent in an existing dev environment. - Launches the agent directly in your current terminal. + By default, launches the agent directly in your current terminal. + With ``--tab``, launches in a new tmux tab with orchestration tracking + (can then use ``dev poll``, ``dev output``, ``dev send``, ``dev wait``). **Examples:** - `dev agent my-feature` — Start agent in current terminal - `dev agent my-feature -a claude` — Start Claude specifically - `dev agent my-feature -p "Continue the auth refactor"` — Start with a task + - `dev agent my-feature --tab` — Start in new tracked tmux tab + - `dev agent my-feature --tab --name reviewer -p "Review the changes"` — Named tracked agent """ # Handle prompt-file option (takes precedence over --prompt) if prompt_file is not None: @@ -893,6 +911,24 @@ def start_agent( merged_args = merge_agent_args(agent, agent_args) agent_env = get_agent_env(agent) + if tab: + # Launch in a new tmux tab with tracking — require being *inside* a tmux + # session (TMUX env var), not just having a reachable tmux server, so the + # new window is visible in the user's current session. + if not os.environ.get("TMUX"): + error("--tab requires running inside a tmux session. Start tmux first.") + launch_agent( + wt.path, + agent, + merged_args, + prompt, + task_file, + agent_env, + track=True, + agent_name=tracked_name, + ) + return + info(f"Starting {agent.name} in {wt.path}...") try: os.chdir(wt.path) @@ -908,7 +944,7 @@ def start_agent( error(f"Failed to start agent: {e}") -@app.command("agents") +@app.command("agents", rich_help_panel="Info") def list_agents( json_output: Annotated[ bool, @@ -960,7 +996,7 @@ def list_agents( console.print(table) -@app.command("editors") +@app.command("editors", rich_help_panel="Info") def list_editors_cmd( json_output: Annotated[ bool, @@ -1012,7 +1048,7 @@ def list_editors_cmd( console.print(table) -@app.command("terminals") +@app.command("terminals", rich_help_panel="Info") def list_terminals_cmd( json_output: Annotated[ bool, @@ -1087,7 +1123,7 @@ def _doctor_check_git() -> None: console.print(" [yellow]○[/yellow] Not in a git repository") -@app.command("run") +@app.command("run", rich_help_panel="Launch") def run_cmd( name: Annotated[ str, @@ -1197,7 +1233,7 @@ def _clean_no_commits_worktrees( warn(f"Failed to remove {branch}: {remove_err}") -@app.command("clean") +@app.command("clean", rich_help_panel="Environments") def clean( merged: Annotated[ bool, @@ -1292,7 +1328,7 @@ def clean( _clean_no_commits_worktrees(repo_root, dry_run, yes, force=force) -@app.command("doctor") +@app.command("doctor", rich_help_panel="Info") def doctor( json_output: Annotated[ bool, @@ -1394,7 +1430,7 @@ def _get_current_repo_root() -> Path | None: return None -@app.command("install-skill") +@app.command("install-skill", rich_help_panel="Setup") def install_skill( force: Annotated[ bool, @@ -1454,3 +1490,7 @@ def install_skill( console.print("[dim]Skill files:[/dim]") for f in sorted(skill_dest.iterdir()): console.print(f" • {f.name}") + + +# Register orchestration commands (poll, output, send, wait) +from . import orchestration # noqa: E402, F401 diff --git a/agent_cli/dev/launch.py b/agent_cli/dev/launch.py index 8c7e67e3..909351a8 100644 --- a/agent_cli/dev/launch.py +++ b/agent_cli/dev/launch.py @@ -13,7 +13,7 @@ from agent_cli.core.utils import console from . import coding_agents, editors, terminals, worktree -from ._output import success, warn +from ._output import error, success, warn if TYPE_CHECKING: from .coding_agents.base import CodingAgent @@ -254,11 +254,20 @@ def launch_agent( prompt: str | None = None, task_file: Path | None = None, env: dict[str, str] | None = None, -) -> None: + *, + track: bool = False, + agent_name: str | None = None, +) -> str | None: """Launch agent in a new terminal tab. Agents are interactive TUIs that need a proper terminal. Priority: tmux/zellij tab > terminal tab > print instructions. + + When *track* is ``True`` and tmux is detected, the agent is registered + in the orchestration state file so it can be monitored with ``dev poll``, + ``dev output``, ``dev send``, and ``dev wait``. + + Returns the tracked agent name if tracking was successful, else ``None``. """ terminal = terminals.detect_current_terminal() @@ -280,10 +289,32 @@ def launch_agent( repo_name = repo_root.name if repo_root else path.name tab_name = f"{repo_name}@{branch}" if branch else repo_name - if terminal.open_new_tab(path, full_cmd, tab_name=tab_name): + # Use tmux_ops for tracked launch (tmux server must be reachable) + if track: + from . import agent_state, tmux_ops # noqa: PLC0415 + + root = repo_root or path + try: + name = agent_state.generate_agent_name(root, path, agent.name, agent_name) + except ValueError as exc: + error(str(exc)) + + # Remove stale completion sentinel before launching a new tracked Claude run. + if agent.name == "claude": + agent_state.sentinel_path(path, name).unlink(missing_ok=True) + + pane_id = tmux_ops.open_window_with_pane_id(path, full_cmd, tab_name=tab_name) + if pane_id: + agent_state.register_agent(root, name, pane_id, path, agent.name) + agent_state.inject_completion_hook(path, agent.name, name) + success(f"Started {agent.name} in new tmux tab (tracking as [cyan]{name}[/cyan])") + return name + warn("Could not open new tmux window") + elif terminal.open_new_tab(path, full_cmd, tab_name=tab_name): success(f"Started {agent.name} in new {terminal.name} tab") - return - warn(f"Could not open new tab in {terminal.name}") + return None + else: + warn(f"Could not open new tab in {terminal.name}") # No terminal detected or failed - print instructions if _is_ssh_session(): @@ -293,3 +324,4 @@ def launch_agent( console.print(f"\n[bold]To start {agent.name}:[/bold]") console.print(f" cd {path}") console.print(f" {full_cmd}") + return None diff --git a/agent_cli/dev/orchestration.py b/agent_cli/dev/orchestration.py new file mode 100644 index 00000000..2d402bb2 --- /dev/null +++ b/agent_cli/dev/orchestration.py @@ -0,0 +1,306 @@ +"""Orchestration CLI commands for tracked agents (tmux-only).""" + +from __future__ import annotations + +import json +import time +from pathlib import Path +from typing import TYPE_CHECKING, Annotated + +import typer + +from agent_cli.core.utils import console + +from . import worktree +from ._output import error, info, success, warn +from .cli import app + +if TYPE_CHECKING: + from . import agent_state + + +def _ensure_git_repo() -> Path: + """Ensure we're in a git repository and return the repo root.""" + if not worktree.git_available(): + error("Git is not installed or not in PATH") + + repo_root = worktree.get_main_repo_root() + if repo_root is None: + error("Not in a git repository") + + return repo_root + + +def _ensure_tmux() -> None: + """Exit with an error if not running inside tmux.""" + from . import tmux_ops # noqa: PLC0415 + + if not tmux_ops.is_tmux(): + error("Agent tracking requires tmux. Start a tmux session first.") + + +def _lookup_agent(name: str) -> tuple[Path, agent_state.TrackedAgent]: + """Look up a tracked agent by name. Exits on error.""" + from . import agent_state as _agent_state # noqa: PLC0415 + + repo_root = _ensure_git_repo() + state = _agent_state.load_state(repo_root) + agent = state.agents.get(name) + if agent is None: + error(f"Agent '{name}' not found. Run 'dev poll' to see tracked agents.") + return repo_root, agent + + +def _format_duration(seconds: float) -> str: + """Format seconds into a human-readable duration.""" + if seconds < 60: # noqa: PLR2004 + return f"{int(seconds)}s" + minutes = int(seconds // 60) + secs = int(seconds % 60) + if minutes < 60: # noqa: PLR2004 + return f"{minutes}m {secs}s" + hours = int(minutes // 60) + mins = minutes % 60 + return f"{hours}h {mins}m" + + +def _status_style(status: str) -> str: + """Return a Rich-styled status string.""" + styles = { + "running": "[bold green]running[/bold green]", + "done": "[bold cyan]done[/bold cyan]", + "quiet": "[bold cyan]quiet[/bold cyan]", + "dead": "[bold red]dead[/bold red]", + } + return styles.get(status, status) + + +@app.command("poll", rich_help_panel="Agent Orchestration") +def poll_cmd( + json_output: Annotated[ + bool, + typer.Option("--json", help="Output as JSON"), + ] = False, +) -> None: + """Check status of all tracked agents. + + Performs a single poll of all tracked agents (checks tmux panes and + completion sentinels) then displays results. + + **Status values:** + + - **running** — Agent pane exists and task is still in progress + - **done** — Agent wrote a completion sentinel (.claude/DONE) + - **quiet** — Agent output unchanged for several consecutive polls + - **dead** — tmux pane no longer exists + + **Examples:** + + - `dev poll` — Show status table + - `dev poll --json` — Machine-readable output + """ + from . import agent_state as _agent_state # noqa: PLC0415 + from .poller import poll_once # noqa: PLC0415 + + _ensure_tmux() + repo_root = _ensure_git_repo() + state = _agent_state.load_state(repo_root) + + if not state.agents: + info("No tracked agents. Launch one with 'dev new -a' or 'dev agent --tab'.") + return + + poll_once(repo_root) + + # Reload state after polling + state = _agent_state.load_state(repo_root) + now = time.time() + + if json_output: + data = { + "agents": [ + { + "name": a.name, + "status": a.status, + "agent_type": a.agent_type, + "worktree_path": a.worktree_path, + "pane_id": a.pane_id, + "started_at": a.started_at, + "duration_seconds": round(now - a.started_at), + } + for a in state.agents.values() + ], + "last_poll_at": state.last_poll_at, + } + print(json.dumps(data, indent=2)) + return + + from rich.table import Table # noqa: PLC0415 + + table = Table(title="Agent Status") + table.add_column("Name", style="cyan") + table.add_column("Status") + table.add_column("Agent", style="dim") + table.add_column("Worktree", style="dim") + table.add_column("Duration", style="dim") + + for a in state.agents.values(): + table.add_row( + a.name, + _status_style(a.status), + a.agent_type, + Path(a.worktree_path).name, + _format_duration(now - a.started_at), + ) + + console.print(table) + + # Summary line + total = len(state.agents) + by_status: dict[str, int] = {} + for a in state.agents.values(): + by_status[a.status] = by_status.get(a.status, 0) + 1 + parts = [f"{total} agent{'s' if total != 1 else ''}"] + parts.extend( + f"{count} {status}" + for status in ("running", "done", "quiet", "dead") + if (count := by_status.get(status, 0)) + ) + console.print(f"\n[dim]{' · '.join(parts)}[/dim]") + + +@app.command("output", rich_help_panel="Agent Orchestration") +def output_cmd( + name: Annotated[ + str, + typer.Argument(help="Agent name (from 'dev poll')"), + ], + lines: Annotated[ + int, + typer.Option("--lines", "-n", help="Number of lines to capture"), + ] = 50, +) -> None: + """Get recent terminal output from a tracked agent. + + Captures the last N lines from the agent's tmux pane. + + **Examples:** + + - `dev output my-feature` — Last 50 lines + - `dev output my-feature -n 200` — Last 200 lines + """ + from . import tmux_ops # noqa: PLC0415 + + _ensure_tmux() + _repo_root, agent = _lookup_agent(name) + + if agent.status == "dead": + error(f"Agent '{name}' is dead (tmux pane closed). No output available.") + + output = tmux_ops.capture_pane(agent.pane_id, lines) + if output is None: + error(f"Could not capture output from pane {agent.pane_id}") + print(output, end="") + + +@app.command("send", rich_help_panel="Agent Orchestration") +def send_cmd( + name: Annotated[ + str, + typer.Argument(help="Agent name (from 'dev poll')"), + ], + message: Annotated[ + str, + typer.Argument(help="Text to send to the agent's terminal"), + ], + no_enter: Annotated[ + bool, + typer.Option("--no-enter", help="Don't press Enter after sending"), + ] = False, +) -> None: + """Send text input to a running agent's terminal. + + Types the message into the agent's tmux pane using ``tmux send-keys``. + By default, presses Enter after the message. + + **Examples:** + + - `dev send my-feature "Fix the failing tests"` — Send a message + - `dev send my-feature "/exit" --no-enter` — Send without pressing Enter + """ + from . import tmux_ops # noqa: PLC0415 + + _ensure_tmux() + _repo_root, agent = _lookup_agent(name) + + if agent.status == "dead": + error(f"Agent '{name}' is dead (tmux pane closed). Cannot send messages.") + + if tmux_ops.send_keys(agent.pane_id, message, enter=not no_enter): + success(f"Sent message to {name}") + else: + error(f"Failed to send message to pane {agent.pane_id}") + + +@app.command("wait", rich_help_panel="Agent Orchestration") +def wait_cmd( + name: Annotated[ + str, + typer.Argument(help="Agent name (from 'dev poll')"), + ], + timeout: Annotated[ + float, + typer.Option("--timeout", "-t", help="Timeout in seconds (0 = no timeout)"), + ] = 0, + interval: Annotated[ + float, + typer.Option("--interval", "-i", help="Poll interval in seconds"), + ] = 5.0, +) -> None: + """Block until a tracked agent finishes. + + Polls the agent's tmux pane until it reaches done/dead, with output + quiescence as a fallback for agents without completion sentinels. + Useful for orchestration: launch an agent, wait for it, then act on results. + + **Exit codes:** + + - 0 — Agent finished (done or quiet fallback) + - 1 — Agent died (pane closed unexpectedly) + - 2 — Timeout reached + + **Examples:** + + - `dev wait my-feature` — Wait indefinitely + - `dev wait my-feature --timeout 300` — Wait up to 5 minutes + - `dev wait my-feature -i 2` — Poll every 2 seconds + """ + from .poller import wait_for_agent # noqa: PLC0415 + + _ensure_tmux() + repo_root, agent = _lookup_agent(name) + + if agent.status in ("done", "quiet", "dead"): + console.print(f"Agent '{name}' is already {_status_style(agent.status)}") + raise typer.Exit(0 if agent.status != "dead" else 1) + + info(f"Waiting for agent '{name}' to finish (polling every {interval}s)...") + + try: + status, elapsed = wait_for_agent(repo_root, name, timeout=timeout, interval=interval) + except TimeoutError: + warn(f"Timeout after {_format_duration(timeout)}") + raise typer.Exit(2) from None + + if status == "dead": + warn(f"Agent '{name}' died (pane closed) after {_format_duration(elapsed)}") + raise typer.Exit(1) + + if status == "quiet": + success( + f"Agent '{name}' appears quiet after {_format_duration(elapsed)} " + "(no output changes detected)", + ) + else: + success(f"Agent '{name}' is {status} after {_format_duration(elapsed)}") + raise typer.Exit(0) diff --git a/agent_cli/dev/poller.py b/agent_cli/dev/poller.py new file mode 100644 index 00000000..f848ed27 --- /dev/null +++ b/agent_cli/dev/poller.py @@ -0,0 +1,97 @@ +"""Polling logic for agent orchestration.""" + +from __future__ import annotations + +import time +from pathlib import Path + +from . import agent_state, tmux_ops + +# Seconds of unchanged output before marking an agent as "quiet". +QUIET_SECONDS = 10.0 + + +def _check_agent_status(agent: agent_state.TrackedAgent) -> None: + """Check and update a single agent's status in-place.""" + if not tmux_ops.pane_exists(agent.pane_id): + agent.status = "dead" + return + + if agent.agent_type == "claude": + done_path = agent_state.sentinel_path(Path(agent.worktree_path), agent.name) + if done_path.exists(): + agent.status = "done" + return + + agent.status = "running" + + +def _check_quiescence(agent: agent_state.TrackedAgent, now: float) -> None: + """Track output changes and mark agent as quiet if output is stable.""" + output = tmux_ops.capture_pane(agent.pane_id) + if output is None: + return + + output_hash = tmux_ops.hash_output(output) + if output_hash != agent.last_output_hash: + agent.last_output_hash = output_hash + agent.last_output_change_at = now + return + + # Hash unchanged — mark quiet if enough wall-clock time has passed + if agent.last_output_change_at > 0 and (now - agent.last_output_change_at) >= QUIET_SECONDS: + agent.status = "quiet" + + +def poll_once(repo_root: Path) -> dict[str, str]: + """Perform a single poll of all tracked agents. + + Checks pane existence, completion sentinels, and output quiescence. + Updates the state file and returns ``{agent_name: status}``. + """ + state = agent_state.load_state(repo_root) + now = time.time() + + for agent in state.agents.values(): + _check_agent_status(agent) + if agent.status == "running": + _check_quiescence(agent, now) + + state.last_poll_at = now + agent_state.save_state(repo_root, state) + return {a.name: a.status for a in state.agents.values()} + + +def wait_for_agent( + repo_root: Path, + name: str, + timeout: float = 0, + interval: float = 5.0, +) -> tuple[str, float]: + """Block until a tracked agent finishes. + + Returns ``(final_status, elapsed_seconds)`` where ``final_status`` is one + of ``done``, ``dead``, or ``quiet``. + Raises ``TimeoutError`` if *timeout* > 0 and is exceeded. + Raises ``KeyError`` if the agent is not found. + """ + state = agent_state.load_state(repo_root) + if name not in state.agents: + msg = f"Agent '{name}' not found" + raise KeyError(msg) + + start = time.time() + + while True: + elapsed = time.time() - start + if timeout > 0 and elapsed >= timeout: + msg = f"Timeout after {elapsed:.0f}s" + raise TimeoutError(msg) + + statuses = poll_once(repo_root) + status = statuses.get(name, "dead") + + if status in ("dead", "done", "quiet"): + return status, elapsed + + time.sleep(interval) diff --git a/agent_cli/dev/tmux_ops.py b/agent_cli/dev/tmux_ops.py new file mode 100644 index 00000000..96b0bd3e --- /dev/null +++ b/agent_cli/dev/tmux_ops.py @@ -0,0 +1,104 @@ +"""Low-level tmux operations for agent orchestration.""" + +from __future__ import annotations + +import hashlib +import os +import subprocess +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from pathlib import Path + + +def is_tmux() -> bool: + """Check if tmux is available (inside a session or server is reachable).""" + if os.environ.get("TMUX"): + return True + # Not inside tmux, but check if a tmux server is running + try: + subprocess.run( + ["tmux", "list-sessions"], # noqa: S607 + check=True, + capture_output=True, + ) + return True + except (subprocess.CalledProcessError, FileNotFoundError): + return False + + +def open_window_with_pane_id( + path: Path, + command: str | None = None, + tab_name: str | None = None, +) -> str | None: + """Create a new tmux window and return the pane ID. + + Uses ``tmux new-window -P -F '#{pane_id}'`` to get the stable pane ID + (e.g. ``%42``). Pane IDs are globally unique within a tmux server and + remain stable when panes are moved or reordered. + + Returns the pane ID string, or ``None`` on failure. + """ + cmd = ["tmux", "new-window", "-P", "-F", "#{pane_id}", "-c", str(path)] + if tab_name: + cmd.extend(["-n", tab_name]) + if command: + cmd.append(command) + try: + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + pane_id = result.stdout.strip() + return pane_id or None + except subprocess.CalledProcessError: + return None + + +def capture_pane(pane_id: str, lines: int = 200) -> str | None: + """Capture the last *lines* lines from a tmux pane. + + Returns the captured text, or ``None`` if the pane does not exist. + """ + try: + result = subprocess.run( + ["tmux", "capture-pane", "-p", "-t", pane_id, "-S", f"-{lines}"], # noqa: S607 + check=True, + capture_output=True, + text=True, + ) + return result.stdout + except subprocess.CalledProcessError: + return None + + +def send_keys(pane_id: str, text: str, *, enter: bool = True) -> bool: + """Send text to a tmux pane. + + Returns ``True`` on success. + """ + cmd = ["tmux", "send-keys", "-t", pane_id, text] + if enter: + cmd.append("Enter") + try: + subprocess.run(cmd, check=True, capture_output=True) + return True + except subprocess.CalledProcessError: + return False + + +def pane_exists(pane_id: str) -> bool: + """Check if a tmux pane still exists.""" + try: + result = subprocess.run( + ["tmux", "list-panes", "-a", "-F", "#{pane_id}"], # noqa: S607 + check=True, + capture_output=True, + text=True, + ) + return pane_id in result.stdout.splitlines() + except subprocess.CalledProcessError: + return False + + +def hash_output(text: str) -> str: + """Compute a SHA-256 hash of text for quiescence detection.""" + return hashlib.sha256(text.encode()).hexdigest() diff --git a/docs/commands/dev.md b/docs/commands/dev.md index f4fecc92..5ca2ae5e 100644 --- a/docs/commands/dev.md +++ b/docs/commands/dev.md @@ -285,6 +285,8 @@ agent-cli dev agent NAME [--agent/-a AGENT] [--agent-args ARGS] [--prompt/-p PRO | `--agent-args` | - | Extra CLI args for the agent. Example: --agent-args='--dangerously-skip-permissions' | | `--prompt, -p` | - | Initial task for the agent. Saved to .claude/TASK.md. Example: --prompt='Add unit tests for auth' | | `--prompt-file, -P` | - | Read the agent prompt from a file instead of command line | +| `--tab` | `false` | Launch in a new tmux tab (tracked) instead of the current terminal | +| `--name` | - | Explicit name for tracking (used with --tab). Auto-generated if omitted | @@ -299,6 +301,163 @@ agent-cli dev agent my-feature --prompt "Continue implementing the user settings agent-cli dev agent my-feature -a aider --prompt "Add unit tests for the auth module" ``` +## Agent Orchestration + +These commands require tmux and work with tracked agents (launched via `dev new -a` or `dev agent --tab`). + +### `dev poll` + +Check status of all tracked agents. + +```bash +agent-cli dev poll [OPTIONS] +``` + + + + + + + +### Options + +| Option | Default | Description | +|--------|---------|-------------| +| `--json` | `false` | Output as JSON | + + + + +**Status values:** + +| Status | Meaning | +|--------|---------| +| **running** | Agent pane exists and task is still in progress | +| **done** | Agent wrote a completion sentinel (`.claude/DONE`) | +| **quiet** | Agent output unchanged for several consecutive polls | +| **dead** | tmux pane no longer exists | + +**Examples:** + +```bash +# Show status table +agent-cli dev poll + +# Machine-readable output +agent-cli dev poll --json +``` + +### `dev output` + +Get recent terminal output from a tracked agent. + +```bash +agent-cli dev output NAME [OPTIONS] +``` + + + + + + + +### Options + +| Option | Default | Description | +|--------|---------|-------------| +| `--lines, -n` | `50` | Number of lines to capture | + + + + +**Examples:** + +```bash +# Last 50 lines from agent +agent-cli dev output my-feature + +# Last 200 lines +agent-cli dev output my-feature -n 200 +``` + +### `dev send` + +Send text input to a running agent's terminal. + +```bash +agent-cli dev send NAME MESSAGE [OPTIONS] +``` + + + + + + + +### Options + +| Option | Default | Description | +|--------|---------|-------------| +| `--no-enter` | `false` | Don't press Enter after sending | + + + + +**Examples:** + +```bash +# Send a message to an agent +agent-cli dev send my-feature "Fix the failing tests" + +# Send without pressing Enter +agent-cli dev send my-feature "/exit" --no-enter +``` + +### `dev wait` + +Block until a tracked agent finishes. + +```bash +agent-cli dev wait NAME [OPTIONS] +``` + + + + + + + +### Options + +| Option | Default | Description | +|--------|---------|-------------| +| `--timeout, -t` | `0` | Timeout in seconds (0 = no timeout) | +| `--interval, -i` | `5.0` | Poll interval in seconds | + + + + +**Exit codes:** + +| Code | Meaning | +|------|---------| +| 0 | Agent finished (done or quiet) | +| 1 | Agent died (pane closed unexpectedly) | +| 2 | Timeout reached | + +**Examples:** + +```bash +# Wait indefinitely +agent-cli dev wait my-feature + +# Wait up to 5 minutes +agent-cli dev wait my-feature --timeout 300 + +# Poll every 2 seconds +agent-cli dev wait my-feature -i 2 +``` + ### `dev run` Run a command in a dev environment. diff --git a/pyproject.toml b/pyproject.toml index 43668006..1532cece 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -199,6 +199,9 @@ ignore = [ "FBT001", # Boolean-typed positional argument in function definition "FBT002", # Boolean-typed keyword-only argument in function definition "BLE001", # Do not catch blind exception: `Exception` + "COM812", # Missing trailing comma (conflicts with formatter) + "D203", # Incorrect blank line before class (incompatible with D211) + "D213", # Multi-line summary second line (incompatible with D212) ] [tool.ruff.lint.per-file-ignores] diff --git a/tests/dev/test_orchestration.py b/tests/dev/test_orchestration.py new file mode 100644 index 00000000..eef08240 --- /dev/null +++ b/tests/dev/test_orchestration.py @@ -0,0 +1,838 @@ +"""Tests for agent orchestration (tmux_ops, agent_state, poll/output/send/wait).""" + +from __future__ import annotations + +import json +import os +import subprocess +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +import typer +from typer.testing import CliRunner + +from agent_cli.cli import app +from agent_cli.dev import agent_state, launch, poller, tmux_ops +from agent_cli.dev.agent_state import inject_completion_hook +from agent_cli.dev.terminals.tmux import Tmux + +runner = CliRunner(env={"NO_COLOR": "1", "TERM": "dumb"}) + +_TMUX_ENV = {"TMUX": "/tmp/tmux-1000/default,12345,0"} # noqa: S108 + + +# --------------------------------------------------------------------------- +# tmux_ops tests +# --------------------------------------------------------------------------- + + +class TestTmuxOps: + """Tests for low-level tmux operations.""" + + def test_open_window_with_pane_id(self) -> None: + """Returns pane ID from tmux new-window -P.""" + mock_result = MagicMock(stdout="%42\n") + with patch("subprocess.run", return_value=mock_result) as mock_run: + pane_id = tmux_ops.open_window_with_pane_id(Path("/tmp/test")) # noqa: S108 + assert pane_id == "%42" + cmd = mock_run.call_args[0][0] + assert "new-window" in cmd + assert "-P" in cmd + assert "#{pane_id}" in cmd + + def test_open_window_with_tab_name_and_command(self) -> None: + """Passes tab name and command to tmux.""" + mock_result = MagicMock(stdout="%5\n") + with patch("subprocess.run", return_value=mock_result) as mock_run: + pane_id = tmux_ops.open_window_with_pane_id( + Path("/tmp/test"), # noqa: S108 + command="echo hello", + tab_name="test-tab", + ) + assert pane_id == "%5" + cmd = mock_run.call_args[0][0] + assert "-n" in cmd + assert "test-tab" in cmd + assert "echo hello" in cmd + + def test_open_window_returns_none_on_failure(self) -> None: + """Returns None when tmux command fails.""" + with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "tmux")): + assert tmux_ops.open_window_with_pane_id(Path("/tmp")) is None # noqa: S108 + + def test_capture_pane(self) -> None: + """Captures pane output from tmux.""" + mock_result = MagicMock(stdout="line1\nline2\n") + with patch("subprocess.run", return_value=mock_result) as mock_run: + output = tmux_ops.capture_pane("%3", lines=100) + assert output == "line1\nline2\n" + cmd = mock_run.call_args[0][0] + assert "-t" in cmd + assert "%3" in cmd + assert "-100" in cmd + + def test_capture_pane_returns_none_on_failure(self) -> None: + """Returns None when pane doesn't exist.""" + with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "tmux")): + assert tmux_ops.capture_pane("%99") is None + + def test_send_keys(self) -> None: + """Sends keys to tmux pane.""" + with patch("subprocess.run") as mock_run: + assert tmux_ops.send_keys("%3", "hello") is True + cmd = mock_run.call_args[0][0] + assert "send-keys" in cmd + assert "%3" in cmd + assert "hello" in cmd + assert "Enter" in cmd + + def test_send_keys_no_enter(self) -> None: + """Sends keys without pressing Enter.""" + with patch("subprocess.run") as mock_run: + tmux_ops.send_keys("%3", "hello", enter=False) + cmd = mock_run.call_args[0][0] + assert "Enter" not in cmd + + def test_send_keys_returns_false_on_failure(self) -> None: + """Returns False when tmux command fails.""" + with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "tmux")): + assert tmux_ops.send_keys("%3", "hello") is False + + def test_pane_exists(self) -> None: + """Checks pane existence via list-panes.""" + mock_result = MagicMock(stdout="%1\n%3\n%5\n") + with patch("subprocess.run", return_value=mock_result): + assert tmux_ops.pane_exists("%3") is True + assert tmux_ops.pane_exists("%99") is False + + def test_pane_exists_returns_false_on_failure(self) -> None: + """Returns False when tmux is not available.""" + with patch("subprocess.run", side_effect=subprocess.CalledProcessError(1, "tmux")): + assert tmux_ops.pane_exists("%3") is False + + def test_hash_output(self) -> None: + """Produces consistent SHA-256 hashes.""" + h1 = tmux_ops.hash_output("hello") + h2 = tmux_ops.hash_output("hello") + h3 = tmux_ops.hash_output("world") + assert h1 == h2 + assert h1 != h3 + assert len(h1) == 64 # SHA-256 hex digest + + +# --------------------------------------------------------------------------- +# agent_state tests +# --------------------------------------------------------------------------- + + +class TestAgentState: + """Tests for agent state management.""" + + def test_repo_slug(self) -> None: + """Generates filesystem-safe slug from path.""" + slug = agent_state._repo_slug(Path("/home/user/Work/my-project")) + assert "my-project" in slug + assert "/" not in slug + + def test_repo_slug_avoids_cross_clone_collisions(self) -> None: + """Distinct roots with same tail produce different slugs.""" + slug1 = agent_state._repo_slug(Path("/Users/alice/Work/my-project")) + slug2 = agent_state._repo_slug(Path("/Volumes/external/Work/my-project")) + assert slug1 != slug2 + + def test_load_empty_state(self, tmp_path: Path) -> None: + """Returns empty state when no file exists.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + state = agent_state.load_state(tmp_path / "repo") + assert state.agents == {} + assert state.last_poll_at == 0.0 + + def test_load_state_ignores_unknown_agent_fields(self, tmp_path: Path) -> None: + """Loads agent rows even when old schema includes extra fields.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + state_path = agent_state._state_file_path(repo) + state_path.parent.mkdir(parents=True, exist_ok=True) + state_path.write_text( + json.dumps( + { + "agents": { + "agent": { + "name": "agent", + "pane_id": "%42", + "worktree_path": str(tmp_path / "wt"), + "agent_type": "claude", + "started_at": 123.0, + "status": "running", + # Older schema fields should be ignored. + "last_output_hash": "", + "last_change_at": 123.0, + }, + }, + "last_poll_at": 12.5, + }, + ), + ) + + state = agent_state.load_state(repo) + assert "agent" in state.agents + assert state.last_poll_at == 12.5 + + def test_save_and_load_state(self, tmp_path: Path) -> None: + """Round-trips state through JSON.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + agent = agent_state.register_agent( + repo, + "test-agent", + "%42", + tmp_path / "worktree", + "claude", + ) + assert agent.name == "test-agent" + assert agent.pane_id == "%42" + + state = agent_state.load_state(repo) + assert "test-agent" in state.agents + assert state.agents["test-agent"].pane_id == "%42" + assert state.agents["test-agent"].agent_type == "claude" + + def test_generate_agent_name_first(self, tmp_path: Path) -> None: + """First agent in worktree uses branch name.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + name = agent_state.generate_agent_name(repo, tmp_path / "auth", "claude") + assert name == "auth" + + def test_generate_agent_name_second(self, tmp_path: Path) -> None: + """Second agent appends agent type.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + agent_state.register_agent(repo, "auth", "%1", tmp_path / "auth", "claude") + name = agent_state.generate_agent_name(repo, tmp_path / "auth", "claude") + assert name == "auth-claude" + + def test_generate_agent_name_collision(self, tmp_path: Path) -> None: + """Numeric suffix on further collisions.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + agent_state.register_agent(repo, "auth", "%1", tmp_path / "auth", "claude") + agent_state.register_agent(repo, "auth-claude", "%2", tmp_path / "auth", "claude") + name = agent_state.generate_agent_name(repo, tmp_path / "auth", "claude") + assert name == "auth-claude-2" + + def test_generate_agent_name_explicit(self, tmp_path: Path) -> None: + """Explicit name is used directly.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + name = agent_state.generate_agent_name( + repo, + tmp_path / "auth", + "claude", + explicit_name="reviewer", + ) + assert name == "reviewer" + + def test_generate_agent_name_explicit_collision(self, tmp_path: Path) -> None: + """Explicit name raises ValueError on collision.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + agent_state.register_agent(repo, "reviewer", "%1", tmp_path / "auth", "claude") + with pytest.raises(ValueError, match="already exists"): + agent_state.generate_agent_name( + repo, + tmp_path / "auth", + "claude", + explicit_name="reviewer", + ) + + def test_generate_agent_name_allows_reuse_after_done(self, tmp_path: Path) -> None: + """Explicit names can be reused when prior run is terminal.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + agent_state.register_agent(repo, "reviewer", "%1", tmp_path / "auth", "claude") + + state = agent_state.load_state(repo) + state.agents["reviewer"].status = "done" + agent_state.save_state(repo, state) + + name = agent_state.generate_agent_name( + repo, + tmp_path / "auth", + "claude", + explicit_name="reviewer", + ) + assert name == "reviewer" + + def test_load_corrupt_state(self, tmp_path: Path) -> None: + """Returns empty state on corrupt JSON.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + path = agent_state._state_file_path(tmp_path / "repo") + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text("not valid json{{{") + state = agent_state.load_state(tmp_path / "repo") + assert state.agents == {} + + def test_load_state_agents_is_list(self, tmp_path: Path) -> None: + """Returns empty agents when 'agents' is a list instead of dict.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + path = agent_state._state_file_path(tmp_path / "repo") + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps({"agents": [], "last_poll_at": 0.0})) + state = agent_state.load_state(tmp_path / "repo") + assert state.agents == {} + + def test_load_state_skips_non_dict_agent_data(self, tmp_path: Path) -> None: + """Skips agent entries that are not dicts (e.g. from partially corrupt cache).""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + state_path = agent_state._state_file_path(repo) + state_path.parent.mkdir(parents=True, exist_ok=True) + state_path.write_text( + json.dumps( + { + "agents": { + "good": { + "name": "good", + "pane_id": "%1", + "worktree_path": "/tmp/wt", # noqa: S108 + "agent_type": "claude", + "started_at": 100.0, + "status": "running", + }, + "bad_string": "not a dict", + "bad_int": 42, + "bad_list": [1, 2, 3], + }, + }, + ), + ) + state = agent_state.load_state(repo) + assert len(state.agents) == 1 + assert "good" in state.agents + + def test_save_state_uses_pid_in_temp_file(self, tmp_path: Path) -> None: + """Temp file includes PID to avoid races between concurrent writers.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + state = agent_state.AgentStateFile() + agent_state.save_state(repo, state) + # Verify the final file exists and no stale .tmp files remain + path = agent_state._state_file_path(repo) + assert path.exists() + # Ensure PID-suffixed temp doesn't linger + pid_tmp = path.with_suffix(f".{os.getpid()}.tmp") + assert not pid_tmp.exists() + + +# --------------------------------------------------------------------------- +# launch/poller regression tests +# --------------------------------------------------------------------------- + + +class TestLaunchTracking: + """Tests for tracked launch edge cases.""" + + def test_tracked_launch_validates_name_before_opening_tmux_window(self, tmp_path: Path) -> None: + """Duplicate tracked name should fail without creating a tmux window.""" + agent = MagicMock() + agent.name = "claude" + agent.launch_command.return_value = ["claude"] + + with ( + patch("agent_cli.dev.launch.terminals.detect_current_terminal", return_value=Tmux()), + patch("agent_cli.dev.launch.worktree.get_main_repo_root", return_value=tmp_path), + patch("agent_cli.dev.launch.worktree.get_current_branch", return_value="feature"), + patch( + "agent_cli.dev.agent_state.generate_agent_name", + side_effect=ValueError("name exists"), + ), + patch("agent_cli.dev.tmux_ops.open_window_with_pane_id") as mock_open_window, + ): + with pytest.raises(typer.Exit) as exc: + launch.launch_agent(tmp_path, agent, track=True, agent_name="reviewer") + assert exc.value.exit_code == 1 + mock_open_window.assert_not_called() + + def test_tracked_claude_launch_clears_stale_done_file(self, tmp_path: Path) -> None: + """Tracked Claude launch removes stale completion sentinel before spawn.""" + done_path = tmp_path / ".claude" / "DONE-reviewer" + done_path.parent.mkdir(parents=True, exist_ok=True) + done_path.write_text("stale\n") + + agent = MagicMock() + agent.name = "claude" + agent.launch_command.return_value = ["claude"] + + with ( + patch("agent_cli.dev.launch.terminals.detect_current_terminal", return_value=Tmux()), + patch("agent_cli.dev.launch.worktree.get_main_repo_root", return_value=tmp_path), + patch("agent_cli.dev.launch.worktree.get_current_branch", return_value="feature"), + patch("agent_cli.dev.agent_state.generate_agent_name", return_value="reviewer"), + patch("agent_cli.dev.tmux_ops.open_window_with_pane_id", return_value="%7"), + patch("agent_cli.dev.agent_state.register_agent"), + patch("agent_cli.dev.agent_state.inject_completion_hook"), + ): + result = launch.launch_agent(tmp_path, agent, track=True) + assert result == "reviewer" + assert not done_path.exists() + + +class TestPollerRegression: + """Regression tests for completion detection logic.""" + + def test_poll_ignores_done_sentinel_for_non_claude_agents(self, tmp_path: Path) -> None: + """Non-Claude agents should not be marked done by stale Claude sentinels.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + wt = tmp_path / "worktree" + done_path = wt / ".claude" / "DONE" + done_path.parent.mkdir(parents=True, exist_ok=True) + done_path.write_text("stale\n") + + agent_state.register_agent(repo, "worker", "%3", wt, "codex") + + with ( + patch("agent_cli.dev.tmux_ops.pane_exists", return_value=True), + patch("agent_cli.dev.tmux_ops.capture_pane", return_value="output"), + patch("agent_cli.dev.tmux_ops.hash_output", return_value="h1"), + ): + statuses = poller.poll_once(repo) + assert statuses["worker"] == "running" + + def test_poll_detects_quiescence_for_non_claude_agents(self, tmp_path: Path) -> None: + """poll_once marks agent as quiet after enough time with unchanged output.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + wt = tmp_path / "worktree" + agent_state.register_agent(repo, "worker", "%3", wt, "codex") + + with ( + patch("agent_cli.dev.tmux_ops.pane_exists", return_value=True), + patch("agent_cli.dev.tmux_ops.capture_pane", return_value="output"), + patch("agent_cli.dev.tmux_ops.hash_output", return_value="h1"), + ): + t0 = time.time() + # First poll sets the hash and last_output_change_at + with patch("time.time", return_value=t0): + statuses = poller.poll_once(repo) + assert statuses["worker"] == "running" + + # Second poll 1s later — too soon, still running + with patch("time.time", return_value=t0 + 1): + statuses = poller.poll_once(repo) + assert statuses["worker"] == "running" + + # Third poll after QUIET_SECONDS — now quiet + with patch("time.time", return_value=t0 + poller.QUIET_SECONDS + 1): + statuses = poller.poll_once(repo) + assert statuses["worker"] == "quiet" + + def test_poll_resets_quiescence_on_output_change(self, tmp_path: Path) -> None: + """Output change resets the quiescence timer.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + wt = tmp_path / "worktree" + agent_state.register_agent(repo, "worker", "%3", wt, "codex") + + with ( + patch("agent_cli.dev.tmux_ops.pane_exists", return_value=True), + patch("agent_cli.dev.tmux_ops.capture_pane", return_value="output"), + ): + t0 = time.time() + + # First poll: hash h1 + with ( + patch("agent_cli.dev.tmux_ops.hash_output", return_value="h1"), + patch("time.time", return_value=t0), + ): + poller.poll_once(repo) + + # After QUIET_SECONDS, hash changes to h2 — timer resets + with ( + patch("agent_cli.dev.tmux_ops.hash_output", return_value="h2"), + patch("time.time", return_value=t0 + poller.QUIET_SECONDS + 1), + ): + statuses = poller.poll_once(repo) + assert statuses["worker"] == "running" + + # QUIET_SECONDS after the reset — now quiet + with ( + patch("agent_cli.dev.tmux_ops.hash_output", return_value="h2"), + patch("time.time", return_value=t0 + 2 * poller.QUIET_SECONDS + 2), + ): + statuses = poller.poll_once(repo) + assert statuses["worker"] == "quiet" + + def test_wait_ignores_done_sentinel_for_non_claude_agents(self, tmp_path: Path) -> None: + """wait_for_agent should use quiescence for non-Claude agents even if DONE exists.""" + with patch.object(agent_state, "STATE_BASE", tmp_path / ".cache"): + repo = tmp_path / "repo" + wt = tmp_path / "worktree" + done_path = wt / ".claude" / "DONE" + done_path.parent.mkdir(parents=True, exist_ok=True) + done_path.write_text("stale\n") + + agent_state.register_agent(repo, "worker", "%3", wt, "codex") + + call_count = 0 + t0 = 1000.0 + + def advancing_time() -> float: + """Each call advances time beyond QUIET_SECONDS.""" + nonlocal call_count + call_count += 1 + return t0 + call_count * (poller.QUIET_SECONDS + 1) + + with ( + patch("agent_cli.dev.tmux_ops.pane_exists", return_value=True), + patch("agent_cli.dev.tmux_ops.capture_pane", return_value="output"), + patch("agent_cli.dev.tmux_ops.hash_output", return_value="h1"), + patch("time.time", side_effect=advancing_time), + patch("time.sleep"), + ): + status, _elapsed = poller.wait_for_agent(repo, "worker", timeout=0, interval=0) + assert status == "quiet" + + +# --------------------------------------------------------------------------- +# CLI command tests +# --------------------------------------------------------------------------- + + +class TestPollCommand: + """Tests for dev poll command.""" + + def test_poll_no_agents(self) -> None: + """Shows message when no agents are tracked.""" + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch( + "agent_cli.dev.agent_state.load_state", + return_value=agent_state.AgentStateFile(), + ), + ): + result = runner.invoke(app, ["dev", "poll"]) + assert result.exit_code == 0 + assert "No tracked agents" in result.output + + def test_poll_json_output(self) -> None: + """Returns JSON with agent status.""" + state = agent_state.AgentStateFile() + state.agents["test"] = agent_state.TrackedAgent( + name="test", + pane_id="%3", + worktree_path="/tmp/wt", # noqa: S108 + agent_type="claude", + started_at=time.time() - 60, + ) + + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch("agent_cli.dev.agent_state.load_state", return_value=state), + patch("agent_cli.dev.agent_state.save_state"), + patch("agent_cli.dev.tmux_ops.pane_exists", return_value=True), + patch("agent_cli.dev.tmux_ops.capture_pane", return_value="output"), + patch("agent_cli.dev.tmux_ops.hash_output", return_value="abc123"), + ): + result = runner.invoke(app, ["dev", "poll", "--json"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert len(data["agents"]) == 1 + assert data["agents"][0]["name"] == "test" + + def test_poll_detects_dead_agent(self) -> None: + """Marks agent as dead when pane is gone.""" + state = agent_state.AgentStateFile() + state.agents["test"] = agent_state.TrackedAgent( + name="test", + pane_id="%3", + worktree_path="/tmp/wt", # noqa: S108 + agent_type="claude", + started_at=time.time(), + ) + + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch("agent_cli.dev.agent_state.load_state", return_value=state), + patch("agent_cli.dev.agent_state.save_state"), + patch("agent_cli.dev.tmux_ops.pane_exists", return_value=False), + ): + result = runner.invoke(app, ["dev", "poll", "--json"]) + assert result.exit_code == 0 + data = json.loads(result.output) + assert data["agents"][0]["status"] == "dead" + + +class TestOutputCommand: + """Tests for dev output command.""" + + def test_output_captures_pane(self) -> None: + """Captures and prints pane output.""" + state = agent_state.AgentStateFile() + state.agents["test"] = agent_state.TrackedAgent( + name="test", + pane_id="%3", + worktree_path="/tmp/wt", # noqa: S108 + agent_type="claude", + started_at=time.time(), + status="running", + ) + + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch("agent_cli.dev.agent_state.load_state", return_value=state), + patch("agent_cli.dev.tmux_ops.capture_pane", return_value="hello world\n"), + ): + result = runner.invoke(app, ["dev", "output", "test"]) + assert result.exit_code == 0 + assert "hello world" in result.output + + def test_output_agent_not_found(self) -> None: + """Errors when agent name doesn't exist.""" + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch( + "agent_cli.dev.agent_state.load_state", + return_value=agent_state.AgentStateFile(), + ), + ): + result = runner.invoke(app, ["dev", "output", "nonexistent"]) + assert result.exit_code == 1 + assert "not found" in result.output + + +class TestSendCommand: + """Tests for dev send command.""" + + def test_send_keys_to_agent(self) -> None: + """Sends keys to agent's tmux pane.""" + state = agent_state.AgentStateFile() + state.agents["test"] = agent_state.TrackedAgent( + name="test", + pane_id="%3", + worktree_path="/tmp/wt", # noqa: S108 + agent_type="claude", + started_at=time.time(), + status="running", + ) + + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch("agent_cli.dev.agent_state.load_state", return_value=state), + patch("agent_cli.dev.tmux_ops.send_keys", return_value=True) as mock_send, + ): + result = runner.invoke(app, ["dev", "send", "test", "fix the tests"]) + assert result.exit_code == 0 + mock_send.assert_called_once_with("%3", "fix the tests", enter=True) + + def test_send_to_dead_agent(self) -> None: + """Errors when sending to dead agent.""" + state = agent_state.AgentStateFile() + state.agents["test"] = agent_state.TrackedAgent( + name="test", + pane_id="%3", + worktree_path="/tmp/wt", # noqa: S108 + agent_type="claude", + started_at=time.time(), + status="dead", + ) + + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch("agent_cli.dev.agent_state.load_state", return_value=state), + ): + result = runner.invoke(app, ["dev", "send", "test", "hello"]) + assert result.exit_code == 1 + assert "dead" in result.output + + +class TestWaitCommand: + """Tests for dev wait command.""" + + def test_wait_already_done(self) -> None: + """Returns immediately if agent is already done.""" + state = agent_state.AgentStateFile() + state.agents["test"] = agent_state.TrackedAgent( + name="test", + pane_id="%3", + worktree_path="/tmp/wt", # noqa: S108 + agent_type="claude", + started_at=time.time(), + status="done", + ) + + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch("agent_cli.dev.agent_state.load_state", return_value=state), + ): + result = runner.invoke(app, ["dev", "wait", "test"]) + assert result.exit_code == 0 + assert "already" in result.output + + def test_wait_already_dead(self) -> None: + """Returns exit code 1 if agent is dead.""" + state = agent_state.AgentStateFile() + state.agents["test"] = agent_state.TrackedAgent( + name="test", + pane_id="%3", + worktree_path="/tmp/wt", # noqa: S108 + agent_type="claude", + started_at=time.time(), + status="dead", + ) + + with ( + patch.dict("os.environ", _TMUX_ENV), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + patch("agent_cli.dev.agent_state.load_state", return_value=state), + ): + result = runner.invoke(app, ["dev", "wait", "test"]) + assert result.exit_code == 1 + + +class TestNotInTmux: + """Tests for tmux requirement enforcement.""" + + def test_poll_requires_tmux(self) -> None: + """Poll command fails outside tmux.""" + with ( + patch.dict("os.environ", {}, clear=True), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + ): + result = runner.invoke(app, ["dev", "poll"]) + assert result.exit_code == 1 + assert "tmux" in result.output.lower() + + def test_send_requires_tmux(self) -> None: + """Send command fails outside tmux.""" + with ( + patch.dict("os.environ", {}, clear=True), + patch("agent_cli.dev.orchestration._ensure_git_repo", return_value=Path("/repo")), + ): + result = runner.invoke(app, ["dev", "send", "test", "hello"]) + assert result.exit_code == 1 + + +class TestInjectCompletionHook: + """Tests for Claude Code hook injection.""" + + def test_injects_stop_hook(self, tmp_path: Path) -> None: + """Creates .claude/settings.local.json with Stop hook unique to agent name.""" + inject_completion_hook(tmp_path, "claude", "my-agent") + + settings_path = tmp_path / ".claude" / "settings.local.json" + assert settings_path.exists() + settings = json.loads(settings_path.read_text()) + assert "hooks" in settings + assert "Stop" in settings["hooks"] + hooks = settings["hooks"]["Stop"] + assert any( + any( + isinstance(hook, dict) and hook.get("command") == "touch .claude/DONE-my-agent" + for hook in h.get("hooks", []) + ) + for h in hooks + ) + + def test_merges_with_existing_settings(self, tmp_path: Path) -> None: + """Preserves existing settings when injecting hook.""" + settings_path = tmp_path / ".claude" / "settings.local.json" + settings_path.parent.mkdir(parents=True) + settings_path.write_text(json.dumps({"model": "opus", "hooks": {"PreToolUse": []}})) + + inject_completion_hook(tmp_path, "claude", "reviewer") + + settings = json.loads(settings_path.read_text()) + assert settings["model"] == "opus" + assert "PreToolUse" in settings["hooks"] + assert "Stop" in settings["hooks"] + + def test_skips_non_claude_agents(self, tmp_path: Path) -> None: + """Does nothing for non-Claude agents.""" + inject_completion_hook(tmp_path, "aider", "worker") + assert not (tmp_path / ".claude" / "settings.local.json").exists() + + def test_idempotent(self, tmp_path: Path) -> None: + """Doesn't duplicate hook on repeated calls.""" + inject_completion_hook(tmp_path, "claude", "reviewer") + inject_completion_hook(tmp_path, "claude", "reviewer") + + settings = json.loads((tmp_path / ".claude" / "settings.local.json").read_text()) + stop_hooks = settings["hooks"]["Stop"] + sentinel_count = sum( + 1 + for h in stop_hooks + if any( + isinstance(hook, dict) and hook.get("command") == "touch .claude/DONE-reviewer" + for hook in h.get("hooks", []) + ) + ) + assert sentinel_count == 1 + + def test_hooks_value_is_list_not_dict(self, tmp_path: Path) -> None: + """Handles corrupt settings where 'hooks' is a list instead of dict.""" + settings_path = tmp_path / ".claude" / "settings.local.json" + settings_path.parent.mkdir(parents=True) + settings_path.write_text(json.dumps({"hooks": []})) + + inject_completion_hook(tmp_path, "claude", "worker") + + settings = json.loads(settings_path.read_text()) + assert isinstance(settings["hooks"], dict) + assert "Stop" in settings["hooks"] + + def test_stop_value_is_not_list(self, tmp_path: Path) -> None: + """Handles corrupt settings where 'Stop' is not a list.""" + settings_path = tmp_path / ".claude" / "settings.local.json" + settings_path.parent.mkdir(parents=True) + settings_path.write_text(json.dumps({"hooks": {"Stop": "invalid"}})) + + inject_completion_hook(tmp_path, "claude", "worker") + + settings = json.loads(settings_path.read_text()) + assert isinstance(settings["hooks"]["Stop"], list) + assert len(settings["hooks"]["Stop"]) == 1 + + def test_different_agents_get_different_sentinels(self, tmp_path: Path) -> None: + """Two agents in the same worktree get distinct sentinel commands.""" + inject_completion_hook(tmp_path, "claude", "agent-a") + inject_completion_hook(tmp_path, "claude", "agent-b") + + settings = json.loads((tmp_path / ".claude" / "settings.local.json").read_text()) + stop_hooks = settings["hooks"]["Stop"] + commands = [ + hook.get("command", "") + for h in stop_hooks + for hook in h.get("hooks", []) + if isinstance(hook, dict) + ] + assert "touch .claude/DONE-agent-a" in commands + assert "touch .claude/DONE-agent-b" in commands + + def test_stop_hooks_with_non_dict_entries(self, tmp_path: Path) -> None: + """Non-dict entries in Stop list don't crash hook injection.""" + settings_path = tmp_path / ".claude" / "settings.local.json" + settings_path.parent.mkdir(parents=True) + settings_path.write_text( + json.dumps({"hooks": {"Stop": ["stale-string", 42, None]}}), + ) + + inject_completion_hook(tmp_path, "claude", "worker") + + settings = json.loads(settings_path.read_text()) + stop_hooks = settings["hooks"]["Stop"] + # Non-dict entries preserved, our hook appended + assert len(stop_hooks) == 4 + last_hook = stop_hooks[-1] + assert last_hook["hooks"][0]["command"] == "touch .claude/DONE-worker" diff --git a/tests/test_config.py b/tests/test_config.py index 284cdd9d..60043244 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -181,6 +181,32 @@ def test_config_preserves_scalar_options_when_section_has_nested_subsections( assert config["dev.agent_args"]["claude"] == ["--dangerously-skip-permissions"] +def test_config_deeply_nested_sections_with_scalars(tmp_path: Path) -> None: + """Deeply nested sections (agent_env.claude) are flattened with dot notation.""" + config_content = """ +[dev] +branch_name_mode = "ai" + +[dev.agent_args] +claude = ["--dangerously-skip-permissions"] +codex = ["--dangerously-bypass-approvals-and-sandbox"] + +[dev.agent_env.claude] +CLAUDE_CODE_USE_VERTEX = "1" +ANTHROPIC_MODEL = "claude-opus-4-6" +""" + config_path = tmp_path / "config.toml" + config_path.write_text(config_content) + + config = load_config(str(config_path)) + + assert config["dev"]["branch_name_mode"] == "ai" + assert config["dev.agent_args"]["claude"] == ["--dangerously-skip-permissions"] + assert config["dev.agent_args"]["codex"] == ["--dangerously-bypass-approvals-and-sandbox"] + assert config["dev.agent_env.claude"]["CLAUDE_CODE_USE_VERTEX"] == "1" + assert config["dev.agent_env.claude"]["ANTHROPIC_MODEL"] == "claude-opus-4-6" + + def test_provider_alias_normalization(config_file: Path) -> None: """Ensure deprecated provider names are normalized.""" config = load_config(str(config_file))