|
| 1 | +import asyncio |
| 2 | +from typing import Any |
| 3 | + |
| 4 | +from helpers.ws import WsHandler |
| 5 | +from helpers.ws_manager import WsResult |
| 6 | +from helpers.print_style import PrintStyle |
| 7 | +from helpers import runtime |
| 8 | + |
| 9 | + |
| 10 | +class WsDevTest(WsHandler): |
| 11 | + """Developer-only WebSocket test harness handler.""" |
| 12 | + |
| 13 | + async def process(self, event: str, data: dict, sid: str) -> dict[str, Any] | WsResult | None: |
| 14 | + if event == "ws_event_console_subscribe": |
| 15 | + if not runtime.is_development(): |
| 16 | + return WsResult.error( |
| 17 | + code="NOT_AVAILABLE", |
| 18 | + message="Event console is available only in development mode", |
| 19 | + ) |
| 20 | + registered = self.manager.register_diagnostic_watcher(self.namespace, sid) |
| 21 | + if not registered: |
| 22 | + return WsResult.error( |
| 23 | + code="SUBSCRIBE_FAILED", |
| 24 | + message="Unable to subscribe to diagnostics", |
| 25 | + ) |
| 26 | + return {"status": "subscribed", "timestamp": data.get("requestedAt")} |
| 27 | + |
| 28 | + if event == "ws_event_console_unsubscribe": |
| 29 | + self.manager.unregister_diagnostic_watcher(self.namespace, sid) |
| 30 | + return {"status": "unsubscribed"} |
| 31 | + |
| 32 | + if event == "ws_tester_emit": |
| 33 | + message = data.get("message", "emit") |
| 34 | + payload = {"message": message, "echo": True, "timestamp": data.get("timestamp")} |
| 35 | + await self.broadcast("ws_tester_broadcast", payload) |
| 36 | + PrintStyle.info(f"Harness emit broadcasted message='{message}'") |
| 37 | + return None |
| 38 | + |
| 39 | + if event == "ws_tester_request": |
| 40 | + value = data.get("value") |
| 41 | + PrintStyle.debug("Harness request responded with echo %s", value) |
| 42 | + return {"echo": value, "handler": self.identifier, "status": "ok"} |
| 43 | + |
| 44 | + if event == "ws_tester_request_delayed": |
| 45 | + delay_ms = int(data.get("delay_ms", 0)) |
| 46 | + await asyncio.sleep(delay_ms / 1000) |
| 47 | + PrintStyle.warning("Harness delayed request finished after %s ms", delay_ms) |
| 48 | + return {"status": "delayed", "delay_ms": delay_ms, "handler": self.identifier} |
| 49 | + |
| 50 | + if event == "ws_tester_trigger_persistence": |
| 51 | + phase = data.get("phase", "unknown") |
| 52 | + payload = {"phase": phase, "handler": self.identifier} |
| 53 | + await self.emit_to(sid, "ws_tester_persistence", payload) |
| 54 | + PrintStyle.info(f"Harness persistence event phase='{phase}' -> {sid}") |
| 55 | + return None |
| 56 | + |
| 57 | + if event == "ws_tester_broadcast_demo_trigger": |
| 58 | + payload = {"demo": True, "requested_at": data.get("requested_at")} |
| 59 | + await self.broadcast("ws_tester_broadcast_demo", payload) |
| 60 | + PrintStyle.info("Harness broadcast demo event dispatched") |
| 61 | + return None |
| 62 | + |
| 63 | + if event == "ws_tester_request_all": |
| 64 | + correlation_id = data.get("correlationId") |
| 65 | + aggregated = await self.dispatch_to_all_sids( |
| 66 | + "ws_tester_request", |
| 67 | + {"value": data.get("marker", "aggregate")}, |
| 68 | + correlation_id=correlation_id, |
| 69 | + ) |
| 70 | + return {"results": aggregated} |
| 71 | + |
| 72 | + # Ignore events not targeted at this handler (other activated handlers |
| 73 | + # may process them). Only warn for events that look like dev-harness |
| 74 | + # traffic so we don't spam logs with unrelated events. |
| 75 | + if event.startswith("ws_tester_"): |
| 76 | + PrintStyle.warning(f"Harness received unknown event '{event}'") |
| 77 | + return None |
0 commit comments