diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 2e1254d6..3a1bac70 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -9,7 +9,6 @@ from collections.abc import Sequence import pytest -from tqdm import tqdm from eval_protocol.dataset_logger import default_logger from eval_protocol.dataset_logger.dataset_logger import DatasetLogger @@ -58,6 +57,8 @@ parse_ep_num_runs, parse_ep_passed_threshold, rollout_processor_with_retry, + run_tasks_with_eval_progress, + run_tasks_with_run_progress, ) from eval_protocol.utils.show_results_url import show_results_url @@ -379,7 +380,9 @@ async def _execute_groupwise_eval_with_semaphore( pointwise_tasks.append( asyncio.create_task(_execute_pointwise_eval_with_semaphore(row=row)) ) - results = await asyncio.gather(*pointwise_tasks) + + # Run evaluation tasks with progress bar + results = await run_tasks_with_eval_progress(pointwise_tasks, run_idx) all_results[run_idx] = results elif mode == "groupwise": @@ -497,27 +500,7 @@ async def _collect_result(config, lst): # pyright: ignore[reportUnknownParamete else: # For other processors, create all tasks at once and run in parallel # Concurrency is now controlled by the shared semaphore in each rollout processor - with tqdm( - total=num_runs, - desc="Runs (Parallel)", - unit="run", - file=sys.__stderr__, - position=0, - leave=True, - dynamic_ncols=True, - miniters=1, - bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]", - ) as run_pbar: - - async def execute_run_with_progress(run_idx: int, config): - result = await execute_run(run_idx, config) - run_pbar.update(1) - return result - - tasks = [] - for run_idx in range(num_runs): - tasks.append(asyncio.create_task(execute_run_with_progress(run_idx, config))) - await asyncio.gather(*tasks) # pyright: ignore[reportUnknownArgumentType] + await run_tasks_with_run_progress(execute_run, num_runs, config) experiment_duration_seconds = time.perf_counter() - experiment_start_time diff --git a/eval_protocol/pytest/utils.py b/eval_protocol/pytest/utils.py index 0b0c6bc7..ce15cd19 100644 --- a/eval_protocol/pytest/utils.py +++ b/eval_protocol/pytest/utils.py @@ -33,6 +33,89 @@ AggregationMethod = Literal["mean", "max", "min", "bootstrap"] +async def run_tasks_with_eval_progress(pointwise_tasks: list, run_idx: int): + """ + Run evaluation tasks with a progress bar and proper cancellation handling. + + Args: + pointwise_tasks: List of asyncio tasks to execute + run_idx: Run index for progress bar positioning and naming + + Returns: + Results from all tasks + """ + eval_position = run_idx + 2 # Position after rollout progress bar + with tqdm( + total=len(pointwise_tasks), + desc=f" Eval {run_idx + 1}", + unit="eval", + file=sys.__stderr__, + leave=False, + position=eval_position, + dynamic_ncols=True, + miniters=1, + mininterval=0.1, + bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]", + ) as eval_pbar: + + async def task_with_progress(task): + try: + result = await task + return result + finally: + eval_pbar.update(1) + + wrapped_tasks = [task_with_progress(task) for task in pointwise_tasks] + try: + results = await asyncio.gather(*wrapped_tasks) + return results + except Exception: + # Propagate cancellation to the real tasks and await them to quiesce + for task in pointwise_tasks: + task.cancel() + await asyncio.gather(*pointwise_tasks, return_exceptions=True) + raise + + +async def run_tasks_with_run_progress(execute_run_func, num_runs, config): + """ + Run tasks with a parallel runs progress bar, preserving original logic. + + Args: + execute_run_func: The execute_run function to call + num_runs: Number of runs to execute + config: Configuration to pass to execute_run_func + """ + with tqdm( + total=num_runs, + desc="Runs (Parallel)", + unit="run", + file=sys.__stderr__, + position=0, + leave=True, + dynamic_ncols=True, + miniters=1, + bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]", + ) as run_pbar: + + async def execute_run_with_progress(run_idx: int, config): + result = await execute_run_func(run_idx, config) + run_pbar.update(1) + return result + + tasks = [] + for run_idx in range(num_runs): + tasks.append(asyncio.create_task(execute_run_with_progress(run_idx, config))) + try: + await asyncio.gather(*tasks) + except Exception: + # Propagate cancellation to tasks and await them to quiesce + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + raise + + def calculate_bootstrap_scores(all_scores: list[float]) -> float: """ Calculate bootstrap confidence intervals for individual scores. @@ -277,7 +360,7 @@ async def execute_row_with_backoff_and_log(task: asyncio.Task, row: EvaluationRo position = run_idx + 1 # Position 0 is reserved for main run bar, so shift up by 1 with tqdm( total=len(retry_tasks), - desc=f" Run {position}", + desc=f" Run {run_idx + 1}", unit="rollout", file=sys.__stderr__, leave=False, diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index e9621658..a5225857 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -47,11 +47,14 @@ async def aha_judge( model_a_answer = str(row.ground_truth) model_b_answer = serialize_message(row.messages[-1]) - client = AsyncOpenAI(api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url")) - - # Run two judgment rounds in sequence (A vs B, then B vs A) - result1 = await run_single_judgment(question_text, model_a_answer, model_b_answer, row.tools, judge_config, client) - result2 = await run_single_judgment(question_text, model_b_answer, model_a_answer, row.tools, judge_config, client) + async with AsyncOpenAI(api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url")) as client: + # Run two judgment rounds in sequence (A vs B, then B vs A) + result1 = await run_single_judgment( + question_text, model_a_answer, model_b_answer, row.tools, judge_config, client + ) + result2 = await run_single_judgment( + question_text, model_b_answer, model_a_answer, row.tools, judge_config, client + ) if not result1 or not result2 or not result1.get("score") or not result2.get("score"): # If either judgment failed, mark as invalid (don't include in distribution) diff --git a/eval_protocol/quickstart/llm_judge_braintrust.py b/eval_protocol/quickstart/llm_judge_braintrust.py index 65bde54c..91bce9cf 100644 --- a/eval_protocol/quickstart/llm_judge_braintrust.py +++ b/eval_protocol/quickstart/llm_judge_braintrust.py @@ -6,6 +6,10 @@ import pytest +# Skip entire module in CI to prevent import-time side effects +if os.environ.get("CI") == "true": + pytest.skip("Skip quickstart in CI", allow_module_level=True) + from eval_protocol import ( evaluation_test, aha_judge, @@ -13,25 +17,28 @@ EvaluationRow, SingleTurnRolloutProcessor, create_braintrust_adapter, + DefaultParameterIdGenerator, ) + # adapter = create_braintrust_adapter() +# input_rows = [ +# adapter.get_evaluation_rows( +# btql_query=f""" +# select: * +# from: project_logs('{os.getenv("BRAINTRUST_PROJECT_ID")}') traces +# filter: is_root = true +# limit: 10 +# """ +# ) +# ] +input_rows = [] +# uncomment when dataloader is fixed @pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") -@pytest.mark.asyncio -@evaluation_test( - input_rows=[ - # adapter.get_evaluation_rows( - # btql_query=f""" - # select: * - # from: project_logs('{os.getenv("BRAINTRUST_PROJECT_ID")}') traces - # filter: is_root = true - # limit: 10 - # """ - # ) - [] - ], - completion_params=[ +@pytest.mark.parametrize( + "completion_params", + [ {"model": "gpt-4.1"}, { "max_tokens": 131000, @@ -44,10 +51,12 @@ "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, ], +) +@evaluation_test( + input_rows=[input_rows], rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=multi_turn_assistant_to_ground_truth, - max_concurrent_rollouts=64, - aggregation_method="bootstrap", + max_concurrent_evaluations=2, ) async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: return await aha_judge(row) diff --git a/eval_protocol/quickstart/llm_judge_langfuse.py b/eval_protocol/quickstart/llm_judge_langfuse.py index bfd19b0d..a8e92c05 100644 --- a/eval_protocol/quickstart/llm_judge_langfuse.py +++ b/eval_protocol/quickstart/llm_judge_langfuse.py @@ -14,25 +14,25 @@ EvaluationRow, SingleTurnRolloutProcessor, create_langfuse_adapter, + DefaultParameterIdGenerator, ) from eval_protocol.quickstart import aha_judge adapter = create_langfuse_adapter() +input_rows = adapter.get_evaluation_rows( + to_timestamp=datetime(2025, 9, 12, 0, 11, 18), + limit=711, + sample_size=50, + sleep_between_gets=3.0, + max_retries=5, +) @pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") -@evaluation_test( - input_rows=[ - adapter.get_evaluation_rows( - to_timestamp=datetime(2025, 9, 12, 0, 11, 18), - limit=711, - sample_size=50, - sleep_between_gets=3.0, - max_retries=5, - ) - ], - completion_params=[ +@pytest.mark.parametrize( + "completion_params", + [ {"model": "gpt-4.1"}, { "max_tokens": 131000, @@ -45,10 +45,12 @@ "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, ], +) +@evaluation_test( + input_rows=[input_rows], rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=multi_turn_assistant_to_ground_truth, - max_concurrent_rollouts=2, - aggregation_method="bootstrap", + max_concurrent_evaluations=2, ) async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: return await aha_judge(row) diff --git a/eval_protocol/quickstart/llm_judge_langsmith.py b/eval_protocol/quickstart/llm_judge_langsmith.py index b8850ee0..f62fdb28 100644 --- a/eval_protocol/quickstart/llm_judge_langsmith.py +++ b/eval_protocol/quickstart/llm_judge_langsmith.py @@ -31,6 +31,7 @@ EvaluationRow, SingleTurnRolloutProcessor, LangSmithAdapter, + DefaultParameterIdGenerator, ) @@ -53,11 +54,13 @@ def fetch_langsmith_traces_as_evaluation_rows( return [] +input_rows = fetch_langsmith_traces_as_evaluation_rows() + + @pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") -@pytest.mark.asyncio -@evaluation_test( - input_rows=[fetch_langsmith_traces_as_evaluation_rows()], - completion_params=[ +@pytest.mark.parametrize( + "completion_params", + [ { "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", }, @@ -67,9 +70,12 @@ def fetch_langsmith_traces_as_evaluation_rows( "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, ], +) +@evaluation_test( + input_rows=[input_rows], rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=multi_turn_assistant_to_ground_truth, - aggregation_method="bootstrap", + max_concurrent_evaluations=2, ) async def test_llm_judge_langsmith(row: EvaluationRow) -> EvaluationRow: """LLM Judge evaluation over LangSmith-sourced rows, persisted locally by Eval Protocol. diff --git a/eval_protocol/quickstart/llm_judge_openai_responses.py b/eval_protocol/quickstart/llm_judge_openai_responses.py index e096c759..a30feee0 100644 --- a/eval_protocol/quickstart/llm_judge_openai_responses.py +++ b/eval_protocol/quickstart/llm_judge_openai_responses.py @@ -51,13 +51,12 @@ "model": "fireworks_ai/accounts/fireworks/models/kimi-k2-instruct-0905", }, ], - ids=DefaultParameterIdGenerator.generate_id_from_dict, ) @evaluation_test( input_rows=[input_rows], rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=multi_turn_assistant_to_ground_truth, - aggregation_method="bootstrap", + max_concurrent_evaluations=2, ) async def test_llm_judge_openai_responses(row: EvaluationRow) -> EvaluationRow: return await aha_judge(row)