diff --git a/control_plane/every_code_worker.py b/control_plane/every_code_worker.py index 608679ba..29e0a0cb 100644 --- a/control_plane/every_code_worker.py +++ b/control_plane/every_code_worker.py @@ -11,7 +11,7 @@ import signal import subprocess import time -from typing import Callable, Literal, Protocol, Sequence +from typing import Callable, Literal, Mapping, Protocol, Sequence from urllib.error import HTTPError, URLError from urllib.parse import urlencode from urllib.request import Request, urlopen @@ -1258,22 +1258,94 @@ def close_terminal_every_code_sessions( session_name=session_name, runner=run, ) + worktree_processes_closed = _terminate_every_code_worktree_processes( + session_state=session_state, + runner=run, + ) > 0 if existing_session is False: path.unlink(missing_ok=True) + closed += 1 if worktree_processes_closed else 0 continue if existing_session is None: + closed += 1 if worktree_processes_closed else 0 continue - pane_pid = _tmux_pane_pid( + if _terminate_every_code_tmux_session( tmux_binary=tmux_binary, session_name=session_name, runner=run, - ) - if pane_pid is None: + ): + path.unlink(missing_ok=True) + closed += 1 continue + if worktree_processes_closed: + closed += 1 + return closed + + +def _terminate_every_code_tmux_session( + *, + tmux_binary: str, + session_name: str, + runner: Runner, +) -> bool: + closed = False + pane_pid = _tmux_pane_pid( + tmux_binary=tmux_binary, + session_name=session_name, + runner=runner, + ) + if pane_pid is not None: try: os.killpg(pane_pid, signal.SIGTERM) + closed = True + except ProcessLookupError: + closed = True + except OSError: + pass + try: + result = runner((tmux_binary, "kill-session", "-t", session_name)) + except OSError: + return closed + return closed or result.returncode in {0, 1} + + +def _terminate_every_code_worktree_processes( + *, + session_state: Mapping[str, object], + runner: Runner, +) -> int: + launch_root_value = session_state.get("launch_root") + if not isinstance(launch_root_value, str) or not launch_root_value.strip(): + return 0 + launch_root = Path(launch_root_value).expanduser() + if not launch_root.is_dir(): + return 0 + try: + result = runner(("lsof", "-t", "+D", str(launch_root))) + except OSError: + return 0 + if result.returncode not in {0, 1}: + return 0 + process_groups: set[int] = set() + for line in result.stdout.splitlines(): + try: + pid = int(line.strip()) + except ValueError: + continue + if pid <= 0 or pid == os.getpid(): + continue + try: + process_groups.add(os.getpgid(pid)) + except ProcessLookupError: + continue + except OSError: + process_groups.add(pid) + + closed = 0 + for process_group in sorted(process_groups): + try: + os.killpg(process_group, signal.SIGTERM) except ProcessLookupError: - path.unlink(missing_ok=True) continue except OSError: continue @@ -1688,19 +1760,27 @@ def finish_every_code_work_request( result_pr_url=terminal_pr_url, ) succeeded = exit_code == 0 - del runner + run = runner or _run_subprocess + resolved_pr_url = result_pr_url.strip() + if succeeded and not resolved_pr_url: + resolved_pr_url = _every_code_preview_pr_url_for_record(record, runner=run) summary = result_summary.strip() or ( "Every Code session completed successfully." if succeeded else f"Every Code session exited with status {exit_code}." ) + if succeeded and not resolved_pr_url: + succeeded = False + summary = ( + "Every Code session exited successfully but did not open a pull request." + ) updated_record = apply_every_code_work_request_status( record, EveryCodeWorkRequestStatusUpdate( state="done" if succeeded else "blocked", host=host.strip(), updated_at=utc_now_timestamp(), - result_pr_url=result_pr_url.strip(), + result_pr_url=resolved_pr_url, result_summary=summary, error_message="" if succeeded else summary, ), diff --git a/tests/test_every_code_worker.py b/tests/test_every_code_worker.py index 58b6559a..c078ff27 100644 --- a/tests/test_every_code_worker.py +++ b/tests/test_every_code_worker.py @@ -232,6 +232,16 @@ def __call__(self, args: Sequence[str]) -> subprocess.CompletedProcess[str]: return super().__call__(args) +class _GoneSessionWithWorktreeProcessRunner(_GoneSessionRunner): + def __call__(self, args: Sequence[str]) -> subprocess.CompletedProcess[str]: + self.calls.append(tuple(args)) + if args[0] == "lsof": + return subprocess.CompletedProcess(args, 0, "9001\n9002\n", "") + if args[1] == "has-session": + return subprocess.CompletedProcess(args, 1, "", "no session") + return super().__call__(args) + + class _Process: def __init__(self, pid: int) -> None: self.pid = pid @@ -1672,7 +1682,116 @@ def test_terminal_host_request_sends_sigterm_to_session_process_group(self) -> N self.assertEqual(closed, 1) killpg.assert_called_once_with(4242, signal.SIGTERM) self.assertEqual(runner.calls[0][1], "has-session") - self.assertEqual(runner.calls[1][1], "display-message") + self.assertEqual(runner.calls[1][0], "lsof") + self.assertEqual(runner.calls[2][1], "display-message") + self.assertEqual(runner.calls[3][1], "kill-session") + + def test_terminal_request_kills_worktree_processes_when_tmux_is_gone(self) -> None: + with TemporaryDirectory() as temporary_directory_name: + temporary_root = Path(temporary_directory_name) + checkout_root = temporary_root / "Developer" / "code" + checkout_root.mkdir(parents=True) + (checkout_root / ".git").mkdir() + store = FilesystemRecordStore(state_dir=temporary_root / "state") + store.write_every_code_work_request_record(_queued_record()) + run_every_code_worker_once( + record_store=store, + host="Chris-Studio", + workspace_root=temporary_root / "Developer", + state_dir=temporary_root / "state", + runner=_Runner(), + ) + record = store.read_every_code_work_request_record( + "every-code-cbusillo-code-123-test" + ).model_copy( + update={ + "state": "done", + "finished_at": "2026-05-06T00:02:00Z", + "updated_at": "2026-05-06T00:02:00Z", + "result_summary": "Linked pull request merged.", + } + ) + store.write_every_code_work_request_record(record) + state_path = every_code_session_state_path( + state_dir=temporary_root / "state", + request_id="every-code-cbusillo-code-123-test", + ) + runner = _GoneSessionWithWorktreeProcessRunner() + + with patch( + "control_plane.every_code_worker.os.getpgid", + side_effect=lambda pid: 7000 if pid in {9001, 9002} else pid, + ): + with patch("control_plane.every_code_worker.os.killpg") as killpg: + closed = close_terminal_every_code_sessions( + record_store=store, + host="Chris-Studio", + state_dir=temporary_root / "state", + runner=runner, + ) + + self.assertEqual(closed, 1) + killpg.assert_called_once_with(7000, signal.SIGTERM) + self.assertFalse(state_path.exists()) + lsof_call = next(call for call in runner.calls if call[0] == "lsof") + self.assertEqual(lsof_call[1:3], ("-t", "+D")) + + def test_terminal_request_unlinks_state_when_pane_pid_is_missing(self) -> None: + with TemporaryDirectory() as temporary_directory_name: + temporary_root = Path(temporary_directory_name) + checkout_root = temporary_root / "Developer" / "code" + checkout_root.mkdir(parents=True) + (checkout_root / ".git").mkdir() + store = FilesystemRecordStore(state_dir=temporary_root / "state") + store.write_every_code_work_request_record(_queued_record()) + run_every_code_worker_once( + record_store=store, + host="Chris-Studio", + workspace_root=temporary_root / "Developer", + state_dir=temporary_root / "state", + runner=_Runner(), + ) + record = store.read_every_code_work_request_record( + "every-code-cbusillo-code-123-test" + ).model_copy( + update={ + "state": "blocked", + "finished_at": "2026-05-06T00:02:00Z", + "updated_at": "2026-05-06T00:02:00Z", + "error_message": "Session ended without a PR.", + } + ) + store.write_every_code_work_request_record(record) + state_path = every_code_session_state_path( + state_dir=temporary_root / "state", + request_id="every-code-cbusillo-code-123-test", + ) + + class _MissingPanePidRunner(_ExistingSessionRunner): + def __call__(self, args: Sequence[str]) -> subprocess.CompletedProcess[str]: + self.calls.append(tuple(args)) + if args[0] == "lsof": + return subprocess.CompletedProcess(args, 1, "", "") + if args[1] == "has-session": + return subprocess.CompletedProcess(args, 0, "", "") + if args[1] == "display-message": + return subprocess.CompletedProcess(args, 1, "", "no pane") + if args[1] == "kill-session": + return subprocess.CompletedProcess(args, 0, "", "") + return subprocess.CompletedProcess(args, 0, "", "") + + runner = _MissingPanePidRunner() + + closed = close_terminal_every_code_sessions( + record_store=store, + host="Chris-Studio", + state_dir=temporary_root / "state", + runner=runner, + ) + + self.assertEqual(closed, 1) + self.assertFalse(state_path.exists()) + self.assertTrue(any(call[1] == "kill-session" for call in runner.calls)) def test_terminal_request_claimed_by_other_host_is_ignored(self) -> None: with TemporaryDirectory() as temporary_directory_name: @@ -1770,6 +1889,68 @@ def test_finish_marks_running_request_done(self) -> None: self.assertEqual(record.result_pr_url, "https://github.com/cbusillo/code/pull/99") self.assertEqual(record.error_message, "") + def test_finish_discovers_open_pr_for_successful_exit_without_result_url(self) -> None: + with TemporaryDirectory() as temporary_directory_name: + temporary_root = Path(temporary_directory_name) + checkout_root = temporary_root / "Developer" / "code" + checkout_root.mkdir(parents=True) + (checkout_root / ".git").mkdir() + store = FilesystemRecordStore(state_dir=temporary_root / "state") + store.write_every_code_work_request_record(_queued_record()) + run_every_code_worker_once( + record_store=store, + host="Chris-Studio", + workspace_root=temporary_root / "Developer", + state_dir=temporary_root / "state", + runner=_Runner(), + ) + runner = _Runner( + pr_list_payload=[{"url": "https://github.com/cbusillo/code/pull/99"}] + ) + + result = finish_every_code_work_request( + record_store=store, + request_id="every-code-cbusillo-code-123-test", + host="Chris-Studio", + exit_code=0, + runner=runner, + ) + record = store.read_every_code_work_request_record("every-code-cbusillo-code-123-test") + + self.assertEqual(result.status, "done") + self.assertEqual(record.state, "done") + self.assertEqual(record.result_pr_url, "https://github.com/cbusillo/code/pull/99") + + def test_finish_blocks_successful_exit_without_pull_request(self) -> None: + with TemporaryDirectory() as temporary_directory_name: + temporary_root = Path(temporary_directory_name) + checkout_root = temporary_root / "Developer" / "code" + checkout_root.mkdir(parents=True) + (checkout_root / ".git").mkdir() + store = FilesystemRecordStore(state_dir=temporary_root / "state") + store.write_every_code_work_request_record(_queued_record()) + run_every_code_worker_once( + record_store=store, + host="Chris-Studio", + workspace_root=temporary_root / "Developer", + state_dir=temporary_root / "state", + runner=_Runner(), + ) + + result = finish_every_code_work_request( + record_store=store, + request_id="every-code-cbusillo-code-123-test", + host="Chris-Studio", + exit_code=0, + runner=_Runner(pr_list_payload=[]), + ) + record = store.read_every_code_work_request_record("every-code-cbusillo-code-123-test") + + self.assertEqual(result.status, "blocked") + self.assertEqual(record.state, "blocked") + self.assertEqual(record.result_pr_url, "") + self.assertIn("did not open a pull request", record.error_message) + def test_finish_defers_preview_label_until_gate_passes(self) -> None: with TemporaryDirectory() as temporary_directory_name: temporary_root = Path(temporary_directory_name) @@ -2282,6 +2463,8 @@ def test_cli_finish_reports_done(self) -> None: "Chris-Studio", "--exit-code", "0", + "--result-pr-url", + "https://github.com/cbusillo/code/pull/99", ], )