Skip to content

Commit 033f0d7

Browse files
committed
E2E Test: select video quality when publish simulcast video track.
1 parent 03744fb commit 033f0d7

4 files changed

Lines changed: 319 additions & 2 deletions

File tree

.github/workflows/tests.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,14 @@ jobs:
128128
LIVEKIT_API_SECRET: ${{ secrets.LIVEKIT_API_SECRET }}
129129
run: |
130130
source .test-venv/bin/activate
131-
pytest tests/
131+
pytest tests/ livekit-rtc/tests/
132132
133133
- name: Run tests (Windows)
134134
if: runner.os == 'Windows'
135135
env:
136136
LIVEKIT_URL: ${{ secrets.LIVEKIT_URL }}
137137
LIVEKIT_API_KEY: ${{ secrets.LIVEKIT_API_KEY }}
138138
LIVEKIT_API_SECRET: ${{ secrets.LIVEKIT_API_SECRET }}
139-
run: .test-venv\Scripts\python.exe -m pytest tests/
139+
run: .test-venv\Scripts\python.exe -m pytest tests/ livekit-rtc/tests/
140140
shell: pwsh
141141

livekit-rtc/livekit/rtc/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
TrackSource,
3838
ParticipantTrackPermission,
3939
)
40+
from ._proto.track_publication_pb2 import VideoQuality
4041
from ._proto.video_frame_pb2 import VideoBufferType, VideoCodec, VideoRotation
4142
from .audio_frame import AudioFrame
4243
from .audio_source import AudioSource
@@ -177,6 +178,7 @@
177178
"TranscriptionSegment",
178179
"VideoCodec",
179180
"VideoEncoding",
181+
"VideoQuality",
180182
"VideoFrame",
181183
"VideoFrameEvent",
182184
"VideoSource",

livekit-rtc/livekit/rtc/track_publication.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
from ._proto import e2ee_pb2 as proto_e2ee
2020
from ._proto import ffi_pb2 as proto_ffi
2121
from ._proto import track_pb2 as proto_track
22+
from ._proto import track_publication_pb2 as proto_track_pub
2223
from .track import Track, LocalTrack, RemoteTrack
2324

25+
VideoQuality = proto_track_pub.VideoQuality
26+
2427

2528
class TrackPublication:
2629
def __init__(self, owned_info: proto_track.OwnedTrackPublication):
@@ -112,5 +115,19 @@ def set_subscribed(self, subscribed: bool):
112115
req.set_subscribed.publication_handle = self._ffi_handle.handle
113116
FfiClient.instance.request(req)
114117

