From b593fce3ab193bda1f8bea6061a54e59ae49d0aa Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Fri, 5 Sep 2025 14:20:02 -0700 Subject: [PATCH] langfuse example --- eval_protocol/adapters/langfuse.py | 139 +++++++++++- tests/chinook/langfuse/generate_traces.py | 211 ++++++++++++++++++ .../chinook/langfuse/test_langfuse_chinook.py | 142 ++++++++++++ tests/chinook/{ => pydantic}/agent.py | 5 + .../{ => pydantic}/test_pydantic_chinook.py | 2 +- 5 files changed, 487 insertions(+), 12 deletions(-) create mode 100644 tests/chinook/langfuse/generate_traces.py create mode 100644 tests/chinook/langfuse/test_langfuse_chinook.py rename tests/chinook/{ => pydantic}/agent.py (95%) rename tests/chinook/{ => pydantic}/test_pydantic_chinook.py (98%) diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index cc51e1b2..ede6d9fe 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -13,7 +13,7 @@ logger = logging.getLogger(__name__) try: - from langfuse import Langfuse + from langfuse import Langfuse # pyright: ignore[reportPrivateImportUsage] LANGFUSE_AVAILABLE = True except ImportError: @@ -75,7 +75,7 @@ def get_evaluation_rows( from_timestamp: Optional[datetime] = None, to_timestamp: Optional[datetime] = None, include_tool_calls: bool = True, - ) -> Iterator[EvaluationRow]: + ) -> List[EvaluationRow]: """Pull traces from Langfuse and convert to EvaluationRow format. Args: @@ -90,8 +90,9 @@ def get_evaluation_rows( Yields: EvaluationRow: Converted evaluation rows """ - # Get traces from Langfuse - traces = self.client.get_traces( + # Get traces from Langfuse using new API + eval_rows = [] + traces = self.client.api.trace.list( limit=limit, tags=tags, user_id=user_id, @@ -104,16 +105,17 @@ def get_evaluation_rows( try: eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls) if eval_row: - yield eval_row + eval_rows.append(eval_row) except (AttributeError, ValueError, KeyError) as e: logger.warning("Failed to convert trace %s: %s", trace.id, e) continue + return eval_rows def get_evaluation_rows_by_ids( self, trace_ids: List[str], include_tool_calls: bool = True, - ) -> Iterator[EvaluationRow]: + ) -> List[EvaluationRow]: """Get specific traces by their IDs and convert to EvaluationRow format. Args: @@ -123,15 +125,17 @@ def get_evaluation_rows_by_ids( Yields: EvaluationRow: Converted evaluation rows """ + eval_rows = [] for trace_id in trace_ids: try: - trace = self.client.get_trace(trace_id) + trace = self.client.api.trace.get(trace_id) eval_row = self._convert_trace_to_evaluation_row(trace, include_tool_calls) if eval_row: - yield eval_row + eval_rows.append(eval_row) except (AttributeError, ValueError, KeyError) as e: logger.warning("Failed to fetch/convert trace %s: %s", trace_id, e) continue + return eval_rows def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool = True) -> Optional[EvaluationRow]: """Convert a Langfuse trace to EvaluationRow format. @@ -145,10 +149,29 @@ def _convert_trace_to_evaluation_row(self, trace: Any, include_tool_calls: bool """ try: # Get observations (generations, spans) from the trace - observations = self.client.get_observations(trace_id=trace.id).data + observations_response = self.client.api.observations.get_many(trace_id=trace.id, limit=100) + observations = ( + observations_response.data if hasattr(observations_response, "data") else list(observations_response) + ) - # Convert observations to messages - messages = self._extract_messages_from_observations(observations, include_tool_calls) + # Look for conversation history in trace output or observations + messages = [] + conversation_found = False + + # Look for complete conversation in observations + if not conversation_found: + for obs in observations: + # Check each observation's output for complete conversation array + if hasattr(obs, "output") and obs.output: + conversation = self._extract_conversation_from_output(obs.output) + if conversation: + messages = conversation + conversation_found = True + break + + # Fallback: try extracting from observations using old method + if not conversation_found: + messages = self._extract_messages_from_observations(observations, include_tool_calls) if not messages: return None @@ -266,6 +289,86 @@ def _dict_to_message(self, msg_dict: Dict[str, Any], include_tool_calls: bool = function_call=function_call, ) + def _extract_conversation_from_output(self, output: Any) -> Optional[List[Message]]: + """Extract conversation history from PydanticAI agent run output. + + This looks for the conversation format like: + [ + {"role": "user", "content": "..."}, + {"role": "assistant", "content": "...", "tool_calls": [...]}, + {"role": "tool", "content": "...", "name": "execute_sql"}, + ... + ] + + Args: + output: The output object to search for conversation history + + Returns: + List of Message objects or None if no conversation found + """ + messages = [] + + try: + # Handle different output formats + conversation_data = None + + if isinstance(output, list): + # Direct list of messages + conversation_data = output + elif isinstance(output, dict): + # Look for conversation in various nested formats + if "messages" in output: + conversation_data = output["messages"] + elif "conversation" in output: + conversation_data = output["conversation"] + elif "history" in output: + conversation_data = output["history"] + elif "agent_run" in output: # Handle nested conversation data PydanticAI style + agent_run = output["agent_run"] + if isinstance(agent_run, dict) and "messages" in agent_run: + conversation_data = agent_run["messages"] + elif len(output.keys()) == 1: + # Single key, check if its value is a list + single_key = list(output.keys())[0] + if isinstance(output[single_key], list): + conversation_data = output[single_key] + elif isinstance(output, str): + # Try to parse JSON string + import json + + try: + parsed = json.loads(output) + return self._extract_conversation_from_output(parsed) + except (json.JSONDecodeError, ValueError): + pass + + # Parse conversation data into messages + if conversation_data and isinstance(conversation_data, list): + for msg_data in conversation_data: + if isinstance(msg_data, dict) and "role" in msg_data: + role = msg_data.get("role") + if role is None: + continue + content = msg_data.get("content", "") + + # Handle tool calls in assistant messages + tool_calls = None + if role == "assistant" and "tool_calls" in msg_data: + tool_calls = msg_data["tool_calls"] + + # Handle tool responses + name = None + if role == "tool": + name = msg_data.get("name") + + messages.append(Message(role=role, content=content, name=name, tool_calls=tool_calls)) + + return messages if messages else None + + except Exception as e: + logger.debug("Error extracting conversation from output: %s", e) + return None + def _create_input_metadata(self, trace: Any, observations: List[Any]) -> InputMetadata: """Create InputMetadata from trace and observations. @@ -331,6 +434,20 @@ def _extract_ground_truth(self, trace: Any) -> Optional[str]: Returns: Ground truth string or None """ + # First check trace input for evaluation test data structure + if hasattr(trace, "input") and trace.input: + if isinstance(trace.input, dict): + # Handle EP test format: kwargs.input_rows[0].ground_truth + kwargs = trace.input.get("kwargs", {}) + if "input_rows" in kwargs: + input_rows = kwargs["input_rows"] + if input_rows and len(input_rows) > 0: + first_row = input_rows[0] + if isinstance(first_row, dict) and "ground_truth" in first_row: + ground_truth = first_row["ground_truth"] + if ground_truth: # Only return if not None/empty + return str(ground_truth) + # Check trace metadata for ground truth if hasattr(trace, "metadata") and trace.metadata: if isinstance(trace.metadata, dict): diff --git a/tests/chinook/langfuse/generate_traces.py b/tests/chinook/langfuse/generate_traces.py new file mode 100644 index 00000000..16e52dba --- /dev/null +++ b/tests/chinook/langfuse/generate_traces.py @@ -0,0 +1,211 @@ +import pytest +import os + +from eval_protocol.models import EvaluationRow, Message +from eval_protocol.pytest import evaluation_test + +from eval_protocol.pytest.default_pydantic_ai_rollout_processor import PydanticAgentRolloutProcessor +from tests.chinook.pydantic.agent import setup_agent + +from tests.chinook.dataset import collect_dataset + +try: + from langfuse import get_client, observe # pyright: ignore[reportPrivateImportUsage] + from pydantic_ai.agent import Agent + + LANGFUSE_AVAILABLE = True + langfuse_client = get_client() + + Agent.instrument_all() + +except ImportError: + LANGFUSE_AVAILABLE = False + langfuse_client = None + + def observe(*args, **kwargs): + def decorator(func): + return func + + return decorator if args and callable(args[0]) else decorator + + +LLM_JUDGE_PROMPT = ( + "Your job is to compare the response to the expected answer.\n" + "The response will be a narrative report of the query results.\n" + "If the response contains the same or well summarized information as the expected answer, return 1.0.\n" + "If the response does not contain the same information or is missing information, return 0.0." +) + + +@pytest.mark.skipif( + os.environ.get("CI") == "true", + reason="Only run this test locally (skipped in CI)", +) +@pytest.mark.asyncio +@observe() +@evaluation_test( + input_rows=[collect_dataset()[0:1]], + completion_params=[ + { + "model": { + "orchestrator_agent_model": { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + } + }, + ], + rollout_processor=PydanticAgentRolloutProcessor(), + rollout_processor_kwargs={"agent": setup_agent}, + mode="pointwise", +) +async def test_complex_query_0(row: EvaluationRow) -> EvaluationRow: + """ + Complex queries - PydanticAI automatically creates rich Langfuse traces. + """ + return row + + +@pytest.mark.skipif( + os.environ.get("CI") == "true", + reason="Only run this test locally (skipped in CI)", +) +@pytest.mark.asyncio +@observe() +@evaluation_test( + input_rows=[collect_dataset()[1:2]], + completion_params=[ + { + "model": { + "orchestrator_agent_model": { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + } + }, + ], + rollout_processor=PydanticAgentRolloutProcessor(), + rollout_processor_kwargs={"agent": setup_agent}, + mode="pointwise", +) +async def test_complex_query_1(row: EvaluationRow) -> EvaluationRow: + """ + Complex queries - PydanticAI automatically creates rich Langfuse traces. + """ + return row + + +@pytest.mark.skipif( + os.environ.get("CI") == "true", + reason="Only run this test locally (skipped in CI)", +) +@pytest.mark.asyncio +@observe() +@evaluation_test( + input_rows=[collect_dataset()[2:3]], + completion_params=[ + { + "model": { + "orchestrator_agent_model": { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + } + }, + ], + rollout_processor=PydanticAgentRolloutProcessor(), + rollout_processor_kwargs={"agent": setup_agent}, + mode="pointwise", +) +async def test_complex_query_2(row: EvaluationRow) -> EvaluationRow: + """ + Complex queries - PydanticAI automatically creates rich Langfuse traces. + """ + return row + + +@pytest.mark.skipif( + os.environ.get("CI") == "true", + reason="Only run this test locally (skipped in CI)", +) +@pytest.mark.asyncio +@observe() +@evaluation_test( + input_rows=[collect_dataset()[3:4]], + completion_params=[ + { + "model": { + "orchestrator_agent_model": { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + } + }, + ], + rollout_processor=PydanticAgentRolloutProcessor(), + rollout_processor_kwargs={"agent": setup_agent}, + mode="pointwise", +) +async def test_complex_query_3(row: EvaluationRow) -> EvaluationRow: + """ + Complex queries - PydanticAI automatically creates rich Langfuse traces. + """ + return row + + +@pytest.mark.skipif( + os.environ.get("CI") == "true", + reason="Only run this test locally (skipped in CI)", +) +@pytest.mark.asyncio +@observe() +@evaluation_test( + input_rows=[collect_dataset()[4:5]], + completion_params=[ + { + "model": { + "orchestrator_agent_model": { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + } + }, + ], + rollout_processor=PydanticAgentRolloutProcessor(), + rollout_processor_kwargs={"agent": setup_agent}, + mode="pointwise", +) +async def test_complex_query_4(row: EvaluationRow) -> EvaluationRow: + """ + Complex queries - PydanticAI automatically creates rich Langfuse traces. + """ + return row + + +@pytest.mark.skipif( + os.environ.get("CI") == "true", + reason="Only run this test locally (skipped in CI)", +) +@pytest.mark.asyncio +@observe() +@evaluation_test( + input_rows=[collect_dataset()[5:6]], + completion_params=[ + { + "model": { + "orchestrator_agent_model": { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + } + }, + ], + rollout_processor=PydanticAgentRolloutProcessor(), + rollout_processor_kwargs={"agent": setup_agent}, + mode="pointwise", +) +async def test_complex_query_5(row: EvaluationRow) -> EvaluationRow: + """ + Complex queries - PydanticAI automatically creates rich Langfuse traces. + """ + return row diff --git a/tests/chinook/langfuse/test_langfuse_chinook.py b/tests/chinook/langfuse/test_langfuse_chinook.py new file mode 100644 index 00000000..2aaf7f16 --- /dev/null +++ b/tests/chinook/langfuse/test_langfuse_chinook.py @@ -0,0 +1,142 @@ +""" +External Evaluation Pipeline: Pull Langfuse traces and evaluate them using EP framework. + +This script: +1. Pulls traces from Langfuse (created by generate_traces.py) +2. Uses the fixed LangfuseAdapter for proper conversation extraction +3. Evaluates them using the same LLM judge as test_pydantic_chinook.py +4. Uses NoOpRolloutProcessor since traces already exist +5. Pushes evaluation scores back to Langfuse +""" + +import os +from datetime import datetime, timedelta +from typing import List + +import pytest +from pydantic import BaseModel +from pydantic_ai import Agent +from pydantic_ai.models.openai import OpenAIModel + +from eval_protocol.models import EvaluateResult, EvaluationRow, Message, InputMetadata +from eval_protocol.pytest import evaluation_test, NoOpRolloutProcessor + +# Langfuse client setup +try: + from langfuse import get_client # pyright: ignore[reportPrivateImportUsage] + + LANGFUSE_AVAILABLE = True + langfuse = get_client() +except ImportError: + LANGFUSE_AVAILABLE = False + langfuse = None + +# Same LLM judge logic from test_pydantic_chinook.py +LLM_JUDGE_PROMPT = ( + "Your job is to compare the response to the expected answer.\n" + "The response will be a narrative report of the query results.\n" + "If the response contains the same or well summarized information as the expected answer, return 1.0.\n" + "If the response does not contain the same information or is missing information, return 0.0." +) + + +class Response(BaseModel): + score: float + reason: str + + +def fetch_langfuse_traces_as_evaluation_rows(hours_back: int = 168) -> List[EvaluationRow]: + try: + from eval_protocol.adapters.langfuse import create_langfuse_adapter + + adapter = create_langfuse_adapter( + public_key=os.getenv("LANGFUSE_PUBLIC_KEY"), # pyright: ignore[reportArgumentType] + secret_key=os.getenv("LANGFUSE_SECRET_KEY"), # pyright: ignore[reportArgumentType] + host=os.getenv("LANGFUSE_HOST", "https://cloud.langfuse.com"), + ) + + now = datetime.now() + from_timestamp = now - timedelta(hours=hours_back) + + return adapter.get_evaluation_rows( + limit=20, from_timestamp=from_timestamp, to_timestamp=now, include_tool_calls=True + ) + + except Exception as e: + print(f"❌ LangfuseAdapter failed: {e}") + return [] + + +@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip in CI") +@pytest.mark.asyncio +@evaluation_test( + input_rows=[fetch_langfuse_traces_as_evaluation_rows()], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_langfuse_evaluation(row: EvaluationRow) -> EvaluationRow: + """ + Pull the complex query traces from Langfuse and evaluate using logic from test_pydantic_chinook.py::test_complex_queries + + This test: + 1. Gets traces from Langfuse (via fixed LangfuseAdapter) + 2. Uses NoOpRolloutProcessor (traces already exist) + 3. Evaluates each trace using same LLM judge as PydanticAI test + 4. Pushes scores back to Langfuse + """ + # Same eval logic as PydanticAI example + last_assistant_message = row.last_assistant_message() + if last_assistant_message is None: + row.evaluation_result = EvaluateResult( + score=0.0, + reason="No assistant message found", + ) + elif not last_assistant_message.content: + row.evaluation_result = EvaluateResult( + score=0.0, + reason="No assistant message found", + ) + else: + model = OpenAIModel( + "accounts/fireworks/models/kimi-k2-instruct", + provider="fireworks", + ) + + class Response(BaseModel): + """ + A score between 0.0 and 1.0 indicating whether the response is correct. + """ + + score: float + + """ + A short explanation of why the response is correct or incorrect. + """ + reason: str + + comparison_agent = Agent( + model=model, + system_prompt=LLM_JUDGE_PROMPT, + output_type=Response, + output_retries=5, + ) + result = await comparison_agent.run( + f"Expected answer: {row.ground_truth}\nResponse: {last_assistant_message.content}" + ) + row.evaluation_result = EvaluateResult( + score=result.output.score, + reason=result.output.reason, + ) + + # Push score back to Langfuse + if langfuse and row.evaluation_result and row.input_metadata: + trace_id = row.input_metadata.dataset_info.get("trace_id") if row.input_metadata.dataset_info else None + if trace_id: + langfuse.create_score( + trace_id=trace_id, + name="ep_chinook_accuracy", + value=row.evaluation_result.score, + comment=row.evaluation_result.reason, + ) + + return row diff --git a/tests/chinook/agent.py b/tests/chinook/pydantic/agent.py similarity index 95% rename from tests/chinook/agent.py rename to tests/chinook/pydantic/agent.py index 7a8790bc..33bf6d58 100644 --- a/tests/chinook/agent.py +++ b/tests/chinook/pydantic/agent.py @@ -3,6 +3,10 @@ from pydantic_ai.models import Model from pydantic_ai.models.openai import OpenAIModel from pydantic_ai.exceptions import ModelRetry +import sys +import os + +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) from db import connect_database @@ -25,6 +29,7 @@ def setup_agent(orchestrator_agent_model: Model): agent = Agent( system_prompt=SYSTEM_PROMPT, model=orchestrator_agent_model, + instrument=True, ) @agent.tool(retries=5) diff --git a/tests/chinook/test_pydantic_chinook.py b/tests/chinook/pydantic/test_pydantic_chinook.py similarity index 98% rename from tests/chinook/test_pydantic_chinook.py rename to tests/chinook/pydantic/test_pydantic_chinook.py index 07e12478..0fc33277 100644 --- a/tests/chinook/test_pydantic_chinook.py +++ b/tests/chinook/pydantic/test_pydantic_chinook.py @@ -6,7 +6,7 @@ from eval_protocol.pytest import evaluation_test from eval_protocol.pytest.default_pydantic_ai_rollout_processor import PydanticAgentRolloutProcessor -from tests.chinook.agent import setup_agent +from tests.chinook.pydantic.agent import setup_agent import os from pydantic_ai.models.openai import OpenAIModel