Skip to content

Commit 2d07fff

Browse files
committed
fix: stop drain_proxy on subscriber arrival to prevent relay queue leak
blackhole.stop() alone doesn't remove the drain_proxy from relay.__proxies. Relay continues putting frames into the dead proxy's queue, causing unbounded memory growth. Now add_track_subscriber calls drain_proxy.stop() (removes from relay) + blackhole.stop() (cancels internal recv task) + drain_task.cancel() (safety net).
1 parent 4e2e147 commit 2d07fff

File tree

2 files changed

+22
-14
lines changed

2 files changed

+22
-14
lines changed

getstream/video/rtc/pc.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ def __init__(
142142

143143
self.track_map = {} # track_id -> (MediaRelay, original_track)
144144
self.video_frame_trackers = {} # track_id -> VideoFrameTracker
145-
self._video_blackholes: dict[str, tuple[MediaBlackhole, asyncio.Task]] = {}
145+
self._video_drains: dict[
146+
str, tuple[MediaBlackhole, asyncio.Task, MediaStreamTrack]
147+
] = {}
146148
self._background_tasks: set[asyncio.Task] = set()
147149

148150
@self.on("track")
@@ -168,6 +170,15 @@ async def on_track(track: aiortc.mediastreams.MediaStreamTrack):
168170
tracked_track = VideoFrameTracker(track)
169171
self.video_frame_trackers[track.id] = tracked_track
170172

173+
# Drain unconsumed video frames to prevent unbounded queue growth
174+
# in RTCRtpReceiver (aiortc issue #554)
175+
if self._drain_video_frames:
176+
drain_proxy = relay.subscribe(tracked_track)
177+
blackhole = MediaBlackhole()
178+
blackhole.addTrack(drain_proxy)
179+
drain_task = asyncio.create_task(blackhole.start())
180+
self._video_drains[track.id] = (blackhole, drain_task, drain_proxy)
181+
171182
self.track_map[track.id] = (relay, tracked_track)
172183

173184
if track.kind == "audio":
@@ -183,14 +194,6 @@ def _emit_pcm(pcm: PcmData):
183194

184195
proxy = relay.subscribe(tracked_track)
185196

186-
# Drain unconsumed video frames to prevent unbounded queue growth
187-
# in RTCRtpReceiver (aiortc issue #554)
188-
if track.kind == "video" and self._drain_video_frames:
189-
drain_proxy = relay.subscribe(tracked_track)
190-
blackhole = MediaBlackhole()
191-
blackhole.addTrack(drain_proxy)
192-
drain_task = asyncio.create_task(blackhole.start())
193-
self._video_blackholes[track.id] = (blackhole, drain_task)
194197
self.emit("track_added", proxy, user)
195198

196199
@self.on("icegatheringstatechange")
@@ -205,10 +208,13 @@ def add_track_subscriber(
205208
"""Add a new subscriber to an existing track's MediaRelay."""
206209
track_data = self.track_map.get(track_id)
207210

208-
blackhole, drain_task = self._video_blackholes.pop(track_id, (None, None))
211+
blackhole, drain_task, drain_proxy = self._video_drains.pop(
212+
track_id, (None, None, None)
213+
)
209214

210-
if blackhole and drain_task:
215+
if blackhole and drain_task and drain_proxy:
211216
task = asyncio.create_task(blackhole.stop())
217+
drain_proxy.stop()
212218
drain_task.cancel() # safety net if start() becomes long-lived in future aiortc
213219
self._background_tasks.add(task)
214220
task.add_done_callback(self._background_tasks.discard)

tests/rtc/test_subscriber_drain.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ def subscriber_pc():
1616
pc._drain_video_frames = True
1717
pc.track_map = {}
1818
pc.video_frame_trackers = {}
19-
pc._video_blackholes = {}
19+
pc._video_drains = {}
2020
pc._background_tasks = set()
2121
pc._listeners = {}
2222
return pc
@@ -32,12 +32,14 @@ async def test_blackhole_stopped_when_subscriber_added(self, subscriber_pc):
3232

3333
blackhole = Mock()
3434
blackhole.stop = AsyncMock()
35-
subscriber_pc._video_blackholes[track_id] = (blackhole, Mock())
35+
drain_proxy = Mock()
36+
subscriber_pc._video_drains[track_id] = (blackhole, Mock(), drain_proxy)
3637

3738
subscriber_pc.add_track_subscriber(track_id)
3839

3940
blackhole.stop.assert_called_once()
40-
assert track_id not in subscriber_pc._video_blackholes
41+
drain_proxy.stop.assert_called_once()
42+
assert track_id not in subscriber_pc._video_drains
4143

4244
def test_no_error_when_no_drain_exists(self, subscriber_pc):
4345
track_id = "user123:video:0"

0 commit comments

Comments
 (0)