Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 87 additions & 7 deletions control_plane/every_code_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import signal
import subprocess
import time
from typing import Callable, Literal, Protocol, Sequence
from typing import Callable, Literal, Mapping, Protocol, Sequence
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
Expand Down Expand Up @@ -1258,22 +1258,94 @@ def close_terminal_every_code_sessions(
session_name=session_name,
runner=run,
)
worktree_processes_closed = _terminate_every_code_worktree_processes(
session_state=session_state,
runner=run,
) > 0
if existing_session is False:
path.unlink(missing_ok=True)
closed += 1 if worktree_processes_closed else 0
continue
if existing_session is None:
closed += 1 if worktree_processes_closed else 0
continue
pane_pid = _tmux_pane_pid(
if _terminate_every_code_tmux_session(
tmux_binary=tmux_binary,
session_name=session_name,
runner=run,
)
if pane_pid is None:
):
path.unlink(missing_ok=True)
closed += 1
continue
if worktree_processes_closed:
closed += 1
return closed


def _terminate_every_code_tmux_session(
    *,
    tmux_binary: str,
    session_name: str,
    runner: Runner,
) -> bool:
    """Signal a tmux session's pane process group, then kill the session.

    The pane PID is looked up through ``_tmux_pane_pid``; when one is found,
    ``SIGTERM`` is sent to the process group with that id.  Afterwards
    ``tmux kill-session -t <session_name>`` is always attempted via *runner*.

    Returns:
        ``True`` when the process group was signalled (or was already gone),
        or when ``kill-session`` exited with status 0 or 1.  ``False`` when
        neither happened, including when *runner* raised ``OSError`` and the
        process group had not been signalled.
    """
    pane_pid = _tmux_pane_pid(
        tmux_binary=tmux_binary,
        session_name=session_name,
        runner=runner,
    )

    signalled = False
    if pane_pid is not None:
        try:
            os.killpg(pane_pid, signal.SIGTERM)
        except ProcessLookupError:
            # Nothing left to signal: the group no longer exists, which is
            # treated the same as a successful termination.
            signalled = True
        except OSError:
            # Best effort only; fall through to kill-session below.
            signalled = False
        else:
            signalled = True

    kill_command = (tmux_binary, "kill-session", "-t", session_name)
    try:
        completed = runner(kill_command)
    except OSError:
        return signalled

    if signalled:
        return True
    # NOTE(review): status 1 is accepted alongside 0 - presumably tmux's
    # "session not found" result; confirm against the tmux version in use.
    return completed.returncode in {0, 1}


def _terminate_every_code_worktree_processes(
*,
session_state: Mapping[str, object],
runner: Runner,
) -> int:
launch_root_value = session_state.get("launch_root")
if not isinstance(launch_root_value, str) or not launch_root_value.strip():
return 0
launch_root = Path(launch_root_value).expanduser()
if not launch_root.is_dir():
return 0
try:
result = runner(("lsof", "-t", "+D", str(launch_root)))
except OSError:
return 0
if result.returncode not in {0, 1}:
return 0
process_groups: set[int] = set()
for line in result.stdout.splitlines():
try:
pid = int(line.strip())
except ValueError:
continue
if pid <= 0 or pid == os.getpid():
continue
try:
process_groups.add(os.getpgid(pid))
except ProcessLookupError:
continue
except OSError:
process_groups.add(pid)

