Skip to content

Commit 2d0592b

Browse files
authored
Merge pull request #74 from dmgav/web-sockets
Web sockets for streaming console output and system info (status)
2 parents 5215bf9 + de4c367 commit 2d0592b

7 files changed

Lines changed: 543 additions & 2 deletions

File tree

bluesky_httpserver/app.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from fastapi.openapi.utils import get_openapi
1717

1818
from .authentication import Mode
19-
from .console_output import CollectPublishedConsoleOutput
19+
from .console_output import CollectPublishedConsoleOutput, ConsoleOutputStream, SystemInfoStream
2020
from .core import PatchedStreamingResponse
2121
from .database.core import purge_expired
2222
from .resources import SERVER_RESOURCES as SR
@@ -346,6 +346,11 @@ async def purge_expired_sessions_and_api_keys():
346346

347347
SR.set_console_output_loader(CollectPublishedConsoleOutput(rm_ref=RM))
348348
SR.console_output_loader.start()
349+
SR.set_console_output_stream(ConsoleOutputStream(rm_ref=RM))
350+
SR.console_output_stream.start()
351+
SR.console_output_loader.subscribe(SR.console_output_stream.add_message)
352+
SR.set_system_info_stream(SystemInfoStream(rm_ref=RM))
353+
SR.system_info_stream.start()
349354

350355
# Import module with custom code
351356
module_names_str = os.getenv("QSERVER_CUSTOM_MODULES", None)
@@ -387,6 +392,8 @@ async def purge_expired_sessions_and_api_keys():
387392
async def shutdown_event():
388393
await SR.RM.close()
389394
await SR.console_output_loader.stop()
395+
await SR.console_output_stream.stop()
396+
await SR.system_info_stream.stop()
390397

391398
@lru_cache(1)
392399
def override_get_authenticators():

bluesky_httpserver/console_output.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import inspect
23
import json
34
import logging
45
import queue
@@ -51,6 +52,9 @@ def __init__(self, *, rm_ref):
5152
self._background_task_stopped = asyncio.Event()
5253
self._background_task_stopped.set()
5354

