Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 6 additions & 23 deletions eval_protocol/pytest/evaluation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from collections.abc import Sequence

import pytest
from tqdm import tqdm

from eval_protocol.dataset_logger import default_logger
from eval_protocol.dataset_logger.dataset_logger import DatasetLogger
Expand Down Expand Up @@ -58,6 +57,8 @@
parse_ep_num_runs,
parse_ep_passed_threshold,
rollout_processor_with_retry,
run_tasks_with_eval_progress,
run_tasks_with_run_progress,
)
from eval_protocol.utils.show_results_url import show_results_url

Expand Down Expand Up @@ -379,7 +380,9 @@ async def _execute_groupwise_eval_with_semaphore(
pointwise_tasks.append(
asyncio.create_task(_execute_pointwise_eval_with_semaphore(row=row))
)
results = await asyncio.gather(*pointwise_tasks)

# Run evaluation tasks with progress bar
results = await run_tasks_with_eval_progress(pointwise_tasks, run_idx)

all_results[run_idx] = results
elif mode == "groupwise":
Expand Down Expand Up @@ -497,27 +500,7 @@ async def _collect_result(config, lst): # pyright: ignore[reportUnknownParamete
else:
# For other processors, create all tasks at once and run in parallel
# Concurrency is now controlled by the shared semaphore in each rollout processor
with tqdm(
total=num_runs,
desc="Runs (Parallel)",
unit="run",
file=sys.__stderr__,
position=0,
leave=True,
dynamic_ncols=True,
miniters=1,
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
) as run_pbar:

async def execute_run_with_progress(run_idx: int, config):
result = await execute_run(run_idx, config)
run_pbar.update(1)
return result

tasks = []
for run_idx in range(num_runs):
tasks.append(asyncio.create_task(execute_run_with_progress(run_idx, config)))
await asyncio.gather(*tasks) # pyright: ignore[reportUnknownArgumentType]
await run_tasks_with_run_progress(execute_run, num_runs, config)

experiment_duration_seconds = time.perf_counter() - experiment_start_time

Expand Down
85 changes: 84 additions & 1 deletion eval_protocol/pytest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,89 @@
AggregationMethod = Literal["mean", "max", "min", "bootstrap"]


async def run_tasks_with_eval_progress(pointwise_tasks: list, run_idx: int):
    """
    Run evaluation tasks with a progress bar and proper cancellation handling.

    Args:
        pointwise_tasks: List of asyncio tasks to execute
        run_idx: Run index for progress bar positioning and naming

    Returns:
        Results from all tasks, in the same order as ``pointwise_tasks``
    """
    eval_position = run_idx + 2  # Position after rollout progress bar
    with tqdm(
        total=len(pointwise_tasks),
        desc=f"  Eval {run_idx + 1}",
        unit="eval",
        file=sys.__stderr__,
        leave=False,
        position=eval_position,
        dynamic_ncols=True,
        miniters=1,
        mininterval=0.1,
        bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
    ) as eval_pbar:

        async def task_with_progress(task):
            # Tick the bar whether the task succeeds, fails, or is cancelled.
            try:
                return await task
            finally:
                eval_pbar.update(1)

        wrapped_tasks = [task_with_progress(task) for task in pointwise_tasks]
        try:
            return await asyncio.gather(*wrapped_tasks)
        except BaseException:
            # BaseException (not Exception) so that asyncio.CancelledError —
            # a BaseException since Python 3.8 — also triggers cleanup; otherwise
            # cancelling the caller would leave the inner tasks running.
            for task in pointwise_tasks:
                task.cancel()
            # Await the real tasks (and their wrappers) to quiesce, swallowing
            # their exceptions so none are reported as "never retrieved".
            await asyncio.gather(*pointwise_tasks, return_exceptions=True)
            raise


async def run_tasks_with_run_progress(execute_run_func, num_runs, config):
    """
    Run tasks with a parallel runs progress bar, preserving original logic.

    Args:
        execute_run_func: The execute_run function to call
        num_runs: Number of runs to execute
        config: Configuration to pass to execute_run_func
    """
    with tqdm(
        total=num_runs,
        desc="Runs (Parallel)",
        unit="run",
        file=sys.__stderr__,
        position=0,
        leave=True,
        dynamic_ncols=True,
        miniters=1,
        bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
    ) as run_pbar:

        async def execute_run_with_progress(run_idx: int, config):
            result = await execute_run_func(run_idx, config)
            run_pbar.update(1)
            return result

        tasks = [
            asyncio.create_task(execute_run_with_progress(run_idx, config))
            for run_idx in range(num_runs)
        ]
        try:
            await asyncio.gather(*tasks)
        except BaseException:
            # BaseException (not Exception) so that asyncio.CancelledError —
            # a BaseException since Python 3.8 — also propagates cancellation
            # to the per-run tasks; `except Exception` would miss it and leave
            # them running after the caller is cancelled.
            for task in tasks:
                task.cancel()
            # Await to quiesce; return_exceptions prevents re-raising here.
            await asyncio.gather(*tasks, return_exceptions=True)
            raise


def calculate_bootstrap_scores(all_scores: list[float]) -> float:
"""
Calculate bootstrap confidence intervals for individual scores.
Expand Down Expand Up @@ -277,7 +360,7 @@ async def execute_row_with_backoff_and_log(task: asyncio.Task, row: EvaluationRo
position = run_idx + 1 # Position 0 is reserved for main run bar, so shift up by 1
with tqdm(
total=len(retry_tasks),
desc=f" Run {position}",
desc=f" Run {run_idx + 1}",
unit="rollout",
file=sys.__stderr__,
leave=False,
Expand Down
13 changes: 8 additions & 5 deletions eval_protocol/quickstart/llm_judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,14 @@ async def aha_judge(
model_a_answer = str(row.ground_truth)
model_b_answer = serialize_message(row.messages[-1])

client = AsyncOpenAI(api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url"))

# Run two judgment rounds in sequence (A vs B, then B vs A)
result1 = await run_single_judgment(question_text, model_a_answer, model_b_answer, row.tools, judge_config, client)
result2 = await run_single_judgment(question_text, model_b_answer, model_a_answer, row.tools, judge_config, client)
async with AsyncOpenAI(api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url")) as client:
# Run two judgment rounds in sequence (A vs B, then B vs A)
result1 = await run_single_judgment(
question_text, model_a_answer, model_b_answer, row.tools, judge_config, client
)
result2 = await run_single_judgment(
question_text, model_b_answer, model_a_answer, row.tools, judge_config, client
)

if not result1 or not result2 or not result1.get("score") or not result2.get("score"):
# If either judgment failed, mark as invalid (don't include in distribution)
Expand Down
41 changes: 25 additions & 16 deletions eval_protocol/quickstart/llm_judge_braintrust.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,39 @@

import pytest

# Skip entire module in CI to prevent import-time side effects
if os.environ.get("CI") == "true":
pytest.skip("Skip quickstart in CI", allow_module_level=True)

from eval_protocol import (
evaluation_test,
aha_judge,
multi_turn_assistant_to_ground_truth,
EvaluationRow,
SingleTurnRolloutProcessor,
create_braintrust_adapter,
DefaultParameterIdGenerator,
)

# adapter = create_braintrust_adapter()
# input_rows = [
# adapter.get_evaluation_rows(
# btql_query=f"""
# select: *
# from: project_logs('{os.getenv("BRAINTRUST_PROJECT_ID")}') traces
# filter: is_root = true
# limit: 10
# """
# )
# ]
input_rows = []
# uncomment when dataloader is fixed


@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI")
@pytest.mark.asyncio
@evaluation_test(
input_rows=[
# adapter.get_evaluation_rows(
# btql_query=f"""
# select: *
# from: project_logs('{os.getenv("BRAINTRUST_PROJECT_ID")}') traces
# filter: is_root = true
# limit: 10
# """
# )
[]
],
completion_params=[
@pytest.mark.parametrize(
"completion_params",
[
{"model": "gpt-4.1"},
{
"max_tokens": 131000,
Expand All @@ -44,10 +51,12 @@
"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b",
},
],
)
@evaluation_test(
input_rows=[input_rows],
rollout_processor=SingleTurnRolloutProcessor(),
preprocess_fn=multi_turn_assistant_to_ground_truth,
max_concurrent_rollouts=64,
aggregation_method="bootstrap",
max_concurrent_evaluations=2,
)
async def test_llm_judge(row: EvaluationRow) -> EvaluationRow:
return await aha_judge(row)
28 changes: 15 additions & 13 deletions eval_protocol/quickstart/llm_judge_langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,25 @@
EvaluationRow,
SingleTurnRolloutProcessor,
create_langfuse_adapter,
DefaultParameterIdGenerator,
)

from eval_protocol.quickstart import aha_judge

adapter = create_langfuse_adapter()
input_rows = adapter.get_evaluation_rows(
to_timestamp=datetime(2025, 9, 12, 0, 11, 18),
limit=711,
sample_size=50,
sleep_between_gets=3.0,
max_retries=5,
)


@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI")
@evaluation_test(
input_rows=[
adapter.get_evaluation_rows(
to_timestamp=datetime(2025, 9, 12, 0, 11, 18),
limit=711,
sample_size=50,
sleep_between_gets=3.0,
max_retries=5,
)
],
completion_params=[
@pytest.mark.parametrize(
"completion_params",
[
{"model": "gpt-4.1"},
{
"max_tokens": 131000,
Expand All @@ -45,10 +45,12 @@
"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b",
},
],
)
@evaluation_test(
input_rows=[input_rows],
rollout_processor=SingleTurnRolloutProcessor(),
preprocess_fn=multi_turn_assistant_to_ground_truth,
max_concurrent_rollouts=2,
aggregation_method="bootstrap",
max_concurrent_evaluations=2,
)
async def test_llm_judge(row: EvaluationRow) -> EvaluationRow:
return await aha_judge(row)
16 changes: 11 additions & 5 deletions eval_protocol/quickstart/llm_judge_langsmith.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
EvaluationRow,
SingleTurnRolloutProcessor,
LangSmithAdapter,
DefaultParameterIdGenerator,
)


Expand All @@ -53,11 +54,13 @@ def fetch_langsmith_traces_as_evaluation_rows(
return []


input_rows = fetch_langsmith_traces_as_evaluation_rows()


@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI")
@pytest.mark.asyncio
@evaluation_test(
input_rows=[fetch_langsmith_traces_as_evaluation_rows()],
completion_params=[
@pytest.mark.parametrize(
"completion_params",
[
{
"model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507",
},
Expand All @@ -67,9 +70,12 @@ def fetch_langsmith_traces_as_evaluation_rows(
"model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b",
},
],
)
@evaluation_test(
input_rows=[input_rows],
rollout_processor=SingleTurnRolloutProcessor(),
preprocess_fn=multi_turn_assistant_to_ground_truth,
aggregation_method="bootstrap",
max_concurrent_evaluations=2,
)
async def test_llm_judge_langsmith(row: EvaluationRow) -> EvaluationRow:
"""LLM Judge evaluation over LangSmith-sourced rows, persisted locally by Eval Protocol.
Expand Down
3 changes: 1 addition & 2 deletions eval_protocol/quickstart/llm_judge_openai_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,12 @@
"model": "fireworks_ai/accounts/fireworks/models/kimi-k2-instruct-0905",
},
],
ids=DefaultParameterIdGenerator.generate_id_from_dict,
)
@evaluation_test(
input_rows=[input_rows],
rollout_processor=SingleTurnRolloutProcessor(),
preprocess_fn=multi_turn_assistant_to_ground_truth,
aggregation_method="bootstrap",
max_concurrent_evaluations=2,
)
async def test_llm_judge_openai_responses(row: EvaluationRow) -> EvaluationRow:
return await aha_judge(row)
Loading