From ae389d6891c13115fe15d6e32598819eca1c991c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 25 Dec 2025 00:27:10 +0000 Subject: [PATCH 1/2] Initial plan From 6a9f2f1fb29e09252e1df5afd1cb12e16fca80c2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 25 Dec 2025 00:31:28 +0000 Subject: [PATCH 2/2] Fix indentation and path inconsistencies per review comments Co-authored-by: TerrenceZhangX <39916879+TerrenceZhangX@users.noreply.github.com> --- scripts/run_profile.py | 363 ++++++++++++++++++++------------------- simulator/base_parser.py | 2 +- 2 files changed, 184 insertions(+), 181 deletions(-) diff --git a/scripts/run_profile.py b/scripts/run_profile.py index b3bfc43..0526fa2 100644 --- a/scripts/run_profile.py +++ b/scripts/run_profile.py @@ -24,195 +24,198 @@ def wait_for_port(host: str, port: int, timeout: int = 600) -> bool: - """Wait until a TCP port becomes reachable.""" - deadline = time.time() + timeout - while time.time() < deadline: - try: - with socket.create_connection((host, port), timeout=2): - return True - except Exception: - time.sleep(1) - return False + """Wait until a TCP port becomes reachable.""" + deadline = time.time() + timeout + while time.time() < deadline: + try: + with socket.create_connection((host, port), timeout=2): + return True + except Exception: + time.sleep(1) + return False def clean_dir(path: str) -> None: - """Clean or create a directory.""" - if os.path.exists(path): - for name in os.listdir(path): - fp = os.path.join(path, name) - if os.path.isfile(fp) or os.path.islink(fp): - os.unlink(fp) - elif os.path.isdir(fp): - shutil.rmtree(fp) - else: - os.makedirs(path, exist_ok=True) + """Clean or create a directory.""" + if os.path.exists(path): + for name in os.listdir(path): + fp = os.path.join(path, name) + if os.path.isfile(fp) or os.path.islink(fp): + os.unlink(fp) + elif os.path.isdir(fp): + shutil.rmtree(fp) + else: + os.makedirs(path, exist_ok=True) def parse_args(argv: Optional[list] = None) -> argparse.Namespace: - p = argparse.ArgumentParser(description="Run sglang profiling workload") - - p.add_argument( - "--profile-dir", - default="/flowsim/server_profile", - help="Directory where profiler traces (.trace.json.gz) will be written", - ) - p.add_argument( - "--log-dir", - default="/flowsim/tests/test-artifacts", - help="Directory to write server/client logs", - ) - p.add_argument( - "--server-opts", - required=True, - help=( - "All options for sglang.launch_server (include --host, --port, --model-path, --tp, etc). " - "Example: '--model-path /path --tp 1 --host 0.0.0.0 --port 30001 --disable-cuda-graph'" - ), - ) - p.add_argument( - "--bench-opts", - required=True, - help=( - "All options for bench_serving.py (include --backend, --host, --port, --dataset-name, --profile, etc). " - "Example: '--backend sglang --host 0.0.0.0 --port 30001 --dataset-name defined-len --num-prompts 16 --profile'" - ), - ) - p.add_argument( - "--bench-timeout", - type=int, - default=1200, - help="Timeout in seconds for bench_serving.py", - ) - - return p.parse_args(argv) + p = argparse.ArgumentParser(description="Run sglang profiling workload") + + p.add_argument( + "--profile-dir", + default="/flowsim/server_profile", + help="Directory where profiler traces (.trace.json.gz) will be written", + ) + p.add_argument( + "--log-dir", + default="/flowsim/tests/test-artifacts", + help="Directory to write server/client logs", + ) + p.add_argument( + "--server-opts", + required=True, + help=( + "All options for sglang.launch_server (include --host, --port, --model-path, --tp, etc). " + "Example: '--model-path /path --tp 1 --host 0.0.0.0 --port 30001 --disable-cuda-graph'" + ), + ) + p.add_argument( + "--bench-opts", + required=True, + help=( + "All options for bench_serving.py (include --backend, --host, --port, --dataset-name, --profile, etc). " + "Example: '--backend sglang --host 0.0.0.0 --port 30001 --dataset-name defined-len --num-prompts 16 --profile'" + ), + ) + p.add_argument( + "--bench-timeout", + type=int, + default=1200, + help="Timeout in seconds for bench_serving.py", + ) + + return p.parse_args(argv) def main(argv: Optional[list] = None) -> int: - args = parse_args(argv) - - profile_dir = args.profile_dir - log_dir = args.log_dir - - clean_dir(profile_dir) - os.makedirs(log_dir, exist_ok=True) - - ts = int(time.time()) - server_stdout_path = os.path.join(log_dir, f"server_{ts}.stdout.log") - server_stderr_path = os.path.join(log_dir, f"server_{ts}.stderr.log") - server_stdout_f = open(server_stdout_path, "w", encoding="utf-8") - server_stderr_f = open(server_stderr_path, "w", encoding="utf-8") - - # Set profiling environment variables - env = os.environ.copy() - env["SGLANG_TORCH_PROFILER_DIR"] = profile_dir - env["SGLANG_PROFILE_KERNELS"] = "1" - env["SGLANG_PROFILE_DEBUG"] = "1" - env["SGLANG_SET_CPU_AFFINITY"] = "1" - - # Extract host and port from server-opts for connection check - server_args = shlex.split(args.server_opts) - host = "0.0.0.0" - port = 30001 - try: - if "--host" in server_args: - host = server_args[server_args.index("--host") + 1] - if "--port" in server_args: - port = int(server_args[server_args.index("--port") + 1]) - except (ValueError, IndexError): - pass - - # Build server command - launch_cmd = [ - sys.executable, - "-m", - "sglang.launch_server", - ] + server_args - - print("[INFO] Starting sglang server:", " ".join(launch_cmd), flush=True) - preexec = getattr(os, "setsid", None) - server_proc = subprocess.Popen( - launch_cmd, - cwd="/flowsim/workload/framework/sglang/python", - stdout=server_stdout_f, - stderr=server_stderr_f, - preexec_fn=preexec, - env=env, - ) - - try: - if not wait_for_port(host, port, timeout=600): - print("[ERROR] Server did not start within timeout", file=sys.stderr) - return 1 - - script = os.path.abspath( - "/flowsim/workload/framework/sglang/python/sglang/bench_serving.py" - ) - - bench_args = shlex.split(args.bench_opts) - client_args = [sys.executable, script] + bench_args - - print("[INFO] Running bench_serving:", " ".join(client_args), flush=True) - result = subprocess.run( - client_args, - capture_output=True, - text=True, - env=env, - timeout=args.bench_timeout, - ) - - ts2 = int(time.time()) - prefix = f"bench_serving_{ts2}" - client_stdout_path = os.path.join(log_dir, prefix + ".stdout.log") - client_stderr_path = os.path.join(log_dir, prefix + ".stderr.log") - with open(client_stdout_path, "w", encoding="utf-8") as f_out: - f_out.write(result.stdout) - with open(client_stderr_path, "w", encoding="utf-8") as f_err: - f_err.write(result.stderr) - - if result.returncode != 0: - print( - f"[ERROR] bench_serving exited with code {result.returncode}", - file=sys.stderr, - ) - return result.returncode - - files = os.listdir(profile_dir) - json_gz_files = [f for f in files if f.endswith(".trace.json.gz")] - if not json_gz_files: - print( - f"[ERROR] No .trace.json.gz files found in {profile_dir}", - file=sys.stderr, - ) - return 1 - - print( - f"[INFO] Profiling complete, found {len(json_gz_files)} trace file(s) in {profile_dir}", - flush=True, - ) - return 0 - finally: - try: - if server_proc.poll() is None: - try: - os.killpg(os.getpgid(server_proc.pid), signal.SIGTERM) - except Exception: - server_proc.terminate() - server_proc.wait(timeout=30) - except Exception: - pass - try: - server_stdout_f.flush() - server_stderr_f.flush() - except Exception: - pass - try: - server_stdout_f.close() - server_stderr_f.close() - except Exception: - pass - time.sleep(2) + args = parse_args(argv) + profile_dir = args.profile_dir + log_dir = args.log_dir -if __name__ == "__main__": - raise SystemExit(main()) + clean_dir(profile_dir) + os.makedirs(log_dir, exist_ok=True) + + ts = int(time.time()) + server_stdout_path = os.path.join(log_dir, f"server_{ts}.stdout.log") + server_stderr_path = os.path.join(log_dir, f"server_{ts}.stderr.log") + server_stdout_f = open(server_stdout_path, "w", encoding="utf-8") + server_stderr_f = open(server_stderr_path, "w", encoding="utf-8") + + # Set profiling environment variables + env = os.environ.copy() + env["SGLANG_TORCH_PROFILER_DIR"] = profile_dir + env["SGLANG_PROFILE_KERNELS"] = "1" + env["SGLANG_PROFILE_DEBUG"] = "1" + env["SGLANG_SET_CPU_AFFINITY"] = "1" + + # Extract host and port from server-opts for connection check + server_args = shlex.split(args.server_opts) + host = "0.0.0.0" + port = 30001 + try: + if "--host" in server_args: + host = server_args[server_args.index("--host") + 1] + if "--port" in server_args: + port = int(server_args[server_args.index("--port") + 1]) + except (ValueError, IndexError): + pass + + # Build server command + launch_cmd = [ + sys.executable, + "-m", + "sglang.launch_server", + ] + server_args + + print("[INFO] Starting sglang server:", " ".join(launch_cmd), flush=True) + preexec = getattr(os, "setsid", None) + server_proc = subprocess.Popen( + launch_cmd, + cwd="/flowsim/workload/framework/sglang/python", + stdout=server_stdout_f, + stderr=server_stderr_f, + preexec_fn=preexec, + env=env, + ) + + try: + if not wait_for_port(host, port, timeout=600): + print( + "[ERROR] Server did not start within timeout", file=sys.stderr + ) + return 1 + script = os.path.abspath( + "/flowsim/workload/framework/sglang/python/sglang/bench_serving.py" + ) + + bench_args = shlex.split(args.bench_opts) + client_args = [sys.executable, script] + bench_args + + print( + "[INFO] Running bench_serving:", " ".join(client_args), flush=True + ) + result = subprocess.run( + client_args, + capture_output=True, + text=True, + env=env, + timeout=args.bench_timeout, + ) + + ts2 = int(time.time()) + prefix = f"bench_serving_{ts2}" + client_stdout_path = os.path.join(log_dir, prefix + ".stdout.log") + client_stderr_path = os.path.join(log_dir, prefix + ".stderr.log") + with open(client_stdout_path, "w", encoding="utf-8") as f_out: + f_out.write(result.stdout) + with open(client_stderr_path, "w", encoding="utf-8") as f_err: + f_err.write(result.stderr) + + if result.returncode != 0: + print( + f"[ERROR] bench_serving exited with code {result.returncode}", + file=sys.stderr, + ) + return result.returncode + + files = os.listdir(profile_dir) + json_gz_files = [f for f in files if f.endswith(".trace.json.gz")] + if not json_gz_files: + print( + f"[ERROR] No .trace.json.gz files found in {profile_dir}", + file=sys.stderr, + ) + return 1 + + print( + f"[INFO] Profiling complete, found {len(json_gz_files)} trace file(s) in {profile_dir}", + flush=True, + ) + return 0 + finally: + try: + if server_proc.poll() is None: + try: + os.killpg(os.getpgid(server_proc.pid), signal.SIGTERM) + except Exception: + server_proc.terminate() + server_proc.wait(timeout=30) + except Exception: + pass + try: + server_stdout_f.flush() + server_stderr_f.flush() + except Exception: + pass + try: + server_stdout_f.close() + server_stderr_f.close() + except Exception: + pass + time.sleep(2) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/simulator/base_parser.py b/simulator/base_parser.py index 17e090c..b0c7209 100644 --- a/simulator/base_parser.py +++ b/simulator/base_parser.py @@ -536,7 +536,7 @@ def _calibrate_communication_kernels(self) -> None: profiled_duration = comm_profile_cache[cache_key] else: profiled_duration = nb.run_nccl_all_reduce_perf( - cmd_path="/workloadsim/third_party/nccl-tests/build/all_reduce_perf", + cmd_path="/flowsim/third_party/nccl-tests/build/all_reduce_perf", b=str(size), e=str(size), g=str(self.tensor_parallelism),