Skip to content

Commit 3e64efd

Browse files
committed
weave integration
1 parent 9196f83 commit 3e64efd

File tree

7 files changed

+947
-0
lines changed

7 files changed

+947
-0
lines changed

docs/developer_guide/tracing_integration_guide.mdx

Lines changed: 141 additions & 0 deletions
Large diffs are not rendered by default.

eval_protocol/adapters/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,10 @@
8787
__all__.extend(["LangSmithAdapter"])
8888
except ImportError:
8989
pass
90+
91+
try:
92+
from .weave import WeaveAdapter, create_weave_adapter
93+
94+
__all__.extend(["WeaveAdapter", "create_weave_adapter"])
95+
except ImportError:
96+
pass

eval_protocol/adapters/weave.py

Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
"""Weave adapter for Eval Protocol.
2+
3+
This adapter pulls traces from Weights & Biases Weave Service API and converts
4+
them to EvaluationRow format for use in evaluation pipelines.
5+
6+
References:
7+
- Guides: https://weave-docs.wandb.ai/guides/integrations/litellm/
8+
- Service API: https://weave-docs.wandb.ai/reference/gen_notebooks/weave_via_service_api/
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import logging
14+
import os
15+
from typing import Any, Dict, List, Optional, Protocol
16+
17+
import requests
18+
19+
from eval_protocol.models import EvaluationRow, InputMetadata, Message
20+
from .base import BaseAdapter
21+
from .utils import extract_messages_from_data
22+
23+
24+
logger = logging.getLogger(__name__)
25+
26+
27+
class TraceConverter(Protocol):
28+
"""Protocol for custom Weave trace-to-EvaluationRow converter functions.
29+
30+
A converter function should take a Weave trace dict along with processing
31+
options and return an EvaluationRow or None to skip the trace.
32+
"""
33+
34+
def __call__(
35+
self,
36+
trace: Dict[str, Any],
37+
include_tool_calls: bool,
38+
) -> Optional[EvaluationRow]:
39+
"""Convert a Weave trace to an EvaluationRow.
40+
41+
Args:
42+
trace: The Weave trace object (as returned by Service API) to convert
43+
include_tool_calls: Whether to include tool calling information
44+
45+
Returns:
46+
EvaluationRow or None if the trace should be skipped
47+
"""
48+
raise NotImplementedError
49+
50+
51+
def _extract_messages_from_trace(trace: Dict[str, Any], include_tool_calls: bool = True) -> List[Message]:
52+
"""Extract messages from Weave trace inputs/outputs.
53+
54+
Weave Service API typically returns a root call document with fields like:
55+
- id, project_id, op_name, inputs, output, summary, ...
56+
57+
We handle common payload shapes:
58+
- inputs: { messages: [...] } | { prompt } | list[dict] | str
59+
- output: { messages: [...] } | { content } | { result } | { choices: [{message: {...}}] } | list[dict] | str
60+
"""
61+
messages: List[Message] = []
62+
63+
try:
64+
inp = trace.get("inputs") or trace.get("input")
65+
out = trace.get("output") or trace.get("outputs")
66+
67+
if inp is not None:
68+
messages.extend(extract_messages_from_data(inp, include_tool_calls))
69+
70+
if out is not None:
71+
# Prefer explicit messages array (preserves tool_calls and tool role messages)
72+
if isinstance(out, dict) and isinstance(out.get("messages"), list):
73+
messages.extend(extract_messages_from_data({"messages": out["messages"]}, include_tool_calls))
74+
# Otherwise, support OpenAI-style choices for UI-rendered assistant content
75+
elif isinstance(out, dict) and isinstance(out.get("choices"), list) and out["choices"]:
76+
choice0 = out["choices"][0]
77+
msg_dict = None
78+
if isinstance(choice0, dict):
79+
msg_dict = choice0.get("message") or choice0.get("delta")
80+
if isinstance(msg_dict, dict):
81+
if "role" not in msg_dict:
82+
msg_dict = {**msg_dict, "role": "assistant"}
83+
messages.append(Message.model_validate(msg_dict))
84+
else:
85+
content = (
86+
choice0.get("message", {}).get("content") if isinstance(choice0, dict) else None
87+
)
88+
if content is not None:
89+
messages.append(Message(role="assistant", content=str(content)))
90+
else:
91+
# Generic extraction
92+
messages.extend(extract_messages_from_data(out, include_tool_calls))
93+
except (KeyError, TypeError, ValueError) as e:
94+
logger.warning("Failed to extract Weave messages: %s", e)
95+
96+
return messages
97+
98+
99+
def convert_trace_to_evaluation_row(trace: Dict[str, Any], include_tool_calls: bool = True) -> Optional[EvaluationRow]:
100+
"""Convert a Weave trace dict to EvaluationRow format.
101+
102+
Args:
103+
trace: Weave trace object as returned by Service API
104+
include_tool_calls: Whether to include tool calling information
105+
106+
Returns:
107+
EvaluationRow or None if conversion fails
108+
"""
109+
try:
110+
messages = _extract_messages_from_trace(trace, include_tool_calls)
111+
112+
tools = None
113+
if include_tool_calls:
114+
# Prefer tool schema from inputs.tools when present
115+
inputs_obj = trace.get("inputs") or {}
116+
if isinstance(inputs_obj, dict) and "tools" in inputs_obj:
117+
tools = inputs_obj.get("tools")
118+
119+
if not messages:
120+
return None
121+
122+
project_id = str(trace.get("project_id", ""))
123+
weave_trace_id = str(trace.get("id", ""))
124+
125+
return EvaluationRow(
126+
messages=messages,
127+
tools=tools,
128+
input_metadata=InputMetadata(
129+
session_data={
130+
"weave_trace_id": weave_trace_id,
131+
"weave_project_id": project_id,
132+
}
133+
),
134+
)
135+
except (KeyError, TypeError, ValueError) as e:
136+
logger.error("Error converting Weave trace %s: %s", trace.get("id", "unknown"), e)
137+
return None
138+
139+
140+
class WeaveAdapter(BaseAdapter):
141+
"""Adapter to pull Weave traces and convert to EvaluationRow format.
142+
143+
Configuration is sourced from parameters or environment variables:
144+
- team_id: defaults to WANDB_ENTITY
145+
- project_id: defaults to WANDB_PROJECT
146+
- api_token: defaults to WANDB_API_KEY
147+
- base_url: defaults to WEAVE_TRACE_BASE_URL or 'https://trace.wandb.ai'
148+
"""
149+
150+
def __init__(
151+
self,
152+
*,
153+
team_id: Optional[str] = None,
154+
project_id: Optional[str] = None,
155+
api_token: Optional[str] = None,
156+
base_url: Optional[str] = None,
157+
) -> None:
158+
self.team_id = team_id or os.getenv("WEAVE_TEAM_ID") or os.getenv("WANDB_ENTITY")
159+
self.project_id = project_id or os.getenv("WEAVE_PROJECT_ID") or os.getenv("WANDB_PROJECT")
160+
self.api_token = api_token or os.getenv("WANDB_API_KEY")
161+
self.base_url = base_url or os.getenv("WEAVE_TRACE_BASE_URL", "https://trace.wandb.ai")
162+
163+
if not self.api_token:
164+
raise ValueError("WANDB_API_KEY environment variable or api_token parameter required")
165+
if not self.team_id or not self.project_id:
166+
raise ValueError(
167+
"Weave project not configured. Provide team_id/project_id or set WANDB_ENTITY and WANDB_PROJECT"
168+
)
169+
170+
def get_evaluation_rows(self, *args: Any, **kwargs: Any) -> List[EvaluationRow]:
171+
"""Query Weave Service API for root traces and convert to EvaluationRow.
172+
173+
Args:
174+
limit: Max number of results (kwarg, default 100)
175+
offset: Offset into result set (kwarg, default 0)
176+
include_tool_calls: Whether to include tool calling information (kwarg, default True)
177+
query: Server-side expression object (e.g., {"$expr": {...}}) per Weave docs (kwarg)
178+
filter_obj: Additional filter options, defaults to {"trace_roots_only": True} (kwarg)
179+
sort_by: Sort directives, defaults to started_at desc (kwarg)
180+
include_feedback: Whether to include feedback in results (kwarg)
181+
converter: Optional custom converter implementing TraceConverter protocol (kwarg)
182+
"""
183+
limit: int = kwargs.pop("limit", 100)
184+
offset: int = kwargs.pop("offset", 0)
185+
include_tool_calls: bool = kwargs.pop("include_tool_calls", True)
186+
query: Optional[Dict[str, Any]] = kwargs.pop("query", None)
187+
filter_obj: Optional[Dict[str, Any]] = kwargs.pop("filter_obj", None)
188+
sort_by: Optional[List[Dict[str, Any]]] = kwargs.pop("sort_by", None)
189+
include_feedback: bool = kwargs.pop("include_feedback", False)
190+
converter: Optional[TraceConverter] = kwargs.pop("converter", None)
191+
192+
# ignore remaining kwargs to remain forward compatible
193+
url_stream_query = f"{self.base_url}/calls/stream_query"
194+
195+
payload: Dict[str, Any] = {
196+
"project_id": f"{self.team_id}/{self.project_id}",
197+
"filter": {"trace_roots_only": True},
198+
"limit": limit,
199+
"offset": offset,
200+
"sort_by": sort_by or [{"field": "started_at", "direction": "desc"}],
201+
"include_feedback": include_feedback,
202+
}
203+
if query is not None:
204+
payload["query"] = query
205+
if filter_obj is not None:
206+
payload["filter"] = filter_obj
207+
208+
headers = {"Content-Type": "application/json"}
209+
210+
resp = requests.post(
211+
url_stream_query, headers=headers, json=payload, auth=("api", self.api_token), timeout=30
212+
)
213+
resp.raise_for_status()
214+
215+
rows: List[EvaluationRow] = []
216+
217+
# The API may return either a JSON array/object or newline-delimited JSON (stream)
218+
data: Any
219+
try:
220+
data = resp.json()
221+
# Normalize to list
222+
traces: List[Dict[str, Any]]
223+
if isinstance(data, dict):
224+
# Some endpoints may return a single object
225+
traces = [data]
226+
else:
227+
traces = list(data) if isinstance(data, list) else []
228+
except ValueError:
229+
# Fallback decode for newline-delimited JSON
230+
lines = [ln for ln in resp.text.strip().split("\n") if ln.strip()]
231+
import json as _json
232+
233+
traces = []
234+
for ln in lines:
235+
try:
236+
obj = _json.loads(ln)
237+
traces.append(obj)
238+
except _json.JSONDecodeError:
239+
continue
240+
241+
if not traces:
242+
return []
243+
244+
for tr in traces:
245+
try:
246+
eval_row = converter(tr, include_tool_calls) if converter else convert_trace_to_evaluation_row(
247+
tr, include_tool_calls
248+
)
249+
if eval_row:
250+
rows.append(eval_row)
251+
except (KeyError, TypeError, ValueError) as e:
252+
logger.warning("Failed to convert Weave trace %s: %s", tr.get("id", "unknown"), e)
253+
254+
return rows
255+
256+
def upload_scores(self, rows: List[EvaluationRow], model_name: str, mean_score: float) -> None: # noqa: D401
257+
"""No-op: Weave Service API does not expose a score feedback endpoint yet.
258+
259+
If/when Weave exposes an official feedback API for traces, this method can
260+
be implemented to push evaluation results back to the provider.
261+
"""
262+
logger.info("Weave upload_scores not implemented: no public feedback API available")
263+
264+
def upload_score(self, row: EvaluationRow, model_name: str) -> None: # noqa: D401
265+
"""No-op per upload_scores; see note there."""
266+
logger.info("Weave upload_score not implemented: no public feedback API available")
267+
268+
269+
def create_weave_adapter(
270+
*, team_id: Optional[str] = None, project_id: Optional[str] = None, api_token: Optional[str] = None, base_url: Optional[str] = None
271+
) -> WeaveAdapter:
272+
"""Factory function to create a Weave adapter."""
273+
return WeaveAdapter(team_id=team_id, project_id=project_id, api_token=api_token, base_url=base_url)
274+
275+
276+
__all__ = ["WeaveAdapter", "create_weave_adapter", "convert_trace_to_evaluation_row"]
277+
278+

0 commit comments

Comments
 (0)