@@ -20,6 +20,7 @@
 from eval_protocol.utils.vite_server import ViteServer
 from eval_protocol.logging.elasticsearch_client import ElasticsearchClient
 from eval_protocol.types.remote_rollout_processor import ElasticsearchConfig
+from eval_protocol.utils.logs_models import LogEntry, LogsResponse
 
 if TYPE_CHECKING:
     from eval_protocol.models import EvaluationRow
@@ -339,12 +340,12 @@ async def status():
339340 "elasticsearch_enabled" : self .elasticsearch_client is not None ,
340341 }
341342
342- @self .app .get ("/api/logs/{rollout_id}" )
343+ @self .app .get ("/api/logs/{rollout_id}" , response_model = LogsResponse , response_model_exclude_none = True )
343344 async def get_logs (
344345 rollout_id : str ,
345346 level : Optional [str ] = Query (None , description = "Filter by log level (DEBUG, INFO, WARNING, ERROR)" ),
346347 limit : int = Query (100 , description = "Maximum number of log entries to return" ),
347- ):
348+ ) -> LogsResponse :
348349 """Get logs for a specific rollout ID from Elasticsearch."""
349350 if not self .elasticsearch_client :
350351 raise HTTPException (status_code = 503 , detail = "Elasticsearch is not configured for this logs server" )
@@ -354,20 +355,35 @@ async def get_logs(
                 search_results = self.elasticsearch_client.search_by_match("rollout_id", rollout_id, size=limit)
 
                 if not search_results or "hits" not in search_results:
-                    return {"logs": [], "total": 0}
-
-                logs = []
+                    # Return empty response using Pydantic model
+                    return LogsResponse(
+                        logs=[],
+                        total=0,
+                        rollout_id=rollout_id,
+                        filtered_by_level=level,
+                    )
+
+                log_entries = []
                 for hit in search_results["hits"]["hits"]:
-                    log_entry = hit["_source"]
+                    log_data = hit["_source"]
 
                     # Filter by level if specified
-                    if level and log_entry.get("level") != level:
+                    if level and log_data.get("level") != level:
                         continue
 
-                    logs.append(log_entry)
+                    # Create LogEntry using Pydantic model for validation
+                    try:
+                        log_entry = LogEntry(
+                            **log_data  # Use ** to unpack the dict, Pydantic will handle field mapping
+                        )
+                        log_entries.append(log_entry)
+                    except Exception as e:
+                        # Log the error but continue processing other entries
+                        logger.warning(f"Failed to parse log entry: {e}, data: {log_data}")
+                        continue
 
                 # Sort by timestamp (most recent first)
-                logs.sort(key=lambda x: x.get("@timestamp", ""), reverse=True)
+                log_entries.sort(key=lambda x: x.timestamp, reverse=True)
 
                 # Get total count
                 total_hits = search_results["hits"]["total"]
@@ -378,12 +394,13 @@ async def get_logs(
                     # Elasticsearch 6 format
                     total_count = total_hits
 
-                return {
-                    "logs": logs,
-                    "total": total_count,
-                    "rollout_id": rollout_id,
-                    "filtered_by_level": level,
-                }
+                # Return response using Pydantic model
+                return LogsResponse(
+                    logs=log_entries,
+                    total=total_count,
+                    rollout_id=rollout_id,
+                    filtered_by_level=level,
+                )
 
             except Exception as e:
                 logger.error(f"Error retrieving logs for rollout {rollout_id}: {e}")
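
The `LogEntry` and `LogsResponse` models are imported from the new `eval_protocol.utils.logs_models` module, which is not part of this diff. Below is a minimal sketch of what they might look like, inferred from how the endpoint uses them above; the individual `LogEntry` fields, the `@timestamp` alias, and the pydantic v2 API are assumptions, not the module's confirmed contents:

```python
# Hypothetical sketch of eval_protocol/utils/logs_models.py (pydantic v2 assumed).
# Only logs/total/rollout_id/filtered_by_level are confirmed by the diff; the
# LogEntry fields and the "@timestamp" alias are inferred, not confirmed.
from typing import List, Optional

from pydantic import BaseModel, ConfigDict, Field


class LogEntry(BaseModel):
    """One log document, built from an Elasticsearch hit's _source dict."""

    # Tolerate extra _source keys instead of failing validation, and allow
    # population by either the alias or the field name.
    model_config = ConfigDict(populate_by_name=True, extra="ignore")

    # Elasticsearch conventionally stores the event time under "@timestamp";
    # the alias lets LogEntry(**log_data) map it to a plain attribute, which
    # is what the sort key `lambda x: x.timestamp` relies on.
    timestamp: str = Field(alias="@timestamp")
    level: Optional[str] = None
    message: Optional[str] = None
    rollout_id: Optional[str] = None


class LogsResponse(BaseModel):
    """Envelope returned by GET /api/logs/{rollout_id}."""

    logs: List[LogEntry]
    total: int
    rollout_id: str
    # None when no ?level= filter was given; response_model_exclude_none=True
    # then drops the key from the serialized JSON.
    filtered_by_level: Optional[str] = None
```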
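
Assuming models along those lines, a request such as `GET /api/logs/<rollout_id>?level=ERROR&limit=50` would return a body of the shape `{"logs": [...], "total": ..., "rollout_id": "...", "filtered_by_level": "ERROR"}`, and `response_model_exclude_none=True` omits `filtered_by_level` entirely when no `level` query parameter is supplied. A side effect of declaring `response_model=LogsResponse` is that FastAPI also validates the handler's return value, so a malformed response surfaces as a server-side error instead of leaking through.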