From 75473cbbc22d4a2cc3b6bdd1fe0772a0a8c86847 Mon Sep 17 00:00:00 2001 From: Ananth Subramaniam Date: Thu, 28 May 2026 00:00:15 -0700 Subject: [PATCH] perf(responses_api_agents): bypass HTTP self-call from /run to /v1/responses Each agent's `/run` handler previously made an HTTP self-call to its own `/v1/responses` endpoint, paying for aiohttp round-trip, FastAPI middleware, and pydantic re-validation on every rollout. This converts all production agents that still did this to call the implementation in-process. The refactor extracts the existing `responses()` body into a private `_responses(body, cookies) -> (response, set_cookies)` helper. The public `responses(request, response, body)` method keeps its FastAPI signature and is now a 4-line adapter that delegates to `_responses`, so external HTTP callers see no change. `/run` replaces the 7-line self-call block with two lines that call `_responses` directly. Agents converted: simple_agent, proof_refinement_agent, non_executing_simple_agent, speed_bench_agent, cvdp_agent, finance_agent, browsecomp_agent, hermes_agent, claude_code_agent, and the langgraph_agent base + 4 subclasses (rewoo, orchestrator, parallel_thinking, reflection). swe_agents, aviary_agent, verifiers_agent, and stirrup_agent were already on this pattern. Microbenchmark shows ~0.7-1.2 ms framework overhead per self-call at concurrency=1, scaling to a ~70-150x rps multiplier in the LLM-overhead-isolated case. End-to-end with a mock model: simple_agent +25% rps / 20% wall-time reduction; proof_refinement_agent (3 self-calls per rollout) up to +2x rps / 50% wall-time reduction at concurrency=256. With a real OpenAI model in the loop the absolute per-rollout savings carry over (~7% rps on simple, ~13% on pref) and the HTTP path also hit FD exhaustion at concurrency=1024 where the in-process path did not. Signed-off-by: Ananth Subramaniam --- responses_api_agents/browsecomp_agent/app.py | 35 +++++++++------- responses_api_agents/claude_code_agent/app.py | 18 ++++----- responses_api_agents/cvdp_agent/app.py | 36 ++++++++++------- responses_api_agents/finance_agent/app.py | 38 ++++++++++-------- responses_api_agents/hermes_agent/app.py | 18 ++++----- responses_api_agents/langgraph_agent/app.py | 30 +++++++++----- .../langgraph_agent/orchestrator_agent.py | 10 ++--- .../parallel_thinking_agent.py | 10 ++--- .../langgraph_agent/reflection_agent.py | 10 ++--- .../langgraph_agent/rewoo_agent.py | 12 ++---- .../non_executing_simple_agent/app.py | 34 ++++++++++------ .../tests/test_app.py | 9 +++-- .../proof_refinement_agent/app.py | 40 ++++++++++--------- responses_api_agents/simple_agent/app.py | 34 ++++++++++------ responses_api_agents/speed_bench_agent/app.py | 32 +++++++++------ 15 files changed, 209 insertions(+), 157 deletions(-) diff --git a/responses_api_agents/browsecomp_agent/app.py b/responses_api_agents/browsecomp_agent/app.py index b9a32efa5..658f94fc5 100644 --- a/responses_api_agents/browsecomp_agent/app.py +++ b/responses_api_agents/browsecomp_agent/app.py @@ -20,7 +20,7 @@ import traceback from datetime import datetime from pathlib import Path -from typing import List, Optional +from typing import List, Mapping, Optional, Tuple from fastapi import Request, Response from pydantic import ConfigDict, ValidationError @@ -126,6 +126,17 @@ async def responses( response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" body = body.model_copy(deep=True) if isinstance(body.input, str): @@ -142,7 +153,9 @@ async def responses( step = 0 num_tool_calls = 0 model_server_cookies = None # update the cookies on every model response - resources_server_cookies = request.cookies # update the cookies on every resources server response + resources_server_cookies = ( + dict(cookies) if cookies else {} + ) # update the cookies on every resources server response reset_threshold = 0 reset_count = 0 @@ -636,8 +649,9 @@ def save_ckpt() -> None: pass # best-effort; a stale ckpt won't be re-read because the driver caches this sample # Propogate any extra cookies necessary for downstream verification + set_cookies: dict[str, str] = {} for k, v in (*resources_server_cookies.items(), *model_server_cookies.items()): - response.set_cookie(k, v) + set_cookies[k] = v print( f"{user_query[:20]}... | FINISHED | Step {step} | Time: {time.monotonic() - time_taken:.2f}s (model {time_taken_model_call:.2f}s, tool {time_taken_tool_call:.2f}s) | Max output tokens: {max_output_tokens} | Missing end thinks: {missing_end_think_count}" @@ -648,7 +662,7 @@ def save_ckpt() -> None: model_response.reset_count = reset_count model_response.num_tool_calls = num_tool_calls model_response.metadata = {"missing_end_think_count": str(missing_end_think_count)} - return model_response + return model_response, set_cookies async def run(self, request: Request, body: BrowsecompAgentRunRequest) -> BrowsecompAgentVerifyResponse: cookies = request.cookies @@ -668,16 +682,9 @@ async def run(self, request: Request, body: BrowsecompAgentRunRequest) -> Browse body.responses_create_params.metadata["task_index"] = str(body._ng_task_index) body.responses_create_params.metadata["attempt"] = str(0) - response = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(response) - cookies = response.cookies - - response_json = await get_response_json(response) + inproc_response, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies + response_json = inproc_response.model_dump() verify_request = BrowsecompAgentVerifyRequest.model_validate(body.model_dump() | {"response": response_json}) diff --git a/responses_api_agents/claude_code_agent/app.py b/responses_api_agents/claude_code_agent/app.py index c80bb14b4..ac4aa5e9e 100644 --- a/responses_api_agents/claude_code_agent/app.py +++ b/responses_api_agents/claude_code_agent/app.py @@ -345,6 +345,13 @@ async def responses( request: Request, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + return await self._responses(body) + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + ) -> NeMoGymResponse: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" body = body.model_copy(deep=True) if isinstance(body.input, str): body.input = [NeMoGymEasyInputMessage(role="user", content=body.input)] @@ -405,15 +412,8 @@ async def run(self, request: Request, body: ClaudeCodeAgentRunRequest) -> Claude await raise_for_status(seed_resp) cookies = seed_resp.cookies - agent_resp = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(agent_resp) - cookies = agent_resp.cookies - agent_resp_json = await get_response_json(agent_resp) + inproc_resp = await self._responses(body.responses_create_params) + agent_resp_json = inproc_resp.model_dump() verify_resp = await self.server_client.post( server_name=self.config.resources_server.name, diff --git a/responses_api_agents/cvdp_agent/app.py b/responses_api_agents/cvdp_agent/app.py index b9e491612..c4251a0d9 100644 --- a/responses_api_agents/cvdp_agent/app.py +++ b/responses_api_agents/cvdp_agent/app.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json -from typing import List +from typing import List, Mapping, Optional, Tuple from fastapi import Request, Response from pydantic import ConfigDict, ValidationError @@ -68,6 +68,17 @@ async def responses( response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" body = body.model_copy(deep=True) if isinstance(body.input, str): @@ -77,7 +88,9 @@ async def responses( usage = None step = 0 model_server_cookies = None # update the cookies on every model response - resources_server_cookies = request.cookies # update the cookies on every resources server response + resources_server_cookies = ( + dict(cookies) if cookies else {} + ) # update the cookies on every resources server response while True: step += 1 @@ -148,12 +161,13 @@ async def responses( break # Propogate any extra cookies necessary for downstream verification + set_cookies: dict[str, str] = {} for k, v in (*resources_server_cookies.items(), *model_server_cookies.items()): - response.set_cookie(k, v) + set_cookies[k] = v model_response.output = new_outputs model_response.usage = usage - return model_response + return model_response, set_cookies async def run(self, request: Request, body: SimpleAgentRunRequest) -> SimpleAgentVerifyResponse: cookies = request.cookies @@ -179,24 +193,18 @@ async def run(self, request: Request, body: SimpleAgentRunRequest) -> SimpleAgen retries_left = self.config.llm_parse_retries while True: try: - response = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(response) - cookies = response.cookies + inproc_response, set_cookies = await self._responses(body.responses_create_params, cookies) + attempt_cookies = set_cookies verify_request = SimpleAgentVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(response)} + body.model_dump() | {"response": inproc_response.model_dump()} ) verify_response = await self.server_client.post( server_name=self.config.resources_server.name, url_path="/verify", json=verify_request.model_dump(), - cookies=cookies, + cookies=attempt_cookies, ) await raise_for_status(verify_response) result = SimpleAgentVerifyResponse.model_validate(await get_response_json(verify_response)) diff --git a/responses_api_agents/finance_agent/app.py b/responses_api_agents/finance_agent/app.py index 30e53fcef..2f6f53c2b 100644 --- a/responses_api_agents/finance_agent/app.py +++ b/responses_api_agents/finance_agent/app.py @@ -16,7 +16,7 @@ import json import logging import re -from typing import Any, List, Optional +from typing import Any, List, Mapping, Optional, Tuple from fastapi import Request, Response from pydantic import ConfigDict, Field @@ -161,6 +161,17 @@ async def responses( response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" body = body.model_copy(deep=True) if isinstance(body.input, str): @@ -171,7 +182,7 @@ async def responses( step = 0 last_model_response: Optional[NeMoGymResponse] = None model_server_cookies = None - resources_server_cookies = request.cookies + resources_server_cookies = dict(cookies) if cookies else {} done_tools_set = set(self.config.done_tools) max_steps = self.config.max_steps @@ -316,15 +327,16 @@ async def responses( tool_choice="auto", ) - cookie_items = list(resources_server_cookies.items()) + set_cookies: dict[str, str] = {} + for k, v in resources_server_cookies.items(): + set_cookies[k] = v if model_server_cookies: - cookie_items.extend(model_server_cookies.items()) - for k, v in cookie_items: - response.set_cookie(k, v) + for k, v in model_server_cookies.items(): + set_cookies[k] = v last_model_response.output = new_outputs last_model_response.usage = usage - return last_model_response + return last_model_response, set_cookies async def run(self, request: Request, body: FinanceAgentRunRequest) -> FinanceAgentVerifyResponse: try: @@ -359,17 +371,11 @@ async def _run_inner(self, request: Request, body: FinanceAgentRunRequest) -> Fi await raise_for_status(seed_session_response) cookies = seed_session_response.cookies - response = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(response) - cookies = response.cookies + inproc_response, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies verify_request = FinanceAgentVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(response)} + body.model_dump() | {"response": inproc_response.model_dump()} ) verify_response = await self.server_client.post( diff --git a/responses_api_agents/hermes_agent/app.py b/responses_api_agents/hermes_agent/app.py index 57a550996..d96f4bc61 100644 --- a/responses_api_agents/hermes_agent/app.py +++ b/responses_api_agents/hermes_agent/app.py @@ -197,6 +197,13 @@ async def responses( request: Request, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + return await self._responses(body) + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + ) -> NeMoGymResponse: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" from run_agent import AIAgent # from hermes-agent on path body = body.model_copy(deep=True) @@ -315,15 +322,8 @@ async def run(self, request: Request, body: HermesAgentRunRequest) -> HermesAgen await raise_for_status(seed_resp) cookies = seed_resp.cookies - agent_resp = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(agent_resp) - cookies = agent_resp.cookies - agent_resp_json = await get_response_json(agent_resp) + inproc_resp = await self._responses(body.responses_create_params) + agent_resp_json = inproc_resp.model_dump() verify_resp = await self.server_client.post( server_name=self.config.resources_server.name, diff --git a/responses_api_agents/langgraph_agent/app.py b/responses_api_agents/langgraph_agent/app.py index 2541545fc..7efaedc93 100644 --- a/responses_api_agents/langgraph_agent/app.py +++ b/responses_api_agents/langgraph_agent/app.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from abc import abstractmethod -from typing import Any +from typing import Any, Mapping, Optional, Tuple from fastapi import Body, Request, Response from pydantic import ConfigDict @@ -58,17 +58,29 @@ def extract_model_response(self, final_state: dict) -> NeMoGymResponse: async def responses( self, request: Request, response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body() ) -> NeMoGymResponse: - initial_state = await self.get_initial_state(body, request.cookies) + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; subclass `run` methods invoke this in-process.""" + initial_state = await self.get_initial_state(body, dict(cookies) if cookies else {}) final_state = await self.graph.ainvoke(initial_state) + set_cookies: dict[str, str] = {} if "cookies" in final_state: for k, v in final_state["cookies"].items(): - response.set_cookie(k, v) + set_cookies[k] = v model_response = self.extract_model_response(final_state) outputs = self.extract_outputs(final_state) model_response.output = outputs - return model_response + return model_response, set_cookies async def run(self, request: Request, body: BaseRunRequest) -> BaseVerifyResponse: cookies = request.cookies @@ -82,18 +94,16 @@ async def run(self, request: Request, body: BaseRunRequest) -> BaseVerifyRespons await raise_for_status(seed) cookies = seed.cookies - resp = await self.server_client.post( - server_name=self.config.name, url_path="/v1/responses", json=body.responses_create_params, cookies=cookies - ) - await raise_for_status(resp) + inproc_resp, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies - verify_request_dict = body.model_dump() | {"response": await get_response_json(resp)} + verify_request_dict = body.model_dump() | {"response": inproc_resp.model_dump()} verify = await self.server_client.post( server_name=self.config.resources_server.name, url_path="/verify", json=verify_request_dict, - cookies=resp.cookies, + cookies=cookies, ) await raise_for_status(verify) return BaseVerifyResponse.model_validate(await get_response_json(verify)) diff --git a/responses_api_agents/langgraph_agent/orchestrator_agent.py b/responses_api_agents/langgraph_agent/orchestrator_agent.py index 69fb28494..15b0cc835 100644 --- a/responses_api_agents/langgraph_agent/orchestrator_agent.py +++ b/responses_api_agents/langgraph_agent/orchestrator_agent.py @@ -234,20 +234,18 @@ async def run(self, request: Request, body: OrchestratorRunRequest) -> Orchestra await raise_for_status(seed) cookies = seed.cookies - resp = await self.server_client.post( - server_name=self.config.name, url_path="/v1/responses", json=body.responses_create_params, cookies=cookies - ) - await raise_for_status(resp) + inproc_resp, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies verify_request = OrchestratorVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(resp)} + body.model_dump() | {"response": inproc_resp.model_dump()} ) verify = await self.server_client.post( server_name=self.config.resources_server.name, url_path="/verify", json=verify_request.model_dump(), - cookies=resp.cookies, + cookies=cookies, ) await raise_for_status(verify) return OrchestratorVerifyResponse.model_validate(await get_response_json(verify)) diff --git a/responses_api_agents/langgraph_agent/parallel_thinking_agent.py b/responses_api_agents/langgraph_agent/parallel_thinking_agent.py index cdad807e4..0ce3a1d48 100644 --- a/responses_api_agents/langgraph_agent/parallel_thinking_agent.py +++ b/responses_api_agents/langgraph_agent/parallel_thinking_agent.py @@ -210,20 +210,18 @@ async def run(self, request: Request, body: ParallelThinkingRunRequest) -> Paral await raise_for_status(seed) cookies = seed.cookies - resp = await self.server_client.post( - server_name=self.config.name, url_path="/v1/responses", json=body.responses_create_params, cookies=cookies - ) - await raise_for_status(resp) + inproc_resp, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies verify_request = ParallelThinkingVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(resp)} + body.model_dump() | {"response": inproc_resp.model_dump()} ) verify = await self.server_client.post( server_name=self.config.resources_server.name, url_path="/verify", json=verify_request.model_dump(), - cookies=resp.cookies, + cookies=cookies, ) await raise_for_status(verify) return ParallelThinkingVerifyResponse.model_validate(await get_response_json(verify)) diff --git a/responses_api_agents/langgraph_agent/reflection_agent.py b/responses_api_agents/langgraph_agent/reflection_agent.py index 0c8a237db..1355d2772 100644 --- a/responses_api_agents/langgraph_agent/reflection_agent.py +++ b/responses_api_agents/langgraph_agent/reflection_agent.py @@ -211,20 +211,18 @@ async def run(self, request: Request, body: ReflectionAgentRunRequest) -> Reflec await raise_for_status(seed) cookies = seed.cookies - resp = await self.server_client.post( - server_name=self.config.name, url_path="/v1/responses", json=body.responses_create_params, cookies=cookies - ) - await raise_for_status(resp) + inproc_resp, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies verify_request = ReflectionAgentVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(resp)} + body.model_dump() | {"response": inproc_resp.model_dump()} ) verify = await self.server_client.post( server_name=self.config.resources_server.name, url_path="/verify", json=verify_request.model_dump(), - cookies=resp.cookies, + cookies=cookies, ) await raise_for_status(verify) return ReflectionAgentVerifyResponse.model_validate(await get_response_json(verify)) diff --git a/responses_api_agents/langgraph_agent/rewoo_agent.py b/responses_api_agents/langgraph_agent/rewoo_agent.py index 975b12e9e..550e8541b 100644 --- a/responses_api_agents/langgraph_agent/rewoo_agent.py +++ b/responses_api_agents/langgraph_agent/rewoo_agent.py @@ -256,20 +256,16 @@ async def run(self, request: Request, body: ReWOORunRequest) -> ReWOOVerifyRespo await raise_for_status(seed) cookies = seed.cookies - resp = await self.server_client.post( - server_name=self.config.name, url_path="/v1/responses", json=body.responses_create_params, cookies=cookies - ) - await raise_for_status(resp) + inproc_resp, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies - verify_request = ReWOOVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(resp)} - ) + verify_request = ReWOOVerifyRequest.model_validate(body.model_dump() | {"response": inproc_resp.model_dump()}) verify = await self.server_client.post( server_name=self.config.resources_server.name, url_path="/verify", json=verify_request.model_dump(), - cookies=resp.cookies, + cookies=cookies, ) await raise_for_status(verify) return ReWOOVerifyResponse.model_validate(await get_response_json(verify)) diff --git a/responses_api_agents/non_executing_simple_agent/app.py b/responses_api_agents/non_executing_simple_agent/app.py index 367c05ca5..e6ba72d48 100644 --- a/responses_api_agents/non_executing_simple_agent/app.py +++ b/responses_api_agents/non_executing_simple_agent/app.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json +from typing import Mapping, Optional, Tuple from fastapi import Request, Response from pydantic import ConfigDict, ValidationError @@ -64,6 +65,17 @@ async def responses( response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" body = body.model_copy(deep=True) if isinstance(body.input, str): @@ -85,10 +97,14 @@ async def responses( ) from e # Preserve session cookies for /run verification, but do not inspect or execute tool calls. - for k, v in (*request.cookies.items(), *model_response.cookies.items()): - response.set_cookie(k, v) + set_cookies: dict[str, str] = {} + if cookies: + for k, v in cookies.items(): + set_cookies[k] = v + for k, v in model_response.cookies.items(): + set_cookies[k] = v - return parsed_response + return parsed_response, set_cookies async def run( self, @@ -106,17 +122,11 @@ async def run( await raise_for_status(seed_session_response) cookies = seed_session_response.cookies - response = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(response) - cookies = response.cookies + inproc_response, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies verify_request = NonExecutingSimpleAgentVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(response)} + body.model_dump() | {"response": inproc_response.model_dump()} ) verify_response = await self.server_client.post( diff --git a/responses_api_agents/non_executing_simple_agent/tests/test_app.py b/responses_api_agents/non_executing_simple_agent/tests/test_app.py index 875582000..366a67155 100644 --- a/responses_api_agents/non_executing_simple_agent/tests/test_app.py +++ b/responses_api_agents/non_executing_simple_agent/tests/test_app.py @@ -136,11 +136,14 @@ async def test_run_seeds_and_verifies_model_response_without_tool_execution(self json={"responses_create_params": responses_create_params.model_dump()}, cookies={}, ) + # `run` calls `_responses` in-process; call[1] is the model_server + # call that `_responses` makes. assert server.server_client.post.call_args_list[1] == call( - server_name="non_executing_agent", + server_name="model", url_path="/v1/responses", - json=responses_create_params, - cookies={"session": "seeded"}, + json=NeMoGymResponseCreateParamsNonStreaming( + input=[NeMoGymEasyInputMessage(content="hello", role="user", type="message")] + ), ) assert server.server_client.post.call_args_list[2].kwargs["server_name"] == "resource" assert server.server_client.post.call_args_list[2].kwargs["url_path"] == "/verify" diff --git a/responses_api_agents/proof_refinement_agent/app.py b/responses_api_agents/proof_refinement_agent/app.py index 256fdd826..3b39c4918 100644 --- a/responses_api_agents/proof_refinement_agent/app.py +++ b/responses_api_agents/proof_refinement_agent/app.py @@ -27,7 +27,7 @@ import logging from dataclasses import dataclass -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Mapping, Optional, Tuple from fastapi import Request, Response from pydantic import ConfigDict @@ -105,11 +105,17 @@ async def responses( response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: - """Generate a model response (single turn, no tool calls). + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result - This is called for each generation turn. The verify-correction loop - is handled in run(). - """ + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; `run` invokes this in-process per correction turn.""" body = body.model_copy(deep=True) if isinstance(body.input, str): @@ -119,16 +125,16 @@ async def responses( server_name=self.config.model_server.name, url_path="/v1/responses", json=body, - cookies=request.cookies, + cookies=dict(cookies) if cookies else None, ) await raise_for_status(model_response) model_response_json = await model_response.json() - # Propagate cookies + set_cookies: dict[str, str] = {} for k, v in model_response.cookies.items(): - response.set_cookie(k, v) + set_cookies[k] = v - return NeMoGymResponse.model_validate(model_response_json) + return NeMoGymResponse.model_validate(model_response_json), set_cookies async def run(self, request: Request, body: ProofRefinementRunRequest) -> ProofRefinementVerifyResponse: """Execute the proof refinement loop. @@ -165,15 +171,13 @@ async def run(self, request: Request, body: ProofRefinementRunRequest) -> ProofR LOG.info("Turn %d: Generating proof attempt", turn_index) # 2. Generate proof attempt - gen_response = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=current_input, - cookies=cookies, - ) - await raise_for_status(gen_response) - cookies = gen_response.cookies - model_response_json = await gen_response.json() + if isinstance(current_input, NeMoGymResponseCreateParamsNonStreaming): + gen_input = current_input + else: + gen_input = NeMoGymResponseCreateParamsNonStreaming.model_validate(current_input) + gen_model_response, set_cookies = await self._responses(gen_input, cookies) + cookies = set_cookies + model_response_json = gen_model_response.model_dump() # 3. Verify the proof verify_request_data = body.model_dump() diff --git a/responses_api_agents/simple_agent/app.py b/responses_api_agents/simple_agent/app.py index 5fbfab199..5bac81b72 100644 --- a/responses_api_agents/simple_agent/app.py +++ b/responses_api_agents/simple_agent/app.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import json -from typing import List +from typing import List, Mapping, Optional, Tuple from fastapi import Request, Response from pydantic import ConfigDict, ValidationError @@ -69,6 +69,17 @@ async def responses( response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" body = body.model_copy(deep=True) if isinstance(body.input, str): @@ -78,7 +89,9 @@ async def responses( usage = None step = 0 model_server_cookies = None # update the cookies on every model response - resources_server_cookies = request.cookies # update the cookies on every resources server response + resources_server_cookies = ( + dict(cookies) if cookies else {} + ) # update the cookies on every resources server response while True: step += 1 @@ -166,12 +179,13 @@ async def responses( break # Propogate any extra cookies necessary for downstream verification + set_cookies: dict[str, str] = {} for k, v in (*resources_server_cookies.items(), *model_server_cookies.items()): - response.set_cookie(k, v) + set_cookies[k] = v model_response.output = new_outputs model_response.usage = usage - return model_response + return model_response, set_cookies async def run(self, request: Request, body: SimpleAgentRunRequest) -> SimpleAgentVerifyResponse: cookies = request.cookies @@ -185,17 +199,11 @@ async def run(self, request: Request, body: SimpleAgentRunRequest) -> SimpleAgen await raise_for_status(seed_session_response) cookies = seed_session_response.cookies - response = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(response) - cookies = response.cookies + inproc_response, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies verify_request = SimpleAgentVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(response)} + body.model_dump() | {"response": inproc_response.model_dump()} ) verify_response = await self.server_client.post( diff --git a/responses_api_agents/speed_bench_agent/app.py b/responses_api_agents/speed_bench_agent/app.py index 80d67cd05..3082d388a 100644 --- a/responses_api_agents/speed_bench_agent/app.py +++ b/responses_api_agents/speed_bench_agent/app.py @@ -44,7 +44,7 @@ import json import logging -from typing import List +from typing import List, Mapping, Optional, Tuple from fastapi import Request, Response from pydantic import ConfigDict, ValidationError @@ -119,6 +119,17 @@ async def responses( response: Response, body: NeMoGymResponseCreateParamsNonStreaming = Body(), ) -> NeMoGymResponse: + result, set_cookies = await self._responses(body, request.cookies) + for k, v in set_cookies.items(): + response.set_cookie(k, v) + return result + + async def _responses( + self, + body: NeMoGymResponseCreateParamsNonStreaming, + cookies: Optional[Mapping[str, str]] = None, + ) -> Tuple[NeMoGymResponse, dict]: + """Implementation of `/v1/responses`; `run` invokes this in-process.""" body = body.model_copy(deep=True) if isinstance(body.input, str): @@ -157,7 +168,7 @@ async def responses( usage = None last_response: NeMoGymResponse = None model_server_cookies = None - resources_server_cookies = request.cookies + resources_server_cookies = dict(cookies) if cookies else {} async def _call_model(turn_input): new_body = body.model_copy(update={"input": turn_input}) @@ -214,12 +225,13 @@ def _gather_assistant_text(resp: NeMoGymResponse) -> str: last_response.usage = None # Propagate any cookies the resources server set so /verify sees them. + set_cookies: dict[str, str] = {} for k, v in (*resources_server_cookies.items(), *(model_server_cookies or {}).items()): - response.set_cookie(k, v) + set_cookies[k] = v last_response.output = accumulated_outputs last_response.usage = usage - return last_response + return last_response, set_cookies async def run(self, request: Request, body: SpeedBenchAgentRunRequest) -> SpeedBenchAgentVerifyResponse: cookies = request.cookies @@ -233,17 +245,11 @@ async def run(self, request: Request, body: SpeedBenchAgentRunRequest) -> SpeedB await raise_for_status(seed_session_response) cookies = seed_session_response.cookies - api_response = await self.server_client.post( - server_name=self.config.name, - url_path="/v1/responses", - json=body.responses_create_params, - cookies=cookies, - ) - await raise_for_status(api_response) - cookies = api_response.cookies + inproc_response, set_cookies = await self._responses(body.responses_create_params, cookies) + cookies = set_cookies verify_request = SpeedBenchAgentVerifyRequest.model_validate( - body.model_dump() | {"response": await get_response_json(api_response)} + body.model_dump() | {"response": inproc_response.model_dump()} ) verify_response = await self.server_client.post( server_name=self.config.resources_server.name,