Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eval_protocol/integrations/tinker_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
# Update row
new_messages = list(row.messages) + [Message(role="assistant", content=assistant_content)]
row.messages = new_messages
row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

# Log usage (approximate since Tinker might not return usage stats in same format)
# We can count tokens ourselves
Expand Down
2 changes: 1 addition & 1 deletion eval_protocol/mcp/execution/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ async def _execute_with_semaphore(idx):
else:
evaluation_row.rollout_status = Status.rollout_running()

evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - row_start_time
evaluation_row.execution_metadata.rollout_duration_seconds = time.perf_counter() - row_start_time

return evaluation_row

Expand Down
13 changes: 12 additions & 1 deletion eval_protocol/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,20 @@ class ExecutionMetadata(BaseModel):

cost_metrics: Optional[CostMetrics] = Field(default=None, description="Cost breakdown for LLM API calls.")

# deprecated: use rollout_duration_seconds and eval_duration_seconds instead
duration_seconds: Optional[float] = Field(
default=None,
description="Processing duration in seconds for this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
description="[Deprecated] Processing duration in seconds for this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, thanks!

)

rollout_duration_seconds: Optional[float] = Field(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding retries, I think it would still be valuable to track total_duration_seconds so that people can get a sense of wall clock time for this row. This can be helpful in the UI as well

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That can be follow-up PR work, though.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I think we should track the number of retries as well. For duration, we should probably still only count the last successful attempt. For failures, I think the failure reason matters more than the duration.

default=None,
description="Processing duration in seconds for the rollout of this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
)

eval_duration_seconds: Optional[float] = Field(
default=None,
description="Processing duration in seconds for the evaluation of this evaluation row. Note that if it gets retried, this will be the duration of the last attempt.",
)

experiment_duration_seconds: Optional[float] = Field(
Expand Down
2 changes: 1 addition & 1 deletion eval_protocol/pytest/default_agent_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
total_tokens=agent.usage["total_tokens"],
)

agent.evaluation_row.execution_metadata.duration_seconds = time.perf_counter() - start_time
agent.evaluation_row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

return agent.evaluation_row
finally:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
# total_tokens=usage_info.total_tokens or 0,
# )

row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

return row

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:

row.messages = messages

row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

default_logger.log(row)
return row
Expand Down
11 changes: 9 additions & 2 deletions eval_protocol/pytest/evaluation_test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@


async def run_tasks_with_eval_progress(
pointwise_tasks: list[asyncio.Task[EvaluationRow]], run_idx: int
pointwise_tasks: list[asyncio.Task[EvaluationRow]], run_idx: int, disable_tqdm: bool = False
) -> list[EvaluationRow]:
"""
Run evaluation tasks with a progress bar and proper cancellation handling.
Expand All @@ -66,6 +66,7 @@ async def run_tasks_with_eval_progress(
miniters=1,
mininterval=0.1,
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
disable=disable_tqdm,
) as eval_pbar:

async def task_with_progress(task: asyncio.Task[EvaluationRow]) -> EvaluationRow:
Expand All @@ -88,7 +89,10 @@ async def task_with_progress(task: asyncio.Task[EvaluationRow]) -> EvaluationRow


async def run_tasks_with_run_progress(
execute_run_func: Callable[[int, RolloutProcessorConfig], Any], num_runs: int, config: RolloutProcessorConfig
execute_run_func: Callable[[int, RolloutProcessorConfig], Any],
num_runs: int,
config: RolloutProcessorConfig,
disable_tqdm: bool = False,
) -> None:
"""
Run tasks with a parallel runs progress bar, preserving original logic.
Expand All @@ -108,6 +112,7 @@ async def run_tasks_with_run_progress(
dynamic_ncols=True,
miniters=1,
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
disable=disable_tqdm,
) as run_pbar:

async def execute_run_with_progress(run_idx: int, config: RolloutProcessorConfig) -> Any:
Expand Down Expand Up @@ -330,6 +335,7 @@ async def rollout_processor_with_retry(
fresh_dataset: list[EvaluationRow],
config: RolloutProcessorConfig,
run_idx: int = 0,
disable_tqdm: bool = False,
) -> AsyncGenerator[EvaluationRow, None]:
"""
Wrapper around rollout_processor that handles retry logic using the Python backoff library.
Expand Down Expand Up @@ -449,6 +455,7 @@ async def execute_row_with_backoff_and_log(
miniters=1,
mininterval=0.1,
bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]",
disable=disable_tqdm,
) as rollout_pbar:
# Yield results as they complete
for task in asyncio.as_completed(retry_tasks):
Expand Down
8 changes: 4 additions & 4 deletions eval_protocol/pytest/github_action_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,15 @@ def _list_runs():
row.rollout_status = Status.rollout_error(
f"Failed to find workflow run in GHA with rollout_id {row.execution_metadata.rollout_id}"
)
row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time
return row

run_id = run.get("id")
if not run_id:
row.rollout_status = Status.rollout_error(
f"Failed to find workflow run in GHA with rollout_id {row.execution_metadata.rollout_id}"
)
row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time
return row

# Poll the specific run until completion
Expand All @@ -194,10 +194,10 @@ def _get_run() -> Dict[str, Any]:
row.rollout_status = Status.rollout_error(
f"GitHub Actions run timed out after {self.timeout_seconds} seconds"
)
row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time
return row

row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

def _update_with_trace() -> None:
return update_row_with_remote_trace(row, self._output_data_loader, self.model_base_url)
Expand Down
6 changes: 3 additions & 3 deletions eval_protocol/pytest/openenv_rollout_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
completion_tokens=usage["completion_tokens"],
total_tokens=usage["total_tokens"],
)
row.execution_metadata.duration_seconds = time.perf_counter() - start_time
row.execution_metadata.rollout_duration_seconds = time.perf_counter() - start_time

# Attach per-step rewards and accumulated token IDs to
# execution_metadata.extra for downstream integrations
Expand All @@ -436,14 +436,14 @@ async def process_row(row: EvaluationRow) -> EvaluationRow:
logger.info("[OpenEnvRolloutProcessor] Total reward: %.3f", total_reward)
logger.info(
"[OpenEnvRolloutProcessor] Duration: %.2fs",
row.execution_metadata.duration_seconds,
row.execution_metadata.rollout_duration_seconds,
)
logger.debug("[OpenEnvRolloutProcessor] Messages collected: %d", len(messages))

logger.info(
f"Rollout complete: {len(step_rewards)} steps, "
f"total_reward={total_reward:.2f}, "
f"duration={row.execution_metadata.duration_seconds:.2f}s"
f"duration={row.execution_metadata.rollout_duration_seconds:.2f}s"
)
# Final log with complete message history
if getattr(config, "logger", None):
Expand Down
Loading
Loading