Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/video-streamer/src/streamer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,10 @@ pub fn webm_stream(
}
}
});
// If the oneshot sender is dropped (task panicked), treat as Break
rx.blocking_recv().unwrap_or(WhenEofControlFlow::Break)
rx.blocking_recv().unwrap_or_else(|_| {
warn!("when_eof oneshot sender dropped unexpectedly, treating as shutdown");
WhenEofControlFlow::Break
})
}

enum WhenEofControlFlow {
Expand Down
41 changes: 36 additions & 5 deletions crates/video-streamer/src/streamer/tag_writers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ const VPX_EFLAG_FORCE_KF: u32 = 0x00000001;
/// - 0 = never skip (disables adaptive skipping effectively)
const MAX_CONSECUTIVE_FRAME_SKIPS: u32 = 1;

/// Interval in milliseconds at which keyframes are forced in the outgoing stream.
/// Periodic keyframes improve seekability and resilience to packet loss.
const KEYFRAME_INTERVAL_MS: u64 = 10_000;

#[cfg(feature = "perf-diagnostics")]
fn duration_as_millis_u64(duration: Duration) -> u64 {
u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
Expand Down Expand Up @@ -111,6 +115,7 @@ where
codec: VpxCodec,
cut_block_state: CutBlockState,
last_encoded_abs_time: Option<u64>,
last_keyframe_abs_time: Option<u64>,

// Adaptive frame skipping state
#[cfg(feature = "perf-diagnostics")]
Expand Down Expand Up @@ -204,6 +209,7 @@ where
codec: config.codec,
cut_block_state: CutBlockState::HaventMet,
last_encoded_abs_time: None,
last_keyframe_abs_time: None,
#[cfg(feature = "perf-diagnostics")]
stream_start: Instant::now(),
processing_time: Duration::ZERO,
Expand Down Expand Up @@ -399,6 +405,13 @@ where
}
}

fn should_force_keyframe(&self, abs_time: u64) -> bool {
match self.last_keyframe_abs_time {
Some(last_kf_time) => abs_time.saturating_sub(last_kf_time) >= KEYFRAME_INTERVAL_MS,
None => false,
}
}

fn should_skip_encode(&self) -> bool {
// Skip encoding when falling behind real-time. The ratio naturally self-regulates:
// skipping makes processing faster (decode-only), which pushes ratio back above 1.0,
Expand Down Expand Up @@ -484,6 +497,7 @@ where
perf_trace!(block_timestamp, "Writing block to output");
self.write_block(block)?;
self.last_encoded_abs_time = Some(abs_time);
self.last_keyframe_abs_time = Some(abs_time);
}
CutBlockState::Met {
cut_block_absolute_time,
Expand Down Expand Up @@ -513,8 +527,10 @@ where
);
self.frames_since_last_encode = 0;

let force_keyframe = self.should_force_keyframe(abs_time);

let duration = self.compute_encode_duration(abs_time);
let frame = self.reencode(current_video_block, false, duration)?;
let frame = self.reencode(current_video_block, force_keyframe, duration)?;
let Some(frame) = frame else {
perf_trace!(block_timestamp, "No frame available from encoder - skipping");
return Ok(());
Expand All @@ -524,10 +540,25 @@ where
let frame_size = frame.len();
perf_trace!(block_timestamp, frame_size, "Frame available from encoder");

let timestamp = self.compute_met_timestamp(cut_block_absolute_time, abs_time)?;
let block = SimpleBlock::new_uncheked(&frame, 1, timestamp, false, None, false, false);
perf_trace!(block_timestamp, "Writing block to output");
self.write_block(block)?;
if force_keyframe {
// Start a new cluster for the forced keyframe so the stream remains seekable.
let cluster_rel = abs_time - cut_block_absolute_time;
self.maybe_report_realtime_ratio(abs_time, cluster_rel);
self.start_new_cluster(cluster_rel)?;
self.cut_block_state = CutBlockState::Met {
cut_block_absolute_time,
last_cluster_relative_time: cluster_rel,
};
let block = SimpleBlock::new_uncheked(&frame, 1, 0, false, None, false, true);
perf_trace!(block_timestamp, "Writing forced keyframe block to output");
self.write_block(block)?;
self.last_keyframe_abs_time = Some(abs_time);
} else {
let timestamp = self.compute_met_timestamp(cut_block_absolute_time, abs_time)?;
let block = SimpleBlock::new_uncheked(&frame, 1, timestamp, false, None, false, false);
perf_trace!(block_timestamp, "Writing block to output");
self.write_block(block)?;
}
self.last_encoded_abs_time = Some(abs_time);
}
}
Expand Down