Skip to content

Commit 5175cfa

Browse files
committed
finished
1 parent 696ee52 commit 5175cfa

File tree

2 files changed

+54
-281
lines changed

2 files changed

+54
-281
lines changed

eval_protocol/adapters/langfuse.py

Lines changed: 53 additions & 280 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
to EvaluationRow format for use in evaluation pipelines.
55
"""
66

7+
from langfuse.api.resources.commons.types.observations_view import ObservationsView
78
import logging
89
from datetime import datetime, timedelta
910
from typing import Any, Dict, Iterator, List, Optional, cast
@@ -15,6 +16,7 @@
1516
try:
1617
from langfuse import get_client # pyright: ignore[reportPrivateImportUsage]
1718
from langfuse.api.resources.trace.types.traces import Traces
19+
from langfuse.api.resources.commons.types.trace import Trace
1820
from langfuse.api.resources.commons.types.trace_with_full_details import TraceWithFullDetails
1921

2022
LANGFUSE_AVAILABLE = True
@@ -86,6 +88,7 @@ def get_evaluation_rows(
8688
from_timestamp = None
8789

8890
eval_rows = []
91+
8992
traces: Traces = self.client.api.trace.list(
9093
limit=limit,
9194
tags=tags,
@@ -131,7 +134,9 @@ def get_evaluation_rows_by_ids(
131134
continue
132135
return eval_rows
133136

134-
def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool = True) -> Optional[EvaluationRow]:
137+
def _convert_trace_to_evaluation_row(
138+
self, trace: Trace, include_tool_calls: bool = True
139+
) -> Optional[EvaluationRow]:
135140
"""Convert a Langfuse trace to EvaluationRow format.
136141
137142
Args:
@@ -142,96 +147,78 @@ def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool
142147
EvaluationRow or None if conversion fails
143148
"""
144149
try:
145-
# Get observations (generations, spans) from the trace
146-
observations_response = self.client.api.observations.get_many(trace_id=trace.id, limit=100)
147-
# print(observations_response)
148-
observations = (
149-
observations_response.data if hasattr(observations_response, "data") else list(observations_response)
150-
)
150+
trace = self.client.api.trace.get("2d9f3474-83ab-4431-9788-049ca4219023")
151151

152-
messages = []
152+
# Extract messages from trace input and output
153+
messages = self._extract_messages_from_trace(trace, include_tool_calls)
153154

154-
for obs in observations:
155-
if obs.name == "agent run":
156-
messages = self._extract_conversation_from_output(obs.output)
157-
break
155+
# Extract tools if available
156+
tools = None
157+
if include_tool_calls and isinstance(trace.input, dict) and "tools" in trace.input:
158+
tools = trace.input["tools"]
158159

159160
if not messages:
160161
return None
161162

162-
# Extract metadata
163-
input_metadata = self._create_input_metadata(trace, observations)
164-
165-
# Extract ground truth if available (from trace metadata or tags)
166-
ground_truth = self._extract_ground_truth(trace)
167-
168-
# Extract tools if available
169-
tools = self._extract_tools(observations, trace) if include_tool_calls else None
170-
171163
return EvaluationRow(
172164
messages=messages,
173165
tools=tools,
174-
input_metadata=input_metadata,
175-
ground_truth=ground_truth,
176166
)
177167

178168
except (AttributeError, ValueError, KeyError) as e:
179169
logger.error("Error converting trace %s: %s", trace.id, e)
180170
return None
181171

182-
def _extract_messages_from_observations(
183-
self, observations: List[Any], include_tool_calls: bool = True
184-
) -> List[Message]:
185-
"""Extract messages from Langfuse observations.
172+
def _extract_messages_from_trace(self, trace: Any, include_tool_calls: bool = True) -> List[Message]:
173+
"""Extract messages from Langfuse trace input and output.
186174
187175
Args:
188-
observations: List of Langfuse observation objects
176+
trace: Langfuse trace object
189177
include_tool_calls: Whether to include tool calling information
190178
191179
Returns:
192180
List of Message objects
193181
"""
194182
messages = []
195183

