Skip to content

Commit fd70d4f

Browse files
author
Dylan Huang
committed
Enhance logging and debugging capabilities
- Added a `--debug` flag to the logs command for enabling debug mode. - Updated logs command to display debug mode status. - Enhanced logging in the SqliteDatasetLoggerAdapter and SqliteEventBus for better traceability of events and errors. - Implemented debug logging in the WebSocketManager and LogsServer to provide detailed insights during WebSocket connections and broadcasts. - Enabled debug mode for all relevant loggers in the logs server system.
1 parent 1648047 commit fd70d4f

File tree

5 files changed

+179
-29
lines changed

5 files changed

+179
-29
lines changed

eval_protocol/cli.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@ def parse_args(args=None):
300300
# Logs command
301301
logs_parser = subparsers.add_parser("logs", help="Serve logs with file watching and real-time updates")
302302
logs_parser.add_argument("--port", type=int, default=8000, help="Port to bind to (default: 8000)")
303+
logs_parser.add_argument("--debug", action="store_true", help="Enable debug mode")
303304

304305
# Upload command
305306
upload_parser = subparsers.add_parser(

eval_protocol/cli_commands/logs.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ def logs_command(args):
1616
print(f"🌐 URL: http://localhost:{port}")
1717
print(f"🔌 WebSocket: ws://localhost:{port}/ws")
1818
print(f"👀 Watching paths: {['current directory']}")
19+
print(f"🔍 Debug mode: {args.debug}")
1920
print("Press Ctrl+C to stop the server")
2021
print("-" * 50)
2122

@@ -25,7 +26,7 @@ def logs_command(args):
2526
elasticsearch_config = ElasticsearchSetup().setup_elasticsearch()
2627

2728
try:
28-
serve_logs(port=args.port, elasticsearch_config=elasticsearch_config)
29+
serve_logs(port=args.port, elasticsearch_config=elasticsearch_config, debug=args.debug)
2930
return 0
3031
except KeyboardInterrupt:
3132
print("\n🛑 Server stopped by user")

eval_protocol/dataset_logger/sqlite_dataset_logger_adapter.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,19 @@ def __init__(self, db_path: Optional[str] = None, store: Optional[SqliteEvaluati
2323

2424
def log(self, row: "EvaluationRow") -> None:
2525
data = row.model_dump(exclude_none=True, mode="json")
26+
rollout_id = data.get("execution_metadata", {}).get("rollout_id", "unknown")
27+
logger.debug(f"[EVENT_BUS_EMIT] Starting to log row with rollout_id: {rollout_id}")
28+
2629
self._store.upsert_row(data=data)
30+
logger.debug(f"[EVENT_BUS_EMIT] Successfully stored row in database for rollout_id: {rollout_id}")
31+
2732
try:
33+
logger.debug(f"[EVENT_BUS_EMIT] Emitting event '{LOG_EVENT_TYPE}' for rollout_id: {rollout_id}")
2834
event_bus.emit(LOG_EVENT_TYPE, EvaluationRow(**data))
35+
logger.debug(f"[EVENT_BUS_EMIT] Successfully emitted event for rollout_id: {rollout_id}")
2936
except Exception as e:
3037
# Avoid breaking storage due to event emission issues
31-
logger.error(f"Failed to emit row_upserted event: {e}")
38+
logger.error(f"[EVENT_BUS_EMIT] Failed to emit row_upserted event for rollout_id {rollout_id}: {e}")
3239
pass
3340

3441
def read(self, rollout_id: Optional[str] = None) -> List["EvaluationRow"]:

eval_protocol/event_bus/sqlite_event_bus.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,75 +35,106 @@ def emit(self, event_type: str, data: Any) -> None:
3535
event_type: Type of event (e.g., "log")
3636
data: Event data
3737
"""
38+
logger.debug(f"[CROSS_PROCESS_EMIT] Emitting event type: {event_type}")
39+
3840
# Call local listeners immediately
41+
logger.debug(f"[CROSS_PROCESS_EMIT] Calling {len(self._listeners)} local listeners")
3942
super().emit(event_type, data)
43+
logger.debug("[CROSS_PROCESS_EMIT] Completed local listener calls")
4044

4145
# Publish to cross-process subscribers
46+
logger.debug("[CROSS_PROCESS_EMIT] Publishing to cross-process subscribers")
4247
self._publish_cross_process(event_type, data)
48+
logger.debug("[CROSS_PROCESS_EMIT] Completed cross-process publish")
4349

4450
def _publish_cross_process(self, event_type: str, data: Any) -> None:
4551
"""Publish event to cross-process subscribers via database."""
46-
self._db.publish_event(event_type, data, self._process_id)
52+
logger.debug(f"[CROSS_PROCESS_PUBLISH] Publishing event {event_type} to database")
53+
try:
54+
self._db.publish_event(event_type, data, self._process_id)
55+
logger.debug(f"[CROSS_PROCESS_PUBLISH] Successfully published event {event_type} to database")
56+
except Exception as e:
57+
logger.error(f"[CROSS_PROCESS_PUBLISH] Failed to publish event {event_type} to database: {e}")
4758

4859
def start_listening(self) -> None:
4960
"""Start listening for cross-process events."""
5061
if self._running:
62+
logger.debug("[CROSS_PROCESS_LISTEN] Already listening, skipping start")
5163
return
5264

65+
logger.debug("[CROSS_PROCESS_LISTEN] Starting cross-process event listening")
5366
self._running = True
5467
self._start_database_listener()
68+
logger.debug("[CROSS_PROCESS_LISTEN] Started database listener thread")
5569

5670
def stop_listening(self) -> None:
5771
"""Stop listening for cross-process events."""
72+
logger.debug("[CROSS_PROCESS_LISTEN] Stopping cross-process event listening")
5873
self._running = False
5974
if self._listener_thread and self._listener_thread.is_alive():
75+
logger.debug("[CROSS_PROCESS_LISTEN] Waiting for listener thread to stop")
6076
self._listener_thread.join(timeout=1)
77+
logger.debug("[CROSS_PROCESS_LISTEN] Listener thread stopped")
6178

6279
def _start_database_listener(self) -> None:
6380
"""Start database-based event listener."""
6481

6582
def database_listener():
83+
logger.debug("[CROSS_PROCESS_LISTENER] Starting database listener loop")
6684
last_cleanup = time.time()
6785

6886
while self._running:
6987
try:
7088
# Get unprocessed events from other processes
7189
events = self._db.get_unprocessed_events(self._process_id)
90+
if events:
91+
logger.debug(f"[CROSS_PROCESS_LISTENER] Found {len(events)} unprocessed events")
7292

7393
for event in events:
7494
if not self._running:
7595
break
7696

7797
try:
98+
logger.debug(
99+
f"[CROSS_PROCESS_LISTENER] Processing event {event['event_id']} of type {event['event_type']}"
100+
)
78101
# Handle the event
79102
self._handle_cross_process_event(event["event_type"], event["data"])
103+
logger.debug(f"[CROSS_PROCESS_LISTENER] Successfully processed event {event['event_id']}")
80104

81105
# Mark as processed
82106
self._db.mark_event_processed(event["event_id"])
107+
logger.debug(f"[CROSS_PROCESS_LISTENER] Marked event {event['event_id']} as processed")
83108

84109
except Exception as e:
85-
logger.debug(f"Failed to process event {event['event_id']}: {e}")
110+
logger.debug(f"[CROSS_PROCESS_LISTENER] Failed to process event {event['event_id']}: {e}")
86111

87112
# Clean up old events every hour
88113
current_time = time.time()
89114
if current_time - last_cleanup >= 3600:
115+
logger.debug("[CROSS_PROCESS_LISTENER] Cleaning up old events")
90116
self._db.cleanup_old_events()
91117
last_cleanup = current_time
92118

93119
# Small sleep to prevent busy waiting
94120
time.sleep(0.1)
95121

96122
except Exception as e:
97-
logger.debug(f"Database listener error: {e}")
123+
logger.debug(f"[CROSS_PROCESS_LISTENER] Database listener error: {e}")
98124
time.sleep(1)
99125

100126
self._listener_thread = threading.Thread(target=database_listener, daemon=True)
101127
self._listener_thread.start()
102128

103129
def _handle_cross_process_event(self, event_type: str, data: Any) -> None:
104130
"""Handle events received from other processes."""
105-
for listener in self._listeners:
131+
logger.debug(f"[CROSS_PROCESS_HANDLE] Handling cross-process event type: {event_type}")
132+
logger.debug(f"[CROSS_PROCESS_HANDLE] Calling {len(self._listeners)} listeners")
133+
134+
for i, listener in enumerate(self._listeners):
106135
try:
136+
logger.debug(f"[CROSS_PROCESS_HANDLE] Calling listener {i}")
107137
listener(event_type, data)
138+
logger.debug(f"[CROSS_PROCESS_HANDLE] Successfully called listener {i}")
108139
except Exception as e:
109-
logger.debug(f"Cross-process event listener failed for {event_type}: {e}")
140+
logger.debug(f"[CROSS_PROCESS_HANDLE] Cross-process event listener {i} failed for {event_type}: {e}")

0 commit comments

Comments
 (0)