Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a8fa9ea
feat(dev): add sub-agent orchestration for tmux
basnijholt Feb 10, 2026
c852b0c
refactor(dev): extract orchestration logic from cli.py
basnijholt Feb 10, 2026
1088804
Merge c852b0c18fb20281be1f3f070a4f14a4aca557cf into 0d7b62cd9553f5455…
basnijholt Feb 10, 2026
639d165
Update auto-generated docs
github-actions[bot] Feb 10, 2026
836c907
refactor(dev): extract cleanup and launch logic from cli.py
basnijholt Feb 10, 2026
9a49f83
refactor(dev): extract output helpers and orchestration commands from…
basnijholt Feb 10, 2026
608e369
refactor(dev): remove import aliases and privatize internal helpers
basnijholt Feb 10, 2026
f20faa0
refactor(dev): make output helpers public and fix variable shadowing
basnijholt Feb 10, 2026
3a41dcf
fix: resolve merge conflicts from origin/main
basnijholt Feb 10, 2026
8f8179e
fix(dev): address PR review issues in orchestration code
basnijholt Feb 10, 2026
b0f24c5
refactor(dev): remove --follow flag from dev output
basnijholt Feb 10, 2026
372e8b8
feat(dev): group dev subcommands with rich_help_panel
basnijholt Feb 10, 2026
9c389fe
fix(dev): harden orchestration tracking edge cases
basnijholt Feb 10, 2026
c2ff134
refactor(dev): simplify orchestration state model
basnijholt Feb 10, 2026
9fa6b86
fix(dev): address orchestration bugs found during live testing
basnijholt Feb 10, 2026
2668f78
fix(dev): use settings.local.json for completion hook injection
basnijholt Feb 10, 2026
3696ca0
fix(dev): harden state files against corrupt schemas and concurrent w…
basnijholt Feb 10, 2026
37e1d87
feat(dev): add quiescence detection to poll command
basnijholt Feb 10, 2026
043e854
fix(dev): use wall-clock time for quiescence detection
basnijholt Feb 10, 2026
f0dd57d
fix(dev): use per-agent sentinel files and track via tmux from any te…
basnijholt Feb 10, 2026
93012a0
fix(dev): PR review cleanup — move is_tmux, inline _state_dir, preser…
basnijholt Feb 10, 2026
14602f4
fix(dev): harden against corrupt state files and fix --tab tmux guard
basnijholt Feb 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 237 additions & 0 deletions agent_cli/dev/agent_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
"""Agent state tracking for orchestration."""

from __future__ import annotations

import json
import os
import re
import time
from dataclasses import asdict, dataclass, field
from hashlib import sha256
from pathlib import Path
from typing import Literal

STATE_BASE = Path.home() / ".cache" / "agent-cli"

AgentStatus = Literal["running", "done", "dead", "quiet"]


@dataclass
class TrackedAgent:
"""A single tracked agent instance."""

name: str
pane_id: str
worktree_path: str
agent_type: str
started_at: float
status: AgentStatus = "running"
last_output_hash: str = ""
last_output_change_at: float = 0.0


@dataclass
class AgentStateFile:
"""State file for one repository's tracked agents."""

agents: dict[str, TrackedAgent] = field(default_factory=dict)
last_poll_at: float = 0.0


def _repo_slug(repo_root: Path) -> str:
"""Convert a repo root path to a filesystem-safe slug.

Includes a short path hash to avoid collisions between repositories with
the same trailing directory names.
"""
parts = repo_root.parts[-2:]
slug = "_".join(parts)
slug = re.sub(r"[^a-zA-Z0-9_-]", "_", slug)
digest = sha256(str(repo_root.expanduser().resolve()).encode()).hexdigest()[:10]
return f"{slug}_{digest}"


def _state_file_path(repo_root: Path) -> Path:
"""Return the path to the agents.json state file."""
return STATE_BASE / _repo_slug(repo_root) / "agents.json"


def load_state(repo_root: Path) -> AgentStateFile:
"""Load agent state from disk.

Returns an empty state if the file does not exist or is corrupt.
"""
path = _state_file_path(repo_root)
if not path.exists():
return AgentStateFile()

try:
data = json.loads(path.read_text())
except (OSError, json.JSONDecodeError, TypeError):
return AgentStateFile()