55+
self._callbacks = []
56+
self._callbacks_async = []
57+
5458
@property
5559
def queues_set(self):
5660
"""
@@ -67,6 +71,22 @@ def text_buffer_uid(self):
6771
async def get_text_buffer(self, n_lines):
6872
return await self._RM.console_monitor.text(n_lines)
6973

74+
def subscribe(self, cb):
75+
"""
76+
Add a function or a coroutine to the list of callbacks. The callbacks must accept
77+
message as a parameter: cb(msg)
78+
"""
79+
if inspect.iscoroutinefunction(cb):
80+
self._callbacks_async.append(cb)
81+
else:
82+
self._callbacks.append(cb)
83+
84+
def unsubscribe(self, cb):
85+
if inspect.iscoroutinefunction(cb):
86+
self._callbacks_async.remove(cb)
87+
else:
88+
self._callbacks.remove(cb)
89+
7090
def get_new_msgs(self, last_msg_uid):
7191
msg_list = []
7292
try:
@@ -94,6 +114,10 @@ async def _load_msgs_task(self):
94114
try:
95115
msg = await self._RM.console_monitor.next_msg(timeout=0.5)
96116
self._add_message(msg=msg)
117+
for cb in self._callbacks:
118+
cb(msg)
119+
for cb in self._callbacks_async:
120+
await cb(msg)
97121
except self._RM.RequestTimeoutError:
98122
pass
99123
self._background_task_stopped.set()
@@ -167,3 +191,142 @@ def __init__(self, content_class, *args, **kwargs):
167191

168192
def __del__(self):
169193
del self._content
194+
195+
196+
class ConsoleOutputStream:
197+
def __init__(self, *, rm_ref):
198+
self._queues = {}
199+
self._queue_max_size = 1000
200+
201+
@property
202+
def queues(self):
203+
return self._queues
204+
205+
def add_queue(self, key):
206+
"""
207+
Add a new queue to the dictionary of queues. The key is a reference to the socket for
208+
for connection with the client.
209+
"""
210+
queue = asyncio.Queue(maxsize=self._queue_max_size)
211+
self._queues[key] = queue
212+
return queue
213+
214+
def remove_queue(self, key):
215+
"""
216+
Remove the queue identified by the key from the dictionary of queues.
217+
"""
218+
if key in self._queues:
219+
del self._queues[key]
220+
221+
async def add_message(self, msg):
222+
msg_json = json.dumps(msg)
223+
for q in self._queues.values():
224+
# Protect from overflow. It's ok to discard old messages.
225+
if q.full():
226+
q.get_nowait()
227+
await q.put(msg_json)
228+
229+
def start(self):
230+
pass
231+
232+
async def stop(self):
233+
pass
234+
235+
236+
class SystemInfoStream:
237+
def __init__(self, *, rm_ref):
238+
self._RM = rm_ref
239+
self._queues_status = {}
240+
self._queues_info = {}
241+
self._background_task = None
242+
self._background_task_running = False
243+
self._background_task_stopped = asyncio.Event()
244+
self._background_task_stopped.set()
245+
self._num = 0
246+
self._queue_max_size = 1000
247+
248+
@property
249+
def background_task_running(self):
250+
return self._background_task_running
251+
252+
@property
253+
def queues_status(self):
254+
return self._queues_status
255+
256+
@property
257+
def queues_info(self):
258+
return self._queues_info
259+
260+
def add_queue_status(self, key):
261+
"""
262+
Add a new queue to the dictionary of queues. The key is a reference to the socket for
263+
for connection with the client.
264+
"""
265+
queue = asyncio.Queue(maxsize=self._queue_max_size)
266+
self._queues_status[key] = queue
267+
return queue
268+
269+
def add_queue_info(self, key):
270+
"""
271+
Add a new queue to the dictionary of queues. The key is a reference to the socket for
272+
for connection with the client.
273+
"""
274+
queue = asyncio.Queue(maxsize=self._queue_max_size)
275+
self._queues_info[key] = queue
276+
return queue
277+
278+
def remove_queue_status(self, key):
279+
"""
280+
Remove the queue identified by the key from the dictionary of queues.
281+
"""
282+
if key in self._queues_status:
283+
del self._queues_status[key]
284+
285+
def remove_queue_info(self, key):
286+
"""
287+
Remove the queue identified by the key from the dictionary of queues.
288+
"""
289+
if key in self._queues_info:
290+
del self._queues_info[key]
291+
292+
def _start_background_task(self):
293+
if not self._background_task_running:
294+
self._background_task = asyncio.create_task(self._load_msgs_task())
295+
296+
async def _stop_background_task(self):
297+
self._background_task_running = False
298+
await self._background_task_stopped.wait()
299+
300+
async def _load_msgs_task(self):
301+
self._background_task_stopped.clear()
302+
self._background_task_running = True
303+
while self._background_task_running:
304+
try:
305+
msg = await self._RM.system_info_monitor.next_msg(timeout=0.5)
306+
307+
if isinstance(msg, dict) and "msg" in msg:
308+
msg_json = json.dumps(msg)
309+
# ALL 'info' messages
310+
for q in self._queues_info.values():
311+
# Protect from overflow. It's ok to discard old messages.
312+
if q.full():
313+
q.get_nowait()
314+
await q.put(msg_json)
315+
if isinstance(msg["msg"], dict) and "status" in msg["msg"]:
316+
# ONLY 'status' messages
317+
for q in self._queues_status.values():
318+
# Protect from overflow. It's ok to discard old messages.
319+
if q.full():
320+
q.get_nowait()
321+
await q.put(msg_json)
322+
except self._RM.RequestTimeoutError:
323+
pass
324+
self._background_task_stopped.set()
325+
326+
def start(self):
327+
self._RM.system_info_monitor.enable()
328+
self._start_background_task()
329+
330+
async def stop(self):
331+
await self._stop_background_task()
332+
await self._RM.system_info_monitor.disable_wait()

bluesky_httpserver/resources.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ def __init__(self):
33
self._RM = None
44
self._custom_code_modules = []
55
self._console_output_loader = None
6+
self._stop_server = False
67

78
def set_RM(self, RM):
89
self._RM = RM
@@ -37,5 +38,27 @@ def console_output_loader(self):
3738
def console_output_loader(self, _):
3839
raise RuntimeError("Attempting to set read-only property 'console_output_loader'")
3940

41+
def set_console_output_stream(self, console_output_stream):
42+
self._console_output_stream = console_output_stream
43+
44+
@property
45+
def console_output_stream(self):
46+
return self._console_output_stream
47+
48+
@console_output_stream.setter
49+
def console_output_stream(self, _):
50+
raise RuntimeError("Attempting to set read-only property 'console_output_stream'")
51+
52+
def set_system_info_stream(self, system_info_stream):
53+
self._system_info_stream = system_info_stream
54+
55+
@property
56+
def system_info_stream(self):
57+
return self._system_info_stream
58+
59+
@system_info_stream.setter
60+
def system_info_stream(self, _):
61+
raise RuntimeError("Attempting to set read-only property 'system_info_stream'")
62+
4063

4164
SERVER_RESOURCES = _ServerResources()

bluesky_httpserver/routers/core_api.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import pydantic
88
from bluesky_queueserver.manager.conversions import simplify_plan_descriptions, spreadsheet_to_plan_list
9-
from fastapi import APIRouter, Depends, File, Form, Request, Security, UploadFile
9+
from fastapi import APIRouter, Depends, File, Form, Request, Security, UploadFile, WebSocket, WebSocketDisconnect
1010
from packaging import version
1111

1212
if version.parse(pydantic.__version__) < version.parse("2.0.0"):
@@ -1098,3 +1098,98 @@ def console_output_update(payload: dict, principal=Security(get_current_principa
10981098
process_exception()
10991099

11001100
return response
1101+
1102+
1103+
class WebSocketMonitor:
1104+
"""
1105+
Works for sockets that only send data to clients (not receive).
1106+
1107+
The class monitors the status of a socket connection. The property 'is_alive' returns True
1108+
until the socket is disconnected. The purpose of the class is to break the loop in the
1109+
implementation of the socket that only sends data to a client when the application
1110+
is closed. If there is no data to send, the loop continues to run indefinitely and
1111+
prevents the application from closing properly. No better solution was found.
1112+
"""
1113+
1114+
def __init__(self, websocket):
1115+
self._websocket = websocket
1116+
self._is_alive = True
1117+
self._task_ref = None
1118+
1119+
async def _task(self):
1120+
while True:
1121+
try:
1122+
await asyncio.sleep(1)
1123+
try:
1124+
# The following will raise an exception if the socket is disconnected.
1125+
await asyncio.wait_for(self._websocket.receive(), timeout=0.01)
1126+
except asyncio.TimeoutError:
1127+
# The socket is still connected.
1128+
pass
1129+
except Exception:
1130+
self._is_alive = False
1131+
break
1132+
1133+
def start(self):
1134+
self._task_ref = asyncio.create_task(self._task())
1135+
1136+
@property
1137+
def is_alive(self):
1138+
return self._is_alive
1139+
1140+
1141+
@router.websocket("/console_output/ws")
1142+
async def console_output_ws(websocket: WebSocket):
1143+
await websocket.accept()
1144+
q = SR.console_output_stream.add_queue(websocket)
1145+
wsmon = WebSocketMonitor(websocket)
1146+
wsmon.start()
1147+
try:
1148+
while wsmon.is_alive:
1149+
try:
1150+
msg = await asyncio.wait_for(q.get(), timeout=1)
1151+
await websocket.send_text(msg)
1152+
except asyncio.TimeoutError:
1153+
pass
1154+
except WebSocketDisconnect:
1155+
pass
1156+
finally:
1157+
SR.console_output_stream.remove_queue(websocket)
1158+
1159+
1160+
@router.websocket("/status/ws")
1161+
async def status_ws(websocket: WebSocket):
1162+
await websocket.accept()
1163+
q = SR.system_info_stream.add_queue_status(websocket)
1164+
wsmon = WebSocketMonitor(websocket)
1165+
wsmon.start()
1166+
try:
1167+
while wsmon.is_alive:
1168+
try:
1169+
msg = await asyncio.wait_for(q.get(), timeout=1)
1170+
await websocket.send_text(msg)
1171+
except asyncio.TimeoutError:
1172+
pass
1173+
except WebSocketDisconnect:
1174+
pass
1175+
finally:
1176+
SR.system_info_stream.remove_queue_status(websocket)
1177+
1178+
1179+
@router.websocket("/info/ws")
1180+
async def info_ws(websocket: WebSocket):
1181+
await websocket.accept()
1182+
q = SR.system_info_stream.add_queue_info(websocket)
1183+
wsmon = WebSocketMonitor(websocket)
1184+
wsmon.start()
1185+
try:
1186+
while wsmon.is_alive:
1187+
try:
1188+
msg = await asyncio.wait_for(q.get(), timeout=1)
1189+
await websocket.send_text(msg)
1190+
except asyncio.TimeoutError:
1191+
pass
1192+
except WebSocketDisconnect:
1193+
pass
1194+
finally:
1195+
SR.system_info_stream.remove_queue_info(websocket)

0 commit comments

Comments
 (0)