118+
def set_video_quality(self, quality: "VideoQuality.ValueType") -> None:
119+
"""For simulcasted video tracks, request a specific simulcast layer
120+
from the server. Use one of `rtc.VideoQuality.VIDEO_QUALITY_LOW` (q),
121+
`VIDEO_QUALITY_MEDIUM` (h), or `VIDEO_QUALITY_HIGH` (f).
122+
123+
This is a no-op (the SDK logs a warning) if the publication is not
124+
simulcasted."""
125+
req = proto_ffi.FfiRequest()
126+
req.set_remote_track_publication_quality.track_publication_handle = (
127+
self._ffi_handle.handle
128+
)
129+
req.set_remote_track_publication_quality.quality = quality
130+
FfiClient.instance.request(req)
131+
115132
def __repr__(self) -> str:
116133
return f"rtc.RemoteTrackPublication(sid={self.sid}, name={self.name}, kind={self.kind}, source={self.source})"
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
"""End-to-end Test for simulcast video quality layers.
2+
3+
The test publishes a 1280x720 simulcast video track (rolling colored bars) using
4+
both VP8 and H264 codecs, and on the receiver side verifies that subscribing to
5+
each simulcast quality layer (HIGH=f, MEDIUM=h, LOW=q) yields frames of the
6+
expected resolution.
7+
8+
Requires the following environment variables to run:
9+
LIVEKIT_URL
10+
LIVEKIT_API_KEY
11+
LIVEKIT_API_SECRET
12+
"""
13+
14+
from __future__ import annotations
15+
16+
import asyncio
17+
import os
18+
import time
19+
import uuid
20+
from typing import Callable, Optional, Tuple
21+
22+
import numpy as np
23+
import pytest
24+
25+
from livekit import api, rtc
26+
from livekit.rtc._proto.track_publication_pb2 import VideoQuality
27+
from livekit.rtc.room import EventTypes
28+
29+
30+
WAIT_TIMEOUT = 30.0
31+
WAIT_INTERVAL = 0.1
32+
PUBLISH_WIDTH = 1280
33+
PUBLISH_HEIGHT = 720
34+
PUBLISH_FPS = 15
35+
36+
# Default simulcast layer dimensions for a 720p source publication.
37+
LAYER_DIMENSIONS = {
38+
"f": (1280, 720),
39+
"h": (640, 360),
40+
"q": (320, 180),
41+
}
42+
43+
QUALITY_SEQUENCE = [
44+
(VideoQuality.VIDEO_QUALITY_HIGH, "f"),
45+
(VideoQuality.VIDEO_QUALITY_MEDIUM, "h"),
46+
(VideoQuality.VIDEO_QUALITY_LOW, "q"),
47+
]
48+
49+
50+
def skip_if_no_credentials():
51+
required_vars = ["LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET"]
52+
missing = [var for var in required_vars if not os.getenv(var)]
53+
return pytest.mark.skipif(
54+
bool(missing), reason=f"Missing environment variables: {', '.join(missing)}"
55+
)
56+
57+
58+
def create_token(identity: str, room_name: str) -> str:
59+
return (
60+
api.AccessToken()
61+
.with_identity(identity)
62+
.with_name(identity)
63+
.with_grants(
64+
api.VideoGrants(
65+
room_join=True,
66+
room=room_name,
67+
)
68+
)
69+
.to_jwt()
70+
)
71+
72+
73+
def unique_room_name(base: str) -> str:
74+
return f"{base}-{uuid.uuid4().hex[:8]}"
75+
76+
77+
async def _wait_until(
78+
predicate: Callable[[], bool],
79+
*,
80+
timeout: float = WAIT_TIMEOUT,
81+
interval: float = WAIT_INTERVAL,
82+
message: str = "condition not met",
83+
) -> None:
84+
loop = asyncio.get_event_loop()
85+
deadline = loop.time() + timeout
86+
while loop.time() < deadline:
87+
if predicate():
88+
return
89+
await asyncio.sleep(interval)
90+
raise AssertionError(f"timeout waiting: {message}")
91+
92+
93+
async def _ensure_all_connected(rooms: list[rtc.Room]) -> None:
94+
await _wait_until(
95+
lambda: all(r.connection_state == rtc.ConnectionState.CONN_CONNECTED for r in rooms),
96+
message="not all rooms reached CONN_CONNECTED",
97+
)
98+
99+
100+
async def _ensure_track_subscribed(room: rtc.Room, track_sid: str) -> rtc.RemoteTrackPublication:
101+
holder: dict[str, rtc.RemoteTrackPublication] = {}
102+
103+
def _has_subscribed() -> bool:
104+
for participant in room.remote_participants.values():
105+
pub = participant.track_publications.get(track_sid)
106+
if pub is not None and pub.subscribed and pub.track is not None:
107+
holder["pub"] = pub
108+
return True
109+
return False
110+
111+
await _wait_until(
112+
_has_subscribed,
113+
message=f"room did not subscribe to track {track_sid}",
114+
)
115+
return holder["pub"]
116+
117+
118+
def _expect_event(
119+
room: rtc.Room,
120+
event: EventTypes,
121+
predicate: Optional[Callable[..., bool]] = None,
122+
) -> asyncio.Future:
123+
loop = asyncio.get_event_loop()
124+
fut: asyncio.Future = loop.create_future()
125+
126+
def _on_event(*args, **kwargs) -> None:
127+
if fut.done():
128+
return
129+
if predicate is None or predicate(*args, **kwargs):
130+
fut.set_result(args)
131+
132+
room.on(event, _on_event)
133+
return fut
134+
135+
136+
async def _await_event(fut: asyncio.Future, timeout: float = WAIT_TIMEOUT) -> None:
137+
try:
138+
await asyncio.wait_for(fut, timeout=timeout)
139+
except asyncio.TimeoutError as e:
140+
raise AssertionError("timed out waiting for event") from e
141+
142+
143+
def _make_rolling_i420(width: int, height: int, t: float) -> rtc.VideoFrame:
144+
"""Build a 1280x720 I420 frame containing 8 vertical color bars that scroll
145+
horizontally over time, so the encoder always sees motion."""
146+
bar_w = max(width // 8, 1)
147+
offset = int(t * 240) % bar_w
148+
149+
cols_y = np.arange(width, dtype=np.int32)
150+
bar_idx_y = ((cols_y + offset) // bar_w) % 8
151+
y_row = (bar_idx_y * 28 + 32).astype(np.uint8)
152+
153+
cw = width // 2
154+
cols_c = np.arange(cw, dtype=np.int32)
155+
bar_idx_c = (((cols_c * 2) + offset) // bar_w) % 8
156+
u_row = (bar_idx_c * 18 + 80).astype(np.uint8)
157+
v_row = (220 - bar_idx_c * 18).astype(np.uint8)
158+
159+
y_plane = np.tile(y_row, (height, 1))
160+
u_plane = np.tile(u_row, (height // 2, 1))
161+
v_plane = np.tile(v_row, (height // 2, 1))
162+
163+
data = np.concatenate([y_plane.ravel(), u_plane.ravel(), v_plane.ravel()])
164+
return rtc.VideoFrame(width, height, rtc.VideoBufferType.I420, data.tobytes())
165+
166+
167+
async def _publish_loop(source: rtc.VideoSource, stop: asyncio.Event) -> None:
168+
interval = 1.0 / PUBLISH_FPS
169+
start = time.monotonic()
170+
while not stop.is_set():
171+
t = time.monotonic() - start
172+
frame = _make_rolling_i420(PUBLISH_WIDTH, PUBLISH_HEIGHT, t)
173+
source.capture_frame(frame)
174+
try:
175+
await asyncio.wait_for(stop.wait(), timeout=interval)
176+
except asyncio.TimeoutError:
177+
pass
178+
179+
180+
async def _wait_for_layer(
181+
stream: rtc.VideoStream,
182+
expected_w: int,
183+
expected_h: int,
184+
*,
185+
timeout: float = 20.0,
186+
samples: int = 5,
187+
tolerance: float = 0.20,
188+
) -> Tuple[int, int]:
189+
"""Drain frames until we observe `samples` consecutive frames whose
190+
dimensions match the expected layer (within `tolerance`)."""
191+
deadline = asyncio.get_event_loop().time() + timeout
192+
matches = 0
193+
last: Optional[Tuple[int, int]] = None
194+
iterator = stream.__aiter__()
195+
while asyncio.get_event_loop().time() < deadline:
196+
try:
197+
ev = await asyncio.wait_for(iterator.__anext__(), timeout=2.0)
198+
except asyncio.TimeoutError:
199+
continue
200+
except StopAsyncIteration:
201+
break
202+
w, h = ev.frame.width, ev.frame.height
203+
last = (w, h)
204+
if (
205+
abs(w - expected_w) / expected_w <= tolerance
206+
and abs(h - expected_h) / expected_h <= tolerance
207+
):
208+
matches += 1
209+
if matches >= samples:
210+
return last
211+
else:
212+
matches = 0
213+
raise AssertionError(
214+
f"timed out waiting for ~{expected_w}x{expected_h}, last seen={last}"
215+
)
216+
217+
218+
@skip_if_no_credentials()
219+
@pytest.mark.asyncio
220+
@pytest.mark.parametrize(
221+
"video_codec, codec_name",
222+
[
223+
(rtc.VideoCodec.VP8, "vp8"),
224+
(rtc.VideoCodec.H264, "h264"),
225+
],
226+
)
227+
async def test_simulcast_quality_layers(
228+
video_codec: rtc.VideoCodec.ValueType, codec_name: str
229+
) -> None:
230+
room_name = unique_room_name(f"py-simulcast-{codec_name}")
231+
url = os.environ["LIVEKIT_URL"]
232+
233+
sender, receiver = rtc.Room(), rtc.Room()
234+
await sender.connect(url, create_token("sender", room_name))
235+
await receiver.connect(url, create_token("receiver", room_name))
236+
await _ensure_all_connected([sender, receiver])
237+
238+
source = rtc.VideoSource(PUBLISH_WIDTH, PUBLISH_HEIGHT)
239+
track = rtc.LocalVideoTrack.create_video_track(f"simulcast-{codec_name}", source)
240+
options = rtc.TrackPublishOptions(
241+
source=rtc.TrackSource.SOURCE_CAMERA,
242+
simulcast=True,
243+
video_codec=video_codec,
244+
video_encoding=rtc.VideoEncoding(max_bitrate=3_000_000, max_framerate=PUBLISH_FPS),
245+
)
246+
247+
stop = asyncio.Event()
248+
pub_task = asyncio.create_task(_publish_loop(source, stop))
249+
250+
stream: Optional[rtc.VideoStream] = None
251+
try:
252+
track_published = _expect_event(
253+
receiver,
254+
"track_published",
255+
predicate=lambda pub, _p: pub.kind == rtc.TrackKind.KIND_VIDEO,
256+
)
257+
local_pub = await sender.local_participant.publish_track(track, options)
258+
await _await_event(track_published)
259+
260+
print(
261+
f"[{codec_name}] local_pub: sid={local_pub.sid} "
262+
f"simulcasted={local_pub.simulcasted} "
263+
f"mime_type={local_pub.mime_type} "
264+
f"{local_pub.width}x{local_pub.height}"
265+
)
266+
remote_pub = await _ensure_track_subscribed(receiver, local_pub.sid)
267+
assert remote_pub.track is not None
268+
269+
# Give the SFU a moment to propagate simulcast layer metadata and
270+
# let the encoder/bandwidth estimator ramp up to all layers before
271+
# we start switching qualities.
272+
await asyncio.sleep(5.0)
273+
print(
274+
f"[{codec_name}] remote_pub: sid={remote_pub.sid} "
275+
f"simulcasted={remote_pub.simulcasted} "
276+
f"mime_type={remote_pub.mime_type} "
277+
f"{remote_pub.width}x{remote_pub.height}"
278+
)
279+
280+
stream = rtc.VideoStream.from_track(track=remote_pub.track)
281+
282+
for quality, layer in QUALITY_SEQUENCE:
283+
remote_pub.set_video_quality(quality)
284+
ew, eh = LAYER_DIMENSIONS[layer]
285+
actual = await _wait_for_layer(stream, ew, eh, timeout=20.0)
286+
print(
287+
f"[{codec_name}] layer={layer} expected~{ew}x{eh} got={actual[0]}x{actual[1]}"
288+
)
289+
finally:
290+
stop.set()
291+
try:
292+
await pub_task
293+
except Exception:
294+
pass
295+
if stream is not None:
296+
await stream.aclose()
297+
await sender.disconnect()
298+
await receiver.disconnect()

0 commit comments

Comments
 (0)