Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 128 additions & 11 deletions eval_protocol/adapters/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
logger = logging.getLogger(__name__)

try:
from langfuse import Langfuse
from langfuse import Langfuse # pyright: ignore[reportPrivateImportUsage]

LANGFUSE_AVAILABLE = True
except ImportError:
Expand Down Expand Up @@ -75,7 +75,7 @@ def get_evaluation_rows(
from_timestamp: Optional[datetime] = None,
to_timestamp: Optional[datetime] = None,
include_tool_calls: bool = True,
) -> Iterator[EvaluationRow]:
) -> List[EvaluationRow]:
"""Pull traces from Langfuse and convert to EvaluationRow format.

Args:
Expand All @@ -90,8 +90,9 @@ def get_evaluation_rows(
Returns:
List[EvaluationRow]: Converted evaluation rows
"""
# Get traces from Langfuse
traces = self.client.get_traces(
# Get traces from Langfuse using new API
eval_rows = []
traces = self.client.api.trace.list(
limit=limit,
tags=tags,
user_id=user_id,
Expand All @@ -104,16 +105,17 @@ def get_evaluation_rows(
try:
eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls)
if eval_row:
yield eval_row
eval_rows.append(eval_row)
except (AttributeError, ValueError, KeyError) as e:
logger.warning("Failed to convert trace %s: %s", trace.id, e)
continue
return eval_rows

def get_evaluation_rows_by_ids(
self,
trace_ids: List[str],
include_tool_calls: bool = True,
) -> Iterator[EvaluationRow]:
) -> List[EvaluationRow]:
"""Get specific traces by their IDs and convert to EvaluationRow format.

Args:
Expand All @@ -123,15 +125,17 @@ def get_evaluation_rows_by_ids(
Returns:
List[EvaluationRow]: Converted evaluation rows
"""
eval_rows = []
for trace_id in trace_ids:
try:
trace = self.client.get_trace(trace_id)
trace = self.client.api.trace.get(trace_id)
eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls)
if eval_row:
yield eval_row
eval_rows.append(eval_row)
except (AttributeError, ValueError, KeyError) as e:
logger.warning("Failed to fetch/convert trace %s: %s", trace_id, e)
continue
return eval_rows

def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool = True) -> Optional[EvaluationRow]:
"""Convert a Langfuse trace to EvaluationRow format.
Expand All @@ -145,10 +149,29 @@ def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool
"""
try:
# Get observations (generations, spans) from the trace
observations = self.client.get_observations(trace_id=trace.id).data
observations_response = self.client.api.observations.get_many(trace_id=trace.id, limit=100)
observations = (
observations_response.data if hasattr(observations_response, "data") else list(observations_response)
)

# Convert observations to messages
messages = self._extract_messages_from_observations(observations, include_tool_calls)
# Look for conversation history in trace output or observations
messages = []
conversation_found = False

# Look for complete conversation in observations
if not conversation_found:
for obs in observations:
# Check each observation's output for complete conversation array
if hasattr(obs, "output") and obs.output:
conversation = self._extract_conversation_from_output(obs.output)
if conversation:
messages = conversation
conversation_found = True
break

# Fallback: try extracting from observations using old method
if not conversation_found:
messages = self._extract_messages_from_observations(observations, include_tool_calls)

if not messages:
return None
Expand Down Expand Up @@ -266,6 +289,86 @@ def _dict_to_message(self, msg_dict: Dict[str, Any], include_tool_calls: bool =
function_call=function_call,
)

def _extract_conversation_from_output(self, output: Any) -> Optional[List[Message]]:
"""Extract conversation history from PydanticAI agent run output.

This looks for the conversation format like:
[
{"role": "user", "content": "..."},
{"role": "assistant", "content": "...", "tool_calls": [...]},
{"role": "tool", "content": "...", "name": "execute_sql"},
...
]

Args:
output: The output object to search for conversation history

Returns:
List of Message objects or None if no conversation found
"""
messages = []

try:
# Handle different output formats
conversation_data = None

if isinstance(output, list):
# Direct list of messages
conversation_data = output
elif isinstance(output, dict):
# Look for conversation in various nested formats
if "messages" in output:
conversation_data = output["messages"]
elif "conversation" in output:
conversation_data = output["conversation"]
elif "history" in output:
conversation_data = output["history"]
elif "agent_run" in output: # Handle nested conversation data PydanticAI style
agent_run = output["agent_run"]
if isinstance(agent_run, dict) and "messages" in agent_run:
conversation_data = agent_run["messages"]
elif len(output.keys()) == 1:
# Single key, check if its value is a list
single_key = list(output.keys())[0]
if isinstance(output[single_key], list):
conversation_data = output[single_key]
elif isinstance(output, str):
# Try to parse JSON string
import json

try:
parsed = json.loads(output)
return self._extract_conversation_from_output(parsed)
except (json.JSONDecodeError, ValueError):
pass

# Parse conversation data into messages
if conversation_data and isinstance(conversation_data, list):
for msg_data in conversation_data:
if isinstance(msg_data, dict) and "role" in msg_data:
role = msg_data.get("role")
if role is None:
continue
content = msg_data.get("content", "")

# Handle tool calls in assistant messages
tool_calls = None
if role == "assistant" and "tool_calls" in msg_data:
tool_calls = msg_data["tool_calls"]

# Handle tool responses
name = None
if role == "tool":
name = msg_data.get("name")

messages.append(Message(role=role, content=content, name=name, tool_calls=tool_calls))

return messages if messages else None

except Exception as e:
logger.debug("Error extracting conversation from output: %s", e)
return None

def _create_input_metadata(self, trace: Any, observations: List[Any]) -> InputMetadata:
"""Create InputMetadata from trace and observations.

Expand Down Expand Up @@ -331,6 +434,20 @@ def _extract_ground_truth(self, trace: Any) -> Optional[str]:
Returns:
Ground truth string or None
"""
# First check trace input for evaluation test data structure
if hasattr(trace, "input") and trace.input:
if isinstance(trace.input, dict):
# Handle EP test format: kwargs.input_rows[0].ground_truth
kwargs = trace.input.get("kwargs", {})
if "input_rows" in kwargs:
input_rows = kwargs["input_rows"]
if input_rows and len(input_rows) > 0:
first_row = input_rows[0]
if isinstance(first_row, dict) and "ground_truth" in first_row:
ground_truth = first_row["ground_truth"]
if ground_truth: # Only return if not None/empty
return str(ground_truth)

# Check trace metadata for ground truth
if hasattr(trace, "metadata") and trace.metadata:
if isinstance(trace.metadata, dict):
Expand Down
Loading
Loading