agents: dict[str, TrackedAgent] = {}
raw_agents = data.get("agents", {})
if not isinstance(raw_agents, dict):
raw_agents = {}
for name, agent_data in raw_agents.items():
if not isinstance(agent_data, dict):
continue
status = agent_data.get("status", "running")
if status not in ("running", "done", "dead", "quiet"):
status = "running"
try:
agents[name] = TrackedAgent(
name=str(agent_data["name"]),
pane_id=str(agent_data["pane_id"]),
worktree_path=str(agent_data["worktree_path"]),
agent_type=str(agent_data["agent_type"]),
started_at=float(agent_data["started_at"]),
status=status,
last_output_hash=str(agent_data.get("last_output_hash", "")),
last_output_change_at=float(agent_data.get("last_output_change_at", 0.0)),
)
except (KeyError, TypeError, ValueError):
continue

raw_last_poll_at = data.get("last_poll_at", 0.0)
try:
last_poll_at = float(raw_last_poll_at)
except (TypeError, ValueError):
last_poll_at = 0.0
return AgentStateFile(agents=agents, last_poll_at=last_poll_at)


def save_state(repo_root: Path, state: AgentStateFile) -> None:
"""Atomically write state to disk."""
path = _state_file_path(repo_root)
path.parent.mkdir(parents=True, exist_ok=True)
data = {
"agents": {name: asdict(agent) for name, agent in state.agents.items()},
"last_poll_at": state.last_poll_at,
}
# Write to temp file then rename for atomicity.
# Use PID in suffix to avoid races between concurrent writers.
tmp = path.with_suffix(f".{os.getpid()}.tmp")
tmp.write_text(json.dumps(data, indent=2) + "\n")
tmp.rename(path)


def register_agent(
repo_root: Path,
name: str,
pane_id: str,
worktree_path: Path,
agent_type: str,
) -> TrackedAgent:
"""Register a new tracked agent in the state file."""
state = load_state(repo_root)
now = time.time()
agent = TrackedAgent(
name=name,
pane_id=pane_id,
worktree_path=str(worktree_path),
agent_type=agent_type,
started_at=now,
)
state.agents[name] = agent
save_state(repo_root, state)
return agent


def generate_agent_name(
repo_root: Path,
worktree_path: Path,
agent_type: str,
explicit_name: str | None = None,
) -> str:
"""Generate a unique agent name.

If *explicit_name* is given, uses that (raises if it collides with an active
agent).
Otherwise auto-generates from the worktree branch name.
"""
state = load_state(repo_root)
existing = {
name for name, existing_agent in state.agents.items() if existing_agent.status == "running"
}

if explicit_name:
if explicit_name in existing:
msg = f"Agent name '{explicit_name}' already exists. Use a different --name."
raise ValueError(msg)
return explicit_name

# Use worktree directory name as base (which is the branch name)
base = worktree_path.name

# First agent in this worktree: just use the branch name
if base not in existing:
return base

# Subsequent agents: append agent type
candidate = f"{base}-{agent_type}"
if candidate not in existing:
return candidate

# Still collides: add numeric suffix
n = 2
while f"{candidate}-{n}" in existing:
n += 1
return f"{candidate}-{n}"


def sentinel_path(worktree_path: Path, agent_name: str) -> Path:
"""Return the completion sentinel path for a tracked agent."""
return worktree_path / ".claude" / f"DONE-{agent_name}"


def inject_completion_hook(worktree_path: Path, agent_type: str, agent_name: str) -> None:
"""Inject a Stop hook into .claude/settings.local.json for completion detection.

Uses settings.local.json (not settings.json) to avoid dirtying tracked files.
The sentinel file is unique per tracked agent name to avoid collisions when
multiple Claude sessions share the same worktree.
Only applies to Claude Code agents. Merges with existing settings.
"""
if agent_type != "claude":
return

settings_path = worktree_path / ".claude" / "settings.local.json"
settings_path.parent.mkdir(parents=True, exist_ok=True)

settings: dict = {}
if settings_path.exists():
try:
raw = json.loads(settings_path.read_text())
settings = raw if isinstance(raw, dict) else {}
except json.JSONDecodeError:
settings = {}

# Merge Stop hook — guard against non-dict values from corrupt files
hooks = settings.get("hooks")
if not isinstance(hooks, dict):
hooks = {}
settings["hooks"] = hooks
stop_hooks = hooks.get("Stop")
if not isinstance(stop_hooks, list):
stop_hooks = []
hooks["Stop"] = stop_hooks

# Check if our hook is already present
sentinel_cmd = f"touch .claude/DONE-{agent_name}"
for entry in stop_hooks:
if not isinstance(entry, dict):
continue
for hook in entry.get("hooks", []):
cmd = hook.get("command", "") if isinstance(hook, dict) else hook
if isinstance(cmd, str) and sentinel_cmd in cmd:
return # Already injected

stop_hooks.append(
{
"matcher": "",
"hooks": [{"type": "command", "command": sentinel_cmd}],
},
)
settings_path.write_text(json.dumps(settings, indent=2) + "\n")
Loading
Loading