diff --git a/getstream/video/rtc/connection_manager.py b/getstream/video/rtc/connection_manager.py index 25f59a8f..434a7a4e 100644 --- a/getstream/video/rtc/connection_manager.py +++ b/getstream/video/rtc/connection_manager.py @@ -58,6 +58,7 @@ def __init__( create: bool = True, subscription_config: Optional[SubscriptionConfig] = None, max_join_retries: int = 3, + drain_video_frames: bool = False, **kwargs: Any, ): super().__init__() @@ -90,7 +91,9 @@ def __init__( self._subscription_manager: SubscriptionManager = SubscriptionManager( self, subscription_config ) - self._peer_manager: PeerConnectionManager = PeerConnectionManager(self) + self._peer_manager: PeerConnectionManager = PeerConnectionManager( + self, drain_video_frames=drain_video_frames + ) self.recording_manager = self._recording_manager self.participants_state = self._participants_state diff --git a/getstream/video/rtc/pc.py b/getstream/video/rtc/pc.py index d8fb0bb0..a07d5840 100644 --- a/getstream/video/rtc/pc.py +++ b/getstream/video/rtc/pc.py @@ -3,7 +3,7 @@ from typing import Any, Optional import aiortc -from aiortc.contrib.media import MediaRelay +from aiortc.contrib.media import MediaBlackhole, MediaRelay from aiortc.mediastreams import MediaStreamTrack from aiortc.rtcrtpparameters import RTCRtpCodecCapability from aiortc.rtcrtpsender import RTCRtpSender @@ -131,15 +131,19 @@ def __init__( self, connection, configuration: aiortc.RTCConfiguration, + drain_video_frames: bool = False, ) -> None: logger.info( f"creating subscriber peer connection with configuration: {configuration}" ) super().__init__(configuration) self.connection = connection + self._drain_video_frames = drain_video_frames self.track_map = {} # track_id -> (MediaRelay, original_track) self.video_frame_trackers = {} # track_id -> VideoFrameTracker + self._video_blackholes: dict[str, MediaBlackhole] = {} + self._video_drain_tasks: dict[str, asyncio.Task] = {} @self.on("track") async def on_track(track: aiortc.mediastreams.MediaStreamTrack): @@ -177,7 +181,20 @@ def _emit_pcm(pcm: PcmData): handler = AudioTrackHandler(relay.subscribe(tracked_track), _emit_pcm) asyncio.create_task(handler.start()) - self.emit("track_added", relay.subscribe(tracked_track), user) + proxy = relay.subscribe(tracked_track) + + # Drain unconsumed video frames to prevent unbounded queue growth + # in RTCRtpReceiver (aiortc issue #554) + if track.kind == "video" and self._drain_video_frames: + drain_proxy = relay.subscribe(tracked_track) + blackhole = MediaBlackhole() + blackhole.addTrack(drain_proxy) + self._video_blackholes[track.id] = blackhole + self._video_drain_tasks[track.id] = asyncio.create_task( + blackhole.start() + ) + + self.emit("track_added", proxy, user) @self.on("icegatheringstatechange") def on_icegatheringstatechange(): diff --git a/getstream/video/rtc/peer_connection.py b/getstream/video/rtc/peer_connection.py index 3155c4bd..26e89df0 100644 --- a/getstream/video/rtc/peer_connection.py +++ b/getstream/video/rtc/peer_connection.py @@ -28,8 +28,9 @@ class PeerConnectionManager: """Manages WebRTC peer connections for publishing and subscribing.""" - def __init__(self, connection_manager): + def __init__(self, connection_manager, drain_video_frames: bool = False): self.connection_manager = connection_manager + self._drain_video_frames = drain_video_frames self.publisher_pc: Optional[PublisherPeerConnection] = None self.subscriber_pc: Optional[SubscriberPeerConnection] = None self.publisher_negotiation_lock = asyncio.Lock() @@ -47,6 +48,7 @@ async def setup_subscriber(self): self.subscriber_pc = SubscriberPeerConnection( connection=self.connection_manager, configuration=self._build_rtc_configuration(), + drain_video_frames=self._drain_video_frames, ) # Trace create event