From 5b1a839988e48f7e097b840513a6be555f48ea16 Mon Sep 17 00:00:00 2001 From: haosenwang1018 Date: Thu, 9 Apr 2026 21:16:57 +0000 Subject: [PATCH] fix: clarify missing reme launcher env --- launcher.py | 264 ++++++++++++++++++++++++++--------------- tests/test_launcher.py | 19 +++ 2 files changed, 189 insertions(+), 94 deletions(-) create mode 100644 tests/test_launcher.py diff --git a/launcher.py b/launcher.py index 9d8cb8a9..906e17f1 100644 --- a/launcher.py +++ b/launcher.py @@ -11,7 +11,7 @@ from agentevolver.utils.daemon import LaunchCommandWhenAbsent load_dotenv() -BACK_TARGETS = os.environ.get('BACK_TARGETS', './config,./agentevolver').split(',') +BACK_TARGETS = os.environ.get("BACK_TARGETS", "./config,./agentevolver").split(",") def _replace_placeholder_in_config(config_obj, placeholder: str, replacement: str): @@ -33,86 +33,106 @@ def _walk(node): return _walk(config_obj) + def parse_args(): - parser = argparse.ArgumentParser(description='The launcher of agentevolver.') + parser = argparse.ArgumentParser(description="The launcher of agentevolver.") parser.add_argument( - '--target', + "--target", type=str, - default='agentevolver.main_ppo', + default="agentevolver.main_ppo", required=False, - help='Target script to run (default: agentevolver.main_ppo)' + help="Target script to run (default: agentevolver.main_ppo)", ) - parser.add_argument('--conf', + parser.add_argument( + "--conf", type=str, default="", required=False, - help='Path to configuration file' + help="Path to configuration file", ) - parser.add_argument('--db', - type=str, - default="", - required=False, - help='Path to configuration file' + parser.add_argument( + "--db", type=str, default="", required=False, help="Path to configuration file" ) - parser.add_argument('--with-appworld', - action='store_true', # Changed from store_true to action='store_true' + parser.add_argument( + "--with-appworld", + action="store_true", # Changed from store_true to action='store_true' default=False, - help='Launch appworld' + help="Launch appworld", ) - parser.add_argument('--with-webshop', - action='store_true', # Changed from store_true to action='store_true' + parser.add_argument( + "--with-webshop", + action="store_true", # Changed from store_true to action='store_true' default=False, - help='Launch webshop' + help="Launch webshop", ) - parser.add_argument('--with-bfcl', - action='store_true', - default=False, - help='Launch bfcl' + parser.add_argument( + "--with-bfcl", action="store_true", default=False, help="Launch bfcl" ) - parser.add_argument('--with-reme', - action='store_true', - default=False, - help='Launch ReMe' + parser.add_argument( + "--with-reme", action="store_true", default=False, help="Launch ReMe" ) - parser.add_argument('--with-logview', - action='store_true', # Changed from store_true to action='store_true' + parser.add_argument( + "--with-logview", + action="store_true", # Changed from store_true to action='store_true' default=False, - help='Launch logview' + help="Launch logview", ) - parser.add_argument('--with-crafters', - action='store_true', # Changed from store_true to action='store_true' + parser.add_argument( + "--with-crafters", + action="store_true", # Changed from store_true to action='store_true' default=False, - help='Launch Crafters Env Simulation' + help="Launch Crafters Env Simulation", ) - parser.add_argument('--reboot', - action='store_true', # Changed from store_true to action='store_true' + parser.add_argument( + "--reboot", + action="store_true", # Changed from store_true to action='store_true' default=False, - help='reboot flag' + help="reboot flag", ) - parser.add_argument('-k', '--kill', '--python-killer', - dest='python_killer', - action='store_true', + parser.add_argument( + "-k", + "--kill", + "--python-killer", + dest="python_killer", + action="store_true", default=False, - help='Kill existing ray and python processes (excluding vscode and current process) before starting') + help="Kill existing ray and python processes (excluding vscode and current process) before starting", + ) return parser.parse_args() def pty_launch(service_name: str, success_std_string="Starting server on"): - service_path = os.environ.get(f'{service_name.upper()}_PATH') - service_script = os.environ.get(f'{service_name.upper()}_SCRIPT') + service_path = os.environ.get(f"{service_name.upper()}_PATH") + service_script = os.environ.get(f"{service_name.upper()}_SCRIPT") + missing = [ + name + for name, value in ( + (f"{service_name.upper()}_PATH", service_path), + (f"{service_name.upper()}_SCRIPT", service_script), + ) + if not value + ] + if missing: + raise RuntimeError( + f"Missing launcher environment variable(s) for {service_name}: {', '.join(missing)}. " + f"Define them in your shell or .env before using --with-{service_name}." + ) companion = LaunchCommandWhenAbsent( full_argument_list=[service_script], dir=service_path, tag="appworld_env_service", - use_pty=True + use_pty=True, ) companion.launch( launch_wait_time=1800, success_std_string=success_std_string, ) -def _fast_kill_by_keyword_bash(keyword: str, exclude_substrings=None, grace_seconds: float = 1.0): + +def _fast_kill_by_keyword_bash( + keyword: str, exclude_substrings=None, grace_seconds: float = 1.0 +): """Use bash pipelines to kill processes matching keyword quickly. - Filters out processes containing any exclude_substrings @@ -127,14 +147,18 @@ def _fast_kill_by_keyword_bash(keyword: str, exclude_substrings=None, grace_seco # Build a fast PID collector using pgrep if available; fallback to ps/grep # We prefer pgrep -af to filter by full command and then extract PID (column 1) - exclude_filters = " ".join([f"| grep -v -F {shlex.quote(s)}" for s in exclude_substrings]) + exclude_filters = " ".join( + [f"| grep -v -F {shlex.quote(s)}" for s in exclude_substrings] + ) pid_list_cmd = ( f"(pgrep -af -- {shlex.quote(keyword)} 2>/dev/null || true) " f"{exclude_filters} | awk '{{print $1}}' | grep -v -x {self_pid} || true" ) try: - res = subprocess.run(["bash", "-lc", pid_list_cmd], capture_output=True, text=True, check=False) + res = subprocess.run( + ["bash", "-lc", pid_list_cmd], capture_output=True, text=True, check=False + ) pids = [pid for pid in res.stdout.split() if pid.isdigit()] except Exception as e: print(f"Failed to list PIDs via bash: {e}") @@ -147,7 +171,9 @@ def _fast_kill_by_keyword_bash(keyword: str, exclude_substrings=None, grace_seco f"{exclude_filters} | awk '{{print $1}}' | grep -v -x {self_pid} || true" ) try: - res2 = subprocess.run(["bash", "-lc", ps_pid_cmd], capture_output=True, text=True, check=False) + res2 = subprocess.run( + ["bash", "-lc", ps_pid_cmd], capture_output=True, text=True, check=False + ) pids = [pid for pid in res2.stdout.split() if pid.isdigit()] except Exception as e: print(f"Failed to list PIDs via ps/grep: {e}") @@ -159,16 +185,25 @@ def _fast_kill_by_keyword_bash(keyword: str, exclude_substrings=None, grace_seco pid_args = " ".join(pids) try: # Send TERM to all in one call - subprocess.run(["bash", "-lc", f"kill -TERM -- {pid_args} 2>/dev/null || true"], check=False) + subprocess.run( + ["bash", "-lc", f"kill -TERM -- {pid_args} 2>/dev/null || true"], + check=False, + ) time.sleep(grace_seconds) # Escalate with KILL once; ignore failures for already-exited PIDs - subprocess.run(["bash", "-lc", f"kill -KILL -- {pid_args} 2>/dev/null || true"], check=False) + subprocess.run( + ["bash", "-lc", f"kill -KILL -- {pid_args} 2>/dev/null || true"], + check=False, + ) except Exception as e: print(f"Error issuing kill commands: {e}") return [int(p) for p in pids] -def _kill_processes_by_keyword(keyword: str, exclude_substrings=None, grace_seconds: float = 1.0): + +def _kill_processes_by_keyword( + keyword: str, exclude_substrings=None, grace_seconds: float = 1.0 +): """Kill processes whose command line contains the given keyword. - Excludes commands containing any of exclude_substrings @@ -180,7 +215,12 @@ def _kill_processes_by_keyword(keyword: str, exclude_substrings=None, grace_seco try: # Use ps to get PID and full command - ps = subprocess.run(["ps", "-eo", "pid,command", "-ww"], capture_output=True, text=True, check=True) + ps = subprocess.run( + ["ps", "-eo", "pid,command", "-ww"], + capture_output=True, + text=True, + check=True, + ) lines = ps.stdout.strip().splitlines() except Exception as e: print(f"Failed to list processes: {e}") @@ -235,45 +275,57 @@ def _kill_processes_by_keyword(keyword: str, exclude_substrings=None, grace_seco return killed + def main(): args = parse_args() # Optionally kill existing processes before starting - if getattr(args, 'python_killer', False): - print("--python-killer enabled: fast killing via bash for 'ray' and 'python' (excluding 'vscode' and current process)") - killed_ray = _fast_kill_by_keyword_bash("ray", exclude_substrings=["vscode"], grace_seconds=0.8) - killed_py = _fast_kill_by_keyword_bash("python", exclude_substrings=["vscode"], grace_seconds=0.8) - print(f"Targeted {len(killed_ray)} ray-related processes and {len(killed_py)} python processes.") + if getattr(args, "python_killer", False): + print( + "--python-killer enabled: fast killing via bash for 'ray' and 'python' (excluding 'vscode' and current process)" + ) + killed_ray = _fast_kill_by_keyword_bash( + "ray", exclude_substrings=["vscode"], grace_seconds=0.8 + ) + killed_py = _fast_kill_by_keyword_bash( + "python", exclude_substrings=["vscode"], grace_seconds=0.8 + ) + print( + f"Targeted {len(killed_ray)} ray-related processes and {len(killed_py)} python processes." + ) # small pause to allow system to release resources (ports, files) time.sleep(0.5) if args.conf: yaml_path = args.conf - assert yaml_path.endswith('.yaml'), "Configuration file must be a YAML file" + assert yaml_path.endswith(".yaml"), "Configuration file must be a YAML file" exp_base = os.path.dirname(args.conf) if os.path.exists(exp_base): - ## 0. read yaml (get trainer.experiment_name) import yaml - with open(yaml_path, 'r') as file: + + with open(yaml_path, "r") as file: config = yaml.safe_load(file) - exp_name = config.get('trainer').get('experiment_name') - if exp_name is None or exp_name == 'read_yaml_name': - if exp_name is not None: exp_name = exp_name.replace('|', '-') - exp_name = os.path.basename(yaml_path).replace('.yaml', '') + exp_name = config.get("trainer").get("experiment_name") + if exp_name is None or exp_name == "read_yaml_name": + if exp_name is not None: + exp_name = exp_name.replace("|", "-") + exp_name = os.path.basename(yaml_path).replace(".yaml", "") else: - exp_name = exp_name.replace('|', '-') + exp_name = exp_name.replace("|", "-") - print('----------------------------------------') - backup_dir = os.path.join('launcher_record', exp_name, 'backup') - yaml_backup_dst = os.path.join('launcher_record', exp_name, 'yaml_backup.yaml') + print("----------------------------------------") + backup_dir = os.path.join("launcher_record", exp_name, "backup") + yaml_backup_dst = os.path.join( + "launcher_record", exp_name, "yaml_backup.yaml" + ) exe_yaml_path = yaml_backup_dst exe_exp_base = os.path.dirname(yaml_backup_dst) - print('Experiment Name:', exp_name) - print('Experiment Backup Dir:', backup_dir) - print('Experiment Yaml Dir:', yaml_backup_dst) - print('----------------------------------------') + print("Experiment Name:", exp_name) + print("Experiment Backup Dir:", backup_dir) + print("Experiment Yaml Dir:", yaml_backup_dst) + print("----------------------------------------") time.sleep(2) ## 1. check exp_base/backup exist @@ -282,13 +334,23 @@ def main(): else: total_seconds = 10 for i in range(total_seconds): - print(f"\rWarning: backup directory already exists, we will automatically ignore this after {total_seconds - i} seconds...", end="", flush=True) + print( + f"\rWarning: backup directory already exists, we will automatically ignore this after {total_seconds - i} seconds...", + end="", + flush=True, + ) time.sleep(1) ## 2. copy files to back up for backup_target in BACK_TARGETS: - print(f"Copying {backup_target} to {os.path.join(backup_dir, os.path.basename(backup_target))}") - shutil.copytree(backup_target, os.path.join(backup_dir, os.path.basename(backup_target)), dirs_exist_ok=True) + print( + f"Copying {backup_target} to {os.path.join(backup_dir, os.path.basename(backup_target))}" + ) + shutil.copytree( + backup_target, + os.path.join(backup_dir, os.path.basename(backup_target)), + dirs_exist_ok=True, + ) ## 3. copy yaml to back up yaml_backup_src = yaml_path @@ -297,9 +359,9 @@ def main(): ## 4. edit new yaml yaml_path = yaml_backup_dst # now, replace the trainer.experiment_name - with open(yaml_path, 'r') as file: + with open(yaml_path, "r") as file: config = yaml.safe_load(file) - config['trainer']['experiment_name'] = exp_name + config["trainer"]["experiment_name"] = exp_name # scan all config item in config recursively, find string "${trainer.experiment_name}", replace with `exp_name` config = _replace_placeholder_in_config( config, @@ -308,7 +370,7 @@ def main(): ) # replace all - with open(yaml_path, 'w') as file: + with open(yaml_path, "w") as file: yaml.dump(config, file) else: @@ -318,7 +380,7 @@ def main(): if args.db: env["RAY_DEBUG_POST_MORTEM"] = "1" env["DEBUG_TAGS"] = args.db - env["RAY_record_task_actor_creation_sites"] = "true" + env["RAY_record_task_actor_creation_sites"] = "true" print("Debug mode is ON") else: print("Debug mode is OFF") @@ -343,48 +405,61 @@ def main(): pty_launch("bfcl") if args.with_logview: - companion = LaunchCommandWhenAbsent( full_argument_list=[ sys.executable, - '-m', - 'web_display.start_web', + "-m", + "web_display.start_web", ], - dir='./', - tag="logview" + dir="./", + tag="logview", + ) + companion.launch( + launch_wait_time=1800, success_std_string="Uvicorn running on", env_dict={} ) - companion.launch(launch_wait_time=1800,success_std_string="Uvicorn running on", env_dict={}) time.sleep(2.5) try: import webbrowser from datetime import datetime - final_log_path = os.path.join( "experiments", exp_name, "trace_rollout", datetime.now().strftime("%Y_%m_%d_%H_%M")) + + final_log_path = os.path.join( + "experiments", + exp_name, + "trace_rollout", + datetime.now().strftime("%Y_%m_%d_%H_%M"), + ) # make dir os.makedirs(final_log_path) - webbrowser.open("http://127.0.0.1:8181/"+"?path="+os.path.abspath(final_log_path)) - except: + webbrowser.open( + "http://127.0.0.1:8181/" + "?path=" + os.path.abspath(final_log_path) + ) + except Exception: pass if args.conf: # let's begin the training process cmd = [ sys.executable, - '-m', + "-m", args.target, - '--config-path', + "--config-path", os.path.abspath(exe_exp_base), - '--config-name', + "--config-name", os.path.basename(exe_yaml_path), ] if args.with_logview: - env.update({ - 'BEST_LOGGER_WEB_SERVICE_URL': os.environ.get('BEST_LOGGER_WEB_SERVICE_URL', 'http://127.0.0.1:8181/') - }) + env.update( + { + "BEST_LOGGER_WEB_SERVICE_URL": os.environ.get( + "BEST_LOGGER_WEB_SERVICE_URL", "http://127.0.0.1:8181/" + ) + } + ) try: print(f"Running command: {' '.join(cmd)}") - subprocess.run(cmd, check=True, cwd=os.path.abspath('./'), env=env) + subprocess.run(cmd, check=True, cwd=os.path.abspath("./"), env=env) except subprocess.CalledProcessError as e: print(f"Error running subprocess: {e}") sys.exit(1) @@ -392,5 +467,6 @@ def main(): print(f"Unexpected error: {e}") sys.exit(1) + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/tests/test_launcher.py b/tests/test_launcher.py new file mode 100644 index 00000000..e985ac2d --- /dev/null +++ b/tests/test_launcher.py @@ -0,0 +1,19 @@ +import pytest + +import launcher + + +def test_pty_launch_requires_reme_env_vars(monkeypatch): + monkeypatch.delenv("REME_PATH", raising=False) + monkeypatch.delenv("REME_SCRIPT", raising=False) + + with pytest.raises(RuntimeError, match="REME_PATH, REME_SCRIPT"): + launcher.pty_launch("reme") + + +def test_pty_launch_requires_single_missing_env_var(monkeypatch): + monkeypatch.setenv("REME_PATH", "/tmp/reme") + monkeypatch.delenv("REME_SCRIPT", raising=False) + + with pytest.raises(RuntimeError, match="REME_SCRIPT"): + launcher.pty_launch("reme")