closed = 0
for process_group in sorted(process_groups):
try:
os.killpg(process_group, signal.SIGTERM)
except ProcessLookupError:
path.unlink(missing_ok=True)
continue
except OSError:
continue
Expand Down Expand Up @@ -1688,19 +1760,27 @@ def finish_every_code_work_request(
result_pr_url=terminal_pr_url,
)
succeeded = exit_code == 0
del runner
run = runner or _run_subprocess
resolved_pr_url = result_pr_url.strip()
if succeeded and not resolved_pr_url:
resolved_pr_url = _every_code_preview_pr_url_for_record(record, runner=run)
summary = result_summary.strip() or (
"Every Code session completed successfully."
if succeeded
else f"Every Code session exited with status {exit_code}."
)
if succeeded and not resolved_pr_url:
succeeded = False
summary = (
"Every Code session exited successfully but did not open a pull request."
)
updated_record = apply_every_code_work_request_status(
record,
EveryCodeWorkRequestStatusUpdate(
state="done" if succeeded else "blocked",
host=host.strip(),
updated_at=utc_now_timestamp(),
result_pr_url=result_pr_url.strip(),
result_pr_url=resolved_pr_url,
result_summary=summary,
error_message="" if succeeded else summary,
),
Expand Down
185 changes: 184 additions & 1 deletion tests/test_every_code_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,16 @@ def __call__(self, args: Sequence[str]) -> subprocess.CompletedProcess[str]:
return super().__call__(args)


class _GoneSessionWithWorktreeProcessRunner(_GoneSessionRunner):
    """Fake runner: tmux session is missing, but lsof still lists two PIDs."""

    def __call__(self, args: Sequence[str]) -> subprocess.CompletedProcess[str]:
        # Record every invocation so tests can inspect the command sequence.
        self.calls.append(tuple(args))

        program = args[0]
        if program == "lsof":
            # Two PIDs, one per line, as ``lsof -t`` would print them.
            return subprocess.CompletedProcess(
                args, returncode=0, stdout="9001\n9002\n", stderr=""
            )

        subcommand = args[1]
        if subcommand == "has-session":
            return subprocess.CompletedProcess(
                args, returncode=1, stdout="", stderr="no session"
            )

        # Anything else is delegated to the parent fake.
        return super().__call__(args)


class _Process:
def __init__(self, pid: int) -> None:
self.pid = pid
Expand Down Expand Up @@ -1672,7 +1682,116 @@ def test_terminal_host_request_sends_sigterm_to_session_process_group(self) -> N
self.assertEqual(closed, 1)
killpg.assert_called_once_with(4242, signal.SIGTERM)
self.assertEqual(runner.calls[0][1], "has-session")
self.assertEqual(runner.calls[1][1], "display-message")
self.assertEqual(runner.calls[1][0], "lsof")
self.assertEqual(runner.calls[2][1], "display-message")
self.assertEqual(runner.calls[3][1], "kill-session")

def test_terminal_request_kills_worktree_processes_when_tmux_is_gone(self) -> None:
    # A "done" request whose tmux session no longer exists must still have the
    # process group of the PIDs reported by lsof signalled, and its session
    # state file removed.
    request_id = "every-code-cbusillo-code-123-test"
    with TemporaryDirectory() as temporary_directory_name:
        sandbox = Path(temporary_directory_name)
        state_dir = sandbox / "state"
        workspace_root = sandbox / "Developer"
        repository = workspace_root / "code"
        repository.mkdir(parents=True)
        (repository / ".git").mkdir()

        store = FilesystemRecordStore(state_dir=state_dir)
        store.write_every_code_work_request_record(_queued_record())
        run_every_code_worker_once(
            record_store=store,
            host="Chris-Studio",
            workspace_root=workspace_root,
            state_dir=state_dir,
            runner=_Runner(),
        )

        # Flip the launched record into a terminal state.
        launched = store.read_every_code_work_request_record(request_id)
        terminal_fields = {
            "state": "done",
            "finished_at": "2026-05-06T00:02:00Z",
            "updated_at": "2026-05-06T00:02:00Z",
            "result_summary": "Linked pull request merged.",
        }
        store.write_every_code_work_request_record(
            launched.model_copy(update=terminal_fields)
        )
        state_path = every_code_session_state_path(
            state_dir=state_dir,
            request_id=request_id,
        )
        runner = _GoneSessionWithWorktreeProcessRunner()

        def fake_getpgid(pid: int) -> int:
            # Both lsof PIDs share process group 7000.
            return 7000 if pid in {9001, 9002} else pid

        getpgid_patch = patch(
            "control_plane.every_code_worker.os.getpgid",
            side_effect=fake_getpgid,
        )
        killpg_patch = patch("control_plane.every_code_worker.os.killpg")
        with getpgid_patch:
            with killpg_patch as killpg:
                closed = close_terminal_every_code_sessions(
                    record_store=store,
                    host="Chris-Studio",
                    state_dir=state_dir,
                    runner=runner,
                )

        self.assertEqual(closed, 1)
        killpg.assert_called_once_with(7000, signal.SIGTERM)
        self.assertFalse(state_path.exists())
        first_lsof = next(call for call in runner.calls if call[0] == "lsof")
        self.assertEqual(first_lsof[1:3], ("-t", "+D"))

def test_terminal_request_unlinks_state_when_pane_pid_is_missing(self) -> None:
    # When the session exists but tmux cannot report a pane PID, the session
    # must still be killed via kill-session and its state file removed.
    request_id = "every-code-cbusillo-code-123-test"

    class _MissingPanePidRunner(_ExistingSessionRunner):
        def __call__(self, args: Sequence[str]) -> subprocess.CompletedProcess[str]:
            self.calls.append(tuple(args))
            if args[0] == "lsof":
                # Status 1 with no output: nothing found under the worktree.
                return subprocess.CompletedProcess(args, 1, "", "")
            if args[1] == "display-message":
                return subprocess.CompletedProcess(args, 1, "", "no pane")
            # has-session, kill-session and any other command succeed.
            return subprocess.CompletedProcess(args, 0, "", "")

    with TemporaryDirectory() as temporary_directory_name:
        sandbox = Path(temporary_directory_name)
        state_dir = sandbox / "state"
        workspace_root = sandbox / "Developer"
        repository = workspace_root / "code"
        repository.mkdir(parents=True)
        (repository / ".git").mkdir()

        store = FilesystemRecordStore(state_dir=state_dir)
        store.write_every_code_work_request_record(_queued_record())
        run_every_code_worker_once(
            record_store=store,
            host="Chris-Studio",
            workspace_root=workspace_root,
            state_dir=state_dir,
            runner=_Runner(),
        )

        # Flip the launched record into the terminal "blocked" state.
        launched = store.read_every_code_work_request_record(request_id)
        terminal_fields = {
            "state": "blocked",
            "finished_at": "2026-05-06T00:02:00Z",
            "updated_at": "2026-05-06T00:02:00Z",
            "error_message": "Session ended without a PR.",
        }
        store.write_every_code_work_request_record(
            launched.model_copy(update=terminal_fields)
        )
        state_path = every_code_session_state_path(
            state_dir=state_dir,
            request_id=request_id,
        )

        runner = _MissingPanePidRunner()
        closed = close_terminal_every_code_sessions(
            record_store=store,
            host="Chris-Studio",
            state_dir=state_dir,
            runner=runner,
        )

        self.assertEqual(closed, 1)
        self.assertFalse(state_path.exists())
        self.assertTrue(any(call[1] == "kill-session" for call in runner.calls))

def test_terminal_request_claimed_by_other_host_is_ignored(self) -> None:
with TemporaryDirectory() as temporary_directory_name:
Expand Down Expand Up @@ -1770,6 +1889,68 @@ def test_finish_marks_running_request_done(self) -> None:
self.assertEqual(record.result_pr_url, "https://github.com/cbusillo/code/pull/99")
self.assertEqual(record.error_message, "")

def test_finish_discovers_open_pr_for_successful_exit_without_result_url(self) -> None:
    # Finishing with exit code 0 and no explicit PR URL should pick up the
    # URL returned by the runner's PR listing and mark the request done.
    request_id = "every-code-cbusillo-code-123-test"
    expected_pr_url = "https://github.com/cbusillo/code/pull/99"
    with TemporaryDirectory() as temporary_directory_name:
        sandbox = Path(temporary_directory_name)
        state_dir = sandbox / "state"
        workspace_root = sandbox / "Developer"
        repository = workspace_root / "code"
        repository.mkdir(parents=True)
        (repository / ".git").mkdir()

        store = FilesystemRecordStore(state_dir=state_dir)
        store.write_every_code_work_request_record(_queued_record())
        run_every_code_worker_once(
            record_store=store,
            host="Chris-Studio",
            workspace_root=workspace_root,
            state_dir=state_dir,
            runner=_Runner(),
        )

        pr_listing_runner = _Runner(
            pr_list_payload=[{"url": "https://github.com/cbusillo/code/pull/99"}]
        )
        result = finish_every_code_work_request(
            record_store=store,
            request_id=request_id,
            host="Chris-Studio",
            exit_code=0,
            runner=pr_listing_runner,
        )
        record = store.read_every_code_work_request_record(request_id)

        self.assertEqual(result.status, "done")
        self.assertEqual(record.state, "done")
        self.assertEqual(record.result_pr_url, expected_pr_url)

def test_finish_blocks_successful_exit_without_pull_request(self) -> None:
    # Exit code 0 with an empty PR listing must be recorded as "blocked",
    # with an error message explaining that no pull request was opened.
    request_id = "every-code-cbusillo-code-123-test"
    with TemporaryDirectory() as temporary_directory_name:
        sandbox = Path(temporary_directory_name)
        state_dir = sandbox / "state"
        workspace_root = sandbox / "Developer"
        repository = workspace_root / "code"
        repository.mkdir(parents=True)
        (repository / ".git").mkdir()

        store = FilesystemRecordStore(state_dir=state_dir)
        store.write_every_code_work_request_record(_queued_record())
        run_every_code_worker_once(
            record_store=store,
            host="Chris-Studio",
            workspace_root=workspace_root,
            state_dir=state_dir,
            runner=_Runner(),
        )

        empty_listing_runner = _Runner(pr_list_payload=[])
        result = finish_every_code_work_request(
            record_store=store,
            request_id=request_id,
            host="Chris-Studio",
            exit_code=0,
            runner=empty_listing_runner,
        )
        record = store.read_every_code_work_request_record(request_id)

        self.assertEqual(result.status, "blocked")
        self.assertEqual(record.state, "blocked")
        self.assertEqual(record.result_pr_url, "")
        self.assertIn("did not open a pull request", record.error_message)

def test_finish_defers_preview_label_until_gate_passes(self) -> None:
with TemporaryDirectory() as temporary_directory_name:
temporary_root = Path(temporary_directory_name)
Expand Down Expand Up @@ -2282,6 +2463,8 @@ def test_cli_finish_reports_done(self) -> None:
"Chris-Studio",
"--exit-code",
"0",
"--result-pr-url",
"https://github.com/cbusillo/code/pull/99",
],
)

Expand Down
Loading