diff --git a/vllm/config/vllm.py b/vllm/config/vllm.py
index a3a9eec9b320..103e7dd6e721 100644
--- a/vllm/config/vllm.py
+++ b/vllm/config/vllm.py
@@ -982,6 +982,10 @@ def has_blocked_weights():
         # Handle the KV connector configs
         self._post_init_kv_transfer_config()
 
+        # For online reconfiguration, remember the original scheduler limits.
+        self.org_max_num_seqs = self.scheduler_config.max_num_seqs
+        self.org_max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens
+
     def update_sizes_for_sequence_parallelism(self, possible_sizes: list) -> list:
         # remove the sizes that not multiple of tp_size when
         # enable sequence parallelism
diff --git a/vllm/engine/protocol.py b/vllm/engine/protocol.py
index d94951a0cffc..d3ca35ac2df1 100644
--- a/vllm/engine/protocol.py
+++ b/vllm/engine/protocol.py
@@ -188,3 +188,13 @@ async def collective_rpc(
     async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
         """Get supported tasks"""
         raise NotImplementedError
+
+    def reconfigure(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        raise NotImplementedError
+
+    async def reconfigure_async(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ):
+        raise NotImplementedError
diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py
index 6440b702f4fa..cd59c98f005b 100644
--- a/vllm/entrypoints/llm.py
+++ b/vllm/entrypoints/llm.py
@@ -370,6 +370,13 @@ def reset_mm_cache(self) -> None:
         self.input_processor.clear_mm_cache()
         self.llm_engine.reset_mm_cache()
 
+    def reconfigure(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        return self.llm_engine.engine_core.reconfigure(
+            max_num_seqs=max_num_seqs, max_num_batched_tokens=max_num_batched_tokens
+        )
+
     def get_default_sampling_params(self) -> SamplingParams:
         if self.default_sampling_params is None:
             self.default_sampling_params = self.model_config.get_diff_sampling_param()
diff --git a/vllm/entrypoints/serve/__init__.py b/vllm/entrypoints/serve/__init__.py
index c4fcc92db931..90fa1eb4af64 100644
--- a/vllm/entrypoints/serve/__init__.py
+++ b/vllm/entrypoints/serve/__init__.py
@@ -23,11 +23,11 @@ def register_vllm_serve_api_routers(app: FastAPI):
 
     attach_profile_router(app)
 
-    from vllm.entrypoints.serve.sleep.api_router import (
-        attach_router as attach_sleep_router,
+    from vllm.entrypoints.serve.dev.api_router import (
+        attach_router as attach_dev_router,
     )
 
-    attach_sleep_router(app)
+    attach_dev_router(app)
 
     from vllm.entrypoints.serve.tokenize.api_router import (
         attach_router as attach_tokenize_router,
diff --git a/vllm/entrypoints/serve/sleep/__init__.py b/vllm/entrypoints/serve/dev/__init__.py
similarity index 100%
rename from vllm/entrypoints/serve/sleep/__init__.py
rename to vllm/entrypoints/serve/dev/__init__.py
diff --git a/vllm/entrypoints/serve/sleep/api_router.py b/vllm/entrypoints/serve/dev/api_router.py
similarity index 82%
rename from vllm/entrypoints/serve/sleep/api_router.py
rename to vllm/entrypoints/serve/dev/api_router.py
index bc01e185315c..83616fcc82aa 100644
--- a/vllm/entrypoints/serve/sleep/api_router.py
+++ b/vllm/entrypoints/serve/dev/api_router.py
@@ -7,6 +7,7 @@
 
 import vllm.envs as envs
 from vllm.engine.protocol import EngineClient
+from vllm.entrypoints.serve.dev.protocol import ReconfigureRequest
 from vllm.logger import init_logger
 
 logger = init_logger(__name__)
@@ -49,6 +50,15 @@ async def is_sleeping(raw_request: Request):
     return JSONResponse(content={"is_sleeping": is_sleeping})
 
 
+@router.post("/reconfigure")
+async def reconfigure(request: ReconfigureRequest, raw_request: Request):
+    success = await engine_client(raw_request).reconfigure_async(
+        max_num_seqs=request.max_num_seqs,
+        max_num_batched_tokens=request.max_num_batched_tokens,
+    )
+    return JSONResponse(content={"success": success})
+
+
 def attach_router(app: FastAPI):
     if not envs.VLLM_SERVER_DEV_MODE:
         return
diff --git a/vllm/entrypoints/serve/dev/protocol.py b/vllm/entrypoints/serve/dev/protocol.py
new file mode 100644
index 000000000000..826fe698bf13
--- /dev/null
+++ b/vllm/entrypoints/serve/dev/protocol.py
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+
+from pydantic import BaseModel
+
+
+class ReconfigureRequest(BaseModel):
+    max_num_seqs: int | None = None
+    max_num_batched_tokens: int | None = None
diff --git a/vllm/v1/core/sched/interface.py b/vllm/v1/core/sched/interface.py
index 596ab05ad320..a78fffe9c70b 100644
--- a/vllm/v1/core/sched/interface.py
+++ b/vllm/v1/core/sched/interface.py
@@ -187,3 +187,6 @@ def shutdown(self) -> None:
 
     def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]:
         return None
+
+    def reconfigure(self, max_num_seqs: int, max_num_batched_tokens: int) -> None:
+        raise NotImplementedError
diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py
index c3d504f2e72c..960e4d8831a5 100644
--- a/vllm/v1/core/sched/scheduler.py
+++ b/vllm/v1/core/sched/scheduler.py
@@ -1803,3 +1803,7 @@ def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]:
         self.failed_recving_kv_req_ids |= async_failed_req_ids
         # Return sync affected IDs to skip in update_from_output
         return sync_failed_req_ids
+
+    def reconfigure(self, max_num_seqs: int, max_num_batched_tokens: int) -> None:
+        self.max_num_running_reqs = max_num_seqs
+        self.max_num_scheduled_tokens = max_num_batched_tokens
diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py
index 8eff61563cce..d9054eb5d6aa 100644
--- a/vllm/v1/engine/async_llm.py
+++ b/vllm/v1/engine/async_llm.py
@@ -848,6 +848,18 @@ async def scale_elastic_ep(
             custom_stat_loggers=None,
         )
 
+    def reconfigure(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        return self.engine_core.reconfigure(max_num_seqs, max_num_batched_tokens)
+
+    async def reconfigure_async(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        return await self.engine_core.reconfigure_async(
+            max_num_seqs, max_num_batched_tokens
+        )
+
     @property
     def is_running(self) -> bool:
         # Is None before the loop is started.
diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py
index 0045b8c1dd3e..80a54dd9bebb 100644
--- a/vllm/v1/engine/core.py
+++ b/vllm/v1/engine/core.py
@@ -580,6 +580,32 @@ def preprocess_add_request(self, request: EngineCoreRequest) -> tuple[Request, i
             self.structured_output_manager.grammar_init(req)
         return req, request.current_wave
 
+    def reconfigure(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        vllm_config = self.vllm_config
+        max_num_seqs = max_num_seqs or vllm_config.org_max_num_seqs
+        max_num_batched_tokens = (
+            max_num_batched_tokens or vllm_config.org_max_num_batched_tokens
+        )
+
+        # The reconfigured values must be less than or equal to their original
+        # values; otherwise we may OOM or exceed the captured CUDA graph sizes.
+        if max_num_seqs > vllm_config.org_max_num_seqs:
+            return False
+        if max_num_batched_tokens > vllm_config.org_max_num_batched_tokens:
+            return False
+
+        scheduler_config = self.vllm_config.scheduler_config
+        scheduler_config.max_num_seqs = max_num_seqs
+        scheduler_config.max_num_batched_tokens = max_num_batched_tokens
+
+        self.scheduler.reconfigure(
+            max_num_seqs=max_num_seqs, max_num_batched_tokens=max_num_batched_tokens
+        )
+
+        return True
+
 
 class EngineCoreProc(EngineCore):
     """ZMQ-wrapper for running EngineCore in background process."""
diff --git a/vllm/v1/engine/core_client.py b/vllm/v1/engine/core_client.py
index c936646aa799..2ea38cd2a89f 100644
--- a/vllm/v1/engine/core_client.py
+++ b/vllm/v1/engine/core_client.py
@@ -178,6 +178,11 @@ def save_sharded_state(
     ) -> None:
         raise NotImplementedError
 
+    def reconfigure(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        raise NotImplementedError
+
     def collective_rpc(
         self,
         method: str | Callable[..., _R],
@@ -253,6 +258,11 @@ async def collective_rpc_async(
     ) -> list[_R]:
         raise NotImplementedError
 
+    async def reconfigure_async(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        raise NotImplementedError
+
 
 class InprocClient(EngineCoreClient):
     """
@@ -339,6 +349,11 @@ def collective_rpc(
     def dp_engines_running(self) -> bool:
         return False
 
+    def reconfigure(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        return self.engine_core.reconfigure(max_num_seqs, max_num_batched_tokens)
+
 
 @dataclass
 class BackgroundResources:
@@ -804,6 +819,11 @@ def save_sharded_state(
     ) -> None:
         self.call_utility("save_sharded_state", path, pattern, max_size)
 
+    def reconfigure(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        return self.call_utility("reconfigure", max_num_seqs, max_num_batched_tokens)
+
 
 class AsyncMPClient(MPClient):
     """Asyncio-compatible client for multi-proc EngineCore."""
@@ -1014,6 +1034,13 @@ async def collective_rpc_async(
             "collective_rpc", method, timeout, args, kwargs
         )
 
+    async def reconfigure_async(
+        self, max_num_seqs: int | None, max_num_batched_tokens: int | None
+    ) -> bool:
+        return await self.call_utility_async(
+            "reconfigure", max_num_seqs, max_num_batched_tokens
+        )
+
 
 class DPAsyncMPClient(AsyncMPClient):
     """Asyncio-compatible client for multi-proc, multi-engine (data parallel)
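
Below is a minimal usage sketch (not part of the diff) of the reconfigure API introduced above, via the offline LLM entrypoint; the model name and limit values are illustrative placeholders. The equivalent online path is POST /reconfigure with a JSON body matching ReconfigureRequest, and that route is only mounted when VLLM_SERVER_DEV_MODE is enabled.

    from vllm import LLM

    # Assumed starting limits; any values <= these originals can be applied later.
    llm = LLM(model="facebook/opt-125m", max_num_seqs=256, max_num_batched_tokens=8192)

    # Shrink the scheduler limits at runtime.
    assert llm.reconfigure(max_num_seqs=64, max_num_batched_tokens=2048)

    # Raising a limit above its original value is rejected (returns False).
    assert not llm.reconfigure(max_num_seqs=512, max_num_batched_tokens=8192)

    # Passing None restores the original values captured at engine construction.
    assert llm.reconfigure(max_num_seqs=None, max_num_batched_tokens=None)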