From 80b39ad1c9c803b4544d9aa7df58d919820c430d Mon Sep 17 00:00:00 2001 From: "Yufei (Benny) Chen" <1585539+benjibc@users.noreply.github.com> Date: Wed, 17 Sep 2025 11:02:47 -0700 Subject: [PATCH] Ensure LangSmith traces include tool runs --- eval_protocol/adapters/langfuse.py | 8 +- eval_protocol/adapters/langsmith.py | 11 +- examples/langsmith/README.md | 24 -- examples/langsmith/dump_traces_langsmith.py | 115 ------ examples/langsmith/emit_tool_calls.py | 116 ------ .../langsmith/llm_judge_from_langsmith.py | 168 -------- tests/chinook/langsmith/generate_traces.py | 366 ++++++++++++++++++ .../langsmith/test_langsmith_chinook.py | 178 +++++++++ 8 files changed, 555 insertions(+), 431 deletions(-) delete mode 100644 examples/langsmith/README.md delete mode 100644 examples/langsmith/dump_traces_langsmith.py delete mode 100644 examples/langsmith/emit_tool_calls.py delete mode 100644 examples/langsmith/llm_judge_from_langsmith.py create mode 100644 tests/chinook/langsmith/generate_traces.py create mode 100644 tests/chinook/langsmith/test_langsmith_chinook.py diff --git a/eval_protocol/adapters/langfuse.py b/eval_protocol/adapters/langfuse.py index f02de2f7..9b630178 100644 --- a/eval_protocol/adapters/langfuse.py +++ b/eval_protocol/adapters/langfuse.py @@ -5,13 +5,13 @@ """ from __future__ import annotations - import logging import random import time from datetime import datetime, timedelta -from typing import Any, Dict, List, Optional, Protocol, TYPE_CHECKING +from typing import Any, Dict, List, Optional, Protocol, TYPE_CHECKING, cast +from langfuse.api.resources.commons.types.observations_view import ObservationsView from eval_protocol.models import EvaluationRow, InputMetadata, Message from .base import BaseAdapter from .utils import extract_messages_from_data @@ -232,12 +232,12 @@ class LangfuseAdapter(BaseAdapter): ... )) """ - def __init__(self): + def __init__(self, client: Optional[Any] = None): """Initialize the Langfuse adapter.""" if not LANGFUSE_AVAILABLE: raise ImportError("Langfuse not installed. Install with: pip install 'eval-protocol[langfuse]'") - self.client = get_client() + self.client = client or cast(Any, get_client)() def get_evaluation_rows( self, diff --git a/eval_protocol/adapters/langsmith.py b/eval_protocol/adapters/langsmith.py index 79b23ea7..88363292 100644 --- a/eval_protocol/adapters/langsmith.py +++ b/eval_protocol/adapters/langsmith.py @@ -10,7 +10,7 @@ from __future__ import annotations import logging -from typing import Any, Dict, List, Optional, Iterable +from typing import Any, Dict, List, Optional, Iterable, cast from eval_protocol.models import EvaluationRow, InputMetadata, Message from .base import BaseAdapter @@ -23,6 +23,7 @@ LANGSMITH_AVAILABLE = True except ImportError: LANGSMITH_AVAILABLE = False + Client = None # type: ignore[misc] class LangSmithAdapter(BaseAdapter): @@ -38,9 +39,11 @@ class LangSmithAdapter(BaseAdapter): def __init__(self, client: Optional[Any] = None) -> None: if not LANGSMITH_AVAILABLE: raise ImportError("LangSmith not installed. Install with: pip install 'eval-protocol[langsmith]'") - # Client is provided by langsmith package; typing is relaxed to Any to avoid - # static analysis issues when stubs aren't available. - self.client = client or Client() # type: ignore[reportCallIssue] + if client is not None: + self.client = client + else: + assert Client is not None + self.client = cast(Any, Client)() def get_evaluation_rows( self, diff --git a/examples/langsmith/README.md b/examples/langsmith/README.md deleted file mode 100644 index 079cd874..00000000 --- a/examples/langsmith/README.md +++ /dev/null @@ -1,24 +0,0 @@ -# LangSmith Bootstrap Scripts - -These scripts are ONLY for dumping synthetic traces into LangSmith to exercise the adapter and quickstart examples. - -- `dump_traces_langsmith.py`: emits simple @traceable runs and an optional mini LangGraph echo flow. -- `emit_tool_calls.py`: emits runs that include assistant tool calls and a tool response message. - -Usage: -1) Set your API key: - -```bash -export LANGSMITH_API_KEY=... -export LANGSMITH_TRACING=true -export LS_PROJECT=ep-langgraph-examples -``` - -2) Run emitters: - -```bash -python examples/langsmith/dump_traces_langsmith.py -python examples/langsmith/emit_tool_calls.py -``` - -These are not production examples; they exist to seed LangSmith with traces that the adapter can consume. diff --git a/examples/langsmith/dump_traces_langsmith.py b/examples/langsmith/dump_traces_langsmith.py deleted file mode 100644 index 68bca4f6..00000000 --- a/examples/langsmith/dump_traces_langsmith.py +++ /dev/null @@ -1,115 +0,0 @@ -"""Quick script to send a few throwaway traces to LangSmith. - -Usage: - export LANGSMITH_API_KEY=... # required - export LANGSMITH_TRACING=true # recommended - python python-sdk/examples/langsmith/dump_traces_langsmith.py - -Notes: -- This does not require any external model keys. It logs a few synthetic - traced function calls, and optionally a tiny LangGraph flow if available. -""" - -import asyncio -import os -from typing import Any, Dict, List -import importlib - - -def _ensure_env_defaults() -> None: - # Prefer modern env vars; fall back maintained for compatibility. - if os.environ.get("LANGSMITH_TRACING") is None: - os.environ["LANGSMITH_TRACING"] = "true" - # Project name helps organize traces in the LangSmith UI - os.environ.setdefault("LANGCHAIN_PROJECT", "ep-langgraph-examples") - - -def _log_synthetic_traces() -> None: - traceable = None - try: - mod = importlib.import_module("langsmith") - traceable = getattr(mod, "traceable", None) - except ImportError: - pass - if traceable is None: - print("LangSmith not installed; skipping @traceable demo. `pip install langsmith`.") - return - - @traceable(name="toy_pipeline") - def toy_pipeline(user_input: str) -> Dict[str, Any]: - reversed_text = user_input[::-1] - upper_text = reversed_text.upper() - return {"result": upper_text, "len": len(upper_text)} - - print("Emitting synthetic traces via @traceable...") - toy_pipeline("hello langsmith") - toy_pipeline("trace number two") - toy_pipeline("final short run") - - -async def _maybe_run_tiny_langgraph() -> None: - """Optionally run a tiny LangGraph flow to log a couple of runs. - - This avoids any external LLM providers by using a pure-Python node. - """ - try: - graph_mod = importlib.import_module("langgraph.graph") - msg_mod = importlib.import_module("langgraph.graph.message") - lc_msgs = importlib.import_module("langchain_core.messages") - te_mod = importlib.import_module("typing_extensions") - except ImportError: - print("LangGraph/LangChain not installed; skipping tiny graph demo. `pip install langgraph langchain-core`.") - return - - END = getattr(graph_mod, "END") - StateGraph = getattr(graph_mod, "StateGraph") - add_messages = getattr(msg_mod, "add_messages") - AIMessage = getattr(lc_msgs, "AIMessage") - BaseMessage = getattr(lc_msgs, "BaseMessage") - HumanMessage = getattr(lc_msgs, "HumanMessage") - Annotated = getattr(te_mod, "Annotated") - TypedDict = getattr(te_mod, "TypedDict") - - class State(TypedDict): # type: ignore[misc] - messages: Annotated[List[BaseMessage], add_messages] # type: ignore[index] - - async def echo_node(state: State, **_: Any) -> Dict[str, Any]: - messages: List[BaseMessage] = state.get("messages", []) - last_user = next((m for m in reversed(messages) if isinstance(m, HumanMessage)), None) - content = getattr(last_user, "content", "") - reply = AIMessage(content=f"Echo: {content}") - return {"messages": [reply]} - - graph = StateGraph(State) - graph.add_node("echo", echo_node) - graph.set_entry_point("echo") - graph.add_edge("echo", END) - app = graph.compile() - - print("Emitting a couple LangGraph runs...") - await app.ainvoke({"messages": [HumanMessage(content="hi there")]}) - await app.ainvoke({"messages": [HumanMessage(content="how are you?")]}) - - -def main() -> None: - _ensure_env_defaults() - - if not os.getenv("LANGSMITH_API_KEY") and not os.getenv("LANGCHAIN_API_KEY"): - print("Missing LangSmith API key. Set LANGSMITH_API_KEY (or LANGCHAIN_API_KEY) and rerun.") - return - - _log_synthetic_traces() - - try: - asyncio.run(_maybe_run_tiny_langgraph()) - except RuntimeError: - # Fallback for event loop already running (e.g. in notebooks) - loop = asyncio.get_event_loop() - loop.create_task(_maybe_run_tiny_langgraph()) - loop.run_until_complete(asyncio.sleep(0.1)) - - print("Done. Visit LangSmith to see your new traces.") - - -if __name__ == "__main__": - main() diff --git a/examples/langsmith/emit_tool_calls.py b/examples/langsmith/emit_tool_calls.py deleted file mode 100644 index 5cc474dc..00000000 --- a/examples/langsmith/emit_tool_calls.py +++ /dev/null @@ -1,116 +0,0 @@ -"""Emit a few tool-call traces into LangSmith for adapter testing. - -Requirements: - export LANGSMITH_API_KEY=... - optional: export LANGCHAIN_PROJECT=ep-langgraph-examples (or set --project) - -Run: - python python-sdk/examples/langsmith/emit_tool_calls.py -""" - -import os -from typing import Any, Dict, List - - -def make_messages_with_tool_call(user_text: str) -> Dict[str, Any]: - """Return inputs/outputs shaped like LangChain messages with tool calls.""" - inputs = { - "messages": [ - { - "role": "user", - "content": user_text, - "type": "human", - } - ] - } - # Assistant proposes a tool call (function) - assistant_with_tool = { - "role": "assistant", - "content": "I'll call the calculator.", - "type": "ai", - "tool_calls": [ - { - "id": "call_1", - "type": "function", - "function": { - "name": "calculator.add", - "arguments": '{"a": 2, "b": 3}', - }, - } - ], - } - # Tool response message - tool_message = { - "role": "tool", - "name": "calculator.add", - "tool_call_id": "call_1", - "content": "5", - } - # Final assistant message - final_assistant = { - "role": "assistant", - "content": "The result is 5.", - "type": "ai", - } - outputs = { - "messages": [ - inputs["messages"][0], - assistant_with_tool, - tool_message, - final_assistant, - ] - } - return {"inputs": inputs, "outputs": outputs} - - -def main() -> None: - try: - from langsmith import Client # type: ignore - except Exception as e: - print(f"Missing langsmith dependency: {e}") - return - - project = os.getenv("LANGCHAIN_PROJECT", os.getenv("LS_PROJECT", "ep-langgraph-examples")) - client = Client() - - samples: List[str] = [ - "Add 2 and 3", - "Compute 7 + 11", - "Sum 10 and 25", - ] - - for i, text in enumerate(samples, start=1): - payload = make_messages_with_tool_call(text) - name = f"tool-demo-{i}" - # Create a chain run as container - client.create_run(name=name, inputs=payload["inputs"], run_type="chain", project_name=project) - # Log an llm child run carrying the assistant/tool messages as outputs - client.create_run( - name=f"{name}-llm", - inputs=payload["inputs"], - run_type="llm", - project_name=project, - ) - # Finalize by writing one more chain run with the aggregated outputs - client.create_run( - name=f"{name}-final", - inputs=payload["inputs"], - run_type="chain", - project_name=project, - ) - # Note: For simplicity, we attach outputs only on the final chain run - # using update_run is possible, but create_run keeps the example lightweight - # and the adapter reads from root runs' inputs/outputs or messages arrays. - # Many LangSmith clients attach outputs via end_run; here we keep it minimal. - try: - # If available, end_run to attach outputs on the final run - client.end_run(outputs=payload["outputs"]) # type: ignore[arg-type] - except Exception: - # Fallback: best-effort; runs may still be visible with inputs and llm child - pass - - print(f"Emitted {len(samples)} tool-call demo traces to project '{project}'.") - - -if __name__ == "__main__": - main() diff --git a/examples/langsmith/llm_judge_from_langsmith.py b/examples/langsmith/llm_judge_from_langsmith.py deleted file mode 100644 index e17f40f0..00000000 --- a/examples/langsmith/llm_judge_from_langsmith.py +++ /dev/null @@ -1,168 +0,0 @@ -"""Run a quick LLM-as-judge evaluation using LangSmith datasets and evaluators. - -This mirrors our Langfuse example: we define a tiny dataset, a trivial target, -and run a rubric-based LLM judge via LangSmith's evaluation API. - -Requirements: - pip install -U langsmith langchain-openai - -Env Vars: - export LANGSMITH_API_KEY=... # required - export OPENAI_API_KEY=... # optional; if absent uses heuristic judge - export LANGSMITH_TRACING=true # optional, to record runs - -Run: - python python-sdk/examples/langsmith/llm_judge_from_langsmith.py -""" - -from __future__ import annotations - -import os -from typing import Any, Dict -import importlib - - -def _ensure_env() -> None: - os.environ.setdefault("LANGCHAIN_PROJECT", "ep-langgraph-examples") - # Enable tracing so target runs + evaluator runs are visible in the UI - os.environ.setdefault("LANGSMITH_TRACING", "true") - - -def main() -> None: - _ensure_env() - - if not os.getenv("LANGSMITH_API_KEY") and not os.getenv("LANGCHAIN_API_KEY"): - raise SystemExit("Please set LANGSMITH_API_KEY (or LANGCHAIN_API_KEY).") - use_openai = bool(os.getenv("OPENAI_API_KEY")) - - # Import here to allow the script to print clearer errors if deps are missing. - try: - ls = importlib.import_module("langsmith") - eval_mod = importlib.import_module("langsmith.evaluation") - except ImportError as e: - raise SystemExit("Missing dependency. Please `pip install -U langsmith`. ") from e - - Client = getattr(ls, "Client") - evaluate = getattr(eval_mod, "evaluate") - - client = Client() - - dataset_name = "ep_langsmith_demo_ds" - # Create or get dataset - try: - dataset = client.create_dataset(dataset_name, description="EP demo dataset for LLM-as-judge") - except Exception: - dataset = client.read_dataset(dataset_name=dataset_name) - - # Seed examples (idempotent-ish: try to insert; duplicates are okay for demo) - examples = [ - ({"prompt": "Say hello to Bob."}, {"answer": "Hello Bob!"}), - ({"prompt": "What is 2+2?"}, {"answer": "4"}), - ( - {"prompt": "Respond with a haiku about spring."}, - {"answer": "Gentle rains arrive\nBuds whisper to warming winds\nEarth breathes life anew"}, - ), - ] - for inputs, outputs in examples: - try: - client.create_example(inputs=inputs, outputs=outputs, dataset_id=dataset.id) - except Exception: - # Ignore duplicate errors in throwaway demo - pass - - # Define the target function: pretend model that returns uppercase - def target_func(example_inputs: Dict[str, Any]) -> Dict[str, Any]: - text = example_inputs.get("prompt", "") - return {"answer": str(text).upper()} - - # Define an evaluator that either uses OpenAI (if available) or a heuristic fallback - import json - import re - from typing import cast - - def _normalize_text(text: str) -> str: - return re.sub(r"\s+", " ", text.strip().lower()) - - def heuristic_score(pred: str, ref: str) -> float: - if not ref: - return 0.0 if not pred else 0.5 - p = _normalize_text(pred) - r = _normalize_text(ref) - if p == r: - return 1.0 - if r in p: - return 0.8 - return 0.0 - - def llm_as_judge(run, example): # type: ignore[no-untyped-def] - # Extract strings - pred = "" - try: - out = run.outputs or {} - pred = cast(str, out.get("answer") or out.get("output") or "") - except Exception: - pred = "" - ref = "" - try: - ex_out = example.outputs or {} - ref = cast(str, ex_out.get("answer") or ex_out.get("output") or "") - except Exception: - ref = "" - - if not use_openai: - score = heuristic_score(pred, ref) - return {"key": "llm_judge", "score": float(score), "comment": "heuristic"} - - try: - from langchain_openai import ChatOpenAI # type: ignore - except Exception: - score = heuristic_score(pred, ref) - return {"key": "llm_judge", "score": float(score), "comment": "heuristic (no openai)"} - - llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.0) - system = ( - "You are an impartial grader. Compare the candidate answer to the reference answer. " - "Return a JSON object with fields 'score' (float 0.0-1.0) and 'reason' (short string). " - "Award 1.0 for semantic equivalence, 0.8 for close paraphrase, else 0.0." - ) - user = json.dumps({"reference": ref, "candidate": pred}) - try: - resp = llm.invoke([{"role": "system", "content": system}, {"role": "user", "content": user}]) - content = getattr(resp, "content", "") - data = {} - try: - if isinstance(content, str): - data = json.loads(content) - else: - # langchain message content may be a list of dicts - data = json.loads(content[0].get("text", "{}")) # type: ignore[index] - except Exception: - data = {"score": heuristic_score(pred, ref), "reason": "fallback parse"} - score = float(max(0.0, min(1.0, float(data.get("score", 0.0))))) - reason = str(data.get("reason", ""))[:500] - return {"key": "llm_judge", "score": score, "comment": reason} - except Exception as e: - score = heuristic_score(pred, ref) - return {"key": "llm_judge", "score": float(score), "comment": f"heuristic (error: {e})"} - - print("Running evaluation... this will create an experiment in LangSmith.") - results = evaluate( - target_func, - data=dataset_name, - evaluators=[llm_as_judge], - experiment_prefix="ep-llm-judge-demo", - max_concurrency=4, - metadata={"source": "examples/langsmith"}, - ) - - print("Experiment URL:") - try: - print(results.get("url")) # type: ignore[reportUnknownMemberType] - except Exception: - pass - - print("Done. Visit LangSmith to review scores and details.") - - -if __name__ == "__main__": - main() diff --git a/tests/chinook/langsmith/generate_traces.py b/tests/chinook/langsmith/generate_traces.py new file mode 100644 index 00000000..81a24053 --- /dev/null +++ b/tests/chinook/langsmith/generate_traces.py @@ -0,0 +1,366 @@ +"""Generate synthetic Chinook traces and send them to LangSmith. + +This module mirrors the Braintrust and Langfuse generators by: +- Reusing the Chinook dataset for inputs/ground truth +- Emitting assistant/tool conversations so the LangSmith adapter has + realistic transcripts to parse +- Storing identifiers so follow-up evaluations can map runs back to + their dataset rows + +Run with pytest to create traces locally (skipped in CI): + LANGSMITH_API_KEY=... pytest tests/chinook/langsmith/generate_traces.py +""" + +import json +import os +import uuid +from datetime import datetime, timedelta, timezone +from typing import List + +import pytest + +from eval_protocol.models import EvaluationRow, InputMetadata, Message +from eval_protocol.pytest import NoOpRolloutProcessor, evaluation_test + +from tests.chinook.dataset import collect_dataset + +try: # Optional dependency: LangSmith client for logging traces + from langsmith import Client, RunTree # type: ignore + + LANGSMITH_INSTALLED = True +except ImportError: # pragma: no cover - handled gracefully at runtime + LANGSMITH_INSTALLED = False + Client = None # type: ignore + + +PROJECT_NAME = os.getenv("LANGCHAIN_PROJECT") or os.getenv("LS_PROJECT") or "ep-chinook-langsmith" +TRACE_TAGS = ["chinook_sql"] + +langsmith_client = None +if LANGSMITH_INSTALLED: + os.environ.setdefault("LANGSMITH_TRACING", "true") + os.environ.setdefault("LANGCHAIN_PROJECT", PROJECT_NAME) + try: + langsmith_client = Client() + except Exception as exc: # pragma: no cover - network/auth issues surfaced to caller + print(f"⚠️ LangSmith client unavailable: {exc}") + langsmith_client = None + +dataset_rows = collect_dataset() + +pytestmark = pytest.mark.skipif(os.environ.get("CI") == "true", reason="Only run this generator locally") + + +def _synthetic_tool_schema() -> List[dict]: + """Return a lightweight tool schema describing the Chinook SQL executor.""" + + return [ + { + "type": "function", + "function": { + "name": "chinook.sql_query", + "description": "Execute a SQL statement against the Chinook dataset", + "parameters": { + "type": "object", + "properties": { + "sql": {"type": "string", "description": "The SQL query to run"}, + "notes": {"type": "string", "description": "Optional execution context"}, + }, + "required": ["sql"], + }, + }, + } + ] + + +def _populate_conversation(row: EvaluationRow, index: int) -> None: + """Populate the row with a synthetic tool-call conversation.""" + + question = row.messages[0].content if row.messages else f"Chinook task {index + 1}" + task_id = f"chinook_task_{index + 1}" + tool_call_id = f"chinook_sql_call_{index + 1}" + + row.tools = _synthetic_tool_schema() + row.input_metadata = InputMetadata(dataset_info={"task_id": task_id}) + + row.messages = [ + Message(role="user", content=question, name=task_id), + Message( + role="assistant", + content="Let me query the Chinook database to gather those results.", + tool_calls=[ + { + "id": tool_call_id, + "type": "function", + "function": { + "name": "chinook.sql_query", + "arguments": json.dumps( + { + "sql": f"-- Placeholder SQL for {task_id}", + "notes": "Synthetic LangSmith trace to exercise the adapter", + } + ), + }, + } + ], + ), + Message( + role="tool", + name="chinook.sql_query", + tool_call_id=tool_call_id, + content=row.ground_truth or "No matching rows were returned.", + ), + Message( + role="assistant", + content=( + row.ground_truth + if row.ground_truth + else "I could not find matching information in the Chinook catalog." + ), + ), + ] + + +def _log_to_langsmith(row: EvaluationRow, index: int) -> None: + """Send the populated conversation to LangSmith, if possible.""" + + if langsmith_client is None: + if LANGSMITH_INSTALLED: + print("⚠️ Skipping LangSmith logging because the client could not be initialised.") + else: + print("⚠️ LangSmith package not installed; install `eval-protocol[langsmith]` to log traces.") + return + + try: + conversation = [message.model_dump(exclude_none=True) for message in row.messages] + question = next((m.content for m in row.messages if m.role == "user"), "") + conversation_inputs = conversation[:-1] if len(conversation) > 1 else conversation + final_assistant = next( + (m for m in reversed(row.messages) if m.role == "assistant" and m.content), + None, + ) + final_answer = ( + final_assistant.content + if final_assistant and final_assistant.content + else row.ground_truth + if row.ground_truth + else "" + ) + + task_id = None + if row.input_metadata and row.input_metadata.dataset_info: + task_id = row.input_metadata.dataset_info.get("task_id") + if task_id is None: + task_id = next( + (m.name for m in row.messages if m.role == "user" and m.name), + None, + ) + + structured_inputs = { + "question": question, + "ground_truth": row.ground_truth, + "task_id": task_id, + "messages": conversation_inputs, + } + if row.tools: + structured_inputs["tools"] = row.tools + + now = datetime.now(timezone.utc) + run_id = uuid.uuid4() + run_tree = RunTree( + id=run_id, + trace_id=run_id, + name=f"chinook-sql-trace-{index + 1}", + run_type="chain", + project_name=PROJECT_NAME, + inputs=structured_inputs, + outputs={"output": final_answer}, + tags=list(TRACE_TAGS), + start_time=now, + end_time=now, + extra={ + "metadata": { + "task_id": task_id, + "logged_at": now.isoformat(), + "source": "eval-protocol chinook synthetic trace", + } + }, + ls_client=langsmith_client, + ) + + for offset, message in enumerate(row.messages): + event_time = now + timedelta(milliseconds=offset) + run_tree.add_event( + { + "name": f"{message.role}_message", + "time": event_time.isoformat(), + "kwargs": {"message": message.model_dump(exclude_none=True)}, + } + ) + + assistant_with_tools = next( + (m for m in row.messages if m.role == "assistant" and m.tool_calls), + None, + ) + if assistant_with_tools: + for call_index, tool_call in enumerate(assistant_with_tools.tool_calls or []): + tool_call_dict = ( + tool_call.model_dump(exclude_none=True) if hasattr(tool_call, "model_dump") else tool_call + ) + if not isinstance(tool_call_dict, dict): + tool_call_dict = {"raw_tool_call": tool_call_dict} + + function_call = tool_call_dict.get("function", {}) + if not isinstance(function_call, dict): + function_call = { + "name": function_call, + } + arguments = function_call.get("arguments") + parsed_arguments = {} + if isinstance(arguments, str): + try: + parsed_arguments = json.loads(arguments) + except json.JSONDecodeError: + parsed_arguments = {"raw_arguments": arguments} + elif isinstance(arguments, dict): + parsed_arguments = arguments + + call_time = now + timedelta(milliseconds=100 + call_index) + tool_run = run_tree.create_child( + name=function_call.get("name", tool_call_dict.get("id", "chinook.sql_query")), + run_type="tool", + inputs=parsed_arguments, + outputs={"output": row.ground_truth if row.ground_truth else "No matching rows were returned."}, + tags=list(TRACE_TAGS), + start_time=call_time, + end_time=call_time + timedelta(milliseconds=1), + ) + metadata = tool_run.extra.setdefault("metadata", {}) + metadata.update( + { + "tool_call_id": tool_call_dict.get("id"), + "task_id": task_id, + "call_index": call_index, + } + ) + + run_tree.end_time = datetime.now(timezone.utc) + run_tree.post(exclude_child_runs=False) + + if row.input_metadata is None: + row.input_metadata = InputMetadata(dataset_info={}) + if row.input_metadata.dataset_info is None: + row.input_metadata.dataset_info = {} + if task_id: + row.input_metadata.dataset_info.setdefault("task_id", task_id) + session_data = row.input_metadata.session_data or {} + session_data.setdefault("langsmith_run_id", str(run_id)) + session_data.setdefault("langsmith_trace_id", str(run_tree.trace_id)) + session_data.setdefault("langsmith_project", PROJECT_NAME) + row.input_metadata.session_data = session_data + except Exception as exc: # pragma: no cover - surfaces API/network failures to the user + print(f"❌ Failed to create LangSmith run: {exc}") + + +def _process_row(row: EvaluationRow, index: int) -> EvaluationRow: + """Populate, log, and return the evaluation row.""" + + _populate_conversation(row, index) + _log_to_langsmith(row, index) + return row + + +@pytest.mark.asyncio +@evaluation_test( + input_rows=[dataset_rows[0:1]], + completion_params=[ + { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + ], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_generate_chinook_trace_0(row: EvaluationRow) -> EvaluationRow: + return _process_row(row, 0) + + +@pytest.mark.asyncio +@evaluation_test( + input_rows=[dataset_rows[1:2]], + completion_params=[ + { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + ], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_generate_chinook_trace_1(row: EvaluationRow) -> EvaluationRow: + return _process_row(row, 1) + + +@pytest.mark.asyncio +@evaluation_test( + input_rows=[dataset_rows[2:3]], + completion_params=[ + { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + ], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_generate_chinook_trace_2(row: EvaluationRow) -> EvaluationRow: + return _process_row(row, 2) + + +@pytest.mark.asyncio +@evaluation_test( + input_rows=[dataset_rows[3:4]], + completion_params=[ + { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + ], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_generate_chinook_trace_3(row: EvaluationRow) -> EvaluationRow: + return _process_row(row, 3) + + +@pytest.mark.asyncio +@evaluation_test( + input_rows=[dataset_rows[4:5]], + completion_params=[ + { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + ], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_generate_chinook_trace_4(row: EvaluationRow) -> EvaluationRow: + return _process_row(row, 4) + + +@pytest.mark.asyncio +@evaluation_test( + input_rows=[dataset_rows[5:6]], + completion_params=[ + { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + } + ], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_generate_chinook_trace_5(row: EvaluationRow) -> EvaluationRow: + return _process_row(row, 5) diff --git a/tests/chinook/langsmith/test_langsmith_chinook.py b/tests/chinook/langsmith/test_langsmith_chinook.py new file mode 100644 index 00000000..a2f03dcb --- /dev/null +++ b/tests/chinook/langsmith/test_langsmith_chinook.py @@ -0,0 +1,178 @@ +"""Evaluate LangSmith traces generated from the Chinook dataset.""" + +import os +from typing import Dict, List, Optional + +import pytest +from pydantic import BaseModel +from pydantic_ai import Agent +from pydantic_ai.models.openai import OpenAIChatModel + +from eval_protocol.models import EvaluateResult, EvaluationRow, InputMetadata +from eval_protocol.pytest import NoOpRolloutProcessor, evaluation_test + +from tests.chinook.dataset import collect_dataset + +try: + from eval_protocol.adapters.langsmith import create_langsmith_adapter + + ADAPTER_AVAILABLE = True +except ImportError: # pragma: no cover - adapter extras not installed + ADAPTER_AVAILABLE = False + create_langsmith_adapter = None # type: ignore + +try: + from langsmith import Client # type: ignore + + LANGSMITH_CLIENT: Optional[Client] + try: + LANGSMITH_CLIENT = Client() + except Exception as exc: # pragma: no cover - surfaced to the caller + print(f"⚠️ LangSmith client unavailable: {exc}") + LANGSMITH_CLIENT = None +except ImportError: # pragma: no cover - optional dependency + LANGSMITH_CLIENT = None + +PROJECT_NAME = os.getenv("LANGCHAIN_PROJECT") or os.getenv("LS_PROJECT") or "ep-chinook-langsmith" +TRACE_TAGS = ["chinook_sql"] + +dataset_rows = collect_dataset() +TASK_ID_TO_GROUND_TRUTH: Dict[str, str] = { + f"chinook_task_{index + 1}": row.ground_truth for index, row in enumerate(dataset_rows) +} +QUESTION_TO_GROUND_TRUTH: Dict[str, str] = { + row.messages[0].content: row.ground_truth for row in dataset_rows if row.messages +} + +LLM_JUDGE_PROMPT = ( + "Your job is to compare the response to the expected answer.\n" + "The response will be a narrative report of the query results.\n" + "If the response contains the same or well summarized information as the expected answer, return 1.0.\n" + "If the response does not contain the same information or is missing information, return 0.0." +) + + +def _attach_ground_truth(row: EvaluationRow) -> None: + """Populate the row's ground truth using the stored task identifiers.""" + + dataset_id = None + for message in row.messages: + if message.role == "user" and message.name: + dataset_id = message.name + break + + ground_truth = None + if dataset_id: + ground_truth = TASK_ID_TO_GROUND_TRUTH.get(dataset_id) + + if ground_truth is None: + user_message = next((msg.content for msg in row.messages if msg.role == "user"), None) + if user_message: + ground_truth = QUESTION_TO_GROUND_TRUTH.get(user_message) + + if ground_truth is None: + ground_truth = "" + + row.ground_truth = ground_truth + + if row.input_metadata is None: + row.input_metadata = InputMetadata(session_data={}) + + if row.input_metadata.session_data is None: + row.input_metadata.session_data = {} + + if dataset_id: + row.input_metadata.session_data.setdefault("chinook_task_id", dataset_id) + + dataset_info = row.input_metadata.dataset_info or {} + if dataset_id: + dataset_info["task_id"] = dataset_id + row.input_metadata.dataset_info = dataset_info or None + + +def fetch_langsmith_traces(limit: int = 20) -> List[EvaluationRow]: + """Use the LangSmith adapter to convert traces into evaluation rows.""" + + if not ADAPTER_AVAILABLE or create_langsmith_adapter is None: + print("⚠️ LangSmith adapter unavailable - install `eval-protocol[langsmith]`.") + return [] + + try: + adapter = create_langsmith_adapter() + except Exception as exc: + print(f"❌ Failed to create LangSmithAdapter: {exc}") + return [] + + try: + rows = adapter.get_evaluation_rows( + project_name=PROJECT_NAME, + limit=limit, + include_tool_calls=True, + tags=TRACE_TAGS, + order_by="-created_at", + ) + except Exception as exc: + print(f"❌ LangSmithAdapter failed to pull rows: {exc}") + return [] + + for row in rows: + _attach_ground_truth(row) + + return rows + + +@pytest.mark.skipif(os.environ.get("CI") == "true", reason="Skip LangSmith adapter test in CI") +@pytest.mark.asyncio +@evaluation_test( + input_rows=[fetch_langsmith_traces()], + rollout_processor=NoOpRolloutProcessor(), + mode="pointwise", +) +async def test_langsmith_chinook(row: EvaluationRow) -> EvaluationRow: + """Evaluate LangSmith-sourced traces using the Chinook judge.""" + + assert row.tools, "Expected LangSmith traces to include available tool metadata" + assert any(message.tool_calls for message in row.messages if message.role == "assistant"), ( + "Expected at least one assistant message to include tool calls" + ) + + last_assistant_message = row.last_assistant_message() + if last_assistant_message is None or not last_assistant_message.content: + row.evaluation_result = EvaluateResult(score=0.0, reason="No assistant message found") + return row + + model = OpenAIChatModel("accounts/fireworks/models/kimi-k2-instruct", provider="fireworks") + + class JudgeResponse(BaseModel): + score: float + reason: str + + comparison_agent = Agent( + model=model, + system_prompt=LLM_JUDGE_PROMPT, + output_type=JudgeResponse, + output_retries=5, + ) + + result = await comparison_agent.run( + f"Expected answer: {row.ground_truth}\nResponse: {last_assistant_message.content}" + ) + row.evaluation_result = EvaluateResult( + score=result.output.score, + reason=result.output.reason, + ) + + if LANGSMITH_CLIENT and row.input_metadata and row.input_metadata.session_data: + run_id = row.input_metadata.session_data.get("langsmith_run_id") + if run_id: + try: + LANGSMITH_CLIENT.create_feedback( + run_id=run_id, + key="ep_chinook_accuracy", + score=row.evaluation_result.score, + comment=row.evaluation_result.reason, + ) + except Exception as exc: + print(f"⚠️ Failed to push LangSmith feedback: {exc}") + + return row