Merged

Commits (36):
All 36 commits are by xzrderek, Sep 12 to Sep 15, 2025:

5428de6  fix langfuse rate limit issue (Sep 12)
5ccaa48  to revert later, get 50 random traces to query (Sep 12)
2b83d60  don't skip (Sep 12)
cafd9f7  make judgment async (Sep 12)
494b5c9  bump limit up (Sep 12)
e70327b  lower concurrency for gemini (Sep 12)
d845a4b  small limit to see if we get the error still (Sep 12)
0a7b1c5  test (Sep 12)
c68e38c  test (Sep 12)
b58d491  try this (Sep 12)
a59036e  fix (Sep 12)
173b65b  fix (Sep 12)
96b0d34  no split (Sep 12)
549b396  ok wtf (Sep 12)
d9ea133  try something else (Sep 12)
f357065  test (Sep 12)
088dea6  1 run (Sep 12)
009b0bb  same as aime now (Sep 12)
8c62b6b  try osmething else (Sep 12)
466581c  remove gpt (Sep 12)
620611f  gpt (Sep 12)
c5d17e4  try to mute and see what happens (Sep 12)
ce04620  monkey patch (Sep 12)
c1b0516  try (Sep 12)
a26cf02  broken still (Sep 12)
d5f3b81  how about 2 and 4 (Sep 12)
ada2ef2  fix single turn rollout acompletion (Sep 12)
4c6a80c  add back (Sep 12)
9aff19c  test repro (Sep 12)
ba6ff32  add (Sep 12)
21fdb2b  undo weird changes i made (Sep 12)
824e0db  big run with kimi judge (Sep 12)
b995a9f  lol (Sep 12)
ae7211a  add timing filter (Sep 13)
0dc82c7  unique traces (Sep 14)
7472db4  update adapter (Sep 15)
154 changes: 128 additions & 26 deletions eval_protocol/adapters/langfuse.py
@@ -6,6 +6,8 @@
 from langfuse.api.resources.commons.types.observations_view import ObservationsView
 import logging
+import random
+import time
 from datetime import datetime, timedelta
 from typing import Any, Dict, Iterator, List, Optional, cast

@@ -59,54 +61,154 @@ def __init__(self):
     def get_evaluation_rows(
         self,
         limit: int = 100,
+        sample_size: int = 50,
         tags: Optional[List[str]] = None,
         user_id: Optional[str] = None,
         session_id: Optional[str] = None,
         hours_back: Optional[int] = None,
+        from_timestamp: Optional[datetime] = None,
+        to_timestamp: Optional[datetime] = None,
         include_tool_calls: bool = True,
+        sleep_between_gets: float = 2.5,
+        max_retries: int = 3,
     ) -> List[EvaluationRow]:
         """Pull traces from Langfuse and convert to EvaluationRow format.

         Args:
-            limit: Maximum number of rows to return
+            limit: Max number of trace summaries to collect via pagination (pre-sampling)
+            sample_size: Number of traces to fetch full details for (sampled from collected summaries)
             tags: Filter by specific tags
             user_id: Filter by user ID
             session_id: Filter by session ID
             hours_back: Filter traces from this many hours ago
+            from_timestamp: Explicit start time (overrides hours_back)
+            to_timestamp: Explicit end time (overrides hours_back)
             include_tool_calls: Whether to include tool calling traces
+            sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit)
+            max_retries: Maximum retries for rate limit errors

-        Yields:
-            EvaluationRow: Converted evaluation rows
+        Returns:
+            List[EvaluationRow]: Converted evaluation rows
         """
-        # Get traces from Langfuse using new API
-        eval_rows = []
-
-        if hours_back:
+        # Determine time window: explicit from/to takes precedence over hours_back
+        if from_timestamp is None and to_timestamp is None and hours_back:
             to_timestamp = datetime.now()
             from_timestamp = to_timestamp - timedelta(hours=hours_back)
-        else:
-            to_timestamp = None
-            from_timestamp = None

+        eval_rows = []
+        # Collect trace summaries via pagination (up to limit)
+        all_traces = []
+        page = 1
+        collected = 0

-        traces: Traces = self.client.api.trace.list(
-            limit=limit,
-            tags=tags,
-            user_id=user_id,
-            session_id=session_id,
-            from_timestamp=from_timestamp,
-            to_timestamp=to_timestamp,
-        )
+        while collected < limit:
+            current_page_limit = min(100, limit - collected)  # Langfuse API max is 100

-        for trace in traces.data:
-            try:
-                trace: TraceWithFullDetails = self.client.api.trace.get(trace.id)
-                eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls)
-                if eval_row:
-                    eval_rows.append(eval_row)
-            except (AttributeError, ValueError, KeyError) as e:
-                logger.warning("Failed to convert trace %s: %s", trace.id, e)
-                continue
+            logger.debug(
+                "Fetching page %d with limit %d (collected: %d/%d)", page, current_page_limit, collected, limit
+            )

+            # Fetch trace list with retry logic
+            traces = None
+            list_retries = 0
+            while list_retries < max_retries:
+                try:
+                    traces = self.client.api.trace.list(
+                        page=page,
+                        limit=current_page_limit,
+                        tags=tags,
+                        user_id=user_id,
+                        session_id=session_id,
+                        from_timestamp=from_timestamp,
+                        to_timestamp=to_timestamp,
+                        order_by="timestamp.desc",
+                    )
+                    break
+                except Exception as e:
+                    list_retries += 1
+                    if "429" in str(e) and list_retries < max_retries:
+                        sleep_time = 2**list_retries  # Exponential backoff
+                        logger.warning(
+                            "Rate limit hit on trace.list(), retrying in %ds (attempt %d/%d)",
+                            sleep_time,
+                            list_retries,
+                            max_retries,
+                        )
+                        time.sleep(sleep_time)
+                    else:
+                        logger.error("Failed to fetch trace list after %d retries: %s", max_retries, e)
+                        return eval_rows  # Return what we have so far
+
+            if not traces or not traces.data:
+                logger.debug("No more traces found on page %d", page)
+                break
+
+            logger.debug("Collected %d traces from page %d", len(traces.data), page)
+
+            all_traces.extend(traces.data)
+            collected += len(traces.data)
+
+            # Check if we have more pages
+            if hasattr(traces.meta, "page") and hasattr(traces.meta, "total_pages"):
+                if traces.meta.page >= traces.meta.total_pages:
+                    break
+            elif len(traces.data) < current_page_limit:
+                break
+
+            page += 1
+
+        if not all_traces:
+            logger.debug("No traces found")
+            return eval_rows
+
+        # Randomly sample traces to fetch full details (respect rate limits)
+        actual_sample_size = min(sample_size, len(all_traces))
+        selected_traces = random.sample(all_traces, actual_sample_size)
+
+        logger.debug("Randomly selected %d traces from %d collected", actual_sample_size, len(all_traces))
+
+        # Process each selected trace with sleep and retry logic
+        for trace_info in selected_traces:
+            # Sleep between gets to avoid rate limits
+            if sleep_between_gets > 0:
+                time.sleep(sleep_between_gets)
+
+            # Fetch full trace details with retry logic
+            trace_full = None
+            detail_retries = 0
+            while detail_retries < max_retries:
+                try:
+                    trace_full = self.client.api.trace.get(trace_info.id)
+                    break
+                except Exception as e:
+                    detail_retries += 1
+                    if "429" in str(e) and detail_retries < max_retries:
+                        sleep_time = 2**detail_retries  # Exponential backoff
+                        logger.warning(
+                            "Rate limit hit on trace.get(%s), retrying in %ds (attempt %d/%d)",
+                            trace_info.id,
+                            sleep_time,
+                            detail_retries,
+                            max_retries,
+                        )
+                        time.sleep(sleep_time)
+                    else:
+                        logger.warning("Failed to fetch trace %s after %d retries: %s", trace_info.id, max_retries, e)
+                        break  # Skip this trace
+
+            if trace_full:
+                try:
+                    eval_row = self._convert_trace_to_evaluation_row(trace_full, include_tool_calls)
+                    if eval_row:
+                        eval_rows.append(eval_row)
+                except (AttributeError, ValueError, KeyError) as e:
+                    logger.warning("Failed to convert trace %s: %s", trace_info.id, e)
+                    continue
+
+        logger.info(
+            "Successfully processed %d selected traces into %d evaluation rows", len(selected_traces), len(eval_rows)
+        )
+        return eval_rows

     def get_evaluation_rows_by_ids(
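The reworked adapter separates how many trace summaries are paginated (limit) from how many full traces are actually fetched (sample_size), since each trace.get() call counts against the 30 req/min limit mentioned in the docstring. A minimal usage sketch; the parameter values are illustrative and Langfuse credentials are assumed to be configured in the environment:

from eval_protocol.adapters.langfuse import create_langfuse_adapter

adapter = create_langfuse_adapter()  # assumes Langfuse credentials in the environment

# Paginate up to 500 trace summaries (100 per page), then fetch full details
# for only 40 randomly sampled traces, sleeping 2.5 s between gets so the
# trace.get() calls run at ~24 req/min, under the 30 req/min limit.
rows = adapter.get_evaluation_rows(
    hours_back=24,
    limit=500,
    sample_size=40,
    sleep_between_gets=2.5,
    max_retries=3,
)
print(f"Converted {len(rows)} of 40 sampled traces into evaluation rows")

At 2.5 s per get, the detail-fetch phase takes about 100 s for 40 traces regardless of how many summaries were collected, which is the point of sampling before fetching.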
7 changes: 6 additions & 1 deletion eval_protocol/mcp/execution/policy.py
@@ -194,7 +194,12 @@ async def _make_llm_call(self, messages: List[Dict[str, Any]], tools: List[Dict[
             request_params["tools"] = tools

         try:
-            response = await acompletion(model=self.model_id, **request_params)
+            response = await acompletion(
+                model=self.model_id,
+                **request_params,
+                # api_base="https://litellm-cloud-proxy-prod-zfdbl7ykrq-uc.a.run.app/v1",
+                # extra_body={"tags": ["kimi-k2-tau-bench"]},
+            )

             # Log cache hit/miss for monitoring
             hidden = getattr(response, "_hidden_params", {})
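The commented-out arguments hint at routing the same call through a LiteLLM proxy and tagging requests for later trace filtering. A hypothetical sketch of that setup; the model alias, proxy URL, and tag below are placeholders, not values confirmed by this PR:

import asyncio

from litellm import acompletion

async def main():
    # api_base redirects the call to a LiteLLM proxy instead of the provider;
    # extra_body "tags" ride along so the request can be filtered downstream.
    response = await acompletion(
        model="fireworks_ai/accounts/fireworks/models/kimi-k2-instruct",  # placeholder model
        messages=[{"role": "user", "content": "ping"}],
        api_base="https://my-litellm-proxy.example.com/v1",  # placeholder proxy endpoint
        extra_body={"tags": ["my-eval-run"]},  # placeholder tag
    )
    print(response.choices[0].message.content)

asyncio.run(main())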
47 changes: 35 additions & 12 deletions eval_protocol/quickstart/llm_judge.py
@@ -3,6 +3,7 @@
 """

 import os
+from datetime import datetime
 from typing import List, Dict, Any, Optional
 from tqdm import tqdm

@@ -14,32 +15,44 @@
 from eval_protocol.quickstart.utils import (
     split_multi_turn_rows,
     JUDGE_CONFIGS,
-    fetch_langfuse_traces_as_evaluation_rows,
     calculate_bootstrap_scores,
     push_scores_to_langfuse,
-    run_judgment,
+    run_judgment_async,
 )
+import asyncio
+from openai import AsyncOpenAI
+from eval_protocol.adapters.langfuse import create_langfuse_adapter

-import concurrent.futures
-from concurrent.futures import ThreadPoolExecutor
+adapter = create_langfuse_adapter()


 @pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI")
 @pytest.mark.asyncio
 @evaluation_test(
-    input_rows=[fetch_langfuse_traces_as_evaluation_rows()],
+    input_rows=[
+        adapter.get_evaluation_rows(
+            to_timestamp=datetime(2025, 9, 12, 0, 11, 18),
+            limit=711,
+            sample_size=50,
+            sleep_between_gets=3.0,
+            max_retries=5,
+        )
+    ],
     completion_params=[
-        {"model": "gpt-4.1"},
         {
-            "model": "fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507",
             "max_tokens": 131000,
             "extra_body": {"reasoning_effort": "medium"},
+            "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b",
         },
         {
             "max_tokens": 131000,
             "extra_body": {"reasoning_effort": "low"},
-            "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-120b",
+            "model": "fireworks_ai/accounts/fireworks/models/gpt-oss-20b",
         },
     ],
     rollout_processor=SingleTurnRolloutProcessor(),
     preprocess_fn=split_multi_turn_rows,
     max_concurrent_rollouts=64,
+    mode="all",
 )
 async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]:
@@ -73,11 +86,21 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]:
     judgments = []
     max_concurrency = JUDGE_CONFIGS[judge_name]["max_concurrency"]

-    with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
-        futures = [executor.submit(run_judgment, row, model_name, judge_name) for row in rows]
+    judge_config = JUDGE_CONFIGS[judge_name]
+
+    async with AsyncOpenAI(
+        api_key=judge_config.get("api_key"), base_url=judge_config.get("base_url")
+    ) as shared_client:
+        semaphore = asyncio.Semaphore(max_concurrency)
+
+        async def run_judgment(row):
+            async with semaphore:
+                return await run_judgment_async(row, model_name, judge_name, shared_client)
+
+        tasks = [run_judgment(row) for row in rows]

-        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Generating judgments"):
-            result = future.result()
+        for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"):
+            result = await coro
             if result and result["games"][0] and result["games"][1]:
                 judgments.append(result)
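The judging loop swaps the ThreadPoolExecutor for a single shared AsyncOpenAI client bounded by an asyncio.Semaphore: one connection pool serves every judgment, and per-judge concurrency (cf. the "lower concurrency for gemini" commit) is enforced by the semaphore rather than by thread count. A condensed sketch of the pattern, assuming run_judgment_async and JUDGE_CONFIGS behave as the imports above suggest:

import asyncio

from openai import AsyncOpenAI
from eval_protocol.quickstart.utils import JUDGE_CONFIGS, run_judgment_async

async def judge_rows(rows, model_name, judge_name):
    config = JUDGE_CONFIGS[judge_name]
    async with AsyncOpenAI(api_key=config.get("api_key"), base_url=config.get("base_url")) as client:
        semaphore = asyncio.Semaphore(config["max_concurrency"])  # cap in-flight judge calls

        async def bounded(row):
            async with semaphore:  # at most max_concurrency requests run at once
                return await run_judgment_async(row, model_name, judge_name, client)

        # as_completed yields judgments as they finish, not in submission order
        return [await coro for coro in asyncio.as_completed([bounded(r) for r in rows])]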