From 9aa2e43d01bc3393bb41fee56432fb09451cd2b1 Mon Sep 17 00:00:00 2001
From: sufubao
Date: Thu, 7 May 2026 20:43:26 +0800
Subject: [PATCH 1/2] feat(detoken): propagate FINISHED_ERROR on init failure

Add FinishStatus.FINISHED_ERROR (status 3, rendered as "error") and wrap
the per-request body of detokenization's _add_new_group_req_index in
try/except. On init failure (shm link, decode-mode fix, token-healing
init), mark the req with FINISHED_ERROR so the http loop forwards a
terminal status instead of hanging until the client disconnects.
---
 lightllm/server/core/objs/req.py          |  7 ++--
 lightllm/server/detokenization/manager.py | 44 ++++++++++++++---------
 2 files changed, 32 insertions(+), 19 deletions(-)

diff --git a/lightllm/server/core/objs/req.py b/lightllm/server/core/objs/req.py
index 7f2b697091..a17b93eeab 100644
--- a/lightllm/server/core/objs/req.py
+++ b/lightllm/server/core/objs/req.py
@@ -25,19 +25,20 @@ class FinishStatus(ctypes.Structure):
     NO_FINISH = 0
     FINISHED_STOP = 1
     FINISHED_LENGTH = 2
+    FINISHED_ERROR = 3
 
     def __init__(self, init_state=NO_FINISH):
         self.status = init_state
 
     def set_status(self, new_status):
-        assert 0 <= new_status <= 2
+        assert 0 <= new_status <= 3
         self.status = new_status
 
     def get_status(self):
         return self.status
 
     def is_finished(self):
-        return self.FINISHED_STOP <= self.status <= self.FINISHED_LENGTH
+        return self.FINISHED_STOP <= self.status <= self.FINISHED_ERROR
 
     def is_stopped(self):
         return self.status == self.FINISHED_STOP
@@ -50,6 +51,8 @@ def get_finish_reason(self):
             return "stop"
         elif self.status == self.FINISHED_LENGTH:
             return "length"
+        elif self.status == self.FINISHED_ERROR:
+            return "error"
         return None
 
 
diff --git a/lightllm/server/detokenization/manager.py b/lightllm/server/detokenization/manager.py
index 389171ba8a..253c74ee63 100644
--- a/lightllm/server/detokenization/manager.py
+++ b/lightllm/server/detokenization/manager.py
@@ -47,24 +47,30 @@ def _init_get_token_id_to_token_str(self):
         return
 
     def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
+        from lightllm.server.core.objs import FinishStatus
+
        for req_index in recv_obj.shm_req_indexes:
             req = self.shm_req_manager.get_req_obj_by_index(req_index)
-            req.link_prompt_ids_shm_array()
-            req.link_logprobs_shm_array()
-
-            logger.info(
-                f"detokenization recv req id {req.request_id} " f"cost time {time.time() - recv_obj.time_mark} s"
-            )
-
-            # In p/d disaggregation mode, decoding on the decode node needs some special fixes.
-            decode_req = DecodeReq(req, self.is_pd_decode_mode)
-            if self.is_pd_decode_mode:
-                decode_req = decode_mode_fix(decode_req, self.tokenizer, self.eos_id)
-            # Special initialization for token_healing mode.
-            if self.args.token_healing_mode:
-                decode_req.init_token_healing_prefix_str(self.token_id_to_token, self.tokenizer)
-
-            self.req_id_to_out[req.request_id] = decode_req
+            try:
+                req.link_prompt_ids_shm_array()
+                req.link_logprobs_shm_array()
+
+                logger.info(
+                    f"detokenization recv req id {req.request_id} " f"cost time {time.time() - recv_obj.time_mark} s"
+                )
+
+                # In p/d disaggregation mode, decoding on the decode node needs some special fixes.
+                decode_req = DecodeReq(req, self.is_pd_decode_mode)
+                if self.is_pd_decode_mode:
+                    decode_req = decode_mode_fix(decode_req, self.tokenizer, self.eos_id)
+                # Special initialization for token_healing mode.
+                if self.args.token_healing_mode:
+                    decode_req.init_token_healing_prefix_str(self.token_id_to_token, self.tokenizer)
+
+                self.req_id_to_out[req.request_id] = decode_req
+            except Exception as e:
+                req.finish_status.set_status(FinishStatus.FINISHED_ERROR)
+                raise e
         return
 
     def handle_loop(self):
@@ -76,7 +82,11 @@ def handle_loop(self):
             for _ in range(recv_max_count):
                 recv_obj: GroupReqIndexes = self.zmq_recv_socket.recv_pyobj(zmq.NOBLOCK)
                 assert isinstance(recv_obj, GroupReqIndexes)
-                self._add_new_group_req_index(recv_obj=recv_obj)
+                try:
+                    self._add_new_group_req_index(recv_obj=recv_obj)
+                except Exception:
+                    logger.exception("add new group req index has exception")
+                self.pub_to_httpserver.send_pyobj(None, protocol=pickle.HIGHEST_PROTOCOL)
 
                 # When many requests are queued, raise the number received per iteration.
                 recv_max_count = min(int(recv_max_count * 1.3), 256)
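For a quick sanity check of the state machine patch 1 extends, here is a minimal
plain-Python stand-in for the patched FinishStatus (the real class is a
ctypes.Structure in lightllm/server/core/objs/req.py; the stand-in below is
illustrative only):

# Minimal stand-in mirroring the patched FinishStatus semantics; illustration
# only, not the real ctypes-backed class.
class FinishStatusSketch:
    NO_FINISH = 0
    FINISHED_STOP = 1
    FINISHED_LENGTH = 2
    FINISHED_ERROR = 3  # new terminal state added by patch 1

    def __init__(self, init_state=NO_FINISH):
        self.status = init_state

    def set_status(self, new_status):
        assert 0 <= new_status <= 3
        self.status = new_status

    def is_finished(self):
        # FINISHED_ERROR counts as terminal, so waiters stop blocking on the req.
        return self.FINISHED_STOP <= self.status <= self.FINISHED_ERROR

    def get_finish_reason(self):
        return {1: "stop", 2: "length", 3: "error"}.get(self.status)

s = FinishStatusSketch()
s.set_status(FinishStatusSketch.FINISHED_ERROR)
assert s.is_finished() and s.get_finish_reason() == "error"

This is the invariant the http loop relies on: once is_finished() is true, it
forwards get_finish_reason() as a terminal status instead of waiting for more
tokens.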
From 59a735573fbb29aedbd70f9e6f27ac76115bfc40 Mon Sep 17 00:00:00 2001
From: sufubao
Date: Fri, 8 May 2026 16:20:16 +0800
Subject: [PATCH 2/2] fix(detoken,openai): unhang client on detoken init failure; map "error" to API error path

- detoken: on _add_new_group_req_index failure, set FINISHED_ERROR, push
  an empty-string sentinel into out_tokens_queue at finish_token_index,
  mark can_released_mark, and continue with the rest of the group
  instead of re-raising. Without this the http loop stays blocked (queue
  empty, no finish ever forwarded) and the shm req leaks until the
  client disconnects.

- openai: surface FINISHED_ERROR as a controlled error response.
  Non-stream chat / completions return HTTP 500; streaming chat /
  completions yield an SSE error event followed by [DONE] and stop.
  Previously "error" leaked into ChatCompletionResponseChoice /
  CompletionChoice, whose finish_reason literals reject it, raising a
  Pydantic ValidationError.
---
 lightllm/server/api_openai.py             | 37 ++++++++++++++++++++++
 lightllm/server/detokenization/manager.py | 29 +++++++++++++-----
 2 files changed, 60 insertions(+), 6 deletions(-)

diff --git a/lightllm/server/api_openai.py b/lightllm/server/api_openai.py
index c324df19c8..bc355e7448 100644
--- a/lightllm/server/api_openai.py
+++ b/lightllm/server/api_openai.py
@@ -75,6 +75,19 @@ async def _safe_stream_wrapper(stream_generator):
     return
 
 
+# OpenAI response schemas restrict finish_reason to {"stop","length","tool_calls"} (chat) and
+# {"stop","length"} (completions). The internal FINISHED_ERROR status maps to finish_reason "error";
+# surface it at the API boundary as a controlled error response rather than letting it leak into
+# the Pydantic models (which would raise a ValidationError) or reach the client as a silent stop.
+_INTERNAL_ERROR_MESSAGE = "Internal server error during request processing"
+_INTERNAL_ERROR_TYPE = "InternalServerError"
+
+
+def _sse_internal_error_payload() -> str:
+    error = {"error": {"message": _INTERNAL_ERROR_MESSAGE, "type": _INTERNAL_ERROR_TYPE}}
+    return json.dumps(error, ensure_ascii=False)
+
+
 def _serialize_sse_chunk(chunk, choice_nulls=(), response_nulls=()):
     """Serialize a streaming chunk, explicitly including specified null fields."""
     d = chunk.model_dump(exclude_none=True)
@@ -355,6 +368,9 @@ async def chat_completions_impl(request: ChatCompletionRequest, raw_request: Req
             finish_reason_dict[sub_req_id] = finish_status.get_finish_reason()
             prompt_tokens_dict[sub_req_id] = metadata["prompt_tokens"]
             prompt_cache_len_dict[sub_req_id] = metadata.get("prompt_cache_len", 0)
+    if any(r == "error" for r in finish_reason_dict.values()):
+        logger.error(f"internal pipeline error during chat completion group_id={group_request_id}")
+        return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
     choices = []
     sub_ids = list(final_output_dict.keys())[: request.n]
     for i in range(request.n):
@@ -473,6 +489,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
                     delta = request_output
                 current_finish_reason = finish_status.get_finish_reason()
 
+                if current_finish_reason == "error":
+                    logger.error(
+                        f"internal pipeline error during chat stream group_id={group_request_id} "
+                        f"sub_req_id={sub_req_id}"
+                    )
+                    yield f"data: {_sse_internal_error_payload()}\n\n".encode("utf-8")
+                    yield "data: [DONE]\n\n".encode("utf-8")
+                    return
+
                 # Emit the initial role-only chunk once per choice, as required by the
                 # OpenAI SSE spec: role appears only in the first delta with content="".
                 if not has_emitted_first_chunk[choice_index]:
@@ -882,6 +907,9 @@ async def process_single_prompt(prompt: Union[str, List[int]], prompt_index: int
 
     tasks = [asyncio.create_task(process_single_prompt(prompt, i)) for i, prompt in enumerate(prompts)]
     results = await asyncio.gather(*tasks)
+    if any(r.get("finish_reason") == "error" for r in results):
+        logger.error("internal pipeline error during completion")
+        return create_error_response(HTTPStatus.INTERNAL_SERVER_ERROR, _INTERNAL_ERROR_MESSAGE)
 
     return _build_completion_response(results, request, created_time, len(prompts) > 1)
 
@@ -916,6 +944,15 @@ async def stream_results() -> AsyncGenerator[bytes, None]:
                 if finish_status.is_finished():
                     current_finish_reason = finish_status.get_finish_reason()
 
+                    if current_finish_reason == "error":
+                        logger.error(
+                            f"internal pipeline error during completion stream group_id={group_request_id} "
+                            f"sub_req_id={sub_req_id}"
+                        )
+                        yield f"data: {_sse_internal_error_payload()}\n\n"
+                        yield "data: [DONE]\n\n"
+                        return
+
                 output_text = request_output
                 if request.echo and metadata.get("is_first_token", False):
                     prompt_str = prompt
diff --git a/lightllm/server/detokenization/manager.py b/lightllm/server/detokenization/manager.py
index 253c74ee63..0341f1a3d5 100644
--- a/lightllm/server/detokenization/manager.py
+++ b/lightllm/server/detokenization/manager.py
@@ -46,9 +46,10 @@ def _init_get_token_id_to_token_str(self):
         self.token_id_to_token = {token_id: token for token, token_id in self.tokenizer.get_vocab().items()}
         return
 
-    def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
+    def _add_new_group_req_index(self, recv_obj: GroupReqIndexes) -> int:
         from lightllm.server.core.objs import FinishStatus
 
+        failed_count = 0
         for req_index in recv_obj.shm_req_indexes:
             req = self.shm_req_manager.get_req_obj_by_index(req_index)
             try:
@@ -68,10 +69,23 @@ def _add_new_group_req_index(self, recv_obj: GroupReqIndexes):
                 decode_req.init_token_healing_prefix_str(self.token_id_to_token, self.tokenizer)
 
                 self.req_id_to_out[req.request_id] = decode_req
-            except Exception as e:
+            except Exception:
+                # Init failed (shm link, tokenizer, decode-mode fix, …). Mark the req
+                # finished with an error and push a sentinel into out_tokens_queue so the
+                # http loop forwards a terminal status; otherwise the queue stays empty,
+                # the client hangs until disconnect, and the shm slot leaks because
+                # can_released_mark never gets set. Continue with the rest of the group.
+                logger.exception(f"detokenization init failed for req_id {req.request_id}")
                 req.finish_status.set_status(FinishStatus.FINISHED_ERROR)
-                raise e
-        return
+                req.finish_token_index = req.input_len
+                try:
+                    if not req.out_tokens_queue.is_full():
+                        req.out_tokens_queue.push("", req.input_len, False, 1)
+                except Exception:
+                    logger.exception(f"failed to push error sentinel for req_id {req.request_id}")
+                req.can_released_mark = True
+                failed_count += 1
+        return failed_count
 
     def handle_loop(self):
         try:
@@ -83,9 +97,12 @@ def handle_loop(self):
                 recv_obj: GroupReqIndexes = self.zmq_recv_socket.recv_pyobj(zmq.NOBLOCK)
                 assert isinstance(recv_obj, GroupReqIndexes)
                 try:
-                    self._add_new_group_req_index(recv_obj=recv_obj)
+                    failed_count = self._add_new_group_req_index(recv_obj=recv_obj)
                 except Exception:
                     logger.exception("add new group req index has exception")
-                self.pub_to_httpserver.send_pyobj(None, protocol=pickle.HIGHEST_PROTOCOL)
+                    failed_count = len(recv_obj.shm_req_indexes)
+                if failed_count:
+                    # Wake the http loop so it drains the error sentinel(s) we just pushed.
+                    self.pub_to_httpserver.send_pyobj(None, protocol=pickle.HIGHEST_PROTOCOL)
 
                 # When many requests are queued, raise the number received per iteration.
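For reference, this is the wire-level sequence a streaming client sees when
FINISHED_ERROR fires after patch 2. The two event payloads are taken from
_sse_internal_error_payload() and the [DONE] marker above; the parsing loop
itself is only an illustrative sketch:

import json

# The two SSE events the patched stream emits on FINISHED_ERROR: an error
# object built by _sse_internal_error_payload(), then the usual terminator.
events = [
    'data: {"error": {"message": "Internal server error during request processing", '
    '"type": "InternalServerError"}}',
    "data: [DONE]",
]

for line in events:
    body = line[len("data: "):]
    if body == "[DONE]":
        break  # stream is over either way
    chunk = json.loads(body)
    if "error" in chunk:
        # Unlike finish_reason="stop"/"length", this chunk carries no choices;
        # clients should treat it as a terminal failure, not a normal completion.
        print("server error:", chunk["error"]["type"], "-", chunk["error"]["message"])

Non-streaming callers never see this payload; they get a plain HTTP 500 via
create_error_response with the same message.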