diff --git a/eval_protocol/__init__.py b/eval_protocol/__init__.py index cd1efd2c..1a80bad9 100644 --- a/eval_protocol/__init__.py +++ b/eval_protocol/__init__.py @@ -37,18 +37,18 @@ from .resources import create_llm_resource from .reward_function import RewardFunction from .typed_interface import reward_function -from .quickstart import aha_judge, split_multi_turn_rows +from .quickstart import aha_judge, multi_turn_assistant_to_ground_truth, assistant_to_ground_truth from .pytest import evaluation_test, SingleTurnRolloutProcessor from .adapters import OpenAIResponsesAdapter try: - from .adapters import LangfuseAdapter + from .adapters import LangfuseAdapter, create_langfuse_adapter except ImportError: LangfuseAdapter = None try: - from .adapters import BraintrustAdapter + from .adapters import BraintrustAdapter, create_braintrust_adapter except ImportError: BraintrustAdapter = None @@ -62,12 +62,15 @@ __all__ = [ "aha_judge", - "split_multi_turn_rows", + "multi_turn_assistant_to_ground_truth", + "assistant_to_ground_truth", "evaluation_test", "SingleTurnRolloutProcessor", "OpenAIResponsesAdapter", "LangfuseAdapter", + "create_langfuse_adapter", "BraintrustAdapter", + "create_braintrust_adapter", "LangSmithAdapter", # Core interfaces "Message", diff --git a/eval_protocol/adapters/base.py b/eval_protocol/adapters/base.py index 6009b8e1..812cfda6 100644 --- a/eval_protocol/adapters/base.py +++ b/eval_protocol/adapters/base.py @@ -19,3 +19,7 @@ def get_evaluation_rows(self, *args, **kwargs) -> List[EvaluationRow]: def upload_scores(self, rows: List[EvaluationRow], model_name: str, mean_score: float) -> None: """Upload evaluation scores back to the data source for tracking and analysis.""" pass + + def upload_score(self, row: EvaluationRow, model_name: str) -> None: + """Upload evaluation score for a single row back to the data source.""" + pass diff --git a/eval_protocol/adapters/braintrust.py b/eval_protocol/adapters/braintrust.py index 2007f322..45395125 100644 --- 
a/eval_protocol/adapters/braintrust.py +++ b/eval_protocol/adapters/braintrust.py @@ -264,6 +264,40 @@ def upload_scores(self, rows: List[EvaluationRow], model_name: str, mean_score: except Exception as e: logger.warning("Failed to push scores to Braintrust: %s", e) + def upload_score(self, row: EvaluationRow, model_name: str) -> None: + """Upload evaluation score for a single row back to Braintrust. + + Args: + row: Single EvaluationRow with evaluation_result and session_data containing trace ID + model_name: Name of the model (used as the score name in Braintrust) + """ + try: + if ( + row.evaluation_result + and row.evaluation_result.is_score_valid + and row.input_metadata + and row.input_metadata.session_data + and "braintrust_trace_id" in row.input_metadata.session_data + ): + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + + trace_id = row.input_metadata.session_data["braintrust_trace_id"] + if trace_id: + feedback_items = [{"id": trace_id, "scores": {model_name: row.evaluation_result.score}}] + + response = requests.post( + f"{self.api_url}/v1/feedback", + headers=headers, + json={"feedback": feedback_items}, + timeout=30, + ) + response.raise_for_status() + except Exception as e: + logger.warning("Failed to upload single score to Braintrust: %s", e) + def create_braintrust_adapter( api_key: Optional[str] = None, diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index 44c43fe2..99337be9 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -445,9 +445,6 @@ def upload_scores(self, rows: List[EvaluationRow], model_name: str, mean_score: rows: List of EvaluationRow objects with session_data containing trace IDs model_name: Name of the model (used as the score name in Langfuse) mean_score: The calculated mean score to push to Langfuse - - Note: - Silently handles errors if rows lack session data """ try: for trace_id in set( @@ -464,6 
+461,31 @@ def upload_scores(self, rows: List[EvaluationRow], model_name: str, mean_score: except Exception as e: logger.warning("Failed to push scores to Langfuse: %s", e) + def upload_score(self, row: EvaluationRow, model_name: str) -> None: + """Upload evaluation score for a single row back to Langfuse. + + Args: + row: Single EvaluationRow with evaluation_result and session_data containing trace ID + model_name: Name of the model (used as the score name in Langfuse) + """ + try: + if ( + row.evaluation_result + and row.evaluation_result.is_score_valid + and row.input_metadata + and row.input_metadata.session_data + and "langfuse_trace_id" in row.input_metadata.session_data + ): + trace_id = row.input_metadata.session_data["langfuse_trace_id"] + if trace_id: + self.client.create_score( + trace_id=trace_id, + name=model_name, + value=row.evaluation_result.score, + ) + except Exception as e: + logger.warning("Failed to push score to Langfuse: %s", e) + def create_langfuse_adapter() -> LangfuseAdapter: """Factory function to create a Langfuse adapter.""" diff --git a/eval_protocol/dataset_logger/__init__.py b/eval_protocol/dataset_logger/__init__.py index c087b6cd..b3fc1cb2 100644 --- a/eval_protocol/dataset_logger/__init__.py +++ b/eval_protocol/dataset_logger/__init__.py @@ -22,11 +22,19 @@ def read(self, rollout_id=None): # Lazy property that creates the logger only when accessed class _LazyLogger(DatasetLogger): + def __init__(self): + self._logger: DatasetLogger | None = None + + def _get_logger(self): + if self._logger is None: + self._logger = _get_default_logger() + return self._logger + def log(self, row): - return _get_default_logger().log(row) + return self._get_logger().log(row) def read(self, rollout_id=None): - return _get_default_logger().read(rollout_id) + return self._get_logger().read(rollout_id) default_logger: DatasetLogger = _LazyLogger() diff --git a/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py 
b/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py index 6ab0bb8e..a8e7b229 100644 --- a/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py +++ b/eval_protocol/dataset_logger/sqlite_evaluation_row_store.py @@ -17,7 +17,7 @@ class SqliteEvaluationRowStore: def __init__(self, db_path: str): os.makedirs(os.path.dirname(db_path), exist_ok=True) self._db_path = db_path - self._db = SqliteDatabase(self._db_path) + self._db = SqliteDatabase(self._db_path, pragmas={"journal_mode": "wal"}) class BaseModel(Model): class Meta: @@ -41,10 +41,12 @@ def upsert_row(self, data: dict) -> None: rollout_id = data["execution_metadata"]["rollout_id"] if rollout_id is None: raise ValueError("execution_metadata.rollout_id is required to upsert a row") - if self._EvaluationRow.select().where(self._EvaluationRow.rollout_id == rollout_id).exists(): - self._EvaluationRow.update(data=data).where(self._EvaluationRow.rollout_id == rollout_id).execute() - else: - self._EvaluationRow.create(rollout_id=rollout_id, data=data) + + with self._db.atomic("EXCLUSIVE"): + if self._EvaluationRow.select().where(self._EvaluationRow.rollout_id == rollout_id).exists(): + self._EvaluationRow.update(data=data).where(self._EvaluationRow.rollout_id == rollout_id).execute() + else: + self._EvaluationRow.create(rollout_id=rollout_id, data=data) def read_rows(self, rollout_id: Optional[str] = None) -> List[dict]: if rollout_id is None: diff --git a/eval_protocol/pytest/evaluation_test_postprocess.py b/eval_protocol/pytest/evaluation_test_postprocess.py index 6e44c620..aebe579a 100644 --- a/eval_protocol/pytest/evaluation_test_postprocess.py +++ b/eval_protocol/pytest/evaluation_test_postprocess.py @@ -10,7 +10,7 @@ from eval_protocol.models import CompletionParams, EvaluationRow, EvaluationThreshold from eval_protocol.pytest.handle_persist_flow import handle_persist_flow from eval_protocol.pytest.types import EvaluationTestMode -from eval_protocol.pytest.utils import AggregationMethod, aggregate, 
extract_effort_tag, sanitize_filename # pyright: ignore[reportUnknownVariableType] +from eval_protocol.pytest.utils import AggregationMethod, aggregate, extract_effort_tag, sanitize_filename from eval_protocol.stats.confidence_intervals import compute_fixed_set_mu_ci @@ -25,9 +25,18 @@ def postprocess( num_runs: int, experiment_duration_seconds: float, ): - scores = [ - sum([r.evaluation_result.score for r in result if r.evaluation_result]) / len(result) for result in all_results + valid_results = [ + [r for r in result if r.evaluation_result and r.evaluation_result.is_score_valid] for result in all_results ] + + if aggregation_method == "bootstrap": + scores = [r.evaluation_result.score for result in valid_results for r in result if r.evaluation_result] + else: + scores = [ + sum(r.evaluation_result.score for r in result if r.evaluation_result) / len(result) + for result in valid_results + if result + ] agg_score = aggregate(scores, aggregation_method) # Compute 95% confidence interval for the fixed-set mean μ (by-question, using repeats) diff --git a/eval_protocol/pytest/handle_persist_flow.py b/eval_protocol/pytest/handle_persist_flow.py index 63f865ee..6db75098 100644 --- a/eval_protocol/pytest/handle_persist_flow.py +++ b/eval_protocol/pytest/handle_persist_flow.py @@ -71,7 +71,7 @@ def handle_persist_flow(all_results: list[list[EvaluationRow]], test_func_name: row_data["evals"] = {"score": 0} row_data["eval_details"] = { "score": 0, - "is_score_valid": True, + "is_score_valid": False, "reason": "No evaluation result", "metrics": {}, } diff --git a/eval_protocol/pytest/utils.py b/eval_protocol/pytest/utils.py index ac81eb10..0b0c6bc7 100644 --- a/eval_protocol/pytest/utils.py +++ b/eval_protocol/pytest/utils.py @@ -27,9 +27,36 @@ import logging import json +import pandas as pd -AggregationMethod = Literal["mean", "max", "min"] +AggregationMethod = Literal["mean", "max", "min", "bootstrap"] + + +def calculate_bootstrap_scores(all_scores: list[float]) -> float: 
+ """ + Calculate bootstrap confidence intervals for individual scores. + + Args: + all_scores: List of individual scores from all rows + + Returns: + Mean bootstrap score + """ + if not all_scores: + return 0.0 + + # Create DataFrame (single column of scores) + battles = pd.DataFrame({"score": all_scores}) + + # Bootstrap sampling for calculating relative performance + bootstrap_means = [battles.sample(frac=1.0, replace=True)["score"].mean() for _ in range(100)] + + # Calculate final scores + bootstraps = pd.Series(bootstrap_means) + mean_score = bootstraps.mean() + + return float(mean_score) def aggregate(scores: list[float], method: AggregationMethod) -> float: @@ -41,7 +68,8 @@ def aggregate(scores: list[float], method: AggregationMethod) -> float: return max(scores) if method == "min": return min(scores) - raise ValueError(f"Unknown aggregation method: {method}") # pyright: ignore[reportUnreachable] + if method == "bootstrap": + return calculate_bootstrap_scores(scores) def log_eval_status_and_rows( diff --git a/eval_protocol/quickstart/__init__.py b/eval_protocol/quickstart/__init__.py index 24592748..eed4fd21 100644 --- a/eval_protocol/quickstart/__init__.py +++ b/eval_protocol/quickstart/__init__.py @@ -1,4 +1,4 @@ from .llm_judge import aha_judge -from .utils import split_multi_turn_rows +from .utils import multi_turn_assistant_to_ground_truth, assistant_to_ground_truth -__all__ = ["aha_judge"] +__all__ = ["aha_judge", "multi_turn_assistant_to_ground_truth", "assistant_to_ground_truth"] diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 7c793795..e9621658 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -2,99 +2,86 @@ Default LLM judge for Eval Protocol. Inspired by Arena-Hard-Auto. 
""" -from tqdm import tqdm from typing import Optional -from eval_protocol.models import EvaluationRow +from eval_protocol.models import EvaluationRow, EvaluateResult, MetricResult from eval_protocol.adapters.base import BaseAdapter from eval_protocol.quickstart.utils import ( JUDGE_CONFIGS, - calculate_bootstrap_scores, - run_judgment_async, + LABEL_TO_SCORE, + serialize_message, + run_single_judgment, ) -import asyncio + from openai import AsyncOpenAI async def aha_judge( - rows: list[EvaluationRow], judge_name: str = "gemini-2.5-pro", adapter: Optional[BaseAdapter] = None -) -> list[EvaluationRow]: + row: EvaluationRow, judge_name: str = "kimi-k2-instruct-0905", adapter: Optional[BaseAdapter] = None +) -> EvaluationRow: """ - LLM Judge evaluation using Arena-Hard-Auto style pairwise comparisons. + LLM Judge evaluation using Arena-Hard-Auto style pairwise comparisons for a single row. - Compares model responses against ground truth using an LLM judge. For each row: + Compares model response against ground truth using an LLM judge: 1. Extracts the question from messages[:-1] 2. Compares messages[-1] (new model response) vs ground_truth (baseline response) 3. Runs two judgment rounds (A vs B, B vs A) to reduce position bias - 4. Calculates bootstrap scores across all comparisons - 5. Updates evaluation_result with final scores and confidence intervals + 4. 
Returns individual scores for bootstrap aggregation Args: - rows: List of EvaluationRow objects with messages, ground_truth, and tools + row: Single EvaluationRow object with messages, ground_truth, and tools judge_name: Name of the judge configuration to use adapter: Optional adapter to push scores back to (if provided) Returns: - Same rows with updated evaluation_result containing scores and judgments + Same row with updated evaluation_result containing individual judgment scores """ - if not rows: - print("❌ No evaluation rows provided") - return rows - - print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") - - model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") - - judgments = [] - max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] + if not row.messages: + return row judge_config = JUDGE_CONFIGS[judge_name] - async with AsyncOpenAI( - api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") - ) as shared_client: - semaphore = asyncio.Semaphore(max_concurrency) - - async def run_judgment(row): - async with semaphore: - return await run_judgment_async(row, model_name, judge_name, shared_client) - - tasks = [run_judgment(row) for row in rows] - - for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): - result = await coro - if result and result["games"][0] and result["games"][1]: - judgments.append(result) - - if not judgments: - print("❌ No valid judgments generated") - return rows - - print(f"✅ Generated {len(judgments)} valid judgments") - - # Calculate bootstrap scores - result = calculate_bootstrap_scores(judgments) - if not result: - print("❌ No valid scores extracted") - return rows - - mean_score, lower_score, upper_score = result - - # Print leaderboard - print("\n##### LLM Judge Results (90th percentile CI) #####") - - clean_model_name = model_name.split("/")[-1] # Clean model name - - print(f"{clean_model_name}: {mean_score:.1%} (CI: 
{lower_score:.1%} - {upper_score:.1%})") - print("original: 50.0% (CI: 50.0% - 50.0%)") - - for row in rows: - if row.evaluation_result: - row.evaluation_result.score = mean_score - - # Push scores back to adapter if provided - if adapter: - adapter.upload_scores(rows, model_name, mean_score) - - return rows + # Extract question and answers + question_text = "\n".join([serialize_message(msg) for msg in row.messages[:-1]]) + model_a_answer = str(row.ground_truth) + model_b_answer = serialize_message(row.messages[-1]) + + client = AsyncOpenAI(api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url")) + + # Run two judgment rounds in sequence (A vs B, then B vs A) + result1 = await run_single_judgment(question_text, model_a_answer, model_b_answer, row.tools, judge_config, client) + result2 = await run_single_judgment(question_text, model_b_answer, model_a_answer, row.tools, judge_config, client) + + if not result1 or not result2 or not result1.get("score") or not result2.get("score"): + # If either judgment failed, mark as invalid (don't include in distribution) + final_score = 0.0 + reason = "Failed to get judgment scores" + metrics = {} + is_score_valid = False + else: + # Convert judgment scores to numerical scores + game1_score = 1 - LABEL_TO_SCORE[result1["score"]] + game2_score = LABEL_TO_SCORE[result2["score"]] + final_score = (game1_score + game2_score) / 2 + + reason = f"LLM Judge comparison: Round 1: {result1['score']}, Round 2: {result2['score']}" + metrics = { + "round1_judgment": MetricResult(score=game1_score, reason=result1["judgment"]), + "round2_judgment": MetricResult(score=game2_score, reason=result2["judgment"]), + } + is_score_valid = True + + row.evaluation_result = EvaluateResult( + score=final_score, + reason=reason, + metrics=metrics, + is_score_valid=is_score_valid, + ) + + # Upload score to adapter if provided + if adapter and row.evaluation_result and row.evaluation_result.is_score_valid: + model_name = 
row.input_metadata.completion_params.get("model", "unknown_model") + adapter.upload_score(row, model_name) + + return row diff --git a/eval_protocol/quickstart/llm_judge_braintrust.py b/eval_protocol/quickstart/llm_judge_braintrust.py index b31966e1..65bde54c 100644 --- a/eval_protocol/quickstart/llm_judge_braintrust.py +++ b/eval_protocol/quickstart/llm_judge_braintrust.py @@ -6,13 +6,14 @@ import pytest -from eval_protocol.models import EvaluationRow -from eval_protocol.pytest import evaluation_test -from eval_protocol.pytest.default_single_turn_rollout_process import SingleTurnRolloutProcessor -from eval_protocol.quickstart.utils import split_multi_turn_rows -from eval_protocol.adapters.braintrust import create_braintrust_adapter -from eval_protocol.quickstart import aha_judge - +from eval_protocol import ( + evaluation_test, + aha_judge, + multi_turn_assistant_to_ground_truth, + EvaluationRow, + SingleTurnRolloutProcessor, + create_braintrust_adapter, +) # adapter = create_braintrust_adapter() @@ -44,9 +45,9 @@ }, ], rollout_processor=SingleTurnRolloutProcessor(), - preprocess_fn=split_multi_turn_rows, + preprocess_fn=multi_turn_assistant_to_ground_truth, max_concurrent_rollouts=64, - mode="all", + aggregation_method="bootstrap", ) -async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: - return await aha_judge(rows) +async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: + return await aha_judge(row) diff --git a/eval_protocol/quickstart/llm_judge_langfuse.py b/eval_protocol/quickstart/llm_judge_langfuse.py index 5154ac8e..bfd19b0d 100644 --- a/eval_protocol/quickstart/llm_judge_langfuse.py +++ b/eval_protocol/quickstart/llm_judge_langfuse.py @@ -7,19 +7,21 @@ import pytest -from eval_protocol.models import EvaluationRow -from eval_protocol.pytest import evaluation_test -from eval_protocol.pytest.default_single_turn_rollout_process import SingleTurnRolloutProcessor -from eval_protocol.quickstart.utils import split_multi_turn_rows 
+from eval_protocol import ( + evaluation_test, + aha_judge, + multi_turn_assistant_to_ground_truth, + EvaluationRow, + SingleTurnRolloutProcessor, + create_langfuse_adapter, +) -from eval_protocol.adapters.langfuse import create_langfuse_adapter from eval_protocol.quickstart import aha_judge adapter = create_langfuse_adapter() @pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") -@pytest.mark.asyncio @evaluation_test( input_rows=[ adapter.get_evaluation_rows( @@ -44,9 +46,9 @@ }, ], rollout_processor=SingleTurnRolloutProcessor(), - preprocess_fn=split_multi_turn_rows, - max_concurrent_rollouts=64, - mode="all", + preprocess_fn=multi_turn_assistant_to_ground_truth, + max_concurrent_rollouts=2, + aggregation_method="bootstrap", ) -async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: - return await aha_judge(rows) +async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: + return await aha_judge(row) diff --git a/eval_protocol/quickstart/llm_judge_langsmith.py b/eval_protocol/quickstart/llm_judge_langsmith.py index 16a287a9..b8850ee0 100644 --- a/eval_protocol/quickstart/llm_judge_langsmith.py +++ b/eval_protocol/quickstart/llm_judge_langsmith.py @@ -20,21 +20,18 @@ """ import os -from typing import Any, Dict, List, Optional +from typing import List, Optional -from openai import AsyncOpenAI import pytest -from eval_protocol.models import EvaluationRow, Message, EvaluateResult, MetricResult -from eval_protocol.pytest import evaluation_test -from eval_protocol.pytest.default_single_turn_rollout_process import SingleTurnRolloutProcessor -from eval_protocol.quickstart.utils import ( - split_multi_turn_rows, - JUDGE_CONFIGS, - calculate_bootstrap_scores, - run_judgment_async, +from eval_protocol import ( + evaluation_test, + aha_judge, + multi_turn_assistant_to_ground_truth, + EvaluationRow, + SingleTurnRolloutProcessor, + LangSmithAdapter, ) -from eval_protocol.adapters.langsmith import LangSmithAdapter def 
fetch_langsmith_traces_as_evaluation_rows( @@ -44,7 +41,7 @@ def fetch_langsmith_traces_as_evaluation_rows( """Fetch LangSmith root runs and convert to EvaluationRow, mirroring Langfuse adapter shape. - Extract messages from run.inputs and run.outputs - - Append assistant message from outputs so split_multi_turn_rows can derive ground_truth + - Append assistant message from outputs so we can derive ground_truth - Store run_id in input_metadata.session_data """ project = project_name or os.getenv("LS_PROJECT", "ep-langgraph-examples") @@ -71,69 +68,12 @@ def fetch_langsmith_traces_as_evaluation_rows( }, ], rollout_processor=SingleTurnRolloutProcessor(), - preprocess_fn=split_multi_turn_rows, - mode="all", + preprocess_fn=multi_turn_assistant_to_ground_truth, + aggregation_method="bootstrap", ) -async def test_llm_judge_langsmith(rows: List[EvaluationRow]) -> List[EvaluationRow]: +async def test_llm_judge_langsmith(row: EvaluationRow) -> EvaluationRow: """LLM Judge evaluation over LangSmith-sourced rows, persisted locally by Eval Protocol. Mirrors quickstart/llm_judge.py, using Arena-Hard-Auto style pairwise judgment. 
""" - - judge_name = "gemini-2.5-pro" - - if not rows: - print("❌ No evaluation rows provided") - return rows - - print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging (LangSmith source)...") - - model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") - - judgments: List[Dict[str, Any]] = [] - - judge_config = JUDGE_CONFIGS[judge_name] - - async with AsyncOpenAI( - api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") - ) as shared_client: - for row in rows: - result = await run_judgment_async(row, model_name, judge_name, shared_client) - if result and result["games"][0] and result["games"][1]: - judgments.append(result) - - if not judgments: - print("❌ No valid judgments generated") - return rows - - print(f"✅ Generated {len(judgments)} valid judgments") - - result = calculate_bootstrap_scores(judgments) - if not result: - print("❌ No valid scores extracted") - return rows - - mean_score, lower_score, upper_score = result - if mean_score == 0.0: - print("❌ No valid scores extracted") - return rows - - print("\n##### LLM Judge Results (90th percentile CI) #####") - clean_model_name = model_name.split("/")[-1] - print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") - print("original: 50.0% (CI: 50.0% - 50.0%)") - - for row in rows: - if row.evaluation_result: - row.evaluation_result.score = mean_score - row.evaluation_result.standard_error = (upper_score - lower_score) / (2 * 1.645) - else: - row.evaluation_result = EvaluateResult( - score=mean_score, - reason="Aggregated LLM judge score", - metrics={ - "summary": MetricResult(score=mean_score, reason="Aggregated over judgments"), - }, - ) - - return rows + return await aha_judge(row) diff --git a/eval_protocol/quickstart/llm_judge_openai_responses.py b/eval_protocol/quickstart/llm_judge_openai_responses.py index 5d8cb983..27a0deea 100644 --- a/eval_protocol/quickstart/llm_judge_openai_responses.py +++ 
b/eval_protocol/quickstart/llm_judge_openai_responses.py @@ -16,14 +16,13 @@ """ import os -from typing import List import pytest from eval_protocol import ( evaluation_test, aha_judge, - split_multi_turn_rows, + multi_turn_assistant_to_ground_truth, EvaluationRow, SingleTurnRolloutProcessor, OpenAIResponsesAdapter, @@ -53,8 +52,8 @@ }, ], rollout_processor=SingleTurnRolloutProcessor(), - preprocess_fn=split_multi_turn_rows, - mode="all", + preprocess_fn=multi_turn_assistant_to_ground_truth, + aggregation_method="bootstrap", ) -async def test_llm_judge_openai_responses(rows: List[EvaluationRow]) -> List[EvaluationRow]: - return await aha_judge(rows) +async def test_llm_judge_openai_responses(row: EvaluationRow) -> EvaluationRow: + return await aha_judge(row) diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index 8b98d83e..96ecb808 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -11,7 +11,7 @@ from eval_protocol.models import EvaluationRow, Message, EvaluateResult, MetricResult import asyncio - +from openai import OpenAI OG_ARENA_HARD_PROMPT = """Please act as an impartial judge and evaluate the quality of the responses provided by two AI assistants to the user prompt displayed below. You will be given assistant A's answer and assistant B's answer. Your job is to evaluate which assistant's answer is better. 
@@ -41,7 +41,6 @@ "model": "gpt-4.1", "temperature": 0.0, "max_tokens": 16000, - "max_concurrency": 64, }, "gemini-2.5-pro": { "model": "gemini-2.5-pro", @@ -49,7 +48,6 @@ "max_tokens": 32000, "api_key": os.getenv("GEMINI_API_KEY"), "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", - "max_concurrency": 16, }, "gemini-2.5-flash": { "model": "gemini-2.5-flash", @@ -57,7 +55,6 @@ "max_tokens": 32000, "api_key": os.getenv("GEMINI_API_KEY"), "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", - "max_concurrency": 16, }, "kimi-k2-instruct-0905": { "model": "accounts/fireworks/models/kimi-k2-instruct-0905", @@ -65,23 +62,20 @@ "max_tokens": 131000, "api_key": os.getenv("FIREWORKS_API_KEY"), "base_url": "https://api.fireworks.ai/inference/v1", - "max_concurrency": 64, }, } -# Mapping from Arena-Hard-Auto judgment labels to numerical scores -# Stronger preferences (>> or <<) get weighted more heavily (3x) than slight preferences LABEL_TO_SCORE = { - "A>B": [1], - "A>>B": [1] * 3, - "A=B": [0.5], - "A<<B": [0] * 3, - "A<B": [0], - "B>A": [0], - "B>>A": [0] * 3, - "B=A": [0.5], - "B<<A": [1] * 3, - "B<A": [1], + "A>>B": 1.0, + "B<<A": 1.0, + "A>B": 6 / 7, + "B<A": 6 / 7, + "A=B": 0.5, + "B=A": 0.5, + "A<B": 1 / 7, + "B>A": 1 / 7, + "A<<B": 0.0, + "B>>A": 0.0, } @@ -110,9 +104,9 @@ def serialize_message(msg: Message) -> str: return "\n".join(parts) -def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: +def multi_turn_assistant_to_ground_truth(data: list[EvaluationRow]) -> list[EvaluationRow]: """ - Split multi-turn conversation rows into individual evaluation rows for each assistant message. + Split multi-turn conversations into rows, with each assistant message as ground truth. 
Args: data: List of EvaluationRow objects @@ -158,8 +152,44 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: return expanded_rows -async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judge_config, shared_client): - """Async pairwise judgment using a shared client.""" +def assistant_to_ground_truth(data: list[EvaluationRow]) -> list[EvaluationRow]: + """ + Extract the last assistant message as ground truth and remove it from the conversation. + + Args: + data: List of EvaluationRow objects + + Returns: + List of EvaluationRow objects with last assistant message moved to ground_truth + """ + processed_rows = [] + + for row in data: + messages = row.messages.copy() # Don't modify original + + if messages[-1].role == "assistant": + assistant_message = messages[-1] + messages = messages[:-1] + ground_truth_message = serialize_message(assistant_message) + else: + raise ValueError("Last message is not from assistant") + + processed_rows.append( + EvaluationRow( + messages=messages, + tools=row.tools, + input_metadata=row.input_metadata, + ground_truth=ground_truth_message, + ) + ) + + return processed_rows + + +async def run_single_judgment( + question_text: str, answer_a: str, answer_b: str, tools, judge_config, client +) -> Optional[Dict[str, Any]]: + """Run a single pairwise judgment between two answers.""" user_prompt = f"""<|User Prompt|> {question_text} @@ -191,7 +221,7 @@ async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judg api_params["tools"] = tools api_params["tool_choice"] = "none" - response = await shared_client.chat.completions.create(**api_params) + response = await client.chat.completions.create(**api_params) judgment_text = response.choices[0].message.content if not judgment_text: return None @@ -202,82 +232,3 @@ async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judg score = get_score(judgment_text, [r"\[\[([AB<>=]+)\]\]", r"\[([AB<>=]+)\]"]) 
return {"score": score, "judgment": judgment_text, "prompt": messages} - - -async def run_judgment_async( - row: EvaluationRow, model_name: str, judge_name: str, shared_client: AsyncOpenAI -) -> Optional[Dict[str, Any]]: - """Async judgment using shared client to avoid cleanup issues.""" - if not row.messages: - return None - - question_text = "\n".join([serialize_message(msg) for msg in row.messages[:-1]]) - model_a_answer = row.ground_truth - model_b_answer = serialize_message(row.messages[-1]) - - # Run both rounds concurrently with shared client - result1, result2 = await asyncio.gather( - pairwise_judgment_async( - question_text, model_a_answer, model_b_answer, row.tools, JUDGE_CONFIGS[judge_name], shared_client - ), - pairwise_judgment_async( - question_text, model_b_answer, model_a_answer, row.tools, JUDGE_CONFIGS[judge_name], shared_client - ), - ) - - games = [result1, result2] - - row.evaluation_result = EvaluateResult( - score=0.0, - reason=f"LLM Judge comparison: Round 1: {result1['score']}, Round 2: {result2['score']}" - if result1 and result2 - else "Failed to get judgement scores", - metrics={ - "round1_judgment": MetricResult(score=0.0, reason=result1["judgment"] if result1 else "Failed"), - "round2_judgment": MetricResult(score=0.0, reason=result2["judgment"] if result2 else "Failed"), - }, - ) - - return {"model": model_name, "games": games} - - -def calculate_bootstrap_scores(judgments: List[Dict[str, Any]]) -> Optional[tuple[float, float, float]]: - """ - Calculate bootstrap confidence intervals for Arena-Hard-Auto style judgments. - - Converts judgment labels (A>B, A>>B, etc.) to numerical scores, performs bootstrap - sampling to estimate score distribution, and returns mean with 90% confidence interval. 
- - Args: - judgments: List of judgment dicts, each containing "games" with two rounds of scores - - Returns: - Optional[tuple]: (mean_score, lower_5th_percentile, upper_95th_percentile) - Returns None if no valid scores found - """ - # Extract scores from judgments - scores_data = [] - for judgment in judgments: - game1, game2 = judgment["games"] - if game1 and game2 and game1.get("score") and game2.get("score"): - # Convert judgment scores to numerical scores - scores = LABEL_TO_SCORE[game2["score"]] + [1 - s for s in LABEL_TO_SCORE[game1["score"]]] - for score in scores: - scores_data.append(score) - - if not scores_data: - return None - - # Create DataFrame (single column of scores) - battles = pd.DataFrame({"score": scores_data}) - - # Bootstrap sampling for calculating relative performance to original model at fixed 50% - bootstrap_means = [battles.sample(frac=1.0, replace=True)["score"].mean() for _ in range(100)] - - # Calculate final scores - bootstraps = pd.Series(bootstrap_means) - mean_score = bootstraps.mean() - lower_score = bootstraps.quantile(0.05) - upper_score = bootstraps.quantile(0.95) - - return mean_score, lower_score, upper_score diff --git a/tests/test_evaluation_postprocess.py b/tests/test_evaluation_postprocess.py new file mode 100644 index 00000000..1bbdb51a --- /dev/null +++ b/tests/test_evaluation_postprocess.py @@ -0,0 +1,207 @@ +"""Tests for evaluation postprocess functionality.""" + +import pytest +from unittest.mock import Mock, patch + +from eval_protocol.models import EvaluationRow, EvaluateResult, EvalMetadata, ExecutionMetadata, InputMetadata +from eval_protocol.pytest.evaluation_test_postprocess import postprocess + + +class TestPostprocess: + """Tests for postprocess function.""" + + def create_test_row(self, score: float, is_valid: bool = True) -> EvaluationRow: + """Helper to create a test evaluation row.""" + return EvaluationRow( + messages=[], + evaluation_result=EvaluateResult(score=score, is_score_valid=is_valid, 
reason="test"), + input_metadata=InputMetadata(completion_params={"model": "test-model"}), + execution_metadata=ExecutionMetadata(), + eval_metadata=EvalMetadata( + name="test", + description="test", + version="1.0", + status=None, + num_runs=1, + aggregation_method="mean", + passed_threshold=None, + passed=None, + ), + ) + + @patch.dict("os.environ", {"EP_NO_UPLOAD": "1"}) # Disable uploads + def test_bootstrap_aggregation_with_valid_scores(self): + """Test bootstrap aggregation with all valid scores and verify exact scores list.""" + # Create test data: 2 runs with 2 rows each + all_results = [ + [self.create_test_row(0.8), self.create_test_row(0.6)], # Run 1 + [self.create_test_row(0.7), self.create_test_row(0.9)], # Run 2 + ] + + mock_logger = Mock() + + # Mock the aggregate function to capture the exact scores passed to it + with patch("eval_protocol.pytest.evaluation_test_postprocess.aggregate") as mock_aggregate: + mock_aggregate.return_value = 0.75 # Mock return value + + postprocess( + all_results=all_results, + aggregation_method="bootstrap", + threshold=None, + active_logger=mock_logger, + mode="pointwise", + completion_params={"model": "test-model"}, + test_func_name="test_bootstrap", + num_runs=2, + experiment_duration_seconds=10.0, + ) + + # Check that aggregate was called with all individual scores in order + mock_aggregate.assert_called_once_with([0.8, 0.6, 0.7, 0.9], "bootstrap") + + # Should call logger.log for each row + assert mock_logger.log.call_count == 4 + + @patch.dict("os.environ", {"EP_NO_UPLOAD": "1"}) # Disable uploads + def test_bootstrap_aggregation_filters_invalid_scores(self): + """Test that bootstrap aggregation excludes invalid scores and generates correct scores list.""" + # Create test data with some invalid scores + all_results = [ + [ + self.create_test_row(0.8, is_valid=True), + self.create_test_row(0.0, is_valid=False), # Invalid - should be excluded + ], + [ + self.create_test_row(0.7, is_valid=True), + 
self.create_test_row(0.0, is_valid=False), # Invalid - should be excluded + ], + ] + + mock_logger = Mock() + + # Mock the aggregate function to capture the scores passed to it + with patch("eval_protocol.pytest.evaluation_test_postprocess.aggregate") as mock_aggregate: + mock_aggregate.return_value = 0.75 # Mock return value + + postprocess( + all_results=all_results, + aggregation_method="bootstrap", + threshold=None, + active_logger=mock_logger, + mode="pointwise", + completion_params={"model": "test-model"}, + test_func_name="test_bootstrap_invalid", + num_runs=2, + experiment_duration_seconds=10.0, + ) + + # Check that aggregate was called with only valid scores + mock_aggregate.assert_called_once_with([0.8, 0.7], "bootstrap") + + # Should still call logger.log for all rows (including invalid ones) + assert mock_logger.log.call_count == 4 + + @patch.dict("os.environ", {"EP_NO_UPLOAD": "1"}) # Disable uploads + def test_mean_aggregation_with_valid_scores(self): + """Test mean aggregation with all valid scores.""" + all_results = [ + [self.create_test_row(0.8), self.create_test_row(0.6)], # Run 1: mean = 0.7 + [self.create_test_row(0.4), self.create_test_row(0.8)], # Run 2: mean = 0.6 + ] + + mock_logger = Mock() + + postprocess( + all_results=all_results, + aggregation_method="mean", + threshold=None, + active_logger=mock_logger, + mode="pointwise", + completion_params={"model": "test-model"}, + test_func_name="test_mean", + num_runs=2, + experiment_duration_seconds=10.0, + ) + + # Should call logger.log for each row + assert mock_logger.log.call_count == 4 + + @patch.dict("os.environ", {"EP_NO_UPLOAD": "1"}) # Disable uploads + def test_mean_aggregation_filters_invalid_scores(self): + """Test that mean aggregation excludes invalid scores from run averages.""" + all_results = [ + [ + self.create_test_row(0.8, is_valid=True), + self.create_test_row(0.0, is_valid=False), # Invalid - excluded from run average + ], + [ + self.create_test_row(0.6, is_valid=True), + 
self.create_test_row(0.4, is_valid=True), + ], + ] + + mock_logger = Mock() + + postprocess( + all_results=all_results, + aggregation_method="mean", + threshold=None, + active_logger=mock_logger, + mode="pointwise", + completion_params={"model": "test-model"}, + test_func_name="test_mean_invalid", + num_runs=2, + experiment_duration_seconds=10.0, + ) + + # Should call logger.log for all rows + assert mock_logger.log.call_count == 4 + + @patch.dict("os.environ", {"EP_NO_UPLOAD": "1"}) # Disable uploads + def test_empty_runs_are_skipped(self): + """Test that runs with no valid scores are skipped.""" + all_results = [ + [self.create_test_row(0.8, is_valid=True)], # Run 1: has valid score + [self.create_test_row(0.0, is_valid=False)], # Run 2: no valid scores - should be skipped + ] + + mock_logger = Mock() + + postprocess( + all_results=all_results, + aggregation_method="mean", + threshold=None, + active_logger=mock_logger, + mode="pointwise", + completion_params={"model": "test-model"}, + test_func_name="test_empty_runs", + num_runs=2, + experiment_duration_seconds=10.0, + ) + + # Should still call logger.log for all rows + assert mock_logger.log.call_count == 2 + + @patch.dict("os.environ", {"EP_NO_UPLOAD": "1"}) # Disable uploads + def test_all_invalid_scores(self): + """Test behavior when all scores are invalid.""" + all_results = [ + [self.create_test_row(0.0, is_valid=False), self.create_test_row(0.0, is_valid=False)], + ] + + mock_logger = Mock() + + postprocess( + all_results=all_results, + aggregation_method="bootstrap", + threshold=None, + active_logger=mock_logger, + mode="pointwise", + completion_params={"model": "test-model"}, + test_func_name="test_all_invalid", + num_runs=1, + experiment_duration_seconds=10.0, + ) + + # Should still call logger.log for all rows + assert mock_logger.log.call_count == 2 diff --git a/tests/test_quickstart_utils.py b/tests/test_quickstart_utils.py index 860d1d35..de185c84 100644 --- a/tests/test_quickstart_utils.py +++ 
b/tests/test_quickstart_utils.py @@ -3,7 +3,11 @@ import pytest from eval_protocol.models import EvaluationRow, InputMetadata, Message -from eval_protocol.quickstart.utils import split_multi_turn_rows, serialize_message +from eval_protocol.quickstart.utils import ( + multi_turn_assistant_to_ground_truth, + serialize_message, + assistant_to_ground_truth, +) class TestSerializeMessage: @@ -75,8 +79,8 @@ def test_none_content_message(self): assert result == "assistant: None" -class TestSplitMultiTurnRows: - """Tests for split_multi_turn_rows function.""" +class TestMultiTurnAssistantToGroundTruth: + """Tests for multi_turn_assistant_to_ground_truth function.""" def test_single_turn_conversation(self): """Test that single-turn conversations are handled correctly.""" @@ -86,7 +90,7 @@ def test_single_turn_conversation(self): ] row = EvaluationRow(messages=messages) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 1 assert len(result[0].messages) == 1 # Only user message before assistant @@ -104,7 +108,7 @@ def test_multi_turn_conversation(self): ] row = EvaluationRow(messages=messages) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 2 @@ -131,7 +135,7 @@ def test_conversation_with_system_message(self): ] row = EvaluationRow(messages=messages) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 2 @@ -161,7 +165,7 @@ def test_conversation_with_tool_calls(self): ] row = EvaluationRow(messages=messages) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 1 expected_ground_truth = 'assistant: I\'ll check that for you.\n[Tool Call: get_weather({"location": "NYC"})]' @@ -176,7 +180,7 @@ def test_multiple_rows_processing(self): messages=[Message(role="user", content="Goodbye"), Message(role="assistant", 
content="Bye!")] ) - result = split_multi_turn_rows([row1, row2]) + result = multi_turn_assistant_to_ground_truth([row1, row2]) assert len(result) == 2 assert result[0].messages[0].content == "Hello" @@ -189,7 +193,7 @@ def test_no_assistant_messages(self): messages = [Message(role="user", content="Hello"), Message(role="user", content="Anyone there?")] row = EvaluationRow(messages=messages) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 0 @@ -198,7 +202,7 @@ def test_only_assistant_messages(self): messages = [Message(role="assistant", content="Hello!"), Message(role="assistant", content="How can I help?")] row = EvaluationRow(messages=messages) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 2 # First assistant message (no context) @@ -228,7 +232,7 @@ def test_duplicate_trace_filtering(self): row1 = EvaluationRow(messages=messages1) row2 = EvaluationRow(messages=messages2) - result = split_multi_turn_rows([row1, row2]) + result = multi_turn_assistant_to_ground_truth([row1, row2]) # Should only get 2 unique splits (not 4), because the context leading # to the second assistant message is the same in both rows @@ -248,7 +252,7 @@ def test_tools_and_metadata_preservation(self): messages = [Message(role="user", content="Hello"), Message(role="assistant", content="Hi!")] row = EvaluationRow(messages=messages, tools=tools, input_metadata=input_metadata) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 1 assert result[0].tools == tools @@ -256,7 +260,7 @@ def test_tools_and_metadata_preservation(self): def test_empty_input_list(self): """Test that empty input list returns empty result.""" - result = split_multi_turn_rows([]) + result = multi_turn_assistant_to_ground_truth([]) assert len(result) == 0 def test_complex_multi_turn_with_tool_responses(self): @@ -281,7 
+285,7 @@ def test_complex_multi_turn_with_tool_responses(self): ] row = EvaluationRow(messages=messages) - result = split_multi_turn_rows([row]) + result = multi_turn_assistant_to_ground_truth([row]) assert len(result) == 3 # Three assistant messages @@ -296,3 +300,89 @@ def test_complex_multi_turn_with_tool_responses(self): # Third assistant message assert len(result[2].messages) == 5 # All previous messages + "Thanks!" assert result[2].ground_truth == "assistant: You're welcome!" + + +class TestAssistantToGroundTruth: + """Tests for assistant_to_ground_truth function.""" + + def test_removes_last_assistant_message(self): + """Test that the last assistant message is removed and set as ground truth.""" + messages = [ + Message(role="user", content="What's the weather like?"), + Message(role="assistant", content="It's sunny today!"), + ] + row = EvaluationRow(messages=messages) + + result = assistant_to_ground_truth([row]) + + assert len(result) == 1 + assert len(result[0].messages) == 1 # Only user message remains + assert result[0].messages[0].role == "user" + assert result[0].messages[0].content == "What's the weather like?" + assert result[0].ground_truth == "assistant: It's sunny today!" + + def test_multi_turn_with_last_assistant(self): + """Test multi-turn conversation where last message is assistant.""" + messages = [ + Message(role="user", content="Hello"), + Message(role="assistant", content="Hi there!"), + Message(role="user", content="How are you?"), + Message(role="assistant", content="I'm doing well!"), + ] + row = EvaluationRow(messages=messages) + + result = assistant_to_ground_truth([row]) + + assert len(result) == 1 + assert len(result[0].messages) == 3 # All except last assistant + assert result[0].messages[-1].content == "How are you?" + assert result[0].ground_truth == "assistant: I'm doing well!" 
+ + def test_fails_when_last_message_not_assistant(self): + """Test that function raises error when last message is not from assistant.""" + messages = [ + Message(role="user", content="Hello"), + Message(role="assistant", content="Hi!"), + Message(role="user", content="Goodbye"), + ] + row = EvaluationRow(messages=messages) + + with pytest.raises(ValueError, match="Last message is not from assistant"): + assistant_to_ground_truth([row]) + + def test_preserves_metadata_and_tools(self): + """Test that tools and metadata are preserved.""" + messages = [ + Message(role="user", content="Hello"), + Message(role="assistant", content="Hi there!"), + ] + tools = [{"type": "function", "function": {"name": "test"}}] + input_metadata = InputMetadata(row_id="test_123", completion_params={}) + row = EvaluationRow(messages=messages, tools=tools, input_metadata=input_metadata) + + result = assistant_to_ground_truth([row]) + + assert len(result) == 1 + assert result[0].tools == tools + assert result[0].input_metadata == input_metadata + assert result[0].ground_truth == "assistant: Hi there!" + + def test_multiple_rows(self): + """Test processing multiple rows.""" + row1 = EvaluationRow( + messages=[Message(role="user", content="Hello"), Message(role="assistant", content="Hi!")] + ) + row2 = EvaluationRow( + messages=[Message(role="user", content="Bye"), Message(role="assistant", content="Goodbye!")] + ) + + result = assistant_to_ground_truth([row1, row2]) + + assert len(result) == 2 + assert result[0].ground_truth == "assistant: Hi!" + assert result[1].ground_truth == "assistant: Goodbye!" + + def test_empty_input_list(self): + """Test that empty input list returns empty result.""" + result = assistant_to_ground_truth([]) + assert len(result) == 0