diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py
index 2dd4dbc0..561fb14d 100644
--- a/eval_protocol/pytest/evaluation_test.py
+++ b/eval_protocol/pytest/evaluation_test.py
@@ -30,6 +30,7 @@
     Status,
 )
 from eval_protocol.pytest.default_dataset_adapter import default_dataset_adapter
+from eval_protocol.pytest.default_mcp_gym_rollout_processor import MCPGymRolloutProcessor
 from eval_protocol.pytest.default_no_op_rollout_processor import NoOpRolloutProcessor
 from eval_protocol.pytest.rollout_processor import RolloutProcessor
 from eval_protocol.pytest.types import (
@@ -585,7 +586,9 @@ def _log_eval_error(status: Status, rows: Optional[List[EvaluationRow]] | None,
                         exception_handler_config=exception_handler_config,
                     )
 
-                    for i in range(num_runs):
+                    async def execute_run(i: int, config: RolloutProcessorConfig):
+                        nonlocal all_results
+
                         # Regenerate outputs each run by deep-copying the pristine dataset
                         # so model responses are not reused across runs.
                         run_id = generate_id()
@@ -728,6 +731,18 @@ async def _collect_result(config, lst):
                                 r.eval_metadata.status = Status.eval_finished()
                             active_logger.log(r)
 
+                    # MCPGymRolloutProcessor does not support concurrent runs, so its runs are
+                    # awaited one at a time. The coroutine is awaited directly instead of being
+                    # wrapped with asyncio.create_task() up front: create_task() schedules the
+                    # coroutine immediately, so pre-created tasks would all run concurrently
+                    # even if they were awaited in sequence.
+                    if isinstance(rollout_processor, MCPGymRolloutProcessor):
+                        for i in range(num_runs):
+                            await execute_run(i, config)
+                    else:
+                        # Every other rollout processor executes all of its runs concurrently.
+                        await asyncio.gather(*(execute_run(i, config) for i in range(num_runs)))
+
                     # for groupwise mode, the result contains eval otuput from multiple completion_params, we need to differentiate them
                     # rollout_id is used to differentiate the result from different completion_params
                     if mode == "groupwise":