Skip to content

Commit 251ed86

Browse files
fix: backfill rollout status fields from logs when polling completes (#451)
* fix: backfill rollout status fields from logs when polling completes The lightweight `/status` endpoint on the tracing gateway only returns the status code; `Message`, `Details`, and `Extras` still live on the Logs table. After PR #446 stopped reading from `/logs` on terminal status, the SDK was constructing `Status(code=..., message="", details=[])` for every completed rollout and `EvalProtocolError(message="")` for failures, which broke `tests/remote_server/test_remote_fireworks_propagate_status.py` (`assert row.rollout_status.message == "test error"`). Restore the two-phase polling shape from the original PR: poll `/status` for the code, and on a terminal (non-RUNNING) code do one `async_search_logs` call to backfill `message`/`details`/`extras` from the matching log row. This is still ~1000x cheaper on the Logs table than the pre-#446 polling loop because the search runs once per rollout completion instead of every poll interval. Made-with: Cursor Co-authored-by: Cursor <cursoragent@cursor.com> * fix: match backfilled log status to terminal status code Bugbot pointed out that the backfill loop could pick an earlier RUNNING/partial status log instead of the terminal one when a rollout emits multiple status-bearing logs. The reported `code` was always correct (it came from /status), but `message`/`details`/`extras` could be attached from the wrong row and the raised exception would carry misleading text. Match the log row's status code to the terminal code returned by /status so the backfill is deterministic. Made-with: Cursor --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 86a52a4 commit 251ed86

1 file changed

Lines changed: 23 additions & 4 deletions

File tree

eval_protocol/pytest/remote_rollout_processor.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,28 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
139139
status_code,
140140
)
141141

142-
status_message = status.get("message", "") or ""
143-
status_details = status.get("details", []) or []
142+
# /status only returns the code; backfill message/details/extras from Logs once.
143+
status_message: str = ""
144+
status_details: list = []
145+
status_extras: dict = {}
146+
completed_logs = await self._tracing_adapter.async_search_logs(
147+
session, tags=[f"rollout_id:{row.execution_metadata.rollout_id}"]
148+
)
149+
# Pick the log row whose status code matches the terminal
150+
# code from /status, so intermediate RUNNING checkpoints
151+
# don't poison the backfill.
152+
for log in completed_logs:
153+
sd = log.get("status")
154+
if isinstance(sd, dict) and sd.get("code") == status_code:
155+
status_message = sd.get("message", "") or ""
156+
status_details = sd.get("details", []) or []
157+
raw_extras = log.get("extras") or {}
158+
status_extras = {
159+
k: v
160+
for k, v in raw_extras.items()
161+
if k not in ("logger_name", "level", "timestamp")
162+
}
163+
break
144164

145165
exception = exception_for_status_code(status_code, status_message)
146166
if exception is not None:
@@ -152,8 +172,7 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
152172
details=status_details,
153173
)
154174

155-
status_extras = (status_result or {}).get("extras")
156-
if isinstance(status_extras, dict):
175+
if status_extras:
157176
if row.execution_metadata.extra:
158177
row.execution_metadata.extra.update(status_extras)
159178
else:

0 commit comments

Comments
 (0)