[BugFix] Fix process_response_dict to support async in serving_completion #5758
Conversation
Thanks for your contribution!
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@ Coverage Diff @@
##           develop   #5758   +/- ##
==========================================
  Coverage         ?   65.51%
==========================================
  Files            ?      337
  Lines            ?    43082
  Branches         ?     6639
==========================================
  Hits             ?    28225
  Misses           ?    12754
  Partials         ?     2103
```
Pull request overview
This PR fixes a bug where the `process_response_dict` function in `serving_completion.py` did not support asynchronous data processors. The fix adds runtime checks to determine whether the function is a coroutine and awaits it when needed.
- Adds `inspect.iscoroutinefunction()` checks to detect async processors
- Handles both sync and async code paths for `process_response_dict` calls
- Enables compatibility with EB5's processor, which requires async support
```python
if inspect.iscoroutinefunction(self.engine_client.data_processor.process_response_dict):
    await self.engine_client.data_processor.process_response_dict(
        res, stream=True, include_stop_str_in_output=request.include_stop_str_in_output
    )
else:
    self.engine_client.data_processor.process_response_dict(
        res, stream=True, include_stop_str_in_output=request.include_stop_str_in_output
    )
```
Copilot AI · Dec 25, 2025
The inspect.iscoroutinefunction check is performed on every iteration inside the loop, which can impact performance. Consider caching this check result in the __init__ method of the class as an instance variable (e.g., self._is_process_response_dict_async), and then use the cached value here. This would eliminate the repeated introspection overhead during request processing.
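The suggested caching could look like this minimal sketch; the class and attribute names here are illustrative stand-ins, not the actual FastDeploy code:

```python
import asyncio
import inspect


class CompletionServer:
    """Illustrative stand-in for the serving class; not the real FastDeploy API."""

    def __init__(self, data_processor):
        self.data_processor = data_processor
        # Introspect once at construction time instead of on every
        # iteration of the streaming loop.
        self._is_process_response_dict_async = inspect.iscoroutinefunction(
            data_processor.process_response_dict
        )

    async def process(self, res, **kwargs):
        # Use the cached flag; no per-call introspection overhead.
        if self._is_process_response_dict_async:
            return await self.data_processor.process_response_dict(res, **kwargs)
        return self.data_processor.process_response_dict(res, **kwargs)


class SyncProcessor:
    def process_response_dict(self, res, **kwargs):
        res["processed"] = "sync"
        return res


class AsyncProcessor:
    async def process_response_dict(self, res, **kwargs):
        res["processed"] = "async"
        return res


sync_out = asyncio.run(CompletionServer(SyncProcessor()).process({}))
async_out = asyncio.run(CompletionServer(AsyncProcessor()).process({}))
```

The trade-off is that a cached flag assumes the processor attribute never changes after construction, which holds for the server's lifetime here.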
```python
if inspect.iscoroutinefunction(self.engine_client.data_processor.process_response_dict):
    await self.engine_client.data_processor.process_response_dict(
        data, stream=False, include_stop_str_in_output=request.include_stop_str_in_output
    )
else:
    self.engine_client.data_processor.process_response_dict(
        data, stream=False, include_stop_str_in_output=request.include_stop_str_in_output
    )
```
Copilot AI · Dec 25, 2025
This code block is duplicated across two locations in the file (lines 318-325 and lines 513-520). Consider extracting this logic into a helper method (e.g., async def _call_process_response_dict) to improve maintainability and ensure consistency if the logic needs to change in the future.
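A sketch of the suggested extraction, assuming the reviewer-proposed name `_call_process_response_dict` (the merged PR may differ; the surrounding class is a hypothetical slice):

```python
import asyncio
import inspect


class ServingCompletionSketch:
    """Hypothetical slice of the serving class showing the shared helper."""

    def __init__(self, data_processor):
        self.data_processor = data_processor

    async def _call_process_response_dict(self, data, *, stream, include_stop_str_in_output):
        # Single place that handles both sync and async processors,
        # replacing the two duplicated if/else blocks in the file.
        fn = self.data_processor.process_response_dict
        if inspect.iscoroutinefunction(fn):
            return await fn(
                data, stream=stream, include_stop_str_in_output=include_stop_str_in_output
            )
        return fn(data, stream=stream, include_stop_str_in_output=include_stop_str_in_output)


class StubProcessor:
    async def process_response_dict(self, data, *, stream, include_stop_str_in_output):
        data["stream"] = stream
        return data


server = ServingCompletionSketch(StubProcessor())
result = asyncio.run(
    server._call_process_response_dict({}, stream=False, include_stop_str_in_output=False)
)
```

Both call sites would then reduce to a single `await self._call_process_response_dict(...)`.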
```python
if inspect.iscoroutinefunction(self.engine_client.data_processor.process_response_dict):
    await self.engine_client.data_processor.process_response_dict(
        data, stream=False, include_stop_str_in_output=request.include_stop_str_in_output
    )
else:
    self.engine_client.data_processor.process_response_dict(
        data, stream=False, include_stop_str_in_output=request.include_stop_str_in_output
    )
```
Copilot AI · Dec 25, 2025
The new async path for process_response_dict is not covered by tests. Since this file has comprehensive test coverage in tests/entrypoints/openai/test_serving_completion.py, consider adding test cases that verify the async code path works correctly. This would involve mocking process_response_dict as an async function using AsyncMock and verifying it gets called with await.
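One way such a test could look, using `AsyncMock` behind a real `async def` wrapper (the wrapper avoids Python-version differences in how `inspect.iscoroutinefunction` treats `AsyncMock` instances; names here are illustrative, not the actual test file's):

```python
import asyncio
import inspect
from unittest.mock import AsyncMock

# Record calls with AsyncMock, but expose a real `async def` so that
# inspect.iscoroutinefunction reliably reports it as a coroutine function.
mock = AsyncMock(return_value={"outputs": "done"})


async def process_response_dict(data, **kwargs):
    return await mock(data, **kwargs)


async def call_processor(data):
    # Mirrors the PR's runtime check: await only if the processor is async.
    if inspect.iscoroutinefunction(process_response_dict):
        return await process_response_dict(data, stream=False)
    return process_response_dict(data, stream=False)


result = asyncio.run(call_processor({"text": "hi"}))
# Verify the async path was actually taken and awaited.
mock.assert_awaited_once_with({"text": "hi"}, stream=False)
```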
```python
if inspect.iscoroutinefunction(self.engine_client.data_processor.process_response_dict):
    await self.engine_client.data_processor.process_response_dict(
        data, stream=False, include_stop_str_in_output=request.include_stop_str_in_output
    )
else:
    self.engine_client.data_processor.process_response_dict(
        data, stream=False, include_stop_str_in_output=request.include_stop_str_in_output
    )
```
Copilot AI · Dec 25, 2025
The inspect.iscoroutinefunction check is performed on every iteration inside the loop, which can impact performance. Consider caching this check result in the __init__ method of the class as an instance variable (e.g., self._is_process_response_dict_async), and then use the cached value here. This would eliminate the repeated introspection overhead during request processing.
LiqinruiG left a comment:
LGTM
…tion (PaddlePaddle#5758)
* support process_response_dict async initial commit
* fixbug
* add unit test
* optimize
Motivation
The `process_response_dict` function in `serving_completion.py` does not support asynchronous calls, which causes issues when using EB5's processor.

Modifications
Add a check to determine whether the `process_response_dict` function is an asynchronous call, enabling support for asynchronous invocations.

Usage or Command
No change in user command.

Accuracy Tests
No need.
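The detection described under Modifications boils down to `inspect.iscoroutinefunction`, which can be demonstrated in isolation:

```python
import inspect


def sync_process(res):
    return res


async def async_process(res):
    return res


# Only `async def` callables are reported as coroutine functions;
# this is the check that decides whether the caller must `await`.
is_sync_async = inspect.iscoroutinefunction(sync_process)    # False
is_async_async = inspect.iscoroutinefunction(async_process)  # True
```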
Checklist
- Add at least one tag from [[FDConfig],[APIServer],[Engine],[Scheduler],[PD Disaggregation],[Executor],[Graph Optimization],[Speculative Decoding],[RL],[Models],[Quantization],[Loader],[OP],[KVCache],[DataProcessor],[BugFix],[Docs],[CI],[Optimization],[Feature],[Benchmark],[Others],[XPU],[HPU],[GCU],[DCU],[Iluvatar],[Metax]] to the PR title.
- Run `pre-commit` before commit.
- For PRs targeting the `release` branch, make sure the PR has been submitted to the `develop` branch first, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.