Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions eval_protocol/mcp/execution/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,10 @@ async def _execute_with_semaphore(idx):

evaluation_row.messages = messages
evaluation_row.tools = shared_tool_schema
# Some OpenAI SDK versions type CompletionUsage as a TypedDict; construct via cast to avoid ctor mismatches
evaluation_row.usage = cast(
CompletionUsage,
{
"prompt_tokens": trajectory.usage.get("prompt_tokens", 0),
"completion_tokens": trajectory.usage.get("completion_tokens", 0),
"total_tokens": trajectory.usage.get("total_tokens", 0),
},
evaluation_row.execution_metadata.usage = CompletionUsage(
prompt_tokens=trajectory.usage.get("prompt_tokens", 0),
completion_tokens=trajectory.usage.get("completion_tokens", 0),
total_tokens=trajectory.usage.get("total_tokens", 0),
)
evaluation_row.input_metadata.completion_params = {
"model": policy.model_id,
Expand Down
23 changes: 17 additions & 6 deletions eval_protocol/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,16 @@ class EvalMetadata(BaseModel):
passed: Optional[bool] = Field(None, description="Whether the evaluation passed based on the threshold")


class CostMetrics(BaseModel):
    """Cost breakdown for LLM API calls, in US dollars.

    Every field is optional; ``None`` indicates the corresponding cost
    could not be computed (e.g. missing usage data or unpriced model).
    """

    # Cost attributable to prompt (input) tokens.
    input_cost_usd: Optional[float] = Field(default=None, description="Cost in USD for input tokens.")

    # Cost attributable to completion (output) tokens.
    output_cost_usd: Optional[float] = Field(default=None, description="Cost in USD for output tokens.")

    # Sum of input and output costs for the call.
    total_cost_usd: Optional[float] = Field(default=None, description="Total cost in USD for the API call.")


class ExecutionMetadata(BaseModel):
"""Metadata about the execution of the evaluation."""

Expand All @@ -535,10 +545,16 @@ class ExecutionMetadata(BaseModel):
)

run_id: Optional[str] = Field(
None,
default=None,
description=("The ID of the run that this row belongs to."),
)

usage: Optional[CompletionUsage] = Field(
default=None, description="Token usage statistics from LLM calls during execution."
)

cost_metrics: Optional[CostMetrics] = Field(default=None, description="Cost breakdown for LLM API calls.")


class EvaluationRow(BaseModel):
"""
Expand Down Expand Up @@ -586,11 +602,6 @@ class EvaluationRow(BaseModel):
description="Metadata about the execution of the evaluation.",
)

# LLM usage statistics
usage: Optional[CompletionUsage] = Field(
default=None, description="Token usage statistics from LLM calls during execution."
)

created_at: datetime = Field(default_factory=datetime.now, description="The timestamp when the row was created.")

eval_metadata: Optional[EvalMetadata] = Field(
Expand Down
18 changes: 18 additions & 0 deletions eval_protocol/pytest/default_agent_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from eval_protocol.mcp.execution.policy import LiteLLMPolicy
from eval_protocol.mcp.mcp_multi_client import MCPMultiClient
from eval_protocol.models import EvaluationRow, Message, ChatCompletionContentPartTextParam
from openai.types import CompletionUsage
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import Dataset, RolloutProcessorConfig
from pydantic import BaseModel
Expand All @@ -38,6 +39,11 @@ def __init__(self, model: str, row: EvaluationRow, config_path: str, logger: Dat
self._policy = LiteLLMPolicy(model_id=model)
self.mcp_client = MCPMultiClient(config_path=config_path) if config_path else None
self.logger: DatasetLogger = logger
self.usage = {
"prompt_tokens": 0,
"completion_tokens": 0,
"total_tokens": 0,
}

async def setup(self):
if self.mcp_client:
Expand Down Expand Up @@ -166,6 +172,11 @@ async def _call_model(self, messages: list[Message], tools: Optional[List[dict[s
payload_tools.append({"type": tool_type, "function": {"name": name, "parameters": params_payload}})

response = await self._policy._make_llm_call(messages=messages_payload, tools=payload_tools)

self.usage["prompt_tokens"] += response["usage"]["prompt_tokens"]
self.usage["completion_tokens"] += response["usage"]["completion_tokens"]
self.usage["total_tokens"] += response["usage"]["total_tokens"]

# Coerce content to a string to align with our Message model type expectations
raw_content = response["choices"][0]["message"].get("content")
if isinstance(raw_content, list):
Expand Down Expand Up @@ -238,6 +249,13 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
try:
await agent.setup()
await agent.call_agent()

agent.evaluation_row.execution_metadata.usage = CompletionUsage(
prompt_tokens=agent.usage["prompt_tokens"],
completion_tokens=agent.usage["completion_tokens"],
total_tokens=agent.usage["total_tokens"],
)

return agent.evaluation_row
finally:
if agent.mcp_client:
Expand Down
23 changes: 23 additions & 0 deletions eval_protocol/pytest/default_langchain_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List

try:
from langchain_core.messages import BaseMessage

Check failure on line 5 in eval_protocol/pytest/default_langchain_rollout_processor.py

View workflow job for this annotation

GitHub Actions / Lint & Type Check

Type "type[langchain_core.messages.base.BaseMessage]" is not assignable to declared type "type[eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage]"   "langchain_core.messages.base.BaseMessage" is not assignable to "eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage"   Type "type[langchain_core.messages.base.BaseMessage]" is not assignable to type "type[eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage]" (reportAssignmentType)
except Exception: # pragma: no cover - optional dependency path
# Minimal fallback base type to satisfy typing when langchain is not present
class BaseMessage: # type: ignore
Expand All @@ -10,6 +10,7 @@


from eval_protocol.models import EvaluationRow, Message
from openai.types import CompletionUsage
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import RolloutProcessorConfig

Expand All @@ -32,7 +33,7 @@
async def _process_row(row: EvaluationRow) -> EvaluationRow:
# Build LC messages from EP row
try:
from langchain_core.messages import HumanMessage

Check failure on line 36 in eval_protocol/pytest/default_langchain_rollout_processor.py

View workflow job for this annotation

GitHub Actions / Lint & Type Check

Type "type[langchain_core.messages.human.HumanMessage]" is not assignable to declared type "type[eval_protocol.pytest.default_langchain_rollout_processor.LangGraphRolloutProcessor.HumanMessage]"   "langchain_core.messages.human.HumanMessage" is not assignable to "eval_protocol.pytest.default_langchain_rollout_processor.LangGraphRolloutProcessor.HumanMessage"   Type "type[langchain_core.messages.human.HumanMessage]" is not assignable to type "type[eval_protocol.pytest.default_langchain_rollout_processor.LangGraphRolloutProcessor.HumanMessage]" (reportAssignmentType)
except Exception:
# Fallback minimal message if langchain_core is unavailable
class HumanMessage(BaseMessage): # type: ignore
Expand Down Expand Up @@ -86,12 +87,34 @@
else:
result_messages = getattr(result_obj, "messages", [])

# TODO: i didn't see a langgraph example so couldn't fully test this. should uncomment and test when we have example ready.
# total_input_tokens = 0
# total_output_tokens = 0
# total_tokens = 0

# for msg in result_messages:
# if isinstance(msg, BaseMessage):
# usage = getattr(msg, 'response_metadata', {})
# else:
# usage = msg.get("response_metadata", {})

# if usage:
# total_input_tokens += usage.get("prompt_tokens", 0)
# total_output_tokens += usage.get("completion_tokens", 0)
# total_tokens += usage.get("total_tokens", 0)

# row.execution_metadata.usage = CompletionUsage(
# prompt_tokens=total_input_tokens,
# completion_tokens=total_output_tokens,
# total_tokens=total_tokens,
# )

def _serialize_message(msg: BaseMessage) -> Message:
# Prefer SDK-level serializer
try:
from eval_protocol.adapters.langchain import serialize_lc_message_to_ep as _ser

return _ser(msg)

Check failure on line 117 in eval_protocol/pytest/default_langchain_rollout_processor.py

View workflow job for this annotation

GitHub Actions / Lint & Type Check

Argument of type "BaseMessage" cannot be assigned to parameter "msg" of type "BaseMessage" in function "serialize_lc_message_to_ep"   "eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage" is not assignable to "langchain_core.messages.base.BaseMessage" (reportArgumentType)
except Exception:
# Minimal fallback: best-effort string content only
content = getattr(msg, "content", "")
Expand Down
10 changes: 10 additions & 0 deletions eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pydantic_ai.models import Model
from typing_extensions import override
from eval_protocol.models import EvaluationRow, Message
from openai.types import CompletionUsage
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import RolloutProcessorConfig
from openai.types.chat import ChatCompletion, ChatCompletionMessage, ChatCompletionMessageParam
Expand Down Expand Up @@ -89,6 +90,15 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
message_history=model_messages, model=model, usage_limits=config.kwargs.get("usage_limits")
)
row.messages = await self.convert_pyd_message_to_ep_message(response.all_messages())

# TODO: pydantic ai accumulates usage info across all models in multi-agent setup, so this simple tracking doesn't work for cost. to discuss with @dphuang2 when he's back.
# usage_info = response.usage()
# row.execution_metadata.usage = CompletionUsage(
# prompt_tokens=usage_info.request_tokens or 0,
# completion_tokens=usage_info.response_tokens or 0,
# total_tokens=usage_info.total_tokens or 0,
# )

return row

async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow:
Expand Down
7 changes: 7 additions & 0 deletions eval_protocol/pytest/default_single_turn_rollout_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from eval_protocol.dataset_logger import default_logger
from eval_protocol.models import EvaluationRow, Message
from openai.types import CompletionUsage
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import RolloutProcessorConfig

Expand Down Expand Up @@ -108,6 +109,12 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
)
]

row.execution_metadata.usage = CompletionUsage(
prompt_tokens=response.usage.prompt_tokens,
completion_tokens=response.usage.completion_tokens,
total_tokens=response.usage.total_tokens,
)

row.messages = messages
default_logger.log(row)
return row
Expand Down
16 changes: 7 additions & 9 deletions eval_protocol/pytest/evaluation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

from eval_protocol.pytest.utils import (
AggregationMethod,
add_cost_metrics,
log_eval_status_and_rows,
parse_ep_completion_params,
parse_ep_max_concurrent_rollouts,
Expand Down Expand Up @@ -430,25 +431,22 @@ async def _collect_result(config, lst): # pyright: ignore[reportUnknownParamete
processed_dataset=input_dataset, # pyright: ignore[reportUnknownArgumentType]
evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {},
)
if results is None: # pyright: ignore[reportUnnecessaryComparison]
raise ValueError(
f"Test function {test_func.__name__} did not return an EvaluationRow instance. You must return an EvaluationRow instance from your test function decorated with @evaluation_test."
)
if not isinstance(results, list):
if (
results is None
or not isinstance(results, list)
or not all(isinstance(r, EvaluationRow) for r in results)
):
raise ValueError(
f"Test function {test_func.__name__} did not return a list of EvaluationRow instances. You must return a list of EvaluationRow instances from your test function decorated with @evaluation_test."
)
if not results:
raise ValueError(
f"Test function {test_func.__name__} returned an empty list. You must return a non-empty list of EvaluationRow instances from your test function decorated with @evaluation_test."
)
if not all(isinstance(r, EvaluationRow) for r in results): # pyright: ignore[reportUnnecessaryIsInstance]
raise ValueError(
f"Test function {test_func.__name__} returned a list containing non-EvaluationRow instances. You must return a list of EvaluationRow instances from your test function decorated with @evaluation_test."
)
all_results[run_idx] = results

for r in results:
add_cost_metrics(r)
if r.eval_metadata is not None:
if r.rollout_status.is_error():
r.eval_metadata.status = Status.error(
Expand Down
56 changes: 55 additions & 1 deletion eval_protocol/pytest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@
from dataclasses import replace
from typing import Any, Literal

from litellm.cost_calculator import cost_per_token
from tqdm import tqdm

from eval_protocol.dataset_logger.dataset_logger import DatasetLogger
from eval_protocol.models import (
CostMetrics,
CompletionParams,
EvalMetadata,
EvaluationRow,
EvaluationThreshold,
EvaluationThresholdDict,
Status,
CompletionParams,
)
from eval_protocol.pytest.rollout_processor import RolloutProcessor
from eval_protocol.pytest.types import (
Expand Down Expand Up @@ -298,3 +300,55 @@ def extract_effort_tag(params: dict) -> str | None: # pyright: ignore[reportMis
except Exception:
return None
return None


def add_cost_metrics(row: EvaluationRow) -> None:
    """Calculate and attach USD cost metrics to ``row`` from its token usage.

    Reads token counts from ``row.execution_metadata.usage`` and the model /
    provider from ``row.input_metadata.completion_params``, prices the tokens
    via LiteLLM's ``cost_per_token``, and writes the result to
    ``row.execution_metadata.cost_metrics``.

    Falls back to zero costs when usage or completion params are missing, or
    when LiteLLM has no pricing data for the model (``cost_per_token`` raises
    in that case) — a single unpriced model must not fail the whole eval run.
    """
    zero_cost = CostMetrics(
        input_cost_usd=0.0,
        output_cost_usd=0.0,
        total_cost_usd=0.0,
    )

    # Can't calculate cost without usage stats or model info
    if not row.execution_metadata.usage or not row.input_metadata.completion_params:
        row.execution_metadata.cost_metrics = zero_cost
        return

    model = row.input_metadata.completion_params.get("model", "unknown")
    provider = row.input_metadata.completion_params.get("provider")

    # Pydantic AI provider names mapped to LiteLLM's "<prefix>/<model>" format.
    # TODO: make more generic for other frameworks too.
    provider_mapping = {
        "fireworks": "fireworks_ai",
        "together": "together_ai",
        "openai": "",  # No prefix needed
        "azure": "azure",
        "deepseek": "deepseek",
        "openrouter": "openrouter",
        "grok": "grok",
        "github": "github",
        "heroku": "heroku",
    }

    if provider and provider in provider_mapping:
        litellm_prefix = provider_mapping[provider]
        model_id = f"{litellm_prefix}/{model}" if litellm_prefix else model
    else:
        # Unknown provider: pass the model name through unprefixed.
        model_id = model

    usage = row.execution_metadata.usage

    input_tokens = usage.prompt_tokens or 0
    output_tokens = usage.completion_tokens or 0

    try:
        input_cost, output_cost = cost_per_token(
            model=model_id, prompt_tokens=input_tokens, completion_tokens=output_tokens
        )
    except Exception:
        # LiteLLM raises for models it has no pricing entry for; record zero
        # cost (consistent with the missing-data path above) instead of crashing.
        row.execution_metadata.cost_metrics = zero_cost
        return

    # Set all cost metrics on the row
    row.execution_metadata.cost_metrics = CostMetrics(
        input_cost_usd=input_cost,
        output_cost_usd=output_cost,
        total_cost_usd=input_cost + output_cost,
    )
Loading