Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eval_protocol/mcp/execution/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def execute_rollouts(
async def _execute_with_semaphore(idx):
async with semaphore:
evaluation_row: EvaluationRow = evaluation_rows[idx]
row_start_time = time.perf_counter()

trajectory = await self._execute_rollout(
envs, policy, idx, steps, openai_logger, recording_mode, playback_mode, start_time, evaluation_row
Expand Down Expand Up @@ -152,6 +153,8 @@ async def _execute_with_semaphore(idx):
else:
evaluation_row.rollout_status = Status.rollout_running()

evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - row_start_time

return evaluation_row

# Create all tasks
Expand Down
5 changes: 5 additions & 0 deletions eval_protocol/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,11 @@ class ExecutionMetadata(BaseModel):

cost_metrics: Optional[CostMetrics] = Field(default=None, description="Cost breakdown for LLM API calls.")

duration_seconds: Optional[float] = Field(
default=None,
description="Processing duration in seconds for this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
)


class EvaluationRow(BaseModel):
"""
Expand Down
5 changes: 5 additions & 0 deletions eval_protocol/pytest/default_agent_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import logging
import os
import time
from typing import Any, AsyncIterator, List, Optional, Union, Dict

from mcp.types import CallToolResult, TextContent
Expand Down Expand Up @@ -240,6 +241,8 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) ->

async def process_row(row: EvaluationRow) -> EvaluationRow:
"""Process a single row with agent rollout."""
start_time = time.perf_counter()

agent = Agent(
model=row.input_metadata.completion_params["model"],
row=row,
Expand All @@ -256,6 +259,8 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
total_tokens=agent.usage["total_tokens"],
)

agent.evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - start_time

return agent.evaluation_row
finally:
if agent.mcp_client:
Expand Down
6 changes: 6 additions & 0 deletions eval_protocol/pytest/default_langchain_rollout_processor.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import time
from typing import List

try:
from langchain_core.messages import BaseMessage

Check failure on line 6 in eval_protocol/pytest/default_langchain_rollout_processor.py

View workflow job for this annotation

GitHub Actions / Lint & Type Check

Type "type[langchain_core.messages.base.BaseMessage]" is not assignable to declared type "type[eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage]"   "langchain_core.messages.base.BaseMessage" is not assignable to "eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage"   Type "type[langchain_core.messages.base.BaseMessage]" is not assignable to type "type[eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage]" (reportAssignmentType)
except Exception: # pragma: no cover - optional dependency path
# Minimal fallback base type to satisfy typing when langchain is not present
class BaseMessage: # type: ignore
Expand Down Expand Up @@ -31,9 +32,11 @@
tasks: List[asyncio.Task] = []

async def _process_row(row: EvaluationRow) -> EvaluationRow:
start_time = time.perf_counter()

# Build LC messages from EP row
try:
from langchain_core.messages import HumanMessage

Check failure on line 39 in eval_protocol/pytest/default_langchain_rollout_processor.py

View workflow job for this annotation

GitHub Actions / Lint & Type Check

Type "type[langchain_core.messages.human.HumanMessage]" is not assignable to declared type "type[eval_protocol.pytest.default_langchain_rollout_processor.LangGraphRolloutProcessor.HumanMessage]"   "langchain_core.messages.human.HumanMessage" is not assignable to "eval_protocol.pytest.default_langchain_rollout_processor.LangGraphRolloutProcessor.HumanMessage"   Type "type[langchain_core.messages.human.HumanMessage]" is not assignable to type "type[eval_protocol.pytest.default_langchain_rollout_processor.LangGraphRolloutProcessor.HumanMessage]" (reportAssignmentType)
except Exception:
# Fallback minimal message if langchain_core is unavailable
class HumanMessage(BaseMessage): # type: ignore
Expand Down Expand Up @@ -114,13 +117,16 @@
try:
from eval_protocol.adapters.langchain import serialize_lc_message_to_ep as _ser

return _ser(msg)

Check failure on line 120 in eval_protocol/pytest/default_langchain_rollout_processor.py

View workflow job for this annotation

GitHub Actions / Lint & Type Check

Argument of type "BaseMessage" cannot be assigned to parameter "msg" of type "BaseMessage" in function "serialize_lc_message_to_ep"   "eval_protocol.pytest.default_langchain_rollout_processor.BaseMessage" is not assignable to "langchain_core.messages.base.BaseMessage" (reportArgumentType)
except Exception:
# Minimal fallback: best-effort string content only
content = getattr(msg, "content", "")
return Message(role=getattr(msg, "type", "assistant"), content=str(content))

row.messages = [_serialize_message(m) for m in result_messages]

row.execution_metadata.duration_seconds = time.perf_counter() - start_time

return row

for r in rows:
Expand Down
5 changes: 5 additions & 0 deletions eval_protocol/pytest/default_pydantic_ai_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
import time
import types
from pydantic_ai.models import Model
from typing_extensions import override
Expand Down Expand Up @@ -85,6 +86,8 @@ def __call__(self, rows: list[EvaluationRow], config: RolloutProcessorConfig) ->

async def process_row(row: EvaluationRow) -> EvaluationRow:
"""Process a single row with agent rollout."""
start_time = time.perf_counter()

model_messages = [self.convert_ep_message_to_pyd_message(m, row) for m in row.messages]
response = await agent_instance.run(
message_history=model_messages, model=model, usage_limits=config.kwargs.get("usage_limits")
Expand All @@ -99,6 +102,8 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
# total_tokens=usage_info.total_tokens or 0,
# )

row.execution_metadata.duration_seconds = time.perf_counter() - start_time

return row

async def _sem_wrapper(r: EvaluationRow) -> EvaluationRow:
Expand Down
5 changes: 5 additions & 0 deletions eval_protocol/pytest/default_single_turn_rollout_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def __call__(self, rows: List[EvaluationRow], config: RolloutProcessorConfig) ->

async def process_row(row: EvaluationRow) -> EvaluationRow:
"""Process a single row asynchronously."""
start_time = time.perf_counter()

if len(row.messages) == 0:
raise ValueError("Messages is empty. Please provide a non-empty dataset")

Expand Down Expand Up @@ -116,6 +118,9 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
)

row.messages = messages

row.execution_metadata.duration_seconds = time.perf_counter() - start_time

default_logger.log(row)
return row

Expand Down
1 change: 0 additions & 1 deletion eval_protocol/pytest/evaluation_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,6 @@ async def _collect_result(config, lst): # pyright: ignore[reportUnknownParamete
# else, we execute runs in parallel
if isinstance(rollout_processor, MCPGymRolloutProcessor):
# For MCPGymRolloutProcessor, create and execute tasks one at a time to avoid port conflicts
# For now, no tqdm progress bar because logs override it, we can revisit this later
for run_idx in range(num_runs):
task = asyncio.create_task(execute_run(run_idx, config))
await task
Expand Down
163 changes: 163 additions & 0 deletions tests/pytest/test_execution_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import pytest
from openai.types import CompletionUsage

from eval_protocol.models import EvaluationRow, ExecutionMetadata, InputMetadata, CostMetrics, Message
from eval_protocol.pytest.utils import add_cost_metrics


class TestExecutionMetadata:
    """Tests for execution-metadata tracking: cost metrics, usage statistics, and timing.

    Each cost test builds an ``EvaluationRow`` with specific ``completion_params``
    and ``usage``, runs it through ``add_cost_metrics``, and inspects the
    resulting ``execution_metadata.cost_metrics``.
    """

    @staticmethod
    def _assert_costs_present(row: EvaluationRow) -> None:
        """Assert that cost metrics were attached and every cost field was populated."""
        assert row.execution_metadata.cost_metrics is not None
        assert row.execution_metadata.cost_metrics.input_cost_usd is not None
        assert row.execution_metadata.cost_metrics.output_cost_usd is not None
        assert row.execution_metadata.cost_metrics.total_cost_usd is not None

    @staticmethod
    def _assert_costs_zero(row: EvaluationRow) -> None:
        """Assert that cost metrics were attached but every cost field is exactly 0.0."""
        assert row.execution_metadata.cost_metrics is not None
        assert row.execution_metadata.cost_metrics.input_cost_usd == 0.0
        assert row.execution_metadata.cost_metrics.output_cost_usd == 0.0
        assert row.execution_metadata.cost_metrics.total_cost_usd == 0.0

    def test_single_model_with_provider(self):
        """Normal case: a single model string with an explicit provider yields non-None costs."""
        row = EvaluationRow(
            messages=[],
            input_metadata=InputMetadata(
                completion_params={"model": "accounts/fireworks/models/gpt-oss-120b", "provider": "fireworks"}
            ),
            execution_metadata=ExecutionMetadata(
                usage=CompletionUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150)
            ),
        )

        add_cost_metrics(row)

        self._assert_costs_present(row)

    @pytest.mark.skip(reason="Revisit when we figure out how to get cost metrics for multi-agent Pydantic.")
    def test_pydantic_ai_multi_agent_model_dict(self):
        """Pydantic AI multi-agent case: ``model`` is a nested dict of per-role models."""
        row = EvaluationRow(
            messages=[],
            input_metadata=InputMetadata(
                completion_params={
                    "model": {
                        "joke_generation_model": {
                            "model": "accounts/fireworks/models/kimi-k2-instruct",
                            "provider": "fireworks",
                        },
                        "joke_selection_model": {
                            "model": "accounts/fireworks/models/deepseek-v3p1",
                            "provider": "fireworks",
                        },
                    }
                }
            ),
            execution_metadata=ExecutionMetadata(
                usage=CompletionUsage(prompt_tokens=200, completion_tokens=75, total_tokens=275)
            ),
        )

        add_cost_metrics(row)

        self._assert_costs_present(row)

    def test_no_usage_stats(self):
        """With no usage statistics at all, every cost field should be zero."""
        row = EvaluationRow(
            messages=[],
            input_metadata=InputMetadata(completion_params={"model": "gpt-3.5-turbo", "provider": "openai"}),
            execution_metadata=ExecutionMetadata(usage=None),
        )

        add_cost_metrics(row)

        self._assert_costs_zero(row)

    def test_no_completion_params(self):
        """With empty completion parameters (no model to price), costs should be zero."""
        row = EvaluationRow(
            messages=[],
            input_metadata=InputMetadata(completion_params={}),
            execution_metadata=ExecutionMetadata(
                usage=CompletionUsage(prompt_tokens=100, completion_tokens=50, total_tokens=150)
            ),
        )

        add_cost_metrics(row)

        self._assert_costs_zero(row)

    def test_zero_tokens(self):
        """With zero token usage, costs should compute to exactly zero."""
        row = EvaluationRow(
            messages=[],
            input_metadata=InputMetadata(completion_params={"model": "gpt-3.5-turbo", "provider": "openai"}),
            execution_metadata=ExecutionMetadata(
                usage=CompletionUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0)
            ),
        )

        add_cost_metrics(row)

        self._assert_costs_zero(row)

    def test_provider_mapping_variations(self):
        """add_cost_metrics tolerates different provider mappings without raising.

        NOTE(review): the third tuple element (the expected mapped model id) is
        currently unasserted — ``add_cost_metrics`` does not expose the resolved
        model id, so this test only checks that cost metrics get attached.
        TODO: assert the mapping once the resolved id is surfaced.
        """
        providers_and_expected = [
            ("openai", "gpt-3.5-turbo", "gpt-3.5-turbo"),  # No prefix - known model
            (
                "fireworks",
                "accounts/fireworks/models/llama-v2-7b-chat",
                "fireworks_ai/accounts/fireworks/models/llama-v2-7b-chat",
            ),
            ("unknown_provider", "gpt-3.5-turbo", "gpt-3.5-turbo"),  # Fallback to original - use known model
        ]

        for provider, model, _expected_model_id in providers_and_expected:
            row = EvaluationRow(
                messages=[],
                input_metadata=InputMetadata(completion_params={"model": model, "provider": provider}),
                execution_metadata=ExecutionMetadata(
                    usage=CompletionUsage(prompt_tokens=10, completion_tokens=5, total_tokens=15)
                ),
            )

            add_cost_metrics(row)

            # Should not raise an error and should set cost metrics
            assert row.execution_metadata.cost_metrics is not None

    def test_model_without_provider(self):
        """A model string with no explicit provider field still gets cost metrics attached."""
        row = EvaluationRow(
            messages=[],
            input_metadata=InputMetadata(
                completion_params={"model": "gpt-3.5-turbo"}  # No provider field
            ),
            execution_metadata=ExecutionMetadata(
                usage=CompletionUsage(prompt_tokens=50, completion_tokens=25, total_tokens=75)
            ),
        )

        add_cost_metrics(row)

        assert row.execution_metadata.cost_metrics is not None
        # Should still work for OpenAI models even without explicit provider

    def test_execution_metadata_timing_field(self):
        """The new ``duration_seconds`` field defaults to None and is settable."""
        metadata = ExecutionMetadata()

        # Check field exists and defaults to None
        assert hasattr(metadata, "duration_seconds")
        assert metadata.duration_seconds is None

        # Check it can be set; exact float equality is fine for a literal round-trip
        metadata.duration_seconds = 1.234
        assert metadata.duration_seconds == 1.234
Loading