37 changes: 37 additions & 0 deletions lightllm/server/api_openai.py
@@ -75,6 +75,19 @@ async def _safe_stream_wrapper(stream_generator):
return


# OpenAI response schemas restrict finish_reason to {"stop","length","tool_calls"} (chat) and
# {"stop","length"} (completions). The internal FINISHED_ERROR status maps to "error", which must
# be surfaced at the API boundary as a controlled error response rather than leaking into the
# Pydantic models (where it would raise ValidationError) or reaching the client as a silent stop.
_INTERNAL_ERROR_MESSAGE = "Internal server error during request processing"
_INTERNAL_ERROR_TYPE = "InternalServerError"


def _sse_internal_error_payload() -> str:
error = {"error": {"message": _INTERNAL_ERROR_MESSAGE, "type": _INTERNAL_ERROR_TYPE}}
return json.dumps(error, ensure_ascii=False)


def _serialize_sse_chunk(chunk, choice_nulls=(), response_nulls=()):
"""Serialize a streaming chunk, explicitly including specified null fields."""
d = chunk.model_dump(exclude_none=True)
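
Taken together, these helpers fix the wire format an internal failure takes on the streaming path: a single `data:` event carrying the `{"error": {...}}` payload, followed by `data: [DONE]`. Below is a minimal client-side sketch of detecting that event; only the payload shape comes from `_sse_internal_error_payload` above, while the endpoint URL, model name, and the use of `httpx` are illustrative assumptions.

```python
import json
import httpx  # illustrative HTTP client choice, not mandated by the PR

def consume_chat_stream(base_url: str = "http://localhost:8000") -> None:
    payload = {"model": "default", "stream": True,
               "messages": [{"role": "user", "content": "hi"}]}
    with httpx.stream("POST", f"{base_url}/v1/chat/completions", json=payload, timeout=None) as resp:
        for line in resp.iter_lines():
            if not line.startswith("data: "):
                continue
            data = line[len("data: "):]
            if data == "[DONE]":
                break
            chunk = json.loads(data)
            if "error" in chunk:
                # Shape produced by _sse_internal_error_payload():
                # {"error": {"message": ..., "type": "InternalServerError"}}
                raise RuntimeError(chunk["error"]["message"])
            print(chunk["choices"][0]["delta"].get("content", ""), end="", flush=True)
```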
@@ -355,6 +368,9 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req
finish_reason_dict[sub_req_id] = finish_status.get_finish_reason()
prompt_tokens_dict[sub_req_id] = metadata["prompt_tokens"]
prompt_cache_len_dict[sub_req_id] = metadata.get("prompt_cache_len", 0)
if any(r == "error" for r in finish_reason_dict.values()):
logger.error(f"internal pipeline error during chat completion group_id={group_request_id}")
return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
choices = []
sub_ids = list(final_output_dict.keys())[: request.n]
for i in range(request.n):
@@ -473,6 +489,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
delta = request_output
current_finish_reason = finish_status.get_finish_reason()

if current_finish_reason == "error":
logger.error(
f"internal pipeline error during chat stream group_id={group_request_id} "
f"sub_req_id={sub_req_id}"
)
yield f"data: {_sse_internal_error_payload()}\n\n"
yield "data: [DONE]\n\n".encode("utf-8")
return

# Emit the initial role-only chunk once per choice, as required by the
# OpenAI SSE spec: role appears only in the first delta with content="".
if not has_emitted_first_chunk[choice_index]:
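
For reference, the role-only convention described in the comment means the first and subsequent chunks of a choice differ only in their delta payload. The shapes below are an illustrative subset of fields, not the project's actual Pydantic models.

```python
# Illustrative chunk shapes only; real chunks are built from the server's
# response models and serialized via _serialize_sse_chunk.
first_chunk = {
    "object": "chat.completion.chunk",
    "choices": [{"index": 0,
                 "delta": {"role": "assistant", "content": ""},
                 "finish_reason": None}],
}
subsequent_chunk = {
    "object": "chat.completion.chunk",
    "choices": [{"index": 0,
                 "delta": {"content": "Hello"},
                 "finish_reason": None}],
}
```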
@@ -882,6 +907,9 @@ async def process_single_prompt(prompt: Union[str, List[int]], prompt_index: int
tasks = [asyncio.create_task(process_single_prompt(prompt, i)) for i, prompt in enumerate(prompts)]

results = await asyncio.gather(*tasks)
if any(r.get("finish_reason") == "error" for r in results):
logger.error("internal pipeline error during completion")
return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
return _build_completion_response(results, request, created_time, len(prompts) > 1)


@@ -916,6 +944,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
if finish_status.is_finished():
current_finish_reason = finish_status.get_finish_reason()

if current_finish_reason == "error":
logger.error(
f"internal pipeline error during completion stream group_id={group_request_id} "
f"sub_req_id={sub_req_id}"
)
yield f"data: {_sse_internal_error_payload()}\n\n"
yield "data: [DONE]\n\n"
return

output_text = request_output
if request.echo and metadata.get("is_first_token", False):
prompt_str = prompt
7 changes: 5 additions & 2 deletions lightllm/server/core/objs/req.py
@@ -25,19 +25,20 @@ class FinishStatus(ctypes.Structure):
NO_FINISH = 0
FINISHED_STOP = 1
FINISHED_LENGTH = 2
FINISHED_ERROR = 3

def __init__(self, init_state=NO_FINISH):
self.status = init_state

def set_status(self, new_status):
assert 0 <= new_status <= 2
assert 0 <= new_status <= 3
self.status = new_status

def get_status(self):
return self.status

def is_finished(self):
return self.FINISHED_STOP <= self.status <= self.FINISHED_LENGTH
return self.FINISHED_STOP <= self.status <= self.FINISHED_ERROR

def is_stopped(self):
return self.status == self.FINISHED_STOP
@@ -50,6 +51,8 @@ def get_finish_reason(self):
return "stop"
elif self.status == self.FINISHED_LENGTH:
return "length"
elif self.status == self.FINISHED_ERROR:
return "error"
return None


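With the new value in place, FINISHED_ERROR counts as a terminal state for is_finished() while remaining distinct from a normal stop. A small usage sketch, using the package import that the detokenization manager below relies on:

```python
from lightllm.server.core.objs import FinishStatus  # import path as used in manager.py below

status = FinishStatus()
assert not status.is_finished()
assert status.get_finish_reason() is None

status.set_status(FinishStatus.FINISHED_ERROR)
assert status.is_finished()               # error is now a terminal state
assert not status.is_stopped()            # but it is not a normal stop
assert status.get_finish_reason() == "error"
```
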
65 changes: 46 additions & 19 deletions lightllm/server/detokenization/manager.py
@@ -46,26 +46,46 @@ def _init_get_token_id_to_token_str(self):
self.token_id_to_token = {token_id: token for token, token_id in self.tokenizer.get_vocab().items()}
return

def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
def _add_new_group_req_index(self, recv_obj: GroupReqIndexes) -> int:
from lightllm.server.core.objs import FinishStatus

failed_count = 0
for req_index in recv_obj.shm_req_indexes:
req = self.shm_req_manager.get_req_obj_by_index(req_index)
req.link_prompt_ids_shm_array()
req.link_logprobs_shm_array()

logger.info(
f"detokenization recv req id {req.request_id} " f"cost time {time.time() - recv_obj.time_mark} s"
)

# In P/D disaggregation mode, decoding on the decode node needs some special fix-ups.
decode_req = DecodeReq(req, self.is_pd_decode_mode)
if self.is_pd_decode_mode:
decode_req = decode_mode_fix(decode_req, self.tokenizer, self.eos_id)
# Special initialization for token_healing mode
if self.args.token_healing_mode:
decode_req.init_token_healing_prefix_str(self.token_id_to_token, self.tokenizer)

self.req_id_to_out[req.request_id] = decode_req
return
try:
req.link_prompt_ids_shm_array()
req.link_logprobs_shm_array()

logger.info(
f"detokenization recv req id {req.request_id} " f"cost time {time.time() - recv_obj.time_mark} s"
)

# In P/D disaggregation mode, decoding on the decode node needs some special fix-ups.
decode_req = DecodeReq(req, self.is_pd_decode_mode)
if self.is_pd_decode_mode:
decode_req = decode_mode_fix(decode_req, self.tokenizer, self.eos_id)
# Special initialization for token_healing mode
if self.args.token_healing_mode:
decode_req.init_token_healing_prefix_str(self.token_id_to_token, self.tokenizer)

self.req_id_to_out[req.request_id] = decode_req
except Exception:
# Init failed (shm link, tokenizer, decode-mode fix, …). Mark the req
# finished with an error and push a sentinel into out_tokens_queue so the
# http loop forwards a terminal status — otherwise the queue stays empty,
# the client hangs until disconnect, and the shm slot leaks because
# can_released_mark never gets set. Continue with the rest of the group.
logger.exception(f"detokenization init failed for req_id {req.request_id}")
req.finish_status.set_status(FinishStatus.FINISHED_ERROR)
req.finish_token_index = req.input_len
try:
if not req.out_tokens_queue.is_full():
req.out_tokens_queue.push("", req.input_len, False, 1)
except Exception:
logger.exception(f"failed to push error sentinel for req_id {req.request_id}")
req.can_released_mark = True
failed_count += 1
Review comment (Contributor) on lines +86 to +87, severity: high

The shared memory request object must be returned to the manager in the error path to prevent a reference count leak. In the successful path, this is handled in remove_finished_reqs (line 191). Without this call, the ref_count will not reach 1, and the HttpServerManager will not be able to reclaim the shared memory slot, eventually leading to resource exhaustion.

Suggested change
- req.can_released_mark = True
- failed_count += 1
+ req.can_released_mark = True
+ self.shm_req_manager.put_back_req_obj(req)
+ failed_count += 1

return failed_count

def handle_loop(self):
try:
@@ -76,7 +96,14 @@ def handle_loop(self):
for _ in range(recv_max_count):
recv_obj: GroupReqIndexes = self.zmq_recv_socket.recv_pyobj(zmq.NOBLOCK)
assert isinstance(recv_obj, GroupReqIndexes)
self._add_new_group_req_index(recv_obj=recv_obj)
try:
failed_count = self._add_new_group_req_index(recv_obj=recv_obj)
except Exception:
logger.exception("add new group req index has exception")
failed_count = len(recv_obj.shm_req_indexes)
if failed_count:
# Wake the http loop so it drains the error sentinel(s) we just pushed.
self.pub_to_httpserver.send_pyobj(None, protocol=pickle.HIGHEST_PROTOCOL)

# When many requests are queued, increase how many are received per iteration.
recv_max_count = min(int(recv_max_count * 1.3), 256)
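
The comment in the except block above and the wake-up send here describe a small producer/consumer contract: even when per-request initialization fails, the detokenization side still emits a terminal marker so the HTTP loop never waits on an empty queue. The snippet below is a generic, self-contained illustration of that sentinel pattern; it does not use the project's real queue or manager APIs.

```python
import queue

def process(item: str) -> str:
    if item == "bad":
        raise ValueError("init failed")
    return item.upper()

def producer(items: list[str], out: queue.Queue) -> None:
    for item in items:
        try:
            out.put((item, process(item)))    # normal result
        except Exception:
            out.put((item, None))             # error sentinel: consumer still gets a terminal event

def consumer(out: queue.Queue, n: int) -> None:
    for _ in range(n):
        item, result = out.get()
        if result is None:
            print(f"{item}: internal error")  # analogous to the API layer's 500 / SSE error event
        else:
            print(f"{item}: {result}")

q: queue.Queue = queue.Queue()
producer(["ok", "bad"], q)
consumer(q, 2)
```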