Skip to content

Commit 7472db4

Browse files
committed
update adapter
1 parent 0dc82c7 commit 7472db4

File tree

3 files changed

+99
-92
lines changed

3 files changed

+99
-92
lines changed

eval_protocol/adapters/langfuse.py

Lines changed: 78 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -61,68 +61,117 @@ def __init__(self):
6161
def get_evaluation_rows(
6262
self,
6363
limit: int = 100,
64+
sample_size: int = 50,
6465
tags: Optional[List[str]] = None,
6566
user_id: Optional[str] = None,
6667
session_id: Optional[str] = None,
6768
hours_back: Optional[int] = None,
6869
from_timestamp: Optional[datetime] = None,
6970
to_timestamp: Optional[datetime] = None,
7071
include_tool_calls: bool = True,
71-
page_size: int = 30, # TODO: remove probably
72-
sleep_between_gets: float = 0.1,
72+
sleep_between_gets: float = 2.5,
7373
max_retries: int = 3,
7474
) -> List[EvaluationRow]:
7575
"""Pull traces from Langfuse and convert to EvaluationRow format.
7676
7777
Args:
78-
limit: Maximum number of rows to return
78+
limit: Max number of trace summaries to collect via pagination (pre-sampling)
79+
sample_size: Number of traces to fetch full details for (sampled from collected summaries)
7980
tags: Filter by specific tags
8081
user_id: Filter by user ID
8182
session_id: Filter by session ID
8283
hours_back: Only include traces from the last N hours (ignored if from_timestamp or to_timestamp is given)
83-
from_timestamp: Only include traces with timestamp >= this datetime
84-
to_timestamp: Only include traces with timestamp <= this datetime
84+
from_timestamp: Explicit start time (overrides hours_back)
85+
to_timestamp: Explicit end time (overrides hours_back)
8586
include_tool_calls: Whether to include tool calling traces
86-
page_size: Number of traces to fetch per page (smaller = less rate limit issues)
87-
sleep_between_gets: Sleep time between individual trace.get() calls
87+
sleep_between_gets: Seconds to sleep before each individual trace.get() call (2.5s stays under a 30 req/min limit)
8888
max_retries: Maximum retries for rate limit errors
8989
9090
Returns:
9191
List[EvaluationRow]: Converted evaluation rows
9292
"""
9393
eval_rows = []
9494

95-
# Determine time window: explicit from/to takes precedence
95+
# Determine time window: explicit from/to takes precedence over hours_back
9696
if from_timestamp is None and to_timestamp is None and hours_back:
9797
to_timestamp = datetime.now()
9898
from_timestamp = to_timestamp - timedelta(hours=hours_back)
9999

100-
# Single API call to get trace list
101-
traces = self.client.api.trace.list(
102-
limit=limit,
103-
tags=tags,
104-
user_id=user_id,
105-
session_id=session_id,
106-
from_timestamp=from_timestamp,
107-
to_timestamp=to_timestamp,
108-
order_by="timestamp.desc",
109-
)
100+
# Collect trace summaries via pagination (up to limit)
101+
all_traces = []
102+
page = 1
103+
collected = 0
104+
105+
while collected < limit:
106+
current_page_limit = min(100, limit - collected) # Langfuse API max is 100
107+
108+
logger.debug(
109+
"Fetching page %d with limit %d (collected: %d/%d)", page, current_page_limit, collected, limit
110+
)
110111

111-
if not traces or not traces.data:
112+
# Fetch trace list with retry logic
113+
traces = None
114+
list_retries = 0
115+
while list_retries < max_retries:
116+
try:
117+
traces = self.client.api.trace.list(
118+
page=page,
119+
limit=current_page_limit,
120+
tags=tags,
121+
user_id=user_id,
122+
session_id=session_id,
123+
from_timestamp=from_timestamp,
124+
to_timestamp=to_timestamp,
125+
order_by="timestamp.desc",
126+
)
127+
break
128+
except Exception as e:
129+
list_retries += 1
130+
if "429" in str(e) and list_retries < max_retries:
131+
sleep_time = 2**list_retries # Exponential backoff
132+
logger.warning(
133+
"Rate limit hit on trace.list(), retrying in %ds (attempt %d/%d)",
134+
sleep_time,
135+
list_retries,
136+
max_retries,
137+
)
138+
time.sleep(sleep_time)
139+
else:
140+
logger.error("Failed to fetch trace list after %d retries: %s", max_retries, e)
141+
return eval_rows # No rows have been converted yet, so this is an empty list; summaries already collected in all_traces are dropped
142+
143+
if not traces or not traces.data:
144+
logger.debug("No more traces found on page %d", page)
145+
break
146+
147+
logger.debug("Collected %d traces from page %d", len(traces.data), page)
148+
149+
all_traces.extend(traces.data)
150+
collected += len(traces.data)
151+
152+
# Check if we have more pages
153+
if hasattr(traces.meta, "page") and hasattr(traces.meta, "total_pages"):
154+
if traces.meta.page >= traces.meta.total_pages:
155+
break
156+
elif len(traces.data) < current_page_limit:
157+
break
158+
159+
page += 1
160+
161+
if not all_traces:
112162
logger.debug("No traces found")
113163
return eval_rows
114164

