Skip to content

Commit 3964d74

Browse files
committed
WIP: Add a test for the MJPEG Stream
Currently this fails, because the `TestClient` doesn't actually stream responses :( A work-around might be to implement stopping (i.e. get it to raise an exception and close the stream), then check the response includes some frames.
1 parent 7957450 commit 3964d74

File tree

3 files changed

+98
-2
lines changed

3 files changed

+98
-2
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ dev = [
3030
"mypy>=1.6.1, <2",
3131
"ruff>=0.1.3",
3232
"types-jsonschema",
33+
"Pillow",
3334
]
3435

3536
[project.urls]

src/labthings_fastapi/outputs/mjpeg_stream.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,11 @@ def __init__(self, gen: AsyncGenerator[bytes, None], status_code: int = 200):
4141
4242
This response is initialised with an async generator that yields `bytes`
4343
objects, each of which is a JPEG file. We add the --frame markers and mime
44-
types that enable it to work in an `img` tag.
44+
types that mark it as an MJPEG stream. This is sufficient to enable it to
45+
work in an `img` tag, with the `src` set to the MJPEG stream's endpoint.
46+
47+
It expects an async generator that supplies individual JPEGs to be streamed,
48+
such as the one provided by `.MJPEGStream`.
4549
4650
NB the ``status_code`` argument is used by FastAPI to set the status code of
4751
the response in OpenAPI.
@@ -63,6 +67,24 @@ async def mjpeg_async_generator(self) -> AsyncGenerator[bytes, None]:
6367

6468

6569
class MJPEGStream:
70+
"""Manage streaming images over HTTP as an MJPEG stream
71+
72+
An MJPEGStream object handles accepting images (already in
73+
JPEG format) and streaming them to HTTP clients as a multipart
74+
response.
75+
76+
The minimum needed to make the stream work is to periodically
77+
call `add_frame` with JPEG image data.
78+
79+
To add a stream to a `.Thing`, use the `.MJPEGStreamDescriptor`
80+
which will handle creating an `MJPEGStream` object on first access,
81+
and will also add it to the HTTP API.
82+
83+
The MJPEG stream buffers the last few frames (10 by default) and
84+
also has a hook to notify the size of each frame as it is added.
85+
The latter is used by OpenFlexure's autofocus routine.
86+
"""
87+
6688
def __init__(self, ringbuffer_size: int = 10):
6789
self._lock = threading.Lock()
6890
self.condition = anyio.Condition()
@@ -182,7 +204,14 @@ async def notify_new_frame(self, i):
182204

183205

184206
class MJPEGStreamDescriptor:
185-
"""A descriptor that returns a MJPEGStream object when accessed"""
207+
"""A descriptor that returns a MJPEGStream object when accessed
208+
209+
If this descriptor is added to a `.Thing`, it will create an `.MJPEGStream`
210+
object when it is first accessed. It will also add two HTTP endpoints,
211+
one with the name of the descriptor serving the MJPEG stream, and another
212+
with `/viewer` appended, which serves a basic HTML page that views the stream.
213+
214+
"""
186215

187216
def __init__(self, **kwargs):
188217
self._kwargs = kwargs

tests/test_mjpeg_stream.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import io
2+
import threading
3+
import time
4+
from PIL import Image
5+
from fastapi.testclient import TestClient
6+
import labthings_fastapi as lt
7+
8+
9+
class Telly(lt.Thing):
10+
_stream_thread: threading.Thread
11+
_streaming: bool = False
12+
framerate: float = 1000
13+
14+
stream = lt.outputs.MJPEGStreamDescriptor()
15+
16+
def __enter__(self):
17+
self._streaming = True
18+
self._stream_thread = threading.Thread(target=self._make_images)
19+
self._stream_thread.start()
20+
21+
def __exit__(self, exc_t, exc_v, exc_tb):
22+
self._streaming = False
23+
self._stream_thread.join()
24+
25+
def _make_images(self):
26+
"""Stream a series of solid colours"""
27+
colours = ["#F00", "#0F0", "#00F"]
28+
jpegs = []
29+
for c in colours:
30+
image = Image.new("RGB", (10, 10), c)
31+
dest = io.BytesIO()
32+
image.save(dest, "jpeg")
33+
jpegs.append(dest.getvalue())
34+
35+
i = -1
36+
start_time = time.time()
37+
while self._streaming:
38+
i = (i + 1) % len(jpegs)
39+
print(f"sending frame {i}")
40+
self.stream.add_frame(jpegs[i], self._labthings_blocking_portal)
41+
time.sleep(1 / self.framerate)
42+
43+
if time.time() - start_time > 10:
44+
break
45+
print("stopped sending frames")
46+
self._streaming = False
47+
48+
49+
def test_mjpeg_stream():
50+
server = lt.ThingServer()
51+
telly = Telly()
52+
server.add_thing(telly, "telly")
53+
with TestClient(server.app) as client:
54+
with client.stream("GET", "/telly/stream", timeout=0.1) as stream:
55+
stream.raise_for_status()
56+
received = 0
57+
for b in stream.iter_bytes():
58+
received += 1
59+
print(f"Got packet {received}")
60+
assert b.startswith(b"--frame")
61+
if received > 5:
62+
break
63+
64+
65+
if __name__ == "__main__":
66+
test_mjpeg_stream()

0 commit comments

Comments
 (0)