Skip to content

Commit aff0614

Browse files
authored
Update to hang 0.9.0 / moq-lite 0.10.1 / moq-native 0.10.1 (#27)
* Update to hang 0.9.0 / moq-lite 0.10.1 / moq-native 0.10.1 Update dependencies to latest versions and fix breaking API changes: Sink (hangsink): - hang::cmaf::Import → hang::import::Fmp4 - .parse() → .decode() - Add BytesMut buffer to accumulate incoming data across GStreamer buffer boundaries, fixing "invalid size" panic when MP4 atoms span multiple buffers - Add bytes crate dependency Source (hangsrc): - hang::Catalog::default_track() → hang::catalog::Catalog::default_track() - TrackConsumer now requires explicit construction with max_latency - .read() → .read_frame() - frame.payload is now chunked, requires flattening to Vec<u8> - Timestamp subtraction returns Timestamp, needs .into() for Duration * Update to hang 0.9.0 / moq-lite 0.10.1 / moq-native 0.10.1 Update dependencies to latest versions and fix breaking API changes: Sink (hangsink): - hang::cmaf::Import → hang::import::Fmp4 - .parse() → .decode() - Add BytesMut buffer to accumulate incoming data across GStreamer buffer boundaries, fixing "invalid size" panic when MP4 atoms span multiple buffers - Add bytes crate dependency Source (hangsrc): - hang::Catalog::default_track() → hang::catalog::Catalog::default_track() - TrackConsumer now requires explicit construction with max_latency - .read() → .read_frame() - frame.payload is now chunked, requires flattening to Vec<u8> - Timestamp subtraction returns Timestamp, needs .into() for Duration
1 parent 2a70f8a commit aff0614

4 files changed

Lines changed: 65 additions & 31 deletions

File tree

Cargo.lock

Lines changed: 29 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@ path = "src/lib.rs"
1919
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
2020

2121
[dependencies]
22-
# Using published versions from crates.io instead of workspace
23-
hang = "0.6"
24-
moq-lite = "0.8"
25-
moq-native = "0.8"
22+
# Using local path dependencies from moq-dev
23+
hang = "0.9.0"
24+
moq-lite = "0.10.1"
25+
moq-native = "0.10.1"
2626

2727
anyhow = { version = "1", features = ["backtrace"] }
28+
bytes = "1"
2829
gst = { package = "gstreamer", version = "0.23" }
2930
gst-base = { package = "gstreamer-base", version = "0.23" }
3031
#gst-app = { package = "gstreamer-app", version = "0.23", features = ["v1_20"] }

src/sink/imp.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::Context as _;
2+
use bytes::BytesMut;
23
use gst::glib;
34
use gst::prelude::*;
45
use gst::subclass::prelude::*;
@@ -28,7 +29,8 @@ struct Settings {
2829

2930
#[derive(Default)]
3031
struct State {
31-
pub media: Option<hang::cmaf::Import>,
32+
pub media: Option<hang::import::Fmp4>,
33+
pub buffer: BytesMut,
3234
}
3335

3436
#[derive(Default)]
@@ -140,12 +142,24 @@ impl BaseSinkImpl for HangSink {
140142
let data = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
141143

142144
let mut state = self.state.lock().unwrap();
145+
146+
// Append incoming data to our buffer
147+
state.buffer.extend_from_slice(data.as_slice());
148+
149+
// Take media out temporarily to avoid borrow conflict
143150
let mut media = state.media.take().expect("not initialized");
144151

145-
// TODO avoid full media parsing? gst should be able to provide the necessary info
146-
media.parse(data.as_slice()).expect("failed to parse");
152+
// Try to decode what we have buffered
153+
let result = media.decode(&mut state.buffer);
154+
155+
// Put media back
147156
state.media = Some(media);
148157

158+
if let Err(e) = result {
159+
gst::error!(gst::CAT_DEFAULT, "Failed to decode: {}", e);
160+
return Err(gst::FlowError::Error);
161+
}
162+
149163
Ok(gst::FlowSuccess::Ok)
150164
}
151165
}
@@ -180,7 +194,7 @@ impl HangSink {
180194
.await
181195
.expect("failed to connect");
182196

183-
let media = hang::cmaf::Import::new(broadcast.producer);
197+
let media = hang::import::Fmp4::new(broadcast.producer.into());
184198

185199
let mut state = self.state.lock().unwrap();
186200
state.media = Some(media);

src/source/imp.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl HangSrc {
185185
.consume_broadcast(&name)
186186
.ok_or_else(|| anyhow::anyhow!("Broadcast '{}' not found", name))?;
187187

188-
let catalog = broadcast.subscribe_track(&hang::Catalog::default_track());
188+
let catalog = broadcast.subscribe_track(&hang::catalog::Catalog::default_track());
189189
let mut catalog = hang::catalog::CatalogConsumer::new(catalog);
190190

191191
// TODO handle catalog updates
@@ -194,7 +194,8 @@ impl HangSrc {
194194
if let Some(video) = catalog.video {
195195
for (track_name, config) in video.renditions {
196196
let track_ref = hang::moq_lite::Track::new(&track_name);
197-
let mut track: hang::TrackConsumer = broadcast.subscribe_track(&track_ref).into();
197+
let mut track =
198+
hang::TrackConsumer::new(broadcast.subscribe_track(&track_ref), std::time::Duration::from_secs(1));
198199

199200
let caps = match config.codec {
200201
hang::catalog::VideoCodec::H264(_) => {
@@ -239,14 +240,15 @@ impl HangSrc {
239240
let mut reference = None;
240241
tokio::spawn(async move {
241242
loop {
242-
match track.read().await {
243+
match track.read_frame().await {
243244
Ok(Some(frame)) => {
244-
let mut buffer = gst::Buffer::from_slice(frame.payload);
245+
let payload: Vec<u8> = frame.payload.into_iter().flatten().collect();
246+
let mut buffer = gst::Buffer::from_slice(payload);
245247
let buffer_mut = buffer.get_mut().unwrap();
246248

247249
// Make timestamps relative to the first frame for proper playback
248250
let pts = if let Some(reference_ts) = reference {
249-
let timestamp: std::time::Duration = frame.timestamp - reference_ts;
251+
let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into();
250252
gst::ClockTime::from_nseconds(timestamp.as_nanos() as _)
251253
} else {
252254
reference = Some(frame.timestamp);
@@ -287,7 +289,8 @@ impl HangSrc {
287289
if let Some(audio) = catalog.audio {
288290
for (track_name, config) in audio.renditions {
289291
let track_ref = hang::moq_lite::Track::new(&track_name);
290-
let mut track: hang::TrackConsumer = broadcast.subscribe_track(&track_ref).into();
292+
let mut track =
293+
hang::TrackConsumer::new(broadcast.subscribe_track(&track_ref), std::time::Duration::from_secs(1));
291294

292295
let caps = match &config.codec {
293296
hang::catalog::AudioCodec::AAC(_aac) => {
@@ -346,14 +349,15 @@ impl HangSrc {
346349
let mut reference = None;
347350
tokio::spawn(async move {
348351
loop {
349-
match track.read().await {
352+
match track.read_frame().await {
350353
Ok(Some(frame)) => {
351-
let mut buffer = gst::Buffer::from_slice(frame.payload);
354+
let payload: Vec<u8> = frame.payload.into_iter().flatten().collect();
355+
let mut buffer = gst::Buffer::from_slice(payload);
352356
let buffer_mut = buffer.get_mut().unwrap();
353357

354358
// Make timestamps relative to the first frame for proper playback
355359
let pts = if let Some(reference_ts) = reference {
356-
let timestamp: std::time::Duration = frame.timestamp - reference_ts;
360+
let timestamp: std::time::Duration = (frame.timestamp - reference_ts).into();
357361
gst::ClockTime::from_nseconds(timestamp.as_nanos() as _)
358362
} else {
359363
reference = Some(frame.timestamp);

0 commit comments

Comments
 (0)