From edcb570d0c4c8b5f400f93d7afea0746ba459c78 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 16 Sep 2025 01:35:23 -0700 Subject: [PATCH 1/2] Updated Langfuse --- eval_protocol/adapters/langfuse.py | 405 ++++++++++++++++++----------- 1 file changed, 259 insertions(+), 146 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index d9dc0c66..b1620749 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -9,7 +9,7 @@ import random import time from datetime import datetime, timedelta -from typing import Any, Dict, Iterator, List, Optional, cast +from typing import Any, Callable, Dict, Iterator, List, Optional, cast from eval_protocol.models import EvaluationRow, InputMetadata, Message @@ -26,6 +26,219 @@ LANGFUSE_AVAILABLE = False +def convert_trace_to_evaluation_row( + trace: TraceWithFullDetails, include_tool_calls: bool = True, span_name: Optional[str] = None +) -> Optional[EvaluationRow]: + """Convert a Langfuse trace to EvaluationRow format. + + Args: + trace: Langfuse trace object + include_tool_calls: Whether to include tool calling information + span_name: If provided, extract messages from generations within this named span + converter: Optional custom function to convert trace to EvaluationRow + + Returns: + EvaluationRow or None if conversion fails + """ + try: + # Extract messages from trace input and output + messages = extract_messages_from_trace(trace, include_tool_calls, span_name) + + # Extract tools if available + tools = None + if include_tool_calls and isinstance(trace.input, dict) and "tools" in trace.input: + tools = trace.input["tools"] + + if not messages: + return None + + return EvaluationRow( + messages=messages, + tools=tools, + input_metadata=InputMetadata( + session_data={ + "langfuse_trace_id": trace.id, # Store the trace ID here + } + ), + ) + + except (AttributeError, ValueError, KeyError) as e: + logger.error("Error converting trace %s: %s", trace.id, e) + return None + + +def extract_messages_from_trace( + trace: TraceWithFullDetails, include_tool_calls: bool = True, span_name: Optional[str] = None +) -> List[Message]: + """Extract messages from Langfuse trace input and output. + + Args: + trace: Langfuse trace object + include_tool_calls: Whether to include tool calling information + span_name: If provided, extract messages from generations within this named span + + Returns: + List of Message objects + """ + messages = [] + + if span_name: # Look for a generation tied to a span name + try: + # Find the final generation in the named span + gen: ObservationsView | None = find_final_generation_in_span(trace, span_name) + if not gen: + return messages + + # Extract messages from generation input and output + if gen.input: + messages.extend(extract_messages_from_data(gen.input, include_tool_calls)) + if gen.output: + messages.extend(extract_messages_from_data(gen.output, include_tool_calls)) + + return messages + + except Exception as e: + logger.error("Failed to extract messages from span '%s' in trace %s: %s", span_name, trace.id, e) + return messages + + else: + try: + # Extract messages from trace input and output + if trace.input: + messages.extend(extract_messages_from_data(trace.input, include_tool_calls)) + if trace.output: + messages.extend(extract_messages_from_data(trace.output, include_tool_calls)) + except (AttributeError, ValueError, KeyError) as e: + logger.warning("Error processing trace %s: %s", trace.id, e) + + return messages + + +def extract_messages_from_data(data, include_tool_calls: bool) -> List[Message]: + """Extract messages from data (works for both input and output). + + Args: + data: Data from trace or generation (input or output) + include_tool_calls: Whether to include tool calling information + + Returns: + List of Message objects + """ + messages = [] + + if isinstance(data, dict): + if "messages" in data: + # OpenAI-style messages format + for msg in data["messages"]: + messages.append(dict_to_message(msg, include_tool_calls)) + elif "role" in data: + # Single message format + messages.append(dict_to_message(data, include_tool_calls)) + elif "prompt" in data: + # Simple prompt format + messages.append(Message(role="user", content=str(data["prompt"]))) + elif "content" in data: + # Simple content format + messages.append(Message(role="assistant", content=str(data["content"]))) + else: + # Fallback: treat as single message + messages.append(dict_to_message(data, include_tool_calls)) + elif isinstance(data, list): + # Direct list of message dicts + for msg in data: + if isinstance(msg, dict): + messages.append(dict_to_message(msg, include_tool_calls)) + elif isinstance(data, str): + # Simple string - role depends on context, default to user + messages.append(Message(role="user", content=data)) + + return messages + + +def dict_to_message(msg_dict: Dict[str, Any], include_tool_calls: bool = True) -> Message: + """Convert a dictionary to a Message object. + + Args: + msg_dict: Dictionary containing message data + include_tool_calls: Whether to include tool calling information + + Returns: + Message object + """ + # Extract basic message components + role = msg_dict.get("role", "assistant") + content = msg_dict.get("content") + name = msg_dict.get("name") + + # Handle tool calls if enabled + tool_calls = None + tool_call_id = None + function_call = None + + if include_tool_calls: + if "tool_calls" in msg_dict: + tool_calls = msg_dict["tool_calls"] + if "tool_call_id" in msg_dict: + tool_call_id = msg_dict["tool_call_id"] + if "function_call" in msg_dict: + function_call = msg_dict["function_call"] + + return Message( + role=role, + content=content, + name=name, + tool_call_id=tool_call_id, + tool_calls=tool_calls, + function_call=function_call, + ) + + +def find_final_generation_in_span(trace: TraceWithFullDetails, span_name: str) -> ObservationsView | None: + """Find the final generation within a named span that contains full message history. + + Args: + trace: Langfuse trace object + span_name: Name of the span to search for + + Returns: + The final generation object, or None if not found + """ + # Get all observations from the trace + all_observations = trace.observations + + # Find a span with the given name that has generation children + parent_span = None + for obs in all_observations: + if obs.name == span_name and obs.type == "SPAN": + # Check if this span has generation children + has_generations = any( + child.type == "GENERATION" and child.parent_observation_id == obs.id for child in all_observations + ) + if has_generations: + parent_span = obs + break + + if not parent_span: + logger.warning("No span named '%s' found in trace %s", span_name, trace.id) + return None + + # Find all generations within this span + generations: List[ObservationsView] = [] + for obs in all_observations: + if obs.type == "GENERATION" and obs.parent_observation_id == parent_span.id: + generations.append(obs) + + if not generations: + logger.warning("No generations found in span '%s' in trace %s", span_name, trace.id) + return None + + # Sort generations by start time for chronological order + generations.sort(key=lambda x: x.start_time) + + # Return the final generation (contains full message history) + return generations[-1] + + class LangfuseAdapter: """Adapter to pull data from Langfuse and convert to EvaluationRow format. @@ -61,31 +274,46 @@ def __init__(self): def get_evaluation_rows( self, limit: int = 100, - sample_size: int = 50, + sample_size: Optional[int] = None, tags: Optional[List[str]] = None, user_id: Optional[str] = None, session_id: Optional[str] = None, + name: Optional[str] = None, + environment: Optional[str] = None, + version: Optional[str] = None, + release: Optional[str] = None, + fields: Optional[str] = None, hours_back: Optional[int] = None, from_timestamp: Optional[datetime] = None, to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, sleep_between_gets: float = 2.5, max_retries: int = 3, + span_name: Optional[str] = None, + converter: Optional[Callable[[TraceWithFullDetails, bool, Optional[str]], Optional[EvaluationRow]]] = None, ) -> List[EvaluationRow]: """Pull traces from Langfuse and convert to EvaluationRow format. Args: limit: Max number of trace summaries to collect via pagination (pre-sampling) - sample_size: Number of traces to fetch full details for (sampled from collected summaries) + sample_size: Optional number of traces to randomly sample from collected summaries (if None, process all) tags: Filter by specific tags user_id: Filter by user ID session_id: Filter by session ID + name: Filter by trace name + environment: Filter by environment (e.g., production, staging, development) + version: Filter by trace version + release: Filter by trace release + fields: Comma-separated list of fields to include (e.g., 'core,scores,metrics') hours_back: Filter traces from this many hours ago from_timestamp: Explicit start time (overrides hours_back) to_timestamp: Explicit end time (overrides hours_back) include_tool_calls: Whether to include tool calling traces sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit) max_retries: Maximum retries for rate limit errors + span_name: If provided, extract messages from generations within this named span + converter: Optional custom function to convert trace to EvaluationRow. + If provided, this will be used instead of the default conversion logic. Returns: List[EvaluationRow]: Converted evaluation rows @@ -120,6 +348,11 @@ def get_evaluation_rows( tags=tags, user_id=user_id, session_id=session_id, + name=name, + environment=environment, + version=version, + release=release, + fields=fields, from_timestamp=from_timestamp, to_timestamp=to_timestamp, order_by="timestamp.desc", @@ -162,11 +395,14 @@ def get_evaluation_rows( logger.debug("No traces found") return eval_rows - # Randomly sample traces to fetch full details (respect rate limits) - actual_sample_size = min(sample_size, len(all_traces)) - selected_traces = random.sample(all_traces, actual_sample_size) - - logger.debug("Randomly selected %d traces from %d collected", actual_sample_size, len(all_traces)) + # Optionally sample traces to fetch full details (respect rate limits) + if sample_size is not None: + actual_sample_size = min(sample_size, len(all_traces)) + selected_traces = random.sample(all_traces, actual_sample_size) + logger.debug("Randomly selected %d traces from %d collected", actual_sample_size, len(all_traces)) + else: + selected_traces = all_traces + logger.debug("Processing all %d collected traces (no sampling)", len(all_traces)) # Process each selected trace with sleep and retry logic for trace_info in selected_traces: @@ -199,7 +435,10 @@ def get_evaluation_rows( if trace_full: try: - eval_row = self._convert_trace_to_evaluation_row(trace_full, include_tool_calls) + if converter: + eval_row = converter(trace_full, include_tool_calls, span_name) + else: + eval_row = convert_trace_to_evaluation_row(trace_full, include_tool_calls, span_name) if eval_row: eval_rows.append(eval_row) except (AttributeError, ValueError, KeyError) as e: @@ -215,21 +454,29 @@ def get_evaluation_rows_by_ids( self, trace_ids: List[str], include_tool_calls: bool = True, + span_name: Optional[str] = None, + converter: Optional[Callable[[TraceWithFullDetails, bool, Optional[str]], Optional[EvaluationRow]]] = None, ) -> List[EvaluationRow]: """Get specific traces by their IDs and convert to EvaluationRow format. Args: trace_ids: List of trace IDs to fetch include_tool_calls: Whether to include tool calling traces + span_name: If provided, extract messages from generations within this named span + converter: Optional custom function to convert trace to EvaluationRow. + If provided, this will be used instead of the default conversion logic. - Yields: - EvaluationRow: Converted evaluation rows + Returns: + List[EvaluationRow]: Converted evaluation rows """ eval_rows = [] for trace_id in trace_ids: try: trace: TraceWithFullDetails = self.client.api.trace.get(trace_id) - eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls) + if converter: + eval_row = converter(trace, include_tool_calls, span_name) + else: + eval_row = convert_trace_to_evaluation_row(trace, include_tool_calls, span_name) if eval_row: eval_rows.append(eval_row) except (AttributeError, ValueError, KeyError) as e: @@ -237,140 +484,6 @@ def get_evaluation_rows_by_ids( continue return eval_rows - def _convert_trace_to_evaluation_row( - self, trace: TraceWithFullDetails, include_tool_calls: bool = True - ) -> Optional[EvaluationRow]: - """Convert a Langfuse trace to EvaluationRow format. - - Args: - trace: Langfuse trace object - include_tool_calls: Whether to include tool calling information - - Returns: - EvaluationRow or None if conversion fails - """ - try: - # Extract messages from trace input and output - messages = self._extract_messages_from_trace(trace, include_tool_calls) - - # Extract tools if available - tools = None - if include_tool_calls and isinstance(trace.input, dict) and "tools" in trace.input: - tools = trace.input["tools"] - - if not messages: - return None - - return EvaluationRow( - messages=messages, - tools=tools, - input_metadata=InputMetadata( - session_data={ - "langfuse_trace_id": trace.id, # Store the trace ID here - } - ), - ) - - except (AttributeError, ValueError, KeyError) as e: - logger.error("Error converting trace %s: %s", trace.id, e) - return None - - def _extract_messages_from_trace( - self, trace: TraceWithFullDetails, include_tool_calls: bool = True - ) -> List[Message]: - """Extract messages from Langfuse trace input and output. - - Args: - trace: Langfuse trace object - include_tool_calls: Whether to include tool calling information - - Returns: - List of Message objects - """ - messages = [] - - try: - # Handle trace input - if hasattr(trace, "input") and trace.input: - if isinstance(trace.input, dict): - if "messages" in trace.input: - # OpenAI-style messages format - for msg in trace.input["messages"]: - messages.append(self._dict_to_message(msg, include_tool_calls)) - elif "role" in trace.input: - # Single message format - messages.append(self._dict_to_message(trace.input, include_tool_calls)) - elif "prompt" in trace.input: - # Simple prompt format - messages.append(Message(role="user", content=str(trace.input["prompt"]))) - elif isinstance(trace.input, list): - # Direct list of message dicts - for msg in trace.input: - messages.append(self._dict_to_message(msg, include_tool_calls)) - elif isinstance(trace.input, str): - # Simple string input - messages.append(Message(role="user", content=trace.input)) - - # Handle trace output - if hasattr(trace, "output") and trace.output: - if isinstance(trace.output, dict): - if "content" in trace.output: - messages.append(Message(role="assistant", content=str(trace.output["content"]))) - elif "message" in trace.output: - msg_dict = trace.output["message"] - messages.append(self._dict_to_message(msg_dict, include_tool_calls)) - else: - # Fallback: convert entire output to string - messages.append(Message(role="assistant", content=str(trace.output))) - elif isinstance(trace.output, list): - # Direct list of message dicts (same as input handling) - for msg in trace.output: - messages.append(self._dict_to_message(msg, include_tool_calls)) - elif isinstance(trace.output, str): - messages.append(Message(role="assistant", content=trace.output)) - - except (AttributeError, ValueError, KeyError) as e: - logger.warning("Error processing trace %s: %s", trace.id, e) - - return messages - - def _dict_to_message(self, msg_dict: Dict[str, Any], include_tool_calls: bool = True) -> Message: - """Convert a dictionary to a Message object. - - Args: - msg_dict: Dictionary containing message data - include_tool_calls: Whether to include tool calling information - - Returns: - Message object - """ - # Extract basic message components - role = msg_dict.get("role", "assistant") - content = msg_dict.get("content") - name = msg_dict.get("name") - - # Handle tool calls if enabled - tool_calls = None - tool_call_id = None - function_call = None - - if include_tool_calls: - if "tool_calls" in msg_dict: - tool_calls = msg_dict["tool_calls"] - if "tool_call_id" in msg_dict: - tool_call_id = msg_dict["tool_call_id"] - if "function_call" in msg_dict: - function_call = msg_dict["function_call"] - - return Message( - role=role, - content=content, - name=name, - tool_call_id=tool_call_id, - tool_calls=tool_calls, - function_call=function_call, - ) - def create_langfuse_adapter() -> LangfuseAdapter: """Factory function to create a Langfuse adapter.""" From 71c85ee6c509d86e1e6b20ed6fd5653c355e0c51 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Tue, 16 Sep 2025 10:27:27 -0700 Subject: [PATCH 2/2] add protocol --- eval_protocol/adapters/langfuse.py | 39 +++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index b1620749..e3f3144a 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -9,12 +9,39 @@ import random import time from datetime import datetime, timedelta -from typing import Any, Callable, Dict, Iterator, List, Optional, cast +from typing import Any, Dict, List, Optional, Protocol from eval_protocol.models import EvaluationRow, InputMetadata, Message logger = logging.getLogger(__name__) + +class TraceConverter(Protocol): + """Protocol for custom trace-to-EvaluationRow converter functions. + + A converter function should take a Langfuse trace along with processing + options and return an EvaluationRow or None to skip the trace. + """ + + def __call__( + self, + trace: "TraceWithFullDetails", + include_tool_calls: bool, + span_name: Optional[str], + ) -> Optional[EvaluationRow]: + """Convert a Langfuse trace to an EvaluationRow. + + Args: + trace: The Langfuse trace object to convert + include_tool_calls: Whether to include tool calling information + span_name: Optional span name to extract messages from + + Returns: + EvaluationRow or None if the trace should be skipped + """ + ... + + try: from langfuse import get_client # pyright: ignore[reportPrivateImportUsage] from langfuse.api.resources.trace.types.traces import Traces @@ -35,7 +62,7 @@ def convert_trace_to_evaluation_row( trace: Langfuse trace object include_tool_calls: Whether to include tool calling information span_name: If provided, extract messages from generations within this named span - converter: Optional custom function to convert trace to EvaluationRow + converter: Optional custom converter implementing TraceConverter protocol Returns: EvaluationRow or None if conversion fails @@ -290,7 +317,7 @@ def get_evaluation_rows( sleep_between_gets: float = 2.5, max_retries: int = 3, span_name: Optional[str] = None, - converter: Optional[Callable[[TraceWithFullDetails, bool, Optional[str]], Optional[EvaluationRow]]] = None, + converter: Optional[TraceConverter] = None, ) -> List[EvaluationRow]: """Pull traces from Langfuse and convert to EvaluationRow format. @@ -312,7 +339,7 @@ def get_evaluation_rows( sleep_between_gets: Sleep time between individual trace.get() calls (2.5s for 30 req/min limit) max_retries: Maximum retries for rate limit errors span_name: If provided, extract messages from generations within this named span - converter: Optional custom function to convert trace to EvaluationRow. + converter: Optional custom converter implementing TraceConverter protocol. If provided, this will be used instead of the default conversion logic. Returns: @@ -455,7 +482,7 @@ def get_evaluation_rows_by_ids( trace_ids: List[str], include_tool_calls: bool = True, span_name: Optional[str] = None, - converter: Optional[Callable[[TraceWithFullDetails, bool, Optional[str]], Optional[EvaluationRow]]] = None, + converter: Optional[TraceConverter] = None, ) -> List[EvaluationRow]: """Get specific traces by their IDs and convert to EvaluationRow format. @@ -463,7 +490,7 @@ def get_evaluation_rows_by_ids( trace_ids: List of trace IDs to fetch include_tool_calls: Whether to include tool calling traces span_name: If provided, extract messages from generations within this named span - converter: Optional custom function to convert trace to EvaluationRow. + converter: Optional custom converter implementing TraceConverter protocol. If provided, this will be used instead of the default conversion logic. Returns: