Skip to content

Commit f8af642

Browse files
authored
Merge pull request #214 from GetStream/add-frame-info
Send frame info to SFU
2 parents 72ccce0 + d735c91 commit f8af642

6 files changed

Lines changed: 421 additions & 9 deletions

File tree

.github/workflows/release.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ jobs:
5252
- name: Checkout
5353
uses: actions/checkout@v5
5454
- uses: ./.github/actions/python-uv-setup
55+
with:
56+
sync-flags: "--all-extras --dev"
5557
- name: Install from PyPI using uv
5658
run: |
5759
uv pip install getstream==${{ needs.build.outputs.version }}

getstream/video/rtc/pc.py

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ def __init__(
129129
self.connection = connection
130130

131131
self.track_map = {} # track_id -> (MediaRelay, original_track)
132+
self.video_frame_trackers = {} # track_id -> VideoFrameTracker
132133

133134
@self.on("track")
134135
async def on_track(track: aiortc.mediastreams.MediaStreamTrack):
@@ -144,7 +145,16 @@ async def on_track(track: aiortc.mediastreams.MediaStreamTrack):
144145
)
145146

146147
relay = MediaRelay()
147-
self.track_map[track.id] = (relay, track)
148+
tracked_track = track
149+
150+
# For video tracks, wrap with VideoFrameTracker to capture frame metrics
151+
if track.kind == "video":
152+
from getstream.video.rtc.track_util import VideoFrameTracker
153+
154+
tracked_track = VideoFrameTracker(track)
155+
self.video_frame_trackers[track.id] = tracked_track
156+
157+
self.track_map[track.id] = (relay, tracked_track)
148158

149159
if track.kind == "audio":
150160
from getstream.video.rtc import PcmData
@@ -154,10 +164,10 @@ def _emit_pcm(pcm: PcmData):
154164
pcm.participant = user
155165
self.emit("audio", pcm)
156166

157-
handler = AudioTrackHandler(relay.subscribe(track), _emit_pcm)
167+
handler = AudioTrackHandler(relay.subscribe(tracked_track), _emit_pcm)
158168
asyncio.create_task(handler.start())
159169

160-
self.emit("track_added", relay.subscribe(track), user)
170+
self.emit("track_added", relay.subscribe(tracked_track), user)
161171

162172
@self.on("icegatheringstatechange")
163173
def on_icegatheringstatechange():
@@ -182,6 +192,20 @@ def handle_track_ended(self, track: aiortc.mediastreams.MediaStreamTrack) -> Non
182192
# Clean up stored references when track ends
183193
if track.id in self.track_map:
184194
del self.track_map[track.id]
195+
if track.id in self.video_frame_trackers:
196+
del self.video_frame_trackers[track.id]
197+
198+
def get_video_frame_tracker(self) -> Optional[Any]:
199+
"""Get a video frame tracker for stats collection.
200+
201+
Note: Returns the first tracker by insertion order. When multiple video
202+
tracks exist simultaneously (e.g., webcam + screenshare), this may not
203+
match the track being actively consumed. Performance stats calculation
204+
in StatsTracer mitigates this by selecting the highest-resolution track.
205+
"""
206+
if self.video_frame_trackers:
207+
return next(iter(self.video_frame_trackers.values()))
208+
return None
185209

186210
async def restartIce(self):
187211
"""Restart ICE connection for reconnection scenarios."""

getstream/video/rtc/peer_connection.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from getstream.video.rtc.track_util import patch_sdp_offer
2020
from getstream.video.rtc.twirp_client_wrapper import SfuRpcError
2121
from getstream.video.rtc.pc import PublisherPeerConnection, SubscriberPeerConnection
22+
from getstream.video.rtc.stats_reporter import DEFAULT_STATS_INTERVAL_MS
2223
from getstream.video.rtc.stats_tracer import StatsTracer
2324

2425
logger = logging.getLogger(__name__)
@@ -57,7 +58,9 @@ async def setup_subscriber(self):
5758
self._setup_pc_tracing(self.subscriber_pc, pc_id)
5859

5960
# Create stats tracer
60-
self.subscriber_stats = StatsTracer(self.subscriber_pc, "subscriber")
61+
self.subscriber_stats = StatsTracer(
62+
self.subscriber_pc, "subscriber", DEFAULT_STATS_INTERVAL_MS / 1000
63+
)
6164

