From 0c0abf24b7e880b3551a3df5692eadc254ddc7b1 Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Mon, 1 Sep 2025 14:52:20 +0000 Subject: [PATCH 1/2] Update async rollout, add requestor, and improve type handling Co-authored-by: bchen --- .../notes/pytest_integration_proposal.md | 2 +- .../benchmarks/test_tau_bench_airline.py | 11 ++++++++-- .../benchmarks/test_tau_bench_retail.py | 11 ++++++++-- eval_protocol/execution/pipeline.py | 7 +++++-- eval_protocol/mcp/simulation_server.py | 14 ++++++++++++- eval_protocol/mcp_env.py | 17 ++++++++------- .../mcp_servers/tau2/tests/test_tau2_e2e.py | 10 +++++++-- .../default_langchain_rollout_processor.py | 7 +++++-- .../default_mcp_gym_rollout_processor.py | 21 ++++++++++++------- .../local_testing/test_north_star.py | 7 +++---- tests/pytest/test_tau_bench_airline.py | 7 ++++++- .../test_rollout_control_plane_integration.py | 6 ++---- 12 files changed, 85 insertions(+), 35 deletions(-) diff --git a/development/notes/pytest_integration_proposal.md b/development/notes/pytest_integration_proposal.md index 784cc215..4832a5b3 100644 --- a/development/notes/pytest_integration_proposal.md +++ b/development/notes/pytest_integration_proposal.md @@ -149,7 +149,7 @@ def frozen_lake_rollout_processor(row: EvaluationRow, model: str, input_params: """ env_url = env_urls[0] if env_urls else None # ep.rollout handles the core interaction loop with the game environment. - trajectories = ep.rollout(row, model, input_params, env_url) + trajectories = await ep.rollout(row, model, input_params, env_url) return [t.to_evaluation_row() for t in trajectories] @evaluation_test( diff --git a/eval_protocol/benchmarks/test_tau_bench_airline.py b/eval_protocol/benchmarks/test_tau_bench_airline.py index 1cd3149e..d9701e5e 100644 --- a/eval_protocol/benchmarks/test_tau_bench_airline.py +++ b/eval_protocol/benchmarks/test_tau_bench_airline.py @@ -173,6 +173,7 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow: id=tool_call.id, name=tool_call.function.name, arguments=arguments, + requestor="assistant", ) tau2_tool_calls.append(tau2_tool_call) @@ -181,7 +182,7 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow: trajectory_objects.append(UserMessage(role=role, content=text_content)) elif role == "tool": tool_id = msg.tool_call_id - trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content)) + trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content, requestor="assistant")) reward = 1.0 @@ -189,6 +190,7 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow: nl_assertions=nl_assertions, communicate_info=communicate_info, actions=actions, + env_assertions=None, reward_basis=[ # Use this to adjust how to calculate reward. Tau2-bench uses DB and COMMUNICATE by default for airline tasks. RewardType.DB, RewardType.COMMUNICATE, @@ -196,7 +198,12 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow: ) task = Task( - id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler") + id="Filler", + description=None, + user_scenario=UserScenario(instructions="Filler", persona=None), + ticket=None, + initial_state=None, + evaluation_criteria=evaluation_criteria, ) # id and user_scenario are required for the Task type but not used in calculating reward assert task.evaluation_criteria is not None, "Task evaluation criteria is None" diff --git a/eval_protocol/benchmarks/test_tau_bench_retail.py b/eval_protocol/benchmarks/test_tau_bench_retail.py index a3faa2b9..6ca8c040 100644 --- a/eval_protocol/benchmarks/test_tau_bench_retail.py +++ b/eval_protocol/benchmarks/test_tau_bench_retail.py @@ -163,6 +163,7 @@ def test_tau_bench_retail_evaluation(row: EvaluationRow) -> EvaluationRow: id=tool_call.id, name=tool_call.function.name, arguments=arguments, + requestor="assistant", ) tau2_tool_calls.append(tau2_tool_call) @@ -171,7 +172,7 @@ def test_tau_bench_retail_evaluation(row: EvaluationRow) -> EvaluationRow: trajectory_objects.append(UserMessage(role=role, content=text_content)) elif role == "tool": tool_id = msg.tool_call_id - trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content)) + trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=text_content, requestor="assistant")) reward = 1.0 @@ -179,6 +180,7 @@ def test_tau_bench_retail_evaluation(row: EvaluationRow) -> EvaluationRow: nl_assertions=nl_assertions, communicate_info=communicate_info, actions=actions, + env_assertions=None, reward_basis=[ # Use this to adjust how to calculate reward. Tau2-bench uses DB and COMMUNICATE by default for retail tasks. RewardType.DB, RewardType.COMMUNICATE, @@ -186,7 +188,12 @@ def test_tau_bench_retail_evaluation(row: EvaluationRow) -> EvaluationRow: ) task = Task( - id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler") + id="Filler", + description=None, + user_scenario=UserScenario(instructions="Filler", persona=None), + ticket=None, + initial_state=None, + evaluation_criteria=evaluation_criteria, ) # id and user_scenario are required for the Task type but not used in calculating reward assert task.evaluation_criteria is not None, "Task evaluation criteria is None" diff --git a/eval_protocol/execution/pipeline.py b/eval_protocol/execution/pipeline.py index 80e25463..bf3b498a 100644 --- a/eval_protocol/execution/pipeline.py +++ b/eval_protocol/execution/pipeline.py @@ -310,6 +310,8 @@ async def _execute_mcp_agent_rollout( f"Sample {sample_id}: Agent Rollout Turn {turn_num + 1}/{max_rollout_turns}. History size: {len(current_messages_for_rollout)}" ) + # model_client is initialized when generation is enabled; assert for type-checker + assert self.model_client is not None generation_output_turn = await self.model_client.generate( messages=current_messages_for_rollout, session=http_session, @@ -845,7 +847,7 @@ async def process_with_semaphore_wrapper(sample_idx: int, sample_data: Dict[str, for i_outer in range(0, len(tasks), batch_size_for_logging): batch_tasks = tasks[i_outer : i_outer + batch_size_for_logging] - batch_results_values = await asyncio.gather(*batch_tasks, return_exceptions=True) + batch_results_values: List[Union[Exception, Dict[str, Any], List[Dict[str, Any]]]] = await asyncio.gather(*batch_tasks, return_exceptions=True) for res_idx, res_or_exc in enumerate(batch_results_values): if isinstance(res_or_exc, Exception): logger.error( @@ -863,7 +865,8 @@ async def process_with_semaphore_wrapper(sample_idx: int, sample_data: Dict[str, if isinstance(res_or_exc, list): all_results.extend(res_or_exc) else: - all_results.append(res_or_exc) + # res_or_exc is a Dict[str, Any] here + all_results.append(res_or_exc) # type: ignore[arg-type] logger.info( f"Completed batch up to sample {i_outer + len(batch_tasks)}. Total results/errors: {len(all_results)}" ) diff --git a/eval_protocol/mcp/simulation_server.py b/eval_protocol/mcp/simulation_server.py index 1e5f9ff4..dfca1e27 100644 --- a/eval_protocol/mcp/simulation_server.py +++ b/eval_protocol/mcp/simulation_server.py @@ -328,6 +328,7 @@ async def list_resources(): description = resource_func.__doc__ or f"Resource {resource_name}" # Some callables may not have the attribute; guard for type checkers + # MyPy/Pyright: Resource expects AnyUrl; convert string to str, letting pydantic coerce it uri_value = getattr(resource_func, "_resource_uri", f"/{resource_name}") resources.append( Resource( @@ -346,7 +347,7 @@ def _register_session_handlers(self): """Register session initialization and cleanup handlers.""" @self.app.set_logging_level() - async def set_logging_level(level): + async def set_logging_level(level: str): """Handle logging level requests.""" logger.setLevel(getattr(logging, level.upper())) return {} @@ -392,6 +393,17 @@ def get_default_config(self) -> Dict[str, Any]: """Get default environment configuration.""" pass + # Optional hook: some environments need seed at creation time + def create_environment_with_seed(self, config: Dict[str, Any], *, seed: Optional[int] = None) -> Tuple[Any, Any, Dict[str, Any]]: + """Create environment with a seed when required; default falls back to create+reset. + + Subclasses can override when the environment requires the seed at construction time. + Returns a tuple of (env, initial_observation, info). + """ + env = self.create_environment(config) + obs, info = self.reset_environment(env, seed=seed) + return env, obs, info + def run(self, port: int = 8000, host: str = "127.0.0.1", **kwargs): """ Run the simulation server using StreamableHTTPSessionManager. diff --git a/eval_protocol/mcp_env.py b/eval_protocol/mcp_env.py index ca32b327..779f7503 100644 --- a/eval_protocol/mcp_env.py +++ b/eval_protocol/mcp_env.py @@ -240,7 +240,7 @@ def make( return mcp_envs -def rollout( +async def rollout( envs: GeneralMCPVectorEnv, policy: Union[FireworksPolicy, LLMBasePolicy, Callable], *, @@ -250,7 +250,7 @@ def rollout( steps: int = 512, openai_format_log_file: Optional[str] = None, max_concurrent_rollouts: int = 8, -) -> List[asyncio.Task[EvaluationRow]]: +) -> List[EvaluationRow]: """ Execute general rollouts using tool calling interface with automatic record/playback. @@ -282,10 +282,10 @@ def rollout( Example: # Live mode - tasks = ep.rollout(envs, policy) + results = await ep.rollout(envs, policy) # Create environments automatically - tasks = ep.rollout( + results = await ep.rollout( "http://localhost:8000/mcp/", policy, evaluation_rows=my_evaluation_rows, @@ -294,10 +294,10 @@ def rollout( # Recording mode os.environ["EP_PLAYBACK_FILE"] = "record.jsonl" - tasks = ep.rollout(envs, policy, openai_format_log_file="sft_data.jsonl") + results = await ep.rollout(envs, policy, openai_format_log_file="sft_data.jsonl") # Playback mode (after recording file exists) - tasks = ep.rollout(envs, policy) + results = await ep.rollout(envs, policy) """ # Automatically create environments if a base URL is provided if isinstance(envs, str): @@ -313,7 +313,10 @@ def rollout( tasks = execution_manager.execute_rollouts( envs, policy, steps, openai_format_log_file, max_concurrent_rollouts, evaluation_rows ) - return tasks + + # Await all tasks and return concrete EvaluationRows + results: List[EvaluationRow] = await asyncio.gather(*tasks) + return results async def test_mcp(base_url: str, seeds: List[int]) -> Dict[str, Any]: diff --git a/eval_protocol/mcp_servers/tau2/tests/test_tau2_e2e.py b/eval_protocol/mcp_servers/tau2/tests/test_tau2_e2e.py index ed7e77e7..5003ae16 100644 --- a/eval_protocol/mcp_servers/tau2/tests/test_tau2_e2e.py +++ b/eval_protocol/mcp_servers/tau2/tests/test_tau2_e2e.py @@ -739,6 +739,7 @@ def tau2_airline_eval( id=tool_call.id, name=tool_call.function.name, arguments=arguments, + requestor="assistant", ) tau2_tool_calls.append(tau2_tool_call) @@ -747,7 +748,7 @@ def tau2_airline_eval( trajectory_objects.append(UserMessage(role=role, content=content)) elif role == "tool": tool_id = msg.tool_call_id - trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=content)) + trajectory_objects.append(ToolMessage(id=tool_id, role=role, content=content, requestor="assistant")) reward = 1.0 @@ -764,7 +765,12 @@ def tau2_airline_eval( ) task = Task( - id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler") + id="Filler", + description=None, + user_scenario=UserScenario(instructions="Filler", persona=None), + ticket=None, + initial_state=None, + evaluation_criteria=evaluation_criteria, ) # id and user_scenario are required for the Task type but not used in calculating reward, filler values env_reward_info = EnvironmentEvaluator.calculate_reward( diff --git a/eval_protocol/pytest/default_langchain_rollout_processor.py b/eval_protocol/pytest/default_langchain_rollout_processor.py index 35924570..4c807633 100644 --- a/eval_protocol/pytest/default_langchain_rollout_processor.py +++ b/eval_protocol/pytest/default_langchain_rollout_processor.py @@ -68,9 +68,12 @@ async def _invoke_direct(payload): invoke_fn = _invoke_direct elif callable(target): - + # If target is a normal callable, call it directly; if it returns an awaitable, await it async def _invoke_wrapper(payload): - return await target(payload) + result = target(payload) + if asyncio.iscoroutine(result): + return await result + return result invoke_fn = _invoke_wrapper else: diff --git a/eval_protocol/pytest/default_mcp_gym_rollout_processor.py b/eval_protocol/pytest/default_mcp_gym_rollout_processor.py index f3cde601..21b1e99c 100644 --- a/eval_protocol/pytest/default_mcp_gym_rollout_processor.py +++ b/eval_protocol/pytest/default_mcp_gym_rollout_processor.py @@ -250,13 +250,20 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> ) # Get rollout tasks from ep.rollout - tasks = ep.rollout( - envs, - policy=self.policy, - evaluation_rows=rows, - steps=config.steps, - max_concurrent_rollouts=config.max_concurrent_rollouts, - ) + async def _run_rollout_and_wrap(row_index: int) -> EvaluationRow: + # ep.rollout now returns concrete results + results = await ep.rollout( + envs, + policy=self.policy, + evaluation_rows=rows, + steps=config.steps, + max_concurrent_rollouts=config.max_concurrent_rollouts, + ) + return results[row_index] + + tasks: List[asyncio.Task[EvaluationRow]] = [ + asyncio.create_task(_run_rollout_and_wrap(i)) for i in range(len(rows)) + ] return tasks def cleanup(self) -> None: diff --git a/examples/taxi_mcp_complete/local_testing/test_north_star.py b/examples/taxi_mcp_complete/local_testing/test_north_star.py index e0a3c75d..75b556af 100644 --- a/examples/taxi_mcp_complete/local_testing/test_north_star.py +++ b/examples/taxi_mcp_complete/local_testing/test_north_star.py @@ -64,10 +64,9 @@ async def test_north_star_interface(): start_time = time.time() evaluation_rows = await ep.rollout( envs, - policy=policy, - steps=25, # Taxi typically needs more steps than FrozenLake - openai_format_log_file=("clean_openai_format.jsonl" if recording_mode else None), - ) + policy, + steps=20, + ) # Keep short for testing duration = time.time() - start_time print(f"✅ Completed {len(evaluation_rows)} evaluation rows in {duration:.2f}s") diff --git a/tests/pytest/test_tau_bench_airline.py b/tests/pytest/test_tau_bench_airline.py index b9c59c7a..88eb06e9 100644 --- a/tests/pytest/test_tau_bench_airline.py +++ b/tests/pytest/test_tau_bench_airline.py @@ -142,7 +142,12 @@ def test_tau_bench_airline_evaluation(row: EvaluationRow) -> EvaluationRow: ) task = Task( - id="Filler", evaluation_criteria=evaluation_criteria, user_scenario=UserScenario(instructions="Filler") + id="Filler", + description=None, + user_scenario=UserScenario(instructions="Filler", persona=None), + ticket=None, + initial_state=None, + evaluation_criteria=evaluation_criteria, ) # id and user_scenario are required for the Task type but not used in calculating reward if RewardType.DB in task.evaluation_criteria.reward_basis: diff --git a/tests/test_rollout_control_plane_integration.py b/tests/test_rollout_control_plane_integration.py index e97769c7..db76e46d 100644 --- a/tests/test_rollout_control_plane_integration.py +++ b/tests/test_rollout_control_plane_integration.py @@ -522,16 +522,14 @@ def mock_execute_rollouts(*args, **kwargs): manager_instance.execute_rollouts = mock_execute_rollouts result = [] - tasks = ep.rollout( + tasks = await ep.rollout( "http://localhost:1234/mcp/", policy, dataset=dataset, model_id="test_model", steps=5, ) - for task in tasks: - row = await task - result.append(row) + result.extend(tasks) mock_make.assert_called_once_with( "http://localhost:1234/mcp/", From 11fb547f0051b26bd072a522cf08b614b8d0075f Mon Sep 17 00:00:00 2001 From: Benny Chen Date: Tue, 2 Sep 2025 06:34:36 +0800 Subject: [PATCH 2/2] reformat --- eval_protocol/execution/pipeline.py | 4 +++- eval_protocol/mcp/simulation_server.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/eval_protocol/execution/pipeline.py b/eval_protocol/execution/pipeline.py index bf3b498a..dcda4ca0 100644 --- a/eval_protocol/execution/pipeline.py +++ b/eval_protocol/execution/pipeline.py @@ -847,7 +847,9 @@ async def process_with_semaphore_wrapper(sample_idx: int, sample_data: Dict[str, for i_outer in range(0, len(tasks), batch_size_for_logging): batch_tasks = tasks[i_outer : i_outer + batch_size_for_logging] - batch_results_values: List[Union[Exception, Dict[str, Any], List[Dict[str, Any]]]] = await asyncio.gather(*batch_tasks, return_exceptions=True) + batch_results_values: List[ + Union[Exception, Dict[str, Any], List[Dict[str, Any]]] + ] = await asyncio.gather(*batch_tasks, return_exceptions=True) for res_idx, res_or_exc in enumerate(batch_results_values): if isinstance(res_or_exc, Exception): logger.error( diff --git a/eval_protocol/mcp/simulation_server.py b/eval_protocol/mcp/simulation_server.py index dfca1e27..205e7a27 100644 --- a/eval_protocol/mcp/simulation_server.py +++ b/eval_protocol/mcp/simulation_server.py @@ -394,7 +394,9 @@ def get_default_config(self) -> Dict[str, Any]: pass # Optional hook: some environments need seed at creation time - def create_environment_with_seed(self, config: Dict[str, Any], *, seed: Optional[int] = None) -> Tuple[Any, Any, Dict[str, Any]]: + def create_environment_with_seed( + self, config: Dict[str, Any], *, seed: Optional[int] = None + ) -> Tuple[Any, Any, Dict[str, Any]]: """Create environment with a seed when required; default falls back to create+reset. Subclasses can override when the environment requires the seed at construction time.