diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index f3e82329..d9dc0c66 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -6,6 +6,8 @@ from langfuse.api.resources.commons.types.observations_view import ObservationsView import logging +import random +import time from datetime import datetime, timedelta from typing import Any, Dict, Iterator, List, Optional, cast @@ -59,54 +61,154 @@ def __init__(self): def get_evaluation_rows( self, limit: int = 100, + sample_size: int = 50, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, hours_back: Optional[int] = None, + from_timestamp: Optional[datetime] = None, + to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, + sleep_between_gets: float = 2.5, + max_retries: int = 3, ) -> List[EvaluationRow]: """Pull traces from Langfuse and convert to EvaluationRow format. Args: - limit: Maximum number of rows to return + limit: Max number of trace summaries to collect via pagination (pre-sampling) + sample_size: Number of traces to fetch full details for (sampled from collected summaries) tags: Filter by specific tags user_id: Filter by user ID session_id: Filter by session ID hours_back: Filter traces from this many hours ago + from_timestamp: Explicit start time (overrides hours_back) + to_timestamp: Explicit end time (overrides hours_back) include_tool_calls: Whether to include tool calling traces + sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit) + max_retries: Maximum retries for rate limit errors - Yields: - EvaluationRow: Converted evaluation rows + Returns: + List[EvaluationRow]: Converted evaluation rows """ - # Get traces from Langfuse using new API + eval_rows = [] - if hours_back: + # Determine time window: explicit from/to takes precedence over hours_back + if from_timestamp is None and to_timestamp is None and hours_back: to_timestamp = datetime.now() from_timestamp = to_timestamp - timedelta(hours=hours_back) - else: - to_timestamp = None - from_timestamp = None - eval_rows = [] + # Collect trace summaries via pagination (up to limit) + all_traces = [] + page = 1 + collected = 0 - traces: Traces = self.client.api.trace.list( - limit=limit, - tags=tags, - user_id=user_id, - session_id=session_id, - from_timestamp=from_timestamp, - to_timestamp=to_timestamp, - ) + while collected < limit: + current_page_limit = min(100, limit - collected) # Langfuse API max is 100 - for trace in traces.data: - try: - trace: TraceWithFullDetails = self.client.api.trace.get(trace.id) - eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls) - if eval_row: - eval_rows.append(eval_row) - except (AttributeError, ValueError, KeyError) as e: - logger.warning("Failed to convert trace %s: %s", trace.id, e) - continue + logger.debug( + "Fetching page %d with limit %d (collected: %d/%d)", page, current_page_limit, collected, limit + ) + + # Fetch trace list with retry logic + traces = None + list_retries = 0 + while list_retries < max_retries: + try: + traces = self.client.api.trace.list( + page=page, + limit=current_page_limit, + tags=tags, + user_id=user_id, + session_id=session_id, + from_timestamp=from_timestamp, + to_timestamp=to_timestamp, + order_by="timestamp.desc", + ) + break + except Exception as e: + list_retries += 1 + if "429" in str(e) and list_retries < max_retries: + sleep_time = 2**list_retries # Exponential backoff + logger.warning( + "Rate limit hit on trace.list(), retrying in %ds (attempt %d/%d)", + sleep_time, + list_retries, + max_retries, + ) + time.sleep(sleep_time) + else: + logger.error("Failed to fetch trace list after %d retries: %s", max_retries, e) + return eval_rows # Return what we have so far + + if not traces or not traces.data: + logger.debug("No more traces found on page %d", page) + break + + logger.debug("Collected %d traces from page %d", len(traces.data), page) + + all_traces.extend(traces.data) + collected += len(traces.data) + + # Check if we have more pages + if hasattr(traces.meta, "page") and hasattr(traces.meta, "total_pages"): + if traces.meta.page >= traces.meta.total_pages: + break + elif len(traces.data) < current_page_limit: + break + + page += 1 + + if not all_traces: + logger.debug("No traces found") + return eval_rows + + # Randomly sample traces to fetch full details (respect rate limits) + actual_sample_size = min(sample_size, len(all_traces)) + selected_traces = random.sample(all_traces, actual_sample_size) + + logger.debug("Randomly selected %d traces from %d collected", actual_sample_size, len(all_traces)) + + # Process each selected trace with sleep and retry logic + for trace_info in selected_traces: + # Sleep between gets to avoid rate limits + if sleep_between_gets > 0: + time.sleep(sleep_between_gets) + + # Fetch full trace details with retry logic + trace_full = None + detail_retries = 0 + while detail_retries < max_retries: + try: + trace_full = self.client.api.trace.get(trace_info.id) + break + except Exception as e: + detail_retries += 1 + if "429" in str(e) and detail_retries < max_retries: + sleep_time = 2**detail_retries # Exponential backoff + logger.warning( + "Rate limit hit on trace.get(%s), retrying in %ds (attempt %d/%d)", + trace_info.id, + sleep_time, + detail_retries, + max_retries, + ) + time.sleep(sleep_time) + else: + logger.warning("Failed to fetch trace %s after %d retries: %s", trace_info.id, max_retries, e) + break # Skip this trace + + if trace_full: + try: + eval_row = self._convert_trace_to_evaluation_row(trace_full, include_tool_calls) + if eval_row: + eval_rows.append(eval_row) + except (AttributeError, ValueError, KeyError) as e: + logger.warning("Failed to convert trace %s: %s", trace_info.id, e) + continue + + logger.info( + "Successfully processed %d selected traces into %d evaluation rows", len(selected_traces), len(eval_rows) + ) return eval_rows def get_evaluation_rows_by_ids( diff --git a/eval_protocol/mcp/execution/policy.py b/eval_protocol/mcp/execution/policy.py index 1adb9b95..56de4f82 100644 --- a/eval_protocol/mcp/execution/policy.py +++ b/eval_protocol/mcp/execution/policy.py @@ -194,7 +194,12 @@ async def _make_llm_call(self, messages: List[Dict[str, Any]], tools: List[Dict[ request_params["tools"] = tools try: - response = await acompletion(model=self.model_id, **request_params) + response = await acompletion( + model=self.model_id, + **request_params, + # api_base="https://litellm-cloud-proxy-prod-zfdbl7ykrq-uc.a.run.app/v1", + # extra_body={"tags": ["kimi-k2-tau-bench"]}, + ) # Log cache hit/miss for monitoring hidden = getattr(response, "_hidden_params", {}) diff --git a/eval_protocol/quickstart/llm_judge.py b/eval_protocol/quickstart/llm_judge.py index 28d40366..37eca788 100644 --- a/eval_protocol/quickstart/llm_judge.py +++ b/eval_protocol/quickstart/llm_judge.py @@ -3,6 +3,7 @@ """ import os +from datetime import datetime from typing import List, Dict, Any, Optional from tqdm import tqdm @@ -14,32 +15,44 @@ from eval_protocol.quickstart.utils import ( split_multi_turn_rows, JUDGE_CONFIGS, - fetch_langfuse_traces_as_evaluation_rows, calculate_bootstrap_scores, push_scores_to_langfuse, - run_judgment, + run_judgment_async, ) +import asyncio +from openai import AsyncOpenAI +from eval_protocol.adapters.langfuse import create_langfuse_adapter -import concurrent.futures -from concurrent.futures import ThreadPoolExecutor +adapter = create_langfuse_adapter() -@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") @pytest.mark.asyncio @evaluation_test( - input_rows=[fetch_langfuse_traces_as_evaluation_rows()], + input_rows=[ + adapter.get_evaluation_rows( + to_timestamp=datetime(2025, 9, 12, 0, 11, 18), + limit=711, + sample_size=50, + sleep_between_gets=3.0, + max_retries=5, + ) + ], completion_params=[ + {"model": "gpt-4.1"}, { - "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507", + "max_tokens": 131000, + "extra_body": {"reasoning_effort": "medium"}, + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", }, { "max_tokens": 131000, "extra_body": {"reasoning_effort": "low"}, - "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b", + "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b", }, ], rollout_processor=SingleTurnRolloutProcessor(), preprocess_fn=split_multi_turn_rows, + max_concurrent_rollouts=64, mode="all", ) async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: @@ -73,11 +86,21 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]: judgments = [] max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"] - with ThreadPoolExecutor(max_workers=max_concurrency) as executor: - futures = [executor.submit(run_judgment, row, model_name, judge_name) for row in rows] + judge_config = JUDGE_CONFIGS[judge_name] + + async with AsyncOpenAI( + api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url") + ) as shared_client: + semaphore = asyncio.Semaphore(max_concurrency) + + async def run_judgment(row): + async with semaphore: + return await run_judgment_async(row, model_name, judge_name, shared_client) + + tasks = [run_judgment(row) for row in rows] - for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Generating judgments"): - result = future.result() + for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"): + result = await coro if result and result["games"][0] and result["games"][1]: judgments.append(result) diff --git a/eval_protocol/quickstart/utils.py b/eval_protocol/quickstart/utils.py index 46186819..fc7de3ef 100644 --- a/eval_protocol/quickstart/utils.py +++ b/eval_protocol/quickstart/utils.py @@ -3,11 +3,13 @@ """ import os +from datetime import datetime import re from typing import List, Dict, Any, Optional import pandas as pd from eval_protocol.models import EvaluationRow, Message, EvaluateResult, MetricResult +import asyncio OG_ARENA_HARD_PROMPT = """Please act as an impartial judge and evaluate the quality of the responses provided by two AI assistants to the user prompt displayed below. You will be given assistant A's answer and assistant B's answer. Your job is to evaluate which assistant's answer is better. @@ -46,7 +48,23 @@ "max_tokens": 32000, "api_key": os.getenv("GEMINI_API_KEY"), "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", - "max_concurrency": 32, + "max_concurrency": 16, + }, + "gemini-2.5-flash": { + "model": "gemini-2.5-flash", + "temperature": 1.0, + "max_tokens": 32000, + "api_key": os.getenv("GEMINI_API_KEY"), + "base_url": "https://generativelanguage.googleapis.com/v1beta/openai/", + "max_concurrency": 16, + }, + "kimi-k2-instruct-0905": { + "model": "accounts/fireworks/models/kimi-k2-instruct-0905", + "temperature": 0.6, # Kimi recommended temperature + "max_tokens": 131000, + "api_key": os.getenv("FIREWORKS_API_KEY"), + "base_url": "https://api.fireworks.ai/inference/v1", + "max_concurrency": 64, }, } @@ -102,6 +120,7 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: List of expanded EvaluationRow objects, one for each assistant message """ expanded_rows = [] + seen_traces: set[str] = set() for row in data: messages = row.messages @@ -118,6 +137,12 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: messages_before_assistant = messages[:pos] assistant_message = messages[pos] + # In this case, we trace every request, so we need to filter out duplicates + curr_trace = "\n".join(serialize_message(m) for m in messages_before_assistant) + if curr_trace in seen_traces: + continue + seen_traces.add(curr_trace) + ground_truth_message = serialize_message(assistant_message) expanded_rows.append( @@ -132,8 +157,8 @@ def split_multi_turn_rows(data: list[EvaluationRow]) -> list[EvaluationRow]: return expanded_rows -def pairwise_judgment(question_text, answer_a, answer_b, tools, judge_config): - """Pairwise judgment function. Adapted from arena-hard-auto/gen_judgment.py""" +async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judge_config, shared_client): + """Async pairwise judgment using a shared client.""" user_prompt = f"""<|User Prompt|> {question_text} @@ -143,39 +168,29 @@ def pairwise_judgment(question_text, answer_a, answer_b, tools, judge_config): <|The Start of Assistant B's Answer|> {answer_b} -<|The End of Assistant B's Answer|>""" +<|The End of Assistant B's Answer|> - messages = [ - { - "role": "system", - "content": OG_ARENA_HARD_PROMPT, - }, - { - "role": "user", - "content": user_prompt, - }, - ] +<|Available Tools|> +{tools} +<|End of Available Tools|> - try: - from openai import OpenAI +{OG_ARENA_HARD_PROMPT}""" - client = OpenAI(api_key=judge_config["api_key"], base_url=judge_config["base_url"]) + messages = [{"role": "user", "content": user_prompt}] + try: api_params = { "model": judge_config["model"], - "messages": messages, # type: ignore + "messages": messages, "temperature": judge_config["temperature"], "max_tokens": judge_config["max_tokens"], } if tools: api_params["tools"] = tools - api_params["tool_choice"] = ( - "none" # Judge can see tools to help in response, but won't actually try to call them - ) - - response = client.chat.completions.create(**api_params) + api_params["tool_choice"] = "none" + response = await shared_client.chat.completions.create(**api_params) judgment_text = response.choices[0].message.content if not judgment_text: return None @@ -185,52 +200,44 @@ def pairwise_judgment(question_text, answer_a, answer_b, tools, judge_config): return None score = get_score(judgment_text, [r"\[\[([AB<>=]+)\]\]", r"\[([AB<>=]+)\]"]) + return {"score": score, "judgment": judgment_text, "prompt": messages} - result = { - "score": score, - "judgment": judgment_text, - "prompt": messages, - } - return result - - -def fetch_langfuse_traces_as_evaluation_rows( - limit: int = 100, - tags: Optional[List[str]] = None, - user_id: Optional[str] = None, - session_id: Optional[str] = None, - hours_back: Optional[int] = None, - include_tool_calls: bool = True, -) -> List[EvaluationRow]: - """ - Fetch Langfuse traces and convert them to EvaluationRow objects. - Args: - limit: Maximum number of traces to fetch - tags: Filter traces by tags - user_id: Filter traces by user ID - session_id: Filter traces by session ID - hours_back: Only fetch traces from the last N hours - include_tool_calls: Whether to include tool calls in messages +async def run_judgment_async( + row: EvaluationRow, model_name: str, judge_name: str, shared_client +) -> Optional[Dict[str, Any]]: + """Async judgment using shared client to avoid cleanup issues.""" + if not row.messages: + return None - Returns: - List of EvaluationRow objects converted from Langfuse traces - """ - try: - from eval_protocol.adapters.langfuse import create_langfuse_adapter + question_text = "\n".join([serialize_message(msg) for msg in row.messages[:-1]]) + model_a_answer = row.ground_truth + model_b_answer = serialize_message(row.messages[-1]) - adapter = create_langfuse_adapter() - return adapter.get_evaluation_rows( - limit=limit, - tags=tags, - user_id=user_id, - session_id=session_id, - hours_back=hours_back, - include_tool_calls=include_tool_calls, - ) - except Exception as e: - print(f"❌ LangfuseAdapter failed: {e}") - return [] + # Run both rounds concurrently with shared client + result1, result2 = await asyncio.gather( + pairwise_judgment_async( + question_text, model_a_answer, model_b_answer, row.tools, JUDGE_CONFIGS[judge_name], shared_client + ), + pairwise_judgment_async( + question_text, model_b_answer, model_a_answer, row.tools, JUDGE_CONFIGS[judge_name], shared_client + ), + ) + + games = [result1, result2] + + row.evaluation_result = EvaluateResult( + score=0.0, + reason=f"LLM Judge comparison: Round 1: {result1['score']}, Round 2: {result2['score']}" + if result1 and result2 + else "Failed to get judgement scores", + metrics={ + "round1_judgment": MetricResult(score=0.0, reason=result1["judgment"] if result1 else "Failed"), + "round2_judgment": MetricResult(score=0.0, reason=result2["judgment"] if result2 else "Failed"), + }, + ) + + return {"model": model_name, "games": games} def calculate_bootstrap_scores(judgments: List[Dict[str, Any]]) -> tuple[float, float, float]: @@ -275,72 +282,6 @@ def calculate_bootstrap_scores(judgments: List[Dict[str, Any]]) -> tuple[float, return mean_score, lower_score, upper_score -def run_judgment(row: EvaluationRow, model_name: str, judge_name: str) -> Optional[Dict[str, Any]]: - """ - Run Arena-Hard-Auto style pairwise judgment for a single evaluation row. - - Performs two rounds of judgment (A vs B, B vs A) to reduce position bias: - - Round 1: ground_truth (original) vs messages[-1] (new model response) - - Round 2: messages[-1] (new model response) vs ground_truth (original) - - Updates the row's evaluation_result with judgment details and returns results - for aggregation across the dataset. - - Args: - row: EvaluationRow containing messages, ground_truth, and tools - model_name: Name of the model being evaluated (for result tracking) - judge_name: Key from JUDGE_CONFIGS to use for judgment - - Returns: - Dict with "model" and "games" keys, or None if row has no messages - """ - if not row.messages: - return None - - question_text = "\n".join([serialize_message(msg) for msg in row.messages[:-1]]) - model_a_answer = row.ground_truth - model_b_answer = serialize_message(row.messages[-1]) - - games = [] - - # Round 1: A vs B (original vs comparison) - result1 = pairwise_judgment( - question_text=question_text, - answer_a=model_a_answer, - answer_b=model_b_answer, - tools=row.tools, - judge_config=JUDGE_CONFIGS[judge_name], - ) - games.append(result1) - - # Round 2: B vs A (comparison vs original) - result2 = pairwise_judgment( - question_text=question_text, - answer_a=model_b_answer, - answer_b=model_a_answer, - tools=row.tools, - judge_config=JUDGE_CONFIGS[judge_name], - ) - games.append(result2) - - row.evaluation_result = EvaluateResult( - score=0.0, - reason=f"LLM Judge comparison: Round 1: {result1['score']}, Round 2: {result2['score']}" - if result1 and result2 - else "Failed to get judgement scores", - metrics={ - "round1_judgment": MetricResult( - score=0.0, reason=result1["judgment"] if result1 else "Failed to get judgment reason" - ), - "round2_judgment": MetricResult( - score=0.0, reason=result2["judgment"] if result2 else "Failed to get judgment reason" - ), - }, - ) - - return {"model": model_name, "games": games} - - def push_scores_to_langfuse(rows: List[EvaluationRow], model_name: str, mean_score: float) -> None: """ Push evaluation scores back to Langfuse traces for tracking and analysis.