From ba8ed4366881f0cb574a8e5505ce4c12bb2853a0 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 3 Sep 2025 16:59:06 -0700 Subject: [PATCH 1/2] Add timing metrics --- eval_protocol/mcp/execution/manager.py | 3 + eval_protocol/models.py | 5 + .../pytest/default_agent_rollout_processor.py | 5 + .../default_langchain_rollout_processor.py | 6 + .../default_pydantic_ai_rollout_processor.py | 5 + .../default_single_turn_rollout_process.py | 5 + tests/pytest/test_execution_metadata.py | 163 ++++++++++++++++++ 7 files changed, 192 insertions(+) create mode 100644 tests/pytest/test_execution_metadata.py diff --git a/eval_protocol/mcp/execution/manager.py b/eval_protocol/mcp/execution/manager.py index 1a4ee392..85784295 100644 --- a/eval_protocol/mcp/execution/manager.py +++ b/eval_protocol/mcp/execution/manager.py @@ -100,6 +100,7 @@ def execute_rollouts( async def _execute_with_semaphore(idx): async with semaphore: evaluation_row: EvaluationRow = evaluation_rows[idx] + row_start_time = time.perf_counter() trajectory = await self._execute_rollout( envs, policy, idx, steps, openai_logger, recording_mode, playback_mode, start_time, evaluation_row @@ -152,6 +153,8 @@ async def _execute_with_semaphore(idx): else: evaluation_row.rollout_status = Status.rollout_running() + evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - row_start_time + return evaluation_row # Create all tasks diff --git a/eval_protocol/models.py b/eval_protocol/models.py index d3384fc2..b1a5c423 100644 --- a/eval_protocol/models.py +++ b/eval_protocol/models.py @@ -555,6 +555,11 @@ class ExecutionMetadata(BaseModel): cost_metrics: Optional[CostMetrics] = Field(default=None, description="Cost breakdown for LLM API calls.") + duration_seconds: Optional[float] = Field( + default=None, + description="Processing duration in seconds for this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.", + ) + class EvaluationRow(BaseModel): """ diff --git a/eval_protocol/pytest/default_agent_rollout_processor.py b/eval_protocol/pytest/default_agent_rollout_processor.py index b17ac1dc..997d229a 100644 --- a/eval_protocol/pytest/default_agent_rollout_processor.py +++ b/eval_protocol/pytest/default_agent_rollout_processor.py @@ -2,6 +2,7 @@ import json import logging import os +import time from typing import Any, AsyncIterator, List, Optional, Union, Dict from mcp.types import CallToolResult, TextContent @@ -240,6 +241,8 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> async def process_row(row: EvaluationRow) -> EvaluationRow: """Process a single row with agent rollout.""" + start_time = time.perf_counter() + agent = Agent( model=row.input_metadata.completion_params["model"], row=row, @@ -256,6 +259,8 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: total_tokens=agent.usage["total_tokens"], ) + agent.evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - start_time + return agent.evaluation_row finally: if agent.mcp_client: diff --git a/eval_protocol/pytest/default_langchain_rollout_processor.py b/eval_protocol/pytest/default_langchain_rollout_processor.py index a6b4ced3..3169987f 100644 --- a/eval_protocol/pytest/default_langchain_rollout_processor.py +++ b/eval_protocol/pytest/default_langchain_rollout_processor.py @@ -1,4 +1,5 @@ import asyncio +import time from typing import List try: @@ -31,6 +32,8 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig): tasks: List[asyncio.Task] = [] async def _process_row(row: EvaluationRow) -> EvaluationRow: + start_time = time.perf_counter() + # Build LC messages from EP row try: from langchain_core.messages import HumanMessage @@ -121,6 +124,9 @@ def _serialize_message(msg: BaseMessage) -> Message: return Message(role=getattr(msg, "type", "assistant"), content=str(content)) row.messages = [_serialize_message(m) for m in result_messages] + + row.execution_metadata.duration_seconds = time.perf_counter() - start_time + return row for r in rows: diff --git a/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py b/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py index 4cf9387f..5bd6dfab 100644 --- a/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py +++ b/eval_protocol/pytest/default_pydantic_ai_rollout_processor.py @@ -2,6 +2,7 @@ import asyncio import logging +import time import types from pydantic_ai.models import Model from typing_extensions import override @@ -85,6 +86,8 @@ def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) -> async def process_row(row: EvaluationRow) -> EvaluationRow: """Process a single row with agent rollout.""" + start_time = time.perf_counter() + model_messages = [self.convert_ep_message_to_pyd_message(m, row) for m in row.messages] response = await agent_instance.run( message_history=model_messages, model=model, usage_limits=config.kwargs.get("usage_limits") @@ -99,6 +102,8 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: # total_tokens=usage_info.total_tokens or 0, # ) + row.execution_metadata.duration_seconds = time.perf_counter() - start_time + return row async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow: diff --git a/eval_protocol/pytest/default_single_turn_rollout_process.py b/eval_protocol/pytest/default_single_turn_rollout_process.py index ab26f128..96df88b2 100644 --- a/eval_protocol/pytest/default_single_turn_rollout_process.py +++ b/eval_protocol/pytest/default_single_turn_rollout_process.py @@ -25,6 +25,8 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) -> async def process_row(row: EvaluationRow) -> EvaluationRow: """Process a single row asynchronously.""" + start_time = time.perf_counter() + if len(row.messages) == 0: raise ValueError("Messages is empty. Please provide a non-empty dataset") @@ -116,6 +118,9 @@ async def process_row(row: EvaluationRow) -> EvaluationRow: ) row.messages = messages + + row.execution_metadata.duration_seconds = time.perf_counter() - start_time + default_logger.log(row) return row diff --git a/tests/pytest/test_execution_metadata.py b/tests/pytest/test_execution_metadata.py new file mode 100644 index 00000000..ef2c4def --- /dev/null +++ b/tests/pytest/test_execution_metadata.py @@ -0,0 +1,163 @@ +import pytest +from openai.types import CompletionUsage + +from eval_protocol.models import EvaluationRow, ExecutionMetadata, InputMetadata, CostMetrics, Message +from eval_protocol.pytest.utils import add_cost_metrics + + +class TestExecutionMetadata: + """Test execution metadata tracking including cost metrics, usage statistics, and timing.""" + + def test_single_model_with_provider(self): + """Test normal case: single model string with provider.""" + row = EvaluationRow( + messages=[], + input_metadata=InputMetadata( + completion_params={"model": "accounts/fireworks/models/gpt-oss-120b", "provider": "fireworks"} + ), + execution_metadata=ExecutionMetadata( + usage=CompletionUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150) + ), + ) + + add_cost_metrics(row) + + assert row.execution_metadata.cost_metrics is not None + assert row.execution_metadata.cost_metrics.input_cost_usd is not None + assert row.execution_metadata.cost_metrics.output_cost_usd is not None + assert row.execution_metadata.cost_metrics.total_cost_usd is not None + + @pytest.mark.skip(reason="Revisit when we figure out how to get cost metrics for multi-agent Pydantic.") + def test_pydantic_ai_multi_agent_model_dict(self): + """Test Pydantic AI multi-agent case: nested dictionary with multiple models.""" + row = EvaluationRow( + messages=[], + input_metadata=InputMetadata( + completion_params={ + "model": { + "joke_generation_model": { + "model": "accounts/fireworks/models/kimi-k2-instruct", + "provider": "fireworks", + }, + "joke_selection_model": { + "model": "accounts/fireworks/models/deepseek-v3p1", + "provider": "fireworks", + }, + } + } + ), + execution_metadata=ExecutionMetadata( + usage=CompletionUsage(prompt_tokens=200, completion_tokens=75, total_tokens=275) + ), + ) + + add_cost_metrics(row) + + assert row.execution_metadata.cost_metrics is not None + assert row.execution_metadata.cost_metrics.input_cost_usd is not None + assert row.execution_metadata.cost_metrics.output_cost_usd is not None + assert row.execution_metadata.cost_metrics.total_cost_usd is not None + + def test_no_usage_stats(self): + """Test case with no usage statistics.""" + row = EvaluationRow( + messages=[], + input_metadata=InputMetadata(completion_params={"model": "gpt-3.5-turbo", "provider": "openai"}), + execution_metadata=ExecutionMetadata(usage=None), + ) + + add_cost_metrics(row) + + assert row.execution_metadata.cost_metrics is not None + assert row.execution_metadata.cost_metrics.input_cost_usd == 0.0 + assert row.execution_metadata.cost_metrics.output_cost_usd == 0.0 + assert row.execution_metadata.cost_metrics.total_cost_usd == 0.0 + + def test_no_completion_params(self): + """Test case with empty completion parameters.""" + row = EvaluationRow( + messages=[], + input_metadata=InputMetadata(completion_params={}), + execution_metadata=ExecutionMetadata( + usage=CompletionUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150) + ), + ) + + add_cost_metrics(row) + + assert row.execution_metadata.cost_metrics is not None + assert row.execution_metadata.cost_metrics.input_cost_usd == 0.0 + assert row.execution_metadata.cost_metrics.output_cost_usd == 0.0 + assert row.execution_metadata.cost_metrics.total_cost_usd == 0.0 + + def test_zero_tokens(self): + """Test case with zero token usage.""" + row = EvaluationRow( + messages=[], + input_metadata=InputMetadata(completion_params={"model": "gpt-3.5-turbo", "provider": "openai"}), + execution_metadata=ExecutionMetadata( + usage=CompletionUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0) + ), + ) + + add_cost_metrics(row) + + assert row.execution_metadata.cost_metrics is not None + assert row.execution_metadata.cost_metrics.input_cost_usd == 0.0 + assert row.execution_metadata.cost_metrics.output_cost_usd == 0.0 + assert row.execution_metadata.cost_metrics.total_cost_usd == 0.0 + + def test_provider_mapping_variations(self): + """Test different provider mappings.""" + providers_and_expected = [ + ("openai", "gpt-3.5-turbo", "gpt-3.5-turbo"), # No prefix - known model + ( + "fireworks", + "accounts/fireworks/models/llama-v2-7b-chat", + "fireworks_ai/accounts/fireworks/models/llama-v2-7b-chat", + ), + ("unknown_provider", "gpt-3.5-turbo", "gpt-3.5-turbo"), # Fallback to original - use known model + ] + + for provider, model, expected_model_id in providers_and_expected: + row = EvaluationRow( + messages=[], + input_metadata=InputMetadata(completion_params={"model": model, "provider": provider}), + execution_metadata=ExecutionMetadata( + usage=CompletionUsage(prompt_tokens=10, completion_tokens=5, total_tokens=15) + ), + ) + + add_cost_metrics(row) + + # Should not raise an error and should set cost metrics + assert row.execution_metadata.cost_metrics is not None + + def test_model_without_provider(self): + """Test model string without provider field.""" + row = EvaluationRow( + messages=[], + input_metadata=InputMetadata( + completion_params={"model": "gpt-3.5-turbo"} # No provider field + ), + execution_metadata=ExecutionMetadata( + usage=CompletionUsage(prompt_tokens=50, completion_tokens=25, total_tokens=75) + ), + ) + + add_cost_metrics(row) + + assert row.execution_metadata.cost_metrics is not None + # Should still work for OpenAI models even without explicit provider + + def test_execution_metadata_timing_field(self): + """Test that the new duration_seconds field works correctly.""" + metadata = ExecutionMetadata() + + # Check field exists and defaults to None + assert hasattr(metadata, "duration_seconds") + assert metadata.duration_seconds is None + + # Check it can be set + metadata.duration_seconds = 1.234 + assert metadata.duration_seconds == 1.234 From 4578bcf7085f29464440a2a8a02813261a5e8e16 Mon Sep 17 00:00:00 2001 From: Derek Xu Date: Wed, 3 Sep 2025 17:02:19 -0700 Subject: [PATCH 2/2] comment update --- eval_protocol/pytest/evaluation_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/eval_protocol/pytest/evaluation_test.py b/eval_protocol/pytest/evaluation_test.py index 59e48f61..bdf86275 100644 --- a/eval_protocol/pytest/evaluation_test.py +++ b/eval_protocol/pytest/evaluation_test.py @@ -478,7 +478,6 @@ async def _collect_result(config, lst): # pyright: ignore[reportUnknownParamete # else, we execute runs in parallel if isinstance(rollout_processor, MCPGymRolloutProcessor): # For MCPGymRolloutProcessor, create and execute tasks one at a time to avoid port conflicts - # For now, no tqdm progress bar because logs override it, we can revisit this later for run_idx in range(num_runs): task = asyncio.create_task(execute_run(run_idx, config)) await task