From 5428de6ab315801f59c2522e1e6c8307d47d1c9b Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 10:31:18 -0700 Subject: [PATCH 01/36] fix langfuse rate limit issue --- eval_protocol/adapters/langfuse.py | 136 ++++++++++++++++++++++---- eval_protocol/mcp/execution/policy.py | 7 +- eval_protocol/quickstart/llm_judge.py | 24 ++++- eval_protocol/quickstart/utils.py | 9 ++ 4 files changed, 150 insertions(+), 26 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index f3e82329..e670213e 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -6,6 +6,7 @@ from langfuse.api.resources.commons.types.observations_view import ObservationsView import logging +import time from datetime import datetime, timedelta from typing import Any, Dict, Iterator, List, Optional, cast @@ -64,6 +65,9 @@ def get_evaluation_rows( session_id: Optional[str] = None, hours_back: Optional[int] = None, include_tool_calls: bool = True, + page_size: int = 30, + sleep_between_gets: float = 0.1, + max_retries: int = 3, ) -> List[EvaluationRow]: """Pull traces from Langfuse and convert to EvaluationRow format. @@ -74,11 +78,14 @@ def get_evaluation_rows( session_id: Filter by session ID hours_back: Filter traces from this many hours ago include_tool_calls: Whether to include tool calling traces + page_size: Number of traces to fetch per page (smaller = less rate limit issues) + sleep_between_gets: Sleep time between individual trace.get() calls + max_retries: Maximum retries for rate limit errors - Yields: - EvaluationRow: Converted evaluation rows + Returns: + List[EvaluationRow]: Converted evaluation rows """ - # Get traces from Langfuse using new API + eval_rows = [] if hours_back: to_timestamp = datetime.now() @@ -87,26 +94,113 @@ def get_evaluation_rows( to_timestamp = None from_timestamp = None - eval_rows = [] + # Use pagination to process traces in smaller batches (similar to Langfuse migration script) + page = 1 + total_processed = 0 - traces: Traces = self.client.api.trace.list( - limit=limit, - tags=tags, - user_id=user_id, - session_id=session_id, - from_timestamp=from_timestamp, - to_timestamp=to_timestamp, - ) + while total_processed < limit: + current_page_limit = min(page_size, limit - total_processed) - for trace in traces.data: - try: - trace: TraceWithFullDetails = self.client.api.trace.get(trace.id) - eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls) - if eval_row: - eval_rows.append(eval_row) - except (AttributeError, ValueError, KeyError) as e: - logger.warning("Failed to convert trace %s: %s", trace.id, e) - continue + logger.debug( + "Fetching page %d with limit %d (total processed: %d/%d)", + page, + current_page_limit, + total_processed, + limit, + ) + + # Fetch trace list with retry logic + traces = None + list_retries = 0 + while list_retries < max_retries: + try: + traces = self.client.api.trace.list( + page=page, + limit=current_page_limit, + tags=tags, + user_id=user_id, + session_id=session_id, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, + order_by="timestamp.desc", # Get most recent first + ) + break + except Exception as e: + list_retries += 1 + if "429" in str(e) and list_retries < max_retries: + sleep_time = 2**list_retries # Exponential backoff + logger.warning( + "Rate limit hit on trace.list(), retrying in %ds (attempt %d/%d)", + sleep_time, + list_retries, + max_retries, + ) + time.sleep(sleep_time) + else: + logger.error("Failed to fetch trace list after %d retries: %s", max_retries, e) + return eval_rows # Return what we have so far + + if not traces or not traces.data: + logger.debug("No more traces found on page %d", page) + break + + logger.debug("Processing %d traces from page %d", len(traces.data), page) + + # Process each trace with sleep and retry logic (like migration script) + for trace_info in traces.data: + if total_processed >= limit: + break + + # Sleep between gets to avoid rate limits + if sleep_between_gets > 0: + time.sleep(sleep_between_gets) + + # Fetch full trace details with retry logic + trace_full = None + detail_retries = 0 + while detail_retries < max_retries: + try: + trace_full = self.client.api.trace.get(trace_info.id) + break + except Exception as e: + detail_retries += 1 + if "429" in str(e) and detail_retries < max_retries: + sleep_time = 2**detail_retries # Exponential backoff + logger.warning( + "Rate limit hit on trace.get(%s), retrying in %ds (attempt %d/%d)", + trace_info.id, + sleep_time, + detail_retries, + max_retries, + ) + time.sleep(sleep_time) + else: + logger.warning( + "Failed to fetch trace %s after %d retries: %s", trace_info.id, max_retries, e + ) + break # Skip this trace + + if trace_full: + try: + eval_row = self._convert_trace_to_evaluation_row(trace_full, include_tool_calls) + if eval_row: + eval_rows.append(eval_row) + total_processed += 1 + except (AttributeError, ValueError, KeyError) as e: + logger.warning("Failed to convert trace %s: %s", trace_info.id, e) + continue + + # Check if we have more pages + if hasattr(traces.meta, "page") and hasattr(traces.meta, "total_pages"): + if traces.meta.page >= traces.meta.total_pages: + break + elif len(traces.data) < current_page_limit: + # If we got fewer traces than requested, we're probably at the end + break + + page += 1 + + logger.info("Successfully processed %d traces into %d evaluation rows", total_processed, len(eval_rows)) return eval_rows def get_evaluation_rows_by_ids( diff --git a/eval_protocol/mcp/execution/policy.py b/eval_protocol/mcp/execution/policy.py index 1adb9b95..0916e933 100644 --- a/eval_protocol/mcp/execution/policy.py +++ b/eval_protocol/mcp/execution/policy.py @@ -194,7 +194,12 @@ async def _make_llm_call(self, messages: List[Dict[str, Any]], tools: List[Dict[ request_params["tools"] = tools try: - response = await acompletion(model=self.model_id, **request_params) + response = await acompletion( + model=self.model_id, + **request_params, + api_base="https://litellm-cloud-proxy-prod-zfdbl7ykrq-uc.a.run.app/v1", + extra_body={"tags": ["kimi-k2-tau-bench"]}, + ) # Log cache hit/miss for monitoring hidden = getattr(response, "_hidden_params", {}) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 28d40366..2e0564ba 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -27,19 +27,34 @@ @pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") @pytest.mark.asyncio @evaluation_test( - input_rows=[fetch_langfuse_traces_as_evaluation_rows()], + input_rows=[ + fetch_langfuse_traces_as_evaluation_rows( + hours_back=24, + limit=20, + page_size=10, + sleep_between_gets=3.0, + max_retries=5, + ) + ], completion_params=[ + # { + # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", + # }, + {"model": "gpt-4.1"}, { - "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", + "max_tokens": 131000, + "extra_body": {"reasoning_effort": "medium"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "low"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, ], rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=split_multi_turn_rows, + max_concurrent_rollouts=64, mode="all", ) async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: @@ -61,6 +76,7 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: """ judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. + # judge_name = "gpt-4.1" if not rows: print("❌ No evaluation rows provided") @@ -110,6 +126,6 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: ) # Standard error approximation from 90% CI # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. - push_scores_to_langfuse(rows, model_name, mean_score) + # push_scores_to_langfuse(rows, model_name, mean_score) return rows diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index 46186819..df0eb4f9 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -201,6 +201,9 @@ def fetch_langfuse_traces_as_evaluation_rows( session_id: Optional[str] = None, hours_back: Optional[int] = None, include_tool_calls: bool = True, + page_size: int = 30, + sleep_between_gets: float = 0.1, + max_retries: int = 3, ) -> List[EvaluationRow]: """ Fetch Langfuse traces and convert them to EvaluationRow objects. @@ -212,6 +215,9 @@ def fetch_langfuse_traces_as_evaluation_rows( session_id: Filter traces by session ID hours_back: Only fetch traces from the last N hours include_tool_calls: Whether to include tool calls in messages + page_size: Number of traces to fetch per page (smaller = less rate limit issues) + sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit) + max_retries: Maximum retries for rate limit errors Returns: List of EvaluationRow objects converted from Langfuse traces @@ -227,6 +233,9 @@ def fetch_langfuse_traces_as_evaluation_rows( session_id=session_id, hours_back=hours_back, include_tool_calls=include_tool_calls, + page_size=page_size, + sleep_between_gets=sleep_between_gets, + max_retries=max_retries, ) except Exception as e: print(f"❌ LangfuseAdapter failed: {e}") From 5ccaa48f54e8d71e993ac6b1281e2c5e234bc2ac Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 10:58:32 -0700 Subject: [PATCH 02/36] to revert later, get 50 random traces to query --- eval_protocol/adapters/langfuse.py | 148 +++++++++++------------------ 1 file changed, 54 insertions(+), 94 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index e670213e..6cfce17f 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -6,6 +6,7 @@ from langfuse.api.resources.commons.types.observations_view import ObservationsView import logging +import random import time from datetime import datetime, timedelta from typing import Any, Dict, Iterator, List, Optional, cast @@ -94,113 +95,72 @@ def get_evaluation_rows( to_timestamp = None from_timestamp = None - # Use pagination to process traces in smaller batches (similar to Langfuse migration script) - page = 1 - total_processed = 0 + # Temporary: Fetch more traces than needed, then randomly sample + fetch_limit = min(limit * 3, 500) # Fetch 3x more traces than needed (up to 500 max) - while total_processed < limit: - current_page_limit = min(page_size, limit - total_processed) + logger.debug("Fetching %d traces to randomly sample %d", fetch_limit, limit) - logger.debug( - "Fetching page %d with limit %d (total processed: %d/%d)", - page, - current_page_limit, - total_processed, - limit, - ) + # Single API call to get trace list + traces = self.client.api.trace.list( + limit=fetch_limit, + tags=tags, + user_id=user_id, + session_id=session_id, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, + order_by="timestamp.desc", + ) + + if not traces or not traces.data: + logger.debug("No traces found") + return eval_rows + + # Randomly sample the requested number of traces + available_traces = traces.data + sample_size = min(limit, len(available_traces)) + selected_traces = random.sample(available_traces, sample_size) - # Fetch trace list with retry logic - traces = None - list_retries = 0 - while list_retries < max_retries: + logger.debug("Randomly selected %d traces from %d available", len(selected_traces), len(available_traces)) + + # Process each selected trace with sleep and retry logic + for i, trace_info in enumerate(selected_traces): + # Sleep between gets to avoid rate limits + if sleep_between_gets > 0 and i > 0: + time.sleep(sleep_between_gets) + + # Fetch full trace details with retry logic + trace_full = None + detail_retries = 0 + while detail_retries < max_retries: try: - traces = self.client.api.trace.list( - page=page, - limit=current_page_limit, - tags=tags, - user_id=user_id, - session_id=session_id, - from_timestamp=from_timestamp, - to_timestamp=to_timestamp, - order_by="timestamp.desc", # Get most recent first - ) + trace_full = self.client.api.trace.get(trace_info.id) break except Exception as e: - list_retries += 1 - if "429" in str(e) and list_retries < max_retries: - sleep_time = 2**list_retries # Exponential backoff + detail_retries += 1 + if "429" in str(e) and detail_retries < max_retries: + sleep_time = 2**detail_retries # Exponential backoff logger.warning( - "Rate limit hit on trace.list(), retrying in %ds (attempt %d/%d)", + "Rate limit hit on trace.get(%s), retrying in %ds (attempt %d/%d)", + trace_info.id, sleep_time, - list_retries, + detail_retries, max_retries, ) time.sleep(sleep_time) else: - logger.error("Failed to fetch trace list after %d retries: %s", max_retries, e) - return eval_rows # Return what we have so far - - if not traces or not traces.data: - logger.debug("No more traces found on page %d", page) - break + logger.warning("Failed to fetch trace %s after %d retries: %s", trace_info.id, max_retries, e) + break # Skip this trace - logger.debug("Processing %d traces from page %d", len(traces.data), page) - - # Process each trace with sleep and retry logic (like migration script) - for trace_info in traces.data: - if total_processed >= limit: - break - - # Sleep between gets to avoid rate limits - if sleep_between_gets > 0: - time.sleep(sleep_between_gets) - - # Fetch full trace details with retry logic - trace_full = None - detail_retries = 0 - while detail_retries < max_retries: - try: - trace_full = self.client.api.trace.get(trace_info.id) - break - except Exception as e: - detail_retries += 1 - if "429" in str(e) and detail_retries < max_retries: - sleep_time = 2**detail_retries # Exponential backoff - logger.warning( - "Rate limit hit on trace.get(%s), retrying in %ds (attempt %d/%d)", - trace_info.id, - sleep_time, - detail_retries, - max_retries, - ) - time.sleep(sleep_time) - else: - logger.warning( - "Failed to fetch trace %s after %d retries: %s", trace_info.id, max_retries, e - ) - break # Skip this trace - - if trace_full: - try: - eval_row = self._convert_trace_to_evaluation_row(trace_full, include_tool_calls) - if eval_row: - eval_rows.append(eval_row) - total_processed += 1 - except (AttributeError, ValueError, KeyError) as e: - logger.warning("Failed to convert trace %s: %s", trace_info.id, e) - continue - - # Check if we have more pages - if hasattr(traces.meta, "page") and hasattr(traces.meta, "total_pages"): - if traces.meta.page >= traces.meta.total_pages: - break - elif len(traces.data) < current_page_limit: - # If we got fewer traces than requested, we're probably at the end - break - - page += 1 - - logger.info("Successfully processed %d traces into %d evaluation rows", total_processed, len(eval_rows)) + if trace_full: + try: + eval_row = self._convert_trace_to_evaluation_row(trace_full, include_tool_calls) + if eval_row: + eval_rows.append(eval_row) + except (AttributeError, ValueError, KeyError) as e: + logger.warning("Failed to convert trace %s: %s", trace_info.id, e) + continue + + logger.info("Successfully processed %d traces into %d evaluation rows", len(selected_traces), len(eval_rows)) return eval_rows def get_evaluation_rows_by_ids( From 2b83d60631108cf3d4c5d58afaef2b82e57a1360 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 11:08:33 -0700 Subject: [PATCH 03/36] don't skip --- eval_protocol/quickstart/llm_judge.py | 1 - 1 file changed, 1 deletion(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 2e0564ba..7fb06aa8 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -24,7 +24,6 @@ from concurrent.futures import ThreadPoolExecutor -@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") @pytest.mark.asyncio @evaluation_test( input_rows=[ From cafd9f7740f1a4302ca0c5ab72be93d3bb9944f1 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 12:53:20 -0700 Subject: [PATCH 04/36] make judgment async --- eval_protocol/quickstart/llm_judge.py | 27 +++-- eval_protocol/quickstart/utils.py | 144 +++++++++----------------- 2 files changed, 68 insertions(+), 103 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 7fb06aa8..a92fc85d 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -17,11 +17,10 @@ fetch_langfuse_traces_as_evaluation_rows, calculate_bootstrap_scores, push_scores_to_langfuse, - run_judgment, + run_judgment_async_with_shared_client, ) - -import concurrent.futures -from concurrent.futures import ThreadPoolExecutor +import asyncio +from openai import AsyncOpenAI @pytest.mark.asyncio @@ -29,7 +28,7 @@ input_rows=[ fetch_langfuse_traces_as_evaluation_rows( hours_back=24, - limit=20, + limit=1, page_size=10, sleep_between_gets=3.0, max_retries=5, @@ -88,11 +87,21 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: judgments = [] max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] - with ThreadPoolExecutor(max_workers=max_concurrency) as executor: - futures = [executor.submit(run_judgment, row, model_name, judge_name) for row in rows] + judge_config = JUDGE_CONFIGS[judge_name] + + async with AsyncOpenAI( + api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") + ) as shared_client: + semaphore = asyncio.Semaphore(max_concurrency) + + async def run_judgment_with_semaphore(row): + async with semaphore: + return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) + + tasks = [run_judgment_with_semaphore(row) for row in rows] - for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Generating judgments"): - result = future.result() + for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): + result = await coro if result and result["games"][0] and result["games"][1]: judgments.append(result) diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index df0eb4f9..bb4f1282 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -8,6 +8,7 @@ import pandas as pd from eval_protocol.models import EvaluationRow, Message, EvaluateResult, MetricResult +import asyncio OG_ARENA_HARD_PROMPT = """Please act as an impartial judge and evaluate the quality of the responses provided by two AI assistants to the user prompt displayed below. You will be given assistant A's answer and assistant B's answer. Your job is to evaluate which assistant's answer is better. @@ -132,8 +133,8 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: return expanded_rows -def pairwise_judgment(question_text, answer_a, answer_b, tools, judge_config): - """Pairwise judgment function. Adapted from arena-hard-auto/gen_judgment.py""" +async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judge_config, shared_client): + """Async pairwise judgment using a shared client.""" user_prompt = f"""<|User Prompt|> {question_text} @@ -143,39 +144,29 @@ def pairwise_judgment(question_text, answer_a, answer_b, tools, judge_config): <|The Start of Assistant B's Answer|> {answer_b} -<|The End of Assistant B's Answer|>""" +<|The End of Assistant B's Answer|> - messages = [ - { - "role": "system", - "content": OG_ARENA_HARD_PROMPT, - }, - { - "role": "user", - "content": user_prompt, - }, - ] +<|Available Tools|> +{tools} +<|End of Available Tools|> - try: - from openai import OpenAI +{OG_ARENA_HARD_PROMPT}""" - client = OpenAI(api_key=judge_config["api_key"], base_url=judge_config["base_url"]) + messages = [{"role": "user", "content": user_prompt}] + try: api_params = { "model": judge_config["model"], - "messages": messages, # type: ignore + "messages": messages, "temperature": judge_config["temperature"], "max_tokens": judge_config["max_tokens"], } if tools: api_params["tools"] = tools - api_params["tool_choice"] = ( - "none" # Judge can see tools to help in response, but won't actually try to call them - ) - - response = client.chat.completions.create(**api_params) + api_params["tool_choice"] = "none" + response = await shared_client.chat.completions.create(**api_params) judgment_text = response.choices[0].message.content if not judgment_text: return None @@ -185,13 +176,44 @@ def pairwise_judgment(question_text, answer_a, answer_b, tools, judge_config): return None score = get_score(judgment_text, [r"\[\[([AB<>=]+)\]\]", r"\[([AB<>=]+)\]"]) + return {"score": score, "judgment": judgment_text, "prompt": messages} - result = { - "score": score, - "judgment": judgment_text, - "prompt": messages, - } - return result + +async def run_judgment_async_with_shared_client( + row: EvaluationRow, model_name: str, judge_name: str, shared_client +) -> Optional[Dict[str, Any]]: + """Async judgment using shared client to avoid cleanup issues.""" + if not row.messages: + return None + + question_text = "\n".join([serialize_message(msg) for msg in row.messages[:-1]]) + model_a_answer = row.ground_truth + model_b_answer = serialize_message(row.messages[-1]) + + # Run both rounds concurrently with shared client + result1, result2 = await asyncio.gather( + pairwise_judgment_async( + question_text, model_a_answer, model_b_answer, row.tools, JUDGE_CONFIGS[judge_name], shared_client + ), + pairwise_judgment_async( + question_text, model_b_answer, model_a_answer, row.tools, JUDGE_CONFIGS[judge_name], shared_client + ), + ) + + games = [result1, result2] + + row.evaluation_result = EvaluateResult( + score=0.0, + reason=f"LLM Judge comparison: Round 1: {result1['score']}, Round 2: {result2['score']}" + if result1 and result2 + else "Failed to get judgement scores", + metrics={ + "round1_judgment": MetricResult(score=0.0, reason=result1["judgment"] if result1 else "Failed"), + "round2_judgment": MetricResult(score=0.0, reason=result2["judgment"] if result2 else "Failed"), + }, + ) + + return {"model": model_name, "games": games} def fetch_langfuse_traces_as_evaluation_rows( @@ -284,72 +306,6 @@ def calculate_bootstrap_scores(judgments: List[Dict[str, Any]]) -> tuple[float, return mean_score, lower_score, upper_score -def run_judgment(row: EvaluationRow, model_name: str, judge_name: str) -> Optional[Dict[str, Any]]: - """ - Run Arena-Hard-Auto style pairwise judgment for a single evaluation row. - - Performs two rounds of judgment (A vs B, B vs A) to reduce position bias: - - Round 1: ground_truth (original) vs messages[-1] (new model response) - - Round 2: messages[-1] (new model response) vs ground_truth (original) - - Updates the row's evaluation_result with judgment details and returns results - for aggregation across the dataset. - - Args: - row: EvaluationRow containing messages, ground_truth, and tools - model_name: Name of the model being evaluated (for result tracking) - judge_name: Key from JUDGE_CONFIGS to use for judgment - - Returns: - Dict with "model" and "games" keys, or None if row has no messages - """ - if not row.messages: - return None - - question_text = "\n".join([serialize_message(msg) for msg in row.messages[:-1]]) - model_a_answer = row.ground_truth - model_b_answer = serialize_message(row.messages[-1]) - - games = [] - - # Round 1: A vs B (original vs comparison) - result1 = pairwise_judgment( - question_text=question_text, - answer_a=model_a_answer, - answer_b=model_b_answer, - tools=row.tools, - judge_config=JUDGE_CONFIGS[judge_name], - ) - games.append(result1) - - # Round 2: B vs A (comparison vs original) - result2 = pairwise_judgment( - question_text=question_text, - answer_a=model_b_answer, - answer_b=model_a_answer, - tools=row.tools, - judge_config=JUDGE_CONFIGS[judge_name], - ) - games.append(result2) - - row.evaluation_result = EvaluateResult( - score=0.0, - reason=f"LLM Judge comparison: Round 1: {result1['score']}, Round 2: {result2['score']}" - if result1 and result2 - else "Failed to get judgement scores", - metrics={ - "round1_judgment": MetricResult( - score=0.0, reason=result1["judgment"] if result1 else "Failed to get judgment reason" - ), - "round2_judgment": MetricResult( - score=0.0, reason=result2["judgment"] if result2 else "Failed to get judgment reason" - ), - }, - ) - - return {"model": model_name, "games": games} - - def push_scores_to_langfuse(rows: List[EvaluationRow], model_name: str, mean_score: float) -> None: """ Push evaluation scores back to Langfuse traces for tracking and analysis. From 494b5c9cf42222a50234a62705de92ea08748451 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 12:53:36 -0700 Subject: [PATCH 05/36] bump limit up --- eval_protocol/quickstart/llm_judge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index a92fc85d..afef2698 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -28,7 +28,7 @@ input_rows=[ fetch_langfuse_traces_as_evaluation_rows( hours_back=24, - limit=1, + limit=25, page_size=10, sleep_between_gets=3.0, max_retries=5, From e70327b8065fe00f0f77afb03ebc4a04c64cf536 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 12:59:21 -0700 Subject: [PATCH 06/36] lower concurrency for gemini --- eval_protocol/quickstart/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index bb4f1282..aa26f780 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -47,7 +47,7 @@ "max_tokens": 32000, "api_key": os.getenv("GEMINI_API_KEY"), "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", - "max_concurrency": 32, + "max_concurrency": 16, }, } From d845a4bce0b843b277fbee2bcb2caaeaa4efa04e Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 13:05:08 -0700 Subject: [PATCH 07/36] small limit to see if we get the error still --- eval_protocol/quickstart/llm_judge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index afef2698..a92fc85d 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -28,7 +28,7 @@ input_rows=[ fetch_langfuse_traces_as_evaluation_rows( hours_back=24, - limit=25, + limit=1, page_size=10, sleep_between_gets=3.0, max_retries=5, From 0a7b1c5382bb59c9120fa045e7c97cb46f98c24a Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 13:36:19 -0700 Subject: [PATCH 08/36] test --- eval_protocol/quickstart/llm_judge.py | 95 ++++++++++++++------------- 1 file changed, 48 insertions(+), 47 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index a92fc85d..93be3283 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -53,9 +53,9 @@ rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=split_multi_turn_rows, max_concurrent_rollouts=64, - mode="all", + mode="pointwise", ) -async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: +async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: """ LLM Judge evaluation using Arena-Hard-Auto style pairwise comparisons. @@ -72,68 +72,69 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: Returns: Same rows with updated evaluation_result containing scores and judgments """ + return row - judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. - # judge_name = "gpt-4.1" + # judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. + # # judge_name = "gpt-4.1" - if not rows: - print("❌ No evaluation rows provided") - return rows + # if not rows: + # print("❌ No evaluation rows provided") + # return rows - print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") + # print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") - model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") + # model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") - judgments = [] - max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] + # judgments = [] + # max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] - judge_config = JUDGE_CONFIGS[judge_name] + # judge_config = JUDGE_CONFIGS[judge_name] - async with AsyncOpenAI( - api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") - ) as shared_client: - semaphore = asyncio.Semaphore(max_concurrency) + # async with AsyncOpenAI( + # api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") + # ) as shared_client: + # semaphore = asyncio.Semaphore(max_concurrency) - async def run_judgment_with_semaphore(row): - async with semaphore: - return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) + # async def run_judgment_with_semaphore(row): + # async with semaphore: + # return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) - tasks = [run_judgment_with_semaphore(row) for row in rows] + # tasks = [run_judgment_with_semaphore(row) for row in rows] - for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): - result = await coro - if result and result["games"][0] and result["games"][1]: - judgments.append(result) + # for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): + # result = await coro + # if result and result["games"][0] and result["games"][1]: + # judgments.append(result) - if not judgments: - print("❌ No valid judgments generated") - return rows + # if not judgments: + # print("❌ No valid judgments generated") + # return rows - print(f"✅ Generated {len(judgments)} valid judgments") + # print(f"✅ Generated {len(judgments)} valid judgments") - # Calculate bootstrap scores - mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) + # # Calculate bootstrap scores + # mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) - if mean_score == 0.0: - print("❌ No valid scores extracted") - return rows + # if mean_score == 0.0: + # print("❌ No valid scores extracted") + # return rows - # Print leaderboard - print("\n##### LLM Judge Results (90th percentile CI) #####") + # # Print leaderboard + # print("\n##### LLM Judge Results (90th percentile CI) #####") - clean_model_name = model_name.split("/")[-1] # Clean model name + # clean_model_name = model_name.split("/")[-1] # Clean model name - print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") - print("original: 50.0% (CI: 50.0% - 50.0%)") + # print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") + # print("original: 50.0% (CI: 50.0% - 50.0%)") - for row in rows: - if row.evaluation_result: - row.evaluation_result.score = mean_score - row.evaluation_result.standard_error = (upper_score - lower_score) / ( - 2 * 1.645 - ) # Standard error approximation from 90% CI + # for row in rows: + # if row.evaluation_result: + # row.evaluation_result.score = mean_score + # row.evaluation_result.standard_error = (upper_score - lower_score) / ( + # 2 * 1.645 + # ) # Standard error approximation from 90% CI - # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. - # push_scores_to_langfuse(rows, model_name, mean_score) + # # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. + # # push_scores_to_langfuse(rows, model_name, mean_score) - return rows + # return rows From c68e38cec2d575b9d5db4e1b0039fc7053c50163 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 13:43:13 -0700 Subject: [PATCH 09/36] test --- eval_protocol/benchmarks/test_aime25.py | 11 ++- eval_protocol/quickstart/llm_judge.py | 95 ++++++++++++------------- 2 files changed, 55 insertions(+), 51 deletions(-) diff --git a/eval_protocol/benchmarks/test_aime25.py b/eval_protocol/benchmarks/test_aime25.py index 91a67f77..d7f2bfe1 100644 --- a/eval_protocol/benchmarks/test_aime25.py +++ b/eval_protocol/benchmarks/test_aime25.py @@ -82,13 +82,18 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: "max_tokens": 131000, "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - } + }, + { + "max_tokens": 131000, + "extra_body": {"reasoning_effort": "medium"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + }, ], rollout_processor=SingleTurnRolloutProcessor(), aggregation_method="mean", passed_threshold=0.8, - num_runs=8, - max_dataset_rows=2, + num_runs=1, + max_dataset_rows=1, max_concurrent_rollouts=4, mode="pointwise", ) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 93be3283..a92fc85d 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -53,9 +53,9 @@ rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=split_multi_turn_rows, max_concurrent_rollouts=64, - mode="pointwise", + mode="all", ) -async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: +async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: """ LLM Judge evaluation using Arena-Hard-Auto style pairwise comparisons. @@ -72,69 +72,68 @@ async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: Returns: Same rows with updated evaluation_result containing scores and judgments """ - return row - # judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. - # # judge_name = "gpt-4.1" + judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. + # judge_name = "gpt-4.1" - # if not rows: - # print("❌ No evaluation rows provided") - # return rows + if not rows: + print("❌ No evaluation rows provided") + return rows - # print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") + print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") - # model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") + model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") - # judgments = [] - # max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] + judgments = [] + max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] - # judge_config = JUDGE_CONFIGS[judge_name] + judge_config = JUDGE_CONFIGS[judge_name] - # async with AsyncOpenAI( - # api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") - # ) as shared_client: - # semaphore = asyncio.Semaphore(max_concurrency) + async with AsyncOpenAI( + api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") + ) as shared_client: + semaphore = asyncio.Semaphore(max_concurrency) - # async def run_judgment_with_semaphore(row): - # async with semaphore: - # return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) + async def run_judgment_with_semaphore(row): + async with semaphore: + return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) - # tasks = [run_judgment_with_semaphore(row) for row in rows] + tasks = [run_judgment_with_semaphore(row) for row in rows] - # for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): - # result = await coro - # if result and result["games"][0] and result["games"][1]: - # judgments.append(result) + for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): + result = await coro + if result and result["games"][0] and result["games"][1]: + judgments.append(result) - # if not judgments: - # print("❌ No valid judgments generated") - # return rows + if not judgments: + print("❌ No valid judgments generated") + return rows - # print(f"✅ Generated {len(judgments)} valid judgments") + print(f"✅ Generated {len(judgments)} valid judgments") - # # Calculate bootstrap scores - # mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) + # Calculate bootstrap scores + mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) - # if mean_score == 0.0: - # print("❌ No valid scores extracted") - # return rows + if mean_score == 0.0: + print("❌ No valid scores extracted") + return rows - # # Print leaderboard - # print("\n##### LLM Judge Results (90th percentile CI) #####") + # Print leaderboard + print("\n##### LLM Judge Results (90th percentile CI) #####") - # clean_model_name = model_name.split("/")[-1] # Clean model name + clean_model_name = model_name.split("/")[-1] # Clean model name - # print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") - # print("original: 50.0% (CI: 50.0% - 50.0%)") + print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") + print("original: 50.0% (CI: 50.0% - 50.0%)") - # for row in rows: - # if row.evaluation_result: - # row.evaluation_result.score = mean_score - # row.evaluation_result.standard_error = (upper_score - lower_score) / ( - # 2 * 1.645 - # ) # Standard error approximation from 90% CI + for row in rows: + if row.evaluation_result: + row.evaluation_result.score = mean_score + row.evaluation_result.standard_error = (upper_score - lower_score) / ( + 2 * 1.645 + ) # Standard error approximation from 90% CI - # # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. - # # push_scores_to_langfuse(rows, model_name, mean_score) + # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. + # push_scores_to_langfuse(rows, model_name, mean_score) - # return rows + return rows From b58d491a70923017a3ddc63ee5cda1b6747b06cc Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 13:49:27 -0700 Subject: [PATCH 10/36] try this --- eval_protocol/quickstart/llm_judge.py | 93 ++++++++++++++------------- eval_protocol/quickstart/utils.py | 2 +- 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index a92fc85d..f19ddc05 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -53,9 +53,9 @@ rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=split_multi_turn_rows, max_concurrent_rollouts=64, - mode="all", + mode="pointwise", ) -async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: +async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: """ LLM Judge evaluation using Arena-Hard-Auto style pairwise comparisons. @@ -72,68 +72,69 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: Returns: Same rows with updated evaluation_result containing scores and judgments """ + return row - judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. + # # judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. # judge_name = "gpt-4.1" - if not rows: - print("❌ No evaluation rows provided") - return rows + # if not rows: + # print("❌ No evaluation rows provided") + # return rows - print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") + # print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") - model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") + # model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") - judgments = [] - max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] + # judgments = [] + # max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] - judge_config = JUDGE_CONFIGS[judge_name] + # judge_config = JUDGE_CONFIGS[judge_name] - async with AsyncOpenAI( - api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") - ) as shared_client: - semaphore = asyncio.Semaphore(max_concurrency) + # async with AsyncOpenAI( + # api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") + # ) as shared_client: + # semaphore = asyncio.Semaphore(max_concurrency) - async def run_judgment_with_semaphore(row): - async with semaphore: - return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) + # async def run_judgment_with_semaphore(row): + # async with semaphore: + # return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) - tasks = [run_judgment_with_semaphore(row) for row in rows] + # tasks = [run_judgment_with_semaphore(row) for row in rows] - for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): - result = await coro - if result and result["games"][0] and result["games"][1]: - judgments.append(result) + # for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): + # result = await coro + # if result and result["games"][0] and result["games"][1]: + # judgments.append(result) - if not judgments: - print("❌ No valid judgments generated") - return rows + # if not judgments: + # print("❌ No valid judgments generated") + # return rows - print(f"✅ Generated {len(judgments)} valid judgments") + # print(f"✅ Generated {len(judgments)} valid judgments") - # Calculate bootstrap scores - mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) + # # Calculate bootstrap scores + # mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) - if mean_score == 0.0: - print("❌ No valid scores extracted") - return rows + # if mean_score == 0.0: + # print("❌ No valid scores extracted") + # return rows - # Print leaderboard - print("\n##### LLM Judge Results (90th percentile CI) #####") + # # Print leaderboard + # print("\n##### LLM Judge Results (90th percentile CI) #####") - clean_model_name = model_name.split("/")[-1] # Clean model name + # clean_model_name = model_name.split("/")[-1] # Clean model name - print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") - print("original: 50.0% (CI: 50.0% - 50.0%)") + # print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") + # print("original: 50.0% (CI: 50.0% - 50.0%)") - for row in rows: - if row.evaluation_result: - row.evaluation_result.score = mean_score - row.evaluation_result.standard_error = (upper_score - lower_score) / ( - 2 * 1.645 - ) # Standard error approximation from 90% CI + # for row in rows: + # if row.evaluation_result: + # row.evaluation_result.score = mean_score + # row.evaluation_result.standard_error = (upper_score - lower_score) / ( + # 2 * 1.645 + # ) # Standard error approximation from 90% CI - # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. - # push_scores_to_langfuse(rows, model_name, mean_score) + # # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. + # # push_scores_to_langfuse(rows, model_name, mean_score) - return rows + # return rows diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index aa26f780..bd115e4e 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -130,7 +130,7 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: ) ) - return expanded_rows + return expanded_rows[0] async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judge_config, shared_client): From a59036e03e886977d0b4dae16eab00cd97f5fd1e Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:00:01 -0700 Subject: [PATCH 11/36] fix --- eval_protocol/pytest/evaluation_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index a7ec65f3..efd0d69d 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -253,7 +253,7 @@ def _log_eval_error(status: Status, rows: list[EvaluationRow] | None, passed: bo for row in data: # generate a stable row_id for each row - if row.input_metadata.row_id is None: + if row.input_metadata and row.input_metadata.row_id is None: # Generate a stable, deterministic row_id using the row's hash and num_combinations index = hash(row) max_index = num_combinations() - 1 From 173b65b31f8b26a7aed565c42fab00df41d6695e Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:03:16 -0700 Subject: [PATCH 12/36] fix --- eval_protocol/pytest/evaluation_test.py | 2 +- eval_protocol/quickstart/utils.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index efd0d69d..a7ec65f3 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -253,7 +253,7 @@ def _log_eval_error(status: Status, rows: list[EvaluationRow] | None, passed: bo for row in data: # generate a stable row_id for each row - if row.input_metadata and row.input_metadata.row_id is None: + if row.input_metadata.row_id is None: # Generate a stable, deterministic row_id using the row's hash and num_combinations index = hash(row) max_index = num_combinations() - 1 diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index bd115e4e..465b7d28 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -130,7 +130,7 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: ) ) - return expanded_rows[0] + return [expanded_rows[0]] async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judge_config, shared_client): From 96b0d343ea77ecb45dfe65db07a6599eda0cc24c Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:07:02 -0700 Subject: [PATCH 13/36] no split --- eval_protocol/quickstart/llm_judge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index f19ddc05..010f5a38 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -51,7 +51,7 @@ }, ], rollout_processor=SingleTurnRolloutProcessor(), - preprocess_fn=split_multi_turn_rows, + # preprocess_fn=split_multi_turn_rows, max_concurrent_rollouts=64, mode="pointwise", ) From 549b396af7bf2d2579644323130909f5cb177c1b Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:09:42 -0700 Subject: [PATCH 14/36] ok wtf --- eval_protocol/quickstart/llm_judge.py | 44 ++++++++++++++++----------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 010f5a38..04e90889 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -8,7 +8,7 @@ import pytest -from eval_protocol.models import EvaluateResult, EvaluationRow, MetricResult +from eval_protocol.models import EvaluateResult, EvaluationRow, MetricResult, Message from eval_protocol.pytest import evaluation_test from eval_protocol.pytest.default_single_turn_rollout_process import SingleTurnRolloutProcessor from eval_protocol.quickstart.utils import ( @@ -23,36 +23,44 @@ from openai import AsyncOpenAI +def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: + converted: List[EvaluationRow] = [] + for r in rows: + question = r.get("question", "") + answer = r.get("answer", None) + messages = [ + Message(role="system", content="hi"), + Message(role="user", content=str(question)), + ] + converted.append(EvaluationRow(messages=messages, ground_truth=str(answer) if answer is not None else None)) + return converted + + @pytest.mark.asyncio @evaluation_test( - input_rows=[ - fetch_langfuse_traces_as_evaluation_rows( - hours_back=24, - limit=1, - page_size=10, - sleep_between_gets=3.0, - max_retries=5, - ) + input_dataset=[ + "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-I.jsonl", + "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-II.jsonl", ], + dataset_adapter=aime2025_dataset_adapter, completion_params=[ - # { - # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", - # }, - {"model": "gpt-4.1"}, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "medium"}, + "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "low"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", + "extra_body": {"reasoning_effort": "medium"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, ], rollout_processor=SingleTurnRolloutProcessor(), - # preprocess_fn=split_multi_turn_rows, - max_concurrent_rollouts=64, + aggregation_method="mean", + passed_threshold=0.8, + num_runs=1, + max_dataset_rows=1, + max_concurrent_rollouts=4, mode="pointwise", ) async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: From d9ea13351face788d23e892abac2625c2bc2bc87 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:12:51 -0700 Subject: [PATCH 15/36] try something else --- eval_protocol/quickstart/llm_judge.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 04e90889..90f6bec9 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -29,7 +29,10 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: question = r.get("question", "") answer = r.get("answer", None) messages = [ - Message(role="system", content="hi"), + Message( + role="system", + content="You are a helpful math assistant. Please reason step by step, and put your final answer within \\boxed{...}.", + ), Message(role="user", content=str(question)), ] converted.append(EvaluationRow(messages=messages, ground_truth=str(answer) if answer is not None else None)) @@ -44,23 +47,24 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: ], dataset_adapter=aime2025_dataset_adapter, completion_params=[ + # { + # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", + # }, + {"model": "gpt-4.1"}, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "low"}, + "extra_body": {"reasoning_effort": "medium"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "medium"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + "extra_body": {"reasoning_effort": "low"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, ], rollout_processor=SingleTurnRolloutProcessor(), - aggregation_method="mean", - passed_threshold=0.8, - num_runs=1, - max_dataset_rows=1, - max_concurrent_rollouts=4, + preprocess_fn=split_multi_turn_rows, + max_concurrent_rollouts=64, mode="pointwise", ) async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: From f357065704632ef9a646c3a8c1efa570debbabb4 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:15:08 -0700 Subject: [PATCH 16/36] test --- eval_protocol/quickstart/llm_judge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 90f6bec9..ec734fde 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -63,7 +63,7 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: }, ], rollout_processor=SingleTurnRolloutProcessor(), - preprocess_fn=split_multi_turn_rows, + # preprocess_fn=split_multi_turn_rows, max_concurrent_rollouts=64, mode="pointwise", ) From 088dea6e5247091101333dae174e7873a88865da Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:16:51 -0700 Subject: [PATCH 17/36] 1 run --- eval_protocol/quickstart/llm_judge.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index ec734fde..20038300 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -64,6 +64,8 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: ], rollout_processor=SingleTurnRolloutProcessor(), # preprocess_fn=split_multi_turn_rows, + num_runs=1, + max_dataset_rows=1, max_concurrent_rollouts=64, mode="pointwise", ) From 009b0bb24b7c39508bfb252ad42e3915dd0656e5 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:22:34 -0700 Subject: [PATCH 18/36] same as aime now --- eval_protocol/quickstart/llm_judge.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 20038300..d2a240d8 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -39,7 +39,6 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: return converted -@pytest.mark.asyncio @evaluation_test( input_dataset=[ "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-I.jsonl", @@ -47,26 +46,23 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: ], dataset_adapter=aime2025_dataset_adapter, completion_params=[ - # { - # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", - # }, - {"model": "gpt-4.1"}, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "medium"}, + "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "low"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", + "extra_body": {"reasoning_effort": "medium"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, ], rollout_processor=SingleTurnRolloutProcessor(), - # preprocess_fn=split_multi_turn_rows, + aggregation_method="mean", + passed_threshold=0.8, num_runs=1, max_dataset_rows=1, - max_concurrent_rollouts=64, + max_concurrent_rollouts=4, mode="pointwise", ) async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: From 8c62b6be4792e4eecb5235a3538c62374270d2ce Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:24:50 -0700 Subject: [PATCH 19/36] try osmething else --- eval_protocol/quickstart/llm_judge.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index d2a240d8..b9755833 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -39,6 +39,7 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: return converted +@pytest.mark.asyncio @evaluation_test( input_dataset=[ "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-I.jsonl", @@ -46,15 +47,19 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: ], dataset_adapter=aime2025_dataset_adapter, completion_params=[ + # { + # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", + # }, + {"model": "gpt-4.1"}, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "low"}, + "extra_body": {"reasoning_effort": "medium"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, { "max_tokens": 131000, - "extra_body": {"reasoning_effort": "medium"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + "extra_body": {"reasoning_effort": "low"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, ], rollout_processor=SingleTurnRolloutProcessor(), @@ -62,7 +67,7 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: passed_threshold=0.8, num_runs=1, max_dataset_rows=1, - max_concurrent_rollouts=4, + max_concurrent_rollouts=64, mode="pointwise", ) async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: From 466581ce0a778e208f66c42a9127da92cd00453f Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:26:42 -0700 Subject: [PATCH 20/36] remove gpt --- eval_protocol/quickstart/llm_judge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index b9755833..8ed86e37 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -50,7 +50,7 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: # { # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", # }, - {"model": "gpt-4.1"}, + # {"model": "gpt-4.1"}, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "medium"}, From 620611f58b8db66e029ac6e734f0aaa43df5d44c Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:27:19 -0700 Subject: [PATCH 21/36] gpt --- eval_protocol/benchmarks/test_aime25.py | 1 + 1 file changed, 1 insertion(+) diff --git a/eval_protocol/benchmarks/test_aime25.py b/eval_protocol/benchmarks/test_aime25.py index d7f2bfe1..5b4ea823 100644 --- a/eval_protocol/benchmarks/test_aime25.py +++ b/eval_protocol/benchmarks/test_aime25.py @@ -78,6 +78,7 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: ], dataset_adapter=aime2025_dataset_adapter, completion_params=[ + {"model": "gpt-4.1"}, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "low"}, From c5d17e459cb1763b60a3be14c1c0f947b0479bc4 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:41:26 -0700 Subject: [PATCH 22/36] try to mute and see what happens --- .../default_single_turn_rollout_process.py | 9 ++ eval_protocol/quickstart/llm_judge.py | 132 ++++++++---------- 2 files changed, 67 insertions(+), 74 deletions(-) diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py index 2b4bf893..8196f75e 100644 --- a/eval_protocol/pytest/default_single_turn_rollout_process.py +++ b/eval_protocol/pytest/default_single_turn_rollout_process.py @@ -5,8 +5,16 @@ from typing import List from litellm import acompletion +import litellm from typing import Dict +# Fix LiteLLM event loop binding issues by setting logging to ERROR level +# This disables the logging worker that causes event loop binding problems +import os + +if os.environ.get("LITELLM_LOG") is None: + os.environ["LITELLM_LOG"] = "ERROR" + from eval_protocol.dataset_logger import default_logger from eval_protocol.models import EvaluationRow, Message from openai.types import CompletionUsage @@ -21,6 +29,7 @@ class SingleTurnRolloutProcessor(RolloutProcessor): def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: """Generate single turn rollout tasks and return them for external handling.""" + # Do not modify global LiteLLM cache. Disable caching per-request instead. async def process_row(row: EvaluationRow) -> EvaluationRow: diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 8ed86e37..9fabb895 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -8,7 +8,7 @@ import pytest -from eval_protocol.models import EvaluateResult, EvaluationRow, MetricResult, Message +from eval_protocol.models import EvaluateResult, EvaluationRow, MetricResult from eval_protocol.pytest import evaluation_test from eval_protocol.pytest.default_single_turn_rollout_process import SingleTurnRolloutProcessor from eval_protocol.quickstart.utils import ( @@ -23,34 +23,22 @@ from openai import AsyncOpenAI -def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: - converted: List[EvaluationRow] = [] - for r in rows: - question = r.get("question", "") - answer = r.get("answer", None) - messages = [ - Message( - role="system", - content="You are a helpful math assistant. Please reason step by step, and put your final answer within \\boxed{...}.", - ), - Message(role="user", content=str(question)), - ] - converted.append(EvaluationRow(messages=messages, ground_truth=str(answer) if answer is not None else None)) - return converted - - @pytest.mark.asyncio @evaluation_test( - input_dataset=[ - "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-I.jsonl", - "https://huggingface.co/datasets/opencompass/AIME2025/raw/main/aime2025-II.jsonl", + input_rows=[ + fetch_langfuse_traces_as_evaluation_rows( + hours_back=24, + limit=1, + page_size=10, + sleep_between_gets=3.0, + max_retries=5, + ) ], - dataset_adapter=aime2025_dataset_adapter, completion_params=[ # { # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", # }, - # {"model": "gpt-4.1"}, + {"model": "gpt-4.1"}, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "medium"}, @@ -63,14 +51,11 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: }, ], rollout_processor=SingleTurnRolloutProcessor(), - aggregation_method="mean", - passed_threshold=0.8, - num_runs=1, - max_dataset_rows=1, + # preprocess_fn=split_multi_turn_rows, max_concurrent_rollouts=64, - mode="pointwise", + mode="all", ) -async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: +async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: """ LLM Judge evaluation using Arena-Hard-Auto style pairwise comparisons. @@ -87,69 +72,68 @@ async def test_llm_judge(row: EvaluationRow) -> EvaluationRow: Returns: Same rows with updated evaluation_result containing scores and judgments """ - return row - # # judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. - # judge_name = "gpt-4.1" + # judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. + judge_name = "gpt-4.1" - # if not rows: - # print("❌ No evaluation rows provided") - # return rows + if not rows: + print("❌ No evaluation rows provided") + return rows - # print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") + print(f"🔄 Processing {len(rows)} evaluation rows for LLM judging...") - # model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") + model_name = rows[0].input_metadata.completion_params.get("model", "unknown_model") - # judgments = [] - # max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] + judgments = [] + max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] - # judge_config = JUDGE_CONFIGS[judge_name] + judge_config = JUDGE_CONFIGS[judge_name] - # async with AsyncOpenAI( - # api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") - # ) as shared_client: - # semaphore = asyncio.Semaphore(max_concurrency) + async with AsyncOpenAI( + api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") + ) as shared_client: + semaphore = asyncio.Semaphore(max_concurrency) - # async def run_judgment_with_semaphore(row): - # async with semaphore: - # return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) + async def run_judgment_with_semaphore(row): + async with semaphore: + return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) - # tasks = [run_judgment_with_semaphore(row) for row in rows] + tasks = [run_judgment_with_semaphore(row) for row in rows] - # for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): - # result = await coro - # if result and result["games"][0] and result["games"][1]: - # judgments.append(result) + for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): + result = await coro + if result and result["games"][0] and result["games"][1]: + judgments.append(result) - # if not judgments: - # print("❌ No valid judgments generated") - # return rows + if not judgments: + print("❌ No valid judgments generated") + return rows - # print(f"✅ Generated {len(judgments)} valid judgments") + print(f"✅ Generated {len(judgments)} valid judgments") - # # Calculate bootstrap scores - # mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) + # Calculate bootstrap scores + mean_score, lower_score, upper_score = calculate_bootstrap_scores(judgments) - # if mean_score == 0.0: - # print("❌ No valid scores extracted") - # return rows + if mean_score == 0.0: + print("❌ No valid scores extracted") + return rows - # # Print leaderboard - # print("\n##### LLM Judge Results (90th percentile CI) #####") + # Print leaderboard + print("\n##### LLM Judge Results (90th percentile CI) #####") - # clean_model_name = model_name.split("/")[-1] # Clean model name + clean_model_name = model_name.split("/")[-1] # Clean model name - # print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") - # print("original: 50.0% (CI: 50.0% - 50.0%)") + print(f"{clean_model_name}: {mean_score:.1%} (CI: {lower_score:.1%} - {upper_score:.1%})") + print("original: 50.0% (CI: 50.0% - 50.0%)") - # for row in rows: - # if row.evaluation_result: - # row.evaluation_result.score = mean_score - # row.evaluation_result.standard_error = (upper_score - lower_score) / ( - # 2 * 1.645 - # ) # Standard error approximation from 90% CI + for row in rows: + if row.evaluation_result: + row.evaluation_result.score = mean_score + row.evaluation_result.standard_error = (upper_score - lower_score) / ( + 2 * 1.645 + ) # Standard error approximation from 90% CI - # # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. - # # push_scores_to_langfuse(rows, model_name, mean_score) + # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. + # push_scores_to_langfuse(rows, model_name, mean_score) - # return rows + return rows From ce04620b9c23414bc5103a484099b4c79d939398 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:47:05 -0700 Subject: [PATCH 23/36] monkey patch --- .../default_single_turn_rollout_process.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py index 8196f75e..352972c8 100644 --- a/eval_protocol/pytest/default_single_turn_rollout_process.py +++ b/eval_protocol/pytest/default_single_turn_rollout_process.py @@ -8,12 +8,16 @@ import litellm from typing import Dict -# Fix LiteLLM event loop binding issues by setting logging to ERROR level -# This disables the logging worker that causes event loop binding problems -import os - -if os.environ.get("LITELLM_LOG") is None: - os.environ["LITELLM_LOG"] = "ERROR" +# Fix LiteLLM event loop binding issues by disabling the logging worker +try: + # Disable LiteLLM's async logging worker to prevent event loop binding issues + import litellm.litellm_core_utils.logging_worker + + # Monkey patch to disable the worker entirely + original_start_worker = litellm.litellm_core_utils.logging_worker.start_worker + litellm.litellm_core_utils.logging_worker.start_worker = lambda *args, **kwargs: None +except Exception: + pass # Best effort from eval_protocol.dataset_logger import default_logger from eval_protocol.models import EvaluationRow, Message From c1b0516bc34639af04b461877b7ea472da86cfef Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:51:02 -0700 Subject: [PATCH 24/36] try --- .../pytest/default_single_turn_rollout_process.py | 13 ------------- eval_protocol/quickstart/llm_judge.py | 7 ++++++- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py index 352972c8..2b4bf893 100644 --- a/eval_protocol/pytest/default_single_turn_rollout_process.py +++ b/eval_protocol/pytest/default_single_turn_rollout_process.py @@ -5,20 +5,8 @@ from typing import List from litellm import acompletion -import litellm from typing import Dict -# Fix LiteLLM event loop binding issues by disabling the logging worker -try: - # Disable LiteLLM's async logging worker to prevent event loop binding issues - import litellm.litellm_core_utils.logging_worker - - # Monkey patch to disable the worker entirely - original_start_worker = litellm.litellm_core_utils.logging_worker.start_worker - litellm.litellm_core_utils.logging_worker.start_worker = lambda *args, **kwargs: None -except Exception: - pass # Best effort - from eval_protocol.dataset_logger import default_logger from eval_protocol.models import EvaluationRow, Message from openai.types import CompletionUsage @@ -33,7 +21,6 @@ class SingleTurnRolloutProcessor(RolloutProcessor): def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> List[asyncio.Task[EvaluationRow]]: """Generate single turn rollout tasks and return them for external handling.""" - # Do not modify global LiteLLM cache. Disable caching per-request instead. async def process_row(row: EvaluationRow) -> EvaluationRow: diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 9fabb895..04acbaa0 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -38,7 +38,7 @@ # { # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", # }, - {"model": "gpt-4.1"}, + # {"model": "gpt-4.1"}, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "medium"}, @@ -49,6 +49,11 @@ "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, + { + "max_tokens": 131000, + "extra_body": {"reasoning_effort": "low"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + }, ], rollout_processor=SingleTurnRolloutProcessor(), # preprocess_fn=split_multi_turn_rows, From a26cf027a8e52a2037838fb7c126c5b30574b798 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:55:28 -0700 Subject: [PATCH 25/36] broken still --- eval_protocol/benchmarks/test_aime25.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/eval_protocol/benchmarks/test_aime25.py b/eval_protocol/benchmarks/test_aime25.py index 5b4ea823..0a38991e 100644 --- a/eval_protocol/benchmarks/test_aime25.py +++ b/eval_protocol/benchmarks/test_aime25.py @@ -78,7 +78,6 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: ], dataset_adapter=aime2025_dataset_adapter, completion_params=[ - {"model": "gpt-4.1"}, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "low"}, @@ -89,6 +88,11 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: "extra_body": {"reasoning_effort": "medium"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, + { + "max_tokens": 131000, + "extra_body": {"reasoning_effort": "low"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", + }, ], rollout_processor=SingleTurnRolloutProcessor(), aggregation_method="mean", From d5f3b81bb54adcd5ec7f936444d38ee6853e750c Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 14:58:40 -0700 Subject: [PATCH 26/36] how about 2 and 4 --- eval_protocol/benchmarks/test_aime25.py | 5 +++++ eval_protocol/quickstart/llm_judge.py | 10 +++++----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/eval_protocol/benchmarks/test_aime25.py b/eval_protocol/benchmarks/test_aime25.py index 0a38991e..eb0aa2e6 100644 --- a/eval_protocol/benchmarks/test_aime25.py +++ b/eval_protocol/benchmarks/test_aime25.py @@ -93,6 +93,11 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, + { + "max_tokens": 131000, + "extra_body": {"reasoning_effort": "medium"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", + }, ], rollout_processor=SingleTurnRolloutProcessor(), aggregation_method="mean", diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 04acbaa0..08fe5b17 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -49,11 +49,11 @@ "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, - { - "max_tokens": 131000, - "extra_body": {"reasoning_effort": "low"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - }, + # { + # "max_tokens": 131000, + # "extra_body": {"reasoning_effort": "low"}, + # "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + # }, ], rollout_processor=SingleTurnRolloutProcessor(), # preprocess_fn=split_multi_turn_rows, From ada2ef21908750a3ce1854d9887de8a839b088ac Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 15:15:51 -0700 Subject: [PATCH 27/36] fix single turn rollout acompletion --- .../default_single_turn_rollout_process.py | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py index 2b4bf893..0bc9b1c9 100644 --- a/eval_protocol/pytest/default_single_turn_rollout_process.py +++ b/eval_protocol/pytest/default_single_turn_rollout_process.py @@ -5,7 +5,6 @@ from typing import List from litellm import acompletion -from typing import Dict from eval_protocol.dataset_logger import default_logger from eval_protocol.models import EvaluationRow, Message @@ -62,15 +61,10 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: if row.tools is not None: request_params["tools"] = row.tools - # Dynamic import to avoid static dependency/lint errors if LiteLLM isn't installed yet - import importlib - - _litellm = importlib.import_module("litellm") - acompletion = getattr(_litellm, "acompletion") response = await acompletion(**request_params) - assistant_content = response.choices[0].message.content or "" - tool_calls = response.choices[0].message.tool_calls if response.choices[0].message.tool_calls else None + assistant_content = response.choices[0].message.content or "" # pyright: ignore[reportAttributeAccessIssue] + tool_calls = response.choices[0].message.tool_calls if response.choices[0].message.tool_calls else None # pyright: ignore[reportAttributeAccessIssue] converted_tool_calls = None if tool_calls: @@ -112,9 +106,9 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: ] row.execution_metadata.usage = CompletionUsage( - prompt_tokens=response.usage.prompt_tokens, - completion_tokens=response.usage.completion_tokens, - total_tokens=response.usage.total_tokens, + prompt_tokens=response.usage.prompt_tokens, # pyright: ignore[reportAttributeAccessIssue] + completion_tokens=response.usage.completion_tokens, # pyright: ignore[reportAttributeAccessIssue] + total_tokens=response.usage.total_tokens, # pyright: ignore[reportAttributeAccessIssue] ) row.messages = messages From 4c6a80c795d68ec447028b98128bf811e7b6ff00 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 15:17:32 -0700 Subject: [PATCH 28/36] add back --- eval_protocol/quickstart/llm_judge.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 08fe5b17..04acbaa0 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -49,11 +49,11 @@ "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, - # { - # "max_tokens": 131000, - # "extra_body": {"reasoning_effort": "low"}, - # "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - # }, + { + "max_tokens": 131000, + "extra_body": {"reasoning_effort": "low"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + }, ], rollout_processor=SingleTurnRolloutProcessor(), # preprocess_fn=split_multi_turn_rows, From 9aff19c11f299205eb5f35c22be595bf00dd16ef Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 15:34:53 -0700 Subject: [PATCH 29/36] test repro --- .../quickstart/test_litellm_bug_reproducer.py | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 eval_protocol/quickstart/test_litellm_bug_reproducer.py diff --git a/eval_protocol/quickstart/test_litellm_bug_reproducer.py b/eval_protocol/quickstart/test_litellm_bug_reproducer.py new file mode 100644 index 00000000..245390b8 --- /dev/null +++ b/eval_protocol/quickstart/test_litellm_bug_reproducer.py @@ -0,0 +1,27 @@ +""" +Minimal reproducer for LiteLLM event loop binding bug. + +This reproduces the issue where LiteLLM's LoggingWorker gets bound to the first +event loop but tries to operate from subsequent event loops in pytest parameterized tests. + +Only reproduces in CI environments (GitHub Actions), not locally. +""" + +import pytest +import litellm + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "model", + [ + "gpt-4o", + # "gpt-4o-mini", + # "gpt-3.5-turbo", + "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", + ], +) +async def test_multiple_models(model): + response = await litellm.acompletion(model=model, messages=[{"role": "user", "content": "Hello"}]) + assert response.choices[0].message.content # pyright: ignore[reportAttributeAccessIssue] From ba6ff3292bb09456ab2d6b2f4d58c99519531651 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 15:41:42 -0700 Subject: [PATCH 30/36] add --- eval_protocol/quickstart/llm_judge.py | 17 ++++++++++++ .../quickstart/test_litellm_bug_reproducer.py | 27 ------------------- 2 files changed, 17 insertions(+), 27 deletions(-) delete mode 100644 eval_protocol/quickstart/test_litellm_bug_reproducer.py diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 04acbaa0..b5ddd945 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -21,6 +21,23 @@ ) import asyncio from openai import AsyncOpenAI +import litellm + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "model", + [ + "gpt-4o", + # "gpt-4o-mini", + # "gpt-3.5-turbo", + "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", + ], +) +async def test_multiple_models(model): + response = await litellm.acompletion(model=model, messages=[{"role": "user", "content": "Hello"}]) + assert response.choices[0].message.content # pyright: ignore[reportAttributeAccessIssue] @pytest.mark.asyncio diff --git a/eval_protocol/quickstart/test_litellm_bug_reproducer.py b/eval_protocol/quickstart/test_litellm_bug_reproducer.py deleted file mode 100644 index 245390b8..00000000 --- a/eval_protocol/quickstart/test_litellm_bug_reproducer.py +++ /dev/null @@ -1,27 +0,0 @@ -""" -Minimal reproducer for LiteLLM event loop binding bug. - -This reproduces the issue where LiteLLM's LoggingWorker gets bound to the first -event loop but tries to operate from subsequent event loops in pytest parameterized tests. - -Only reproduces in CI environments (GitHub Actions), not locally. -""" - -import pytest -import litellm - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "model", - [ - "gpt-4o", - # "gpt-4o-mini", - # "gpt-3.5-turbo", - "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", - ], -) -async def test_multiple_models(model): - response = await litellm.acompletion(model=model, messages=[{"role": "user", "content": "Hello"}]) - assert response.choices[0].message.content # pyright: ignore[reportAttributeAccessIssue] From 21fdb2b80c42e50efe570e801e176f5201b129da Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 16:20:22 -0700 Subject: [PATCH 31/36] undo weird changes i made --- eval_protocol/benchmarks/test_aime25.py | 21 +++---------------- .../default_single_turn_rollout_process.py | 16 +++++++++----- eval_protocol/quickstart/utils.py | 2 +- 3 files changed, 15 insertions(+), 24 deletions(-) diff --git a/eval_protocol/benchmarks/test_aime25.py b/eval_protocol/benchmarks/test_aime25.py index eb0aa2e6..91a67f77 100644 --- a/eval_protocol/benchmarks/test_aime25.py +++ b/eval_protocol/benchmarks/test_aime25.py @@ -82,28 +82,13 @@ def aime2025_dataset_adapter(rows: List[Dict[str, Any]]) -> List[EvaluationRow]: "max_tokens": 131000, "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - }, - { - "max_tokens": 131000, - "extra_body": {"reasoning_effort": "medium"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - }, - { - "max_tokens": 131000, - "extra_body": {"reasoning_effort": "low"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", - }, - { - "max_tokens": 131000, - "extra_body": {"reasoning_effort": "medium"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", - }, + } ], rollout_processor=SingleTurnRolloutProcessor(), aggregation_method="mean", passed_threshold=0.8, - num_runs=1, - max_dataset_rows=1, + num_runs=8, + max_dataset_rows=2, max_concurrent_rollouts=4, mode="pointwise", ) diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py index 0bc9b1c9..2b4bf893 100644 --- a/eval_protocol/pytest/default_single_turn_rollout_process.py +++ b/eval_protocol/pytest/default_single_turn_rollout_process.py @@ -5,6 +5,7 @@ from typing import List from litellm import acompletion +from typing import Dict from eval_protocol.dataset_logger import default_logger from eval_protocol.models import EvaluationRow, Message @@ -61,10 +62,15 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: if row.tools is not None: request_params["tools"] = row.tools + # Dynamic import to avoid static dependency/lint errors if LiteLLM isn't installed yet + import importlib + + _litellm = importlib.import_module("litellm") + acompletion = getattr(_litellm, "acompletion") response = await acompletion(**request_params) - assistant_content = response.choices[0].message.content or "" # pyright: ignore[reportAttributeAccessIssue] - tool_calls = response.choices[0].message.tool_calls if response.choices[0].message.tool_calls else None # pyright: ignore[reportAttributeAccessIssue] + assistant_content = response.choices[0].message.content or "" + tool_calls = response.choices[0].message.tool_calls if response.choices[0].message.tool_calls else None converted_tool_calls = None if tool_calls: @@ -106,9 +112,9 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: ] row.execution_metadata.usage = CompletionUsage( - prompt_tokens=response.usage.prompt_tokens, # pyright: ignore[reportAttributeAccessIssue] - completion_tokens=response.usage.completion_tokens, # pyright: ignore[reportAttributeAccessIssue] - total_tokens=response.usage.total_tokens, # pyright: ignore[reportAttributeAccessIssue] + prompt_tokens=response.usage.prompt_tokens, + completion_tokens=response.usage.completion_tokens, + total_tokens=response.usage.total_tokens, ) row.messages = messages diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index 465b7d28..aa26f780 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -130,7 +130,7 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: ) ) - return [expanded_rows[0]] + return expanded_rows async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judge_config, shared_client): From 824e0db0ccd367d42958161f1bb60f3bc804d57d Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 16:31:37 -0700 Subject: [PATCH 32/36] big run with kimi judge --- eval_protocol/quickstart/llm_judge.py | 31 ++++----------------------- eval_protocol/quickstart/utils.py | 8 +++++++ 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index b5ddd945..5725cbc8 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -21,23 +21,6 @@ ) import asyncio from openai import AsyncOpenAI -import litellm - - -@pytest.mark.asyncio -@pytest.mark.parametrize( - "model", - [ - "gpt-4o", - # "gpt-4o-mini", - # "gpt-3.5-turbo", - "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", - ], -) -async def test_multiple_models(model): - response = await litellm.acompletion(model=model, messages=[{"role": "user", "content": "Hello"}]) - assert response.choices[0].message.content # pyright: ignore[reportAttributeAccessIssue] @pytest.mark.asyncio @@ -45,7 +28,7 @@ async def test_multiple_models(model): input_rows=[ fetch_langfuse_traces_as_evaluation_rows( hours_back=24, - limit=1, + limit=40, page_size=10, sleep_between_gets=3.0, max_retries=5, @@ -55,7 +38,7 @@ async def test_multiple_models(model): # { # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", # }, - # {"model": "gpt-4.1"}, + {"model": "gpt-4.1"}, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "medium"}, @@ -66,14 +49,9 @@ async def test_multiple_models(model): "extra_body": {"reasoning_effort": "low"}, "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, - { - "max_tokens": 131000, - "extra_body": {"reasoning_effort": "low"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", - }, ], rollout_processor=SingleTurnRolloutProcessor(), - # preprocess_fn=split_multi_turn_rows, + preprocess_fn=split_multi_turn_rows, max_concurrent_rollouts=64, mode="all", ) @@ -95,8 +73,7 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: Same rows with updated evaluation_result containing scores and judgments """ - # judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. - judge_name = "gpt-4.1" + judge_name = "kimi-k2-instruct-0905" # Edit to which judge you'd like to use. Configs are in utils.py. if not rows: print("❌ No evaluation rows provided") diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index aa26f780..d57c0478 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -49,6 +49,14 @@ "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", "max_concurrency": 16, }, + "kimi-k2-instruct-0905": { + "model": "kimi-k2-instruct-0905", + "temperature": 0.6, # Kimi recommended temperature + "max_tokens": 131000, + "api_key": os.getenv("FIREWORKS_API_KEY"), + "base_url": "https://api.fireworks.ai/inference/v1", + "max_concurrency": 64, + }, } # Mapping from Arena-Hard-Auto judgment labels to numerical scores From b995a9fef9543c5fd1b9022062a41f262c4f1771 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 16:37:16 -0700 Subject: [PATCH 33/36] lol --- eval_protocol/adapters/langfuse.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index 6cfce17f..d7ccf4a5 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -95,14 +95,9 @@ def get_evaluation_rows( to_timestamp = None from_timestamp = None - # Temporary: Fetch more traces than needed, then randomly sample - fetch_limit = min(limit * 3, 500) # Fetch 3x more traces than needed (up to 500 max) - - logger.debug("Fetching %d traces to randomly sample %d", fetch_limit, limit) - # Single API call to get trace list traces = self.client.api.trace.list( - limit=fetch_limit, + limit=limit, tags=tags, user_id=user_id, session_id=session_id, From ae7211a006b92f8eda51df474783488c0e22fa11 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 12 Sep 2025 17:03:23 -0700 Subject: [PATCH 34/36] add timing filter --- eval_protocol/adapters/langfuse.py | 12 +++++++----- eval_protocol/mcp/execution/policy.py | 4 ++-- eval_protocol/quickstart/llm_judge.py | 3 ++- eval_protocol/quickstart/utils.py | 9 ++++++++- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index d7ccf4a5..b6655cf7 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -65,6 +65,8 @@ def get_evaluation_rows( user_id: Optional[str] = None, session_id: Optional[str] = None, hours_back: Optional[int] = None, + from_timestamp: Optional[datetime] = None, + to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, page_size: int = 30, sleep_between_gets: float = 0.1, @@ -78,6 +80,8 @@ def get_evaluation_rows( user_id: Filter by user ID session_id: Filter by session ID hours_back: Filter traces from this many hours ago + from_timestamp: Only include traces with timestamp >= this datetime + to_timestamp: Only include traces with timestamp <= this datetime include_tool_calls: Whether to include tool calling traces page_size: Number of traces to fetch per page (smaller = less rate limit issues) sleep_between_gets: Sleep time between individual trace.get() calls @@ -88,12 +92,10 @@ def get_evaluation_rows( """ eval_rows = [] - if hours_back: + # Determine time window: explicit from/to takes precedence + if from_timestamp is None and to_timestamp is None and hours_back: to_timestamp = datetime.now() from_timestamp = to_timestamp - timedelta(hours=hours_back) - else: - to_timestamp = None - from_timestamp = None # Single API call to get trace list traces = self.client.api.trace.list( @@ -155,7 +157,7 @@ def get_evaluation_rows( logger.warning("Failed to convert trace %s: %s", trace_info.id, e) continue - logger.info("Successfully processed %d traces into %d evaluation rows", len(selected_traces), len(eval_rows)) + logger.info("Successfully processed %d traces into evaluation rows", len(selected_traces)) return eval_rows def get_evaluation_rows_by_ids( diff --git a/eval_protocol/mcp/execution/policy.py b/eval_protocol/mcp/execution/policy.py index 0916e933..56de4f82 100644 --- a/eval_protocol/mcp/execution/policy.py +++ b/eval_protocol/mcp/execution/policy.py @@ -197,8 +197,8 @@ async def _make_llm_call(self, messages: List[Dict[str, Any]], tools: List[Dict[ response = await acompletion( model=self.model_id, **request_params, - api_base="https://litellm-cloud-proxy-prod-zfdbl7ykrq-uc.a.run.app/v1", - extra_body={"tags": ["kimi-k2-tau-bench"]}, + # api_base="https://litellm-cloud-proxy-prod-zfdbl7ykrq-uc.a.run.app/v1", + # extra_body={"tags": ["kimi-k2-tau-bench"]}, ) # Log cache hit/miss for monitoring diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 5725cbc8..331fcb7c 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -3,6 +3,7 @@ """ import os +from datetime import datetime from typing import List, Dict, Any, Optional from tqdm import tqdm @@ -27,7 +28,7 @@ @evaluation_test( input_rows=[ fetch_langfuse_traces_as_evaluation_rows( - hours_back=24, + to_timestamp=datetime(2025, 9, 12, 0, 11, 18), limit=40, page_size=10, sleep_between_gets=3.0, diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index d57c0478..589dd9ba 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -3,6 +3,7 @@ """ import os +from datetime import datetime import re from typing import List, Dict, Any, Optional import pandas as pd @@ -50,7 +51,7 @@ "max_concurrency": 16, }, "kimi-k2-instruct-0905": { - "model": "kimi-k2-instruct-0905", + "model": "accounts/fireworks/models/kimi-k2-instruct-0905", "temperature": 0.6, # Kimi recommended temperature "max_tokens": 131000, "api_key": os.getenv("FIREWORKS_API_KEY"), @@ -230,6 +231,8 @@ def fetch_langfuse_traces_as_evaluation_rows( user_id: Optional[str] = None, session_id: Optional[str] = None, hours_back: Optional[int] = None, + from_timestamp: Optional[datetime] = None, + to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, page_size: int = 30, sleep_between_gets: float = 0.1, @@ -244,6 +247,8 @@ def fetch_langfuse_traces_as_evaluation_rows( user_id: Filter traces by user ID session_id: Filter traces by session ID hours_back: Only fetch traces from the last N hours + from_timestamp: Only include traces with timestamp >= this datetime + to_timestamp: Only include traces with timestamp <= this datetime include_tool_calls: Whether to include tool calls in messages page_size: Number of traces to fetch per page (smaller = less rate limit issues) sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit) @@ -262,6 +267,8 @@ def fetch_langfuse_traces_as_evaluation_rows( user_id=user_id, session_id=session_id, hours_back=hours_back, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, include_tool_calls=include_tool_calls, page_size=page_size, sleep_between_gets=sleep_between_gets, From 0dc82c7f40406bc08e79a5d5229d1777b47eabf9 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Sat, 13 Sep 2025 23:53:37 -0700 Subject: [PATCH 35/36] unique traces --- eval_protocol/adapters/langfuse.py | 2 +- eval_protocol/quickstart/llm_judge.py | 3 --- eval_protocol/quickstart/utils.py | 7 +++++++ 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index b6655cf7..f8b593e0 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -68,7 +68,7 @@ def get_evaluation_rows( from_timestamp: Optional[datetime] = None, to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, - page_size: int = 30, + page_size: int = 30, # TODO: remove probably sleep_between_gets: float = 0.1, max_retries: int = 3, ) -> List[EvaluationRow]: diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 331fcb7c..ad95ee3a 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -36,9 +36,6 @@ ) ], completion_params=[ - # { - # "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", - # }, {"model": "gpt-4.1"}, { "max_tokens": 131000, diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index 589dd9ba..b90fca88 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -112,6 +112,7 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: List of expanded EvaluationRow objects, one for each assistant message """ expanded_rows = [] + seen_traces: set[str] = set() for row in data: messages = row.messages @@ -128,6 +129,12 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: messages_before_assistant = messages[:pos] assistant_message = messages[pos] + # In this case, we trace every request, so we need to filter out duplicates + curr_trace = "\n".join(serialize_message(m) for m in messages_before_assistant) + if curr_trace in seen_traces: + continue + seen_traces.add(curr_trace) + ground_truth_message = serialize_message(assistant_message) expanded_rows.append( From 7472db41ed167a81adb2d87bdbd9bca67a69637f Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Sun, 14 Sep 2025 21:35:24 -0700 Subject: [PATCH 36/36] update adapter --- eval_protocol/adapters/langfuse.py | 105 +++++++++++++++++++------- eval_protocol/quickstart/llm_judge.py | 22 +++--- eval_protocol/quickstart/utils.py | 64 +++------------- 3 files changed, 99 insertions(+), 92 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index f8b593e0..d9dc0c66 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -61,6 +61,7 @@ def __init__(self): def get_evaluation_rows( self, limit: int = 100, + sample_size: int = 50, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, @@ -68,23 +69,22 @@ def get_evaluation_rows( from_timestamp: Optional[datetime] = None, to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, - page_size: int = 30, # TODO: remove probably - sleep_between_gets: float = 0.1, + sleep_between_gets: float = 2.5, max_retries: int = 3, ) -> List[EvaluationRow]: """Pull traces from Langfuse and convert to EvaluationRow format. Args: - limit: Maximum number of rows to return + limit: Max number of trace summaries to collect via pagination (pre-sampling) + sample_size: Number of traces to fetch full details for (sampled from collected summaries) tags: Filter by specific tags user_id: Filter by user ID session_id: Filter by session ID hours_back: Filter traces from this many hours ago - from_timestamp: Only include traces with timestamp >= this datetime - to_timestamp: Only include traces with timestamp <= this datetime + from_timestamp: Explicit start time (overrides hours_back) + to_timestamp: Explicit end time (overrides hours_back) include_tool_calls: Whether to include tool calling traces - page_size: Number of traces to fetch per page (smaller = less rate limit issues) - sleep_between_gets: Sleep time between individual trace.get() calls + sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit) max_retries: Maximum retries for rate limit errors Returns: @@ -92,37 +92,86 @@ def get_evaluation_rows( """ eval_rows = [] - # Determine time window: explicit from/to takes precedence + # Determine time window: explicit from/to takes precedence over hours_back if from_timestamp is None and to_timestamp is None and hours_back: to_timestamp = datetime.now() from_timestamp = to_timestamp - timedelta(hours=hours_back) - # Single API call to get trace list - traces = self.client.api.trace.list( - limit=limit, - tags=tags, - user_id=user_id, - session_id=session_id, - from_timestamp=from_timestamp, - to_timestamp=to_timestamp, - order_by="timestamp.desc", - ) + # Collect trace summaries via pagination (up to limit) + all_traces = [] + page = 1 + collected = 0 + + while collected < limit: + current_page_limit = min(100, limit - collected) # Langfuse API max is 100 + + logger.debug( + "Fetching page %d with limit %d (collected: %d/%d)", page, current_page_limit, collected, limit + ) - if not traces or not traces.data: + # Fetch trace list with retry logic + traces = None + list_retries = 0 + while list_retries < max_retries: + try: + traces = self.client.api.trace.list( + page=page, + limit=current_page_limit, + tags=tags, + user_id=user_id, + session_id=session_id, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, + order_by="timestamp.desc", + ) + break + except Exception as e: + list_retries += 1 + if "429" in str(e) and list_retries < max_retries: + sleep_time = 2**list_retries # Exponential backoff + logger.warning( + "Rate limit hit on trace.list(), retrying in %ds (attempt %d/%d)", + sleep_time, + list_retries, + max_retries, + ) + time.sleep(sleep_time) + else: + logger.error("Failed to fetch trace list after %d retries: %s", max_retries, e) + return eval_rows # Return what we have so far + + if not traces or not traces.data: + logger.debug("No more traces found on page %d", page) + break + + logger.debug("Collected %d traces from page %d", len(traces.data), page) + + all_traces.extend(traces.data) + collected += len(traces.data) + + # Check if we have more pages + if hasattr(traces.meta, "page") and hasattr(traces.meta, "total_pages"): + if traces.meta.page >= traces.meta.total_pages: + break + elif len(traces.data) < current_page_limit: + break + + page += 1 + + if not all_traces: logger.debug("No traces found") return eval_rows - # Randomly sample the requested number of traces - available_traces = traces.data - sample_size = min(limit, len(available_traces)) - selected_traces = random.sample(available_traces, sample_size) + # Randomly sample traces to fetch full details (respect rate limits) + actual_sample_size = min(sample_size, len(all_traces)) + selected_traces = random.sample(all_traces, actual_sample_size) - logger.debug("Randomly selected %d traces from %d available", len(selected_traces), len(available_traces)) + logger.debug("Randomly selected %d traces from %d collected", actual_sample_size, len(all_traces)) # Process each selected trace with sleep and retry logic - for i, trace_info in enumerate(selected_traces): + for trace_info in selected_traces: # Sleep between gets to avoid rate limits - if sleep_between_gets > 0 and i > 0: + if sleep_between_gets > 0: time.sleep(sleep_between_gets) # Fetch full trace details with retry logic @@ -157,7 +206,9 @@ def get_evaluation_rows( logger.warning("Failed to convert trace %s: %s", trace_info.id, e) continue - logger.info("Successfully processed %d traces into evaluation rows", len(selected_traces)) + logger.info( + "Successfully processed %d selected traces into %d evaluation rows", len(selected_traces), len(eval_rows) + ) return eval_rows def get_evaluation_rows_by_ids( diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index ad95ee3a..37eca788 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -15,22 +15,24 @@ from eval_protocol.quickstart.utils import ( split_multi_turn_rows, JUDGE_CONFIGS, - fetch_langfuse_traces_as_evaluation_rows, calculate_bootstrap_scores, push_scores_to_langfuse, - run_judgment_async_with_shared_client, + run_judgment_async, ) import asyncio from openai import AsyncOpenAI +from eval_protocol.adapters.langfuse import create_langfuse_adapter + +adapter = create_langfuse_adapter() @pytest.mark.asyncio @evaluation_test( input_rows=[ - fetch_langfuse_traces_as_evaluation_rows( + adapter.get_evaluation_rows( to_timestamp=datetime(2025, 9, 12, 0, 11, 18), - limit=40, - page_size=10, + limit=711, + sample_size=50, sleep_between_gets=3.0, max_retries=5, ) @@ -71,7 +73,7 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: Same rows with updated evaluation_result containing scores and judgments """ - judge_name = "kimi-k2-instruct-0905" # Edit to which judge you'd like to use. Configs are in utils.py. + judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py. if not rows: print("❌ No evaluation rows provided") @@ -91,11 +93,11 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: ) as shared_client: semaphore = asyncio.Semaphore(max_concurrency) - async def run_judgment_with_semaphore(row): + async def run_judgment(row): async with semaphore: - return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client) + return await run_judgment_async(row, model_name, judge_name, shared_client) - tasks = [run_judgment_with_semaphore(row) for row in rows] + tasks = [run_judgment(row) for row in rows] for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): result = await coro @@ -131,6 +133,6 @@ async def run_judgment_with_semaphore(row): ) # Standard error approximation from 90% CI # Optional, push scores back to Langfuse. Note that one score per model will be pushed back onto same trace. - # push_scores_to_langfuse(rows, model_name, mean_score) + push_scores_to_langfuse(rows, model_name, mean_score) return rows diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index b90fca88..fc7de3ef 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -50,6 +50,14 @@ "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", "max_concurrency": 16, }, + "gemini-2.5-flash": { + "model": "gemini-2.5-flash", + "temperature": 1.0, + "max_tokens": 32000, + "api_key": os.getenv("GEMINI_API_KEY"), + "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", + "max_concurrency": 16, + }, "kimi-k2-instruct-0905": { "model": "accounts/fireworks/models/kimi-k2-instruct-0905", "temperature": 0.6, # Kimi recommended temperature @@ -195,7 +203,7 @@ async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judg return {"score": score, "judgment": judgment_text, "prompt": messages} -async def run_judgment_async_with_shared_client( +async def run_judgment_async( row: EvaluationRow, model_name: str, judge_name: str, shared_client ) -> Optional[Dict[str, Any]]: """Async judgment using shared client to avoid cleanup issues.""" @@ -232,60 +240,6 @@ async def run_judgment_async_with_shared_client( return {"model": model_name, "games": games} -def fetch_langfuse_traces_as_evaluation_rows( - limit: int = 100, - tags: Optional[List[str]] = None, - user_id: Optional[str] = None, - session_id: Optional[str] = None, - hours_back: Optional[int] = None, - from_timestamp: Optional[datetime] = None, - to_timestamp: Optional[datetime] = None, - include_tool_calls: bool = True, - page_size: int = 30, - sleep_between_gets: float = 0.1, - max_retries: int = 3, -) -> List[EvaluationRow]: - """ - Fetch Langfuse traces and convert them to EvaluationRow objects. - - Args: - limit: Maximum number of traces to fetch - tags: Filter traces by tags - user_id: Filter traces by user ID - session_id: Filter traces by session ID - hours_back: Only fetch traces from the last N hours - from_timestamp: Only include traces with timestamp >= this datetime - to_timestamp: Only include traces with timestamp <= this datetime - include_tool_calls: Whether to include tool calls in messages - page_size: Number of traces to fetch per page (smaller = less rate limit issues) - sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit) - max_retries: Maximum retries for rate limit errors - - Returns: - List of EvaluationRow objects converted from Langfuse traces - """ - try: - from eval_protocol.adapters.langfuse import create_langfuse_adapter - - adapter = create_langfuse_adapter() - return adapter.get_evaluation_rows( - limit=limit, - tags=tags, - user_id=user_id, - session_id=session_id, - hours_back=hours_back, - from_timestamp=from_timestamp, - to_timestamp=to_timestamp, - include_tool_calls=include_tool_calls, - page_size=page_size, - sleep_between_gets=sleep_between_gets, - max_retries=max_retries, - ) - except Exception as e: - print(f"❌ LangfuseAdapter failed: {e}") - return [] - - def calculate_bootstrap_scores(judgments: List[Dict[str, Any]]) -> tuple[float, float, float]: """ Calculate bootstrap confidence intervals for Arena-Hard-Auto style judgments.