diff --git a/QUICKSTART.md b/QUICKSTART.md
index 9af29d0..b21c1da 100644
--- a/QUICKSTART.md
+++ b/QUICKSTART.md
@@ -8,21 +8,57 @@ source .venv/bin/activate
 pip install -e .
 ```
 
-## 2) Run (basic)
+## 2) One-word entrypoint (recommended)
+
+```bash
+codexloop
+```
+
+Show supported features/commands:
+
+```bash
+codexloop help
+```
+
+Behavior:
+- First run: prompts for Telegram token/chat id, uses current shell directory as run working directory, writes `.codex_daemon/daemon_config.json`, starts daemon.
+- Later runs: auto-reuse previous config, auto-start daemon if needed, and attach to live logs.
+- `codexloop init`: stop all current codexloop daemons, prompt token/chat id/model preset/play mode, start a fresh daemon in background, and exit.
+- After `init`, run `codexloop` to attach monitor.
+- In attach console, terminal control works directly:
+  - `/run `
+  - `/inject `
+  - `/status`, `/stop`, `/disable` (same as `/daemon-stop`), `/daemon-stop`
+  - plain text: running => inject, idle => run
+
+Play Mode:
+- `execute-only`: only execute user commands, no plan agent.
+- `fully-plan` (default): 10 minutes after run completion, daemon proposes next request; if not overridden within another 10 minutes, it auto-runs.
+- `record-only`: plan agent only writes markdown table records; reviewer remains unchanged.
+
+Daemon-launched runs use `--yolo` by default.
+
+Disable daemon quickly:
+
+```bash
+codexloop disable
+```
+
+## 3) Run (basic)
 
 ```bash
 codex-autoloop \
-  --max-rounds 100 \
+  --max-rounds 500 \
   "Help me write a pipeline in this folder"
 ```
 
-## 3) Run with Telegram (secure remote visibility)
+## 4) Run with Telegram (secure remote visibility)
 
 ```bash
 export TELEGRAM_BOT_TOKEN='123456789:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
 
 codex-autoloop \
-  --max-rounds 100 \
+  --max-rounds 500 \
   --telegram-bot-token "$TELEGRAM_BOT_TOKEN" \
   --telegram-events "loop.started,round.review.completed,loop.completed" \
   "Help me write a pipeline in this folder"
@@ -33,7 +69,7 @@ Notes:
 - Live terminal streaming is on by default.
 - Telegram live deltas are sent every 30s only when content changes.
 - Telegram control commands are enabled by default (`/inject`, `/status`, `/stop`).
-- Daemon defaults to the `quality` model preset (`gpt-5.4` + `high`) unless you override it.
+- Daemon defaults to the `codex-xhigh` model preset (`gpt-5.3-codex` + `xhigh`) unless you override it.
 
 Control examples from Telegram Web:
 
@@ -90,7 +126,7 @@ Defaults:
 - Idle daemon tries to resume from the last saved `session_id`.
 - One Telegram token can only be used by one active daemon.
 - Operator messages are recorded into per-run markdown files in `.codex_daemon/logs/`.
-- Daemon child model preset defaults to `cheap`.
+- Daemon child model preset defaults to `codex-xhigh`.
 - Re-running setup/start will stop the previous daemon for the same `.codex_daemon` before starting a new one.
 
 One-click kill:
@@ -118,7 +154,7 @@ codex-autoloop \
   --dashboard \
   --dashboard-host 127.0.0.1 \
   --dashboard-port 8787 \
-  --max-rounds 100 \
+  --max-rounds 500 \
   "your objective"
 ```
 
diff --git a/README.md b/README.md
index cbd88cf..a3d6a18 100644
--- a/README.md
+++ b/README.md
@@ -11,8 +11,8 @@ This solves the common "agent stopped early and asked for next instruction" prob
 
 Current defaults:
 
-- `max_rounds` defaults to `100`.
-- Daemon child model preset defaults to `quality` (`gpt-5.4` + `high`) unless overridden.
+- `max_rounds` defaults to `500`.
+- Daemon child model defaults now inherit Codex CLI global settings unless you explicitly set a preset/override.
 - Daemon-launched idle runs try to resume from the last saved `session_id` before starting a fresh thread.
 
 ## Current Feature Snapshot
@@ -27,8 +27,10 @@ Current defaults:
 - Daemon follow-up prompt: after a run ends, Telegram can offer the planner's next suggested objective as a one-click continuation.
 - Planner modes: `off`, `auto`, `record`; setup defaults to `auto`.
 - Dual control channels for daemon: Telegram and terminal (`codex-autoloop-daemon-ctl`).
+- Single-word operator entrypoint: `codexloop` (first run setup, later auto-attach monitor).
 - Token-exclusive daemon lock: one active daemon per Telegram token.
 - Operator message history persisted to markdown and fed to reviewer decisions.
+- Run archive persisted as JSONL with date/workspace/session metadata for resume continuity.
 - Utility scripts: start/kill/watch daemon logs, plus sanitized cross-project setup examples.
 
 ## Why this is a plugin, not a native flag
@@ -43,27 +45,53 @@ source .venv/bin/activate
 pip install -e .
 ```
 
-## Build and Launch
+## One-word operator workflow (`codexloop`)
 
-For a local development setup:
+Run:
 
 ```bash
-python -m venv .venv
-source .venv/bin/activate
-pip install -e .
-python -m codex_autoloop.setup_wizard --run-cd .
+codexloop
 ```
 
-This gives you:
+List supported features/commands:
+
+```bash
+codexloop help
+```
+
+Behavior:
+
+- First run: asks for Telegram token/chat id, uses current shell directory as run working directory, writes `.codex_daemon/daemon_config.json`, starts daemon.
+- Later runs: reuses config, ensures daemon is running, then directly attaches to live output.
+- `codexloop init`: stops all current codexloop daemons, prompts token/chat id/model selection/play mode, starts daemon in background, then exits.
+- After `init`, run `codexloop` to attach monitor to that background daemon.
+- Same terminal can control daemon/run:
+  - `/run `
+  - `/inject `
+  - `/status`, `/stop`, `/fresh`, `/disable` (alias of `/daemon-stop`), `/daemon-stop`
+  - plain text auto-routes: running => inject, idle => run
 
-- the CLI entrypoints
-- an interactive daemon setup
-- a Telegram-controlled long-running loop
+Play Mode semantics:
 
-If you do not want the daemon yet, run the loop directly:
+- `execute-only`: only execute user commands, no plan agent.
+- `fully-plan` (default): after a run finishes, daemon generates next request after 10 minutes; if not overridden within another 10 minutes, it auto-runs that request.
+- `record-only`: plan agent degrades to markdown table recorder; reviewer behavior stays unchanged.
+
+YOLO policy:
+
+- Daemon-launched runs always use `--yolo` by default.
+
+Directly disable daemon from terminal:
 
 ```bash
-codex-autoloop --max-rounds 10 "Implement feature X and keep iterating until tests pass"
+codexloop disable
+```
+
+You can still use low-level commands when needed:
+
+```bash
+codex-autoloop-daemon-ctl --bus-dir .codex_daemon/bus status
+codex-autoloop-daemon-ctl --bus-dir .codex_daemon/bus inject "Fix the tests first, then continue"
 ```
 
 ## Run
@@ -300,10 +328,14 @@ Default behavior for daemon-launched runs:
 
 - `--yolo` is enabled by default.
 - No default `--check` is enforced unless you set one.
-- Daemon defaults to the `cheap` model preset unless you override it.
+- Daemon-launched runs inherit Codex CLI default model settings unless you explicitly set preset/overrides.
 - When the daemon is idle, a new `/run` or terminal `run` command will reuse the last saved `session_id` if available.
 - One Telegram token can only be owned by one active daemon process (second daemon returns an error).
-- Operator messages (initial objective + terminal/Telegram injects) are written to per-run markdown files in the daemon logs directory.
+- In daemon mode, only daemon polls Telegram updates; child runs receive control via daemon bus (avoids getUpdates 409 conflicts).
+- If daemon detects `invalid encrypted content` from a resumed run, it raises a warning and auto-arms fresh session for the next run.
+- Inside a running child loop, `invalid_encrypted_content` now triggers an immediate in-loop fresh-session retry instead of spinning reviewer `continue` loops.
+- Operator messages (initial objective + terminal/Telegram injects) are appended to a shared `.codex_daemon/logs/operator_messages.md` so reviewer can see global inject history across runs.
+- Each run also appends start/finish records into `.codex_daemon/logs/codexloop-run-archive.jsonl` (includes date + workspace + session metadata) for continuity and auditing.
 - Re-running setup or start script will stop the previous daemon under the same `home-dir` before launching the new one.
 
 After setup, use terminal control:
diff --git a/codex_autoloop/cli.py b/codex_autoloop/cli.py
index 50876b4..c478801 100644
--- a/codex_autoloop/cli.py
+++ b/codex_autoloop/cli.py
@@ -337,7 +337,7 @@ def build_parser() -> argparse.ArgumentParser:
     parser.add_argument("objective", nargs="+", help="Task objective passed to the primary agent.")
     parser.add_argument("--codex-bin", default="codex", help="Codex CLI binary path.")
     parser.add_argument("--session-id", default=None, help="Resume an existing Codex exec session id.")
