1414 advance_state_request_after_snapshot ,
1515 build_snapshot_from_request ,
1616)
17- from helpers .ws import ConnectionNotFoundError
17+ from helpers .ws import ConnectionIdentity , ConnectionNotFoundError , _ws_debug_enabled , ws_debug
1818
1919if TYPE_CHECKING : # pragma: no cover - hints only
2020 from helpers .ws_manager import WsManager
2121
2222
23- ConnectionIdentity = tuple [str , str ] # (namespace, sid)
24-
25-
26- def _ws_debug_enabled () -> bool :
27- value = os .getenv ("A0_WS_DEBUG" , "" ).strip ().lower ()
28- return value in {"1" , "true" , "yes" , "on" }
29-
30-
31- def _debug_log (message : str ) -> None :
32- if not _ws_debug_enabled ():
33- return
34- PrintStyle .debug (message )
35-
36-
3723@dataclass
3824class ConnectionProjection :
3925 namespace : str
@@ -73,7 +59,7 @@ def bind_manager(self, manager: "WsManager", *, handler_id: str | None = None) -
7359 # Use the manager's dispatcher loop for all scheduling so mark_dirty can be
7460 # invoked safely from non-async contexts and other threads.
7561 self ._dispatcher_loop = getattr (manager , "_dispatcher_loop" , None )
76- _debug_log (
62+ ws_debug (
7763 f"[StateMonitor] bind_manager handler_id={ handler_id or self ._emit_handler_id } "
7864 )
7965
@@ -83,7 +69,7 @@ def register_sid(self, namespace: str, sid: str) -> None:
8369 self ._projections .setdefault (
8470 identity , ConnectionProjection (namespace = namespace , sid = sid )
8571 )
86- _debug_log (f"[StateMonitor] register_sid namespace={ namespace } sid={ sid } " )
72+ ws_debug (f"[StateMonitor] register_sid namespace={ namespace } sid={ sid } " )
8773
8874 def unregister_sid (self , namespace : str , sid : str ) -> None :
8975 identity : ConnectionIdentity = (namespace , sid )
@@ -95,7 +81,7 @@ def unregister_sid(self, namespace: str, sid: str) -> None:
9581 if task is not None :
9682 task .cancel ()
9783 self ._projections .pop (identity , None )
98- _debug_log (f"[StateMonitor] unregister_sid namespace={ namespace } sid={ sid } " )
84+ ws_debug (f"[StateMonitor] unregister_sid namespace={ namespace } sid={ sid } " )
9985
10086 def mark_dirty_all (self , * , reason : str | None = None ) -> None :
10187 wave_id = None
@@ -142,7 +128,7 @@ def update_projection(
142128 projection .request = request
143129 projection .seq_base = seq_base
144130 projection .seq = seq_base
145- _debug_log (
131+ ws_debug (
146132 f"[StateMonitor] update_projection namespace={ namespace } sid={ sid } context={ request .context !r} "
147133 f"log_from={ request .log_from } notifications_from={ request .notifications_from } "
148134 f"timezone={ request .timezone !r} seq_base={ seq_base } "
@@ -221,7 +207,7 @@ def _schedule_debounce_on_loop(self, identity: ConnectionIdentity) -> None:
221207 self .debounce_seconds , self ._on_debounce_fire , identity
222208 )
223209 self ._debounce_handles [identity ] = handle
224- _debug_log (
210+ ws_debug (
225211 f"[StateMonitor] schedule_push namespace={ projection .namespace } sid={ projection .sid } "
226212 f"delay_s={ self .debounce_seconds } "
227213 f"dirty={ projection .dirty_version } pushed={ projection .pushed_version } "
@@ -298,7 +284,7 @@ async def _flush_push(self, identity: ConnectionIdentity) -> None:
298284 if isinstance (snapshot .get ("logs" ), list )
299285 else None
300286 )
301- _debug_log (
287+ ws_debug (
302288 f"[StateMonitor] emit state_push namespace={ namespace } sid={ sid } seq={ seq } "
303289 f"context={ request .context !r} logs_len={ logs_len } "
304290 f"reason={ dirty_reason !r} wave={ dirty_wave_id !r} "
@@ -312,13 +298,13 @@ async def _flush_push(self, identity: ConnectionIdentity) -> None:
312298 )
313299 except ConnectionNotFoundError :
314300 # Sid was removed before the emit; treat as benign.
315- _debug_log (
301+ ws_debug (
316302 f"[StateMonitor] emit skipped: sid not found namespace={ namespace } sid={ sid } "
317303 )
318304 return
319305 except RuntimeError :
320306 # Dispatcher loop may be closing (e.g., during shutdown or test teardown).
321- _debug_log (
307+ ws_debug (
322308 f"[StateMonitor] emit skipped: dispatcher closing namespace={ namespace } sid={ sid } "
323309 )
324310 return
@@ -341,7 +327,7 @@ async def _flush_push(self, identity: ConnectionIdentity) -> None:
341327 if not follow_up :
342328 return
343329
344- _debug_log (
330+ ws_debug (
345331 f"[StateMonitor] follow_up_push namespace={ namespace } sid={ sid } dirty={ dirty_version } pushed={ pushed_version } "
346332 )
347333 try :
0 commit comments