4 changes: 4 additions & 0 deletions vllm/config/vllm.py
@@ -982,6 +982,10 @@ def has_blocked_weights():
# Handle the KV connector configs
self._post_init_kv_transfer_config()

# For online reconfiguration
self.org_max_num_seqs = self.scheduler_config.max_num_seqs
self.org_max_num_batched_tokens = self.scheduler_config.max_num_batched_tokens

def update_sizes_for_sequence_parallelism(self, possible_sizes: list) -> list:
# remove the sizes that not multiple of tp_size when
# enable sequence parallelism
10 changes: 10 additions & 0 deletions vllm/engine/protocol.py
@@ -188,3 +188,13 @@ async def collective_rpc(
async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
"""Get supported tasks"""
raise NotImplementedError

def reconfigure(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
Contributor:
Could this parameter be designed to be more universal? If other parameters need to be modified in the future, there would be no need to add new ones. It would be better to pass a structure or dict instead.

Collaborator (Author):
I also agree that passing a structure or dict would be better. The first version actually used a structure, but since only two parameters can be modified, adding a structure just for that made the code overly verbose. That's why it was changed to the current approach.

) -> bool:
raise NotImplementedError

async def reconfigure_async(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
):
raise NotImplementedError
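For reference, a minimal sketch of the alternative discussed in the review thread above: bundling the tunable limits in a single structure so future fields do not require new method parameters. The ReconfigureOptions dataclass below is hypothetical and not part of this PR, which keeps explicit keyword arguments instead.

# Hypothetical sketch (not in this PR): pass one structure instead of separate
# keyword arguments, so adding a new tunable does not change the signature.
from dataclasses import dataclass


@dataclass
class ReconfigureOptions:
    max_num_seqs: int | None = None
    max_num_batched_tokens: int | None = None


class EngineClientStub:
    """Illustrative stand-in for the EngineClient protocol."""

    def reconfigure(self, options: ReconfigureOptions) -> bool:
        raise NotImplementedError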
7 changes: 7 additions & 0 deletions vllm/entrypoints/llm.py
@@ -370,6 +370,13 @@ def reset_mm_cache(self) -> None:
self.input_processor.clear_mm_cache()
self.llm_engine.reset_mm_cache()

def reconfigure(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
return self.llm_engine.engine_core.reconfigure(
max_num_seqs=max_num_seqs, max_num_batched_tokens=max_num_batched_tokens
)

def get_default_sampling_params(self) -> SamplingParams:
if self.default_sampling_params is None:
self.default_sampling_params = self.model_config.get_diff_sampling_param()
6 changes: 3 additions & 3 deletions vllm/entrypoints/serve/__init__.py
@@ -23,11 +23,11 @@ def register_vllm_serve_api_routers(app: FastAPI):

attach_profile_router(app)

-from vllm.entrypoints.serve.sleep.api_router import (
-    attach_router as attach_sleep_router,
+from vllm.entrypoints.serve.dev.api_router import (
+    attach_router as attach_dev_router,
 )

-attach_sleep_router(app)
+attach_dev_router(app)

from vllm.entrypoints.serve.tokenize.api_router import (
attach_router as attach_tokenize_router,
vllm/entrypoints/serve/dev/api_router.py
@@ -7,6 +7,7 @@

import vllm.envs as envs
from vllm.engine.protocol import EngineClient
from vllm.entrypoints.serve.dev.protocol import ReconfigureRequest
from vllm.logger import init_logger

logger = init_logger(__name__)
@@ -49,6 +50,15 @@ async def is_sleeping(raw_request: Request):
return JSONResponse(content={"is_sleeping": is_sleeping})


@router.post("/reconfigure")
async def reconfigure(request: ReconfigureRequest, raw_request: Request):
success = await engine_client(raw_request).reconfigure_async(
max_num_seqs=request.max_num_seqs,
max_num_batched_tokens=request.max_num_batched_tokens,
)
return JSONResponse(content={"success": success})


def attach_router(app: FastAPI):
if not envs.VLLM_SERVER_DEV_MODE:
return
9 changes: 9 additions & 0 deletions vllm/entrypoints/serve/dev/protocol.py
@@ -0,0 +1,9 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

from pydantic import BaseModel


class ReconfigureRequest(BaseModel):
max_num_seqs: int | None = None
max_num_batched_tokens: int | None = None
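A usage sketch for the new dev endpoint, assuming the server was started with VLLM_SERVER_DEV_MODE=1, listens at http://localhost:8000, and mounts the dev router at the root path; the host, port, and numeric limits are illustrative.

# Sketch: POST to the new /reconfigure dev endpoint; the JSON fields match
# ReconfigureRequest. Host/port and the limit values are illustrative.
import json
import urllib.request

payload = json.dumps(
    {"max_num_seqs": 64, "max_num_batched_tokens": 4096}
).encode("utf-8")
req = urllib.request.Request(
    "http://localhost:8000/reconfigure",
    data=payload,
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    # Expected: {"success": true} when the new limits are accepted.
    print(json.load(resp))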
3 changes: 3 additions & 0 deletions vllm/v1/core/sched/interface.py
@@ -187,3 +187,6 @@ def shutdown(self) -> None:

def get_kv_connector(self) -> Optional["KVConnectorBase_V1"]:
return None

def reconfigure(self, max_num_seqs: int, max_num_batched_tokens: int) -> None:
raise NotImplementedError
4 changes: 4 additions & 0 deletions vllm/v1/core/sched/scheduler.py
@@ -1803,3 +1803,7 @@ def _handle_invalid_blocks(self, invalid_block_ids: set[int]) -> set[str]:
self.failed_recving_kv_req_ids |= async_failed_req_ids
# Return sync affected IDs to skip in update_from_output
return sync_failed_req_ids

def reconfigure(self, max_num_seqs: int, max_num_batched_tokens: int) -> None:
self.max_num_running_reqs = max_num_seqs
self.max_num_scheduled_tokens = max_num_batched_tokens
12 changes: 12 additions & 0 deletions vllm/v1/engine/async_llm.py
@@ -848,6 +848,18 @@ async def scale_elastic_ep(
custom_stat_loggers=None,
)

def reconfigure(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
return self.engine_core.reconfigure(max_num_seqs, max_num_batched_tokens)

async def reconfigure_async(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
return await self.engine_core.reconfigure_async(
max_num_seqs, max_num_batched_tokens
)

@property
def is_running(self) -> bool:
# Is None before the loop is started.
26 changes: 26 additions & 0 deletions vllm/v1/engine/core.py
@@ -580,6 +580,32 @@ def preprocess_add_request(self, request: EngineCoreRequest) -> tuple[Request, i
self.structured_output_manager.grammar_init(req)
return req, request.current_wave

def reconfigure(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
vllm_config = self.vllm_config
max_num_seqs = max_num_seqs or vllm_config.org_max_num_seqs
max_num_batched_tokens = (
max_num_batched_tokens or vllm_config.org_max_num_batched_tokens
)

# The reconfigured values must be less than or equal to their original
# values; otherwise they may cause an OOM or fall outside the captured CUDA graphs.
if max_num_seqs > vllm_config.org_max_num_seqs:
return False
if max_num_batched_tokens > vllm_config.org_max_num_batched_tokens:
return False

scheduler_config = self.vllm_config.scheduler_config
scheduler_config.max_num_seqs = max_num_seqs
scheduler_config.max_num_batched_tokens = max_num_batched_tokens

self.scheduler.reconfigure(
max_num_seqs=max_num_seqs, max_num_batched_tokens=max_num_batched_tokens
)

return True


class EngineCoreProc(EngineCore):
"""ZMQ-wrapper for running EngineCore in background process."""
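To illustrate the semantics of EngineCore.reconfigure above (limits may only shrink relative to their launch-time values, and None restores the original value), a minimal sketch via the LLM entrypoint; the model name and numbers are illustrative.

# Sketch of the reconfigure semantics; model and values are illustrative.
from vllm import LLM

llm = LLM(model="facebook/opt-125m", max_num_seqs=128, max_num_batched_tokens=8192)

# Shrinking the limits is accepted.
assert llm.reconfigure(max_num_seqs=32, max_num_batched_tokens=2048)

# Raising a limit above its launch-time value is rejected (returns False).
assert not llm.reconfigure(max_num_seqs=256, max_num_batched_tokens=2048)

# Passing None restores the original launch-time value for that field.
assert llm.reconfigure(max_num_seqs=None, max_num_batched_tokens=None)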
27 changes: 27 additions & 0 deletions vllm/v1/engine/core_client.py
@@ -178,6 +178,11 @@ def save_sharded_state(
) -> None:
raise NotImplementedError

def reconfigure(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
raise NotImplementedError

def collective_rpc(
self,
method: str | Callable[..., _R],
@@ -253,6 +258,11 @@ async def collective_rpc_async(
) -> list[_R]:
raise NotImplementedError

async def reconfigure_async(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
raise NotImplementedError


class InprocClient(EngineCoreClient):
"""
@@ -339,6 +349,11 @@ def collective_rpc(
def dp_engines_running(self) -> bool:
return False

def reconfigure(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
return self.engine_core.reconfigure(max_num_seqs, max_num_batched_tokens)


@dataclass
class BackgroundResources:
@@ -804,6 +819,11 @@ def save_sharded_state(
) -> None:
self.call_utility("save_sharded_state", path, pattern, max_size)

def reconfigure(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
return self.call_utility("reconfigure", max_num_seqs, max_num_batched_tokens)


class AsyncMPClient(MPClient):
"""Asyncio-compatible client for multi-proc EngineCore."""
@@ -1014,6 +1034,13 @@ async def collective_rpc_async(
"collective_rpc", method, timeout, args, kwargs
)

async def reconfigure_async(
self, max_num_seqs: int | None, max_num_batched_tokens: int | None
) -> bool:
return await self.call_utility_async(
"reconfigure", max_num_seqs, max_num_batched_tokens
)


class DPAsyncMPClient(AsyncMPClient):
"""Asyncio-compatible client for multi-proc, multi-engine (data parallel)