From 370d4da60c83360fa341f91de46786945e6b2f78 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Wed, 20 Aug 2025 01:02:14 -0700 Subject: [PATCH 1/6] preserve metadata and evaluator id etc to the wrapped eval func --- eval_protocol/pytest/evaluation_test.py | 72 ++++++++++++++++++------- tests/pytest/test_get_metadata.py | 46 ++++++++++++++++ 2 files changed, 98 insertions(+), 20 deletions(-) create mode 100644 tests/pytest/test_get_metadata.py diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 45ad02ac..d4fd0eec 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -11,7 +11,8 @@ from dataclasses import replace from typing import Any, Callable, Dict, List, Literal, Optional, Union from collections import defaultdict - +import hashlib +import ast from mcp.types import Completion import pytest @@ -244,6 +245,7 @@ def evaluation_test( # noqa: C901 max_dataset_rows: Optional[int] = None, mcp_config_path: Optional[str] = None, max_concurrent_rollouts: int = 8, + max_concurrent_evaluations: int = 64, server_script_path: Optional[str] = None, steps: int = 30, mode: EvaluationTestMode = "pointwise", @@ -308,6 +310,7 @@ def evaluation_test( # noqa: C901 max_dataset_rows: Limit dataset to the first N rows. mcp_config_path: Path to MCP config file that follows MCPMultiClientConfiguration schema max_concurrent_rollouts: Maximum number of concurrent rollouts to run in parallel. + max_concurrent_evaluations: Maximum number of concurrent evaluations to run in parallel. server_script_path: Path to the MCP server script to run (default: "examples/tau2_mcp/server.py"). steps: Number of rollout steps to execute (default: 30). mode: Evaluation mode. "pointwise" (default) applies test function to each row (rollout result). @@ -581,19 +584,17 @@ def _log_eval_error( # log the fresh_dataset for row in fresh_dataset: active_logger.log(row) - - if mode == "pointwise": - # Pointwise mode, rollouts will return as they complete so we can pipeline evaluation_test execution - semaphore = asyncio.Semaphore(max_concurrent_rollouts) - tasks = [] - - async def _execute_with_semaphore(row): - async with semaphore: - # NOTE: we will still evaluate errored rows (give users control over this) - # i.e., they can choose to give EvaluateResult.score = 0 for errored rows in their test_func + + # prepare parallel eval helper function + semaphore = asyncio.Semaphore(max_concurrent_evaluations) + async def _execute_eval_with_semaphore(**kwargs): + async with semaphore: + # NOTE: we will still evaluate errored rows (give users control over this) + # i.e., they can choose to give EvaluateResult.score = 0 for errored rows in their test_func + if "row" in kwargs: result = await execute_with_params( test_func, - processed_row=row, + processed_row=kwargs["rows"], evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {}, ) if result is None or not isinstance(result, EvaluationRow): @@ -601,10 +602,24 @@ async def _execute_with_semaphore(row): f"Test function {test_func.__name__} did not return an EvaluationRow instance. You must return an EvaluationRow instance from your test function decorated with @evaluation_test." ) return result + if "rows" in kwargs: + results = await execute_with_params( + test_func, + processed_dataset=kwargs["rows"], + evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {}, + ) + if results is None or not isinstance(results, list): + raise ValueError( + f"Test function {test_func.__name__} did not return a list of EvaluationRow instances. You must return a list of EvaluationRow instances from your test function decorated with @evaluation_test." + ) + return results + if mode == "pointwise": + # Pointwise mode, rollouts will return as they complete so we can pipeline evaluation_test execution + tasks = [] # Use wrapper that handles retry logic internally async for row in rollout_processor_with_retry(rollout_processor, fresh_dataset, config): - tasks.append(asyncio.create_task(_execute_with_semaphore(row))) + tasks.append(asyncio.create_task(_execute_eval_with_semaphore(row=row))) results = await asyncio.gather(*tasks) @@ -645,14 +660,13 @@ async def _collect_result(config, lst): for result in rollout_results: for row in result: row_groups[row.input_metadata.row_id].append(row) - results = [] + tasks = [] for row_id, rows in row_groups.items(): - result = await execute_with_params( - test_func, - processed_dataset=rows, - evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {}, - ) - results.extend(result) + tasks.append(asyncio.create_task(_execute_eval_with_semaphore(rows=rows))) + results = [] + for task in tasks: + res = await task + results.extend(res) all_results[i] = results else: # Batch mode: collect all results first, then evaluate (no pipelining) @@ -788,6 +802,24 @@ async def dual_mode_wrapper(*args, **kwargs): # If not a direct call, use the pytest wrapper return await pytest_wrapper(*args, **kwargs) + + dual_mode_wrapper._origin_func = test_func + dual_mode_wrapper._evaluator_id = test_func.__name__ + # Generate (stable) evaluator ID from function source code hash + try: + func_source = inspect.getsource(test_func) + parsed = ast.parse(func_source) + normalized_source = ast.unparse(parsed) + clean_source = ''.join(normalized_source.split()) + test_func.__name__ + func_hash = hashlib.sha256(clean_source.encode('utf-8')).hexdigest()[:12] + dual_mode_wrapper._version = f"{test_func.__name__}_{func_hash}" + except (OSError, TypeError, SyntaxError): + pass + dual_mode_wrapper._metainfo = { + "mode": mode, + "max_rollout_concurrency": max_concurrent_rollouts, + "max_evaluation_concurrency": max_concurrent_evaluations, + } # Copy all attributes from the pytest wrapper to our dual mode wrapper import functools diff --git a/tests/pytest/test_get_metadata.py b/tests/pytest/test_get_metadata.py new file mode 100644 index 00000000..23ace3b5 --- /dev/null +++ b/tests/pytest/test_get_metadata.py @@ -0,0 +1,46 @@ +import asyncio +from typing import Dict, List + +from eval_protocol.pytest import evaluation_test +from eval_protocol.models import EvaluationRow, Message + +@evaluation_test( + input_messages=[ + [ + Message(role="user", content="What is the capital of France?"), + ], + [ + Message(role="user", content="What is the capital of the moon?"), + ], + ], + completion_params=[{"model": "accounts/fireworks/models/kimi-k2-instruct"}] * 2, + mode="groupwise", + max_concurrent_rollouts=5, + max_concurrent_evaluations=10, +) +def test_pytest_async(rows: List[EvaluationRow]) -> List[EvaluationRow]: + """Run math evaluation on sample dataset using pytest interface.""" + return rows + + + +def test_pytest_func_metainfo(): + assert hasattr(test_pytest_async, "_origin_func") + origin_func = test_pytest_async._origin_func + assert not asyncio.iscoroutinefunction(origin_func) + assert asyncio.iscoroutinefunction(test_pytest_async) + assert test_pytest_async._metainfo["mode"] == "groupwise" + assert test_pytest_async._metainfo["max_rollout_concurrency"] == 5 + assert test_pytest_async._metainfo["max_evaluation_concurrency"] == 10 + + # Test evaluator ID generation + assert hasattr(test_pytest_async, "_evaluator_id") + evaluator_id = test_pytest_async._evaluator_id + assert evaluator_id.startswith("eval_") + assert len(evaluator_id) == 17 # "eval_" + 12 character hash + print(f"Generated evaluator ID: {evaluator_id}") + + + + + \ No newline at end of file From 8c1c254475dd9f5fbde2c7ebf0288b76189a62da Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Wed, 20 Aug 2025 01:03:50 -0700 Subject: [PATCH 2/6] format --- eval_protocol/pytest/evaluation_test.py | 17 +++++++++-------- tests/pytest/test_get_metadata.py | 11 +++-------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index d4fd0eec..04b11707 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -584,9 +584,10 @@ def _log_eval_error( # log the fresh_dataset for row in fresh_dataset: active_logger.log(row) - + # prepare parallel eval helper function semaphore = asyncio.Semaphore(max_concurrent_evaluations) + async def _execute_eval_with_semaphore(**kwargs): async with semaphore: # NOTE: we will still evaluate errored rows (give users control over this) @@ -802,23 +803,23 @@ async def dual_mode_wrapper(*args, **kwargs): # If not a direct call, use the pytest wrapper return await pytest_wrapper(*args, **kwargs) - - dual_mode_wrapper._origin_func = test_func + + dual_mode_wrapper._origin_func = test_func dual_mode_wrapper._evaluator_id = test_func.__name__ # Generate (stable) evaluator ID from function source code hash try: func_source = inspect.getsource(test_func) parsed = ast.parse(func_source) normalized_source = ast.unparse(parsed) - clean_source = ''.join(normalized_source.split()) + test_func.__name__ - func_hash = hashlib.sha256(clean_source.encode('utf-8')).hexdigest()[:12] + clean_source = "".join(normalized_source.split()) + test_func.__name__ + func_hash = hashlib.sha256(clean_source.encode("utf-8")).hexdigest()[:12] dual_mode_wrapper._version = f"{test_func.__name__}_{func_hash}" except (OSError, TypeError, SyntaxError): pass dual_mode_wrapper._metainfo = { - "mode": mode, - "max_rollout_concurrency": max_concurrent_rollouts, - "max_evaluation_concurrency": max_concurrent_evaluations, + "mode": mode, + "max_rollout_concurrency": max_concurrent_rollouts, + "max_evaluation_concurrency": max_concurrent_evaluations, } # Copy all attributes from the pytest wrapper to our dual mode wrapper diff --git a/tests/pytest/test_get_metadata.py b/tests/pytest/test_get_metadata.py index 23ace3b5..947067d5 100644 --- a/tests/pytest/test_get_metadata.py +++ b/tests/pytest/test_get_metadata.py @@ -4,6 +4,7 @@ from eval_protocol.pytest import evaluation_test from eval_protocol.models import EvaluationRow, Message + @evaluation_test( input_messages=[ [ @@ -23,24 +24,18 @@ def test_pytest_async(rows: List[EvaluationRow]) -> List[EvaluationRow]: return rows - def test_pytest_func_metainfo(): - assert hasattr(test_pytest_async, "_origin_func") + assert hasattr(test_pytest_async, "_origin_func") origin_func = test_pytest_async._origin_func assert not asyncio.iscoroutinefunction(origin_func) assert asyncio.iscoroutinefunction(test_pytest_async) assert test_pytest_async._metainfo["mode"] == "groupwise" assert test_pytest_async._metainfo["max_rollout_concurrency"] == 5 assert test_pytest_async._metainfo["max_evaluation_concurrency"] == 10 - + # Test evaluator ID generation assert hasattr(test_pytest_async, "_evaluator_id") evaluator_id = test_pytest_async._evaluator_id assert evaluator_id.startswith("eval_") assert len(evaluator_id) == 17 # "eval_" + 12 character hash print(f"Generated evaluator ID: {evaluator_id}") - - - - - \ No newline at end of file From c632efe9d4f3c00df338ef600b5bd3f04dc4551f Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Wed, 20 Aug 2025 01:49:20 -0700 Subject: [PATCH 3/6] fix ut --- eval_protocol/pytest/evaluation_test.py | 5 ++--- tests/pytest/test_get_metadata.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 04b11707..334d019a 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -595,7 +595,7 @@ async def _execute_eval_with_semaphore(**kwargs): if "row" in kwargs: result = await execute_with_params( test_func, - processed_row=kwargs["rows"], + processed_row=kwargs["row"], evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {}, ) if result is None or not isinstance(result, EvaluationRow): @@ -805,7 +805,6 @@ async def dual_mode_wrapper(*args, **kwargs): return await pytest_wrapper(*args, **kwargs) dual_mode_wrapper._origin_func = test_func - dual_mode_wrapper._evaluator_id = test_func.__name__ # Generate (stable) evaluator ID from function source code hash try: func_source = inspect.getsource(test_func) @@ -813,7 +812,7 @@ async def dual_mode_wrapper(*args, **kwargs): normalized_source = ast.unparse(parsed) clean_source = "".join(normalized_source.split()) + test_func.__name__ func_hash = hashlib.sha256(clean_source.encode("utf-8")).hexdigest()[:12] - dual_mode_wrapper._version = f"{test_func.__name__}_{func_hash}" + dual_mode_wrapper._evaluator_id= f"{test_func.__name__}_{func_hash}" except (OSError, TypeError, SyntaxError): pass dual_mode_wrapper._metainfo = { diff --git a/tests/pytest/test_get_metadata.py b/tests/pytest/test_get_metadata.py index 947067d5..5807c032 100644 --- a/tests/pytest/test_get_metadata.py +++ b/tests/pytest/test_get_metadata.py @@ -36,6 +36,5 @@ def test_pytest_func_metainfo(): # Test evaluator ID generation assert hasattr(test_pytest_async, "_evaluator_id") evaluator_id = test_pytest_async._evaluator_id - assert evaluator_id.startswith("eval_") - assert len(evaluator_id) == 17 # "eval_" + 12 character hash + assert evaluator_id.startswith("test_pytest_async") print(f"Generated evaluator ID: {evaluator_id}") From f56ad9121fd943b16b09fb21bb97cf2fe01233a9 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Wed, 20 Aug 2025 01:58:14 -0700 Subject: [PATCH 4/6] fix format --- eval_protocol/pytest/evaluation_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 334d019a..8cd58128 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -812,7 +812,7 @@ async def dual_mode_wrapper(*args, **kwargs): normalized_source = ast.unparse(parsed) clean_source = "".join(normalized_source.split()) + test_func.__name__ func_hash = hashlib.sha256(clean_source.encode("utf-8")).hexdigest()[:12] - dual_mode_wrapper._evaluator_id= f"{test_func.__name__}_{func_hash}" + dual_mode_wrapper._evaluator_id = f"{test_func.__name__}_{func_hash}" except (OSError, TypeError, SyntaxError): pass dual_mode_wrapper._metainfo = { From 468eecf3ea203456587e1d427dbf399856c67f19 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Wed, 20 Aug 2025 14:29:36 -0700 Subject: [PATCH 5/6] remove id gen logic --- eval_protocol/pytest/evaluation_test.py | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 8cd58128..b6e10ab7 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -588,14 +588,14 @@ def _log_eval_error( # prepare parallel eval helper function semaphore = asyncio.Semaphore(max_concurrent_evaluations) - async def _execute_eval_with_semaphore(**kwargs): + async def _execute_eval_with_semaphore(**inner_kwargs): async with semaphore: # NOTE: we will still evaluate errored rows (give users control over this) # i.e., they can choose to give EvaluateResult.score = 0 for errored rows in their test_func - if "row" in kwargs: + if "row" in inner_kwargs: result = await execute_with_params( test_func, - processed_row=kwargs["row"], + processed_row=inner_kwargs["row"], evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {}, ) if result is None or not isinstance(result, EvaluationRow): @@ -603,10 +603,10 @@ async def _execute_eval_with_semaphore(**kwargs): f"Test function {test_func.__name__} did not return an EvaluationRow instance. You must return an EvaluationRow instance from your test function decorated with @evaluation_test." ) return result - if "rows" in kwargs: + if "rows" in inner_kwargs: results = await execute_with_params( test_func, - processed_dataset=kwargs["rows"], + processed_dataset=inner_kwargs["rows"], evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {}, ) if results is None or not isinstance(results, list): @@ -805,16 +805,6 @@ async def dual_mode_wrapper(*args, **kwargs): return await pytest_wrapper(*args, **kwargs) dual_mode_wrapper._origin_func = test_func - # Generate (stable) evaluator ID from function source code hash - try: - func_source = inspect.getsource(test_func) - parsed = ast.parse(func_source) - normalized_source = ast.unparse(parsed) - clean_source = "".join(normalized_source.split()) + test_func.__name__ - func_hash = hashlib.sha256(clean_source.encode("utf-8")).hexdigest()[:12] - dual_mode_wrapper._evaluator_id = f"{test_func.__name__}_{func_hash}" - except (OSError, TypeError, SyntaxError): - pass dual_mode_wrapper._metainfo = { "mode": mode, "max_rollout_concurrency": max_concurrent_rollouts, From 92a400e82984da5340cb6d000d904284a2b90401 Mon Sep 17 00:00:00 2001 From: Yinghan Ma Date: Wed, 20 Aug 2025 14:29:58 -0700 Subject: [PATCH 6/6] fix ut --- tests/pytest/test_get_metadata.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/pytest/test_get_metadata.py b/tests/pytest/test_get_metadata.py index 5807c032..3917fb3b 100644 --- a/tests/pytest/test_get_metadata.py +++ b/tests/pytest/test_get_metadata.py @@ -32,9 +32,3 @@ def test_pytest_func_metainfo(): assert test_pytest_async._metainfo["mode"] == "groupwise" assert test_pytest_async._metainfo["max_rollout_concurrency"] == 5 assert test_pytest_async._metainfo["max_evaluation_concurrency"] == 10 - - # Test evaluator ID generation - assert hasattr(test_pytest_async, "_evaluator_id") - evaluator_id = test_pytest_async._evaluator_id - assert evaluator_id.startswith("test_pytest_async") - print(f"Generated evaluator ID: {evaluator_id}")