diff --git a/Makefile b/Makefile index 1ce89b9..702bc0c 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,7 @@ -IMAGE_NAME ?= workloadsim-image -CONTAINER_NAME ?= workloadsim-docker +IMAGE_NAME ?= flowsim-image +CONTAINER_NAME ?= flowsim-docker PORT ?= 22401 MEM_LIMIT ?= 240g -MOUNT_VOLUME ?= n fix-permissions: sudo chown -R $$(id -u):$$(id -g) $$(pwd) @@ -12,9 +11,8 @@ build-docker: run-docker: @if [ -n "$(GPU_DEVICES)" ]; then GPU_DEVICES_FLAG="--gpus=$(GPU_DEVICES)"; else GPU_DEVICES_FLAG=""; fi; \ - if [ "$(MOUNT_VOLUME)" = "y" ]; then VOLUME_FLAG="-v $$(pwd):/workloadsim"; else VOLUME_FLAG=""; fi; \ - echo "Running: sudo docker run $$VOLUME_FLAG $$GPU_DEVICES_FLAG --network=host --cap-add=SYS_ADMIN -it -p ${PORT}:22 --memory ${MEM_LIMIT} --name $(CONTAINER_NAME) $(IMAGE_NAME)"; \ - sudo docker run $$VOLUME_FLAG $$GPU_DEVICES_FLAG --cap-add=SYS_ADMIN --network=host --shm-size 911G -it -p ${PORT}:22 --memory ${MEM_LIMIT} --name $(CONTAINER_NAME) $(IMAGE_NAME) + echo "Running: sudo docker run $$GPU_DEVICES_FLAG --network=host --cap-add=SYS_ADMIN -it -p ${PORT}:22 --memory ${MEM_LIMIT} --name $(CONTAINER_NAME) $(IMAGE_NAME)"; \ + sudo docker run $$GPU_DEVICES_FLAG --cap-add=SYS_ADMIN --network=host --shm-size 911G -it -p ${PORT}:22 --memory ${MEM_LIMIT} --name $(CONTAINER_NAME) $(IMAGE_NAME) rm-docker: sudo docker rm $(CONTAINER_NAME) \ No newline at end of file diff --git a/README.md b/README.md index b441244..4547a0b 100644 --- a/README.md +++ b/README.md @@ -19,11 +19,8 @@ The project supports rapid deployment using Docker, includes scripts for environ ## Table of Contents -- [Quick Demo](#quick-demo) -- [Prerequisites](#prerequisites) -- [Docker Build and Run](#docker-build-and-run) -- [Profiling and Simulation](#profiling-and-simulation) -- [Tests](#tests) +- [Getting Started](#getting-started) +- [For Developers](#for-developers) - [Risks and limitations](#risks-and-limitations) - [License](#license) - [Trademarks](#trademarks) @@ -31,108 +28,160 @@ The project supports rapid deployment using Docker, includes scripts for environ --- -## Quick Demo +## Getting Started -This example walks through profiling, parsing, and simulating a real workload. +### Prerequisites -1. **Build and run the Docker image (on the host with GPU)** +- Linux system with NVIDIA GPU(s) (for profiling) +- [Docker](https://docs.docker.com/get-docker/) with [NVIDIA Container Runtime](https://github.com/NVIDIA/nvidia-container-runtime) +- [Make](https://www.gnu.org/software/make/) +- [NVIDIA NGC account](https://org.ngc.nvidia.com/setup/api-key) for pulling NVIDIA base images +- ~50GB disk space for images and traces + +**Note:** Run `git submodule update --init --recursive` before building, as the LLMCompass submodule requires initialization. + +### 1. Build the Docker Image ```bash +cd /path/to/flowsim make build-docker -make run-docker GPU_DEVICES=all MOUNT_VOLUME=y CONTAINER_NAME=flowsim-demo ``` -2. **Apply FlowSim patches to the bundled sglang** +This creates a local image named `flowsim-image` with FlowSim patches already applied to sglang. + +### 2. Run Profile → Parse → Simulate + +Create workspace directories on your host for storing traces and results: ```bash -cd /workloadsim/workload/framework/sglang -git apply ../patches/hook.patch -git apply ../patches/v055.patch -cd /workloadsim +mkdir -p /data/flowsim-profile +mkdir -p /data/flowsim-simulate ``` -3. 
**Generate (or reuse) a trace (optionally translate to CSV + summary)** - -Preferred path: run the integration profiling test to generate a fresh trace under `/workloadsim/server_profile`: +#### Step 1: Profile (Generate Traces) ```bash -# 3.a Generate a new trace via profiling (GPU required) -pytest tests/integration/test_profile.py::test_bench_serving_predefined_len_profile - -# 3.b (optional) Translate the trace to CSV + summary for offline analysis -python scripts/run_parse.py \ - --trace-file server_profile/your-trace-name-TP-0.trace.json.gz \ - --output-dir server_simulate +sudo docker run --rm --gpus=all \ + -v /data/flowsim-profile:/workspace/profile \ + -v /data/flowsim-simulate:/workspace/simulate \ + -w /flowsim \ + flowsim-image \ + python scripts/run_profile.py \ + --profile-dir /workspace/profile \ + --log-dir /workspace/profile/logs \ + --server-opts "--model-path /flowsim/workload/models/configs/deepseek/ --load-format dummy --tp 1 --host 0.0.0.0 --port 30001 --attention-backend flashinfer --disable-cuda-graph" \ + --bench-opts "--backend sglang --host 0.0.0.0 --port 30001 --dataset-name defined-len --prefill-decode-lens 32768:8 --num-prompts 16 --profile" ``` -Fallback: if you cannot run profiling (e.g., no GPU), reuse the demo trace shipped with the repo instead (both for CSV translation and for step 4 simulation): +**What this does:** +- Starts an sglang server with profiling enabled +- Runs benchmark requests against it +- Generates `*.trace.json.gz` files in `/data/flowsim-profile` (mounted as `/workspace/profile`) + +**Note:** The first run will be slow (~10 minutes) due to DeepGEMM kernel warmup and compilation. For stable performance, avoid using `--rm` flag and reuse the same container. Subsequent runs with similar configurations will be faster. + +**Tip:** +- Adjust `--server-opts` and `--bench-opts` to match your model, parallelism (TP/DP/EP), and workload requirements. All `sglang.launch_server` and `bench_serving.py` parameters are supported. +- Trace files can be visualized using [Perfetto UI](https://ui.perfetto.dev/) by uploading the `.trace.json.gz` files directly. +- For multi-GPU profiling (TP > 1), merge individual traces into a single file for a global view: + ```bash + python /flowsim/utils/merge_trace.py \ + --trace_dir /data/flowsim-profile \ + --output /data/flowsim-profile/merged_trace.json + ``` + Then visualize the merged trace at [Perfetto UI](https://ui.perfetto.dev/). + +#### Step 2: Parse (Convert Trace to CSV) ```bash -python scripts/run_parse.py \ - --trace-file demo/deepseekv3-TP-0.trace.json.gz \ - --output-dir server_simulate +sudo docker run --rm \ + -v /data/flowsim-profile:/workspace/profile \ + -v /data/flowsim-simulate:/workspace/simulate \ + -w /flowsim \ + flowsim-image \ + python -m scripts.run_parse \ + --trace-file /workspace/profile/your-trace-name-TP-0.trace.json.gz \ + --output-dir /workspace/simulate ``` -These steps: +Replace `your-trace-name-TP-0.trace.json.gz` with the actual filename from step 1. -- Use sglang to produce or reuse a real profile trace under `server_profile/` or `demo/`. -- Optionally use `BaseKernelInfoParser` (via `run_parse.py`) to extract kernel-level information and write a per-kernel CSV plus a summary file into `server_simulate/`. 
+**What this does:** +- Parses the trace file +- Extracts kernel-level information (operator, shapes, dtypes) +- Generates a CSV file and JSON summary in `/data/flowsim-simulate` (mounted as `/workspace/simulate`) -You can then inspect the generated artifacts in the corresponding folder. +**Fallback:** If you don't have a GPU or can't run profiling, use the demo trace shipped with the repo: + +```bash +sudo docker run --rm \ + -v /data/flowsim-simulate:/workspace/simulate \ + -w /flowsim \ + flowsim-image \ + python -m scripts.run_parse \ + --trace-file /flowsim/demo/deepseekv3-TP-0.trace.json.gz \ + --output-dir /workspace/simulate +``` -4. **Run a lightweight simulation via the LLMCompass backend** +#### Step 3: Simulate (Run Hardware Simulation) -With a trace from step 3 (or the demo directory), you can run a small hardware-level simulation using the LLMCompass backend integration test. This test starts a local backend server and parses the trace internally before posting kernels to it; the CSV from step 3 is not required. +This step requires a running LLMCompass backend. First, build the backend image: ```bash -# Optionally point the simulator to the trace you just generated or reused -export TRACE_PATH=/workloadsim/path-to-your-trace.trace.json.gz - -# Run the LLMCompass backend integration test -pytest tests/unit/test_llmcompass_backend.py::test_post_parsed_kernels_to_backend +sudo docker build -t llmcompass-backend -f backend/LLMCompass/Dockerfile backend/LLMCompass/ ``` -If `TRACE_PATH` is not set, the test falls back to the bundled sample trace `tests/unit/test_trace.trace.json.gz`. +Then start the backend: -Together, steps 1–4 illustrate the core FlowSim workflow: **profile → parse/translate → simulate/analyze**. ---- +```bash +# Terminal 1: Start LLMCompass backend +sudo docker run --rm -p 8000:8000 llmcompass-backend +``` -## Prerequisites +Then in another terminal, run the simulation: -- Recommended: Linux system -- Required: [Docker](https://docs.docker.com/get-docker/), [Make](https://www.gnu.org/software/make/) and an [NVIDIA NGC account](https://org.ngc.nvidia.com/setup/api-key) for pulling NVIDIA Docker images. +```bash +# Terminal 2: Run simulation +sudo docker run --rm \ + --network=host \ + -v /data/flowsim-profile:/workspace/profile \ + -v /data/flowsim-simulate:/workspace/simulate \ + flowsim-image \ + python -m scripts.run_simulate \ + --trace-file /workspace/profile/your-trace-name-TP-0.trace.json.gz \ + --api-url http://127.0.0.1:8000 \ + --artifact-dir /workspace/simulate/llmcompass +``` ---- +**What this does:** +- Parses the trace into kernels +- Submits each kernel to the LLMCompass backend `/tasks` API +- Polls until all tasks complete +- Writes request/response artifacts to `/workspace/simulate/llmcompass` -## Docker Build and Run +### 3. Inspect Results -From the project root directory, build and launch the Docker container. Initializing git submodules is necessary, as the repository requires a Personal Access Token (PAT) for cloning. +All generated files are available on your host at `/data/`: ```bash -make build-docker -make run-docker GPU_DEVICES=[xxx] MOUNT_VOLUME=[y/n] CONTAINER_NAME=[YourContainerName] +ls -lh /data/flowsim-profile/ # Raw trace files +ls -lh /data/flowsim-simulate/ # Parsed CSV, summary, simulation artifacts ``` -- `make build-docker`: Builds the Docker image using the provided Dockerfile. Run `git submodule update --init --recursive` first, since the LLMCompass submodule requires a PAT for initialization. 
-- `make run-docker GPU_DEVICES=all`: Starts the container interactively. Use `MOUNT_VOLUME=y` for development purposes to easily download trace files. By default, the container name is `workloadsim-docker`. -- `make rm-docker`: Removes the Docker container after it stops. - --- -## Profiling and Simulation - -### Quick Start +## For Developers -For a concrete end-to-end profiling setup in this repo, see `tests/integration/test_profile.py`. It demonstrates how to: +### Customizing Profiling Workloads -- Launch an sglang server with profiling enabled (via `sglang.launch_server` and environment variables such as `SGLANG_TORCH_PROFILER_DIR` and `SGLANG_PROFILE_KERNELS`). -- Run `python sglang/bench_serving.py --profile ...` against that server to generate `.trace.json.gz` files under `/workloadsim/server_profile`. +For programmatic profiling setup, see `tests/integration/test_profile.py`, which shows how to: -These trace files can then be translated and simulated following the [Quick Demo](#quick-demo) section. +- Launch an sglang server with profiling enabled via environment variables (`SGLANG_TORCH_PROFILER_DIR`, `SGLANG_PROFILE_KERNELS`) +- Run custom benchmarks against the server to generate trace files -For more background on profiling options and parameters, refer to the [sglang profiling documentation](https://docs.sglang.ai/developer_guide/benchmark_and_profiling.html). +Adjust `--server-opts` and `--bench-opts` in `scripts/run_profile.py` to match your model and workload. All `sglang.launch_server` and `bench_serving.py` parameters are supported. See the [sglang profiling documentation](https://docs.sglang.ai/developer_guide/benchmark_and_profiling.html) for details. -### Simulation characteristics (LLMCompass backend) +### LLMCompass Backend Integration FlowSim currently integrates with [LLMCompass](https://github.com/TerrenceZhangX/LLMCompass) as a reference GPU performance simulator. In this setup: @@ -147,9 +196,9 @@ LLMCompass itself supports richer workflows (e.g., compiling full operator graph After you obtain a profile trace (`*.trace.json.gz`), you will typically run the parser once to inspect kernel-level status: ```bash -python scripts/run_parse.py \ - --trace-file /workloadsim/server_profile/your-trace-name.trace.json.gz \ - --output-dir /workloadsim/server_simulate +python -m scripts.run_parse \ + --trace-file /flowsim/server_profile/your-trace-name.trace.json.gz \ + --output-dir /flowsim/server_simulate ``` During parsing, FlowSim looks up kernel metadata (e.g., tensor shapes and dtypes) in `kernels.json`. Any kernels it cannot match are written to `unknown_kernels.json` at the project root, with incomplete or `unknown` parameter descriptions. 
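Reviewer note (not part of the patch): the paragraph above says unmatched kernels land in `unknown_kernels.json` at the project root. A minimal sketch of inspecting that file after a parse run follows; it assumes the file is a JSON list of per-kernel dicts as `post_process_with_db` writes it, and the `kernel_name` field is an illustrative guess at the entry schema.

```python
# Sketch only: list kernels the parser could not match against kernels.json.
# Assumes /flowsim/unknown_kernels.json is a JSON list of per-kernel dicts;
# the "kernel_name" field is an illustrative guess, not a confirmed schema.
import json
import os

UNKNOWN_PATH = "/flowsim/unknown_kernels.json"

if os.path.exists(UNKNOWN_PATH):
    with open(UNKNOWN_PATH, "r", encoding="utf-8") as f:
        entries = json.load(f)
    print(f"{len(entries)} kernel(s) missing from kernels.json")
    for entry in entries:
        # Fall back to printing the raw entry if the assumed field is absent.
        name = entry.get("kernel_name", entry) if isinstance(entry, dict) else entry
        print(f"  {name}")
else:
    print("No unknown_kernels.json found; all kernels matched the database.")
```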
diff --git a/backend/LLMCompass b/backend/LLMCompass index bcc54eb..65dec66 160000 --- a/backend/LLMCompass +++ b/backend/LLMCompass @@ -1 +1 @@ -Subproject commit bcc54eb5755e50ca950ccd1f9b482f7460afae8a +Subproject commit 65dec66aeed335174d42a96c83427767cf13b648 diff --git a/dockerfiles/cuda12.6.dockerfile b/dockerfiles/cuda12.6.dockerfile index eeb0766..9796440 100644 --- a/dockerfiles/cuda12.6.dockerfile +++ b/dockerfiles/cuda12.6.dockerfile @@ -19,7 +19,7 @@ FROM nvcr.io/nvidia/pytorch:24.10-py3 # Note: dockerfile modifed based on # https://github.com/microsoft/superbenchmark/blob/main/dockerfile/cuda12.4.dockerfile -LABEL maintainer="WorkloadSim" +LABEL maintainer="FlowSim" ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && \ @@ -102,27 +102,6 @@ RUN TARGETARCH_HW=$(uname -m) && \ mv hpcx-${HPCX_VERSION}-gcc-mlnx_ofed-ubuntu22.04-cuda12-${TARGETARCH_HW} hpcx && \ rm hpcx.tbz -# Deprecated - Installs specific to amd64 platform -# RUN if [ "$TARGETARCH" = "amd64" ]; then \ -# # Install Intel MLC -# cd /tmp && \ -# wget -q https://downloadmirror.intel.com/793041/mlc_v3.11.tgz -O mlc.tgz && \ -# tar xzf mlc.tgz Linux/mlc && \ -# cp ./Linux/mlc /usr/local/bin/ && \ -# rm -rf ./Linux mlc.tgz && \ -# # Install AOCC compiler -# wget https://download.amd.com/developer/eula/aocc-compiler/aocc-compiler-4.0.0_1_amd64.deb && \ -# apt install -y ./aocc-compiler-4.0.0_1_amd64.deb && \ -# rm -rf aocc-compiler-4.0.0_1_amd64.deb && \ -# # Install AMD BLIS -# wget https://download.amd.com/developer/eula/blis/blis-4-0/aocl-blis-linux-aocc-4.0.tar.gz && \ -# tar xzf aocl-blis-linux-aocc-4.0.tar.gz && \ -# mv amd-blis /opt/AMD && \ -# rm -rf aocl-blis-linux-aocc-4.0.tar.gz; \ -# else \ -# echo "Skipping Intel MLC, AOCC and AMD Bliss installations for non-amd64 architecture: $TARGETARCH"; \ -# fi - # Install UCX v1.16.0 with multi-threading support RUN cd /tmp && \ wget https://github.com/openucx/ucx/releases/download/v1.16.0/ucx-1.16.0.tar.gz && \ @@ -149,20 +128,26 @@ RUN cd /workspace && \ git submodule update --init --recursive && \ python setup.py install -# Copy local workloadsim code into container -COPY . /workloadsim +# Copy local FlowSim code into container +COPY . /flowsim -WORKDIR /workloadsim +WORKDIR /flowsim # Init sglang submodule RUN git submodule update --init --recursive # Install sglang -WORKDIR /workloadsim/workload/framework/sglang +WORKDIR /flowsim/workload/framework/sglang RUN python3 -m pip install -e "python[all]" +# Apply FlowSim patches during the Docker build. This replaces the older, +# previously documented manual 'git apply' step that used to be run inside +# the container. +RUN git apply ../patches/hook.patch && \ + git apply ../patches/v055.patch + # Build nccl-tests -WORKDIR /workloadsim/third_party/nccl-tests -RUN make -j ${NUM_MAKE_JOBS} +WORKDIR /flowsim/third_party/nccl-tests +RUN make -j ${NUM_MAKE_JOBS} MPI=1 MPI_HOME=/usr/local/mpi NCCL_HOME=/usr/local -WORKDIR /workloadsim +WORKDIR /flowsim CMD ["/bin/bash"] \ No newline at end of file diff --git a/scripts/run_profile.py b/scripts/run_profile.py new file mode 100644 index 0000000..0526fa2 --- /dev/null +++ b/scripts/run_profile.py @@ -0,0 +1,221 @@ +#!/usr/bin/env python +"""Simplified sglang profiling script for Docker/Kubernetes. + +This script launches sglang server + bench_serving for profiling. +All server and benchmark parameters are passed via --server-opts and --bench-opts. 
+ +Example: + python scripts/run_profile.py \\ + --profile-dir /flowsim/server_profile \\ + --server-opts "--model-path /path/to/model --tp 4 --load-format dummy --host 0.0.0.0 --port 30001 --disable-cuda-graph" \\ + --bench-opts "--backend sglang --host 0.0.0.0 --port 30001 --dataset-name defined-len --num-prompts 16 --profile" +""" + +import argparse +import os +import shlex +import shutil +import signal +import socket +import subprocess +import sys +import time +from typing import Optional + + +def wait_for_port(host: str, port: int, timeout: int = 600) -> bool: + """Wait until a TCP port becomes reachable.""" + deadline = time.time() + timeout + while time.time() < deadline: + try: + with socket.create_connection((host, port), timeout=2): + return True + except Exception: + time.sleep(1) + return False + + +def clean_dir(path: str) -> None: + """Clean or create a directory.""" + if os.path.exists(path): + for name in os.listdir(path): + fp = os.path.join(path, name) + if os.path.isfile(fp) or os.path.islink(fp): + os.unlink(fp) + elif os.path.isdir(fp): + shutil.rmtree(fp) + else: + os.makedirs(path, exist_ok=True) + + +def parse_args(argv: Optional[list] = None) -> argparse.Namespace: + p = argparse.ArgumentParser(description="Run sglang profiling workload") + + p.add_argument( + "--profile-dir", + default="/flowsim/server_profile", + help="Directory where profiler traces (.trace.json.gz) will be written", + ) + p.add_argument( + "--log-dir", + default="/flowsim/tests/test-artifacts", + help="Directory to write server/client logs", + ) + p.add_argument( + "--server-opts", + required=True, + help=( + "All options for sglang.launch_server (include --host, --port, --model-path, --tp, etc). " + "Example: '--model-path /path --tp 1 --host 0.0.0.0 --port 30001 --disable-cuda-graph'" + ), + ) + p.add_argument( + "--bench-opts", + required=True, + help=( + "All options for bench_serving.py (include --backend, --host, --port, --dataset-name, --profile, etc). 
" + "Example: '--backend sglang --host 0.0.0.0 --port 30001 --dataset-name defined-len --num-prompts 16 --profile'" + ), + ) + p.add_argument( + "--bench-timeout", + type=int, + default=1200, + help="Timeout in seconds for bench_serving.py", + ) + + return p.parse_args(argv) + + +def main(argv: Optional[list] = None) -> int: + args = parse_args(argv) + + profile_dir = args.profile_dir + log_dir = args.log_dir + + clean_dir(profile_dir) + os.makedirs(log_dir, exist_ok=True) + + ts = int(time.time()) + server_stdout_path = os.path.join(log_dir, f"server_{ts}.stdout.log") + server_stderr_path = os.path.join(log_dir, f"server_{ts}.stderr.log") + server_stdout_f = open(server_stdout_path, "w", encoding="utf-8") + server_stderr_f = open(server_stderr_path, "w", encoding="utf-8") + + # Set profiling environment variables + env = os.environ.copy() + env["SGLANG_TORCH_PROFILER_DIR"] = profile_dir + env["SGLANG_PROFILE_KERNELS"] = "1" + env["SGLANG_PROFILE_DEBUG"] = "1" + env["SGLANG_SET_CPU_AFFINITY"] = "1" + + # Extract host and port from server-opts for connection check + server_args = shlex.split(args.server_opts) + host = "0.0.0.0" + port = 30001 + try: + if "--host" in server_args: + host = server_args[server_args.index("--host") + 1] + if "--port" in server_args: + port = int(server_args[server_args.index("--port") + 1]) + except (ValueError, IndexError): + pass + + # Build server command + launch_cmd = [ + sys.executable, + "-m", + "sglang.launch_server", + ] + server_args + + print("[INFO] Starting sglang server:", " ".join(launch_cmd), flush=True) + preexec = getattr(os, "setsid", None) + server_proc = subprocess.Popen( + launch_cmd, + cwd="/flowsim/workload/framework/sglang/python", + stdout=server_stdout_f, + stderr=server_stderr_f, + preexec_fn=preexec, + env=env, + ) + + try: + if not wait_for_port(host, port, timeout=600): + print( + "[ERROR] Server did not start within timeout", file=sys.stderr + ) + return 1 + + script = os.path.abspath( + "/flowsim/workload/framework/sglang/python/sglang/bench_serving.py" + ) + + bench_args = shlex.split(args.bench_opts) + client_args = [sys.executable, script] + bench_args + + print( + "[INFO] Running bench_serving:", " ".join(client_args), flush=True + ) + result = subprocess.run( + client_args, + capture_output=True, + text=True, + env=env, + timeout=args.bench_timeout, + ) + + ts2 = int(time.time()) + prefix = f"bench_serving_{ts2}" + client_stdout_path = os.path.join(log_dir, prefix + ".stdout.log") + client_stderr_path = os.path.join(log_dir, prefix + ".stderr.log") + with open(client_stdout_path, "w", encoding="utf-8") as f_out: + f_out.write(result.stdout) + with open(client_stderr_path, "w", encoding="utf-8") as f_err: + f_err.write(result.stderr) + + if result.returncode != 0: + print( + f"[ERROR] bench_serving exited with code {result.returncode}", + file=sys.stderr, + ) + return result.returncode + + files = os.listdir(profile_dir) + json_gz_files = [f for f in files if f.endswith(".trace.json.gz")] + if not json_gz_files: + print( + f"[ERROR] No .trace.json.gz files found in {profile_dir}", + file=sys.stderr, + ) + return 1 + + print( + f"[INFO] Profiling complete, found {len(json_gz_files)} trace file(s) in {profile_dir}", + flush=True, + ) + return 0 + finally: + try: + if server_proc.poll() is None: + try: + os.killpg(os.getpgid(server_proc.pid), signal.SIGTERM) + except Exception: + server_proc.terminate() + server_proc.wait(timeout=30) + except Exception: + pass + try: + server_stdout_f.flush() + server_stderr_f.flush() + except 
Exception: + pass + try: + server_stdout_f.close() + server_stderr_f.close() + except Exception: + pass + time.sleep(2) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/run_simulate.py b/scripts/run_simulate.py new file mode 100644 index 0000000..d9322a9 --- /dev/null +++ b/scripts/run_simulate.py @@ -0,0 +1,300 @@ +#!/usr/bin/env python +"""Submit parsed kernels from a trace to LLMCompass backend. + +This script parses a trace file and submits each kernel to the LLMCompass backend +for simulation. + +Example: + python scripts/run_simulate.py \\ + --trace-file /flowsim/server_profile/your-trace.trace.json.gz \\ + --api-url http://127.0.0.1:8000 \\ + --artifact-dir /flowsim/artifacts +""" + +import argparse +import csv +import json +import os +import time +from pathlib import Path +from typing import Optional + +import requests + +from backend.interface import submit_task, get_result, wait_for_health +from simulator.base_parser import BaseKernelInfoParser +from simulator.utils import parse_kernel_entry + + +def parse_args(argv: Optional[list] = None) -> argparse.Namespace: + p = argparse.ArgumentParser( + description="Submit parsed kernels to LLMCompass backend" + ) + p.add_argument( + "--trace-file", + required=True, + help="Path to profiler trace (.trace.json.gz) file", + ) + p.add_argument( + "--api-url", + default="http://127.0.0.1:8000", + help="Base URL of LLMCompass backend (default: http://127.0.0.1:8000)", + ) + p.add_argument( + "--artifact-dir", + default="/flowsim/artifacts", + help="Directory to write request/response artifacts", + ) + p.add_argument( + "--limit", + type=int, + default=0, + help="Maximum number of kernel entries to submit (0 = no limit)", + ) + p.add_argument( + "--system-key", + default="A100_4_fp16", + help="System key for backend simulation (default: A100_4_fp16)", + ) + return p.parse_args(argv) + + +def write_summary(artifact_dir: Path, submitted: dict, results: dict) -> None: + """Write summary files (JSON and CSV) with all task results.""" + + # Collect summary data + summary_data = { + "total_tasks": len(submitted), + "successful_tasks": 0, + "running_tasks": 0, + "failed_tasks": 0, + "tasks": [] + } + + for task_id, task_info in submitted.items(): + payload = task_info["payload"] + result = results.get(task_id, {}) + + # Get top-level status (queued/running/done) + status = result.get("status", "unknown") + + task_entry = { + "task_id": task_id, + "kernel_name": payload.get("kernel_name", ""), + "op": payload.get("op", ""), + "input_dim": str(payload.get("input_dim", "")), + "dtype": str(payload.get("dtype", "")), + "system_key": payload.get("system_key", ""), + "status": status, + } + + # Check result body for simulation status and data + result_body = result.get("result", {}) + if isinstance(result_body, dict): + result_status = result_body.get("status") # "success" or "failed" + + # Extract simulated_time (in seconds) if successful + simulated_time = result_body.get("simulated_time") + if simulated_time is not None: + task_entry["latency_s"] = float(simulated_time) + + # Extract failure reason if failed + failure_reason = result_body.get("failure_reason", {}) + if isinstance(failure_reason, dict): + error_msg = failure_reason.get("error", "") + error_code = failure_reason.get("error_code", "") + if error_msg or error_code: + task_entry["error"] = f"[{error_code}] {error_msg}" if error_code else error_msg + + # Categorize task status + if status == "done": + # Check if simulation actually succeeded + if 
result_body.get("status") == "success": + summary_data["successful_tasks"] += 1 + else: + # Task completed but simulation failed + summary_data["failed_tasks"] += 1 + elif status in ("timeout", "pending", "running", "queued", "unknown"): + # Task hasn't completed yet + summary_data["running_tasks"] += 1 + else: + # Real failure (error in submission) + summary_data["failed_tasks"] += 1 + if "error" not in task_entry: + error_msg = result.get("error", "Unknown error") + task_entry["error"] = error_msg + + summary_data["tasks"].append(task_entry) + + # Write JSON summary + summary_json = artifact_dir / "summary.json" + with open(summary_json, "w", encoding="utf-8") as f: + json.dump(summary_data, f, indent=2) + + # Write CSV summary + if summary_data["tasks"]: + summary_csv = artifact_dir / "summary.csv" + fieldnames = ["task_id", "kernel_name", "op", "input_dim", "dtype", + "system_key", "status", "latency_s", "error"] + + with open(summary_csv, "w", encoding="utf-8", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore") + writer.writeheader() + writer.writerows(summary_data["tasks"]) + + # Print summary to console + print("\n" + "=" * 60) + print("SIMULATION SUMMARY") + print("=" * 60) + print(f"Total tasks: {summary_data['total_tasks']}") + print(f"Successful: {summary_data['successful_tasks']}") + print(f"Running: {summary_data['running_tasks']}") + print(f"Failed: {summary_data['failed_tasks']}") + print("=" * 60 + "\n") + + +def main(argv: Optional[list] = None) -> int: + args = parse_args(argv) + + trace_path = Path(args.trace_file) + if not trace_path.exists(): + print(f"[ERROR] Trace file not found: {trace_path}") + return 1 + + artifact_dir = Path(args.artifact_dir) + artifact_dir.mkdir(parents=True, exist_ok=True) + + # Create subdirectory for individual task files + tasks_dir = artifact_dir / "tasks" + tasks_dir.mkdir(parents=True, exist_ok=True) + + api_url = args.api_url.rstrip("/") + + print(f"[INFO] Using trace: {trace_path}") + print(f"[INFO] Backend URL: {api_url}") + print(f"[INFO] Artifacts directory: {artifact_dir}") + print(f"[INFO] Tasks directory: {tasks_dir}") + + print("[INFO] Parsing trace...") + parser = BaseKernelInfoParser(str(trace_path), enable_comm_calibration=False) + entries = getattr(parser, "individual_info", None) or [] + + if not isinstance(entries, list) or not entries: + print("[ERROR] No kernel entries found in trace") + return 1 + + print(f"[INFO] Found {len(entries)} kernel entries") + + session = requests.Session() + + try: + print("[INFO] Waiting for backend /health...") + healthy = wait_for_health(api_url, timeout=30.0) + if not healthy: + print("[ERROR] Backend did not become healthy within timeout") + return 1 + + submitted = {} + max_entries = args.limit if args.limit and args.limit > 0 else len(entries) + + for idx, entry in enumerate(entries, start=1): + if idx > max_entries: + break + + kernel_name, input_dim, dtype, op = parse_kernel_entry(entry) + payload = { + "kernel_name": kernel_name, + "op": op, + "input_dim": input_dim, + "dtype": dtype, + "system_key": args.system_key, + } + + out_file = tasks_dir / f"task_{idx}_{kernel_name[:10]}.json" + + resp = submit_task(api_url, payload, timeout=10, session=session) + + with open(out_file, "w", encoding="utf-8") as f: + json.dump({"request": payload, "response": resp}, f, indent=2) + + if "error" in resp: + print(f"[WARN] submit_task error for entry {idx}: {resp['error']}") + continue + + if resp.get("status_code") != 200: + print(f"[WARN] Non-200 status 
for entry {idx}: {resp.get('status_code')}") + continue + + body = resp.get("body") or {} + task_id = body.get("task_id") + if not task_id: + print(f"[WARN] Missing task_id for entry {idx}") + continue + + submitted[task_id] = {"out_file": out_file, "payload": payload} + + if idx % 10 == 0: + print(f"[INFO] Submitted {idx} tasks so far...") + + time.sleep(0.02) + + if not submitted: + print("[ERROR] No tasks were successfully submitted") + return 1 + + print(f"[INFO] Submitted {len(submitted)} tasks. Polling for completion...") + + pending = set(submitted.keys()) + results = {} + poll_deadline = time.time() + max(120.0, len(pending) * 5) + + while time.time() < poll_deadline and pending: + for task_id in list(pending): + res = get_result(api_url, task_id, timeout=10, session=session) + + with open( + tasks_dir / f"task_{task_id}_poll.json", "w", encoding="utf-8" + ) as pf: + json.dump(res, pf, indent=2) + + if "error" in res: + results[task_id] = res + pending.discard(task_id) + # Update summary immediately after each task completes/fails + write_summary(artifact_dir, submitted, results) + continue + + if res.get("status") == "done": + result = res.get("result") + if not isinstance(result, dict): + print(f"[WARN] Task {task_id} done but result not a dict: {res}") + results[task_id] = res + pending.discard(task_id) + # Update summary immediately after each task completes + write_summary(artifact_dir, submitted, results) + + if pending: + time.sleep(0.5) + + # Mark incomplete tasks + for task_id in pending: + results[task_id] = { + "status": "timeout", + "error": "Task did not complete within timeout" + } + + # Final summary write + write_summary(artifact_dir, submitted, results) + + if pending: + print(f"[ERROR] Some tasks did not complete: {sorted(pending)}") + return 1 + + print("[INFO] All tasks completed successfully") + return 0 + finally: + session.close() + + +if __name__ == "__main__": + raise SystemExit(main()) \ No newline at end of file diff --git a/simulator/base_parser.py b/simulator/base_parser.py index 852ff8a..07b6542 100644 --- a/simulator/base_parser.py +++ b/simulator/base_parser.py @@ -96,7 +96,7 @@ def __init__( self._calibrate_communication_kernels() # Add annotations from kernel database - self.post_process_with_db(db_path="/workloadsim/kernels.json") + self.post_process_with_db(db_path="/flowsim/kernels.json") def _load_events(self) -> None: """ @@ -507,7 +507,7 @@ def _calibrate_communication_kernels(self) -> None: # -g: number of GPUs # -d: data type profiled_duration = nb.run_nccl_all_reduce_perf( - cmd_path="/workloadsim/third_party/nccl-tests/build/all_reduce_perf", + cmd_path="/flowsim/third_party/nccl-tests/build/all_reduce_perf", b=str(size), e=str(size), g=str(self.tensor_parallelism), @@ -536,7 +536,7 @@ def _calibrate_communication_kernels(self) -> None: profiled_duration = comm_profile_cache[cache_key] else: profiled_duration = nb.run_nccl_all_reduce_perf( - cmd_path="/workloadsim/third_party/nccl-tests/build/all_reduce_perf", + cmd_path="/flowsim/third_party/nccl-tests/build/all_reduce_perf", b=str(size), e=str(size), g=str(self.tensor_parallelism), @@ -565,7 +565,7 @@ def _calibrate_communication_kernels(self) -> None: profiled_duration = comm_profile_cache[cache_key] else: profiled_duration = nb.run_nccl_all_gather_perf( - cmd_path="/workloadsim/third_party/nccl-tests/build/all_gather_perf", + cmd_path="/flowsim/third_party/nccl-tests/build/all_gather_perf", b=str(size), e=str(size), g=str(self.tensor_parallelism), @@ -591,7 +591,7 @@ def 
_calibrate_communication_kernels(self) -> None: continue def post_process_with_db( - self, db_path: str = "/workloadsim/kernels.json" + self, db_path: str = "/flowsim/kernels.json" ) -> None: """ Post-process the individual kernel info with the kernel database. @@ -603,9 +603,9 @@ def post_process_with_db( 4. If no match is found, add the kernel to unknown_kernels.json and ask user to update. Arguments: - db_path (str): Path to the kernel database JSON file to use for enrichment. Defaults to '/workloadsim/kernels.json'. + db_path (str): Path to the kernel database JSON file to use for enrichment. Defaults to '/flowsim/kernels.json'. Returns: - None. Modifies self.individual_info in-place and may create/update '/workloadsim/unknown_kernels.json'. + None. Modifies self.individual_info in-place and may create/update '/flowsim/unknown_kernels.json'. Example Database Entry: { @@ -667,7 +667,7 @@ def post_process_with_db( db_data_kernel_name = locals().get("db_data_kernel_name", {}) db_data_kernel_impl = locals().get("db_data_kernel_impl", {}) - unknown_path = "/workloadsim/unknown_kernels.json" + unknown_path = "/flowsim/unknown_kernels.json" unknown_list = [] if os.path.exists(unknown_path): with open(unknown_path, "r") as f: diff --git a/tests/integration/test_docker_image.py b/tests/integration/test_docker_image.py index c2037f9..568e454 100644 --- a/tests/integration/test_docker_image.py +++ b/tests/integration/test_docker_image.py @@ -7,7 +7,7 @@ from tests.utils import _write_artifact from tests.utils import ARTIFACT_ENV, DEFAULT_ARTIFACT_DIR, SG_LANG_DIR -MODEL_PATH = "/workloadsim/workload/models/configs/deepseek" +MODEL_PATH = "/flowsim/workload/models/configs/deepseek" @pytest.mark.parametrize("tp", [1, 2]) diff --git a/tests/integration/test_model_config.py b/tests/integration/test_model_config.py index 09f22f5..eb23124 100644 --- a/tests/integration/test_model_config.py +++ b/tests/integration/test_model_config.py @@ -12,8 +12,8 @@ @pytest.mark.parametrize( "model_path", [ - "/workloadsim/workload/models/configs/deepseek", - "/workloadsim/workload/models/configs/gpt3", + "/flowsim/workload/models/configs/deepseek", + "/flowsim/workload/models/configs/gpt3", ], ) def test_docker_image(tp, model_path): diff --git a/tests/integration/test_moe_balancing.py b/tests/integration/test_moe_balancing.py index 2c0062e..cac397a 100644 --- a/tests/integration/test_moe_balancing.py +++ b/tests/integration/test_moe_balancing.py @@ -12,7 +12,7 @@ @pytest.mark.parametrize( "model_path", [ - "/workloadsim/workload/models/configs/deepseek", + "/flowsim/workload/models/configs/deepseek", ], ) def test_docker_image(tp, model_path): diff --git a/tests/integration/test_profile.py b/tests/integration/test_profile.py index 93b1c80..8b3f7c8 100644 --- a/tests/integration/test_profile.py +++ b/tests/integration/test_profile.py @@ -21,7 +21,7 @@ def wait_for_port(host, port, timeout=60): def test_bench_serving_predefined_len_profile(): # Set environment variables env = os.environ.copy() - profile_dir = "/workloadsim/server_profile" + profile_dir = "/flowsim/server_profile" env["SGLANG_TORCH_PROFILER_DIR"] = profile_dir env["SGLANG_PROFILE_KERNELS"] = "1" env["SGLANG_PROFILE_DEBUG"] = "1" @@ -39,7 +39,7 @@ def test_bench_serving_predefined_len_profile(): os.makedirs(profile_dir) # Prepare server log files (Scheme A: redirect to files instead of PIPE) - artifacts_dir = "/workloadsim/tests/test-artifacts" + artifacts_dir = "/flowsim/tests/test-artifacts" os.makedirs(artifacts_dir, exist_ok=True) ts = 
int(time.time()) server_stdout_path = os.path.join(artifacts_dir, f"server_{ts}.stdout.log") @@ -54,7 +54,7 @@ def test_bench_serving_predefined_len_profile(): "-m", "sglang.launch_server", "--model-path", - "/workloadsim/workload/models/configs/deepseek/", + "/flowsim/workload/models/configs/deepseek/", "--load-format", "dummy", "--tp", @@ -72,7 +72,7 @@ def test_bench_serving_predefined_len_profile(): "30001", "--disable-cuda-graph", ], - cwd="/workloadsim/workload/framework/sglang/python", + cwd="/flowsim/workload/framework/sglang/python", stdout=server_stdout_f, stderr=server_stderr_f, preexec_fn=os.setsid, @@ -84,7 +84,7 @@ def test_bench_serving_predefined_len_profile(): ), "Server did not start in time" script = os.path.abspath( - "/workloadsim/workload/framework/sglang/python/sglang/bench_serving.py" + "/flowsim/workload/framework/sglang/python/sglang/bench_serving.py" ) args = [ sys.executable, @@ -107,7 +107,7 @@ def test_bench_serving_predefined_len_profile(): args, capture_output=True, text=True, env=env, timeout=1200 ) # Store logs to test-artifacts directory for debugging regardless of success - artifacts_dir = "/workloadsim/tests/test-artifacts" + artifacts_dir = "/flowsim/tests/test-artifacts" try: os.makedirs(artifacts_dir, exist_ok=True) ts = int(time.time()) diff --git a/tests/unit/test_base_parser.py b/tests/unit/test_base_parser.py index 39b3d72..342939e 100644 --- a/tests/unit/test_base_parser.py +++ b/tests/unit/test_base_parser.py @@ -9,8 +9,8 @@ @pytest.fixture(scope="module") def real_trace_file(): - trace_path = "/workloadsim/tests/unit/test_trace.trace.json.gz" - assert os.path.exists(trace_path), f"Profile File Not Exisit: {trace_path}" + trace_path = "/flowsim/tests/unit/test_trace.trace.json.gz" + assert os.path.exists(trace_path), f"Profile File Not Exist: {trace_path}" return trace_path diff --git a/tests/unit/test_batch_request.py b/tests/unit/test_batch_request.py index c106c78..13cc186 100644 --- a/tests/unit/test_batch_request.py +++ b/tests/unit/test_batch_request.py @@ -7,7 +7,7 @@ import os sys.path.insert( - 0, os.path.abspath("/workloadsim/workload/framework/sglang/python") + 0, os.path.abspath("/flowsim/workload/framework/sglang/python") ) import sglang.bench_serving as bs diff --git a/tests/unit/test_comm_calibration.py b/tests/unit/test_comm_calibration.py index e7328a0..a031c92 100644 --- a/tests/unit/test_comm_calibration.py +++ b/tests/unit/test_comm_calibration.py @@ -15,7 +15,7 @@ @pytest.fixture(scope="module") def real_trace_file(): - trace_path = "/workloadsim/tests/unit/test_trace.trace.json.gz" + trace_path = "/flowsim/tests/unit/test_trace.trace.json.gz" assert os.path.exists(trace_path), f"Profile File Not Exist: {trace_path}" return trace_path diff --git a/tests/unit/test_defined_len.py b/tests/unit/test_defined_len.py index 7e71233..fd0a6c2 100644 --- a/tests/unit/test_defined_len.py +++ b/tests/unit/test_defined_len.py @@ -3,7 +3,7 @@ import os sys.path.insert( - 0, os.path.abspath("/workloadsim/workload/framework/sglang/python") + 0, os.path.abspath("/flowsim/workload/framework/sglang/python") ) from sglang.bench_serving import generate_defined_len_requests diff --git a/tests/unit/test_kernel_db_coverage.py b/tests/unit/test_kernel_db_coverage.py index 8af3f64..310f554 100644 --- a/tests/unit/test_kernel_db_coverage.py +++ b/tests/unit/test_kernel_db_coverage.py @@ -12,7 +12,7 @@ @pytest.fixture(scope="module") def real_trace_file(): - trace_path = "/workloadsim/tests/unit/test_trace.trace.json.gz" + trace_path = 
"/flowsim/tests/unit/test_trace.trace.json.gz" assert os.path.exists(trace_path), f"Profile File Not Exist: {trace_path}" return trace_path diff --git a/tests/unit/test_llmcompass_backend.py b/tests/unit/test_llmcompass_backend.py index 7d308b8..7c0ab2f 100644 --- a/tests/unit/test_llmcompass_backend.py +++ b/tests/unit/test_llmcompass_backend.py @@ -30,7 +30,7 @@ def test_post_parsed_kernels_to_backend(): """Parse a real trace and post each parsed kernel to the backend `/tasks` endpoint. Environment: - TRACE_PATH - optional path to trace file (default: /workloadsim/tests/unit/test_trace.trace.json.gz) + TRACE_PATH - optional path to trace file (default: /flowsim/tests/unit/test_trace.trace.json.gz) API_URL - optional backend url (default: http://127.0.0.1:8000) """ _ensure_artifacts_dir() diff --git a/tests/utils.py b/tests/utils.py index ee9607d..c4dc505 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,8 +1,8 @@ import os ARTIFACT_ENV = "PYTEST_ARTIFACT_DIR" -DEFAULT_ARTIFACT_DIR = "/workloadsim/tests/test-artifacts" -SG_LANG_DIR = "/workloadsim/workload/framework/sglang/python/sglang" +DEFAULT_ARTIFACT_DIR = "/flowsim/tests/test-artifacts" +SG_LANG_DIR = "/flowsim/workload/framework/sglang/python/sglang" def _write_artifact(artifact_dir: str, name: str, content: str):