|
30 | 30 | Status, |
31 | 31 | ) |
32 | 32 | from eval_protocol.pytest.default_dataset_adapter import default_dataset_adapter |
| 33 | +from eval_protocol.pytest.default_mcp_gym_rollout_processor import MCPGymRolloutProcessor |
33 | 34 | from eval_protocol.pytest.default_no_op_rollout_processor import NoOpRolloutProcessor |
34 | 35 | from eval_protocol.pytest.rollout_processor import RolloutProcessor |
35 | 36 | from eval_protocol.pytest.types import ( |
@@ -585,7 +586,9 @@ def _log_eval_error(status: Status, rows: Optional[List[EvaluationRow]] | None, |
585 | 586 | exception_handler_config=exception_handler_config, |
586 | 587 | ) |
587 | 588 |
|
588 | | - for i in range(num_runs): |
| 589 | + async def execute_run(i: int, config: RolloutProcessorConfig): |
| 590 | + nonlocal all_results |
| 591 | + |
589 | 592 | # Regenerate outputs each run by deep-copying the pristine dataset |
590 | 593 | # so model responses are not reused across runs. |
591 | 594 | run_id = generate_id() |
@@ -728,6 +731,18 @@ async def _collect_result(config, lst): |
728 | 731 | r.eval_metadata.status = Status.eval_finished() |
729 | 732 | active_logger.log(r) |
730 | 733 |
|
| 734 | + tasks = [] |
| 735 | + for i in range(num_runs): |
| 736 | + tasks.append(asyncio.create_task(execute_run(i, config))) |
| 737 | + |
| 738 | + # if rollout_processor is MCPGymRolloutProcessor, we execute runs sequentially since MCPGym does not support concurrent runs
| 739 | + # else, we execute runs in parallel
| 740 | + if isinstance(rollout_processor, MCPGymRolloutProcessor): |
| 741 | + for task in tasks: |
| 742 | + await task |
| 743 | + else: |
| 744 | + await asyncio.gather(*tasks) |
| 745 | + |
731 | 746 | # for groupwise mode, the result contains eval output from multiple completion_params; we need to differentiate them
732 | 747 | # rollout_id is used to differentiate the result from different completion_params |
733 | 748 | if mode == "groupwise": |
|
0 commit comments