Skip to content

Commit b593fce

Browse files
committed
langfuse example
1 parent 95a94c8 commit b593fce

File tree

5 files changed

+487
-12
lines changed

5 files changed

+487
-12
lines changed

eval_protocol/adapters/langfuse.py

Lines changed: 128 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
logger = logging.getLogger(__name__)
1414

1515
try:
16-
from langfuse import Langfuse
16+
from langfuse import Langfuse # pyright: ignore[reportPrivateImportUsage]
1717

1818
LANGFUSE_AVAILABLE = True
1919
except ImportError:
@@ -75,7 +75,7 @@ def get_evaluation_rows(
7575
from_timestamp: Optional[datetime] = None,
7676
to_timestamp: Optional[datetime] = None,
7777
include_tool_calls: bool = True,
78-
) -> Iterator[EvaluationRow]:
78+
) -> List[EvaluationRow]:
7979
"""Pull traces from Langfuse and convert to EvaluationRow format.
8080
8181
Args:
@@ -90,8 +90,9 @@ def get_evaluation_rows(
9090
Yields:
9191
EvaluationRow: Converted evaluation rows
9292
"""
93-
# Get traces from Langfuse
94-
traces = self.client.get_traces(
93+
# Get traces from Langfuse using new API
94+
eval_rows = []
95+
traces = self.client.api.trace.list(
9596
limit=limit,
9697
tags=tags,
9798
user_id=user_id,
@@ -104,16 +105,17 @@ def get_evaluation_rows(
104105
try:
105106
eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls)
106107
if eval_row:
107-
yield eval_row
108+
eval_rows.append(eval_row)
108109
except (AttributeError, ValueError, KeyError) as e:
109110
logger.warning("Failed to convert trace %s: %s", trace.id, e)
110111
continue
112+
return eval_rows
111113

112114
def get_evaluation_rows_by_ids(
113115
self,
114116
trace_ids: List[str],
115117
include_tool_calls: bool = True,
116-
) -> Iterator[EvaluationRow]:
118+
) -> List[EvaluationRow]:
117119
"""Get specific traces by their IDs and convert to EvaluationRow format.
118120
119121
Args:
@@ -123,15 +125,17 @@ def get_evaluation_rows_by_ids(
123125
Yields:
124126
EvaluationRow: Converted evaluation rows
125127
"""
128+
eval_rows = []
126129
for trace_id in trace_ids:
127130
try:
128-
trace = self.client.get_trace(trace_id)
131+
trace = self.client.api.trace.get(trace_id)
129132
eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls)
130133
if eval_row:
131-
yield eval_row
134+
eval_rows.append(eval_row)
132135
except (AttributeError, ValueError, KeyError) as e:
133136
logger.warning("Failed to fetch/convert trace %s: %s", trace_id, e)
134137
continue
138+
return eval_rows
135139

136140
def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool = True) -> Optional[EvaluationRow]:
137141
"""Convert a Langfuse trace to EvaluationRow format.
@@ -145,10 +149,29 @@ def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool
145149
"""
146150
try:
147151
# Get observations (generations, spans) from the trace
148-
observations = self.client.get_observations(trace_id=trace.id).data
152+
observations_response = self.client.api.observations.get_many(trace_id=trace.id, limit=100)
153+
observations = (
154+
observations_response.data if hasattr(observations_response, "data") else list(observations_response)
155+
)
149156

150-
# Convert observations to messages
151-
messages = self._extract_messages_from_observations(observations, include_tool_calls)
157+
# Look for conversation history in trace output or observations
158+
messages = []
159+
conversation_found = False
160+
161+
# Look for complete conversation in observations
162+
if not conversation_found:
163+
for obs in observations:
164+
# Check each observation's output for complete conversation array
165+
if hasattr(obs, "output") and obs.output:
166+
conversation = self._extract_conversation_from_output(obs.output)
167+
if conversation:
168+
messages = conversation
169+
conversation_found = True
170+
break
171+
172+
# Fallback: try extracting from observations using old method
173+
if not conversation_found:
174+
messages = self._extract_messages_from_observations(observations, include_tool_calls)
152175

