66import time
77from contextlib import asynccontextmanager
88from queue import Queue
9- from typing import TYPE_CHECKING , Any , List , Optional
9+ from typing import TYPE_CHECKING , Any , Dict , List , Optional
1010
1111import psutil
1212import uvicorn
13- from fastapi import FastAPI , WebSocket , WebSocketDisconnect
13+ from fastapi import FastAPI , WebSocket , WebSocketDisconnect , HTTPException , Query
14+ from fastapi .middleware .cors import CORSMiddleware
1415
1516from eval_protocol .dataset_logger import default_logger
1617from eval_protocol .dataset_logger .dataset_logger import LOG_EVENT_TYPE
1718from eval_protocol .event_bus import event_bus
1819from eval_protocol .models import Status
1920from eval_protocol .utils .vite_server import ViteServer
21+ from eval_protocol .logging .elasticsearch_client import ElasticsearchClient
22+ from eval_protocol .types .remote_rollout_processor import ElasticsearchConfig
2023
2124if TYPE_CHECKING :
2225 from eval_protocol .models import EvaluationRow
@@ -71,8 +74,11 @@ async def _start_broadcast_loop(self):
7174 while True :
7275 try :
7376 # Wait for a message to be queued
74- message = await asyncio .get_event_loop ().run_in_executor (None , self ._broadcast_queue .get )
75- await self ._send_text_to_all_connections (message )
77+ message_data = await asyncio .get_event_loop ().run_in_executor (None , self ._broadcast_queue .get )
78+
79+ # Regular string message for all connections
80+ await self ._send_text_to_all_connections (str (message_data ))
81+
7682 except Exception as e :
7783 logger .error (f"Error in broadcast loop: { e } " )
7884 await asyncio .sleep (0.1 )
@@ -238,8 +244,8 @@ class LogsServer(ViteServer):
238244 Enhanced server for serving Vite-built SPA with file watching and WebSocket support.
239245
240246 This server extends ViteServer to add:
241- - WebSocket connections for real-time updates
242- - Live log streaming
247+ - WebSocket connections for real-time evaluation row updates
248+ - REST API for log querying
243249 """
244250
245251 def __init__ (
@@ -250,17 +256,40 @@ def __init__(
250256 host : str = "localhost" ,
251257 port : Optional [int ] = 8000 ,
252258 index_file : str = "index.html" ,
259+ elasticsearch_config : Optional [ElasticsearchConfig ] = None ,
253260 ):
254261 # Initialize WebSocket manager
255262 self .websocket_manager = WebSocketManager ()
256263
264+ # Initialize Elasticsearch client if config is provided
265+ self .elasticsearch_client : Optional [ElasticsearchClient ] = None
266+ if elasticsearch_config :
267+ self .elasticsearch_client = ElasticsearchClient (elasticsearch_config )
268+
257269 super ().__init__ (build_dir , host , port if port is not None else 8000 , index_file )
258270
271+ # Add CORS middleware to allow frontend access
272+ allowed_origins = [
273+ "http://localhost:5173" , # Vite dev server
274+ "http://127.0.0.1:5173" , # Vite dev server (alternative)
275+ f"http://{ host } :{ port } " , # Server's own origin
276+ f"http://localhost:{ port } " , # Server on localhost
277+ ]
278+
279+ self .app .add_middleware (
280+ CORSMiddleware ,
281+ allow_origins = allowed_origins ,
282+ allow_credentials = True ,
283+ allow_methods = ["*" ],
284+ allow_headers = ["*" ],
285+ )
286+
259287 # Initialize evaluation watcher
260288 self .evaluation_watcher = EvaluationWatcher (self .websocket_manager )
261289
262- # Add WebSocket endpoint
290+ # Add WebSocket endpoint and API routes
263291 self ._setup_websocket_routes ()
292+ self ._setup_api_routes ()
264293
265294 # Subscribe to events and start listening for cross-process events
266295 event_bus .subscribe (self ._handle_event )
@@ -275,14 +304,17 @@ async def websocket_endpoint(websocket: WebSocket):
275304 await self .websocket_manager .connect (websocket )
276305 try :
277306 while True :
278- # Keep connection alive
307+ # Keep connection alive (for evaluation row updates)
279308 await websocket .receive_text ()
280309 except WebSocketDisconnect :
281310 self .websocket_manager .disconnect (websocket )
282311 except Exception as e :
283312 logger .error (f"WebSocket error: { e } " )
284313 self .websocket_manager .disconnect (websocket )
285314
315+ def _setup_api_routes (self ):
316+ """Set up API routes."""
317+
286318 @self .app .get ("/api/status" )
287319 async def status ():
288320 """Get server status including active connections."""
@@ -295,8 +327,59 @@ async def status():
295327 # LogsServer inherits from ViteServer which doesn't expose watch_paths
296328 # Expose an empty list to satisfy consumers and type checker
297329 "watch_paths" : [],
330+ "elasticsearch_enabled" : self .elasticsearch_client is not None ,
298331 }
299332
333+ @self .app .get ("/api/logs/{rollout_id}" )
334+ async def get_logs (
335+ rollout_id : str ,
336+ level : Optional [str ] = Query (None , description = "Filter by log level (DEBUG, INFO, WARNING, ERROR)" ),
337+ limit : int = Query (100 , description = "Maximum number of log entries to return" ),
338+ ):
339+ """Get logs for a specific rollout ID from Elasticsearch."""
340+ if not self .elasticsearch_client :
341+ raise HTTPException (status_code = 503 , detail = "Elasticsearch is not configured for this logs server" )
342+
343+ try :
344+ # Search for logs by rollout_id
345+ search_results = self .elasticsearch_client .search_by_match ("rollout_id" , rollout_id , size = limit )
346+
347+ if not search_results or "hits" not in search_results :
348+ return {"logs" : [], "total" : 0 }
349+
350+ logs = []
351+ for hit in search_results ["hits" ]["hits" ]:
352+ log_entry = hit ["_source" ]
353+
354+ # Filter by level if specified
355+ if level and log_entry .get ("level" ) != level :
356+ continue
357+
358+ logs .append (log_entry )
359+
360+ # Sort by timestamp (most recent first)
361+ logs .sort (key = lambda x : x .get ("@timestamp" , "" ), reverse = True )
362+
363+ # Get total count
364+ total_hits = search_results ["hits" ]["total" ]
365+ if isinstance (total_hits , dict ):
366+ # Elasticsearch 7+ format
367+ total_count = total_hits ["value" ]
368+ else :
369+ # Elasticsearch 6 format
370+ total_count = total_hits
371+
372+ return {
373+ "logs" : logs ,
374+ "total" : total_count ,
375+ "rollout_id" : rollout_id ,
376+ "filtered_by_level" : level ,
377+ }
378+
379+ except Exception as e :
380+ logger .error (f"Error retrieving logs for rollout { rollout_id } : { e } " )
381+ raise HTTPException (status_code = 500 , detail = f"Failed to retrieve logs: { str (e )} " )
382+
300383 def _handle_event (self , event_type : str , data : Any ) -> None :
301384 """Handle events from the event bus."""
302385 if event_type in [LOG_EVENT_TYPE ]:
@@ -353,7 +436,12 @@ def run(self):
353436 asyncio .run (self .run_async ())
354437
355438
356- def create_app (host : str = "localhost" , port : int = 8000 , build_dir : Optional [str ] = None ) -> FastAPI :
439+ def create_app (
440+ host : str = "localhost" ,
441+ port : int = 8000 ,
442+ build_dir : Optional [str ] = None ,
443+ elasticsearch_config : Optional [ElasticsearchConfig ] = None ,
444+ ) -> FastAPI :
357445 """
358446 Factory function to create a FastAPI app instance and start the server with async loops.
359447
@@ -364,6 +452,7 @@ def create_app(host: str = "localhost", port: int = 8000, build_dir: Optional[st
364452 host: Host to bind to
365453 port: Port to bind to
366454 build_dir: Optional custom build directory path
455+ elasticsearch_config: Optional Elasticsearch configuration for log querying
367456
368457 Returns:
369458 FastAPI app instance with server running in background
@@ -373,17 +462,17 @@ def create_app(host: str = "localhost", port: int = 8000, build_dir: Optional[st
373462 os .path .join (os .path .dirname (os .path .dirname (os .path .dirname (__file__ ))), "vite-app" , "dist" )
374463 )
375464
376- server = LogsServer (host = host , port = port , build_dir = build_dir )
465+ server = LogsServer (host = host , port = port , build_dir = build_dir , elasticsearch_config = elasticsearch_config )
377466 server .start_loops ()
378467 return server .app
379468
380469
381470# For backward compatibility and direct usage
382- def serve_logs (port : Optional [int ] = None ):
471+ def serve_logs (port : Optional [int ] = None , elasticsearch_config : Optional [ ElasticsearchConfig ] = None ):
383472 """
384473 Convenience function to create and run a LogsServer.
385474 """
386- server = LogsServer (port = port )
475+ server = LogsServer (port = port , elasticsearch_config = elasticsearch_config )
387476 server .run ()
388477
389478
0 commit comments