From d85f1f930380c36e5fa82e360cb0f7f685b7b4eb Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 30 Mar 2026 21:32:12 +0400 Subject: [PATCH 1/7] feat: add video_buffered option to control video frame buffering MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pass video_buffered=False through ConnectionManager → PeerConnectionManager → SubscriberPeerConnection to relay.subscribe(buffered=False) for video tracks. Default True preserves current behavior. Without buffering, only the latest video frame is kept in memory instead of an unbounded queue. This prevents OOM on voice-only agents that subscribe to video but never consume frames (~400 MiB/10sec leak). --- getstream/video/rtc/connection_manager.py | 5 ++++- getstream/video/rtc/pc.py | 5 ++++- getstream/video/rtc/peer_connection.py | 4 +++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 25f59a8f..44737558 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -58,6 +58,7 @@ def __init__( create: bool = True, subscription_config: Optional[SubscriptionConfig] = None, max_join_retries: int = 3, + video_buffered: bool = True, **kwargs: Any, ): super().__init__() @@ -90,7 +91,9 @@ def __init__( self._subscription_manager: SubscriptionManager = SubscriptionManager( self, subscription_config ) - self._peer_manager: PeerConnectionManager = PeerConnectionManager(self) + self._peer_manager: PeerConnectionManager = PeerConnectionManager( + self, video_buffered=video_buffered + ) self.recording_manager = self._recording_manager self.participants_state = self._participants_state diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index d8fb0bb0..d2fbcdf4 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -131,12 +131,14 @@ def __init__( self, connection, configuration: aiortc.RTCConfiguration, + video_buffered: bool = True, ) -> None: logger.info( f"creating subscriber peer connection with configuration: {configuration}" ) super().__init__(configuration) self.connection = connection + self._video_buffered = video_buffered self.track_map = {} # track_id -> (MediaRelay, original_track) self.video_frame_trackers = {} # track_id -> VideoFrameTracker @@ -177,7 +179,8 @@ def _emit_pcm(pcm: PcmData): handler = AudioTrackHandler(relay.subscribe(tracked_track), _emit_pcm) asyncio.create_task(handler.start()) - self.emit("track_added", relay.subscribe(tracked_track), user) + buffered = self._video_buffered if track.kind == "video" else True + self.emit("track_added", relay.subscribe(tracked_track, buffered=buffered), user) @self.on("icegatheringstatechange") def on_icegatheringstatechange(): diff --git a/getstream/video/rtc/peer_connection.py b/getstream/video/rtc/peer_connection.py index 3155c4bd..bdea297c 100644 --- a/getstream/video/rtc/peer_connection.py +++ b/getstream/video/rtc/peer_connection.py @@ -28,8 +28,9 @@ class PeerConnectionManager: """Manages WebRTC peer connections for publishing and subscribing.""" - def __init__(self, connection_manager): + def __init__(self, connection_manager, video_buffered: bool = True): self.connection_manager = connection_manager + self._video_buffered = video_buffered self.publisher_pc: Optional[PublisherPeerConnection] = None self.subscriber_pc: Optional[SubscriberPeerConnection] = None self.publisher_negotiation_lock = asyncio.Lock() @@ -47,6 +48,7 @@ async def setup_subscriber(self): self.subscriber_pc = SubscriberPeerConnection( connection=self.connection_manager, configuration=self._build_rtc_configuration(), + video_buffered=self._video_buffered, ) # Trace create event From f56a450d8a45feeba675e3781b2f61958bb96ed8 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 30 Mar 2026 22:08:57 +0400 Subject: [PATCH 2/7] fix: drain video frames with MediaBlackhole when video_buffered=False When video_buffered=False, attach a MediaBlackhole to the video proxy track to continuously consume and discard frames. This prevents unbounded queue growth in RTCRtpReceiver._queue (aiortc issue #554). Without draining, unconsumed video frames accumulate at ~400 MiB/10sec because aiortc's receiver queue has no size limit. --- getstream/video/rtc/pc.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index d2fbcdf4..2ee0f9e3 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -3,7 +3,7 @@ from typing import Any, Optional import aiortc -from aiortc.contrib.media import MediaRelay +from aiortc.contrib.media import MediaBlackhole, MediaRelay from aiortc.mediastreams import MediaStreamTrack from aiortc.rtcrtpparameters import RTCRtpCodecCapability from aiortc.rtcrtpsender import RTCRtpSender @@ -142,6 +142,7 @@ def __init__( self.track_map = {} # track_id -> (MediaRelay, original_track) self.video_frame_trackers = {} # track_id -> VideoFrameTracker + self._video_blackhole: Optional[MediaBlackhole] = None @self.on("track") async def on_track(track: aiortc.mediastreams.MediaStreamTrack): @@ -180,7 +181,17 @@ def _emit_pcm(pcm: PcmData): asyncio.create_task(handler.start()) buffered = self._video_buffered if track.kind == "video" else True - self.emit("track_added", relay.subscribe(tracked_track, buffered=buffered), user) + proxy = relay.subscribe(tracked_track, buffered=buffered) + + # Drain unconsumed video frames to prevent unbounded queue growth + # in RTCRtpReceiver (aiortc issue #554) + if track.kind == "video" and not self._video_buffered: + blackhole = MediaBlackhole() + blackhole.addTrack(proxy) + asyncio.create_task(blackhole.start()) + self._video_blackhole = blackhole + + self.emit("track_added", proxy, user) @self.on("icegatheringstatechange") def on_icegatheringstatechange(): From dfe3ee52835a226ace73c491557ae464e074741c Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 30 Mar 2026 22:31:03 +0400 Subject: [PATCH 3/7] refactor: remove buffered parameter from relay.subscribe, keep only blackhole drain --- getstream/video/rtc/pc.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 2ee0f9e3..6343bca4 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -180,8 +180,7 @@ def _emit_pcm(pcm: PcmData): handler = AudioTrackHandler(relay.subscribe(tracked_track), _emit_pcm) asyncio.create_task(handler.start()) - buffered = self._video_buffered if track.kind == "video" else True - proxy = relay.subscribe(tracked_track, buffered=buffered) + proxy = relay.subscribe(tracked_track) # Drain unconsumed video frames to prevent unbounded queue growth # in RTCRtpReceiver (aiortc issue #554) From 590f2cf9a0050d392347dfdf412d63e0dbbf3e04 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 30 Mar 2026 22:43:25 +0400 Subject: [PATCH 4/7] refactor: rename video_buffered to drain_video_frames, default False --- getstream/video/rtc/connection_manager.py | 4 ++-- getstream/video/rtc/pc.py | 6 +++--- getstream/video/rtc/peer_connection.py | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 44737558..434a7a4e 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -58,7 +58,7 @@ def __init__( create: bool = True, subscription_config: Optional[SubscriptionConfig] = None, max_join_retries: int = 3, - video_buffered: bool = True, + drain_video_frames: bool = False, **kwargs: Any, ): super().__init__() @@ -92,7 +92,7 @@ def __init__( self, subscription_config ) self._peer_manager: PeerConnectionManager = PeerConnectionManager( - self, video_buffered=video_buffered + self, drain_video_frames=drain_video_frames ) self.recording_manager = self._recording_manager diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 6343bca4..15e88fec 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -131,14 +131,14 @@ def __init__( self, connection, configuration: aiortc.RTCConfiguration, - video_buffered: bool = True, + drain_video_frames: bool = False, ) -> None: logger.info( f"creating subscriber peer connection with configuration: {configuration}" ) super().__init__(configuration) self.connection = connection - self._video_buffered = video_buffered + self._drain_video_frames = drain_video_frames self.track_map = {} # track_id -> (MediaRelay, original_track) self.video_frame_trackers = {} # track_id -> VideoFrameTracker @@ -184,7 +184,7 @@ def _emit_pcm(pcm: PcmData): # Drain unconsumed video frames to prevent unbounded queue growth # in RTCRtpReceiver (aiortc issue #554) - if track.kind == "video" and not self._video_buffered: + if track.kind == "video" and self._drain_video_frames: blackhole = MediaBlackhole() blackhole.addTrack(proxy) asyncio.create_task(blackhole.start()) diff --git a/getstream/video/rtc/peer_connection.py b/getstream/video/rtc/peer_connection.py index bdea297c..26e89df0 100644 --- a/getstream/video/rtc/peer_connection.py +++ b/getstream/video/rtc/peer_connection.py @@ -28,9 +28,9 @@ class PeerConnectionManager: """Manages WebRTC peer connections for publishing and subscribing.""" - def __init__(self, connection_manager, video_buffered: bool = True): + def __init__(self, connection_manager, drain_video_frames: bool = False): self.connection_manager = connection_manager - self._video_buffered = video_buffered + self._drain_video_frames = drain_video_frames self.publisher_pc: Optional[PublisherPeerConnection] = None self.subscriber_pc: Optional[SubscriberPeerConnection] = None self.publisher_negotiation_lock = asyncio.Lock() @@ -48,7 +48,7 @@ async def setup_subscriber(self): self.subscriber_pc = SubscriberPeerConnection( connection=self.connection_manager, configuration=self._build_rtc_configuration(), - video_buffered=self._video_buffered, + drain_video_frames=self._drain_video_frames, ) # Trace create event From 7f5dc2e9915a3a1758df9cd23490dc543f298fe3 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 30 Mar 2026 22:47:21 +0400 Subject: [PATCH 5/7] fix: store blackhole drain task reference to prevent GC collection --- getstream/video/rtc/pc.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 15e88fec..f43192ab 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -143,6 +143,7 @@ def __init__( self.track_map = {} # track_id -> (MediaRelay, original_track) self.video_frame_trackers = {} # track_id -> VideoFrameTracker self._video_blackhole: Optional[MediaBlackhole] = None + self._video_drain_task: Optional[asyncio.Task] = None @self.on("track") async def on_track(track: aiortc.mediastreams.MediaStreamTrack): @@ -187,8 +188,8 @@ def _emit_pcm(pcm: PcmData): if track.kind == "video" and self._drain_video_frames: blackhole = MediaBlackhole() blackhole.addTrack(proxy) - asyncio.create_task(blackhole.start()) self._video_blackhole = blackhole + self._video_drain_task = asyncio.create_task(blackhole.start()) self.emit("track_added", proxy, user) From 8239876b1650bd332cb27eb2067c2beaffa3e9ac Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Mon, 30 Mar 2026 23:59:43 +0400 Subject: [PATCH 6/7] fix: use separate proxy for blackhole drain, support multiple video tracks - Create a dedicated relay subscription for blackhole drain so it doesn't compete with downstream consumers for frames - Use dicts keyed by track ID instead of single references to support multiple concurrent video tracks --- getstream/video/rtc/pc.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index f43192ab..0b928313 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -142,8 +142,8 @@ def __init__( self.track_map = {} # track_id -> (MediaRelay, original_track) self.video_frame_trackers = {} # track_id -> VideoFrameTracker - self._video_blackhole: Optional[MediaBlackhole] = None - self._video_drain_task: Optional[asyncio.Task] = None + self._video_blackholes: dict[str, MediaBlackhole] = {} + self._video_drain_tasks: dict[str, asyncio.Task] = {} @self.on("track") async def on_track(track: aiortc.mediastreams.MediaStreamTrack): @@ -186,10 +186,11 @@ def _emit_pcm(pcm: PcmData): # Drain unconsumed video frames to prevent unbounded queue growth # in RTCRtpReceiver (aiortc issue #554) if track.kind == "video" and self._drain_video_frames: + drain_proxy = relay.subscribe(tracked_track) blackhole = MediaBlackhole() - blackhole.addTrack(proxy) - self._video_blackhole = blackhole - self._video_drain_task = asyncio.create_task(blackhole.start()) + blackhole.addTrack(drain_proxy) + self._video_blackholes[track.id] = blackhole + self._video_drain_tasks[track.id] = asyncio.create_task(blackhole.start()) self.emit("track_added", proxy, user) From d28776c720711beed2262b11fa73a96cff657016 Mon Sep 17 00:00:00 2001 From: Ali Aliyev Date: Tue, 31 Mar 2026 00:08:52 +0400 Subject: [PATCH 7/7] style: ruff format --- getstream/video/rtc/pc.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index 0b928313..a07d5840 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -190,7 +190,9 @@ def _emit_pcm(pcm: PcmData): blackhole = MediaBlackhole() blackhole.addTrack(drain_proxy) self._video_blackholes[track.id] = blackhole - self._video_drain_tasks[track.id] = asyncio.create_task(blackhole.start()) + self._video_drain_tasks[track.id] = asyncio.create_task( + blackhole.start() + ) self.emit("track_added", proxy, user)