-    parser.add_argument("--max-rounds", type=int, default=100, help="Maximum primary-agent rounds.")
+    parser.add_argument("--max-rounds", type=int, default=500, help="Maximum primary-agent rounds.")
     parser.add_argument(
         "--max-no-progress-rounds",
         type=int,
diff --git a/codex_autoloop/codex_runner.py b/codex_autoloop/codex_runner.py
index 4a5557f..885396b 100644
--- a/codex_autoloop/codex_runner.py
+++ b/codex_autoloop/codex_runner.py
@@ -224,6 +224,9 @@ def consume_pipe(stream_name: str, pipe) -> None:
         fatal_error = watchdog_reason
     elif turn_completed and not turn_failed:
         fatal_error = None
+    elif process.returncode != 0 and fatal_error is None:
+        turn_failed = True
+        fatal_error = f"Process exited with code {process.returncode} before turn completion."
 
     return CodexRunResult(
         command=command,
diff --git a/codex_autoloop/codexloop.py b/codex_autoloop/codexloop.py
new file mode 100644
index 0000000..8f24979
--- /dev/null
+++ b/codex_autoloop/codexloop.py
@@ -0,0 +1,783 @@
+from __future__ import annotations
+
+import argparse
+import getpass
+import json
+import os
+import select
+import signal
+import shutil
+import subprocess
+import sys
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+
+from .daemon_bus import BusCommand, JsonlCommandBus, read_status
+from .model_catalog import MODEL_PRESETS
+
+DEFAULT_HOME_DIR = ".codex_daemon"
+DEFAULT_TOKEN_LOCK_DIR = "/tmp/codex-autoloop-token-locks"
+DEFAULT_MAX_ROUNDS = 500
+
+
+@dataclass
+class TerminalCommand:
+    kind: str
+    text: str = ""
+
+
+@dataclass(frozen=True)
+class PlayMode:
+    name: str
+    run_plan_mode: str
+    note: str
+
+
+PLAY_MODES: list[PlayMode] = [
+    PlayMode(
+        name="execute-only",
+        run_plan_mode="execute-only",
+        note="Only execute user command; no plan agent.",
+    ),
+    PlayMode(
+        name="fully-plan",
+        run_plan_mode="fully-plan",
+        note="Default: full plan agent; propose in 10m and auto-run in another 10m if unchanged.",
+    ),
+    PlayMode(
+        name="record-only",
+        run_plan_mode="record-only",
+        note="Plan agent degrades to table recorder; reviewer remains unchanged.",
+    ),
+]
+
+
+def main() -> None:
+    parser = build_parser()
+    args = parser.parse_args()
+    if args.subcommand == "help":
+        print(supported_features_text())
+        return
+    home_dir = Path(args.home_dir).resolve()
+    home_dir.mkdir(parents=True, exist_ok=True)
+
+    if shutil.which("codex") is None:
+        parser.error("codex CLI not found in PATH. Install/configure codex first.")
+
+    config_path = home_dir / "daemon_config.json"
+    config = load_config(config_path)
+    current_run_cwd = Path(args.run_cd).resolve() if args.run_cd else Path.cwd().resolve()
+    if args.subcommand == "init":
+        stop_all_codexloop_loops(
+            home_dir=home_dir,
+            config=config,
+            token_lock_dir=args.token_lock_dir,
+        )
+        config = run_interactive_config(home_dir=home_dir, run_cd=current_run_cwd)
+        save_config(config_path, config)
+        print(f"Saved config: {config_path}")
+        pid = ensure_daemon_running(config=config, home_dir=home_dir, token_lock_dir=args.token_lock_dir)
+        print(f"Daemon running in background. pid={pid}")
+        print("Use `codexloop` to attach monitor and terminal control.")
+        return
+    if args.reconfigure or config is None or not is_config_usable(config):
+        if args.reconfigure:
+            stop_all_codexloop_loops(
+                home_dir=home_dir,
+                config=config,
+                token_lock_dir=args.token_lock_dir,
+            )
+        config = run_interactive_config(home_dir=home_dir, run_cd=current_run_cwd)
+        save_config(config_path, config)
+        print(f"Saved config: {config_path}")
+
+    if args.subcommand is None:
+        ensure_daemon_running(config=config, home_dir=home_dir, token_lock_dir=args.token_lock_dir)
+        run_monitor_console(
+            config=config,
+            home_dir=home_dir,
+            token_lock_dir=args.token_lock_dir,
+            tail_lines=max(1, int(args.tail_lines)),
+        )
+        return
+
+    bus_dir = resolve_bus_dir(config, home_dir)
+    if args.subcommand == "status":
+        payload = read_status(bus_dir / "daemon_status.json")
+        if payload is None:
+            print("No daemon status found.")
+            raise SystemExit(1)
+        print(json.dumps(payload, ensure_ascii=True, indent=2))
+        return
+
+    if args.subcommand in {"run", "inject", "fresh"}:
+        ensure_daemon_running(config=config, home_dir=home_dir, token_lock_dir=args.token_lock_dir)
+
+    if args.subcommand == "disable":
+        kind = "daemon-stop"
+    elif args.subcommand == "fresh":
+        kind = "fresh-session"
+    else:
+        kind = str(args.subcommand)
+    text = " ".join(args.text).strip() if hasattr(args, "text") else ""
+    publish_command(bus_dir=bus_dir, kind=kind, text=text, source="terminal")
+    print(f"Sent: {args.subcommand}")
+
+
+def build_parser() -> argparse.ArgumentParser:
+    parser = argparse.ArgumentParser(
+        prog="codexloop",
+        description=(
+            "Single-word entrypoint for codex-autoloop daemon. "
+            "First run configures + starts daemon, later runs attach and monitor."
+        ),
+    )
+    parser.add_argument(
+        "--home-dir",
+        default=DEFAULT_HOME_DIR,
+        help="Daemon home directory (stores config/pid/logs/bus).",
+    )
+    parser.add_argument(
+        "--run-cd",
+        default=None,
+        help="Run working directory for daemon-launched runs. Defaults to current shell directory.",
+    )
+    parser.add_argument(
+        "--token-lock-dir",
+        default=DEFAULT_TOKEN_LOCK_DIR,
+        help="Global lock directory to enforce one daemon per Telegram token.",
+    )
+    parser.add_argument(
+        "--tail-lines",
+        type=int,
+        default=20,
+        help="Initial number of lines to show for each followed log file.",
+    )
+    parser.add_argument(
+        "--reconfigure",
+        action="store_true",
+        help="Re-run interactive setup and overwrite daemon config.",
+    )
+
+    sub = parser.add_subparsers(dest="subcommand")
+    sub.add_parser("help", help="Show supported codexloop features and commands.")
+    sub.add_parser("init", help="Stop current codexloop loops, reconfigure, and restart fresh daemon.")
+    sub.add_parser("status", help="Show daemon status JSON.")
+    run = sub.add_parser("run", help="Start a run objective.")
+    run.add_argument("text", nargs="+", help="Objective text.")
+    inject = sub.add_parser("inject", help="Inject instruction into active run.")
+    inject.add_argument("text", nargs="+", help="Instruction text.")
+    sub.add_parser("stop", help="Stop active run.")
+    sub.add_parser("fresh", help="Force next run to start with a fresh session (no resume).")
+    sub.add_parser("disable", help="Disable codexloop daemon (alias of daemon-stop).")
+    sub.add_parser("daemon-stop", help="Stop daemon process.")
+    return parser
+
+
+def supported_features_text() -> str:
+    return "\n".join(
+        [
+            "codexloop supported features",
+            "",
+            "Top-level commands:",
+            " codexloop",
+            " Attach monitor; auto-start daemon if needed.",
+            " codexloop help",
+            " Show this feature list.",
+            " codexloop init",
+            " Stop all current codexloop loops, collect new config, and restart daemon in background.",
+            " codexloop status",
+            " Print daemon status JSON.",
+            " codexloop run ",
+            " Start a new run objective.",
+            " codexloop inject ",
+            " Inject instruction into active run.",
+            " codexloop stop",
+            " Stop active run only.",
+            " codexloop fresh",
+            " Mark next run as fresh session (ignore saved session_id).",
+            " codexloop disable",
+            " Stop daemon process (alias of codexloop daemon-stop).",
+            " codexloop daemon-stop",
+            " Stop daemon process.",
+            "",
+            "Attached monitor console commands:",
+            " /status /run /inject /stop /fresh /disable /daemon-stop /help /exit",
+            " Plain text routes to /inject when running, else to /run.",
+            "",
+            "Play Mode:",
+            " 1) execute-only 2) fully-plan (default) 3) record-only",
+            "",
+            "Run working directory:",
+            " By default, uses the shell current working directory when config is created.",
+        ]
+    )
+
+
+def load_config(path: Path) -> dict[str, Any] | None:
+    if not path.exists():
+        return None
+    try:
+        payload = json.loads(path.read_text(encoding="utf-8"))
+    except Exception:
+        return None
+    if not isinstance(payload, dict):
+        return None
+    return payload
+
+
+def save_config(path: Path, payload: dict[str, Any]) -> None:
+    path.parent.mkdir(parents=True, exist_ok=True)
+    path.write_text(json.dumps(payload, ensure_ascii=True, indent=2), encoding="utf-8")
+    path.chmod(0o600)
+
+
+def run_interactive_config(*, home_dir: Path, run_cd: Path) -> dict[str, Any]:
+    run_cd = run_cd.resolve()
+    print("codexloop first-time setup")
+    token = prompt_token()
+    chat_id = prompt_chat_id()
+    check_cmd = prompt_input("Default check command (optional): ", default="").strip()
+    model_preset = prompt_model_choice()
+    play_mode = prompt_play_mode()
+    print(f"Run working directory: {run_cd}")
+    return {
+        "telegram_bot_token": token,
+        "telegram_chat_id": chat_id,
+        "run_cd": str(run_cd),
+        "run_check": (check_cmd if check_cmd else None),
+        "run_max_rounds": DEFAULT_MAX_ROUNDS,
+        "run_skip_git_repo_check": False,
+        "run_full_auto": False,
+        "run_yolo": True,
+        "run_plan_mode": play_mode.run_plan_mode,
+        "run_plan_request_delay_seconds": 600,
+        "run_plan_auto_execute_delay_seconds": 600,
+        "run_plan_record_file": None,
+        "run_resume_last_session": True,
+        "run_main_reasoning_effort": None,
+        "run_reviewer_reasoning_effort": None,
+        "run_main_model": None,
+        "run_reviewer_model": None,
+        "run_model_preset": model_preset,
+        "bus_dir": str((home_dir / "bus").resolve()),
+        "logs_dir": str((home_dir / "logs").resolve()),
+    }
+
+
+def is_config_usable(config: dict[str, Any]) -> bool:
+    token = str(config.get("telegram_bot_token") or "").strip()
+    run_cd = str(config.get("run_cd") or "").strip()
+    return looks_like_token(token) and bool(run_cd)
+
+
+def prompt_input(prompt: str, default: str) -> str:
+    raw = input(prompt).strip()
+    if not raw:
+        return default
+    return raw
+
+
+def prompt_secret(prompt: str) -> str:
+    return getpass.getpass(prompt).strip()
+
+
+def prompt_token() -> str:
+    while True:
+        token = prompt_secret("Telegram bot token: ")
+        if looks_like_token(token):
+            return token
+        print("Invalid token format. Expected :. Please try again.", file=sys.stderr)
+
+
+def prompt_chat_id() -> str:
+    while True:
+        value = prompt_input("Telegram chat id (or 'auto'): ", default="auto").strip() or "auto"
+        if value.lower() == "auto" or looks_like_chat_id(value):
+            return value
+        print("Invalid chat id. Use 'auto' or numeric id like 123456 or -100123456.", file=sys.stderr)
+
+
+def prompt_model_choice() -> str | None:
+    print("Choose model preset:")
+    print(" 0. inherit codex default (recommended)")
+    for idx, preset in enumerate(MODEL_PRESETS, start=1):
+        print(
+            f" {idx}. {preset.name}: "
+            f"main={preset.main_model}/{preset.main_reasoning_effort}, "
+            f"reviewer={preset.reviewer_model}/{preset.reviewer_reasoning_effort}"
+        )
+    while True:
+        raw = prompt_input("Preset number: ", default="0").strip()
+        try:
+            index = int(raw)
+        except ValueError:
+            print("Invalid selection. Enter a number from the list.", file=sys.stderr)
+            continue
+        if index == 0:
+            return None
+        if 1 <= index <= len(MODEL_PRESETS):
+            return MODEL_PRESETS[index - 1].name
+        print("Selection out of range. Please choose one of the listed numbers.", file=sys.stderr)
+
+
+def prompt_play_mode() -> PlayMode:
+    print("Choose Play Mode:")
+    for idx, mode in enumerate(PLAY_MODES, start=1):
+        print(f" {idx}. {mode.name}: {mode.note}")
+    default_index = next((idx for idx, mode in enumerate(PLAY_MODES, start=1) if mode.name == "fully-plan"), 1)
+    while True:
+        raw = prompt_input("Play Mode number: ", default=str(default_index)).strip()
+        try:
+            index = int(raw)
+        except ValueError:
+            print("Invalid selection. Enter a number from the list.", file=sys.stderr)
+            continue
+        if 1 <= index <= len(PLAY_MODES):
+            return PLAY_MODES[index - 1]
+        print("Selection out of range. Please choose one of the listed numbers.", file=sys.stderr)
+
+
+def looks_like_token(token: str) -> bool:
+    if ":" not in token:
+        return False
+    left, right = token.split(":", 1)
+    return left.isdigit() and bool(right.strip())
+
+
+def looks_like_chat_id(value: str) -> bool:
+    if not value:
+        return False
+    if value.startswith("-"):
+        return value[1:].isdigit()
+    return value.isdigit()
+
+
+def resolve_daemon_launch_prefix() -> list[str]:
+    daemon_bin = shutil.which("codex-autoloop-telegram-daemon")
+    if daemon_bin:
+        return [daemon_bin]
+    return [sys.executable, "-m", "codex_autoloop.telegram_daemon"]
+
+
+def resolve_bus_dir(config: dict[str, Any], home_dir: Path) -> Path:
+    raw = str(config.get("bus_dir") or (home_dir / "bus"))
+    return Path(raw).expanduser().resolve()
+
+
+def resolve_logs_dir(config: dict[str, Any], home_dir: Path) -> Path:
+    raw = str(config.get("logs_dir") or (home_dir / "logs"))
+    return Path(raw).expanduser().resolve()
+
+
+def read_pid(pid_path: Path) -> int | None:
+    if not pid_path.exists():
+        return None
+    try:
+        raw = pid_path.read_text(encoding="utf-8").strip()
+        pid = int(raw)
+    except Exception:
+        return None
+    return pid if pid > 0 else None
+
+
+def is_process_running(pid: int | None) -> bool:
+    if pid is None:
+        return False
+    try:
+        os.kill(pid, 0)
+    except OSError:
+        return False
+    return True
+
+
+def ensure_daemon_running(*, config: dict[str, Any], home_dir: Path, token_lock_dir: str) -> int:
+    pid_path = home_dir / "daemon.pid"
+    existing_pid = read_pid(pid_path)
+    if is_process_running(existing_pid):
+        assert existing_pid is not None
+        return existing_pid
+
+    bus_dir = resolve_bus_dir(config, home_dir)
+    logs_dir = resolve_logs_dir(config, home_dir)
+    bus_dir.mkdir(parents=True, exist_ok=True)
+    logs_dir.mkdir(parents=True, exist_ok=True)
+    daemon_log = home_dir / "daemon.out"
+    cmd = build_daemon_command(
+        config=config,
+        home_dir=home_dir,
+        token_lock_dir=token_lock_dir,
+    )
+    with daemon_log.open("a", encoding="utf-8") as stream:
+        proc = subprocess.Popen(
+            cmd,
+            stdout=stream,
+            stderr=stream,
+            stdin=subprocess.DEVNULL,
+            start_new_session=True,
+        )
+    time.sleep(1.0)
+    if proc.poll() is not None:
+        print("Daemon failed to start. Recent daemon.out:", file=sys.stderr)
+        print(read_log_tail(daemon_log, max_lines=30), file=sys.stderr)
+        raise SystemExit(2)
+    pid_path.write_text(str(proc.pid), encoding="utf-8")
+    pid_path.chmod(0o600)
+    print(f"Daemon started. pid={proc.pid}")
+    return proc.pid
+
+
+def stop_all_codexloop_loops(*, home_dir: Path, config: dict[str, Any] | None, token_lock_dir: str) -> None:
+    stopped = stop_current_home_daemon(home_dir=home_dir, config=config)
+    global_stopped = stop_global_daemons_from_token_locks(token_lock_dir=token_lock_dir)
+    if stopped or global_stopped:
+        total = int(stopped) + len(global_stopped)
+        print(f"Stopped {total} codexloop daemon process(es).")
+
+
+def stop_current_home_daemon(*, home_dir: Path, config: dict[str, Any] | None) -> bool:
+    pid_path = home_dir / "daemon.pid"
+    pid = read_pid(pid_path)
+    bus_dir = resolve_bus_dir(config, home_dir) if config else (home_dir / "bus").resolve()
+    publish_daemon_stop_if_possible(bus_dir=bus_dir)
+    if pid is None:
+        pid_path.unlink(missing_ok=True)
+        return False
+    if wait_process_exit(pid, timeout_seconds=3.0):
+        pid_path.unlink(missing_ok=True)
+        return True
+    terminated = terminate_process_tree(pid)
+    pid_path.unlink(missing_ok=True)
+    return terminated
+
+
+def stop_global_daemons_from_token_locks(*, token_lock_dir: str) -> list[int]:
+    lock_dir = Path(token_lock_dir).resolve()
+    if not lock_dir.exists():
+        return []
+    stopped: list[int] = []
+    for meta_path in lock_dir.glob("*.json"):
+        payload = load_config(meta_path)
+        if payload is None:
+            continue
+        pid_raw = payload.get("pid")
+        bus_dir_raw = payload.get("bus_dir")
+        if isinstance(bus_dir_raw, str) and bus_dir_raw.strip():
+            publish_daemon_stop_if_possible(bus_dir=Path(bus_dir_raw).expanduser().resolve())
+        pid = parse_pid(pid_raw)
+        if pid is None:
+            continue
+        if wait_process_exit(pid, timeout_seconds=2.0):
+            stopped.append(pid)
+            meta_path.unlink(missing_ok=True)
+            continue
+        if terminate_process_tree(pid):
+            stopped.append(pid)
+            meta_path.unlink(missing_ok=True)
+    return stopped
+
+
+def parse_pid(value: Any) -> int | None:
+    if isinstance(value, int):
+        return value if value > 0 else None
+    if isinstance(value, str) and value.isdigit():
+        parsed = int(value)
+        return parsed if parsed > 0 else None
+    return None
+
+
+def publish_daemon_stop_if_possible(*, bus_dir: Path) -> None:
+    try:
+        publish_command(bus_dir=bus_dir, kind="daemon-stop", text="", source="terminal-init")
+    except Exception:
+        return
+
+
+def wait_process_exit(pid: int, *, timeout_seconds: float) -> bool:
+    if not is_process_running(pid):
+        return True
+    deadline = time.time() + max(0.0, timeout_seconds)
+    while time.time() < deadline:
+        if not is_process_running(pid):
+            return True
+        time.sleep(0.2)
+    return not is_process_running(pid)
+
+
+def terminate_process_tree(pid: int) -> bool:
+    if not is_process_running(pid):
+        return True
+    try:
+        os.kill(pid, signal.SIGTERM)
+    except OSError:
+        return True
+    if wait_process_exit(pid, timeout_seconds=3.0):
+        return True
+    try:
+        os.kill(pid, signal.SIGKILL)
+    except OSError:
+        return True
+    return wait_process_exit(pid, timeout_seconds=1.0)
+
+
+def build_daemon_command(*, config: dict[str, Any], home_dir: Path, token_lock_dir: str) -> list[str]:
+    token = str(config.get("telegram_bot_token") or "").strip()
+    if not token:
+        raise SystemExit("Missing telegram_bot_token in daemon config.")
+    run_cd = str(config.get("run_cd") or ".")
+    chat_id = str(config.get("telegram_chat_id") or "auto")
+    bus_dir = resolve_bus_dir(config, home_dir)
+    logs_dir = resolve_logs_dir(config, home_dir)
+    cmd = [
+        *resolve_daemon_launch_prefix(),
+        "--telegram-bot-token",
+        token,
+        "--telegram-chat-id",
+        chat_id,
+        "--run-cd",
+        str(Path(run_cd).expanduser().resolve()),
+        "--run-max-rounds",
+        str(int(config.get("run_max_rounds", DEFAULT_MAX_ROUNDS))),
+        "--bus-dir",
+        str(bus_dir),
+        "--logs-dir",
+        str(logs_dir),
+        "--run-state-file",
+        str((home_dir / "last_state.json").resolve()),
+        "--token-lock-dir",
+        token_lock_dir,
+        "--run-plan-mode",
+        str(config.get("run_plan_mode") or "fully-plan"),
+        "--run-plan-request-delay-seconds",
+        str(int(config.get("run_plan_request_delay_seconds", 600))),
+        "--run-plan-auto-execute-delay-seconds",
+        str(int(config.get("run_plan_auto_execute_delay_seconds", 600))),
+    ]
+    run_check = config.get("run_check")
+    if isinstance(run_check, str) and run_check.strip():
+        cmd.extend(["--run-check", run_check.strip()])
+    elif isinstance(run_check, list):
+        for item in run_check:
+            value = str(item).strip()
+            if value:
+                cmd.extend(["--run-check", value])
+    run_model_preset = str(config.get("run_model_preset") or "").strip()
+    if run_model_preset:
+        cmd.extend(["--run-model-preset", run_model_preset])
+    run_plan_record_file = str(config.get("run_plan_record_file") or "").strip()
+    if run_plan_record_file:
+        cmd.extend(["--run-plan-record-file", run_plan_record_file])
+    run_main_model = str(config.get("run_main_model") or "").strip()
+    if run_main_model:
+        cmd.extend(["--run-main-model", run_main_model])
+    run_main_effort = str(config.get("run_main_reasoning_effort") or "").strip()
+    if run_main_effort:
+        cmd.extend(["--run-main-reasoning-effort", run_main_effort])
+    run_reviewer_model = str(config.get("run_reviewer_model") or "").strip()
+    if run_reviewer_model:
+        cmd.extend(["--run-reviewer-model", run_reviewer_model])
+    run_reviewer_effort = str(config.get("run_reviewer_reasoning_effort") or "").strip()
+    if run_reviewer_effort:
+        cmd.extend(["--run-reviewer-reasoning-effort", run_reviewer_effort])
+    if bool(config.get("run_skip_git_repo_check")):
+        cmd.append("--run-skip-git-repo-check")
+    if bool(config.get("run_full_auto")):
+        cmd.append("--run-full-auto")
+    cmd.append("--run-yolo")
+    if bool(config.get("run_resume_last_session", True)):
+        cmd.append("--run-resume-last-session")
+    else:
+        cmd.append("--no-run-resume-last-session")
+    return cmd
+
+
+def read_log_tail(path: Path, *, max_lines: int) -> str:
+    if not path.exists():
+        return ""
+    try:
+        lines = path.read_text(encoding="utf-8").splitlines()
+    except Exception:
+        return ""
+    if len(lines) <= max_lines:
+        return "\n".join(lines)
+    return "\n".join(lines[-max_lines:])
+
+
+def publish_command(*, bus_dir: Path, kind: str, text: str, source: str) -> None:
+    bus_dir.mkdir(parents=True, exist_ok=True)
+    bus = JsonlCommandBus(bus_dir / "daemon_commands.jsonl")
+    bus.publish(BusCommand(kind=kind, text=text, source=source, ts=time.time()))
+
+
+def run_monitor_console(
+    *,
+    config: dict[str, Any],
+    home_dir: Path,
+    token_lock_dir: str,
+    tail_lines: int,
+) -> None:
+    bus_dir = resolve_bus_dir(config, home_dir)
+    logs_dir = resolve_logs_dir(config, home_dir)
+    daemon_out = home_dir / "daemon.out"
+    events_log = logs_dir / "daemon-events.jsonl"
+    status_path = bus_dir / "daemon_status.json"
+
+    print("Attached to codexloop daemon.")
+    print(
+        "Commands: /status /run /inject /stop /fresh /disable /daemon-stop /exit"
+    )
+    print("Plain text: running -> inject, idle -> run")
+    print("")
+
+    tracked_offsets: dict[Path, int] = {}
+    file_labels: dict[Path, str] = {}
+
+    def ensure_tracked(path: Path, label: str) -> None:
+        if path in tracked_offsets:
+            return
+        if path.exists():
+            for line in tail_file(path, max_lines=tail_lines):
+                print(f"[{label}] {line}")
+            tracked_offsets[path] = path.stat().st_size
+        else:
+            tracked_offsets[path] = 0
+        file_labels[path] = label
+
+    ensure_tracked(daemon_out, "daemon")
+    ensure_tracked(events_log, "events")
+
+    last_child_log: Path | None = None
+    while True:
+        status_payload = read_status(status_path) or {}
+        child_log_raw = status_payload.get("child_log_path")
+        child_log_path = Path(child_log_raw).resolve() if isinstance(child_log_raw, str) and child_log_raw else None
+        if child_log_path is not None and child_log_path != last_child_log:
+            print(f"[monitor] child log switched to: {child_log_path}")
+            ensure_tracked(child_log_path, "child")
+            last_child_log = child_log_path
+
+        for path, offset in list(tracked_offsets.items()):
+            lines, next_offset = read_new_lines(path, offset)
+            tracked_offsets[path] = next_offset
+            label = file_labels.get(path, "log")
+            for line in lines:
+                print(f"[{label}] {line}")
+
+        line = read_input_line(timeout_seconds=0.5)
+        if line is None:
+            continue
+        if line == "__EOF__":
+            print("Input closed. Exiting monitor.")
+            return
+
+        running = bool((read_status(status_path) or {}).get("running"))
+        parsed = parse_terminal_command(line, running=running)
+        if parsed is None:
+            continue
+        if parsed.kind == "exit":
+            print("Leaving monitor.")
+            return
+        if parsed.kind == "help":
+            print(
+                "Commands: /status /run /inject /stop /fresh /disable /daemon-stop /exit\n"
+                "Plain text routes to inject when running, else run."
+            )
+            continue
+        if parsed.kind == "status":
+            payload = read_status(status_path)
+            if payload is None:
+                print("No daemon status found.")
+            else:
+                print(json.dumps(payload, ensure_ascii=True, indent=2))
+            continue
+        if parsed.kind in {"run", "inject"}:
+            ensure_daemon_running(config=config, home_dir=home_dir, token_lock_dir=token_lock_dir)
+        publish_command(bus_dir=bus_dir, kind=parsed.kind, text=parsed.text, source="terminal-console")
+        print(f"Sent: {parsed.kind}")
+
+
+def tail_file(path: Path, *, max_lines: int) -> list[str]:
+    if not path.exists():
+        return []
+    try:
+        lines = path.read_text(encoding="utf-8").splitlines()
+    except Exception:
+        return []
+    if len(lines) <= max_lines:
+        return lines
+    return lines[-max_lines:]
+
+
+def read_new_lines(path: Path, offset: int) -> tuple[list[str], int]:
+    if not path.exists():
+        return [], offset
+    size = path.stat().st_size
+    if size < offset:
+        offset = 0
+    if size == offset:
+        return [], offset
+    try:
+        with path.open("r", encoding="utf-8") as stream:
+            stream.seek(offset)
+            chunk = stream.read()
+            next_offset = stream.tell()
+    except Exception:
+        return [], offset
+    if not chunk:
+        return [], next_offset
+    return chunk.splitlines(), next_offset
+
+
+def read_input_line(*, timeout_seconds: float) -> str | None:
+    try:
+        ready, _, _ = select.select([sys.stdin], [], [], timeout_seconds)
+    except Exception:
+        time.sleep(timeout_seconds)
+        return None
+    if not ready:
+        return None
+    line = sys.stdin.readline()
+    if line == "":
+        return "__EOF__"
+    return line.rstrip("\n")
+
+
+def parse_terminal_command(raw: str, *, running: bool) -> TerminalCommand | None:
+    text = raw.strip()
+    if not text:
+        return None
+    if text in {"/exit", "/quit", "exit", "quit"}:
+        return TerminalCommand(kind="exit")
+    if text in {"/help", "help"}:
+        return TerminalCommand(kind="help")
+    if text in {"/status", "status"}:
+        return TerminalCommand(kind="status")
+    if text in {"/stop", "stop"}:
+        return TerminalCommand(kind="stop")
+    if text in {"/fresh", "fresh", "/fresh-session", "fresh-session", "/new-session", "new-session"}:
+        return TerminalCommand(kind="fresh-session")
+    if text in {"/disable", "disable", "/daemon-stop", "daemon-stop"}:
+        return TerminalCommand(kind="daemon-stop")
+    if text.startswith("/run "):
+        objective = text[len("/run ") :].strip()
+        if not objective:
+            return None
+        return TerminalCommand(kind="run", text=objective)
+    if text == "/run":
+        return None
+    if text.startswith("/inject "):
+        instruction = text[len("/inject ") :].strip()
+        if not instruction:
+            return None
+        return TerminalCommand(kind="inject", text=instruction)
+    if text == "/inject":
+        return None
+    if text.startswith("/"):
+        return None
+    return TerminalCommand(kind="inject" if running else "run", text=text)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/codex_autoloop/control_state.py b/codex_autoloop/control_state.py
index 4acca92..8b618f8 100644
--- a/codex_autoloop/control_state.py
+++ b/codex_autoloop/control_state.py
@@ -4,6 +4,7 @@
 from dataclasses import dataclass
 from datetime import datetime, timezone
 from pathlib import Path
+import re
 
 
 @dataclass
@@ -20,8 +21,8 @@ def __init__(self, operator_messages_file: str | None = None) -> None:
         self._interrupt_reason: str | None = None
         self._pending_instruction: str | None = None
         self._stop_requested = False
-        self._messages: list[OperatorMessage] = []
         self._operator_messages_file = operator_messages_file
+        self._messages: list[OperatorMessage] = self._load_messages_from_file(operator_messages_file)
 
     def request_inject(self, instruction: str, source: str = "operator") -> None:
         text = instruction.strip()
@@ -107,3 +108,31 @@ def _write_messages_doc_locked(self) -> None:
         for item in self._messages:
             lines.append(f"- `{item.ts}` `{item.source}` `{item.kind}`: {item.text}")
         path.write_text("\n".join(lines) + "\n", encoding="utf-8")
+
+    @staticmethod
+    def _load_messages_from_file(path: str | None) -> list[OperatorMessage]:
+        if not path:
+            return []
+        p = Path(path)
+        if not p.exists():
+            return []
+        try:
+            content = p.read_text(encoding="utf-8")
+        except Exception:
+            return []
+        out: list[OperatorMessage] = []
+        pattern = re.compile(r"^- `([^`]+)` `([^`]+)` `([^`]+)`: (.*)$")
+        for line in content.splitlines():
+            match = pattern.match(line.strip())
+            if not match:
+                continue
+            ts, source, kind, text = match.groups()
+            out.append(
+                OperatorMessage(
+                    ts=ts.strip(),
+                    source=source.strip(),
+                    kind=kind.strip(),
+                    text=text.strip(),
+                )
+            )
+        return out
diff --git a/codex_autoloop/model_catalog.py b/codex_autoloop/model_catalog.py
index b754582..7f654e5 100644
--- a/codex_autoloop/model_catalog.py
+++ b/codex_autoloop/model_catalog.py
@@ -34,6 +34,9 @@ class ModelEntry:
 ]
 
 
+DEFAULT_MODEL_PRESET = "codex-xhigh"
+
+
 MODEL_PRESETS: list[ModelPreset] = [
     ModelPreset(
         name="quality",
diff --git a/codex_autoloop/orchestrator.py b/codex_autoloop/orchestrator.py
index dcb7297..2e8c7b5 100644
---
a/codex_autoloop/orchestrator.py +++ b/codex_autoloop/orchestrator.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -import threading from dataclasses import asdict, dataclass from datetime import datetime, timezone from pathlib import Path @@ -9,9 +8,7 @@ from .checks import all_checks_passed, run_checks from .codex_runner import CodexRunner, InactivitySnapshot, RunnerOptions -from .models import PlanSnapshot, ReviewDecision, RoundSummary -from .planner_modes import planner_mode_enabled -from .planner import Planner, PlannerConfig, format_plan_todo_markdown +from .models import ReviewDecision, RoundSummary from .reviewer import Reviewer, ReviewerConfig from .stall_subagent import analyze_stall @@ -21,7 +18,7 @@ @dataclass class AutoLoopConfig: objective: str - max_rounds: int = 100 + max_rounds: int = 500 max_no_progress_rounds: int = 3 check_commands: list[str] | None = None check_timeout_seconds: int = 1200 @@ -29,24 +26,16 @@ class AutoLoopConfig: main_reasoning_effort: str | None = None reviewer_model: str | None = None reviewer_reasoning_effort: str | None = None - planner_model: str | None = None - planner_reasoning_effort: str | None = None - planner_mode: str = "auto" main_extra_args: list[str] | None = None reviewer_extra_args: list[str] | None = None - planner_extra_args: list[str] | None = None skip_git_repo_check: bool = False full_auto: bool = False dangerous_yolo: bool = False state_file: str | None = None - plan_report_file: str | None = None - plan_todo_file: str | None = None initial_session_id: str | None = None loop_event_callback: LoopEventCallback | None = None - stall_soft_idle_seconds: int = 3600 + stall_soft_idle_seconds: int = 1200 stall_hard_idle_seconds: int = 10800 - plan_update_interval_seconds: int = 1800 - planner_enabled: bool = True external_interrupt_reason_provider: Callable[[], str | None] | None = None pending_instruction_consumer: Callable[[], str | None] | None = None stop_requested_checker: Callable[[], 
bool] | None = None @@ -59,50 +48,19 @@ class AutoLoopResult: session_id: str | None rounds: list[RoundSummary] stop_reason: str - plan: PlanSnapshot | None = None - - -@dataclass -class _PlannerContext: - round_index: int - session_id: str | None - rounds: list[RoundSummary] - latest_review: ReviewDecision | None - latest_checks: list[Any] - stop_reason: str | None class AutoLoopOrchestrator: - def __init__( - self, - runner: CodexRunner, - reviewer: Reviewer, - planner: Planner | None, - config: AutoLoopConfig, - ) -> None: + def __init__(self, runner: CodexRunner, reviewer: Reviewer, config: AutoLoopConfig) -> None: self.runner = runner self.reviewer = reviewer - self.planner = planner self.config = config - self._planner_context_lock = threading.Lock() - self._planner_run_lock = threading.Lock() - self._planner_stop_event = threading.Event() - self._planner_thread: threading.Thread | None = None - self._latest_plan: PlanSnapshot | None = None - self._planner_context = _PlannerContext( - round_index=0, - session_id=config.initial_session_id, - rounds=[], - latest_review=None, - latest_checks=[], - stop_reason=None, - ) def run(self) -> AutoLoopResult: rounds: list[RoundSummary] = [] session_id = self.config.initial_session_id no_progress_rounds = 0 - previous_main_message = "" + previous_progress_signature = "" next_main_prompt = self._initial_main_prompt(self.config.objective) self._emit( { @@ -112,289 +70,286 @@ def run(self) -> AutoLoopResult: "session_id": session_id, } ) - self._persist_state(rounds=rounds, session_id=session_id, current_review=None) - self._start_planner_loop() - self._run_planner_update(trigger="initial", terminal=False, wait=True) - self._persist_state(rounds=rounds, session_id=session_id, current_review=None) - - try: - for round_index in range(1, self.config.max_rounds + 1): - if self._is_stop_requested(): - return self._finish_loop( - success=False, - session_id=session_id, - rounds=rounds, - stop_reason="Stopped by operator 
command.", - round_index=round_index - 1, - ) - def inactivity_callback(snapshot: InactivitySnapshot) -> str: - return self._handle_inactivity(round_index=round_index, snapshot=snapshot) - - self._emit( - { - "type": "round.started", - "round_index": round_index, - "session_id": session_id, - } - ) - self._update_planner_context( - round_index=round_index, + for round_index in range(1, self.config.max_rounds + 1): + if self._is_stop_requested(): + result = AutoLoopResult( + success=False, session_id=session_id, rounds=rounds, - latest_review=self._planner_context.latest_review, - latest_checks=self._planner_context.latest_checks, - stop_reason=None, + stop_reason="Stopped by operator command.", ) + self._emit({"type": "loop.completed", "success": result.success, "stop_reason": result.stop_reason}) + return result - main_result = self.runner.run_exec( - prompt=next_main_prompt, - resume_thread_id=session_id, - options=RunnerOptions( - model=self.config.main_model, - reasoning_effort=self.config.main_reasoning_effort, - dangerous_yolo=self.config.dangerous_yolo, - full_auto=self.config.full_auto, - skip_git_repo_check=self.config.skip_git_repo_check, - extra_args=self.config.main_extra_args, - watchdog_soft_idle_seconds=self.config.stall_soft_idle_seconds, - watchdog_hard_idle_seconds=self.config.stall_hard_idle_seconds, - inactivity_callback=inactivity_callback, - external_interrupt_reason_provider=self.config.external_interrupt_reason_provider, - ), - run_label="main", + def inactivity_callback(snapshot: InactivitySnapshot) -> str: + return self._handle_inactivity(round_index=round_index, snapshot=snapshot) + + self._emit( + { + "type": "round.started", + "round_index": round_index, + "session_id": session_id, + } + ) + resume_session_id = session_id + main_result = self.runner.run_exec( + prompt=next_main_prompt, + resume_thread_id=resume_session_id, + options=RunnerOptions( + model=self.config.main_model, + reasoning_effort=self.config.main_reasoning_effort, + 
dangerous_yolo=self.config.dangerous_yolo, + full_auto=self.config.full_auto, + skip_git_repo_check=self.config.skip_git_repo_check, + extra_args=self.config.main_extra_args, + watchdog_soft_idle_seconds=self.config.stall_soft_idle_seconds, + watchdog_hard_idle_seconds=self.config.stall_hard_idle_seconds, + inactivity_callback=inactivity_callback, + external_interrupt_reason_provider=self.config.external_interrupt_reason_provider, + ), + run_label="main", + ) + session_id = main_result.thread_id or session_id + interrupted = ( + main_result.fatal_error is not None + and main_result.fatal_error.startswith("External interrupt:") + ) + invalid_encrypted_content = self._looks_like_invalid_encrypted_content(main_result.fatal_error) + self._emit( + { + "type": "round.main.completed", + "round_index": round_index, + "session_id": session_id, + "exit_code": main_result.exit_code, + "turn_completed": main_result.turn_completed, + "turn_failed": False if interrupted else main_result.turn_failed, + "interrupted": interrupted, + "fatal_error": main_result.fatal_error, + "last_message": main_result.last_agent_message, + } + ) + if interrupted: + injected_instruction = self._consume_pending_instruction() + review_reason = main_result.fatal_error + next_action = "Continue with prior objective after external interruption." + if injected_instruction: + review_reason = ( + f"{main_result.fatal_error}. New operator instruction injected and will be applied." + ) + next_action = "Apply injected operator instruction in next round." 
+ self._emit( + { + "type": "round.control.injected", + "round_index": round_index, + "instruction": injected_instruction, + } + ) + review = ReviewDecision( + status="continue", + confidence=1.0, + reason=review_reason, + next_action=next_action, ) - session_id = main_result.thread_id or session_id - self._emit( - { - "type": "round.main.completed", - "round_index": round_index, - "session_id": session_id, - "exit_code": main_result.exit_code, - "turn_completed": main_result.turn_completed, - "turn_failed": main_result.turn_failed, - "fatal_error": main_result.fatal_error, - "last_message": main_result.last_agent_message, - } + round_summary = RoundSummary( + round_index=round_index, + thread_id=session_id, + main_exit_code=main_result.exit_code, + main_turn_completed=main_result.turn_completed, + main_turn_failed=False, + checks=[], + review=review, + main_last_message=main_result.last_agent_message, ) + rounds.append(round_summary) + self._persist_state(rounds=rounds, session_id=session_id, current_review=review) - interrupted = ( - main_result.fatal_error is not None - and main_result.fatal_error.startswith("External interrupt:") - ) - if interrupted: - injected_instruction = self._consume_pending_instruction() - review_reason = main_result.fatal_error - next_action = "Continue with prior objective after external interruption." - if injected_instruction: - review_reason = ( - f"{main_result.fatal_error}. New operator instruction injected and will be applied." - ) - next_action = "Apply injected operator instruction in next round." 
- self._emit( - { - "type": "round.control.injected", - "round_index": round_index, - "instruction": injected_instruction, - } - ) + if self._is_stop_requested(): + result = AutoLoopResult( + success=False, + session_id=session_id, + rounds=rounds, + stop_reason="Stopped by operator command.", + ) + self._emit( + {"type": "loop.completed", "success": result.success, "stop_reason": result.stop_reason} + ) + return result + + if injected_instruction: + next_main_prompt = self._build_operator_override_prompt( + objective=self.config.objective, + instruction=injected_instruction, + ) + else: + next_main_prompt = self._build_continue_prompt( + objective=self.config.objective, + review=review, + checks_ok=False, + ) + continue + + if invalid_encrypted_content: + if resume_session_id: + self._emit( + { + "type": "round.session.reset", + "round_index": round_index, + "previous_session_id": resume_session_id, + "reason": main_result.fatal_error, + } + ) + session_id = None review = ReviewDecision( status="continue", confidence=1.0, - reason=review_reason, - next_action=next_action, + reason=( + "Main agent failed with invalid_encrypted_content while resuming session. " + "Resetting to a fresh session for the next round." 
+ ), + next_action="Retry objective from a fresh session immediately.", ) round_summary = RoundSummary( round_index=round_index, thread_id=session_id, main_exit_code=main_result.exit_code, main_turn_completed=main_result.turn_completed, - main_turn_failed=True, + main_turn_failed=main_result.turn_failed, checks=[], review=review, main_last_message=main_result.last_agent_message, ) rounds.append(round_summary) - self._update_planner_context( - round_index=round_index, - session_id=session_id, - rounds=rounds, - latest_review=review, - latest_checks=[], - stop_reason=None, - ) - self._run_planner_update(trigger="round", terminal=False, wait=True) self._persist_state(rounds=rounds, session_id=session_id, current_review=review) - - if self._is_stop_requested(): - return self._finish_loop( - success=False, - session_id=session_id, - rounds=rounds, - stop_reason="Stopped by operator command.", - round_index=round_index, - ) - - if injected_instruction: - next_main_prompt = self._build_operator_override_prompt( - objective=self.config.objective, - instruction=injected_instruction, - ) - else: - next_main_prompt = self._build_continue_prompt( - objective=self.config.objective, - review=review, - checks_ok=False, - ) + next_main_prompt = self._build_fresh_session_retry_prompt( + objective=self.config.objective, + fatal_error=main_result.fatal_error or "", + ) continue - - checks = run_checks(self.config.check_commands or [], self.config.check_timeout_seconds) - self._emit( - { - "type": "round.checks.completed", - "round_index": round_index, - "checks": [ - { - "command": item.command, - "exit_code": item.exit_code, - "passed": item.passed, - } - for item in checks - ], - } - ) - review = self.reviewer.evaluate( - objective=self.config.objective, - operator_messages=self._get_operator_messages(), - round_index=round_index, + result = AutoLoopResult( + success=False, session_id=session_id, - main_summary=main_result.last_agent_message, - main_error=main_result.fatal_error, - 
checks=checks, - config=ReviewerConfig( - model=self.config.reviewer_model, - reasoning_effort=self.config.reviewer_reasoning_effort, - extra_args=self.config.reviewer_extra_args, - skip_git_repo_check=self.config.skip_git_repo_check, - full_auto=self.config.full_auto, - dangerous_yolo=self.config.dangerous_yolo, + rounds=rounds, + stop_reason=( + "Main agent failed with invalid_encrypted_content in a fresh session; " + "cannot recover automatically." ), ) - self._emit( - { - "type": "round.review.completed", - "round_index": round_index, - "status": review.status, - "confidence": review.confidence, - "reason": review.reason, - "next_action": review.next_action, - } - ) + self._emit({"type": "loop.completed", "success": result.success, "stop_reason": result.stop_reason}) + return result - round_summary = RoundSummary( - round_index=round_index, - thread_id=session_id, - main_exit_code=main_result.exit_code, - main_turn_completed=main_result.turn_completed, - main_turn_failed=main_result.turn_failed, - checks=checks, - review=review, - main_last_message=main_result.last_agent_message, - ) - rounds.append(round_summary) - self._update_planner_context( - round_index=round_index, + checks = run_checks(self.config.check_commands or [], self.config.check_timeout_seconds) + self._emit( + { + "type": "round.checks.completed", + "round_index": round_index, + "checks": [ + { + "command": item.command, + "exit_code": item.exit_code, + "passed": item.passed, + } + for item in checks + ], + } + ) + review = self.reviewer.evaluate( + objective=self.config.objective, + operator_messages=self._get_operator_messages(), + round_index=round_index, + session_id=session_id, + main_exit_code=main_result.exit_code, + main_turn_completed=main_result.turn_completed, + main_turn_failed=main_result.turn_failed, + main_agent_message_count=len(main_result.agent_messages), + main_summary=main_result.last_agent_message, + main_error=main_result.fatal_error, + checks=checks, + 
config=ReviewerConfig( + model=self.config.reviewer_model, + reasoning_effort=self.config.reviewer_reasoning_effort, + extra_args=self.config.reviewer_extra_args, + skip_git_repo_check=self.config.skip_git_repo_check, + full_auto=self.config.full_auto, + dangerous_yolo=self.config.dangerous_yolo, + ), + ) + self._emit( + { + "type": "round.review.completed", + "round_index": round_index, + "status": review.status, + "confidence": review.confidence, + "reason": review.reason, + "next_action": review.next_action, + } + ) + + round_summary = RoundSummary( + round_index=round_index, + thread_id=session_id, + main_exit_code=main_result.exit_code, + main_turn_completed=main_result.turn_completed, + main_turn_failed=main_result.turn_failed, + checks=checks, + review=review, + main_last_message=main_result.last_agent_message, + ) + rounds.append(round_summary) + self._persist_state(rounds=rounds, session_id=session_id, current_review=review) + + checks_ok = all_checks_passed(checks) + if review.status == "done" and checks_ok: + result = AutoLoopResult( + success=True, session_id=session_id, rounds=rounds, - latest_review=review, - latest_checks=checks, - stop_reason=None, + stop_reason="Reviewer marked done and acceptance checks passed.", ) - self._run_planner_update(trigger="round", terminal=False, wait=True) - self._persist_state(rounds=rounds, session_id=session_id, current_review=review) - - checks_ok = all_checks_passed(checks) - if review.status == "done" and checks_ok: - return self._finish_loop( - success=True, - session_id=session_id, - rounds=rounds, - stop_reason="Reviewer marked done and acceptance checks passed.", - round_index=round_index, - ) - - if review.status == "blocked": - return self._finish_loop( - success=False, - session_id=session_id, - rounds=rounds, - stop_reason=f"Reviewer blocked: {review.reason}", - round_index=round_index, - ) + self._emit({"type": "loop.completed", "success": result.success, "stop_reason": result.stop_reason}) + return 
result - current_main_message = (main_result.last_agent_message or "").strip() - if current_main_message and current_main_message == previous_main_message: - no_progress_rounds += 1 - else: - no_progress_rounds = 0 - previous_main_message = current_main_message - - if no_progress_rounds >= self.config.max_no_progress_rounds: - return self._finish_loop( - success=False, - session_id=session_id, - rounds=rounds, - stop_reason=( - "Stopped due to repeated no-progress rounds. " - "Reviewer kept requesting continuation without new output." - ), - round_index=round_index, - ) - - next_main_prompt = self._build_continue_prompt( - objective=self.config.objective, - review=review, - checks_ok=checks_ok, + if review.status == "blocked": + result = AutoLoopResult( + success=False, + session_id=session_id, + rounds=rounds, + stop_reason=f"Reviewer blocked: {review.reason}", ) + self._emit({"type": "loop.completed", "success": result.success, "stop_reason": result.stop_reason}) + return result + + current_progress_signature = self._build_progress_signature(main_result=main_result) + if current_progress_signature and current_progress_signature == previous_progress_signature: + no_progress_rounds += 1 + else: + no_progress_rounds = 0 + previous_progress_signature = current_progress_signature + + if no_progress_rounds >= self.config.max_no_progress_rounds: + result = AutoLoopResult( + success=False, + session_id=session_id, + rounds=rounds, + stop_reason=( + "Stopped due to repeated no-progress rounds. " + "Reviewer kept requesting continuation without new output." 
+ ), + ) + self._emit({"type": "loop.completed", "success": result.success, "stop_reason": result.stop_reason}) + return result - return self._finish_loop( - success=False, - session_id=session_id, - rounds=rounds, - stop_reason=f"Reached max rounds ({self.config.max_rounds}).", - round_index=len(rounds), + next_main_prompt = self._build_continue_prompt( + objective=self.config.objective, + review=review, + checks_ok=checks_ok, ) - finally: - self._stop_planner_loop() - def _finish_loop( - self, - *, - success: bool, - session_id: str | None, - rounds: list[RoundSummary], - stop_reason: str, - round_index: int, - ) -> AutoLoopResult: - self._planner_stop_event.set() - latest_review = rounds[-1].review if rounds else None - latest_checks = rounds[-1].checks if rounds else [] - self._update_planner_context( - round_index=round_index, - session_id=session_id, - rounds=rounds, - latest_review=latest_review, - latest_checks=latest_checks, - stop_reason=stop_reason, - ) - self._run_planner_update(trigger="final", terminal=True, wait=True) - self._persist_state(rounds=rounds, session_id=session_id, current_review=latest_review) result = AutoLoopResult( - success=success, + success=False, session_id=session_id, rounds=rounds, - stop_reason=stop_reason, - plan=self._latest_plan, + stop_reason=f"Reached max rounds ({self.config.max_rounds}).", ) self._emit({"type": "loop.completed", "success": result.success, "stop_reason": result.stop_reason}) return result @@ -404,24 +359,21 @@ def _persist_state( *, rounds: list[RoundSummary], session_id: str | None, - current_review: ReviewDecision | None, + current_review: ReviewDecision, ) -> None: if not self.config.state_file: - self._write_plan_artifacts() return payload = { "updated_at": datetime.now(timezone.utc).isoformat(), "objective": self.config.objective, "session_id": session_id, "round_count": len(rounds), - "latest_review_status": current_review.status if current_review is not None else None, + "latest_review_status": 
current_review.status, "rounds": [self._serialize_round(item) for item in rounds], - "plan": self._serialize_plan(self._latest_plan) if self._latest_plan is not None else None, } path = Path(self.config.state_file) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps(payload, indent=2), encoding="utf-8") - self._write_plan_artifacts() @staticmethod def _serialize_round(round_summary: RoundSummary) -> dict: @@ -430,14 +382,19 @@ def _serialize_round(round_summary: RoundSummary) -> dict: data["review"] = asdict(round_summary.review) return data - @staticmethod - def _serialize_plan(plan: PlanSnapshot) -> dict[str, Any]: - data = asdict(plan) - data["workstreams"] = [asdict(item) for item in plan.workstreams] - return data - @staticmethod def _initial_main_prompt(objective: str) -> str: + if AutoLoopOrchestrator._request_style(objective) == "response": + return ( + "You are the primary agent.\n" + "The user may be greeting you, asking a question, or requesting analysis instead of code edits.\n" + "Reply directly in the user's language.\n" + "Inspect the repository, logs, or local context yourself if that helps answer.\n" + "Do not force code changes unless they are actually needed to solve the request.\n" + "Do not refuse unless the request is genuinely disallowed.\n" + "If the user is simply greeting you or checking whether you are still here, reply briefly and ask what they want next.\n\n" + f"User request:\n{objective}\n" + ) return ( "You are the primary implementation agent.\n" "Complete the objective end-to-end by executing required edits and commands directly.\n" @@ -451,6 +408,17 @@ def _initial_main_prompt(objective: str) -> str: @staticmethod def _build_continue_prompt(*, objective: str, review: ReviewDecision, checks_ok: bool) -> str: + if AutoLoopOrchestrator._request_style(objective) == "response": + return ( + "Continue the same user request in this session.\n" + f"User request:\n{objective}\n\n" + f"Reviewer 
reason:\n{review.reason}\n\n" + f"Reviewer next action:\n{review.next_action}\n\n" + "Respond directly.\n" + "Inspect the repository or logs yourself if needed.\n" + "Do not force code edits unless they are actually needed.\n" + "For greetings or short questions, answer naturally without DONE/REMAINING/BLOCKERS." + ) check_instruction = ( "Acceptance checks passed in previous round." if checks_ok @@ -514,141 +482,17 @@ def _get_operator_messages(self) -> list[str]: return [] return provider() - def _update_planner_context( - self, - *, - round_index: int, - session_id: str | None, - rounds: list[RoundSummary], - latest_review: ReviewDecision | None, - latest_checks: list[Any], - stop_reason: str | None, - ) -> None: - with self._planner_context_lock: - self._planner_context = _PlannerContext( - round_index=round_index, - session_id=session_id, - rounds=list(rounds), - latest_review=latest_review, - latest_checks=list(latest_checks), - stop_reason=stop_reason, - ) - - def _run_planner_update(self, *, trigger: str, terminal: bool, wait: bool) -> None: - if not self._planner_enabled(): - return - acquired = self._planner_run_lock.acquire(blocking=wait) - if not acquired: - return - try: - with self._planner_context_lock: - context = _PlannerContext( - round_index=self._planner_context.round_index, - session_id=self._planner_context.session_id, - rounds=list(self._planner_context.rounds), - latest_review=self._planner_context.latest_review, - latest_checks=list(self._planner_context.latest_checks), - stop_reason=self._planner_context.stop_reason, - ) - assert self.planner is not None - plan = self.planner.update( - objective=self.config.objective, - operator_messages=self._get_operator_messages(), - round_index=context.round_index, - session_id=context.session_id, - rounds=context.rounds, - latest_review=context.latest_review, - latest_checks=context.latest_checks, - trigger=trigger, - terminal=terminal, - stop_reason=context.stop_reason, - config=PlannerConfig( - 
model=self.config.planner_model or self.config.reviewer_model, - reasoning_effort=self.config.planner_reasoning_effort or self.config.reviewer_reasoning_effort, - extra_args=self.config.planner_extra_args or self.config.reviewer_extra_args, - skip_git_repo_check=self.config.skip_git_repo_check, - full_auto=self.config.full_auto, - dangerous_yolo=self.config.dangerous_yolo, - mode=self.config.planner_mode, - ), - ) - self._latest_plan = plan - event_type = "plan.finalized" if terminal else "plan.updated" - self._emit( - { - "type": event_type, - "round_index": context.round_index, - "session_id": context.session_id, - "plan_id": plan.plan_id, - "trigger": plan.trigger, - "terminal": plan.terminal, - "summary": plan.summary, - "workstreams": [asdict(item) for item in plan.workstreams], - "done_items": plan.done_items, - "remaining_items": plan.remaining_items, - "risks": plan.risks, - "next_steps": plan.next_steps, - "exploration_items": plan.exploration_items, - "suggested_next_objective": plan.suggested_next_objective, - "should_propose_follow_up": plan.should_propose_follow_up, - "report_markdown": plan.report_markdown, - } - ) - finally: - self._planner_run_lock.release() - - def _start_planner_loop(self) -> None: - if not self._planner_enabled(): - return - if self.config.plan_update_interval_seconds <= 0: - return - if self._planner_thread is not None and self._planner_thread.is_alive(): - return - self._planner_stop_event.clear() - self._planner_thread = threading.Thread(target=self._planner_loop, daemon=True) - self._planner_thread.start() - - def _planner_loop(self) -> None: - interval = max(30, int(self.config.plan_update_interval_seconds)) - while not self._planner_stop_event.wait(interval): - self._run_planner_update(trigger="timer", terminal=False, wait=False) - with self._planner_context_lock: - context = self._planner_context - self._persist_state( - rounds=context.rounds, - session_id=context.session_id, - current_review=context.latest_review, - ) - - 
def _stop_planner_loop(self) -> None: - self._planner_stop_event.set() - if self._planner_thread is not None: - self._planner_thread.join(timeout=5.0) - self._planner_thread = None - - def _planner_enabled(self) -> bool: - return self.config.planner_enabled and planner_mode_enabled(self.config.planner_mode) and self.planner is not None - - def _write_plan_artifacts(self) -> None: - if self._latest_plan is None: - return - if self.config.plan_report_file: - path = Path(self.config.plan_report_file) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text(self._latest_plan.report_markdown, encoding="utf-8") - if self.config.plan_todo_file: - path = Path(self.config.plan_todo_file) - path.parent.mkdir(parents=True, exist_ok=True) - path.write_text( - format_plan_todo_markdown( - objective=self.config.objective, - snapshot=self._latest_plan, - ), - encoding="utf-8", - ) - @staticmethod def _build_operator_override_prompt(*, objective: str, instruction: str) -> str: + if AutoLoopOrchestrator._request_style(instruction) == "response": + return ( + "Operator override received from control channel.\n" + "Treat it as a direct user request and answer it in the user's language.\n" + "Inspect local repository context if useful.\n" + "Do not force code edits unless they are actually needed.\n\n" + f"Original objective:\n{objective}\n\n" + f"New operator instruction:\n{instruction}\n" + ) return ( "Operator override received from control channel.\n" "Immediately switch to the following instruction while preserving repository safety.\n\n" @@ -657,3 +501,123 @@ def _build_operator_override_prompt(*, objective: str, instruction: str) -> str: "Execute concrete work now and continue until completion gates are met.\n" "End with DONE/REMAINING/BLOCKERS." 
) + + @staticmethod + def _looks_like_invalid_encrypted_content(fatal_error: str | None) -> bool: + if not fatal_error: + return False + return "invalid_encrypted_content" in fatal_error.lower() or "invalid encrypted content" in fatal_error.lower() + + @staticmethod + def _build_fresh_session_retry_prompt(*, objective: str, fatal_error: str) -> str: + if AutoLoopOrchestrator._request_style(objective) == "response": + return ( + "Previous resumed session failed with invalid_encrypted_content.\n" + "Start a fresh session and continue responding directly in the user's language.\n" + f"User request:\n{objective}\n\n" + f"Prior fatal error:\n{fatal_error}\n" + ) + return ( + "Previous resumed session failed with invalid_encrypted_content.\n" + "Start a fresh session and continue the same objective immediately.\n\n" + f"Objective:\n{objective}\n\n" + f"Prior fatal error:\n{fatal_error}\n\n" + "Execute concrete work now and end with DONE/REMAINING/BLOCKERS." + ) + + @staticmethod + def _build_progress_signature(*, main_result: Any) -> str: + last_message = str(getattr(main_result, "last_agent_message", "") or "").strip() + if last_message: + return f"msg:{last_message}" + fatal_error = str(getattr(main_result, "fatal_error", "") or "").strip() + exit_code = int(getattr(main_result, "exit_code", 0)) + turn_completed = bool(getattr(main_result, "turn_completed", False)) + turn_failed = bool(getattr(main_result, "turn_failed", False)) + return ( + "nomsg:" + f"exit={exit_code}|completed={int(turn_completed)}|failed={int(turn_failed)}|fatal={fatal_error[:240]}" + ) + + @staticmethod + def _request_style(text: str) -> str: + normalized = " ".join((text or "").strip().lower().split()) + if not normalized: + return "implementation" + + implementation_markers = ( + "fix", + "implement", + "add ", + "write ", + "edit ", + "modify", + "refactor", + "commit", + "push", + "train", + "experiment", + "run ", + "rerun", + "修改", + "修复", + "实现", + "新增", + "添加", + "重构", + "提交", + "推送", + 
"训练", + "实验", + "继续改", + "继续修", + "直接改", + "帮我改", + "帮我修", + ) + if any(marker in normalized for marker in implementation_markers): + return "implementation" + + greeting_phrases = { + "hi", + "hello", + "hey", + "ping", + "你好", + "您好", + "在吗", + "还在吗", + "还活着吗", + "兄弟在吗", + } + if normalized in greeting_phrases: + return "response" + + response_markers = ( + "?", + "?", + "why", + "what", + "how", + "explain", + "analyze", + "analysis", + "question", + "为什么", + "为啥", + "怎么", + "啥原因", + "是什么", + "是不是", + "分析", + "解释", + "看看问题", + "bug问题", + "错误在哪", + "问题在哪", + "问题所在", + ) + if any(marker in normalized for marker in response_markers): + return "response" + + return "implementation" diff --git a/codex_autoloop/reviewer.py b/codex_autoloop/reviewer.py index 06b3e00..3cfc3e3 100644 --- a/codex_autoloop/reviewer.py +++ b/codex_autoloop/reviewer.py @@ -31,6 +31,10 @@ def evaluate( operator_messages: list[str], round_index: int, session_id: str | None, + main_exit_code: int, + main_turn_completed: bool, + main_turn_failed: bool, + main_agent_message_count: int, main_summary: str, main_error: str | None, checks: list[CheckResult], @@ -41,6 +45,10 @@ def evaluate( operator_messages=operator_messages, round_index=round_index, session_id=session_id, + main_exit_code=main_exit_code, + main_turn_completed=main_turn_completed, + main_turn_failed=main_turn_failed, + main_agent_message_count=main_agent_message_count, main_summary=main_summary, main_error=main_error, checks=checks, @@ -84,6 +92,10 @@ def _build_prompt( operator_messages: list[str], round_index: int, session_id: str | None, + main_exit_code: int, + main_turn_completed: bool, + main_turn_failed: bool, + main_agent_message_count: int, main_summary: str, main_error: str | None, checks: list[CheckResult], @@ -98,12 +110,19 @@ def _build_prompt( "1) `done` only when objective is fully satisfied, no blocker remains, and acceptance checks pass.\n" "2) If uncertain, choose `continue`.\n" "3) Use `blocked` only if additional 
user input is strictly required.\n" - "4) `next_action` must be a concrete instruction for the primary agent.\n\n" + "4) `next_action` must be a concrete instruction for the primary agent.\n" + "5) Do not speculate about crashes or missing replies; use the structured main-agent facts below.\n" + "6) Only describe the main agent as crashed/failed if the exit code is non-zero or fatal error is not `none`.\n" + "7) If the main agent emitted one or more agent messages, do not claim there was no user-facing reply.\n\n" f"Objective:\n{objective}\n\n" "Operator message history (source of truth for user instructions):\n" f"{operator_text}\n\n" f"Round: {round_index}\n" f"Session ID: {session_id or 'none'}\n" + f"Main agent exit code: {main_exit_code}\n" + f"Main agent turn completed: {str(main_turn_completed).lower()}\n" + f"Main agent turn failed: {str(main_turn_failed).lower()}\n" + f"Main agent emitted agent messages: {main_agent_message_count}\n" f"Main agent fatal error: {error_text}\n\n" "Main agent last summary:\n" f"{main_summary}\n\n" diff --git a/codex_autoloop/setup_wizard.py b/codex_autoloop/setup_wizard.py index c88b8c5..ca409f8 100644 --- a/codex_autoloop/setup_wizard.py +++ b/codex_autoloop/setup_wizard.py @@ -46,19 +46,18 @@ def main() -> None: if choice not in {"y", "yes"}: raise SystemExit(2) - token = prompt_secret("Telegram bot token: ") - if ":" not in token: - print("Token format looks invalid. 
Expected :.", file=sys.stderr) - raise SystemExit(2) - chat_id = prompt_input("Telegram chat id (or 'auto'): ", default="auto").strip() or "auto" + token = prompt_token() + chat_id = prompt_chat_id() check_cmd = prompt_input( "Default check command (optional, leave empty for none): ", default="", ).strip() - preset_names = ", ".join(p.name for p in MODEL_PRESETS) preset_name = args.run_model_preset + inherit_codex_defaults = False if preset_name is None and args.run_main_model is None and args.run_reviewer_model is None: preset_name = prompt_model_choice() + if preset_name is None: + inherit_codex_defaults = True resolved_preset = get_preset(preset_name) if preset_name and preset_name.lower() != "custom" else None if preset_name and preset_name.lower() != "custom" and resolved_preset is None: print(f"Unknown model preset: {preset_name}", file=sys.stderr) @@ -69,22 +68,24 @@ def main() -> None: reviewer_model = resolved_preset.reviewer_model reviewer_reasoning_effort = resolved_preset.reviewer_reasoning_effort else: - if args.run_main_model is not None or args.run_reviewer_model is not None: + if inherit_codex_defaults: + main_model = None + main_reasoning_effort = None + reviewer_model = None + reviewer_reasoning_effort = None + elif args.run_main_model is not None or args.run_reviewer_model is not None: main_model = args.run_main_model main_reasoning_effort = args.run_main_reasoning_effort reviewer_model = args.run_reviewer_model reviewer_reasoning_effort = args.run_reviewer_reasoning_effort else: - main_model = prompt_input( - "Main agent model (optional): ", - default="gpt-5.2-codex", - ).strip() or None - main_reasoning_effort = ( - prompt_input( - "Main agent reasoning effort (low/medium/high/xhigh, optional): ", - default="xhigh", - ).strip() - or None + main_model = prompt_input("Main agent model (optional): ", default="").strip() or None + main_reasoning_effort = prompt_reasoning_effort( + "Main agent reasoning effort (low/medium/high/xhigh, optional): " + 
) + reviewer_model = prompt_input("Reviewer agent model (optional): ", default="").strip() or None + reviewer_reasoning_effort = prompt_reasoning_effort( + "Reviewer agent reasoning effort (low/medium/high/xhigh, optional): " ) reviewer_model = prompt_input( "Reviewer agent model (optional): ", @@ -233,9 +234,9 @@ def main() -> None: f"/ {resolved_preset.reviewer_model}/{resolved_preset.reviewer_reasoning_effort})" ) else: - print(f"Main model: {main_model or ''} effort={main_reasoning_effort or ''}") + print(f"Main model: {main_model or ''} effort={main_reasoning_effort or ''}") print( - f"Reviewer model: {reviewer_model or ''} " + f"Reviewer model: {reviewer_model or ''} " f"effort={reviewer_reasoning_effort or ''}" ) print("") @@ -393,8 +394,25 @@ def prompt_secret(prompt: str) -> str: return getpass.getpass(prompt).strip() -def prompt_model_choice() -> str: +def prompt_token() -> str: + while True: + token = prompt_secret("Telegram bot token: ") + if looks_like_token(token): + return token + print("Invalid token format. Expected :. Please try again.", file=sys.stderr) + + +def prompt_chat_id() -> str: + while True: + value = prompt_input("Telegram chat id (or 'auto'): ", default="auto").strip() or "auto" + if value.lower() == "auto" or looks_like_chat_id(value): + return value + print("Invalid chat id. Use 'auto' or a numeric chat id like 123456 or -100123456.", file=sys.stderr) + + +def prompt_model_choice() -> str | None: print("Choose a model preset:") + print(" 0. inherit codex default (recommended)") for idx, preset in enumerate(MODEL_PRESETS, start=1): print( f" {idx}. {preset.name}: " @@ -402,14 +420,45 @@ def prompt_model_choice() -> str: f"reviewer={preset.reviewer_model}/{preset.reviewer_reasoning_effort}" ) print(f" {len(MODEL_PRESETS) + 1}. 
custom") - raw = prompt_input("Preset number: ", default="1").strip() - try: - index = int(raw) - except ValueError: - return "quality" - if 1 <= index <= len(MODEL_PRESETS): - return MODEL_PRESETS[index - 1].name - return "custom" + while True: + raw = prompt_input("Preset number: ", default="0").strip() + try: + index = int(raw) + except ValueError: + print("Invalid selection. Enter a number from the list.", file=sys.stderr) + continue + if index == 0: + return None + if 1 <= index <= len(MODEL_PRESETS): + return MODEL_PRESETS[index - 1].name + if index == len(MODEL_PRESETS) + 1: + return "custom" + print("Selection out of range. Please choose one of the listed numbers.", file=sys.stderr) + + +def prompt_reasoning_effort(prompt: str) -> str | None: + while True: + value = prompt_input(prompt, default="").strip().lower() + if not value: + return None + if value in {"low", "medium", "high", "xhigh"}: + return value + print("Invalid reasoning effort. Choose low, medium, high, xhigh, or leave blank.", file=sys.stderr) + + +def looks_like_token(token: str) -> bool: + if ":" not in token: + return False + left, right = token.split(":", 1) + return left.isdigit() and bool(right.strip()) + + +def looks_like_chat_id(value: str) -> bool: + if not value: + return False + if value.startswith("-"): + return value[1:].isdigit() + return value.isdigit() def prompt_planner_mode_choice() -> str: diff --git a/codex_autoloop/telegram_control.py b/codex_autoloop/telegram_control.py index baf6b75..8f67607 100644 --- a/codex_autoloop/telegram_control.py +++ b/codex_autoloop/telegram_control.py @@ -394,6 +394,8 @@ def parse_command_text(*, text: str, plain_text_as_inject: bool) -> TelegramComm return TelegramCommand(kind="daemon-stop", text="") if content in {"/status", "/stat"}: return TelegramCommand(kind="status", text="") + if content in {"/fresh", "/fresh-session", "/new-session"}: + return TelegramCommand(kind="fresh-session", text="") if content in {"/help", "/commands"}: return 
TelegramCommand(kind="help", text="") if content.startswith("/"): diff --git a/codex_autoloop/telegram_daemon.py b/codex_autoloop/telegram_daemon.py index a0d535b..e785744 100644 --- a/codex_autoloop/telegram_daemon.py +++ b/codex_autoloop/telegram_daemon.py @@ -10,9 +10,10 @@ import time from dataclasses import dataclass from pathlib import Path +from typing import Any from .daemon_bus import BusCommand, JsonlCommandBus, read_status, write_status -from .model_catalog import get_preset +from .model_catalog import MODEL_PRESETS, get_preset from .planner_modes import ( PLANNER_MODE_AUTO, PLANNER_MODE_CHOICES, @@ -23,6 +24,14 @@ from .telegram_notifier import TelegramConfig, TelegramNotifier, resolve_chat_id from .token_lock import TokenLock, acquire_token_lock +PLAN_MODE_EXECUTE_ONLY = "execute-only" +PLAN_MODE_FULLY_PLAN = "fully-plan" +PLAN_MODE_RECORD_ONLY = "record-only" +PLAN_MODES = {PLAN_MODE_EXECUTE_ONLY, PLAN_MODE_FULLY_PLAN, PLAN_MODE_RECORD_ONLY} +FORCE_FRESH_SESSION_KEY = "force_fresh_session" +FORCE_FRESH_REASON_KEY = "force_fresh_reason" +INVALID_ENCRYPTED_CONTENT_MARKER = "invalid encrypted content" + @dataclass class PlanFollowUp: @@ -91,6 +100,8 @@ def main() -> None: logs_dir.mkdir(parents=True, exist_ok=True) bus_dir.mkdir(parents=True, exist_ok=True) events_log = logs_dir / "daemon-events.jsonl" + run_archive_log = logs_dir / "codexloop-run-archive.jsonl" + operator_messages_path = logs_dir / "operator_messages.md" token_lock: TokenLock | None = None try: @@ -128,7 +139,17 @@ def main() -> None: child_plan_todo_path: Path | None = None child_started_at: dt.datetime | None = None child_control_bus: JsonlCommandBus | None = None - pending_follow_up: PlanFollowUp | None = None + child_run_id: str | None = None + child_control_path: Path | None = None + child_resume_session_id: str | None = None + plan_mode = normalize_plan_mode(args.run_plan_mode) + plan_request_delay_seconds = max(0, int(args.run_plan_request_delay_seconds)) + 
plan_auto_execute_delay_seconds = max(0, int(args.run_plan_auto_execute_delay_seconds)) + pending_plan_request: str | None = None + pending_plan_auto_execute_at: dt.datetime | None = None + pending_plan_generated_at: dt.datetime | None = None + scheduled_plan_context: dict[str, Any] | None = None + scheduled_plan_request_at: dt.datetime | None = None def log_event(event_type: str, **kwargs) -> None: payload = { @@ -139,10 +160,39 @@ def log_event(event_type: str, **kwargs) -> None: with events_log.open("a", encoding="utf-8") as f: f.write(json.dumps(payload, ensure_ascii=True) + "\n") + def append_run_archive_record(*, event: str, **kwargs: Any) -> None: + now = dt.datetime.utcnow() + payload = { + "ts": now.isoformat() + "Z", + "date": now.date().isoformat(), + "event": event, + "workspace": str(run_cwd), + **kwargs, + } + with run_archive_log.open("a", encoding="utf-8") as f: + f.write(json.dumps(payload, ensure_ascii=True) + "\n") + + def clear_planner_state(*, reason: str | None = None) -> None: + nonlocal pending_plan_request, pending_plan_auto_execute_at, pending_plan_generated_at + nonlocal scheduled_plan_context, scheduled_plan_request_at + had_state = ( + pending_plan_request is not None + or pending_plan_auto_execute_at is not None + or scheduled_plan_request_at is not None + or scheduled_plan_context is not None + ) + pending_plan_request = None + pending_plan_auto_execute_at = None + pending_plan_generated_at = None + scheduled_plan_context = None + scheduled_plan_request_at = None + if had_state and reason: + log_event("plan.cleared", reason=reason) + def update_status() -> None: - current_child = child - running = current_child is not None and current_child.poll() is None - last_session_id = resolve_saved_session_id(args.run_state_file) + running = child is not None and child.poll() is None + last_session_id = resolve_resume_session_id(args.run_state_file, run_archive_log) + force_fresh_session = is_force_fresh_session_requested(args.run_state_file) 
write_status( status_path, { @@ -156,39 +206,39 @@ def update_status() -> None: "child_plan_todo_path": str(child_plan_todo_path) if child_plan_todo_path else None, "child_started_at": child_started_at.isoformat() + "Z" if child_started_at else None, "last_session_id": last_session_id, - "pending_follow_up_plan_id": pending_follow_up.plan_id if pending_follow_up else None, - "pending_follow_up_objective": pending_follow_up.objective if pending_follow_up else None, - "pending_follow_up_created_at": ( - pending_follow_up.created_at.isoformat() + "Z" if pending_follow_up else None - ), - "pending_follow_up_auto_execute_at": ( - pending_follow_up.auto_execute_at.isoformat() + "Z" - if pending_follow_up and pending_follow_up.auto_execute_at is not None - else None - ), - "pending_follow_up_awaiting_user_edit": ( - pending_follow_up.awaiting_user_edit if pending_follow_up else None - ), + "force_fresh_session": force_fresh_session, "run_cwd": str(run_cwd), "logs_dir": str(logs_dir), "bus_dir": str(bus_dir), "events_log": str(events_log), + "run_archive_log": str(run_archive_log), + "operator_messages_file": str(operator_messages_path), + "plan_mode": plan_mode, + "pending_plan_request": pending_plan_request, + "pending_plan_generated_at": ( + pending_plan_generated_at.isoformat() + "Z" if pending_plan_generated_at else None + ), + "pending_plan_auto_execute_at": ( + pending_plan_auto_execute_at.isoformat() + "Z" if pending_plan_auto_execute_at else None + ), + "scheduled_plan_request_at": ( + scheduled_plan_request_at.isoformat() + "Z" if scheduled_plan_request_at else None + ), }, ) - def start_child(objective: str, *, resume_last_session: bool = True) -> None: - nonlocal child, child_objective, child_log_path, child_plan_report_path, child_plan_todo_path - nonlocal child_started_at - nonlocal child_control_bus, pending_follow_up - timestamp = dt.datetime.utcnow().strftime("%Y%m%d-%H%M%S") + def start_child(objective: str) -> None: + nonlocal child, child_objective, 
child_log_path, child_started_at, child_control_bus + nonlocal child_run_id, child_control_path, child_resume_session_id + clear_planner_state(reason="child_started") + timestamp = dt.datetime.utcnow().strftime("%Y%m%d-%H%M%S-%f") log_path = logs_dir / f"run-{timestamp}.log" control_path = bus_dir / f"child-control-{timestamp}.jsonl" - messages_path = logs_dir / f"run-{timestamp}-operator_messages.md" - plan_report_path = logs_dir / f"run-{timestamp}-plan.md" - plan_todo_path = logs_dir / f"run-{timestamp}-todo.md" + messages_path = operator_messages_path + force_fresh = is_force_fresh_session_requested(args.run_state_file) resume_session_id = ( - resolve_saved_session_id(args.run_state_file) - if args.run_resume_last_session and resume_last_session + (None if force_fresh else resolve_resume_session_id(args.run_state_file, run_archive_log)) + if args.run_resume_last_session else None ) child_control_bus = JsonlCommandBus(control_path) @@ -217,6 +267,9 @@ def start_child(objective: str, *, resume_last_session: bool = True) -> None: child_plan_report_path = plan_report_path child_plan_todo_path = plan_todo_path child_started_at = dt.datetime.utcnow() + child_run_id = timestamp + child_control_path = control_path + child_resume_session_id = resume_session_id notifier.send_message( "[daemon] launched run\n" f"pid={child.pid}\n" @@ -233,7 +286,23 @@ def start_child(objective: str, *, resume_last_session: bool = True) -> None: plan_report_file=str(plan_report_path), plan_todo_file=str(plan_todo_path), resume_session_id=resume_session_id, + run_id=timestamp, ) + append_run_archive_record( + event="run.started", + run_id=timestamp, + pid=child.pid, + objective=objective[:700], + log_path=str(log_path), + control_path=str(control_path), + operator_messages_file=str(messages_path), + resume_session_id=resume_session_id, + force_fresh_session=force_fresh, + plan_mode=plan_mode, + started_at=child_started_at.isoformat() + "Z", + ) + if force_fresh: + 
log_event("session.fresh.applied", run_id=timestamp) update_status() def send_follow_up_prompt() -> None: @@ -329,7 +398,7 @@ def handle_command(command: TelegramCommand, source: str) -> None: send_reply(source, help_text()) return if command.kind == "status": - last_session_id = resolve_saved_session_id(args.run_state_file) + last_session_id = resolve_resume_session_id(args.run_state_file, run_archive_log) send_reply( source, format_status( @@ -338,47 +407,37 @@ def handle_command(command: TelegramCommand, source: str) -> None: child_log_path=child_log_path, child_started_at=child_started_at, last_session_id=last_session_id, - pending_follow_up=pending_follow_up, + force_fresh_session=is_force_fresh_session_requested(args.run_state_file), + plan_mode=plan_mode, + pending_plan_request=pending_plan_request, + pending_plan_auto_execute_at=pending_plan_auto_execute_at, + scheduled_plan_request_at=scheduled_plan_request_at, ), ) return - if command.kind == "plan-run": - if command.callback_query_id: - notifier.answer_callback_query(command.callback_query_id) - if pending_follow_up is None or pending_follow_up.plan_id != command.text: - send_reply(source, "[daemon] suggested next step is stale or unavailable.") - return + if command.kind == "fresh-session": + set_force_fresh_session_marker( + args.run_state_file, + enabled=True, + reason=f"operator_requested_from_{source}", + ) running = child is not None and child.poll() is None - if running: - send_reply(source, "[daemon] active run exists. 
Finish or stop it before launching the next step.") - return - launched = launch_follow_up( - follow_up=pending_follow_up, + append_run_archive_record( + event="session.fresh.requested", source=source, - auto_triggered=False, + running=running, + active_run_id=child_run_id, + active_objective=str(child_objective or "")[:700], ) - if launched: - pending_follow_up = None - return - if command.kind == "plan-reject": - if command.callback_query_id: - notifier.answer_callback_query(command.callback_query_id, text="Rejected") - if pending_follow_up is not None and pending_follow_up.plan_id == command.text: - pending_follow_up = None - update_status() - send_reply(source, "[daemon] follow-up plan rejected. Waiting for your next instruction.") - return - if command.kind == "plan-modify": - if command.callback_query_id: - notifier.answer_callback_query(command.callback_query_id, text="Send your changes") - if pending_follow_up is None or pending_follow_up.plan_id != command.text: - send_reply(source, "[daemon] suggested next step is stale or unavailable.") - return - pending_follow_up.awaiting_user_edit = True - pending_follow_up.auto_execute_enabled = False - pending_follow_up.auto_execute_at = None + if running: + send_reply( + source, + "[daemon] fresh session is armed for the next run. " + "Current run keeps its existing session.", + ) + else: + send_reply(source, "[daemon] fresh session armed. 
Next /run will not resume previous session_id.") update_status() - send_modify_prompt() return if command.kind in {"run", "inject"}: objective = command.text.strip() @@ -410,6 +469,9 @@ def handle_command(command: TelegramCommand, source: str) -> None: else: send_reply(source, "[daemon] active run exists but child control bus unavailable.") return + if pending_plan_request or scheduled_plan_request_at is not None: + clear_planner_state(reason="manual_override") + send_reply(source, "[daemon] pending plan request cleared by manual command.") start_child(objective) return if command.kind == "stop": @@ -431,6 +493,124 @@ def handle_command(command: TelegramCommand, source: str) -> None: def on_telegram_command(command: TelegramCommand) -> None: handle_command(command, "telegram") + def schedule_plan_after_child_finish(*, objective: str, exit_code: int, log_path: Path | None) -> None: + nonlocal pending_plan_request, pending_plan_auto_execute_at, pending_plan_generated_at + nonlocal scheduled_plan_context, scheduled_plan_request_at + if plan_mode == PLAN_MODE_EXECUTE_ONLY: + clear_planner_state(reason="execute_only") + return + + state_payload = read_status(args.run_state_file) if args.run_state_file else None + finished_at = dt.datetime.utcnow() + if plan_mode == PLAN_MODE_RECORD_ONLY: + record_file = ( + Path(args.run_plan_record_file).expanduser().resolve() + if args.run_plan_record_file + else (logs_dir / "plan-agent-records.md").resolve() + ) + append_plan_record_row( + path=record_file, + finished_at=finished_at, + objective=objective, + exit_code=exit_code, + state_payload=state_payload, + log_path=log_path, + ) + log_event( + "plan.recorded", + mode=plan_mode, + record_file=str(record_file), + objective=objective[:700], + exit_code=exit_code, + ) + notifier.send_message( + "[daemon] plan mode=record-only\n" + f"Recorded run summary to table: {record_file}" + ) + clear_planner_state(reason="record_only") + return + + scheduled_plan_context = { + "objective": 
objective, + "exit_code": exit_code, + "log_path": str(log_path) if log_path else None, + "state_payload": state_payload, + } + scheduled_plan_request_at = finished_at + dt.timedelta(seconds=plan_request_delay_seconds) + pending_plan_request = None + pending_plan_auto_execute_at = None + pending_plan_generated_at = None + log_event( + "plan.scheduled", + mode=plan_mode, + scheduled_request_at=scheduled_plan_request_at.isoformat() + "Z", + objective=objective[:700], + exit_code=exit_code, + ) + notifier.send_message( + "[daemon] plan mode=fully-plan\n" + f"Will generate next request in {plan_request_delay_seconds}s." + ) + + def process_planner_timers() -> None: + nonlocal pending_plan_request, pending_plan_auto_execute_at, pending_plan_generated_at + nonlocal scheduled_plan_context, scheduled_plan_request_at + if plan_mode != PLAN_MODE_FULLY_PLAN: + return + if child is not None and child.poll() is None: + return + now = dt.datetime.utcnow() + + if ( + pending_plan_request is not None + and pending_plan_auto_execute_at is not None + and now >= pending_plan_auto_execute_at + ): + request = pending_plan_request + auto_at = pending_plan_auto_execute_at + clear_planner_state(reason="auto_execute") + log_event( + "plan.auto_execute", + request=request[:700], + auto_execute_at=auto_at.isoformat() + "Z", + ) + notifier.send_message( + "[daemon] auto executing planned request (no override received in time).\n" + f"request={request[:700]}" + ) + start_child(request) + return + + if ( + scheduled_plan_context is not None + and scheduled_plan_request_at is not None + and now >= scheduled_plan_request_at + ): + request = build_plan_request( + objective=str(scheduled_plan_context.get("objective") or "").strip(), + exit_code=int(scheduled_plan_context.get("exit_code") or 0), + state_payload=( + scheduled_plan_context.get("state_payload") + if isinstance(scheduled_plan_context.get("state_payload"), dict) + else None + ), + ) + pending_plan_request = request + 
pending_plan_generated_at = now + pending_plan_auto_execute_at = now + dt.timedelta(seconds=plan_auto_execute_delay_seconds) + scheduled_plan_context = None + scheduled_plan_request_at = None + log_event( + "plan.proposed", + request=request[:700], + auto_execute_at=pending_plan_auto_execute_at.isoformat() + "Z", + ) + notifier.send_message( + "[daemon] planner request generated\n" + f"request={request[:700]}\n" + f"Auto execute in {plan_auto_execute_delay_seconds}s unless you override via /run or /inject." + ) + poller = TelegramCommandPoller( bot_token=args.telegram_bot_token, chat_id=chat_id, @@ -449,7 +629,7 @@ def on_telegram_command(command: TelegramCommand) -> None: notifier.send_message( "[daemon] online\n" "Send /run to start a new run.\n" - "Commands: /status /stop /help" + "Commands: /status /stop /fresh /help" ) log_event( "daemon.started", @@ -469,19 +649,7 @@ def on_telegram_command(command: TelegramCommand) -> None: "terminal", ) if child is None: - if ( - pending_follow_up is not None - and pending_follow_up.auto_execute_enabled - and pending_follow_up.auto_execute_at is not None - and dt.datetime.utcnow() >= pending_follow_up.auto_execute_at - ): - launched = launch_follow_up( - follow_up=pending_follow_up, - source="telegram", - auto_triggered=True, - ) - if launched: - pending_follow_up = None + process_planner_timers() update_status() continue rc = child.poll() @@ -499,6 +667,61 @@ def on_telegram_command(command: TelegramCommand) -> None: exit_code=rc, objective=str(child_objective or "")[:700], log_path=str(child_log_path) if child_log_path else None, + run_id=child_run_id, + ) + if rc != 0 and log_contains_invalid_encrypted_content(child_log_path): + set_force_fresh_session_marker( + args.run_state_file, + enabled=True, + reason="detected_invalid_encrypted_content", + ) + warning = ( + "[daemon][warning] detected 'invalid encrypted content' in child log. " + "Next run will start with a fresh session (no resume)." 
+ ) + notifier.send_message(warning) + print(warning, file=sys.stdout) + log_event( + "session.fresh.flagged", + run_id=child_run_id, + reason="invalid_encrypted_content", + log_path=str(child_log_path) if child_log_path else None, + ) + append_run_archive_record( + event="session.fresh.flagged", + run_id=child_run_id, + reason="invalid_encrypted_content", + log_path=str(child_log_path) if child_log_path else None, + ) + if is_force_fresh_session_requested(args.run_state_file): + fresh_session_id = resolve_saved_session_id_raw(args.run_state_file) + if fresh_session_id: + set_force_fresh_session_marker(args.run_state_file, enabled=False) + log_event("session.fresh.cleared", run_id=child_run_id, session_id=fresh_session_id) + append_run_archive_record( + event="session.fresh.cleared", + run_id=child_run_id, + session_id=fresh_session_id, + ) + finished_session_id = resolve_resume_session_id(args.run_state_file, run_archive_log) + append_run_archive_record( + event="run.finished", + run_id=child_run_id, + objective=str(child_objective or "")[:700], + plan_mode=plan_mode, + exit_code=rc, + log_path=str(child_log_path) if child_log_path else None, + control_path=str(child_control_path) if child_control_path else None, + operator_messages_file=str(operator_messages_path), + resume_session_id=child_resume_session_id, + session_id=finished_session_id, + started_at=child_started_at.isoformat() + "Z" if child_started_at else None, + finished_at=dt.datetime.utcnow().isoformat() + "Z", + ) + schedule_plan_after_child_finish( + objective=str(child_objective or ""), + exit_code=rc, + log_path=child_log_path, ) pending_follow_up = resolve_plan_follow_up( state_file=args.run_state_file, @@ -514,8 +737,9 @@ def on_telegram_command(command: TelegramCommand) -> None: send_follow_up_prompt() child = None child_control_bus = None - child_plan_report_path = None - child_plan_todo_path = None + child_run_id = None + child_control_path = None + child_resume_session_id = None 
update_status() except KeyboardInterrupt: print("Daemon interrupted.", file=sys.stderr) @@ -524,6 +748,7 @@ def on_telegram_command(command: TelegramCommand) -> None: poller.stop() if child is not None and child.poll() is None: child.terminate() + clear_planner_state(reason="daemon_stopped") write_status( status_path, { @@ -566,6 +791,7 @@ def build_child_command( args.telegram_bot_token, "--telegram-chat-id", chat_id, + "--no-telegram-control", "--control-file", control_file, "--operator-messages-file", @@ -634,18 +860,24 @@ def format_status( child_log_path: Path | None, child_started_at: dt.datetime | None, last_session_id: str | None = None, - pending_follow_up: PlanFollowUp | None = None, + force_fresh_session: bool = False, + plan_mode: str = PLAN_MODE_FULLY_PLAN, + pending_plan_request: str | None = None, + pending_plan_auto_execute_at: dt.datetime | None = None, + scheduled_plan_request_at: dt.datetime | None = None, ) -> str: if child is None or child.poll() is not None: - base = "[daemon] status=idle" + base = f"[daemon] status=idle\nplan_mode={plan_mode}" if last_session_id: base += f"\nlast_session_id={last_session_id}" - if pending_follow_up is not None: - base += f"\npending_follow_up={pending_follow_up.objective[:700]}" - base += f"\npending_follow_up_mode={'edit' if pending_follow_up.awaiting_user_edit else 'ready'}" - if pending_follow_up.auto_execute_at is not None: - remaining = max(0, int((pending_follow_up.auto_execute_at - dt.datetime.utcnow()).total_seconds())) - base += f"\nauto_execute_in={format_countdown(remaining)}" + if force_fresh_session: + base += "\nforce_fresh_session=true" + if scheduled_plan_request_at is not None: + base += f"\nplan_request_at={scheduled_plan_request_at.isoformat()}Z" + if pending_plan_request: + base += f"\npending_plan_request={pending_plan_request[:700]}" + if pending_plan_auto_execute_at is not None: + base += f"\nplan_auto_execute_at={pending_plan_auto_execute_at.isoformat()}Z" return base elapsed = 
"unknown" if child_started_at is not None: @@ -653,15 +885,17 @@ def format_status( elapsed = f"{elapsed_seconds}s" return ( "[daemon] status=running\n" + f"plan_mode={plan_mode}\n" f"pid={child.pid}\n" f"elapsed={elapsed}\n" f"last_session_id={last_session_id}\n" + f"force_fresh_session={str(force_fresh_session).lower()}\n" f"objective={str(child_objective or '')[:700]}\n" f"log={child_log_path}" ) -def resolve_saved_session_id(state_file: str | None) -> str | None: +def resolve_saved_session_id_raw(state_file: str | None) -> str | None: if not state_file: return None payload = read_status(state_file) @@ -673,160 +907,176 @@ def resolve_saved_session_id(state_file: str | None) -> str | None: return None -def resolve_plan_follow_up( - state_file: str | None, - report_path: Path | None, - auto_execute_after_seconds: int, -) -> PlanFollowUp | None: - if not state_file: +def resolve_saved_session_id(state_file: str | None) -> str | None: + if is_force_fresh_session_requested(state_file): return None + return resolve_saved_session_id_raw(state_file) + + +def is_force_fresh_session_requested(state_file: str | None) -> bool: + if not state_file: + return False payload = read_status(state_file) - if payload is None: - return None - plan = payload.get("plan") - if not isinstance(plan, dict): + if not isinstance(payload, dict): + return False + return payload.get(FORCE_FRESH_SESSION_KEY) is True + + +def set_force_fresh_session_marker(state_file: str | None, *, enabled: bool, reason: str | None = None) -> bool: + if not state_file: + return False + state_path = Path(state_file).expanduser().resolve() + payload = read_status(str(state_path)) + if not isinstance(payload, dict): + payload = {} + payload[FORCE_FRESH_SESSION_KEY] = bool(enabled) + if enabled: + payload["session_id"] = None + payload["force_fresh_updated_at"] = dt.datetime.utcnow().isoformat() + "Z" + if reason: + payload[FORCE_FRESH_REASON_KEY] = reason + else: + payload.pop(FORCE_FRESH_REASON_KEY, None) + 
payload.pop("force_fresh_updated_at", None) + write_status(state_path, payload) + return True + + +def resolve_last_session_id_from_archive(archive_file: str | Path | None) -> str | None: + if archive_file is None: return None - if not bool(plan.get("should_propose_follow_up")): + archive_path = Path(archive_file) + if not archive_path.exists(): return None - objective = plan.get("suggested_next_objective") - plan_id = plan.get("plan_id") - if not isinstance(objective, str) or not objective.strip(): + try: + lines = archive_path.read_text(encoding="utf-8").splitlines() + except Exception: return None - if not isinstance(plan_id, str) or not plan_id.strip(): + for line in reversed(lines): + raw = line.strip() + if not raw: + continue + try: + payload = json.loads(raw) + except Exception: + continue + if not isinstance(payload, dict): + continue + session_id = payload.get("session_id") + if isinstance(session_id, str) and session_id.strip(): + return session_id.strip() + resume_session_id = payload.get("resume_session_id") + if isinstance(resume_session_id, str) and resume_session_id.strip(): + return resume_session_id.strip() + return None + + +def resolve_resume_session_id(state_file: str | None, archive_file: str | Path | None) -> str | None: + if is_force_fresh_session_requested(state_file): return None - report_markdown = "" - if report_path is not None and report_path.exists(): - report_markdown = report_path.read_text(encoding="utf-8") - if not report_markdown: - candidate = plan.get("report_markdown") - if isinstance(candidate, str): - report_markdown = candidate - created_at = dt.datetime.utcnow() - auto_execute_at = created_at + dt.timedelta(seconds=max(0, auto_execute_after_seconds)) - return PlanFollowUp( - plan_id=plan_id.strip(), - objective=objective.strip(), - report_markdown=report_markdown.strip() or objective.strip(), - created_at=created_at, - auto_execute_at=auto_execute_at, - ) + from_state = resolve_saved_session_id(state_file) + if from_state: 
+ return from_state + return resolve_last_session_id_from_archive(archive_file) -def format_countdown(total_seconds: int) -> str: - remaining = max(0, int(total_seconds)) - hours, rem = divmod(remaining, 3600) - minutes, seconds = divmod(rem, 60) - if hours > 0: - return f"{hours}h {minutes}m" - if minutes > 0: - return f"{minutes}m {seconds}s" - return f"{seconds}s" +def log_contains_invalid_encrypted_content(log_path: Path | None, *, tail_bytes: int = 256_000) -> bool: + if log_path is None or not log_path.exists(): + return False + try: + with log_path.open("rb") as f: + f.seek(0, 2) + size = f.tell() + start = max(0, size - max(1, int(tail_bytes))) + f.seek(start) + raw = f.read() + except Exception: + return False + text = raw.decode("utf-8", errors="ignore").lower() + return INVALID_ENCRYPTED_CONTENT_MARKER in text -def build_modified_follow_up_objective(*, base_objective: str, user_text: str) -> str: - return ( - "Continue from the planner's proposed next session objective, but apply the user's revision.\n\n" - f"Base planner objective:\n{base_objective.strip()}\n\n" - f"User revision to inherit:\n{user_text.strip()}" - ) +def normalize_plan_mode(raw: str | None) -> str: + value = (raw or PLAN_MODE_FULLY_PLAN).strip().lower() + return value if value in PLAN_MODES else PLAN_MODE_FULLY_PLAN -def create_git_checkpoint(*, run_cwd: Path, plan_id: str, auto_triggered: bool) -> GitCheckpointResult: - if not _git_ok(run_cwd, "rev-parse", "--is-inside-work-tree"): - return GitCheckpointResult( - ok_to_continue=True, - message="[daemon] follow-up checkpoint skipped: current workspace is not a git repository.", - ) - status = subprocess.run( - ["git", "status", "--porcelain"], - cwd=run_cwd, - capture_output=True, - text=True, - ) - if status.returncode != 0: - return GitCheckpointResult( - ok_to_continue=False, - message="[daemon] follow-up checkpoint failed: unable to inspect git status.", - ) - if not status.stdout.strip(): - return GitCheckpointResult( - 
ok_to_continue=True, - message="[daemon] workspace already clean. No checkpoint commit was needed.", - commit_hash=_resolve_head(run_cwd), - ) - add = subprocess.run( - ["git", "add", "-A"], - cwd=run_cwd, - capture_output=True, - text=True, - ) - if add.returncode != 0: - return GitCheckpointResult( - ok_to_continue=False, - message="[daemon] follow-up checkpoint failed during `git add -A`. Auto execution has been paused.", - ) - commit_message = ( - f"chore: checkpoint before planner follow-up {plan_id}" - if not auto_triggered - else f"chore: auto checkpoint before planner follow-up {plan_id}" - ) - commit = subprocess.run( - [ - "git", - "-c", - "user.name=Codex Planner", - "-c", - "user.email=codex-planner@local", - "commit", - "-m", - commit_message, - ], - cwd=run_cwd, - capture_output=True, - text=True, - ) - if commit.returncode != 0: - output = (commit.stderr or commit.stdout).strip().splitlines() - tail = output[-1] if output else "unknown git error" - return GitCheckpointResult( - ok_to_continue=False, - message=( - "[daemon] follow-up checkpoint failed during commit. " - f"Auto execution has been paused.\nreason={tail[:500]}" - ), - ) - commit_hash = _resolve_head(run_cwd) - return GitCheckpointResult( - ok_to_continue=True, - commit_hash=commit_hash, - message=( - "[daemon] git checkpoint created before follow-up execution.\n" - f"commit={commit_hash}\nmessage={commit_message}" - ), - ) +def build_plan_request(*, objective: str, exit_code: int, state_payload: dict[str, Any] | None) -> str: + objective_text = objective.strip() or "Continue improving the current repository objective." 
+ review_status, review_reason, review_next_action = extract_latest_review(state_payload) + parts = [f"继续推进目标:{objective_text}"] + if exit_code != 0: + parts.append("先定位并修复上一轮失败原因。") + if review_next_action: + parts.append(f"优先动作:{review_next_action}") + elif review_reason: + parts.append(f"优先关注:{review_reason}") + if review_status: + parts.append(f"当前审核状态:{review_status}") + if not review_next_action and not review_reason: + parts.append("补齐剩余实现并运行关键验证命令后再继续。") + return " ".join(parts).strip() -def _git_ok(run_cwd: Path, *args: str) -> bool: - completed = subprocess.run( - ["git", *args], - cwd=run_cwd, - capture_output=True, - text=True, +def extract_latest_review(state_payload: dict[str, Any] | None) -> tuple[str | None, str | None, str | None]: + if not isinstance(state_payload, dict): + return None, None, None + rounds = state_payload.get("rounds") + if not isinstance(rounds, list) or not rounds: + return None, None, None + last_item = rounds[-1] + if not isinstance(last_item, dict): + return None, None, None + review = last_item.get("review") + if not isinstance(review, dict): + return None, None, None + status = review.get("status") + reason = review.get("reason") + next_action = review.get("next_action") + return ( + str(status).strip() if status else None, + str(reason).strip() if reason else None, + str(next_action).strip() if next_action else None, ) - return completed.returncode == 0 and completed.stdout.strip() == "true" -def _resolve_head(run_cwd: Path) -> str | None: - completed = subprocess.run( - ["git", "rev-parse", "HEAD"], - cwd=run_cwd, - capture_output=True, - text=True, +def append_plan_record_row( + *, + path: Path, + finished_at: dt.datetime, + objective: str, + exit_code: int, + state_payload: dict[str, Any] | None, + log_path: Path | None, +) -> None: + status, reason, next_action = extract_latest_review(state_payload) + session_id = None + if isinstance(state_payload, dict): + session_id_raw = state_payload.get("session_id") + if 
isinstance(session_id_raw, str) and session_id_raw.strip(): + session_id = session_id_raw.strip() + path.parent.mkdir(parents=True, exist_ok=True) + if not path.exists(): + header = ( + "| finished_at | objective | exit_code | review_status | review_next_action | session_id | log |\n" + "|---|---|---:|---|---|---|---|\n" + ) + path.write_text(header, encoding="utf-8") + row = ( + f"| {_table_cell(finished_at.isoformat() + 'Z')} " + f"| {_table_cell(objective[:700])} " + f"| {exit_code} " + f"| {_table_cell(status or '')} " + f"| {_table_cell((next_action or reason or '')[:700])} " + f"| {_table_cell(session_id or '')} " + f"| {_table_cell(str(log_path) if log_path else '')} |\n" ) - if completed.returncode != 0: - return None - value = completed.stdout.strip() - return value or None + with path.open("a", encoding="utf-8") as f: + f.write(row) + + +def _table_cell(value: str) -> str: + return value.replace("|", "\\|").replace("\n", " ").strip() def help_text() -> str: @@ -836,15 +1086,18 @@ def help_text() -> str: "/inject - inject instruction to active run (or run if idle)\n" "/status - daemon + child status\n" "/stop - stop active run\n" + "/fresh - force next run to use a fresh session (ignore saved session_id)\n" "/daemon-stop - stop daemon process\n" "/help - show this help\n" "When planner proposes a next session, use the Telegram buttons to execute, reject, or modify it.\n" "Plain text message is treated as /run when idle.\n" - "Voice/audio message will be transcribed by Whisper when enabled." + "Voice/audio message will be transcribed by Whisper when enabled.\n" + "In fully-plan mode, daemon may auto-propose and auto-run next request unless overridden." 
) def build_parser() -> argparse.ArgumentParser: + preset_names = ", ".join(p.name for p in MODEL_PRESETS) parser = argparse.ArgumentParser( prog="codex-autoloop-telegram-daemon", description="Keep a Telegram command daemon online and launch codex-autoloop runs on demand.", @@ -907,11 +1160,14 @@ def build_parser() -> argparse.ArgumentParser: default=".", help="Working directory for child codex-autoloop runs.", ) - parser.add_argument("--run-max-rounds", type=int, default=100, help="Child codex-autoloop max rounds.") + parser.add_argument("--run-max-rounds", type=int, default=500, help="Child codex-autoloop max rounds.") parser.add_argument( "--run-model-preset", - default="cheap", - help="Model preset name for child runs (cheap, balanced, strong, max).", + default=None, + help=( + "Optional model preset name for child runs. " + f"If unset, child inherits Codex default model settings (available presets: {preset_names})." + ), ) parser.add_argument( "--run-main-model", @@ -1005,6 +1261,29 @@ def build_parser() -> argparse.ArgumentParser: default=True, help="Resume from last saved session_id when daemon starts a new idle run.", ) + parser.add_argument( + "--run-plan-mode", + default=PLAN_MODE_FULLY_PLAN, + choices=sorted(PLAN_MODES), + help="Plan mode: execute-only, fully-plan (default), or record-only.", + ) + parser.add_argument( + "--run-plan-request-delay-seconds", + type=int, + default=600, + help="In fully-plan mode, delay before generating next planner request after child completion.", + ) + parser.add_argument( + "--run-plan-auto-execute-delay-seconds", + type=int, + default=600, + help="In fully-plan mode, auto-execute planner request after this delay unless user overrides it.", + ) + parser.add_argument( + "--run-plan-record-file", + default=None, + help="Optional markdown table file path used by record-only mode. 
Defaults to logs_dir/plan-agent-records.md.", + ) parser.add_argument( "--run-no-dashboard", action="store_true", diff --git a/codex_autoloop/telegram_notifier.py b/codex_autoloop/telegram_notifier.py index dd2e519..7676221 100644 --- a/codex_autoloop/telegram_notifier.py +++ b/codex_autoloop/telegram_notifier.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import socket import threading import urllib.error import urllib.parse @@ -114,6 +115,12 @@ def _post_form(self, url: str, payload: dict[str, Any]) -> bool: except urllib.error.URLError as exc: self._emit_error(f"Telegram network error: {exc}") return False + except (TimeoutError, socket.timeout) as exc: + self._emit_error(f"Telegram network timeout: {exc}") + return False + except OSError as exc: + self._emit_error(f"Telegram network os error: {exc}") + return False try: parsed = json.loads(raw) @@ -159,6 +166,14 @@ def resolve_chat_id( if on_error: on_error(f"getUpdates network error: {exc}") return None + except (TimeoutError, socket.timeout) as exc: + if on_error: + on_error(f"getUpdates timeout: {exc}") + return None + except OSError as exc: + if on_error: + on_error(f"getUpdates os error: {exc}") + return None try: parsed = json.loads(raw) @@ -232,7 +247,8 @@ def format_event_message(event: dict[str, Any]) -> str: return ( f"[autoloop] main completed {now}\n" f"round={event.get('round_index')} exit={event.get('exit_code')} " - f"turn_completed={event.get('turn_completed')} turn_failed={event.get('turn_failed')}\n" + f"turn_completed={event.get('turn_completed')} turn_failed={event.get('turn_failed')} " + f"interrupted={event.get('interrupted')}\n" f"session_id={event.get('session_id')}\n" f"summary={last_message}" ) diff --git a/pyproject.toml b/pyproject.toml index eb6ceb8..6bd547f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,6 +11,7 @@ requires-python = ">=3.10" dependencies = [] [project.scripts] +codexloop = "codex_autoloop.codexloop:main" codex-autoloop = 
"codex_autoloop.cli:main" codex-autoloop-telegram-daemon = "codex_autoloop.telegram_daemon:main" codex-autoloop-daemon-ctl = "codex_autoloop.daemon_ctl:main" diff --git a/skills/autonomous-research-loop/SKILL.md b/skills/autonomous-research-loop/SKILL.md index e72b289..77badee 100644 --- a/skills/autonomous-research-loop/SKILL.md +++ b/skills/autonomous-research-loop/SKILL.md @@ -180,6 +180,7 @@ Default expectation: 1. If changes are valid and commit is complete, push immediately. 2. Treat timely push as the normal good path, not an optional extra. +3. If no remote is configured, explicitly notify the user to create/configure a remote manually before continuing. ## Step 6: Long-Running Monitoring Mode (24h style) diff --git a/tests/test_codex_runner.py b/tests/test_codex_runner.py index 07adb6b..ae5c972 100644 --- a/tests/test_codex_runner.py +++ b/tests/test_codex_runner.py @@ -29,3 +29,21 @@ def test_build_command_resume() -> None: assert command[:4] == ["codex", "exec", "resume", "--json"] assert "--output-schema" not in command assert command[-2:] == ["thread123", "continue"] + + +def test_run_exec_marks_nonzero_exit_without_turn_completion_as_failed(tmp_path) -> None: # type: ignore[no-untyped-def] + fake_codex = tmp_path / "fake-codex" + fake_codex.write_text("#!/bin/sh\nexit 17\n", encoding="utf-8") + fake_codex.chmod(0o755) + + runner = CodexRunner(codex_bin=str(fake_codex)) + result = runner.run_exec( + prompt="do work", + resume_thread_id=None, + options=RunnerOptions(), + ) + + assert result.exit_code == 17 + assert result.turn_completed is False + assert result.turn_failed is True + assert result.fatal_error == "Process exited with code 17 before turn completion." 
diff --git a/tests/test_codexloop.py b/tests/test_codexloop.py new file mode 100644 index 0000000..22f18b5 --- /dev/null +++ b/tests/test_codexloop.py @@ -0,0 +1,229 @@ +import sys +from pathlib import Path + +from codex_autoloop import codexloop + + +def test_parse_terminal_command_plain_text_routes_to_run_when_idle() -> None: + cmd = codexloop.parse_terminal_command("implement feature", running=False) + assert cmd is not None + assert cmd.kind == "run" + assert cmd.text == "implement feature" + + +def test_parse_terminal_command_plain_text_routes_to_inject_when_running() -> None: + cmd = codexloop.parse_terminal_command("fix failing tests first", running=True) + assert cmd is not None + assert cmd.kind == "inject" + assert cmd.text == "fix failing tests first" + + +def test_parse_terminal_command_explicit_commands() -> None: + run = codexloop.parse_terminal_command("/run build dashboard", running=True) + inject = codexloop.parse_terminal_command("/inject tweak prompt", running=False) + stop = codexloop.parse_terminal_command("/stop", running=False) + fresh = codexloop.parse_terminal_command("/fresh", running=False) + disable = codexloop.parse_terminal_command("/disable", running=False) + status = codexloop.parse_terminal_command("/status", running=False) + assert run is not None and run.kind == "run" and run.text == "build dashboard" + assert inject is not None and inject.kind == "inject" and inject.text == "tweak prompt" + assert stop is not None and stop.kind == "stop" + assert fresh is not None and fresh.kind == "fresh-session" + assert disable is not None and disable.kind == "daemon-stop" + assert status is not None and status.kind == "status" + + +def test_parse_terminal_command_rejects_empty_payload() -> None: + assert codexloop.parse_terminal_command("/run ", running=False) is None + assert codexloop.parse_terminal_command("/inject ", running=True) is None + + +def test_build_parser_supports_disable_subcommand() -> None: + parser = codexloop.build_parser() 
+ args = parser.parse_args(["disable"]) + assert args.subcommand == "disable" + + +def test_build_parser_supports_fresh_subcommand() -> None: + parser = codexloop.build_parser() + args = parser.parse_args(["fresh"]) + assert args.subcommand == "fresh" + + +def test_build_parser_supports_help_subcommand() -> None: + parser = codexloop.build_parser() + args = parser.parse_args(["help"]) + assert args.subcommand == "help" + + +def test_build_parser_supports_init_subcommand() -> None: + parser = codexloop.build_parser() + args = parser.parse_args(["init"]) + assert args.subcommand == "init" + + +def test_supported_features_text_contains_core_commands() -> None: + text = codexloop.supported_features_text() + assert "codexloop help" in text + assert "codexloop disable" in text + assert "codexloop fresh" in text + assert "codexloop init" in text + assert "/disable" in text + assert "/fresh" in text + + +def test_main_help_does_not_require_codex_binary(monkeypatch, capsys) -> None: + monkeypatch.setattr(sys, "argv", ["codexloop", "help"]) + monkeypatch.setattr(codexloop.shutil, "which", lambda name: None) + codexloop.main() + captured = capsys.readouterr() + assert "codexloop supported features" in captured.out + + +def test_is_config_usable_requires_token_and_run_cd() -> None: + assert codexloop.is_config_usable({"telegram_bot_token": "123:abc", "run_cd": "."}) is True + assert codexloop.is_config_usable({"telegram_bot_token": "bad-token", "run_cd": "."}) is False + assert codexloop.is_config_usable({"telegram_bot_token": "123:abc", "run_cd": ""}) is False + + +def test_run_interactive_config_uses_passed_run_cd(monkeypatch, tmp_path: Path) -> None: + monkeypatch.setattr(codexloop, "prompt_token", lambda: "123:abc") + monkeypatch.setattr(codexloop, "prompt_chat_id", lambda: "auto") + monkeypatch.setattr(codexloop, "prompt_input", lambda prompt, default: "") + monkeypatch.setattr(codexloop, "prompt_model_choice", lambda: None) + monkeypatch.setattr(codexloop, 
"prompt_play_mode", lambda: codexloop.PLAY_MODES[1]) + config = codexloop.run_interactive_config(home_dir=tmp_path / ".codex_daemon", run_cd=tmp_path) + assert config["run_cd"] == str(tmp_path.resolve()) + assert config["run_model_preset"] is None + assert config["run_plan_mode"] == "fully-plan" + assert config["run_plan_request_delay_seconds"] == 600 + assert config["run_plan_auto_execute_delay_seconds"] == 600 + assert config["run_yolo"] is True + assert config["run_full_auto"] is False + + +def test_prompt_play_mode_selection(monkeypatch) -> None: + answers = iter(["3"]) + monkeypatch.setattr(codexloop, "prompt_input", lambda prompt, default: next(answers)) + mode = codexloop.prompt_play_mode() + assert mode.name == "record-only" + assert mode.run_plan_mode == "record-only" + + +def test_prompt_model_choice_selection(monkeypatch) -> None: + answers = iter(["1"]) + monkeypatch.setattr(codexloop, "prompt_input", lambda prompt, default: next(answers)) + model = codexloop.prompt_model_choice() + assert model == codexloop.MODEL_PRESETS[0].name + + +def test_prompt_model_choice_default_inherits_codex(monkeypatch) -> None: + monkeypatch.setattr(codexloop, "prompt_input", lambda prompt, default: default) + assert codexloop.prompt_model_choice() is None + + +def test_main_init_starts_background_without_attach(monkeypatch, tmp_path: Path, capsys) -> None: + home_dir = tmp_path / ".codex_daemon" + config = { + "telegram_bot_token": "123:abc", + "telegram_chat_id": "auto", + "run_cd": str(tmp_path), + "run_check": None, + "run_max_rounds": 500, + "run_skip_git_repo_check": False, + "run_full_auto": False, + "run_yolo": True, + "run_plan_mode": "fully-plan", + "run_plan_request_delay_seconds": 600, + "run_plan_auto_execute_delay_seconds": 600, + "run_resume_last_session": True, + "run_main_reasoning_effort": None, + "run_reviewer_reasoning_effort": None, + "run_main_model": None, + "run_reviewer_model": None, + "run_model_preset": "codex-xhigh", + "bus_dir": str(home_dir / 
"bus"), + "logs_dir": str(home_dir / "logs"), + } + + monkeypatch.setattr(sys, "argv", ["codexloop", "--home-dir", str(home_dir), "init"]) + monkeypatch.setattr(codexloop.shutil, "which", lambda name: "/usr/bin/codex") + monkeypatch.setattr(codexloop, "load_config", lambda path: None) + monkeypatch.setattr(codexloop, "run_interactive_config", lambda **kwargs: config) + monkeypatch.setattr(codexloop, "stop_all_codexloop_loops", lambda **kwargs: None) + monkeypatch.setattr(codexloop, "ensure_daemon_running", lambda **kwargs: 4321) + monkeypatch.setattr(codexloop, "run_monitor_console", lambda **kwargs: (_ for _ in ()).throw(AssertionError("attach should not run"))) + monkeypatch.setattr(codexloop, "save_config", lambda path, payload: None) + + codexloop.main() + captured = capsys.readouterr() + assert "Daemon running in background. pid=4321" in captured.out + assert "Use `codexloop` to attach monitor" in captured.out + + +def test_parse_pid_supports_int_and_digit_string() -> None: + assert codexloop.parse_pid(123) == 123 + assert codexloop.parse_pid("456") == 456 + assert codexloop.parse_pid("setup-probe") is None + assert codexloop.parse_pid(-1) is None + + +def test_build_daemon_command_uses_config(monkeypatch, tmp_path: Path) -> None: + monkeypatch.setattr(codexloop, "resolve_daemon_launch_prefix", lambda: ["daemon-bin"]) + home_dir = tmp_path / ".codex_daemon" + config = { + "telegram_bot_token": "123:abc", + "telegram_chat_id": "auto", + "run_cd": str(tmp_path), + "run_check": "pytest -q", + "run_max_rounds": 500, + "run_skip_git_repo_check": True, + "run_full_auto": False, + "run_yolo": True, + "run_plan_mode": "fully-plan", + "run_plan_request_delay_seconds": 600, + "run_plan_auto_execute_delay_seconds": 600, + "run_resume_last_session": True, + "run_model_preset": "quality", + "bus_dir": str(home_dir / "bus"), + "logs_dir": str(home_dir / "logs"), + } + cmd = codexloop.build_daemon_command( + config=config, + home_dir=home_dir, + 
token_lock_dir="/tmp/token-locks", + ) + assert cmd[0] == "daemon-bin" + assert "--telegram-bot-token" in cmd + assert "--run-max-rounds" in cmd + assert "--run-plan-mode" in cmd + assert "--run-plan-request-delay-seconds" in cmd + assert "--run-plan-auto-execute-delay-seconds" in cmd + assert "--run-check" in cmd + assert "--run-model-preset" in cmd + assert "--run-skip-git-repo-check" in cmd + assert "--run-yolo" in cmd + assert "--run-resume-last-session" in cmd + + +def test_build_daemon_command_forces_yolo(monkeypatch, tmp_path: Path) -> None: + monkeypatch.setattr(codexloop, "resolve_daemon_launch_prefix", lambda: ["daemon-bin"]) + home_dir = tmp_path / ".codex_daemon" + config = { + "telegram_bot_token": "123:abc", + "telegram_chat_id": "auto", + "run_cd": str(tmp_path), + "run_check": None, + "run_max_rounds": 500, + "run_skip_git_repo_check": False, + "run_full_auto": False, + "run_yolo": False, + "run_plan_mode": "execute-only", + "run_resume_last_session": True, + "run_model_preset": "quality", + "bus_dir": str(home_dir / "bus"), + "logs_dir": str(home_dir / "logs"), + } + cmd = codexloop.build_daemon_command(config=config, home_dir=home_dir, token_lock_dir="/tmp/token-locks") + assert "--run-yolo" in cmd + assert "--no-run-yolo" not in cmd diff --git a/tests/test_control_state.py b/tests/test_control_state.py index 6fc8263..333b740 100644 --- a/tests/test_control_state.py +++ b/tests/test_control_state.py @@ -39,3 +39,31 @@ def test_control_state_writes_markdown_doc(tmp_path) -> None: assert "Operator Messages" in content assert "initial goal" in content assert "fix test" in content + + +def test_control_state_loads_existing_messages_and_appends(tmp_path) -> None: + doc_path = tmp_path / "operator_messages.md" + doc_path.write_text( + "\n".join( + [ + "# Operator Messages", + "", + "Messages entered by user/operator channels (Telegram/terminal/initial objective).", + "", + "- `2026-03-12T00:00:00Z` `telegram` `inject`: keep prior context", + "", + ] + 
), + encoding="utf-8", + ) + state = LoopControlState(operator_messages_file=str(doc_path)) + before = state.list_messages() + assert len(before) == 1 + assert "keep prior context" in before[0] + state.request_inject("new instruction", source="terminal") + after = state.list_messages() + assert len(after) == 2 + assert "new instruction" in after[-1] + content = doc_path.read_text(encoding="utf-8") + assert "keep prior context" in content + assert "new instruction" in content diff --git a/tests/test_orchestrator.py b/tests/test_orchestrator.py new file mode 100644 index 0000000..7c222e3 --- /dev/null +++ b/tests/test_orchestrator.py @@ -0,0 +1,195 @@ +from codex_autoloop.models import CodexRunResult +from codex_autoloop.models import ReviewDecision +from codex_autoloop.orchestrator import AutoLoopConfig, AutoLoopOrchestrator + + +class _InterruptingRunner: + def run_exec(self, **kwargs): # type: ignore[no-untyped-def] + return CodexRunResult( + command=["codex", "exec"], + exit_code=0, + thread_id="thread-1", + agent_messages=["still working"], + turn_completed=False, + turn_failed=True, + fatal_error="External interrupt: terminal requested instruction update", + ) + + +class _UnusedReviewer: + def evaluate(self, **kwargs): # type: ignore[no-untyped-def] + raise AssertionError("reviewer should not run for external interrupts") + + +def test_external_interrupt_is_not_persisted_as_failure() -> None: + events: list[dict] = [] + stop_checks = {"calls": 0} + + def should_stop() -> bool: + stop_checks["calls"] += 1 + return stop_checks["calls"] >= 2 + + orchestrator = AutoLoopOrchestrator( + runner=_InterruptingRunner(), # type: ignore[arg-type] + reviewer=_UnusedReviewer(), # type: ignore[arg-type] + config=AutoLoopConfig( + objective="continue work", + max_rounds=3, + stop_requested_checker=should_stop, + pending_instruction_consumer=lambda: "new instruction", + loop_event_callback=events.append, + ), + ) + + result = orchestrator.run() + + assert result.success is 
False + assert len(result.rounds) == 1 + round_summary = result.rounds[0] + assert round_summary.main_turn_failed is False + assert round_summary.review.reason.endswith("New operator instruction injected and will be applied.") + assert round_summary.review.next_action == "Apply injected operator instruction in next round." + + main_event = next(event for event in events if event.get("type") == "round.main.completed") + assert main_event["turn_failed"] is False + assert main_event["interrupted"] is True + + +def test_initial_prompt_uses_response_mode_for_greeting() -> None: + prompt = AutoLoopOrchestrator._initial_main_prompt("你好") + assert "Reply directly in the user's language." in prompt + assert "primary implementation agent" not in prompt + + +def test_initial_prompt_uses_response_mode_for_bug_question() -> None: + prompt = AutoLoopOrchestrator._initial_main_prompt("这是为啥?你分析一下bug问题所在") + assert "Reply directly in the user's language." in prompt + assert "Do not force code changes unless they are actually needed" in prompt + + +def test_initial_prompt_keeps_implementation_mode_for_fix_request() -> None: + prompt = AutoLoopOrchestrator._initial_main_prompt("修复 failing tests 并提交 commit") + assert "primary implementation agent" in prompt + assert "Complete the objective end-to-end" in prompt + + +class _SequenceRunner: + def __init__(self, outputs: list[CodexRunResult]) -> None: + self.outputs = outputs + self.calls: list[str | None] = [] + + def run_exec(self, **kwargs): # type: ignore[no-untyped-def] + self.calls.append(kwargs.get("resume_thread_id")) + if not self.outputs: + raise AssertionError("runner called more times than expected") + return self.outputs.pop(0) + + +class _DoneReviewer: + def evaluate(self, **kwargs): # type: ignore[no-untyped-def] + return ReviewDecision( + status="done", + confidence=1.0, + reason="completed", + next_action="none", + ) + + +class _ContinueReviewer: + def evaluate(self, **kwargs): # type: ignore[no-untyped-def] + return 
ReviewDecision(
+            status="continue",
+            confidence=0.9,
+            reason="retry",
+            next_action="retry",
+        )
+
+
+def test_invalid_encrypted_content_resets_session_and_retries_fresh() -> None:
+    events: list[dict] = []
+    runner = _SequenceRunner(
+        outputs=[
+            CodexRunResult(
+                command=["codex", "exec", "resume"],
+                exit_code=1,
+                thread_id="thread-old",
+                agent_messages=[],
+                turn_completed=False,
+                turn_failed=True,
+                fatal_error="invalid_encrypted_content",
+            ),
+            CodexRunResult(
+                command=["codex", "exec"],
+                exit_code=0,
+                thread_id="thread-new",
+                agent_messages=["done"],
+                turn_completed=True,
+                turn_failed=False,
+                fatal_error=None,
+            ),
+        ]
+    )
+    orchestrator = AutoLoopOrchestrator(
+        runner=runner,  # type: ignore[arg-type]
+        reviewer=_DoneReviewer(),  # type: ignore[arg-type]
+        config=AutoLoopConfig(
+            objective="继续修复并提交",
+            initial_session_id="thread-old",
+            max_rounds=5,
+            loop_event_callback=events.append,
+        ),
+    )
+
+    result = orchestrator.run()
+    assert result.success is True
+    assert result.session_id == "thread-new"
+    assert runner.calls == ["thread-old", None]
+    reset_events = [item for item in events if item.get("type") == "round.session.reset"]
+    assert len(reset_events) == 1
+    assert reset_events[0].get("previous_session_id") == "thread-old"
+
+
+def test_no_progress_stops_when_repeated_empty_failures() -> None:
+    runner = _SequenceRunner(
+        outputs=[
+            CodexRunResult(
+                command=["codex", "exec"],
+                exit_code=1,
+                thread_id=None,
+                agent_messages=[],
+                turn_completed=False,
+                turn_failed=True,
+                fatal_error="Process exited with code 1 before turn completion.",
+            ),
+            CodexRunResult(
+                command=["codex", "exec"],
+                exit_code=1,
+                thread_id=None,
+                agent_messages=[],
+                turn_completed=False,
+                turn_failed=True,
+                fatal_error="Process exited with code 1 before turn completion.",
+            ),
+            CodexRunResult(
+                command=["codex", "exec"],
+                exit_code=1,
+                thread_id=None,
+                agent_messages=[],
+                turn_completed=False,
+                turn_failed=True,
+                fatal_error="Process exited with code 1 before turn completion.",
+            ),
+        ]
+    )
+    orchestrator = AutoLoopOrchestrator(
+        runner=runner,  # type: ignore[arg-type]
+        reviewer=_ContinueReviewer(),  # type: ignore[arg-type]
+        config=AutoLoopConfig(
+            objective="继续",
+            max_rounds=10,
+            max_no_progress_rounds=2,
+        ),
+    )
+    result = orchestrator.run()
+    assert result.success is False
+    assert "no-progress" in result.stop_reason.lower()
diff --git a/tests/test_reviewer.py b/tests/test_reviewer.py
index bb495ee..6302571 100644
--- a/tests/test_reviewer.py
+++ b/tests/test_reviewer.py
@@ -1,4 +1,5 @@
-from codex_autoloop.reviewer import parse_decision_text
+from codex_autoloop.codex_runner import CodexRunner
+from codex_autoloop.reviewer import Reviewer, parse_decision_text
 
 
 def test_parse_decision_plain_json() -> None:
@@ -22,3 +23,26 @@ def test_parse_decision_embedded_json() -> None:
 def test_parse_decision_invalid() -> None:
     decision = parse_decision_text("not json")
     assert decision is None
+
+
+def test_build_prompt_includes_structured_main_run_facts() -> None:
+    reviewer = Reviewer(CodexRunner(codex_bin="codex"))
+    prompt = reviewer._build_prompt(
+        objective="say hello",
+        operator_messages=["用户只说了你好"],
+        round_index=1,
+        session_id="thread-1",
+        main_exit_code=0,
+        main_turn_completed=False,
+        main_turn_failed=False,
+        main_agent_message_count=1,
+        main_summary="你好!有什么我可以帮你的吗?",
+        main_error=None,
+        checks=[],
+    )
+
+    assert "Do not speculate about crashes or missing replies" in prompt
+    assert "Main agent exit code: 0" in prompt
+    assert "Main agent turn completed: false" in prompt
+    assert "Main agent turn failed: false" in prompt
+    assert "Main agent emitted agent messages: 1" in prompt
diff --git a/tests/test_setup_wizard.py b/tests/test_setup_wizard.py
index 52970ff..cf34d94 100644
--- a/tests/test_setup_wizard.py
+++ b/tests/test_setup_wizard.py
@@ -22,15 +22,29 @@ def test_stop_existing_daemon_no_pid_file(tmp_path: Path) -> None:
 
 
 def test_prompt_model_choice_default(monkeypatch) -> None:
-    monkeypatch.setattr(setup_wizard, "prompt_input", lambda prompt, default: "")
-    assert setup_wizard.prompt_model_choice() == "quality"
+    monkeypatch.setattr(setup_wizard, "prompt_input", lambda prompt, default: default)
+    assert setup_wizard.prompt_model_choice() is None
 
 
-def test_prompt_planner_mode_choice_default(monkeypatch) -> None:
-    monkeypatch.setattr(setup_wizard, "prompt_input", lambda prompt, default: "")
-    assert setup_wizard.prompt_planner_mode_choice() == "auto"
+def test_prompt_model_choice_retries(monkeypatch) -> None:
+    answers = iter(["abc", "99", "2"])
+    monkeypatch.setattr(setup_wizard, "prompt_input", lambda prompt, default: next(answers))
+    assert setup_wizard.prompt_model_choice() == setup_wizard.MODEL_PRESETS[1].name
 
 
-def test_prompt_planner_mode_choice_select_record(monkeypatch) -> None:
-    monkeypatch.setattr(setup_wizard, "prompt_input", lambda prompt, default: "3")
-    assert setup_wizard.prompt_planner_mode_choice() == "record"
+def test_prompt_reasoning_effort_retries(monkeypatch) -> None:
+    answers = iter(["wrong", "high"])
+    monkeypatch.setattr(setup_wizard, "prompt_input", lambda prompt, default: next(answers))
+    assert setup_wizard.prompt_reasoning_effort("x") == "high"
+
+
+def test_prompt_chat_id_retries(monkeypatch) -> None:
+    answers = iter(["abc", "-100123"])
+    monkeypatch.setattr(setup_wizard, "prompt_input", lambda prompt, default: next(answers))
+    assert setup_wizard.prompt_chat_id() == "-100123"
+
+
+def test_prompt_token_retries(monkeypatch) -> None:
+    answers = iter(["invalid", "123:secret"])
+    monkeypatch.setattr(setup_wizard, "prompt_secret", lambda prompt: next(answers))
+    assert setup_wizard.prompt_token() == "123:secret"
diff --git a/tests/test_telegram_control.py b/tests/test_telegram_control.py
index a6228a1..654cee4 100644
--- a/tests/test_telegram_control.py
+++ b/tests/test_telegram_control.py
@@ -74,6 +74,16 @@ def test_parse_stop_and_status() -> None:
     assert status is not None and status.kind == "status"
 
 
+def test_parse_fresh_session_command() -> None:
+    cmd = parse_command_from_update(
+        update=_wrap("/fresh"),
+        expected_chat_id="100",
+        plain_text_as_inject=True,
+    )
+    assert cmd is not None
+    assert cmd.kind == "fresh-session"
+
+
 def test_parse_daemon_stop() -> None:
     cmd = parse_command_from_update(
         update=_wrap("/daemon-stop"),
diff --git a/tests/test_telegram_daemon.py b/tests/test_telegram_daemon.py
index 8cb98dc..7289fcf 100644
--- a/tests/test_telegram_daemon.py
+++ b/tests/test_telegram_daemon.py
@@ -1,13 +1,22 @@
 import json
+import datetime as dt
 from argparse import Namespace
 from pathlib import Path
 
 from codex_autoloop.telegram_daemon import (
+    PLAN_MODE_FULLY_PLAN,
+    append_plan_record_row,
+    build_parser,
     build_child_command,
-    build_modified_follow_up_objective,
-    format_countdown,
-    resolve_plan_follow_up,
+    build_plan_request,
+    format_status,
+    is_force_fresh_session_requested,
+    log_contains_invalid_encrypted_content,
+    normalize_plan_mode,
+    resolve_last_session_id_from_archive,
+    resolve_resume_session_id,
     resolve_saved_session_id,
+    set_force_fresh_session_marker,
 )
 
 
@@ -55,6 +64,7 @@ def test_build_child_command_includes_core_args() -> None:
     assert cmd[0] == "codex-autoloop"
     assert "--telegram-bot-token" in cmd
     assert "--telegram-chat-id" in cmd
+    assert "--no-telegram-control" in cmd
     assert "--control-file" in cmd
     assert "--operator-messages-file" in cmd
     assert "--main-model" in cmd
@@ -84,36 +94,114 @@ def test_resolve_saved_session_id(tmp_path: Path) -> None:
     assert resolve_saved_session_id(str(state_file)) == "thread-abc"
 
 
-def test_resolve_plan_follow_up(tmp_path: Path) -> None:
+def test_resolve_last_session_id_from_archive_prefers_latest_finished_row(tmp_path: Path) -> None:
+    archive_file = tmp_path / "codexloop-run-archive.jsonl"
+    rows = [
+        {"event": "run.started", "resume_session_id": "thread-old"},
+        {"event": "run.finished", "session_id": "thread-new"},
+    ]
+    with archive_file.open("w", encoding="utf-8") as f:
+        for row in rows:
+            f.write(json.dumps(row, ensure_ascii=True) + "\n")
+    assert resolve_last_session_id_from_archive(archive_file) == "thread-new"
+
+
+def test_resolve_resume_session_id_falls_back_to_archive(tmp_path: Path) -> None:
+    archive_file = tmp_path / "codexloop-run-archive.jsonl"
+    archive_file.write_text(json.dumps({"session_id": "thread-archive"}) + "\n", encoding="utf-8")
+    assert resolve_resume_session_id(str(tmp_path / "missing-state.json"), archive_file) == "thread-archive"
+
+
+def test_resolve_resume_session_id_prefers_state_over_archive(tmp_path: Path) -> None:
+    state_file = tmp_path / "last_state.json"
+    archive_file = tmp_path / "codexloop-run-archive.jsonl"
+    state_file.write_text(json.dumps({"session_id": "thread-state"}), encoding="utf-8")
+    archive_file.write_text(json.dumps({"session_id": "thread-archive"}) + "\n", encoding="utf-8")
+    assert resolve_resume_session_id(str(state_file), archive_file) == "thread-state"
+
+
+def test_set_force_fresh_session_marker_blocks_resume(tmp_path: Path) -> None:
     state_file = tmp_path / "last_state.json"
-    report_file = tmp_path / "plan.md"
-    report_file.write_text("# Planning Snapshot\nnext", encoding="utf-8")
-    state_file.write_text(
-        json.dumps(
+    archive_file = tmp_path / "archive.jsonl"
+    state_file.write_text(json.dumps({"session_id": "thread-state"}), encoding="utf-8")
+    archive_file.write_text(json.dumps({"session_id": "thread-archive"}) + "\n", encoding="utf-8")
+    assert resolve_resume_session_id(str(state_file), archive_file) == "thread-state"
+    changed = set_force_fresh_session_marker(str(state_file), enabled=True, reason="test")
+    assert changed is True
+    assert is_force_fresh_session_requested(str(state_file)) is True
+    assert resolve_resume_session_id(str(state_file), archive_file) is None
+    set_force_fresh_session_marker(str(state_file), enabled=False)
+    assert is_force_fresh_session_requested(str(state_file)) is False
+    assert resolve_resume_session_id(str(state_file), archive_file) == "thread-archive"
+
+
+def test_log_contains_invalid_encrypted_content(tmp_path: Path) -> None:
+    log_file = tmp_path / "run.log"
+    log_file.write_text("some error: Invalid Encrypted Content happened\n", encoding="utf-8")
+    assert log_contains_invalid_encrypted_content(log_file) is True
+    other_log = tmp_path / "ok.log"
+    other_log.write_text("normal output\n", encoding="utf-8")
+    assert log_contains_invalid_encrypted_content(other_log) is False
+
+
+def test_normalize_plan_mode_defaults_to_fully_plan() -> None:
+    assert normalize_plan_mode(None) == PLAN_MODE_FULLY_PLAN
+    assert normalize_plan_mode("unknown") == PLAN_MODE_FULLY_PLAN
+    assert normalize_plan_mode("execute-only") == "execute-only"
+
+
+def test_build_plan_request_uses_review_guidance() -> None:
+    state_payload = {
+        "rounds": [
             {
-                "plan": {
-                    "plan_id": "plan-123",
-                    "suggested_next_objective": "benchmark pipeline",
-                    "should_propose_follow_up": True,
-                    "report_markdown": "# fallback",
+                "review": {
+                    "status": "continue",
+                    "reason": "tests still failing",
+                    "next_action": "fix failing tests and rerun pytest",
                 }
             }
-        ),
-        encoding="utf-8",
+        ]
+    }
+    request = build_plan_request(objective="完成接口重构", exit_code=2, state_payload=state_payload)
+    assert "完成接口重构" in request
+    assert "fix failing tests" in request
+    assert "失败原因" in request
+
+
+def test_append_plan_record_row_writes_markdown_table(tmp_path: Path) -> None:
+    record = tmp_path / "plan-records.md"
+    state_payload = {"session_id": "thread-1", "rounds": [{"review": {"status": "continue", "reason": "x"}}]}
+    append_plan_record_row(
+        path=record,
+        finished_at=dt.datetime(2026, 1, 1, 0, 0, 0),
+        objective="do work",
+        exit_code=0,
+        state_payload=state_payload,
+        log_path=tmp_path / "run.log",
     )
-    follow_up = resolve_plan_follow_up(str(state_file), report_file, 3600)
-    assert follow_up is not None
-    assert follow_up.plan_id == "plan-123"
-    assert follow_up.objective == "benchmark pipeline"
-    assert "Planning Snapshot" in follow_up.report_markdown
-    assert follow_up.auto_execute_at is not None
-
-
-def test_format_countdown_and_build_modified_follow_up_objective() -> None:
-    assert format_countdown(3661) == "1h 1m"
-    objective = build_modified_follow_up_objective(
-        base_objective="benchmark pipeline",
-        user_text="focus on Telegram follow-up flow first",
+    text = record.read_text(encoding="utf-8")
+    assert "| finished_at | objective | exit_code |" in text
+    assert "do work" in text
+    assert "thread-1" in text
+
+
+def test_format_status_includes_plan_fields_when_idle() -> None:
+    rendered = format_status(
+        child=None,
+        child_objective=None,
+        child_log_path=None,
+        child_started_at=None,
+        last_session_id="thread-1",
+        plan_mode="fully-plan",
+        pending_plan_request="继续推进目标",
+        pending_plan_auto_execute_at=dt.datetime(2026, 1, 1, 0, 10, 0),
+        scheduled_plan_request_at=dt.datetime(2026, 1, 1, 0, 0, 0),
     )
-    assert "benchmark pipeline" in objective
-    assert "Telegram follow-up flow first" in objective
+    assert "plan_mode=fully-plan" in rendered
+    assert "pending_plan_request=继续推进目标" in rendered
+    assert "plan_auto_execute_at=" in rendered
+
+
+def test_build_parser_default_run_model_preset_is_none() -> None:
+    args = build_parser().parse_args(["--telegram-bot-token", "123:abc"])
+    assert args.run_model_preset is None
diff --git a/tests/test_telegram_notifier.py b/tests/test_telegram_notifier.py
index 88928dd..54db877 100644
--- a/tests/test_telegram_notifier.py
+++ b/tests/test_telegram_notifier.py
@@ -1,3 +1,5 @@
+import urllib.error
+
 from codex_autoloop.cli import looks_like_bot_token
 from codex_autoloop.telegram_notifier import (
     TelegramConfig,
@@ -74,3 +76,43 @@ def test_typing_disabled_does_not_start_thread() -> None:
 def test_extract_chat_id_from_message_update() -> None:
     update = {"message": {"chat": {"id": 12345}}}
     assert extract_chat_id_from_update(update) == "12345"
+
+
+def test_send_message_timeout_does_not_raise(monkeypatch) -> None:
+    notifier = TelegramNotifier(
+        TelegramConfig(
+            bot_token="123456:ABCDEFGHIJK",
+            chat_id="1",
+            events=set(),
+        )
+    )
+    errors: list[str] = []
+    notifier.on_error = errors.append
+
+    def fake_urlopen(*args, **kwargs):  # type: ignore[no-untyped-def]
+        raise TimeoutError("read timed out")
+
+    monkeypatch.setattr("urllib.request.urlopen", fake_urlopen)
+    notifier.send_message("hello")
+    assert errors
+    assert "timeout" in errors[-1].lower()
+
+
+def test_send_message_urlerror_does_not_raise(monkeypatch) -> None:
+    notifier = TelegramNotifier(
+        TelegramConfig(
+            bot_token="123456:ABCDEFGHIJK",
+            chat_id="1",
+            events=set(),
+        )
+    )
+    errors: list[str] = []
+    notifier.on_error = errors.append
+
+    def fake_urlopen(*args, **kwargs):  # type: ignore[no-untyped-def]
+        raise urllib.error.URLError("network down")
+
+    monkeypatch.setattr("urllib.request.urlopen", fake_urlopen)
+    notifier.send_message("hello")
+    assert errors
+    assert "network" in errors[-1].lower()