115-
# Randomly sample the requested number of traces
116-
available_traces = traces.data
117-
sample_size = min(limit, len(available_traces))
118-
selected_traces = random.sample(available_traces, sample_size)
165+
# Randomly sample traces to fetch full details (respect rate limits)
166+
actual_sample_size = min(sample_size, len(all_traces))
167+
selected_traces = random.sample(all_traces, actual_sample_size)
119168

120-
logger.debug("Randomly selected %d traces from %d available", len(selected_traces), len(available_traces))
169+
logger.debug("Randomly selected %d traces from %d collected", actual_sample_size, len(all_traces))
121170

122171
# Process each selected trace with sleep and retry logic
123-
for i, trace_info in enumerate(selected_traces):
172+
for trace_info in selected_traces:
124173
# Sleep between gets to avoid rate limits
125-
if sleep_between_gets > 0 and i > 0:
174+
if sleep_between_gets > 0:
126175
time.sleep(sleep_between_gets)
127176

128177
# Fetch full trace details with retry logic
@@ -157,7 +206,9 @@ def get_evaluation_rows(
157206
logger.warning("Failed to convert trace %s: %s", trace_info.id, e)
158207
continue
159208

160-
logger.info("Successfully processed %d traces into evaluation rows", len(selected_traces))
209+
logger.info(
210+
"Successfully processed %d selected traces into %d evaluation rows", len(selected_traces), len(eval_rows)
211+
)
161212
return eval_rows
162213

163214
def get_evaluation_rows_by_ids(

eval_protocol/quickstart/llm_judge.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,24 @@
1515
from eval_protocol.quickstart.utils import (
1616
split_multi_turn_rows,
1717
JUDGE_CONFIGS,
18-
fetch_langfuse_traces_as_evaluation_rows,
1918
calculate_bootstrap_scores,
2019
push_scores_to_langfuse,
21-
run_judgment_async_with_shared_client,
20+
run_judgment_async,
2221
)
2322
import asyncio
2423
from openai import AsyncOpenAI
24+
from eval_protocol.adapters.langfuse import create_langfuse_adapter
25+
26+
adapter = create_langfuse_adapter()
2527

2628

2729
@pytest.mark.asyncio
2830
@evaluation_test(
2931
input_rows=[
30-
fetch_langfuse_traces_as_evaluation_rows(
32+
adapter.get_evaluation_rows(
3133
to_timestamp=datetime(2025, 9, 12, 0, 11, 18),
32-
limit=40,
33-
page_size=10,
34+
limit=711,
35+
sample_size=50,
3436
sleep_between_gets=3.0,
3537
max_retries=5,
3638
)
@@ -71,7 +73,7 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]:
7173
Same rows with updated evaluation_result containing scores and judgments
7274
"""
7375

74-
judge_name = "kimi-k2-instruct-0905" # Edit to which judge you'd like to use. Configs are in utils.py.
76+
judge_name = "gemini-2.5-pro" # Edit to which judge you'd like to use. Configs are in utils.py.
7577

7678
if not rows:
7779
print("❌ No evaluation rows provided")
@@ -91,11 +93,11 @@ async def test_llm_judge(rows: list[EvaluationRow]) -> list[EvaluationRow]:
9193
) as shared_client:
9294
semaphore = asyncio.Semaphore(max_concurrency)
9395

94-
async def run_judgment_with_semaphore(row):
96+
async def run_judgment(row):
9597
async with semaphore:
96-
return await run_judgment_async_with_shared_client(row, model_name, judge_name, shared_client)
98+
return await run_judgment_async(row, model_name, judge_name, shared_client)
9799

98-
tasks = [run_judgment_with_semaphore(row) for row in rows]
100+
tasks = [run_judgment(row) for row in rows]
99101

100102
for coro in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Generating judgments"):
101103
result = await coro
@@ -131,6 +133,6 @@ async def run_judgment_with_semaphore(row):
131133
) # Standard error approximation from 90% CI
132134

133135
# Optional: push scores back to Langfuse. Note that one score per model will be pushed onto the same trace.
134-
# push_scores_to_langfuse(rows, model_name, mean_score)
136+
push_scores_to_langfuse(rows, model_name, mean_score)
135137

136138
return rows

eval_protocol/quickstart/utils.py

Lines changed: 9 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@
5050
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai/",
5151
"max_concurrency": 16,
5252
},
53+
"gemini-2.5-flash": {
54+
"model": "gemini-2.5-flash",
55+
"temperature": 1.0,
56+
"max_tokens": 32000,
57+
"api_key": os.getenv("GEMINI_API_KEY"),
58+
"base_url": "https://generativelanguage.googleapis.com/v1beta/openai/",
59+
"max_concurrency": 16,
60+
},
5361
"kimi-k2-instruct-0905": {
5462
"model": "accounts/fireworks/models/kimi-k2-instruct-0905",
5563
"temperature": 0.6, # Kimi recommended temperature
@@ -195,7 +203,7 @@ async def pairwise_judgment_async(question_text, answer_a, answer_b, tools, judg
195203
return {"score": score, "judgment": judgment_text, "prompt": messages}
196204

197205

198-
async def run_judgment_async_with_shared_client(
206+
async def run_judgment_async(
199207
row: EvaluationRow, model_name: str, judge_name: str, shared_client
200208
) -> Optional[Dict[str, Any]]:
201209
"""Async judgment using shared client to avoid cleanup issues."""
@@ -232,60 +240,6 @@ async def run_judgment_async_with_shared_client(
232240
return {"model": model_name, "games": games}
233241

234242

235-
def fetch_langfuse_traces_as_evaluation_rows(
236-
limit: int = 100,
237-
tags: Optional[List[str]] = None,
238-
user_id: Optional[str] = None,
239-
session_id: Optional[str] = None,
240-
hours_back: Optional[int] = None,
241-
from_timestamp: Optional[datetime] = None,
242-
to_timestamp: Optional[datetime] = None,
243-
include_tool_calls: bool = True,
244-
page_size: int = 30,
245-
sleep_between_gets: float = 0.1,
246-
max_retries: int = 3,
247-
) -> List[EvaluationRow]:
248-
"""
249-
Fetch Langfuse traces and convert them to EvaluationRow objects.
250-
251-
Args:
252-
limit: Maximum number of traces to fetch
253-
tags: Filter traces by tags
254-
user_id: Filter traces by user ID
255-
session_id: Filter traces by session ID
256-
hours_back: Only fetch traces from the last N hours
257-
from_timestamp: Only include traces with timestamp >= this datetime
258-
to_timestamp: Only include traces with timestamp <= this datetime
259-
include_tool_calls: Whether to include tool calls in messages
260-
page_size: Number of traces to fetch per page (smaller = less rate limit issues)
261-
sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit)
262-
max_retries: Maximum retries for rate limit errors
263-
264-
Returns:
265-
List of EvaluationRow objects converted from Langfuse traces
266-
"""
267-
try:
268-
from eval_protocol.adapters.langfuse import create_langfuse_adapter
269-
270-
adapter = create_langfuse_adapter()
271-
return adapter.get_evaluation_rows(
272-
limit=limit,
273-
tags=tags,
274-
user_id=user_id,
275-
session_id=session_id,
276-
hours_back=hours_back,
277-
from_timestamp=from_timestamp,
278-
to_timestamp=to_timestamp,
279-
include_tool_calls=include_tool_calls,
280-
page_size=page_size,
281-
sleep_between_gets=sleep_between_gets,
282-
max_retries=max_retries,
283-
)
284-
except Exception as e:
285-
print(f"❌ LangfuseAdapter failed: {e}")
286-
return []
287-
288-
289243
def calculate_bootstrap_scores(judgments: List[Dict[str, Any]]) -> tuple[float, float, float]:
290244
"""
291245
Calculate bootstrap confidence intervals for Arena-Hard-Auto style judgments.

0 commit comments

Comments
 (0)