196-
# Sort observations by timestamp
197-
sorted_observations = sorted(observations, key=lambda x: x.start_time or datetime.min)
198-
199-
for obs in sorted_observations:
200-
try:
201-
if hasattr(obs, "input") and obs.input:
202-
# Handle different input formats
203-
if isinstance(obs.input, dict):
204-
if "messages" in obs.input:
205-
# OpenAI-style messages format
206-
for msg in obs.input["messages"]:
207-
messages.append(self._dict_to_message(msg, include_tool_calls))
208-
elif "role" in obs.input:
209-
# Single message format
210-
messages.append(self._dict_to_message(obs.input, include_tool_calls))
211-
elif "prompt" in obs.input:
212-
# Simple prompt format
213-
messages.append(Message(role="user", content=str(obs.input["prompt"])))
214-
elif isinstance(obs.input, str):
215-
# Simple string input
216-
messages.append(Message(role="user", content=obs.input))
217-
218-
if hasattr(obs, "output") and obs.output:
219-
# Handle output
220-
if isinstance(obs.output, dict):
221-
if "content" in obs.output:
222-
messages.append(Message(role="assistant", content=str(obs.output["content"])))
223-
elif "message" in obs.output:
224-
msg_dict = obs.output["message"]
225-
messages.append(self._dict_to_message(msg_dict, include_tool_calls))
226-
else:
227-
# Fallback: convert entire output to string
228-
messages.append(Message(role="assistant", content=str(obs.output)))
229-
elif isinstance(obs.output, str):
230-
messages.append(Message(role="assistant", content=obs.output))
184+
try:
185+
# Handle trace input
186+
if hasattr(trace, "input") and trace.input:
187+
if isinstance(trace.input, dict):
188+
if "messages" in trace.input:
189+
# OpenAI-style messages format
190+
for msg in trace.input["messages"]:
191+
messages.append(self._dict_to_message(msg, include_tool_calls))
192+
elif "role" in trace.input:
193+
# Single message format
194+
messages.append(self._dict_to_message(trace.input, include_tool_calls))
195+
elif "prompt" in trace.input:
196+
# Simple prompt format
197+
messages.append(Message(role="user", content=str(trace.input["prompt"])))
198+
elif isinstance(trace.input, list):
199+
# Direct list of message dicts
200+
for msg in trace.input:
201+
messages.append(self._dict_to_message(msg, include_tool_calls))
202+
elif isinstance(trace.input, str):
203+
# Simple string input
204+
messages.append(Message(role="user", content=trace.input))
205+
206+
# Handle trace output
207+
if hasattr(trace, "output") and trace.output:
208+
if isinstance(trace.output, dict):
209+
if "content" in trace.output:
210+
messages.append(Message(role="assistant", content=str(trace.output["content"])))
211+
elif "message" in trace.output:
212+
msg_dict = trace.output["message"]
213+
messages.append(self._dict_to_message(msg_dict, include_tool_calls))
214+
else:
215+
# Fallback: convert entire output to string
216+
messages.append(Message(role="assistant", content=str(trace.output)))
217+
elif isinstance(trace.output, str):
218+
messages.append(Message(role="assistant", content=trace.output))
231219

232-
except (AttributeError, ValueError, KeyError) as e:
233-
logger.warning("Error processing observation %s: %s", obs.id, e)
234-
continue
220+
except (AttributeError, ValueError, KeyError) as e:
221+
logger.warning("Error processing trace %s: %s", trace.id, e)
235222

236223
return messages
237224

@@ -272,220 +259,6 @@ def _dict_to_message(self, msg_dict: Dict[str, Any], include_tool_calls: bool =
272259
function_call=function_call,
273260
)
274261

