@@ -120,46 +120,39 @@ async def _process_row(row: EvaluationRow) -> EvaluationRow:
120120 deadline = time .time () + timeout_seconds
121121
122122 while time .time () < deadline :
123- # Search Fireworks tracing logs for completion (run in thread to avoid blocking event loop)
124- completed_logs = await asyncio .to_thread (
125- self ._tracing_adapter .search_logs , tags = [ f" rollout_id: { row .execution_metadata .rollout_id } " ]
123+ # Poll status (run in thread to avoid blocking event loop)
124+ status_result = await asyncio .to_thread (
125+ self ._tracing_adapter .get_status , rollout_id = row .execution_metadata .rollout_id
126126 )
127- # Filter for logs that actually have status information
128- status_logs = []
129- for log in completed_logs :
130- status_dict = log .get ("status" )
131- if status_dict and isinstance (status_dict , dict ) and "code" in status_dict :
132- status_logs .append (log )
133-
134- if status_logs :
135- if len (status_logs ) > 1 :
136- logger .warning (
137- "Found %s status logs for rollout %s; expected at most 1. Using the first one: %s" ,
138- len (status_logs ),
139- row .execution_metadata .rollout_id ,
140- status_logs [0 ],
141- )
142- # Use the first log with status information
143- status_log = status_logs [0 ]
144- status_dict = status_log .get ("status" )
145- raw_extras = status_log .get ("extras" ) or {}
146- status_extras = {
147- k : v for k , v in raw_extras .items () if k not in ("logger_name" , "level" , "timestamp" )
148- }
127+ if status_result and status_result .get ("status" ):
128+ status_code = status_result ["status" ]["code" ]
149129
150130 logger .info (
151- f"Found status log for rollout { row .execution_metadata .rollout_id } : { status_log .get ('message' , '' )} "
131+ "Found status for rollout %s with code %s" ,
132+ row .execution_metadata .rollout_id ,
133+ status_code ,
152134 )
153135
154- status_code = status_dict .get ("code" )
155- status_message = status_dict .get ("message" , "" )
156- status_details = status_dict .get ("details" , [])
157-
158- logger .info (
159- f"Found Fireworks log for rollout { row .execution_metadata .rollout_id } with status code { status_code } "
136+ # Backfill message/details/extras from the full Logs table (one-shot)
137+ completed_logs = await asyncio .to_thread (
138+ self ._tracing_adapter .search_logs ,
139+ tags = [f"rollout_id:{ row .execution_metadata .rollout_id } " ],
160140 )
141+ status_message = ""
142+ status_details : list = []
143+ status_extras : dict = {}
144+ for log in completed_logs :
145+ sd = log .get ("status" )
146+ if sd and isinstance (sd , dict ) and "code" in sd :
147+ status_message = sd .get ("message" , "" )
148+ status_details = sd .get ("details" , [])
149+ raw_extras = log .get ("extras" ) or {}
150+ status_extras = {
151+ k : v for k , v in raw_extras .items ()
152+ if k not in ("logger_name" , "level" , "timestamp" )
153+ }
154+ break
161155
162- # Create and raise exception if appropriate, preserving original message
163156 exception = exception_for_status_code (status_code , status_message )
164157 if exception is not None :
165158 raise exception
0 commit comments