6265
@self.subscriber_pc.on("audio")
6366
async def on_audio(pcm_data):
@@ -131,14 +134,19 @@ async def add_tracks(
131134
self._setup_pc_tracing(self.publisher_pc, pc_id)
132135

133136
# Create stats tracer
134-
self.publisher_stats = StatsTracer(self.publisher_pc, "publisher")
137+
self.publisher_stats = StatsTracer(
138+
self.publisher_pc, "publisher", DEFAULT_STATS_INTERVAL_MS / 1000
139+
)
135140

136141
if audio and relayed_audio:
137142
self.publisher_pc.addTrack(relayed_audio)
138143
logger.info(f"Added relayed audio track {relayed_audio.id}")
139144
if video and relayed_video:
140145
self.publisher_pc.addTrack(relayed_video)
141146
logger.info(f"Added relayed video track {relayed_video.id}")
147+
# Set frame tracker for video stats (BufferedMediaTrack has frame tracking)
148+
if self.publisher_stats:
149+
self.publisher_stats.set_frame_tracker(relayed_video)
142150

143151
# Trace createOffer
144152
tracer.trace("createOffer", pc_id, [])

getstream/video/rtc/stats_tracer.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,28 @@ class StatsTracer:
4545
- Delta compression to reduce size by ~90%
4646
- Performance stats calculation (encode/decode metrics)
4747
- Frame time and FPS history for averaging
48+
- Integration with VideoFrameTracker/BufferedMediaTrack for frame metrics
4849
"""
4950

50-
def __init__(self, pc, peer_type: str):
51+
def __init__(self, pc, peer_type: str, interval_s: float = 8.0):
5152
"""Initialize StatsTracer for a peer connection.
5253
5354
Args:
5455
pc: The RTCPeerConnection to collect stats from
5556
peer_type: "publisher" or "subscriber"
57+
interval_s: Interval between stats collections in seconds (for FPS calculation)
5658
"""
5759
self._pc = pc
5860
self._peer_type = peer_type
61+
self._interval_s = interval_s
5962
self._previous_stats: Dict[str, Dict] = {}
6063
self._frame_time_history: List[float] = []
6164
self._fps_history: List[float] = []
65+
self._frame_tracker: Optional[Any] = None
66+
67+
def set_frame_tracker(self, tracker: Any) -> None:
68+
"""Set the frame tracker for publisher stats (video track wrapper)."""
69+
self._frame_tracker = tracker
6270

6371
async def get(self) -> ComputedStats:
6472
"""Get stats with delta compression and performance metrics.
@@ -131,6 +139,9 @@ def _report_to_dict(self, report) -> Dict[str, Dict]:
131139
# Add ICE candidate stats (not provided by aiortc getStats)
132140
self._add_ice_candidate_stats(result)
133141

142+
# Inject frame stats from tracker (not provided by aiortc)
143+
self._inject_frame_stats(result)
144+
134145
return result
135146

136147
def _delta_compress(
@@ -579,3 +590,85 @@ def _codec_id(self, codec, mid: Optional[str]) -> str:
579590
clock_rate = getattr(codec, "clockRate", 0)
580591
key = f"{mime_type}:{payload_type}:{clock_rate}:{mid}"
581592
return hashlib.md5(key.encode()).hexdigest()[:8]
593+
594+
def _inject_frame_stats(self, result: Dict[str, Dict]) -> None:
595+
"""Inject frame stats from trackers into RTP stats.
596+
597+
aiortc doesn't provide frame metrics (dimensions, frame count, encode/decode time).
598+
We inject these from our frame trackers into the appropriate RTP stats entries.
599+
"""
600+
if self._peer_type == "publisher":
601+
self._inject_publisher_stats(result)
602+
else:
603+
self._inject_subscriber_stats(result)
604+
605+
def _inject_publisher_stats(self, result: Dict[str, Dict]) -> None:
606+
"""Inject stats for publisher (outbound-rtp)."""
607+
if not self._frame_tracker:
608+
return
609+
610+
try:
611+
frame_stats = self._frame_tracker.get_frame_stats()
612+
if frame_stats.get("framesSent", 0) == 0:
613+
return
614+
615+
for stat in result.values():
616+
if not isinstance(stat, dict):
617+
continue
618+
if stat.get("kind") != "video" or stat.get("type") != "outbound-rtp":
619+
continue
620+
621+
stat["framesSent"] = frame_stats["framesSent"]
622+
stat["frameWidth"] = frame_stats["frameWidth"]
623+
stat["frameHeight"] = frame_stats["frameHeight"]
624+
stat["totalEncodeTime"] = frame_stats["totalEncodeTime"]
625+
626+
if self._previous_stats:
627+
prev = self._previous_stats.get(stat.get("id", ""), {})
628+
delta = frame_stats["framesSent"] - prev.get("framesSent", 0)
629+
if delta > 0:
630+
stat["framesPerSecond"] = delta / self._interval_s
631+
632+
except Exception as e:
633+
logger.debug(f"Failed to inject publisher stats: {e}")
634+
635+
def _inject_subscriber_stats(self, result: Dict[str, Dict]) -> None:
636+
"""Inject frame stats for subscriber (inbound-rtp video).
637+
638+
Note: When multiple video tracks exist (e.g., webcam + screenshare),
639+
get_video_frame_tracker() returns the first by insertion order, which
640+
may not match the actively consumed track. This is a known limitation;
641+
_get_decode_stats() mitigates by selecting the highest-resolution track
642+
for performance calculations.
643+
"""
644+
# Get video tracker from PC if not set
645+
if not self._frame_tracker and hasattr(self._pc, "get_video_frame_tracker"):
646+
self._frame_tracker = self._pc.get_video_frame_tracker()
647+
648+
if not self._frame_tracker:
649+
return
650+
651+
try:
652+
frame_stats = self._frame_tracker.get_frame_stats()
653+
if frame_stats.get("framesDecoded", 0) == 0:
654+
return
655+
656+
for stat in result.values():
657+
if not isinstance(stat, dict):
658+
continue
659+
if stat.get("type") != "inbound-rtp" or stat.get("kind") != "video":
660+
continue
661+
662+
stat["framesDecoded"] = frame_stats["framesDecoded"]
663+
stat["frameWidth"] = frame_stats["frameWidth"]
664+
stat["frameHeight"] = frame_stats["frameHeight"]
665+
stat["totalDecodeTime"] = frame_stats["totalDecodeTime"]
666+
667+
if self._previous_stats:
668+
prev = self._previous_stats.get(stat.get("id", ""), {})
669+
delta = frame_stats["framesDecoded"] - prev.get("framesDecoded", 0)
670+
if delta > 0:
671+
stat["framesPerSecond"] = delta / self._interval_s
672+
673+
except Exception as e:
674+
logger.debug(f"Failed to inject subscriber stats: {e}")

getstream/video/rtc/track_util.py

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io
55
import logging
66
import re
7+
import time
78
import wave
89
from enum import Enum
910
from fractions import Fraction
@@ -2143,7 +2144,13 @@ def parse_track_stream_mapping(sdp: str) -> dict:
21432144

21442145

21452146
class BufferedMediaTrack(aiortc.mediastreams.MediaStreamTrack):
2146-
"""A wrapper for MediaStreamTrack that buffers one peeked frame."""
2147+
"""A wrapper for MediaStreamTrack that buffers one peeked frame.
2148+
2149+
Also tracks video frame statistics when kind is 'video':
2150+
- frames_processed: total frames that passed through recv()
2151+
- frame_width, frame_height: dimensions of the last frame
2152+
- total_processing_time_ms: cumulative time spent in recv()
2153+
"""
21472154

21482155
def __init__(self, track):
21492156
super().__init__()
@@ -2153,6 +2160,12 @@ def __init__(self, track):
21532160
self._id = track.id
21542161
self._ended = False
21552162

2163+
# Frame statistics (for video tracks)
2164+
self.frames_processed: int = 0
2165+
self.frame_width: int = 0
2166+
self.frame_height: int = 0
2167+
self.total_processing_time_ms: float = 0.0
2168+
21562169
@property
21572170
def kind(self):
21582171
return self._kind
@@ -2165,17 +2178,44 @@ def id(self):
21652178
def readyState(self):
21662179
return "ended" if self._ended else self._track.readyState
21672180

2181+
def get_frame_stats(self) -> Dict[str, Any]:
2182+
"""Get current frame statistics for StatsTracer injection."""
2183+
return {
2184+
"framesSent": self.frames_processed,
2185+
"frameWidth": self.frame_width,
2186+
"frameHeight": self.frame_height,
2187+
"totalEncodeTime": self.total_processing_time_ms / 1000.0,
2188+
}
2189+
2190+
def _update_frame_stats(self, frame, processing_time_ms: float) -> None:
2191+
"""Update frame statistics from a video frame."""
2192+
if (
2193+
self._kind == "video"
2194+
and hasattr(frame, "width")
2195+
and hasattr(frame, "height")
2196+
):
2197+
self.frames_processed += 1
2198+
self.frame_width = frame.width
2199+
self.frame_height = frame.height
2200+
self.total_processing_time_ms += processing_time_ms
2201+
21682202
async def recv(self):
21692203
"""Returns the next buffered frame if available, otherwise gets a new frame from the track."""
21702204
if self._ended:
21712205
raise MediaStreamError("Track is ended")
21722206

21732207
if self._buffered_frames:
21742208
# Return the oldest buffered frame (FIFO order)
2175-
return self._buffered_frames.pop(0)
2209+
frame = self._buffered_frames.pop(0)
2210+
self._update_frame_stats(frame, 0.0)
2211+
return frame
21762212

2213+
start_time = time.monotonic()
21772214
try:
2178-
return await self._track.recv()
2215+
frame = await self._track.recv()
2216+
elapsed_ms = (time.monotonic() - start_time) * 1000
2217+
self._update_frame_stats(frame, elapsed_ms)
2218+
return frame
21792219
except Exception as e:
21802220
logger.error(f"Error receiving frame from track: {e}")
21812221
self._ended = True
@@ -2214,6 +2254,78 @@ def stop(self):
22142254
logger.error(f"Error stopping track: {e}")
22152255

22162256

2257+
class VideoFrameTracker(aiortc.mediastreams.MediaStreamTrack):
2258+
"""A transparent wrapper that tracks video frame statistics.
2259+
2260+
Used for subscriber video tracks to capture frame metrics that aiortc
2261+
doesn't provide natively (dimensions, frame count, decode time).
2262+
"""
2263+
2264+
kind = "video"
2265+
2266+
def __init__(self, track: MediaStreamTrack):
2267+
super().__init__()
2268+
self._track = track
2269+
self._id = track.id
2270+
self._ended = False
2271+
2272+
# Frame statistics
2273+
self.frames_processed: int = 0
2274+
self.frame_width: int = 0
2275+
self.frame_height: int = 0
2276+
self.total_processing_time_ms: float = 0.0
2277+
2278+
@property
2279+
def id(self):
2280+
return self._id
2281+
2282+
@property
2283+
def readyState(self):
2284+
return "ended" if self._ended else self._track.readyState
2285+
2286+
def get_frame_stats(self) -> Dict[str, Any]:
2287+
"""Get current frame statistics for StatsTracer injection."""
2288+
return {
2289+
"framesDecoded": self.frames_processed,
2290+
"frameWidth": self.frame_width,
2291+
"frameHeight": self.frame_height,
2292+
"totalDecodeTime": self.total_processing_time_ms / 1000.0,
2293+
}
2294+
2295+
async def recv(self):
2296+
"""Receive a frame, tracking statistics."""
2297+
if self._ended:
2298+
raise MediaStreamError("Track is ended")
2299+
2300+
start_time = time.monotonic()
2301+
try:
2302+
frame = await self._track.recv()
2303+
elapsed_ms = (time.monotonic() - start_time) * 1000
2304+
2305+
# Update stats for video frames
2306+
if isinstance(frame, av.VideoFrame):
2307+
self.frames_processed += 1
2308+
self.frame_width = frame.width
2309+
self.frame_height = frame.height
2310+
self.total_processing_time_ms += elapsed_ms
2311+
2312+
return frame
2313+
except MediaStreamError:
2314+
self._ended = True
2315+
raise
2316+
except Exception as e:
2317+
logger.error(f"Error receiving frame: {e}")
2318+
self._ended = True
2319+
raise MediaStreamError(f"Error receiving frame: {e}") from e
2320+
2321+
def stop(self):
2322+
"""Stop the track."""
2323+
if not self._ended:
2324+
self._ended = True
2325+
if hasattr(self._track, "stop"):
2326+
self._track.stop()
2327+
2328+
22172329
async def detect_video_properties(
22182330
video_track: aiortc.mediastreams.MediaStreamTrack,
22192331
) -> Dict[str, Any]:

0 commit comments

Comments
 (0)