153176
if not messages:
154177
return None
@@ -266,6 +289,86 @@ def _dict_to_message(self, msg_dict: Dict[str, Any], include_tool_calls: bool =
266289
function_call=function_call,
267290
)
268291

292+
def _extract_conversation_from_output(self, output: Any) -> Optional[List[Message]]:
293+
"""Extract conversation history from PydanticAI agent run output.
294+
295+
This looks for the conversation format like:
296+
[
297+
{"role": "user", "content": "..."},
298+
{"role": "assistant", "content": "...", "tool_calls": [...]},
299+
{"role": "tool", "content": "...", "name": "execute_sql"},
300+
...
301+
]
302+
303+
Args:
304+
output: The output object to search for conversation history
305+
306+
Returns:
307+
List of Message objects or None if no conversation found
308+
"""
309+
messages = []
310+
311+
try:
312+
# Handle different output formats
313+
conversation_data = None
314+
315+
if isinstance(output, list):
316+
# Direct list of messages
317+
conversation_data = output
318+
elif isinstance(output, dict):
319+
# Look for conversation in various nested formats
320+
if "messages" in output:
321+
conversation_data = output["messages"]
322+
elif "conversation" in output:
323+
conversation_data = output["conversation"]
324+
elif "history" in output:
325+
conversation_data = output["history"]
326+
elif "agent_run" in output: # Handle nested conversation data PydanticAI style
327+
agent_run = output["agent_run"]
328+
if isinstance(agent_run, dict) and "messages" in agent_run:
329+
conversation_data = agent_run["messages"]
330+
elif len(output.keys()) == 1:
331+
# Single key, check if its value is a list
332+
single_key = list(output.keys())[0]
333+
if isinstance(output[single_key], list):
334+
conversation_data = output[single_key]
335+
elif isinstance(output, str):
336+
# Try to parse JSON string
337+
import json
338+
339+
try:
340+
parsed = json.loads(output)
341+
return self._extract_conversation_from_output(parsed)
342+
except (json.JSONDecodeError, ValueError):
343+
pass
344+
345+
# Parse conversation data into messages
346+
if conversation_data and isinstance(conversation_data, list):
347+
for msg_data in conversation_data:
348+
if isinstance(msg_data, dict) and "role" in msg_data:
349+
role = msg_data.get("role")
350+
if role is None:
351+
continue
352+
content = msg_data.get("content", "")
353+
354+
# Handle tool calls in assistant messages
355+
tool_calls = None
356+
if role == "assistant" and "tool_calls" in msg_data:
357+
tool_calls = msg_data["tool_calls"]
358+
359+
# Handle tool responses
360+
name = None
361+
if role == "tool":
362+
name = msg_data.get("name")
363+
364+
messages.append(Message(role=role, content=content, name=name, tool_calls=tool_calls))
365+
366+
return messages if messages else None
367+
368+
except Exception as e:
369+
logger.debug("Error extracting conversation from output: %s", e)
370+
return None
371+
269372
def _create_input_metadata(self, trace: Any, observations: List[Any]) -> InputMetadata:
270373
"""Create InputMetadata from trace and observations.
271374
@@ -331,6 +434,20 @@ def _extract_ground_truth(self, trace: Any) -> Optional[str]:
331434
Returns:
332435
Ground truth string or None
333436
"""
437+
# First check trace input for evaluation test data structure
438+
if hasattr(trace, "input") and trace.input:
439+
if isinstance(trace.input, dict):
440+
# Handle EP test format: kwargs.input_rows[0].ground_truth
441+
kwargs = trace.input.get("kwargs", {})
442+
if "input_rows" in kwargs:
443+
input_rows = kwargs["input_rows"]
444+
if input_rows and len(input_rows) > 0:
445+
first_row = input_rows[0]
446+
if isinstance(first_row, dict) and "ground_truth" in first_row:
447+
ground_truth = first_row["ground_truth"]
448+
if ground_truth: # Only return if not None/empty
449+
return str(ground_truth)
450+
334451
# Check trace metadata for ground truth
335452
if hasattr(trace, "metadata") and trace.metadata:
336453
if isinstance(trace.metadata, dict):

0 commit comments

Comments
 (0)