275-
def _extract_conversation_from_output(self, output: Any) -> Optional[List[Message]]:
276-
"""Extract conversation history from PydanticAI agent run output.
277-
278-
This looks for the conversation format like:
279-
[
280-
{"role": "user", "content": "..."},
281-
{"role": "assistant", "content": "...", "tool_calls": [...]},
282-
{"role": "tool", "content": "...", "name": "execute_sql"},
283-
...
284-
]
285-
286-
Args:
287-
output: The output object to search for conversation history
288-
289-
Returns:
290-
List of Message objects or None if no conversation found
291-
"""
292-
messages = []
293-
294-
try:
295-
# Handle different output formats
296-
conversation_data = None
297-
298-
if isinstance(output, list):
299-
# Direct list of messages
300-
conversation_data = output
301-
elif isinstance(output, dict):
302-
# Look for conversation in various nested formats
303-
if "messages" in output:
304-
conversation_data = output["messages"]
305-
elif "conversation" in output:
306-
conversation_data = output["conversation"]
307-
elif "history" in output:
308-
conversation_data = output["history"]
309-
elif "agent_run" in output: # Handle nested conversation data PydanticAI style
310-
agent_run = output["agent_run"]
311-
if isinstance(agent_run, dict) and "messages" in agent_run:
312-
conversation_data = agent_run["messages"]
313-
elif len(output.keys()) == 1:
314-
# Single key, check if its value is a list
315-
single_key = list(output.keys())[0]
316-
if isinstance(output[single_key], list):
317-
conversation_data = output[single_key]
318-
elif isinstance(output, str):
319-
# Try to parse JSON string
320-
import json
321-
322-
try:
323-
parsed = json.loads(output)
324-
return self._extract_conversation_from_output(parsed)
325-
except (json.JSONDecodeError, ValueError):
326-
pass
327-
328-
# Parse conversation data into messages
329-
if conversation_data and isinstance(conversation_data, list):
330-
for msg_data in conversation_data:
331-
if isinstance(msg_data, dict) and "role" in msg_data:
332-
role = msg_data.get("role")
333-
if role is None:
334-
continue
335-
content = msg_data.get("content", "")
336-
337-
# Handle tool calls in assistant messages
338-
tool_calls = None
339-
if role == "assistant" and "tool_calls" in msg_data:
340-
tool_calls = msg_data["tool_calls"]
341-
342-
# Handle tool responses
343-
name = None
344-
tool_call_id = None
345-
if role == "tool":
346-
name = msg_data.get("name")
347-
tool_call_id = msg_data.get("id")
348-
349-
messages.append(
350-
Message(
351-
role=role, content=content, name=name, tool_calls=tool_calls, tool_call_id=tool_call_id
352-
)
353-
)
354-
355-
return messages if messages else None
356-
357-
except Exception as e:
358-
logger.debug("Error extracting conversation from output: %s", e)
359-
return None
360-
361-
def _create_input_metadata(self, trace: Any, observations: List[Any]) -> InputMetadata:
362-
"""Create InputMetadata from trace and observations.
363-
364-
Args:
365-
trace: Langfuse trace object
366-
observations: List of observation objects
367-
368-
Returns:
369-
InputMetadata object
370-
"""
371-
# Extract completion parameters from trace input first, then observations
372-
completion_params = {}
373-
374-
# First check trace input for evaluation test completion_params
375-
if hasattr(trace, "input") and trace.input:
376-
if isinstance(trace.input, dict):
377-
kwargs = trace.input.get("kwargs", {})
378-
if "completion_params" in kwargs:
379-
trace_completion_params = kwargs["completion_params"]
380-
if trace_completion_params and isinstance(trace_completion_params, dict):
381-
completion_params.update(trace_completion_params)
382-
383-
# Fallback: Look for model parameters in observations if not found in trace input
384-
if not completion_params:
385-
for obs in observations:
386-
if hasattr(obs, "model") and obs.model:
387-
completion_params["model"] = obs.model
388-
if hasattr(obs, "model_parameters") and obs.model_parameters:
389-
params = obs.model_parameters
390-
if "temperature" in params:
391-
completion_params["temperature"] = params["temperature"]
392-
if "max_tokens" in params:
393-
completion_params["max_tokens"] = params["max_tokens"]
394-
if "top_p" in params:
395-
completion_params["top_p"] = params["top_p"]
396-
break
397-
398-
# Create dataset info from trace metadata
399-
dataset_info = {
400-
"trace_id": trace.id,
401-
"trace_name": getattr(trace, "name", None),
402-
"trace_tags": getattr(trace, "tags", []),
403-
}
404-
405-
# Add trace metadata if available
406-
if hasattr(trace, "metadata") and trace.metadata:
407-
dataset_info["trace_metadata"] = trace.metadata
408-
409-
# Create session data
410-
session_data = {
411-
"session_id": getattr(trace, "session_id", None),
412-
"user_id": getattr(trace, "user_id", None),
413-
"timestamp": getattr(trace, "timestamp", None),
414-
}
415-
416-
return InputMetadata(
417-
row_id=trace.id,
418-
completion_params=completion_params,
419-
dataset_info=dataset_info,
420-
session_data=session_data,
421-
)
422-
423-
def _extract_ground_truth(self, trace: Any) -> Optional[str]:
424-
"""Extract ground truth from trace if available.
425-
426-
Args:
427-
trace: Langfuse trace object
428-
429-
Returns:
430-
Ground truth string or None
431-
"""
432-
# First check trace input for evaluation test data structure
433-
if hasattr(trace, "input") and trace.input:
434-
if isinstance(trace.input, dict):
435-
# Handle EP test format: kwargs.input_rows[0].ground_truth
436-
kwargs = trace.input.get("kwargs", {})
437-
if "input_rows" in kwargs:
438-
input_rows = kwargs["input_rows"]
439-
if input_rows and len(input_rows) > 0:
440-
first_row = input_rows[0]
441-
if isinstance(first_row, dict) and "ground_truth" in first_row:
442-
ground_truth = first_row["ground_truth"]
443-
if ground_truth: # Only return if not None/empty
444-
return str(ground_truth)
445-
446-
# Check trace metadata for ground truth
447-
if hasattr(trace, "metadata") and trace.metadata:
448-
if isinstance(trace.metadata, dict):
449-
return trace.metadata.get("ground_truth") or trace.metadata.get("expected_answer")
450-
451-
# Check tags for ground truth indicators
452-
if hasattr(trace, "tags") and trace.tags:
453-
for tag in trace.tags:
454-
if tag.startswith("ground_truth:"):
455-
return tag.replace("ground_truth:", "", 1)
456-
457-
return None
458-
459-
def _extract_tools(self, observations: List[Any], trace: Any = None) -> Optional[List[Dict[str, Any]]]:
460-
"""Extract tool definitions from trace metadata or observations.
461-
462-
Args:
463-
observations: List of observation objects
464-
trace: Trace object that may contain metadata with tools
465-
466-
Returns:
467-
List of tool definitions or None
468-
"""
469-
# First, try to extract tools from trace metadata (preferred)
470-
if trace and hasattr(trace, "metadata") and trace.metadata:
471-
if isinstance(trace.metadata, dict) and "tools" in trace.metadata:
472-
tools_from_metadata = trace.metadata["tools"]
473-
if tools_from_metadata:
474-
return tools_from_metadata
475-
476-
# Fallback: extract from observations
477-
tools = []
478-
for obs in observations:
479-
if hasattr(obs, "input") and obs.input and isinstance(obs.input, dict):
480-
if "tools" in obs.input:
481-
tools.extend(obs.input["tools"])
482-
elif "functions" in obs.input:
483-
# Convert functions to tools format
484-
for func in obs.input["functions"]:
485-
tools.append({"type": "function", "function": func})
486-
487-
return tools if tools else None
488-
489262

490263
def create_langfuse_adapter() -> LangfuseAdapter:
491264
"""Factory function to create a Langfuse adapter."""

0 commit comments

Comments
 (0)