66
77from langfuse .api .resources .commons .types .observations_view import ObservationsView
88import logging
9+ import random
10+ import time
911from datetime import datetime , timedelta
1012from typing import Any , Dict , Iterator , List , Optional , cast
1113
@@ -59,53 +61,154 @@ def __init__(self):
5961 def get_evaluation_rows (
6062 self ,
6163 limit : int = 100 ,
64+ sample_size : int = 50 ,
6265 tags : Optional [List [str ]] = None ,
6366 user_id : Optional [str ] = None ,
6467 session_id : Optional [str ] = None ,
6568 hours_back : Optional [int ] = None ,
69+ from_timestamp : Optional [datetime ] = None ,
70+ to_timestamp : Optional [datetime ] = None ,
6671 include_tool_calls : bool = True ,
72+ sleep_between_gets : float = 2.5 ,
73+ max_retries : int = 3 ,
6774 ) -> List [EvaluationRow ]:
6875 """Pull traces from Langfuse and convert to EvaluationRow format.
6976
7077 Args:
71- limit: Maximum number of rows to return
78+ limit: Max number of trace summaries to collect via pagination (pre-sampling)
79+ sample_size: Number of traces to fetch full details for (sampled from collected summaries)
7280 tags: Filter by specific tags
7381 user_id: Filter by user ID
7482 session_id: Filter by session ID
7583 hours_back: Filter traces from this many hours ago
84+ from_timestamp: Explicit start time (overrides hours_back)
85+ to_timestamp: Explicit end time (overrides hours_back)
7686 include_tool_calls: Whether to include tool calling traces
87+ sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit)
88+ max_retries: Maximum retries for rate limit errors
7789
78- Yields :
79- EvaluationRow: Converted evaluation rows
90+ Returns :
91+ List[ EvaluationRow] : Converted evaluation rows
8092 """
81- # Get traces from Langfuse using new API
93+ eval_rows = []
8294
83- if hours_back :
95+ # Determine time window: explicit from/to takes precedence over hours_back
96+ if from_timestamp is None and to_timestamp is None and hours_back :
8497 to_timestamp = datetime .now ()
8598 from_timestamp = to_timestamp - timedelta (hours = hours_back )
86- else :
87- to_timestamp = None
88- from_timestamp = None
8999
90- eval_rows = []
100+ # Collect trace summaries via pagination (up to limit)
101+ all_traces = []
102+ page = 1
103+ collected = 0
91104
92- traces : Traces = self .client .api .trace .list (
93- limit = limit ,
94- tags = tags ,
95- user_id = user_id ,
96- session_id = session_id ,
97- from_timestamp = from_timestamp ,
98- to_timestamp = to_timestamp ,
99- )
105+ while collected < limit :
106+ current_page_limit = min (100 , limit - collected ) # Langfuse API max is 100
100107
101- for trace in traces .data :
102- try :
103- eval_row = self ._convert_trace_to_evaluation_row (trace , include_tool_calls )
104- if eval_row :
105- eval_rows .append (eval_row )
106- except (AttributeError , ValueError , KeyError ) as e :
107- logger .warning ("Failed to convert trace %s: %s" , trace .id , e )
108- continue
108+ logger .debug (
109+ "Fetching page %d with limit %d (collected: %d/%d)" , page , current_page_limit , collected , limit
110+ )
111+
112+ # Fetch trace list with retry logic
113+ traces = None
114+ list_retries = 0
115+ while list_retries < max_retries :
116+ try :
117+ traces = self .client .api .trace .list (
118+ page = page ,
119+ limit = current_page_limit ,
120+ tags = tags ,
121+ user_id = user_id ,
122+ session_id = session_id ,
123+ from_timestamp = from_timestamp ,
124+ to_timestamp = to_timestamp ,
125+ order_by = "timestamp.desc" ,
126+ )
127+ break
128+ except Exception as e :
129+ list_retries += 1
130+ if "429" in str (e ) and list_retries < max_retries :
131+ sleep_time = 2 ** list_retries # Exponential backoff
132+ logger .warning (
133+ "Rate limit hit on trace.list(), retrying in %ds (attempt %d/%d)" ,
134+ sleep_time ,
135+ list_retries ,
136+ max_retries ,
137+ )
138+ time .sleep (sleep_time )
139+ else :
140+ logger .error ("Failed to fetch trace list after %d retries: %s" , max_retries , e )
141+ return eval_rows # Return what we have so far
142+
143+ if not traces or not traces .data :
144+ logger .debug ("No more traces found on page %d" , page )
145+ break
146+
147+ logger .debug ("Collected %d traces from page %d" , len (traces .data ), page )
148+
149+ all_traces .extend (traces .data )
150+ collected += len (traces .data )
151+
152+ # Check if we have more pages
153+ if hasattr (traces .meta , "page" ) and hasattr (traces .meta , "total_pages" ):
154+ if traces .meta .page >= traces .meta .total_pages :
155+ break
156+ elif len (traces .data ) < current_page_limit :
157+ break
158+
159+ page += 1
160+
161+ if not all_traces :
162+ logger .debug ("No traces found" )
163+ return eval_rows
164+
165+ # Randomly sample traces to fetch full details (respect rate limits)
166+ actual_sample_size = min (sample_size , len (all_traces ))
167+ selected_traces = random .sample (all_traces , actual_sample_size )
168+
169+ logger .debug ("Randomly selected %d traces from %d collected" , actual_sample_size , len (all_traces ))
170+
171+ # Process each selected trace with sleep and retry logic
172+ for trace_info in selected_traces :
173+ # Sleep between gets to avoid rate limits
174+ if sleep_between_gets > 0 :
175+ time .sleep (sleep_between_gets )
176+
177+ # Fetch full trace details with retry logic
178+ trace_full = None
179+ detail_retries = 0
180+ while detail_retries < max_retries :
181+ try :
182+ trace_full = self .client .api .trace .get (trace_info .id )
183+ break
184+ except Exception as e :
185+ detail_retries += 1
186+ if "429" in str (e ) and detail_retries < max_retries :
187+ sleep_time = 2 ** detail_retries # Exponential backoff
188+ logger .warning (
189+ "Rate limit hit on trace.get(%s), retrying in %ds (attempt %d/%d)" ,
190+ trace_info .id ,
191+ sleep_time ,
192+ detail_retries ,
193+ max_retries ,
194+ )
195+ time .sleep (sleep_time )
196+ else :
197+ logger .warning ("Failed to fetch trace %s after %d retries: %s" , trace_info .id , max_retries , e )
198+ break # Skip this trace
199+
200+ if trace_full :
201+ try :
202+ eval_row = self ._convert_trace_to_evaluation_row (trace_full , include_tool_calls )
203+ if eval_row :
204+ eval_rows .append (eval_row )
205+ except (AttributeError , ValueError , KeyError ) as e :
206+ logger .warning ("Failed to convert trace %s: %s" , trace_info .id , e )
207+ continue
208+
209+ logger .info (
210+ "Successfully processed %d selected traces into %d evaluation rows" , len (selected_traces ), len (eval_rows )
211+ )
109212 return eval_rows
110213
111214 def get_evaluation_rows_by_ids (
@@ -135,7 +238,7 @@ def get_evaluation_rows_by_ids(
135238 return eval_rows
136239
137240 def _convert_trace_to_evaluation_row (
138- self , trace : Trace , include_tool_calls : bool = True
241+ self , trace : TraceWithFullDetails , include_tool_calls : bool = True
139242 ) -> Optional [EvaluationRow ]:
140243 """Convert a Langfuse trace to EvaluationRow format.
141244
@@ -147,8 +250,6 @@ def _convert_trace_to_evaluation_row(
147250 EvaluationRow or None if conversion fails
148251 """
149252 try :
150- trace = self .client .api .trace .get ("2d9f3474-83ab-4431-9788-049ca4219023" )
151-
152253 # Extract messages from trace input and output
153254 messages = self ._extract_messages_from_trace (trace , include_tool_calls )
154255
@@ -163,13 +264,20 @@ def _convert_trace_to_evaluation_row(
163264 return EvaluationRow (
164265 messages = messages ,
165266 tools = tools ,
267+ input_metadata = InputMetadata (
268+ session_data = {
269+ "langfuse_trace_id" : trace .id , # Store the trace ID here
270+ }
271+ ),
166272 )
167273
168274 except (AttributeError , ValueError , KeyError ) as e :
169275 logger .error ("Error converting trace %s: %s" , trace .id , e )
170276 return None
171277
172- def _extract_messages_from_trace (self , trace : Any , include_tool_calls : bool = True ) -> List [Message ]:
278+ def _extract_messages_from_trace (
279+ self , trace : TraceWithFullDetails , include_tool_calls : bool = True
280+ ) -> List [Message ]:
173281 """Extract messages from Langfuse trace input and output.
174282
175283 Args:
@@ -214,6 +322,10 @@ def _extract_messages_from_trace(self, trace: Any, include_tool_calls: bool = Tr
214322 else :
215323 # Fallback: convert entire output to string
216324 messages .append (Message (role = "assistant" , content = str (trace .output )))
325+ elif isinstance (trace .output , list ):
326+ # Direct list of message dicts (same as input handling)
327+ for msg in trace .output :
328+ messages .append (self ._dict_to_message (msg , include_tool_calls ))
217329 elif isinstance (trace .output , str ):
218330 messages .append (Message (role = "assistant" , content = trace .output ))
219331
0 commit comments