Skip to content

Commit 30f662f

Browse files
fix: backfill rollout status fields from logs when polling completes
The lightweight `/status` endpoint on the tracing gateway only returns the status code; `Message`, `Details`, and `Extras` still live on the Logs table. After PR #446 stopped reading from `/logs` on terminal status, the SDK was constructing `Status(code=..., message="", details=[])` for every completed rollout and `EvalProtocolError(message="")` for failures, which broke `tests/remote_server/test_remote_fireworks_propagate_status.py` (`assert row.rollout_status.message == "test error"`). Restore the two-phase polling shape from the original PR: poll `/status` for the code, and on a terminal (non-RUNNING) code do one `async_search_logs` call to backfill `message`/`details`/`extras` from the matching log row. This is still ~1000x cheaper on the Logs table than the pre-#446 polling loop because the search runs once per rollout completion instead of every poll interval. Made-with: Cursor Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 86a52a4 commit 30f662f

1 file changed

Lines changed: 20 additions & 4 deletions

File tree

eval_protocol/pytest/remote_rollout_processor.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,25 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
139139
status_code,
140140
)
141141

142-
status_message = status.get("message", "") or ""
143-
status_details = status.get("details", []) or []
142+
# /status only returns the code; backfill message/details/extras from Logs once.
143+
status_message: str = ""
144+
status_details: list = []
145+
status_extras: dict = {}
146+
completed_logs = await self._tracing_adapter.async_search_logs(
147+
session, tags=[f"rollout_id:{row.execution_metadata.rollout_id}"]
148+
)
149+
for log in completed_logs:
150+
sd = log.get("status")
151+
if sd and isinstance(sd, dict) and "code" in sd:
152+
status_message = sd.get("message", "") or ""
153+
status_details = sd.get("details", []) or []
154+
raw_extras = log.get("extras") or {}
155+
status_extras = {
156+
k: v
157+
for k, v in raw_extras.items()
158+
if k not in ("logger_name", "level", "timestamp")
159+
}
160+
break
144161

145162
exception = exception_for_status_code(status_code, status_message)
146163
if exception is not None:
@@ -152,8 +169,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
152169
details=status_details,
153170
)
154171

155-
status_extras = (status_result or {}).get("extras")
156-
if isinstance(status_extras, dict):
172+
if status_extras:
157173
if row.execution_metadata.extra:
158174
row.execution_metadata.extra.update(status_extras)
159175
else:

0 commit comments

Comments
 (0)