'''
Created on May 17, 2025
@author: Yihao Fang
'''
"""A server that provides OpenAI-compatible RESTful APIs.
It currently supports only Chat Completions: https://platform.openai.com/docs/api-reference/chat
"""
import logging
import os
import time
from collections import defaultdict
from typing import AsyncGenerator, Dict, List, Literal, Optional, Union
import fastapi
import shortuuid
import uvicorn
from fastapi.concurrency import asynccontextmanager
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from openai import AsyncOpenAI
import copy
import asyncio
from node import serve_node
import json
os.environ["TOKENIZERS_PARALLELISM"] = "false"
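
# The keys below are read from config/config.json. An illustrative layout (all values are
# assumptions for this sketch, not actual deployment settings):
#
#   {
#       "router_port": 8000,
#       "vllm_ports": [8001, 8002],
#       "model": "<huggingface-model-id>",
#       "vllm_devices": [0, 1],
#       "vllm_memory_utils": [0.9, 0.9]
#   }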
with open("config/config.json", "r") as f:
    config = json.load(f)

router_port = config.get("router_port", None)
vllm_ports = config.get("vllm_ports", None)
model = config.get("model", None)
vllm_devices = config.get("vllm_devices", None)
vllm_memory_utils = config.get("vllm_memory_utils", None)

count = defaultdict(lambda: defaultdict(int))


@asynccontextmanager
async def lifespan(app):
    yield


app = fastapi.FastAPI(lifespan=lifespan)


class ErrorResponse(BaseModel):
    object: str = "error"
    message: str


class UsageInfo(BaseModel):
    prompt_tokens: int = 0
    total_tokens: int = 0
    completion_tokens: Optional[int] = 0


class ChatCompletionRequest(BaseModel):
    # OpenAI fields: https://platform.openai.com/docs/api-reference/chat/create
    model: str
    messages: Union[
        str,
        List[Dict[str, str]],
        List[Dict[str, Union[str, List[Dict[str, Union[str, Dict[str, str]]]]]]],
    ]
    frequency_penalty: Optional[float] = 0.0
    logit_bias: Optional[Dict[int, float]] = None
    logprobs: Optional[bool] = None
    top_logprobs: Optional[int] = None
    max_tokens: Optional[int] = None
    n: Optional[int] = 1
    presence_penalty: Optional[float] = 0.0
    response_format: Optional[Dict[str, str]] = (
        None  # { "type": "json_object" } for json mode
    )
    seed: Optional[int] = None
    stop: Optional[Union[str, List[str]]] = None
    stream: Optional[bool] = False
    temperature: Optional[float] = 1.0
    top_p: Optional[float] = 1.0
    tools: Optional[List[Dict[str, Union[str, int, float]]]] = None
    tool_choice: Optional[str] = None
    user: Optional[str] = None


class ChatMessage(BaseModel):
    role: str
    content: str


class ChatCompletionResponseChoice(BaseModel):
    index: int
    message: ChatMessage
    finish_reason: Optional[Literal["stop", "length"]] = None


class ChatCompletionResponse(BaseModel):
    id: str = Field(default_factory=lambda: f"chatcmpl-{shortuuid.random()}")
    object: str = "chat.completion"
    created: int = Field(default_factory=lambda: int(time.time()))
    model: str
    choices: List[ChatCompletionResponseChoice]
    usage: UsageInfo


async def stream_response(response) -> AsyncGenerator:
    async for chunk in response:
        yield f"data: {chunk.model_dump_json()}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    # The model name field carries the routing parameters.
    # Model names use the format router-[router name]-[threshold], e.g. router-bert-0.7.
    # The router type and threshold are used to route that specific request.
    logging.info(f"Received request: {request}")

    async def make_request(port, req_dict_per_c):
        client = AsyncOpenAI(
            api_key="EMPTY",
            base_url=f"http://localhost:{port}/v1",
        )
        res = await client.chat.completions.create(**req_dict_per_c)
        return res

    try:
        req_dict = request.model_dump(exclude_none=True)
        # Split the requested n completions as evenly as possible across the vLLM backends.
        n = req_dict["n"]
        l = len(vllm_ports)
        n_partition = [round(n / l)] * (l - 1) + [n - (l - 1) * round(n / l)]
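        # For example (illustrative values): with n = 5 and two vLLM ports,
        # n_partition = [round(5 / 2)] * 1 + [5 - 1 * round(5 / 2)] = [2, 3],
        # so the first backend generates 2 choices and the second generates 3.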
        req_dicts = []
        for n_per_c in n_partition:
            if n_per_c > 0:
                req_dict_per_c = copy.deepcopy(req_dict)
                req_dict_per_c["n"] = n_per_c
                req_dicts.append(req_dict_per_c)
        # Fan the sub-requests out concurrently, then merge all choices into the first response.
        tasks = [
            asyncio.create_task(make_request(port, req_dict_per_c))
            for port, req_dict_per_c in zip(vllm_ports[:len(req_dicts)], req_dicts)
        ]
        results = await asyncio.gather(*tasks)
        resp = results[0]
        for r in results[1:]:
            resp.choices.extend(r.choices)
    except Exception as e:
        return JSONResponse(
            ErrorResponse(message=str(e)).model_dump(),
            status_code=400,
        )

    if request.stream:
        return StreamingResponse(
            content=stream_response(resp), media_type="text/event-stream"
        )
    else:
        return JSONResponse(content=resp.model_dump())

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return JSONResponse(content={"status": "online"})


if __name__ == "__main__":
    serve_node(model=model, ports=vllm_ports, devices=vllm_devices, memory_utils=vllm_memory_utils)
    uvicorn.run(
        "router:app",
        port=router_port,
        host="0.0.0.0",
        workers=0,
    )