Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 235 additions & 3 deletions eval_protocol/adapters/braintrust.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,240 @@
"""Deprecated adapter wrappers for Braintrust.
"""Braintrust adapter for Eval Protocol.

This module forwards imports to :mod:`eval_protocol.integrations.braintrust`.
This adapter allows pulling data from Braintrust deployments and converting it
to EvaluationRow format for use in evaluation pipelines.
"""

import logging
import os
import random
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Protocol

import requests

from eval_protocol.models import EvaluationRow, InputMetadata, Message
from .utils import extract_messages_from_data

# Keep backward compatibility
from ..integrations.braintrust import reward_fn_to_scorer, scorer_to_reward_fn

__all__ = ["scorer_to_reward_fn", "reward_fn_to_scorer"]

logger = logging.getLogger(__name__)


class TraceConverter(Protocol):
    """Callable interface for user-supplied trace converters.

    Implementations receive a raw Braintrust trace together with processing
    options and either produce an ``EvaluationRow`` or return ``None`` to
    drop the trace from the results.
    """

    def __call__(
        self,
        trace: Dict[str, Any],
        include_tool_calls: bool,
    ) -> Optional[EvaluationRow]:
        """Transform one Braintrust trace into an ``EvaluationRow``.

        Args:
            trace: Raw trace payload as returned by the Braintrust API.
            include_tool_calls: When True, tool-call information should be
                carried over into the resulting messages.

        Returns:
            The converted row, or ``None`` to skip this trace.
        """
        ...


def convert_trace_to_evaluation_row(trace: Dict[str, Any], include_tool_calls: bool = True) -> Optional[EvaluationRow]:
    """Convert a single Braintrust trace into an EvaluationRow.

    Args:
        trace: Raw Braintrust trace payload.
        include_tool_calls: Whether tool-call information is preserved and
            tool definitions are looked up in the trace metadata.

    Returns:
        The converted EvaluationRow, or None when the trace yields no
        messages or conversion hits a recoverable error.
    """
    try:
        extracted = extract_messages_from_trace(trace, include_tool_calls)
        if not extracted:
            return None

        # Tool definitions may sit directly on the metadata or be nested
        # under hidden_params.optional_params (provider-dependent layout).
        tool_defs = None
        if include_tool_calls:
            meta = trace.get("metadata", {})
            tool_defs = meta.get("tools")
            if not tool_defs:
                tool_defs = meta.get("hidden_params", {}).get("optional_params", {}).get("tools")

        return EvaluationRow(
            messages=extracted,
            tools=tool_defs,
            input_metadata=InputMetadata(
                session_data={
                    "braintrust_trace_id": trace.get("id"),
                }
            ),
        )

    except (AttributeError, ValueError, KeyError) as exc:
        logger.error("Error converting trace %s: %s", trace.get("id", "unknown"), exc)
        return None


def extract_messages_from_trace(trace: Dict[str, Any], include_tool_calls: bool = True) -> List[Message]:
    """Pull the conversation messages out of a Braintrust trace.

    The trace's ``input`` is expected to hold the prompt-side messages and
    the first element of ``output`` (a dict carrying a ``message`` key) the
    completion side.

    Args:
        trace: Raw Braintrust trace payload.
        include_tool_calls: Whether tool-call fields are carried over.

    Returns:
        The combined input + output messages; empty when the trace lacks a
        complete conversation or a recoverable error occurs.
    """
    collected: List[Message] = []

    try:
        prompt_side = trace.get("input")

        completion = None
        outputs = trace.get("output", [])
        if outputs:
            head = outputs[0]
            if isinstance(head, dict):
                completion = head.get("message")

        # Only spans carrying both sides of the exchange are useful.
        if prompt_side and completion:
            collected.extend(extract_messages_from_data(prompt_side, include_tool_calls))
            collected.extend(extract_messages_from_data(completion, include_tool_calls))

    except (AttributeError, ValueError, KeyError) as exc:
        logger.warning("Error processing trace %s: %s", trace.get("id", "unknown"), exc)

    return collected


class BraintrustAdapter:
    """Adapter to pull data from Braintrust and convert to EvaluationRow format.

    This adapter can pull both chat conversations and tool calling traces from
    Braintrust deployments and convert them into the EvaluationRow format expected
    by the evaluation protocol.

    Examples:
        Basic usage:
        >>> adapter = BraintrustAdapter(
        ...     api_key="your_api_key",
        ...     project_id="your_project_id"
        ... )
        >>> btql_query = "select: * from: project_logs('your_project_id') traces limit: 10"
        >>> rows = adapter.get_evaluation_rows(btql_query)

        Using BTQL for custom queries:
        >>> btql_query = '''
        ... select: *
        ... from: project_logs('your_project_id') traces
        ... filter: metadata.agent_name = 'agent_instance'
        ... limit: 50
        ... '''
        >>> rows = adapter.get_evaluation_rows(btql_query)
    """

    # Seconds to wait for a Braintrust API response. requests has no default
    # timeout, so without this a hung endpoint would block the caller forever.
    REQUEST_TIMEOUT_SECONDS: float = 60.0

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_url: Optional[str] = None,
        project_id: Optional[str] = None,
    ):
        """Initialize the Braintrust adapter.

        Args:
            api_key: Braintrust API key (defaults to BRAINTRUST_API_KEY env var)
            api_url: Braintrust API URL (defaults to BRAINTRUST_API_URL env var)
            project_id: Project ID to fetch logs from (defaults to BRAINTRUST_PROJECT_ID env var)

        Raises:
            ValueError: If neither the arguments nor the environment provide
                an API key or a project id.
        """
        self.api_key = api_key or os.getenv("BRAINTRUST_API_KEY")
        self.api_url = api_url or os.getenv("BRAINTRUST_API_URL", "https://api.braintrust.dev")
        self.project_id = project_id or os.getenv("BRAINTRUST_PROJECT_ID")

        if not self.api_key:
            raise ValueError("BRAINTRUST_API_KEY environment variable or api_key parameter required")
        if not self.project_id:
            raise ValueError("BRAINTRUST_PROJECT_ID environment variable or project_id parameter required")

    def get_evaluation_rows(
        self,
        btql_query: str,
        include_tool_calls: bool = True,
        converter: Optional[TraceConverter] = None,
    ) -> List[EvaluationRow]:
        """Get evaluation rows using a custom BTQL query.

        Args:
            btql_query: The BTQL query string to execute
            include_tool_calls: Whether to include tool calling information
            converter: Optional custom converter implementing TraceConverter protocol

        Returns:
            List[EvaluationRow]: Converted evaluation rows

        Raises:
            requests.HTTPError: If the BTQL endpoint returns an error status.
            requests.Timeout: If the API does not respond within
                REQUEST_TIMEOUT_SECONDS.
        """
        eval_rows: List[EvaluationRow] = []

        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}

        # Bounded timeout so an unresponsive Braintrust endpoint cannot stall
        # the evaluation pipeline indefinitely.
        response = requests.post(
            f"{self.api_url}/btql",
            headers=headers,
            json={"query": btql_query, "fmt": "json"},
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
        query_response = response.json()

        if not query_response or not query_response.get("data"):
            logger.debug("No data returned from BTQL query")
            return eval_rows

        all_traces = query_response["data"]
        logger.debug("BTQL query returned %d traces", len(all_traces))

        # Process each selected trace; a malformed trace is logged and skipped
        # so one bad record cannot sink the whole batch.
        for trace in all_traces:
            try:
                if converter:
                    eval_row = converter(trace, include_tool_calls)
                else:
                    eval_row = convert_trace_to_evaluation_row(trace, include_tool_calls)
                if eval_row:
                    eval_rows.append(eval_row)
            except (AttributeError, ValueError, KeyError) as e:
                logger.warning("Failed to convert trace %s: %s", trace.get("id", "unknown"), e)
                continue

        logger.info("Successfully processed %d BTQL results into %d evaluation rows", len(all_traces), len(eval_rows))
        return eval_rows


def create_braintrust_adapter(
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    project_id: Optional[str] = None,
) -> BraintrustAdapter:
    """Build a :class:`BraintrustAdapter`.

    Credential resolution (env-var fallbacks for omitted arguments) is
    delegated to the adapter's constructor.
    """
    return BraintrustAdapter(api_key=api_key, api_url=api_url, project_id=project_id)


__all__ = ["scorer_to_reward_fn", "reward_fn_to_scorer", "BraintrustAdapter", "create_braintrust_adapter"]
116 changes: 34 additions & 82 deletions eval_protocol/adapters/langfuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from typing import Any, Dict, List, Optional, Protocol

from eval_protocol.models import EvaluationRow, InputMetadata, Message
from .utils import extract_messages_from_data

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -112,7 +113,7 @@ def extract_messages_from_trace(
if span_name: # Look for a generation tied to a span name
try:
# Find the final generation in the named span
gen: ObservationsView | None = find_final_generation_in_span(trace, span_name)
gen: ObservationsView | None = get_final_generation_in_span(trace, span_name)
if not gen:
return messages

Expand Down Expand Up @@ -141,87 +142,8 @@ def extract_messages_from_trace(
return messages


def extract_messages_from_data(data, include_tool_calls: bool) -> List[Message]:
    """Normalize a trace input/output payload into Message objects.

    Recognized shapes, in priority order for dicts: an OpenAI-style dict with
    a "messages" list, a single message dict (has "role"), a {"prompt": ...}
    dict (becomes a user message), a {"content": ...} dict (becomes an
    assistant message), or any other dict (best-effort single-message
    conversion). Lists are treated as lists of message dicts; bare strings
    default to the user role.

    Args:
        data: Input or output payload taken from a trace or generation.
        include_tool_calls: Whether tool-call fields are carried over.

    Returns:
        The extracted Message objects (possibly empty).
    """
    out: List[Message] = []

    if isinstance(data, dict):
        if "messages" in data:
            # OpenAI-style messages format
            out.extend(dict_to_message(m, include_tool_calls) for m in data["messages"])
        elif "role" in data:
            # Single message format
            out.append(dict_to_message(data, include_tool_calls))
        elif "prompt" in data:
            # Simple prompt format
            out.append(Message(role="user", content=str(data["prompt"])))
        elif "content" in data:
            # Simple content format
            out.append(Message(role="assistant", content=str(data["content"])))
        else:
            # Unknown dict shape: best-effort single-message conversion.
            out.append(dict_to_message(data, include_tool_calls))
    elif isinstance(data, list):
        # Direct list of message dicts; non-dict entries are ignored.
        out.extend(dict_to_message(m, include_tool_calls) for m in data if isinstance(m, dict))
    elif isinstance(data, str):
        # Bare string: context is ambiguous, default to the user role.
        out.append(Message(role="user", content=data))

    return out


def dict_to_message(msg_dict: Dict[str, Any], include_tool_calls: bool = True) -> Message:
    """Build a Message object from a raw message dict.

    Args:
        msg_dict: Message payload; "role" defaults to "assistant" when absent,
            "content" and "name" default to None.
        include_tool_calls: When False, any tool_calls / tool_call_id /
            function_call fields in the dict are dropped.

    Returns:
        The constructed Message object.
    """
    # Tool-related fields stay None unless explicitly requested and present.
    tool_fields: Dict[str, Any] = {"tool_calls": None, "tool_call_id": None, "function_call": None}
    if include_tool_calls:
        for key in tool_fields:
            if key in msg_dict:
                tool_fields[key] = msg_dict[key]

    return Message(
        role=msg_dict.get("role", "assistant"),
        content=msg_dict.get("content"),
        name=msg_dict.get("name"),
        tool_call_id=tool_fields["tool_call_id"],
        tool_calls=tool_fields["tool_calls"],
        function_call=tool_fields["function_call"],
    )


def find_final_generation_in_span(trace: TraceWithFullDetails, span_name: str) -> ObservationsView | None:
"""Find the final generation within a named span that contains full message history.
def get_final_generation_in_span(trace: TraceWithFullDetails, span_name: str) -> ObservationsView | None:
"""Get the final generation within a named span that contains full message history.

Args:
trace: Langfuse trace object
Expand Down Expand Up @@ -511,6 +433,36 @@ def get_evaluation_rows_by_ids(
continue
return eval_rows

def push_scores(self, rows: List[EvaluationRow], model_name: str, mean_score: float) -> None:
"""Push evaluation scores back to Langfuse traces for tracking and analysis.

Creates a score entry in Langfuse for each unique trace_id found in the evaluation
rows' session data. This allows you to see evaluation results directly in the
Langfuse UI alongside the original traces.

Args:
rows: List of EvaluationRow objects with session_data containing trace IDs
model_name: Name of the model (used as the score name in Langfuse)
mean_score: The calculated mean score to push to Langfuse

Note:
Silently handles errors if rows lack session data
"""
try:
for trace_id in set(
row.input_metadata.session_data["langfuse_trace_id"]
for row in rows
if row.evaluation_result and row.input_metadata and row.input_metadata.session_data
):
if trace_id:
self.client.create_score(
trace_id=trace_id,
name=model_name,
value=mean_score,
)
except Exception as e:
logger.warning("Failed to push scores to Langfuse: %s", e)


def create_langfuse_adapter() -> LangfuseAdapter:
"""Factory function to create a Langfuse adapter."""
Expand Down
Loading
Loading