From 08592894253278a7a13020d2aaa1091ef19c3d24 Mon Sep 17 00:00:00 2001
From: Derek Xu
Date: Wed, 3 Sep 2025 14:57:49 -0700
Subject: [PATCH 1/2] Cost Metrics added

---
 eval_protocol/mcp/execution/manager.py        | 12 ++--
 eval_protocol/models.py                       | 21 +++++--
 .../pytest/default_agent_rollout_processor.py | 18 ++++++
 .../default_langchain_rollout_processor.py    | 23 ++++++++
 .../default_pydantic_ai_rollout_processor.py  |  9 +++
 .../default_single_turn_rollout_process.py    |  7 +++
 eval_protocol/pytest/evaluation_test.py       | 16 +++---
 eval_protocol/pytest/utils.py                 | 56 ++++++++++++++++++-
 8 files changed, 139 insertions(+), 23 deletions(-)

diff --git a/eval_protocol/mcp/execution/manager.py b/eval_protocol/mcp/execution/manager.py
index 0b8f0668..1a4ee392 100644
--- a/eval_protocol/mcp/execution/manager.py
+++ b/eval_protocol/mcp/execution/manager.py
@@ -124,14 +124,10 @@ async def _execute_with_semaphore(idx):
 
             evaluation_row.messages = messages
             evaluation_row.tools = shared_tool_schema
-            # Some OpenAI SDK versions type CompletionUsage as a TypedDict; construct via cast to avoid ctor mismatches
-            evaluation_row.usage = cast(
-                CompletionUsage,
-                {
-                    "prompt_tokens": trajectory.usage.get("prompt_tokens", 0),
-                    "completion_tokens": trajectory.usage.get("completion_tokens", 0),
-                    "total_tokens": trajectory.usage.get("total_tokens", 0),
-                },
+            evaluation_row.execution_metadata.usage = CompletionUsage(
+                prompt_tokens=trajectory.usage.get("prompt_tokens", 0),
+                completion_tokens=trajectory.usage.get("completion_tokens", 0),
+                total_tokens=trajectory.usage.get("total_tokens", 0),
             )
             evaluation_row.input_metadata.completion_params = {
                 "model": policy.model_id,
diff --git a/eval_protocol/models.py b/eval_protocol/models.py
index 550bc328..b28c5c7c 100644
--- a/eval_protocol/models.py
+++ b/eval_protocol/models.py
@@ -516,6 +516,16 @@ class EvalMetadata(BaseModel):
     passed: Optional[bool] = Field(None, description="Whether the evaluation passed based on the threshold")
 
 
+class CostMetrics(BaseModel):
+    """Cost metrics for LLM API calls."""
+
+    input_cost_usd: Optional[float] = Field(None, description="Cost in USD for input tokens.")
+
+    output_cost_usd: Optional[float] = Field(None, description="Cost in USD for output tokens.")
+
+    total_cost_usd: Optional[float] = Field(None, description="Total cost in USD for the API call.")
+
+
 class ExecutionMetadata(BaseModel):
     """Metadata about the execution of the evaluation."""
 
@@ -539,6 +549,12 @@ class ExecutionMetadata(BaseModel):
         description=("The ID of the run that this row belongs to."),
     )
 
+    usage: Optional[CompletionUsage] = Field(
+        default=None, description="Token usage statistics from LLM calls during execution."
+    )
+
+    cost_metrics: Optional[CostMetrics] = Field(default=None, description="Cost breakdown for LLM API calls.")
+
 
 class EvaluationRow(BaseModel):
     """
@@ -586,11 +602,6 @@ class EvaluationRow(BaseModel):
         description="Metadata about the execution of the evaluation.",
     )
 
-    # LLM usage statistics
-    usage: Optional[CompletionUsage] = Field(
-        default=None, description="Token usage statistics from LLM calls during execution."
-    )
-
     created_at: datetime = Field(default_factory=datetime.now, description="The timestamp when the row was created.")
 
     eval_metadata: Optional[EvalMetadata] = Field(
diff --git a/eval_protocol/pytest/default_agent_rollout_processor.py b/eval_protocol/pytest/default_agent_rollout_processor.py
index fac02dd9..b17ac1dc 100644
--- a/eval_protocol/pytest/default_agent_rollout_processor.py
+++ b/eval_protocol/pytest/default_agent_rollout_processor.py
@@ -13,6 +13,7 @@
 from eval_protocol.mcp.execution.policy import LiteLLMPolicy
 from eval_protocol.mcp.mcp_multi_client import MCPMultiClient
 from eval_protocol.models import EvaluationRow, Message, ChatCompletionContentPartTextParam
+from openai.types import CompletionUsage
 from eval_protocol.pytest.rollout_processor import RolloutProcessor
 from eval_protocol.pytest.types import Dataset, RolloutProcessorConfig
 from pydantic import BaseModel
@@ -38,6 +39,11 @@ def __init__(self, model: str, row: EvaluationRow, config_path: str, logger: Dat
         self._policy = LiteLLMPolicy(model_id=model)
         self.mcp_client = MCPMultiClient(config_path=config_path) if config_path else None
         self.logger: DatasetLogger = logger
+        self.usage = {
+            "prompt_tokens": 0,
+            "completion_tokens": 0,
+            "total_tokens": 0,
+        }
 
     async def setup(self):
         if self.mcp_client:
@@ -166,6 +172,11 @@ async def _call_model(self, messages: list[Message], tools: Optional[List[dict[s
             payload_tools.append({"type": tool_type, "function": {"name": name, "parameters": params_payload}})
 
         response = await self._policy._make_llm_call(messages=messages_payload, tools=payload_tools)
+
+        self.usage["prompt_tokens"] += response["usage"]["prompt_tokens"]
+        self.usage["completion_tokens"] += response["usage"]["completion_tokens"]
+        self.usage["total_tokens"] += response["usage"]["total_tokens"]
+
         # Coerce content to a string to align with our Message model type expectations
         raw_content = response["choices"][0]["message"].get("content")
         if isinstance(raw_content, list):
@@ -238,6 +249,13 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
             try:
                 await agent.setup()
                 await agent.call_agent()
+
+                agent.evaluation_row.execution_metadata.usage = CompletionUsage(
+                    prompt_tokens=agent.usage["prompt_tokens"],
+                    completion_tokens=agent.usage["completion_tokens"],
+                    total_tokens=agent.usage["total_tokens"],
+                )
+
                 return agent.evaluation_row
             finally:
                 if agent.mcp_client:
diff --git a/eval_protocol/pytest/default_langchain_rollout_processor.py b/eval_protocol/pytest/default_langchain_rollout_processor.py
index 4c807633..a6b4ced3 100644
--- a/eval_protocol/pytest/default_langchain_rollout_processor.py
+++ b/eval_protocol/pytest/default_langchain_rollout_processor.py
@@ -10,6 +10,7 @@ class BaseMessage:  # type: ignore
 
 
 from eval_protocol.models import EvaluationRow, Message
+from openai.types import CompletionUsage
 from eval_protocol.pytest.rollout_processor import RolloutProcessor
 from eval_protocol.pytest.types import RolloutProcessorConfig
 
@@ -86,6 +87,28 @@ async def _invoke_wrapper(payload):
         else:
             result_messages = getattr(result_obj, "messages", [])
 
+        # TODO: i didn't see a langgraph example so couldn't fully test this. should uncomment and test when we have example ready.
+        # total_input_tokens = 0
+        # total_output_tokens = 0
+        # total_tokens = 0
+
+        # for msg in result_messages:
+        #     if isinstance(msg, BaseMessage):
+        #         usage = getattr(msg, 'response_metadata', {})
+        #     else:
+        #         usage = msg.get("response_metadata", {})
+
+        #     if usage:
+        #         total_input_tokens += usage.get("prompt_tokens", 0)
+        #         total_output_tokens += usage.get("completion_tokens", 0)
+        #         total_tokens += usage.get("total_tokens", 0)
+
+        # row.execution_metadata.usage = CompletionUsage(
+        #     prompt_tokens=total_input_tokens,
+        #     completion_tokens=total_output_tokens,
+        #     total_tokens=total_tokens,
+        # )
+
         def _serialize_message(msg: BaseMessage) -> Message:
             # Prefer SDK-level serializer
             try:
diff --git a/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py b/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
index 26cf3915..96f2585b 100644
--- a/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
+++ b/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
@@ -6,6 +6,7 @@
 from pydantic_ai.models import Model
 from typing_extensions import override
 from eval_protocol.models import EvaluationRow, Message
+from openai.types import CompletionUsage
 from eval_protocol.pytest.rollout_processor import RolloutProcessor
 from eval_protocol.pytest.types import RolloutProcessorConfig
 from openai.types.chat import ChatCompletion, ChatCompletionMessage, ChatCompletionMessageParam
@@ -89,6 +90,14 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
                 message_history=model_messages, model=model, usage_limits=config.kwargs.get("usage_limits")
             )
             row.messages = await self.convert_pyd_message_to_ep_message(response.all_messages())
+
+            usage_info = response.usage()
+            row.execution_metadata.usage = CompletionUsage(
+                prompt_tokens=usage_info.request_tokens or 0,
+                completion_tokens=usage_info.response_tokens or 0,
+                total_tokens=usage_info.total_tokens or 0,
+            )
+
             return row
 
         async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow:
diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py
index 48a12fa3..ab26f128 100644
--- a/eval_protocol/pytest/default_single_turn_rollout_process.py
+++ b/eval_protocol/pytest/default_single_turn_rollout_process.py
@@ -9,6 +9,7 @@
 
 from eval_protocol.dataset_logger import default_logger
 from eval_protocol.models import EvaluationRow, Message
+from openai.types import CompletionUsage
 from eval_protocol.pytest.rollout_processor import RolloutProcessor
 from eval_protocol.pytest.types import RolloutProcessorConfig
 
@@ -108,6 +109,12 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
                 )
             ]
 
+            row.execution_metadata.usage = CompletionUsage(
+                prompt_tokens=response.usage.prompt_tokens,
+                completion_tokens=response.usage.completion_tokens,
+                total_tokens=response.usage.total_tokens,
+            )
+
             row.messages = messages
             default_logger.log(row)
             return row
diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py
index fc51fc49..59e48f61 100644
--- a/eval_protocol/pytest/evaluation_test.py
+++ b/eval_protocol/pytest/evaluation_test.py
@@ -49,6 +49,7 @@
 
 from eval_protocol.pytest.utils import (
     AggregationMethod,
+    add_cost_metrics,
     log_eval_status_and_rows,
     parse_ep_completion_params,
     parse_ep_max_concurrent_rollouts,
@@ -430,11 +431,11 @@ async def _collect_result(config, lst):  # pyright: ignore[reportUnknownParamete
                 processed_dataset=input_dataset,  # pyright: ignore[reportUnknownArgumentType]
                 evaluation_test_kwargs=kwargs.get("evaluation_test_kwargs") or {},
             )
-            if results is None:  # pyright: ignore[reportUnnecessaryComparison]
-                raise ValueError(
-                    f"Test function {test_func.__name__} did not return an EvaluationRow instance. You must return an EvaluationRow instance from your test function decorated with @evaluation_test."
-                )
-            if not isinstance(results, list):
+            if (
+                results is None
+                or not isinstance(results, list)
+                or not all(isinstance(r, EvaluationRow) for r in results)
+            ):
                 raise ValueError(
                     f"Test function {test_func.__name__} did not return a list of EvaluationRow instances. You must return a list of EvaluationRow instances from your test function decorated with @evaluation_test."
                 )
@@ -442,13 +443,10 @@ async def _collect_result(config, lst):  # pyright: ignore[reportUnknownParamete
                 raise ValueError(
                     f"Test function {test_func.__name__} returned an empty list. You must return a non-empty list of EvaluationRow instances from your test function decorated with @evaluation_test."
                 )
-            if not all(isinstance(r, EvaluationRow) for r in results):  # pyright: ignore[reportUnnecessaryIsInstance]
-                raise ValueError(
-                    f"Test function {test_func.__name__} returned a list containing non-EvaluationRow instances. You must return a list of EvaluationRow instances from your test function decorated with @evaluation_test."
-                )
 
             all_results[run_idx] = results
             for r in results:
+                add_cost_metrics(r)
                 if r.eval_metadata is not None:
                     if r.rollout_status.is_error():
                         r.eval_metadata.status = Status.error(
diff --git a/eval_protocol/pytest/utils.py b/eval_protocol/pytest/utils.py
index fda65331..44bfffcc 100644
--- a/eval_protocol/pytest/utils.py
+++ b/eval_protocol/pytest/utils.py
@@ -6,16 +6,18 @@
 from dataclasses import replace
 from typing import Any, Literal
 
+from litellm.cost_calculator import cost_per_token
 from tqdm import tqdm
 
 from eval_protocol.dataset_logger.dataset_logger import DatasetLogger
 from eval_protocol.models import (
+    CostMetrics,
+    CompletionParams,
     EvalMetadata,
     EvaluationRow,
     EvaluationThreshold,
     EvaluationThresholdDict,
     Status,
-    CompletionParams,
 )
 from eval_protocol.pytest.rollout_processor import RolloutProcessor
 from eval_protocol.pytest.types import (
@@ -298,3 +300,55 @@ def extract_effort_tag(params: dict) -> str | None:  # pyright: ignore[reportMis
     except Exception:
         return None
     return None
+
+
+def add_cost_metrics(row: EvaluationRow) -> None:
+    """Calculate and add cost metrics for an EvaluationRow based on its usage data."""
+    # Can't calculate cost without usage stats or model info
+    if not row.execution_metadata.usage or not row.input_metadata.completion_params:
+        row.execution_metadata.cost_metrics = CostMetrics(
+            input_cost_usd=0.0,
+            output_cost_usd=0.0,
+            total_cost_usd=0.0,
+        )
+        return
+
+    model = row.input_metadata.completion_params.get("model", "unknown")
+    provider = row.input_metadata.completion_params.get("provider")
+
+    # Pydantic AI mapping to LiteLLM format
+    # TODO: make more generic for other frameworks too.
+    provider_mapping = {
+        "fireworks": "fireworks_ai",
+        "together": "together_ai",
+        "openai": "",  # No prefix needed
+        "azure": "azure",
+        "deepseek": "deepseek",
+        "openrouter": "openrouter",
+        "grok": "grok",
+        "github": "github",
+        "heroku": "heroku",
+    }
+
+    if provider and provider in provider_mapping:
+        litellm_prefix = provider_mapping[provider]
+        model_id = f"{litellm_prefix}/{model}" if litellm_prefix else model
+    else:
+        model_id = model
+
+    usage = row.execution_metadata.usage
+
+    input_tokens = usage.prompt_tokens or 0
+    output_tokens = usage.completion_tokens or 0
+
+    input_cost, output_cost = cost_per_token(
+        model=model_id, prompt_tokens=input_tokens, completion_tokens=output_tokens
+    )
+    total_cost = input_cost + output_cost
+
+    # Set all cost metrics on the row
+    row.execution_metadata.cost_metrics = CostMetrics(
+        input_cost_usd=input_cost,
+        output_cost_usd=output_cost,
+        total_cost_usd=total_cost,
+    )

From d76d9675ed88c8aabbac70be0eec8fb9cc35de7c Mon Sep 17 00:00:00 2001
From: Derek Xu
Date: Wed, 3 Sep 2025 16:10:14 -0700
Subject: [PATCH 2/2] remove pydantic usage tracking for now

---
 eval_protocol/models.py                             |  2 +-
 .../pytest/default_pydantic_ai_rollout_processor.py | 13 +++++++------
 2 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/eval_protocol/models.py b/eval_protocol/models.py
index b28c5c7c..d3384fc2 100644
--- a/eval_protocol/models.py
+++ b/eval_protocol/models.py
@@ -545,7 +545,7 @@ class ExecutionMetadata(BaseModel):
     )
 
     run_id: Optional[str] = Field(
-        None,
+        default=None,
         description=("The ID of the run that this row belongs to."),
     )
 
diff --git a/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py b/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
index 96f2585b..4cf9387f 100644
--- a/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
+++ b/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
@@ -91,12 +91,13 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
             )
             row.messages = await self.convert_pyd_message_to_ep_message(response.all_messages())
 
-            usage_info = response.usage()
-            row.execution_metadata.usage = CompletionUsage(
-                prompt_tokens=usage_info.request_tokens or 0,
-                completion_tokens=usage_info.response_tokens or 0,
-                total_tokens=usage_info.total_tokens or 0,
-            )
+            # TODO: pydantic ai accumulates usage info across all models in multi-agent setup, so this simple tracking doesn't work for cost. to discuss with @dphuang2 when he's back.
+            # usage_info = response.usage()
+            # row.execution_metadata.usage = CompletionUsage(
+            #     prompt_tokens=usage_info.request_tokens or 0,
+            #     completion_tokens=usage_info.response_tokens or 0,
+            #     total_tokens=usage_info.total_tokens or 0,
+            # )
 
             return row
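
--
Illustrative usage sketch (assumption: `results` stands in for whatever list of
EvaluationRow objects an @evaluation_test-decorated test returns; this note is
not part of the patch). With both patches applied, per-row cost is exposed on
execution_metadata.cost_metrics, so aggregate spend could be summed roughly like:

    total_cost = 0.0
    for row in results:
        cost = row.execution_metadata.cost_metrics
        if cost is not None and cost.total_cost_usd is not None:
            total_cost += cost.total_cost_usd
    print(f"Total evaluation cost: ${total_cost:.4f}")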