From 094e096b970b0995b238a415ef8c9130122514c7 Mon Sep 17 00:00:00 2001 From: Copilot Date: Fri, 27 Mar 2026 08:34:35 +0000 Subject: [PATCH 1/5] refactor: remove slime router --- docs/en/advanced/slime-router.md | 62 +-- docs/en/get_started/customization.md | 2 +- docs/en/index.rst | 1 - docs/zh/advanced/slime-router.md | 64 +-- docs/zh/get_started/customization.md | 2 +- docs/zh/index.rst | 1 - .../run-kimi-k2-Thinking-int4.sh | 1 - .../low_precision/run-qwen3-235B-A22B-int4.sh | 1 - .../low_precision/run-qwen3-30B-A3B-int4.sh | 1 - .../low_precision/run-qwen3-30b-a3b-fp8.sh | 1 - scripts/run-qwen2.5-0.5B-reproducibility.sh | 1 - slime/backends/sglang_utils/sglang_engine.py | 4 +- slime/ray/rollout.py | 56 +-- slime/rollout/sglang_rollout.py | 2 +- slime/router/__init__.py | 0 slime/router/router.py | 409 ------------------ slime/utils/arguments.py | 27 +- slime/utils/wandb_utils.py | 8 - tests/test_moonlight_16B_A3B_r3.py | 1 - tests/test_quick_start_glm4_9B.py | 2 +- tests/test_qwen3_30B_A3B_r3.py | 1 - 21 files changed, 68 insertions(+), 579 deletions(-) delete mode 100644 slime/router/__init__.py delete mode 100644 slime/router/router.py diff --git a/docs/en/advanced/slime-router.md b/docs/en/advanced/slime-router.md index ad56241da1..21adba28b3 100644 --- a/docs/en/advanced/slime-router.md +++ b/docs/en/advanced/slime-router.md @@ -1,59 +1,17 @@ # slime router -slime includes an optional slime router used during rollout / data generation. It is a lightweight HTTP router/proxy that sits in front of one or more SGLang worker servers and adds training-oriented capabilities that are not the main goal of serving-focused routers. +slime router has been removed from slime. ---- +When slime needs to launch a local router for rollout, it now always uses `sglang_router` built from [zhuzilin/sgl-router](https://github.com/zhuzilin/sgl-router). -## 1. What is slime router? +## Migration -slime router is a small FastAPI service that: +- `--use-slime-router` is deprecated and ignored. +- `--slime-router-timeout` is deprecated and ignored. +- `--slime-router-max-connections` is deprecated and ignored. +- `--slime-router-health-check-failure-threshold` is deprecated and ignored. -- Registers workers (SGLang HTTP servers) into a local pool, with support for **prefill / decode / regular** worker types -- Routes requests to a selected worker via least-inflight load balancing or **PD dual-dispatch routing** -- Streams proxied responses (e.g. `/generate`) without buffering the full body, improving throughput under high concurrency -- Runs periodic health checks and quarantines unhealthy workers +## What to use instead -In slime's architecture, the router is part of the rollout system ("SGLang + router") that generates samples and pushes them into the data buffer. - -### How it is launched - -In distributed training, slime will start a router automatically when `--sglang-router-ip` is not provided: - -- If `--use-slime-router` is set, slime starts slime router -- Otherwise, slime starts SGLang Model Gateway - ---- - -## 2. Why we need slime router - -Unlike production inference, RL rollout needs to capture additional metadata for training: token-level logprobs and loss masks. slime router provides these capabilities through its passthrough proxy design. - -### 2.1 PD disaggregation - -slime router supports **Prefill-Decode (PD) disaggregation**. When prefill and decode workers are registered, the router automatically enables PD mode: - -- Workers register themselves with a `worker_type` (`prefill`, `decode`, or `regular`) via the `POST /workers` endpoint. -- For each request, the router picks a (prefill, decode) worker pair via least-inflight load balancing, injects bootstrap information (`bootstrap_host`, `bootstrap_port`, `bootstrap_room`) into the request body, and sends the same modified request to **both** workers concurrently. -- The decode worker's response is returned to the caller. The actual KV-cache transfer between workers is coordinated internally via the bootstrap connection. -- If no prefill/decode workers exist, the router falls back to standard single-worker routing. - -This mirrors the dual-dispatch approach used by SGLang Model Gateway's PD router. - ---- - -## 3. Differences vs SGLang Model Gateway - -slime router and SGLang Model Gateway can both route requests to workers, but they are optimized for different goals. - -### Key differences - -slime router is a lightweight Python/FastAPI proxy that acts as a passthrough to SGLang workers. - -SGLang Model Gateway is a high-performance Rust-based router optimized for large-scale inference: async non-blocking routing, advanced fault tolerance (retries, circuit breakers), multiple load balancing policies (including cache-aware routing), and PD disaggregation support. - -For more details on SGLang Model Gateway, see the [official documentation](https://docs.sglang.io/advanced_features/sgl_model_gateway.html). - -### When to use which - -- Use slime router when you need PD disaggregation with metadata preservation -- Use SGLang Model Gateway for everything else (recommended default) +- Use slime's default `sglang_router` path. +- For router capabilities and deployment details, see the [SGLang Model Gateway documentation](https://docs.sglang.io/advanced_features/sgl_model_gateway.html). diff --git a/docs/en/get_started/customization.md b/docs/en/get_started/customization.md index 2e3b791662..79d2e8264f 100644 --- a/docs/en/get_started/customization.md +++ b/docs/en/get_started/customization.md @@ -406,7 +406,7 @@ Stabilize MoE RL training by recording and replaying expert routing decisions to | Argument | Description | | --- | --- | | `--use-routing-replay` | Forward-backward routing consistency in training. ([arXiv:2507.18071](https://arxiv.org/abs/2507.18071)) | -| `--use-rollout-routing-replay` | R3: Replay routing from rollout during training. Works with both slime router and SGLang Model Gateway. ([arXiv:2510.11370](https://arxiv.org/abs/2510.11370)) | +| `--use-rollout-routing-replay` | R3: Replay routing from rollout during training. Supported by slime's default `sglang_router` path. ([arXiv:2510.11370](https://arxiv.org/abs/2510.11370)) | ## Testing Custom Function Paths diff --git a/docs/en/index.rst b/docs/en/index.rst index f229b215b9..e0af06ec31 100644 --- a/docs/en/index.rst +++ b/docs/en/index.rst @@ -49,7 +49,6 @@ slime is the RL-framework behind GLM-4.7, GLM-4.6 and GLM-4.5. Apart from models advanced/pd-disaggregation.md advanced/sglang-config.md advanced/arch-support-beyond-megatron.md - advanced/slime-router.md .. toctree:: :maxdepth: 1 diff --git a/docs/zh/advanced/slime-router.md b/docs/zh/advanced/slime-router.md index 9906e1c161..82784348b4 100644 --- a/docs/zh/advanced/slime-router.md +++ b/docs/zh/advanced/slime-router.md @@ -1,61 +1,17 @@ # slime router -slime 提供一个可选的 slime router,用于 rollout / data generation 阶段。它是一个轻量级的 HTTP router/proxy,位于一个或多个 SGLang worker server 前,补齐一些 training-oriented 能力——这些并不是 serving-focused router 的主要目标。 +slime 中的 slime router 已被移除。 ---- +现在,当 slime 需要为 rollout 启动本地 router 时,会统一使用由 [zhuzilin/sgl-router](https://github.com/zhuzilin/sgl-router) 构建的 `sglang_router`。 -## 1. 什么是 slime router? +## 迁移说明 -slime router 是一个小型 FastAPI 服务,主要能力包括: +- `--use-slime-router` 已废弃并被忽略。 +- `--slime-router-timeout` 已废弃并被忽略。 +- `--slime-router-max-connections` 已废弃并被忽略。 +- `--slime-router-health-check-failure-threshold` 已废弃并被忽略。 -- 注册 worker(SGLang HTTP server)到本地池,支持 **prefill / decode / regular** worker 类型 -- 路由请求到选定的 worker——支持 least-inflight 负载均衡和 **PD 双发路由** -- 流式代理请求到选定的 worker(例如 `/generate`),不缓冲完整 response body,提高高并发下的吞吐 -- 定期 health checks,并隔离不健康的 worker +## 替代方案 -在 slime 架构中,router 是 rollout 系统("SGLang + router")的一部分:负责生成样本并将其推入数据缓冲区。 - -### 启动方式 - -在分布式训练中,当未提供 `--sglang-router-ip` 时,slime 会自动启动一个 router: - -- 如果设置了 `--use-slime-router`,slime 启动 slime router -- 否则,slime 启动 SGLang Model Gateway - ---- - -## 2. 为什么需要 slime router - -与 production inference 不同,RL rollout 往往需要捕获用于训练的额外 metadata:token-level logprobs 和 loss masks。slime router 通过 passthrough proxy 设计提供这些能力。 - -### 2.1 PD 分离(Prefill-Decode 分离) - -slime router 支持 **Prefill-Decode (PD) 分离**。当 prefill 和 decode worker 注册后,router 会自动启用 PD 模式: - -- Worker 注册时携带 `worker_type`(`prefill`、`decode` 或 `regular`),通过 `POST /workers` 端点。 -- 对每个请求,router 通过 least-inflight 负载均衡选择一对 (prefill, decode) worker,向请求体注入 bootstrap 信息(`bootstrap_host`、`bootstrap_port`、`bootstrap_room`),然后将同一个修改后的请求**并发发送**给两个 worker。 -- Decode worker 的响应返回给调用方。实际的 KV-cache 传输由 worker 通过 bootstrap 连接内部协调完成。 -- 如果没有 prefill/decode worker 存在,router 回退到标准的单 worker 路由。 - -这与 SGLang Model Gateway 的 PD router 所使用的双发方式一致。 - ---- - -## 3. 与 SGLang Model Gateway 的区别 - -slime router 与 SGLang Model Gateway 都能将请求路由到 worker,但它们面向的目标不同、优化方向也不同。 - -### 主要区别 - -slime router 是一个轻量级的 Python/FastAPI proxy,作为 SGLang worker 的 passthrough proxy。 - -SGLang Model Gateway 是一个高性能 Rust router,面向大规模 inference 优化:async non-blocking routing、高级 fault tolerance(retries、circuit breakers)、多种 load balancing policy(包括 cache-aware routing),以及 PD disaggregation 支持。 - -两个 router 都支持 R3(rollout routing replay)用于 MoE 模型。 - -更多关于 SGLang Model Gateway 的信息,请参阅[官方文档](https://docs.sglang.io/advanced_features/sgl_model_gateway.html)。 - -### 如何选择 - -- 当你需要保留 metadata 的 PD 分离时,使用 slime router -- 其他情况使用 SGLang Model Gateway(推荐默认选项) +- 直接使用 slime 默认的 `sglang_router` 路径。 +- Router 的能力和部署方式请参考 [SGLang Model Gateway 官方文档](https://docs.sglang.io/advanced_features/sgl_model_gateway.html)。 diff --git a/docs/zh/get_started/customization.md b/docs/zh/get_started/customization.md index d05b3d182b..509badf3f6 100644 --- a/docs/zh/get_started/customization.md +++ b/docs/zh/get_started/customization.md @@ -408,7 +408,7 @@ def custom_hook(args, rollout_id, step_id, model, optimizer, opt_param_scheduler | 参数 | 说明 | | --- | --- | | `--use-routing-replay` | 训练中前向-反向路由一致性。([arXiv:2507.18071](https://arxiv.org/abs/2507.18071)) | -| `--use-rollout-routing-replay` | R3:在训练时重放 rollout 阶段的路由。slime router 和 SGLang Model Gateway 均支持。([arXiv:2510.11370](https://arxiv.org/abs/2510.11370)) | +| `--use-rollout-routing-replay` | R3:在训练时重放 rollout 阶段的路由。slime 默认的 `sglang_router` 路径支持该功能。([arXiv:2510.11370](https://arxiv.org/abs/2510.11370)) | ## 自定义函数路径的测试 diff --git a/docs/zh/index.rst b/docs/zh/index.rst index bc7201b656..3ae7a783a3 100644 --- a/docs/zh/index.rst +++ b/docs/zh/index.rst @@ -49,7 +49,6 @@ slime 是 GLM-4.7、GLM-4.6、GLM-4.5 背后的 RL 训练框架。除此之外 advanced/pd-disaggregation.md advanced/sglang-config.md advanced/arch-support-beyond-megatron.md - advanced/slime-router.md .. toctree:: :maxdepth: 1 diff --git a/scripts/low_precision/run-kimi-k2-Thinking-int4.sh b/scripts/low_precision/run-kimi-k2-Thinking-int4.sh index c41ea3df82..f7abd62b57 100644 --- a/scripts/low_precision/run-kimi-k2-Thinking-int4.sh +++ b/scripts/low_precision/run-kimi-k2-Thinking-int4.sh @@ -135,7 +135,6 @@ SGLANG_ARGS=( # make every dp rank has 128 concurrency --sglang-server-concurrency 1024 - --use-slime-router ) diff --git a/scripts/low_precision/run-qwen3-235B-A22B-int4.sh b/scripts/low_precision/run-qwen3-235B-A22B-int4.sh index 597838f697..b5ddc7587c 100644 --- a/scripts/low_precision/run-qwen3-235B-A22B-int4.sh +++ b/scripts/low_precision/run-qwen3-235B-A22B-int4.sh @@ -119,7 +119,6 @@ SGLANG_ARGS=( # --sglang-dp-size 4 --sglang-ep-size 8 --sglang-cuda-graph-bs 1 2 4 8 $(seq 16 8 256) - --use-slime-router ) diff --git a/scripts/low_precision/run-qwen3-30B-A3B-int4.sh b/scripts/low_precision/run-qwen3-30B-A3B-int4.sh index 01b4242866..b591047e10 100644 --- a/scripts/low_precision/run-qwen3-30B-A3B-int4.sh +++ b/scripts/low_precision/run-qwen3-30B-A3B-int4.sh @@ -114,7 +114,6 @@ SGLANG_ARGS=( --rollout-num-gpus-per-engine 1 --sglang-mem-fraction-static 0.7 --sglang-cuda-graph-bs 1 2 4 8 $(seq 16 8 256) - --use-slime-router ) MISC_ARGS=( diff --git a/scripts/low_precision/run-qwen3-30b-a3b-fp8.sh b/scripts/low_precision/run-qwen3-30b-a3b-fp8.sh index fbda2572d2..766e0dcc62 100644 --- a/scripts/low_precision/run-qwen3-30b-a3b-fp8.sh +++ b/scripts/low_precision/run-qwen3-30b-a3b-fp8.sh @@ -127,7 +127,6 @@ SGLANG_ARGS=( --sglang-mem-fraction-static 0.6 --sglang-cuda-graph-bs 1 2 4 8 $(seq 16 8 256) --sglang-expert-parallel-size 8 - --use-slime-router # --use-rollout-routing-replay ) diff --git a/scripts/run-qwen2.5-0.5B-reproducibility.sh b/scripts/run-qwen2.5-0.5B-reproducibility.sh index 695a282d74..d1d2d3bf1c 100644 --- a/scripts/run-qwen2.5-0.5B-reproducibility.sh +++ b/scripts/run-qwen2.5-0.5B-reproducibility.sh @@ -126,7 +126,6 @@ ray job submit --address="http://127.0.0.1:8265" \ --actor-num-gpus-per-node 8 \ --colocate \ --calculate-per-token-loss \ - --use-slime-router \ ${MODEL_ARGS[@]} \ ${CKPT_ARGS[@]} \ ${ROLLOUT_ARGS[@]} \ diff --git a/slime/backends/sglang_utils/sglang_engine.py b/slime/backends/sglang_utils/sglang_engine.py index 664d75834b..1e15f8c564 100644 --- a/slime/backends/sglang_utils/sglang_engine.py +++ b/slime/backends/sglang_utils/sglang_engine.py @@ -195,7 +195,7 @@ def _init_normal(self, server_args_dict): return if self.node_rank == 0 and self.router_ip and self.router_port: - if not self.args.use_slime_router and parse(sglang_router.__version__) <= parse("0.2.1"): + if parse(sglang_router.__version__) <= parse("0.2.1"): assert self.worker_type == "regular", "pd disaggregation is not supported in old router." response = requests.post( f"http://{self.router_ip}:{self.router_port}/add_worker?url=http://{self.server_host}:{self.server_port}" @@ -315,7 +315,7 @@ def shutdown(self): if self.worker_type != "encoder" and self.node_rank == 0: worker_url = f"http://{self.server_host}:{self.server_port}" response = None - if self.args.use_slime_router or parse(sglang_router.__version__) <= parse("0.2.1"): + if parse(sglang_router.__version__) <= parse("0.2.1"): response = requests.post( f"http://{self.router_ip}:{self.router_port}/remove_worker?url=http://{self.server_host}:{self.server_port}" ) diff --git a/slime/ray/rollout.py b/slime/ray/rollout.py index a584e7e29a..2a7f883c7a 100644 --- a/slime/ray/rollout.py +++ b/slime/ray/rollout.py @@ -398,16 +398,7 @@ def _get_metrics_router_addr(self) -> str | None: which aggregates Prometheus metrics from all backend sglang servers. Returns ``http://{ip}:{port}`` for the first server, or ``None`` when metrics are disabled or no servers are running. - - Note: the ``use_slime_router`` path does not expose ``/engine_metrics``; - metrics forwarding to W&B requires the sglang_router gateway. """ - if getattr(self.args, "use_slime_router", False): - logger.warning( - "SGLang metrics forwarding to W&B is not supported with --use-slime-router. " - "Use the default sglang_router gateway for /engine_metrics aggregation." - ) - return None srv = self.server if srv is None or srv.router_ip is None: return None @@ -914,7 +905,7 @@ def addr(): def _start_router(args, *, has_pd_disaggregation: bool = False, force_new: bool = False) -> tuple[str, int]: - """Start sgl router or slime router and return (router_ip, router_port). + """Start sglang_router and return (router_ip, router_port). If ``args.sglang_router_ip`` is already set (e.g. by the user) and ``force_new`` is False, skip launching and return the existing values. @@ -931,37 +922,28 @@ def _start_router(args, *, has_pd_disaggregation: bool = False, force_new: bool if router_port is None: router_port = find_available_port(random.randint(3000, 4000)) - if args.use_slime_router: - import copy - - from slime.router.router import run_router - - router_args = copy.copy(args) - router_args.sglang_router_ip = router_ip - router_args.sglang_router_port = router_port - else: - from sglang_router.launch_router import RouterArgs + from sglang_router.launch_router import RouterArgs - from slime.utils.http_utils import run_router + from slime.utils.http_utils import run_router - router_args = RouterArgs.from_cli_args(args, use_router_prefix=True) - router_args.host = router_ip - router_args.port = router_port - router_args.prometheus_port = find_available_port(random.randint(4000, 5000)) - router_args.log_level = "warn" - router_args.request_timeout_secs = args.sglang_router_request_timeout_secs + router_args = RouterArgs.from_cli_args(args, use_router_prefix=True) + router_args.host = router_ip + router_args.port = router_port + router_args.prometheus_port = find_available_port(random.randint(4000, 5000)) + router_args.log_level = "warn" + router_args.request_timeout_secs = args.sglang_router_request_timeout_secs - if has_pd_disaggregation: - router_args.pd_disaggregation = True - # Disable circuit breaker to prevent RDMA transfer timeouts from - # marking decode workers as dead. Timeouts are transient (PCIe - # contention under high load) and do not indicate a dead server. - router_args.disable_circuit_breaker = True + if has_pd_disaggregation: + router_args.pd_disaggregation = True + # Disable circuit breaker to prevent RDMA transfer timeouts from + # marking decode workers as dead. Timeouts are transient (PCIe + # contention under high load) and do not indicate a dead server. + router_args.disable_circuit_breaker = True - # We will not use the health check from router. - router_args.disable_health_check = True + # We will not use the health check from router. + router_args.disable_health_check = True - logger.info(f"Launch router with args: {router_args}") + logger.info(f"Launch router with args: {router_args}") process = multiprocessing.Process( target=run_router, @@ -972,7 +954,7 @@ def _start_router(args, *, has_pd_disaggregation: bool = False, force_new: bool # Wait 3 seconds time.sleep(3) assert process.is_alive() - logger.info(f"Router launched at {router_ip}:{router_port}") + logger.info(f"Router launched at {router_ip}:{router_port}, Prometheus port: {router_args.prometheus_port}") return router_ip, router_port diff --git a/slime/rollout/sglang_rollout.py b/slime/rollout/sglang_rollout.py index e8f73d137a..e75346a64f 100644 --- a/slime/rollout/sglang_rollout.py +++ b/slime/rollout/sglang_rollout.py @@ -316,7 +316,7 @@ async def abort(args: Namespace, rollout_id: int) -> list[list[Sample]]: assert not state.aborted state.aborted = True - if parse(sglang_router.__version__) <= parse("0.2.1") or args.use_slime_router: + if parse(sglang_router.__version__) <= parse("0.2.1"): response = await get(f"http://{args.sglang_router_ip}:{args.sglang_router_port}/list_workers") urls = response["urls"] else: diff --git a/slime/router/__init__.py b/slime/router/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/slime/router/router.py b/slime/router/router.py deleted file mode 100644 index 05f1b989ae..0000000000 --- a/slime/router/router.py +++ /dev/null @@ -1,409 +0,0 @@ -import argparse -import asyncio -import json -import logging -import uuid -from enum import Enum -from urllib.parse import urlparse - -import httpx -import uvicorn -from fastapi import FastAPI, Request -from fastapi.responses import JSONResponse, StreamingResponse -from starlette.responses import Response - - -logger = logging.getLogger(__name__) - - -class WorkerType(str, Enum): - REGULAR = "regular" - PREFILL = "prefill" - DECODE = "decode" - PLACEHOLDER = "placeholder" - - -class WorkerInfo: - """Metadata for a registered worker.""" - - __slots__ = ("url", "worker_type", "active_requests", "consecutive_failures", "bootstrap_port") - - def __init__(self, url: str, worker_type: WorkerType = WorkerType.REGULAR, bootstrap_port: int | None = None): - self.url = url - self.worker_type = worker_type - self.active_requests: int = 0 - self.consecutive_failures: int = 0 - self.bootstrap_port = bootstrap_port - - -def run_router(args): - """Run the Slime router with the specified configuration.""" - slime_router = SlimeRouter(args, verbose=False) - uvicorn.run(slime_router.app, host=args.sglang_router_ip, port=args.sglang_router_port, log_level="info") - - -class SlimeRouter: - def __init__(self, args, verbose=False): - """Initialize the slime-router.""" - self.args = args - self.verbose = verbose - - self.app = FastAPI() - self.app.add_event_handler("startup", self._start_background_health_check) - - # URL -> WorkerInfo - self.workers: dict[str, WorkerInfo] = {} - # Quarantined workers excluded from routing pool - self.dead_workers: set[str] = set() - self.max_weight_version = None - - # --- Connection pool --- - max_connections = getattr(args, "slime_router_max_connections", None) - if max_connections is None: - max_connections = ( - args.sglang_server_concurrency * args.rollout_num_gpus // args.rollout_num_gpus_per_engine - ) - # Generous keep-alive pool for high concurrency - max_keepalive = max(max_connections // 2, 20) - - timeout = getattr(args, "slime_router_timeout", None) - - self.client = httpx.AsyncClient( - limits=httpx.Limits( - max_connections=max_connections, - max_keepalive_connections=max_keepalive, - keepalive_expiry=30, - ), - timeout=httpx.Timeout(timeout), - http2=True, - ) - - self._setup_routes() - - # ------------------------------------------------------------------ - # Routes - # ------------------------------------------------------------------ - - def _setup_routes(self): - """Setup all the HTTP routes.""" - self.app.post("/add_worker")(self.add_worker) - self.app.post("/remove_worker")(self.remove_worker) - self.app.post("/workers")(self.add_worker_v2) - self.app.get("/workers")(self.list_workers_v2) - self.app.get("/list_workers")(self.list_workers) - self.app.get("/health")(self.health) - # Catch-all route for proxying — must be registered LAST - self.app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])(self.proxy) - - # ------------------------------------------------------------------ - # Health check background loop - # ------------------------------------------------------------------ - - async def _start_background_health_check(self): - asyncio.create_task(self._health_check_loop()) - - async def _check_worker_health(self, url: str): - try: - response = await self.client.get(f"{url}/health", timeout=5.0) - if response.status_code == 200: - return url, True - logger.debug(f"[slime-router] Worker {url} is unhealthy (Status: {response.status_code})") - except Exception as e: - logger.debug(f"[slime-router] Worker {url} health check failed: {e}") - return url, False - - async def _health_check_loop(self): - """Background loop to monitor worker health and adjust routing pool.""" - interval = self.args.rollout_health_check_interval - threshold = self.args.slime_router_health_check_failure_threshold - - while True: - try: - await asyncio.sleep(interval) - - urls = [u for u in self.workers if u not in self.dead_workers] - if not urls: - continue - - results = await asyncio.gather(*(self._check_worker_health(url) for url in urls)) - - for url, is_healthy in results: - if url not in self.workers: - continue - if not is_healthy: - self.workers[url].consecutive_failures += 1 - if self.workers[url].consecutive_failures >= threshold: - logger.warning( - f"[slime-router] Worker {url} failed {threshold} consecutive health checks. Marking as DEAD." - ) - self.dead_workers.add(url) - else: - self.workers[url].consecutive_failures = 0 - - alive = sum(1 for u in self.workers if u not in self.dead_workers) - logger.debug(f"[slime-router] Health check complete. {alive} workers healthy.") - - except asyncio.CancelledError: - logger.warning("[slime-router] Background health check loop is being cancelled.") - raise - except Exception as e: - logger.error(f"[slime-router] Unexpected error in health check loop: {e}", exc_info=True) - await asyncio.sleep(5) - - # ------------------------------------------------------------------ - # Worker selection - # ------------------------------------------------------------------ - - def _healthy_workers(self, worker_type: WorkerType | None = None) -> list[WorkerInfo]: - """Return live workers, optionally filtered by type.""" - workers = [w for url, w in self.workers.items() if url not in self.dead_workers] - if worker_type is not None: - workers = [w for w in workers if w.worker_type == worker_type] - return workers - - def _select_by_least_inflight(self, candidates: list[WorkerInfo]) -> WorkerInfo: - """Pick the worker with the fewest active requests.""" - if not candidates: - raise RuntimeError("No healthy workers available in the pool") - return min(candidates, key=lambda w: w.active_requests) - - def _is_pd_mode(self) -> bool: - """Check if PD disaggregation is active (prefill workers exist).""" - return any( - w.worker_type == WorkerType.PREFILL for url, w in self.workers.items() if url not in self.dead_workers - ) - - def _pick_pd_pair(self) -> tuple[WorkerInfo, WorkerInfo]: - """Pick a (prefill, decode) worker pair using least-inflight.""" - prefill_candidates = self._healthy_workers(WorkerType.PREFILL) - decode_candidates = self._healthy_workers(WorkerType.DECODE) - if not prefill_candidates: - raise RuntimeError("No healthy prefill workers available") - if not decode_candidates: - raise RuntimeError("No healthy decode workers available") - prefill = self._select_by_least_inflight(prefill_candidates) - decode = self._select_by_least_inflight(decode_candidates) - prefill.active_requests += 1 - decode.active_requests += 1 - return prefill, decode - - def _pick_worker(self) -> WorkerInfo: - """Pick a single worker via least-inflight (non-PD mode).""" - candidates = self._healthy_workers() - worker = self._select_by_least_inflight(candidates) - worker.active_requests += 1 - return worker - - def _finish_worker(self, worker: WorkerInfo): - """Mark the request to the given worker as finished.""" - worker.active_requests -= 1 - assert worker.active_requests >= 0, f"Worker {worker.url} active_requests went negative" - - # ------------------------------------------------------------------ - # Proxy (streaming) - # ------------------------------------------------------------------ - - async def proxy(self, request: Request, path: str): - """Stream-proxy requests to a selected backend worker. - - In PD disaggregation mode, picks a (prefill, decode) pair, injects - bootstrap info, and sends the same request to both workers concurrently - (mirroring sgl-model-gateway behaviour). The decode worker's response - is returned to the caller. - """ - body = await request.body() - headers = dict(request.headers) - - if self._is_pd_mode(): - return await self._proxy_pd(path, body, headers) - else: - worker = self._pick_worker() - try: - return await self._forward_to_worker(worker, path, body, headers) - finally: - self._finish_worker(worker) - - # --- PD dual-dispatch helpers --- - - def _bootstrap_host_from_url(self, worker_url: str) -> str: - """Extract the hostname from a worker URL for bootstrap.""" - return urlparse(worker_url).hostname or "127.0.0.1" - - def _inject_bootstrap(self, body: bytes, prefill: WorkerInfo) -> bytes: - """Inject bootstrap_host / bootstrap_port / bootstrap_room into the request body.""" - try: - payload = json.loads(body) if body else {} - except Exception: - return body - payload["bootstrap_host"] = self._bootstrap_host_from_url(prefill.url) - payload["bootstrap_port"] = prefill.bootstrap_port - payload["bootstrap_room"] = uuid.uuid4().hex - return json.dumps(payload).encode() - - async def _proxy_pd(self, path: str, body: bytes, headers: dict) -> Response: - """PD dual dispatch: send the same request to prefill + decode concurrently.""" - prefill, decode = self._pick_pd_pair() - try: - modified_body = self._inject_bootstrap(body, prefill) - - prefill_url = f"{prefill.url}/{path}" - decode_url = f"{decode.url}/{path}" - - prefill_req = self.client.build_request("POST", prefill_url, content=modified_body, headers=headers) - decode_req = self.client.build_request("POST", decode_url, content=modified_body, headers=headers) - - # Fire both concurrently; we only care about the decode response. - _prefill_task = asyncio.ensure_future(self.client.send(prefill_req, stream=True)) - decode_response = await self.client.send(decode_req, stream=True) - - return await self._build_response(decode_response) - finally: - self._finish_worker(prefill) - self._finish_worker(decode) - - async def _forward_to_worker(self, worker: WorkerInfo, path: str, body: bytes, headers: dict) -> Response: - """Forward a request to a single worker and return its response.""" - url = f"{worker.url}/{path}" - req = self.client.build_request("POST", url, content=body, headers=headers) - response = await self.client.send(req, stream=True) - return await self._build_response(response) - - async def _build_response(self, response: httpx.Response) -> Response: - """Convert an httpx streaming response into a FastAPI response.""" - content_type = response.headers.get("content-type", "") - - if "text/event-stream" not in content_type: - content = await response.aread() - await response.aclose() - try: - data = json.loads(content) - return JSONResponse(content=data, status_code=response.status_code) - except Exception: - return Response(content=content, status_code=response.status_code, media_type=content_type or None) - - async def _stream(): - try: - async for chunk in response.aiter_bytes(): - yield chunk - finally: - await response.aclose() - - return StreamingResponse(_stream(), status_code=response.status_code, media_type=content_type) - - # ------------------------------------------------------------------ - # Worker management endpoints - # ------------------------------------------------------------------ - - async def add_worker(self, request: Request): - """Add a new worker (v1 compat — query string or JSON body). - - Examples: - POST /add_worker?url=http://127.0.0.1:10090 - POST /add_worker?url=http://127.0.0.1:10090&worker_type=prefill - POST /add_worker {"url": "...", "worker_type": "prefill"} - """ - worker_url = request.query_params.get("url") or request.query_params.get("worker_url") - worker_type_str = request.query_params.get("worker_type", "regular") - - if not worker_url: - body = await request.body() - payload = json.loads(body) if body else {} - worker_url = payload.get("url") or payload.get("worker_url") - worker_type_str = payload.get("worker_type", worker_type_str) - - if not worker_url: - return JSONResponse( - status_code=400, content={"error": "url is required (use query ?url=... or JSON body)"} - ) - - try: - worker_type = WorkerType(worker_type_str) - except ValueError: - worker_type = WorkerType.REGULAR - - if worker_url not in self.workers: - self.workers[worker_url] = WorkerInfo(url=worker_url, worker_type=worker_type) - if self.verbose: - print(f"[slime-router] Added new worker: {worker_url} (type={worker_type.value})") - - return {"status": "success", "worker_urls": {u: w.active_requests for u, w in self.workers.items()}} - - async def add_worker_v2(self, request: Request): - """Add worker — SGLang Model Gateway compatible ``POST /workers`` endpoint. - - Body: {"url": "...", "worker_type": "prefill"|"decode"|"regular", "bootstrap_port": 12345} - """ - body = await request.body() - payload = json.loads(body) if body else {} - worker_url = payload.get("url") - worker_type_str = payload.get("worker_type", "regular") - bootstrap_port = payload.get("bootstrap_port") - - if not worker_url: - return JSONResponse(status_code=400, content={"error": "url is required in JSON body"}) - - try: - worker_type = WorkerType(worker_type_str) - except ValueError: - worker_type = WorkerType.REGULAR - - if worker_url not in self.workers: - self.workers[worker_url] = WorkerInfo( - url=worker_url, worker_type=worker_type, bootstrap_port=bootstrap_port - ) - if self.verbose: - print(f"[slime-router] Added new worker: {worker_url} (type={worker_type.value})") - - return {"status": "success"} - - async def remove_worker(self, request: Request): - """Remove a worker from the pool.""" - worker_url = request.query_params.get("url") or request.query_params.get("worker_url") - - if not worker_url: - body = await request.body() - payload = json.loads(body) if body else {} - worker_url = payload.get("url") or payload.get("worker_url") - - if not worker_url: - return JSONResponse(status_code=400, content={"error": "url is required"}) - - self.workers.pop(worker_url, None) - self.dead_workers.discard(worker_url) - return {"status": "success"} - - async def list_workers(self, request: Request): - """List all registered workers (v1 compat).""" - return {"urls": list(self.workers.keys())} - - async def list_workers_v2(self, request: Request): - """List workers — SGLang Model Gateway compatible ``GET /workers``.""" - workers_list = [] - for url, w in self.workers.items(): - entry = { - "url": url, - "worker_type": w.worker_type.value, - "active_requests": w.active_requests, - "is_healthy": url not in self.dead_workers, - } - if w.bootstrap_port is not None: - entry["bootstrap_port"] = w.bootstrap_port - workers_list.append(entry) - return {"workers": workers_list} - - async def health(self, request: Request): - """Router health check endpoint.""" - alive = sum(1 for u in self.workers if u not in self.dead_workers) - return {"status": "ok", "healthy_workers": alive, "total_workers": len(self.workers)} - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("--host", type=str, default="0.0.0.0") - parser.add_argument("--port", type=int, default=30000) - parser.add_argument("--sglang-host", type=str, required=True) - parser.add_argument("--sglang-port", type=int, required=True) - parser.add_argument("--verbose", action="store_true", help="Enable verbose output") - - args = parser.parse_args() - run_router(args) diff --git a/slime/utils/arguments.py b/slime/utils/arguments.py index daeb1180d6..7621decb09 100644 --- a/slime/utils/arguments.py +++ b/slime/utils/arguments.py @@ -1005,25 +1005,28 @@ def add_router_arguments(parser): "--use-slime-router", action="store_true", default=False, - help="Whether to use SlimeRouter for text-based routing instead of SGLang token-based routing", + help=( + "Deprecated no-op. slime now always launches sglang_router built from " + "https://github.com/zhuzilin/sgl-router." + ), ) parser.add_argument( "--slime-router-timeout", type=float, default=None, - help="Timeout for SlimeRouter HTTP requests in seconds.", + help="Deprecated no-op retained for backward compatibility.", ) parser.add_argument( "--slime-router-max-connections", type=int, default=None, - help="Max connections for SlimeRouter HTTP client.", + help="Deprecated no-op retained for backward compatibility.", ) parser.add_argument( "--slime-router-health-check-failure-threshold", type=int, default=3, - help="Number of consecutive failures before marking a worker as unhealthy.", + help="Deprecated no-op retained for backward compatibility.", ) RouterArgs.add_cli_args(parser, use_router_prefix=True, exclude_host_port=True) return parser @@ -1515,6 +1518,22 @@ def _resolve_eval_datasets(args) -> list[EvalDatasetConfig]: def slime_validate_args(args): args.eval_datasets = _resolve_eval_datasets(args) + if args.use_slime_router: + logger.warning( + "--use-slime-router is deprecated and ignored. slime now always uses sglang_router " + "built from https://github.com/zhuzilin/sgl-router." + ) + args.use_slime_router = False + + if args.slime_router_timeout is not None: + logger.warning("--slime-router-timeout is deprecated and ignored.") + + if args.slime_router_max_connections is not None: + logger.warning("--slime-router-max-connections is deprecated and ignored.") + + if args.slime_router_health_check_failure_threshold != 3: + logger.warning("--slime-router-health-check-failure-threshold is deprecated and ignored.") + if args.kl_coef != 0 or args.use_kl_loss: if not os.path.exists(args.ref_load): raise FileNotFoundError(f"ref_load {args.ref_load} does not exist, please check the path.") diff --git a/slime/utils/wandb_utils.py b/slime/utils/wandb_utils.py index cd5302049c..32a9b0f5a3 100644 --- a/slime/utils/wandb_utils.py +++ b/slime/utils/wandb_utils.py @@ -92,14 +92,6 @@ def reinit_wandb_primary_with_open_metrics(args, router_addr): if router_addr is None: return - import sglang_router - - if "slime" not in sglang_router.__version__: - logger.warning( - "Only customized sglang_router from https://github.com/zhuzilin/sgl-router supports uploading metrics." - ) - return - logger.info(f"Re-initializing primary W&B with SGLang metrics at {router_addr}.") wandb.finish() diff --git a/tests/test_moonlight_16B_A3B_r3.py b/tests/test_moonlight_16B_A3B_r3.py index b8a4e5ee3a..facb2d0b70 100644 --- a/tests/test_moonlight_16B_A3B_r3.py +++ b/tests/test_moonlight_16B_A3B_r3.py @@ -69,7 +69,6 @@ def execute(): "--entropy-coef 0.00 " "--eps-clip 4e-4 " "--use-rollout-routing-replay " - "--use-slime-router " ) optimizer_args = ( diff --git a/tests/test_quick_start_glm4_9B.py b/tests/test_quick_start_glm4_9B.py index 4f87df8eab..30cb348594 100644 --- a/tests/test_quick_start_glm4_9B.py +++ b/tests/test_quick_start_glm4_9B.py @@ -80,7 +80,7 @@ def execute(): "--adam-beta2 0.98 " ) - sglang_args = "--rollout-num-gpus-per-engine 2 " "--sglang-cuda-graph-max-bs 32 " "--use-slime-router " + sglang_args = "--rollout-num-gpus-per-engine 2 " "--sglang-cuda-graph-max-bs 32 " ci_args = "--ci-test " diff --git a/tests/test_qwen3_30B_A3B_r3.py b/tests/test_qwen3_30B_A3B_r3.py index 9beeb57686..71d87a3919 100644 --- a/tests/test_qwen3_30B_A3B_r3.py +++ b/tests/test_qwen3_30B_A3B_r3.py @@ -77,7 +77,6 @@ def execute(): "--eps-clip 4e-4 " "--use-tis " "--use-rollout-routing-replay " - "--use-slime-router " ) optimizer_args = ( From 07e12e9e162ef6292d055844d87f347351a8188a Mon Sep 17 00:00:00 2001 From: Copilot Date: Fri, 27 Mar 2026 08:44:23 +0000 Subject: [PATCH 2/5] refactor: drop obsolete slime router flags --- docs/en/advanced/slime-router.md | 17 ------------- docs/zh/advanced/slime-router.md | 17 ------------- slime/utils/arguments.py | 43 -------------------------------- 3 files changed, 77 deletions(-) delete mode 100644 docs/en/advanced/slime-router.md delete mode 100644 docs/zh/advanced/slime-router.md diff --git a/docs/en/advanced/slime-router.md b/docs/en/advanced/slime-router.md deleted file mode 100644 index 21adba28b3..0000000000 --- a/docs/en/advanced/slime-router.md +++ /dev/null @@ -1,17 +0,0 @@ -# slime router - -slime router has been removed from slime. - -When slime needs to launch a local router for rollout, it now always uses `sglang_router` built from [zhuzilin/sgl-router](https://github.com/zhuzilin/sgl-router). - -## Migration - -- `--use-slime-router` is deprecated and ignored. -- `--slime-router-timeout` is deprecated and ignored. -- `--slime-router-max-connections` is deprecated and ignored. -- `--slime-router-health-check-failure-threshold` is deprecated and ignored. - -## What to use instead - -- Use slime's default `sglang_router` path. -- For router capabilities and deployment details, see the [SGLang Model Gateway documentation](https://docs.sglang.io/advanced_features/sgl_model_gateway.html). diff --git a/docs/zh/advanced/slime-router.md b/docs/zh/advanced/slime-router.md deleted file mode 100644 index 82784348b4..0000000000 --- a/docs/zh/advanced/slime-router.md +++ /dev/null @@ -1,17 +0,0 @@ -# slime router - -slime 中的 slime router 已被移除。 - -现在,当 slime 需要为 rollout 启动本地 router 时,会统一使用由 [zhuzilin/sgl-router](https://github.com/zhuzilin/sgl-router) 构建的 `sglang_router`。 - -## 迁移说明 - -- `--use-slime-router` 已废弃并被忽略。 -- `--slime-router-timeout` 已废弃并被忽略。 -- `--slime-router-max-connections` 已废弃并被忽略。 -- `--slime-router-health-check-failure-threshold` 已废弃并被忽略。 - -## 替代方案 - -- 直接使用 slime 默认的 `sglang_router` 路径。 -- Router 的能力和部署方式请参考 [SGLang Model Gateway 官方文档](https://docs.sglang.io/advanced_features/sgl_model_gateway.html)。 diff --git a/slime/utils/arguments.py b/slime/utils/arguments.py index 7621decb09..c82f0bd883 100644 --- a/slime/utils/arguments.py +++ b/slime/utils/arguments.py @@ -1001,33 +1001,6 @@ def add_on_policy_distillation_arguments(parser): return parser def add_router_arguments(parser): - parser.add_argument( - "--use-slime-router", - action="store_true", - default=False, - help=( - "Deprecated no-op. slime now always launches sglang_router built from " - "https://github.com/zhuzilin/sgl-router." - ), - ) - parser.add_argument( - "--slime-router-timeout", - type=float, - default=None, - help="Deprecated no-op retained for backward compatibility.", - ) - parser.add_argument( - "--slime-router-max-connections", - type=int, - default=None, - help="Deprecated no-op retained for backward compatibility.", - ) - parser.add_argument( - "--slime-router-health-check-failure-threshold", - type=int, - default=3, - help="Deprecated no-op retained for backward compatibility.", - ) RouterArgs.add_cli_args(parser, use_router_prefix=True, exclude_host_port=True) return parser @@ -1518,22 +1491,6 @@ def _resolve_eval_datasets(args) -> list[EvalDatasetConfig]: def slime_validate_args(args): args.eval_datasets = _resolve_eval_datasets(args) - if args.use_slime_router: - logger.warning( - "--use-slime-router is deprecated and ignored. slime now always uses sglang_router " - "built from https://github.com/zhuzilin/sgl-router." - ) - args.use_slime_router = False - - if args.slime_router_timeout is not None: - logger.warning("--slime-router-timeout is deprecated and ignored.") - - if args.slime_router_max_connections is not None: - logger.warning("--slime-router-max-connections is deprecated and ignored.") - - if args.slime_router_health_check_failure_threshold != 3: - logger.warning("--slime-router-health-check-failure-threshold is deprecated and ignored.") - if args.kl_coef != 0 or args.use_kl_loss: if not os.path.exists(args.ref_load): raise FileNotFoundError(f"ref_load {args.ref_load} does not exist, please check the path.") From df45df11d11e248dd52453e931ff8777207d925c Mon Sep 17 00:00:00 2001 From: Copilot Date: Fri, 27 Mar 2026 08:50:53 +0000 Subject: [PATCH 3/5] refactor: keep deprecated slime router flag --- slime/utils/arguments.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/slime/utils/arguments.py b/slime/utils/arguments.py index c82f0bd883..a634d1f003 100644 --- a/slime/utils/arguments.py +++ b/slime/utils/arguments.py @@ -1001,6 +1001,12 @@ def add_on_policy_distillation_arguments(parser): return parser def add_router_arguments(parser): + parser.add_argument( + "--use-slime-router", + action="store_true", + default=False, + help="Whether to use SlimeRouter for text-based routing instead of SGLang token-based routing", + ) RouterArgs.add_cli_args(parser, use_router_prefix=True, exclude_host_port=True) return parser @@ -1491,6 +1497,13 @@ def _resolve_eval_datasets(args) -> list[EvalDatasetConfig]: def slime_validate_args(args): args.eval_datasets = _resolve_eval_datasets(args) + if args.use_slime_router: + logger.warning( + "--use-slime-router is deprecated and ignored. slime now always uses sglang_router " + "built from https://github.com/zhuzilin/sgl-router." + ) + args.use_slime_router = False + if args.kl_coef != 0 or args.use_kl_loss: if not os.path.exists(args.ref_load): raise FileNotFoundError(f"ref_load {args.ref_load} does not exist, please check the path.") From 4f07b2d9508458d0e22d5a96de0fdb4ca5f372cd Mon Sep 17 00:00:00 2001 From: Copilot Date: Fri, 27 Mar 2026 08:53:14 +0000 Subject: [PATCH 4/5] revert change --- slime/utils/wandb_utils.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/slime/utils/wandb_utils.py b/slime/utils/wandb_utils.py index 32a9b0f5a3..cd5302049c 100644 --- a/slime/utils/wandb_utils.py +++ b/slime/utils/wandb_utils.py @@ -92,6 +92,14 @@ def reinit_wandb_primary_with_open_metrics(args, router_addr): if router_addr is None: return + import sglang_router + + if "slime" not in sglang_router.__version__: + logger.warning( + "Only customized sglang_router from https://github.com/zhuzilin/sgl-router supports uploading metrics." + ) + return + logger.info(f"Re-initializing primary W&B with SGLang metrics at {router_addr}.") wandb.finish() From 3217dd7c28466fb5ebafcefc39a430ab71536a1f Mon Sep 17 00:00:00 2001 From: Copilot Date: Fri, 27 Mar 2026 08:55:57 +0000 Subject: [PATCH 5/5] fix lint --- slime_plugins/models/hf_attention.py | 2 +- slime_plugins/models/qwen3_next.py | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/slime_plugins/models/hf_attention.py b/slime_plugins/models/hf_attention.py index 26b04d6d3b..d5cf619724 100644 --- a/slime_plugins/models/hf_attention.py +++ b/slime_plugins/models/hf_attention.py @@ -1,6 +1,6 @@ -from abc import ABC, abstractmethod import json import os +from abc import ABC, abstractmethod import torch import torch.distributed as dist diff --git a/slime_plugins/models/qwen3_next.py b/slime_plugins/models/qwen3_next.py index c6026af869..92e39ff318 100644 --- a/slime_plugins/models/qwen3_next.py +++ b/slime_plugins/models/qwen3_next.py @@ -7,9 +7,10 @@ from megatron.core.transformer.spec_utils import ModuleSpec from megatron.core.transformer.transformer_block import get_num_layers_to_build from megatron.core.transformer.transformer_layer import get_transformer_layer_offset -from .hf_attention import _load_hf_config from transformers.activations import ACT2FN +from .hf_attention import _load_hf_config + try: from fla.modules import FusedRMSNormGated, ShortConvolution from fla.ops.gated_delta_rule import chunk_gated_delta_rule @@ -220,9 +221,7 @@ def get_qwen3_next_spec(args, config, vp_stage): if not hasattr(hf_config, "layer_types"): interval = getattr(hf_config, "full_attention_interval", 4) n = hf_config.num_hidden_layers - hf_config.layer_types = [ - "full_attention" if (i + 1) % interval == 0 else "linear_attention" for i in range(n) - ] + hf_config.layer_types = ["full_attention" if (i + 1) % interval == 0 else "linear_attention" for i in range(n)] for layer_id in range(num_layers_to_build): if hf_config.layer_types[layer_id + offset] == "linear_attention":