-
Notifications
You must be signed in to change notification settings - Fork 130
Add delivery timeout/expires support #712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Now it's muuuuch easier to support multiple versions in the future.
WalkthroughRefactors track lifecycle: Track now exposes priority and expires and its constructor accepts a TrackArgs object. Broadcast.subscribe accepts Track instances (not name/priority). Many modules switched to creating and subscribing Moq.Track objects. Lite Draft03 support and expires handling were added across protocol layers. Catalog publishing was changed to an update(closure) pattern and location-related models/modules were removed. Group/Frame/Track producer–consumer APIs were reworked for explicit error returns, proxying, expiration, and lifecycle management. Package versions bumped (js/moq 0.10.0, rs/moq 0.10.0). Pre-merge checks and finishing touches✅ Passed checks (3 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
rs/moq-relay/src/web.rs (1)
249-257: ReplaceDuration::default()with a meaningful expires value for HTTP fetchThe code at lines 249–253 in rs/moq-relay/src/web.rs sets
expires: std::time::Duration::default(), which equals 0 milliseconds. However, in rs/moq/src/model/track.rs (lines 97–98), groups withexpires.is_zero()immediately returnErr(Error::Expired). This means groups are rejected on creation, soserve_fetchwill not retrieve any buffered data and only see groups created after subscription.Use a positive duration (e.g., the codebase's MAX_EXPIRES constant of 10 seconds) or a shared constant instead.
rs/moq/src/ietf/subscriber.rs (1)
168-178: Avoid leaving alias/publish state stale whenclose()failsIf
track.producer.close()returns an error, the function exits via?before removing the alias or handlingpublishes, potentially leaving stale state behind.You can ensure cleanup always happens while still propagating the close error:
- if let Some(mut track) = state.subscribes.remove(&msg.request_id) { - track.producer.close()?; - - if let Some(alias) = track.alias { - state.aliases.remove(&alias); - } - } + if let Some(mut track) = state.subscribes.remove(&msg.request_id) { + let res = track.producer.close(); + + if let Some(alias) = track.alias { + state.aliases.remove(&alias); + } + + res?; + }This keeps maps consistent even if closing the producer fails.
js/moq/src/lite/subscriber.ts (1)
48-55: Guard against unhandled rejections in#runSubscribe
consume()intentionally fire‑and‑forgetsthis.#runSubscribe(path, track), but thetryblock in#runSubscribestarts only after the subscribe frame is sent:const stream = await Stream.open(this.#quic, { sendOrder: track.priority }); await stream.writer.u53(StreamId.Subscribe); await msg.encode(stream.writer); try { await SubscribeOk.decode(stream.reader, this.version); // ... }If
Stream.openor the initial writes fail, the Promise returned by#runSubscribewill reject and nothing is awaiting it, which may surface as unhandled rejections.Consider either:
- Moving the
tryto wrap the entire body, or- Explicitly discarding errors at the call site, e.g.
void this.#runSubscribe(path, track)or wrapping in a.catch().The rest of the lifecycle (saving the track in
#subscribes, usingtrack.closedandstream.reader.closedinPromise.race, closing/aborting both sides and cleaning up the map) looks consistent with the new Track API.Also applies to: 92-104, 106-144
🧹 Nitpick comments (19)
rs/moq-clock/src/clock.rs (1)
23-23: Consider propagating error instead of panicking.While
expect("not closed")is an improvement overunwrap(), consider using the?operator for consistency with the error propagation pattern adopted elsewhere in this function. Ifcreate_groupfails (likely indicating the track is closed), propagating the error would allowrun()to return gracefully rather than panic.Apply this diff:
- let segment = self.track.create_group(sequence.into()).expect("not closed"); + let segment = self.track.create_group(sequence.into())?;js/moq/src/util/promise.ts (1)
1-17: Deferred promise helper is correct; consider widening types to match Promise semanticsThe implementation is a standard, sound deferred-promise pattern and should behave correctly.
If you want this utility to be a drop-in match for native
Promisesemantics, you could optionally widen the function types a bit:
- Allow
PromiseLike<T>inresolve.- Allow any/unknown rejection reason instead of only
Error.For example:
-export type Defer<T> = { promise: Promise<T>; resolve: (value: T) => void; reject: (error: Error) => void }; +export type Defer<T> = { + promise: Promise<T>; + resolve: (value: T | PromiseLike<T>) => void; + reject: (reason?: unknown) => void; +}; export function defer<T>(): Defer<T> { - let resolve!: (value: T) => void; - let reject!: (error: Error) => void; + let resolve!: (value: T | PromiseLike<T>) => void; + let reject!: (reason?: unknown) => void; const promise = new Promise<T>((r, rj) => { resolve = r; reject = rj; }); return { promise, resolve, reject }; }Totally fine to keep the narrower signatures if you specifically want to constrain rejections to
Errorin this codebase.rs/moq/src/lite/version.rs (1)
20-30: Consider refactoring to usematchfor improved readability.The if-else chain for version matching could be more idiomatic using a
matchexpression, which would provide better exhaustiveness checking and readability.Apply this diff to refactor to
match:fn try_from(value: coding::Version) -> Result<Self, Self::Error> { - if value == Self::Draft01.coding() { - Ok(Self::Draft01) - } else if value == Self::Draft02.coding() { - Ok(Self::Draft02) - } else if value == Self::Draft03.coding() { - Ok(Self::Draft03) - } else { - Err(()) - } + match value { + v if v == Self::Draft01.coding() => Ok(Self::Draft01), + v if v == Self::Draft02.coding() => Ok(Self::Draft02), + v if v == Self::Draft03.coding() => Ok(Self::Draft03), + _ => Err(()), + } }rs/hang/src/annexb/import.rs (1)
11-13: Local MAX_EXPIRES looks fine; consider centralizing default laterHard‑coding a 10s expires here is reasonable as a starting point, especially with the TODO. Longer term, you may want a shared constant/config (also used by the CMAF importer) so changing the default keeps all import paths in sync.
js/hang/src/watch/broadcast.ts (1)
134-138: Catalog subscription migration toMoq.Tracklooks correctCreating a
Moq.Trackfor"catalog.json"and passing it through subscribe/cleanup/fetch aligns with the new track‑based APIs and keeps lifecycle management explicit. If we later want catalog updates to respect delivery timeouts, we can add anexpireshere or a catalog‑specific default, but current behavior is consistent with the prior implementation.js/moq-clock/src/main.ts (2)
83-90: Publisher side correctly adapts toBroadcast.requested()returningTrackUsing the
trackreturned frombroadcast.requested()directly (name check,publishTrack(track), andtrack.close(new Error("not found"))) matches the updated API and keeps behavior equivalent to the pre‑refactor request object.
141-142: Subscriber side Track construction matches new subscribe APIInstantiating
new Moq.Track({ name: config.track, priority: 0 })and subscribing it viabroadcast.subscribe(track)is consistent with the rest of the codebase. If you ever expand this tool, you might consider explicitly closingtrackwhen the outer loop exits to mirror other watch paths, but it’s not critical for this CLI.js/moq/src/lite/subscribe.ts (2)
8-24: SubscribeUpdate now always encodesexpires; verify version-compat expectationsThe addition of
expirestoSubscribeUpdateand unconditionalu53encode/decode keeps the message simple, but it does change the wire layout for all versions. If SubscribeUpdate is ever sent/received for DRAFT_01/02 peers that expect only apriority, this could break compatibility; if it’s only used for DRAFT_03, we’re fine.Also, since you’re already providing defaults in the destructuring, making
expiresoptional in the constructor’s type (e.g.{ priority: number; expires?: DOMHighResTimeStamp }) would better reflect how it’s used and give a bit more flexibility at call sites.
44-103: Version-awareSubscribeencoding/decoding forexpiresis consistentAdding
expiresandversiontoSubscribe, and gating the extraexpiresfield onVersion.DRAFT_03in both#encodeand#decode, matches the intended protocol evolution while keeping DRAFT_01/02 layouts unchanged. The exhaustiveelsebranches that throw on unknown versions are also helpful for catching misconfigurations early.One minor ergonomic nit: as with
SubscribeUpdate, the constructor’s options type could declarepriority/expiresas optional with defaults, since you already provide default values in the destructuring.rs/hang/src/model/track.rs (1)
20-26: Per-track group management and Draft03 timestamp encoding look correct; consider avoidingexpectpanicsThe new
group: Option<moq_lite::GroupProducer>cache and thewritepath (closing the previous group on keyframes and lazily creating/appending groups) look logically consistent, and switching timestamp encoding tolite::Version::Draft03matches the broader protocol changes.The series of
expect("impossible")calls onappend_group,create_frame,write_chunk, andcloseeffectively treat any failure (e.g., closed/aborted track) as a hard panic. If those errors can be triggered by peer behavior or normal shutdown races, it would be more robust to makewritereturn aResultand propagate/log these errors instead of panicking.Also applies to: 44-66
rs/moq/src/model/broadcast.rs (1)
14-17: Expiration wiring and requested-track lifecycle look coherentCapping dynamically created requested tracks with
MAX_EXPIRESand then applyingexpires(track.expires)per consumer gives a reasonable upper bound on retention while still letting callers request shorter lifetimes. The flow insubscribe_track—deduplicating via therequestedmap, pushing the producer over the channel, and spawning a cleanup task that removes the producer from the lookup afterunused().await—addresses the earlier “producer never becomes unused” issue without obvious races (the lock is held only for map mutations).The hard-coded
MAX_EXPIRES = 10swith TODOs for configurability is fine as an initial safety guard, but in the longer term this likely wants to be configurable and/or derived from policy, especially if some consumers expect longer-lived tracks.Also applies to: 210-219, 223-237, 241-245
rs/moq/src/lite/subscriber.rs (1)
176-207: Track-based subscribe flow and cancellation semantics look right; consider softeningexpectSwitching
run_subscribeto operate onTrackProducerdirectly, includingexpiresandversioninlite::Subscribe, and racingtrack.unused()againstrun_track(msg)gives a clear lifecycle:
- if the track becomes unused first, you cancel with
Error::Canceland abort the producer;- if the transport or peer errors, you abort with that error;
- on a clean
run_trackcompletion, you close the track.The logging and
subscribesbookkeeping around this are consistent. As with other places, usingtrack.close().expect("impossible")makes a close failure a panic; ifclosecan fail due to peer-driven races, consider returning aResulthere and logging/propagating instead of panicking.js/moq/src/stream.ts (1)
36-37: New options bag foropenlooks good but is a breaking API changeSwitching
Stream.open/Writer.opento an options object with{ sendOrder }is a nice improvement for clarity and future extensibility. The one caveat is compatibility: any existing call sites that still doStream.open(quic, 0)/Writer.open(quic, 0)will now throw at runtime because the second argument is destructured as an object.If you expect external consumers or older internal call sites to still pass a number, consider a small compatibility shim:
static async open( quic: WebTransport, opts?: number | { sendOrder?: number }, ): Promise<Stream> { const sendOrder = typeof opts === "number" ? opts : opts?.sendOrder; return new Stream(await quic.createBidirectionalStream({ sendOrder })); }Same pattern can be applied to
Writer.open. Otherwise, just ensure all call sites are updated to the new{ sendOrder }shape.Also applies to: 299-301
js/hang/src/watch/location/peers.ts (1)
44-50: Track‑based subscription and cleanup are correct; minor redundancy intrack.closeCreating a
Moq.Trackwith{ name: catalog.name, priority: catalog.priority }, subscribing it, and wiringeffect.cleanup(() => track.close())pluseffect.spawn(this.#runTrack.bind(this, track))is consistent with the other watch modules.
#runTrackalso callstrack.close()in itsfinallyblock, whileeffect.cleanupalready closes the same track when the effect tears down. That double close is harmless (theTrackAPI is idempotent here), but a bit redundant and slightly diverges from the chat message/typing watchers, which rely on the effect cleanup alone.If you want to keep behavior uniform and reduce duplication, you could leave
track.closeto the effect cleanup and have#runTrack’sfinallyonly reset#positions.js/moq/src/ietf/publisher.ts (1)
145-149: Passing track priority intosendOrderis reasonable; consider guarding extreme valuesUsing
track.priorityas thesendOrderwhen opening the per‑group unidirectional stream ties the transport’s scheduling to the Track’s priority, which is exactly what you want for priority‑aware delivery.If
subscriberPriority(and thustrack.priority) is ever derived from untrusted input, you may want to clamp or validate it before passing intosendOrder(e.g., enforce a bounded integer range or a small set of allowed priorities), to avoid surprising behavior from extreme values.js/moq/src/lite/publisher.ts (2)
184-207:#runTrack’sPromise.raceand cleanup logic are subtle but soundThe updated
#runTrackloop:
- Starts
next = track.nextGroup()once per iteration.- Races
nextagainststream.closedso we stop scheduling new groups when the subscribe stream is closed.- If
stream.closedwins (nogroup), it schedulesnext.then(group => group?.close()).catch(() => {})and breaks, ensuring any late‑resolving group is closed and any rejection is swallowed instead of turning into an unhandled rejection.- On normal completion, it
closes the track and stream; on error it logs,track.close(e), andstream.reset(e).This looks correct and prevents both leaks and unhandled promise rejections, though it is a bit non‑obvious at first glance. A short comment explaining why
next.then(...close...)is needed (i.e., to clean up a pendingnextGroup()afterstream.closedwins the race) would help future maintainers.
211-215: Per‑group streamsendOrdertied to Track priority is appropriatePassing
{ sendOrder: track.priority }when opening the per‑group Writer aligns group stream scheduling with the Track’s priority, consistent with the IETF publisher and the subscribe side.As above, if priorities can ever come from untrusted clients, you may want to enforce a bounded, well‑defined range before using them as
sendOrder, but the wiring itself is correct.rs/moq/src/ietf/subscriber.rs (2)
288-331: Unify group closing to avoid double‑close onGroupProducer
GroupProducer::close()is invoked both:
- At the end of
run_group(producer.close()?), and- In
recv_groupon theOkpath (producer.close()?).Because both operate on clones of the same underlying producer, this can lead to a double close. If
close()is not explicitly idempotent, the second call may start returning an error and bubble up where previously it was ignored.Consider centralizing the close in one place (either only in
run_groupor only inrecv_group). One option is to treatrun_groupas the owner of the group lifecycle and drop theproducer.close()?in the_ => { ... }arm ofrecv_group, keeping the logging there but not closing again.Also applies to: 352-377, 394-399
439-446: Expires placeholder is fine for now but will be easy to forgetUsing
expires: std::time::Duration::default()with a TODO is a reasonable stop‑gap, but it effectively disables delivery timeouts for published tracks on this path.Once the delivery timeout parameter is wired through, this struct init is the key place to thread it:
- let track = Track { - name: msg.track_name.to_string(), - priority: 0, - expires: std::time::Duration::default(), // TODO parse delivery timeout parameter - } + let track = Track { + name: msg.track_name.to_string(), + priority: 0, + expires: parsed_timeout, // TODO: plumb from Publish params + }Might be worth tracking this with an issue so it doesn’t get lost.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
rs/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (55)
js/hang-demo/src/index.html(1 hunks)js/hang/src/publish/broadcast.ts(1 hunks)js/hang/src/watch/audio/source.ts(2 hunks)js/hang/src/watch/broadcast.ts(1 hunks)js/hang/src/watch/chat/message.ts(2 hunks)js/hang/src/watch/chat/typing.ts(2 hunks)js/hang/src/watch/location/peers.ts(2 hunks)js/hang/src/watch/location/window.ts(1 hunks)js/hang/src/watch/preview.ts(1 hunks)js/hang/src/watch/video/source.ts(2 hunks)js/moq-clock/src/main.ts(2 hunks)js/moq/package.json(1 hunks)js/moq/src/announced.ts(1 hunks)js/moq/src/broadcast.ts(4 hunks)js/moq/src/connection/connect.ts(3 hunks)js/moq/src/group.ts(1 hunks)js/moq/src/ietf/connection.ts(1 hunks)js/moq/src/ietf/publisher.ts(4 hunks)js/moq/src/ietf/subscriber.ts(4 hunks)js/moq/src/lite/connection.ts(2 hunks)js/moq/src/lite/publisher.ts(4 hunks)js/moq/src/lite/subscribe.ts(4 hunks)js/moq/src/lite/subscriber.ts(4 hunks)js/moq/src/lite/version.ts(1 hunks)js/moq/src/stream.ts(2 hunks)js/moq/src/track.ts(4 hunks)js/moq/src/util/promise.ts(1 hunks)rs/Cargo.toml(1 hunks)rs/hang/examples/video.rs(1 hunks)rs/hang/src/annexb/import.rs(3 hunks)rs/hang/src/catalog/location.rs(0 hunks)rs/hang/src/catalog/mod.rs(0 hunks)rs/hang/src/catalog/root.rs(4 hunks)rs/hang/src/cmaf/import.rs(4 hunks)rs/hang/src/model/group.rs(1 hunks)rs/hang/src/model/location.rs(0 hunks)rs/hang/src/model/mod.rs(0 hunks)rs/hang/src/model/track.rs(2 hunks)rs/moq-clock/src/clock.rs(3 hunks)rs/moq-clock/src/main.rs(1 hunks)rs/moq-native/examples/chat.rs(2 hunks)rs/moq-relay/src/web.rs(1 hunks)rs/moq/Cargo.toml(1 hunks)rs/moq/src/error.rs(3 hunks)rs/moq/src/ietf/publisher.rs(1 hunks)rs/moq/src/ietf/subscriber.rs(9 hunks)rs/moq/src/lite/publisher.rs(4 hunks)rs/moq/src/lite/subscribe.rs(3 hunks)rs/moq/src/lite/subscriber.rs(6 hunks)rs/moq/src/lite/version.rs(2 hunks)rs/moq/src/model/broadcast.rs(8 hunks)rs/moq/src/model/frame.rs(6 hunks)rs/moq/src/model/group.rs(6 hunks)rs/moq/src/model/track.rs(5 hunks)rs/moq/src/session.rs(2 hunks)
💤 Files with no reviewable changes (4)
- rs/hang/src/model/mod.rs
- rs/hang/src/catalog/mod.rs
- rs/hang/src/model/location.rs
- rs/hang/src/catalog/location.rs
🧰 Additional context used
🧬 Code graph analysis (32)
rs/moq/src/session.rs (2)
rs/moq/src/lite/version.rs (1)
coding(40-42)js/moq/src/lite/version.ts (2)
Version(1-5)Version(7-7)
rs/hang/src/model/group.rs (4)
rs/moq/src/coding/decode.rs (9)
decode(5-5)decode(52-58)decode(62-67)decode(71-76)decode(81-86)decode(90-99)decode(103-112)decode(116-123)decode(128-131)rs/moq/src/coding/version.rs (2)
decode(27-30)decode(51-61)rs/moq/src/coding/varint.rs (1)
decode(162-200)js/moq/src/lite/version.ts (2)
Version(1-5)Version(7-7)
js/hang/src/publish/broadcast.ts (2)
js/moq/src/broadcast.ts (1)
Broadcast(14-76)js/hang/src/publish/preview.ts (1)
Preview(11-44)
js/hang/src/watch/broadcast.ts (3)
js/hang/src/publish/priority.ts (1)
PRIORITY(3-11)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/hang/src/publish/broadcast.ts (1)
broadcast(73-118)
js/moq/src/stream.ts (1)
js/moq/src/ietf/control.ts (1)
Stream(65-160)
js/moq/src/lite/connection.ts (1)
js/moq/src/lite/subscribe.ts (1)
Subscribe(39-112)
rs/moq/src/lite/subscribe.rs (3)
js/moq/src/lite/version.ts (2)
Version(1-5)Version(7-7)js/moq/src/lite/subscribe.ts (6)
version(86-103)version(135-147)r(20-24)w(15-18)w(70-84)w(124-133)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)
rs/hang/src/model/track.rs (1)
js/moq/src/lite/version.ts (2)
Version(1-5)Version(7-7)
js/moq/src/lite/subscribe.ts (3)
js/moq/src/stream.ts (2)
Writer(217-303)Reader(53-214)js/moq/src/lite/version.ts (2)
Version(1-5)Version(7-7)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)
rs/moq/src/lite/subscriber.rs (4)
rs/moq/src/ietf/subscriber.rs (2)
run_subscribe(259-286)stream(342-342)rs/moq/src/lite/publisher.rs (3)
run_subscribe(179-206)stream(54-54)stream(145-145)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/moq/src/lite/subscribe.ts (1)
Subscribe(39-112)
js/moq-clock/src/main.ts (4)
js/hang/src/publish/broadcast.ts (2)
track(120-139)broadcast(73-118)js/hang/src/watch/location/peers.ts (1)
track(52-64)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/moq/src/ietf/subscriber.ts (1)
broadcast(117-159)
js/hang/src/watch/location/peers.ts (3)
js/hang/src/watch/broadcast.ts (4)
catalog(141-158)effect(77-111)effect(113-124)effect(126-139)js/hang/src/watch/chat/message.ts (1)
effect(41-68)js/hang/src/watch/chat/typing.ts (1)
effect(39-63)
rs/moq/src/ietf/subscriber.rs (2)
js/moq/src/ietf/object.ts (1)
Frame(80-140)rs/moq/src/ietf/group.rs (1)
default(92-99)
js/moq/src/ietf/subscriber.ts (4)
js/moq/src/path.ts (1)
Valid(31-31)js/moq/src/track.ts (1)
Track(16-216)js/moq/src/ietf/subscribe.ts (2)
Subscribe(10-69)Unsubscribe(158-183)js/moq/src/group.ts (1)
Group(9-104)
js/hang/src/watch/video/source.ts (4)
js/moq/src/lite/publisher.ts (2)
sub(184-209)sub(211-238)js/hang/src/watch/audio/source.ts (3)
effect(81-129)effect(131-141)effect(143-200)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/moq/src/ietf/subscriber.ts (1)
broadcast(117-159)
js/moq/src/lite/subscriber.ts (4)
js/moq/src/stream.ts (1)
Stream(12-49)js/moq/src/track.ts (1)
Track(16-216)js/moq/src/lite/subscribe.ts (2)
Subscribe(39-112)SubscribeOk(114-156)js/moq/src/group.ts (1)
Group(9-104)
js/hang/src/watch/location/window.ts (4)
js/hang/src/publish/broadcast.ts (3)
track(120-139)effect(55-71)broadcast(73-118)js/hang/src/watch/location/peers.ts (2)
track(52-64)effect(34-50)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/moq/src/ietf/subscriber.ts (1)
broadcast(117-159)
js/hang/src/watch/chat/message.ts (4)
js/hang/src/watch/location/peers.ts (2)
track(52-64)effect(34-50)js/hang/src/watch/broadcast.ts (4)
catalog(141-158)effect(77-111)effect(113-124)effect(126-139)js/hang/src/watch/chat/typing.ts (1)
effect(39-63)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)
js/hang/src/watch/audio/source.ts (3)
js/hang/src/watch/broadcast.ts (4)
catalog(141-158)effect(77-111)effect(113-124)effect(126-139)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/moq/src/ietf/subscriber.ts (1)
broadcast(117-159)
rs/moq/src/model/frame.rs (3)
rs/moq/src/model/broadcast.rs (5)
close(110-112)select(371-379)closed(248-254)closed(340-368)produce(36-40)rs/moq/src/model/group.rs (8)
close(86-93)close(172-179)abort(95-106)abort(186-188)abort(291-293)proxy(212-247)closed(363-369)produce(30-34)rs/moq/src/model/track.rs (7)
close(138-145)close(268-277)abort(147-154)abort(283-285)proxy(235-266)closed(451-456)produce(47-51)
rs/moq/src/model/group.rs (2)
rs/moq/src/model/frame.rs (8)
close(63-70)close(130-142)abort(72-79)abort(147-149)new(96-102)produce(13-17)proxy(164-189)closed(272-278)rs/moq/src/model/track.rs (13)
close(138-145)close(268-277)abort(147-154)abort(283-285)new(39-45)new(165-171)write_frame(224-229)produce(47-51)is_closed(279-281)proxy(235-266)clone(332-339)closed(451-456)drop(323-328)
js/moq/src/track.ts (2)
js/moq/src/group.ts (1)
Group(9-104)js/moq/src/util/promise.ts (1)
defer(9-17)
rs/hang/src/annexb/import.rs (3)
js/hang/src/watch/broadcast.ts (1)
catalog(141-158)js/hang/src/catalog/video.ts (1)
Video(96-96)js/hang/src/watch/video/source.ts (1)
renditions(292-326)
js/hang/src/watch/chat/typing.ts (3)
js/hang/src/watch/broadcast.ts (4)
catalog(141-158)effect(77-111)effect(113-124)effect(126-139)js/hang/src/watch/chat/message.ts (1)
effect(41-68)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)
js/hang/src/watch/preview.ts (2)
js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/moq/src/ietf/subscriber.ts (1)
broadcast(117-159)
rs/moq/src/model/track.rs (2)
rs/moq/src/model/frame.rs (8)
new(96-102)close(63-70)close(130-142)abort(72-79)abort(147-149)proxy(164-189)closed(272-278)consume(152-158)rs/moq/src/model/group.rs (14)
new(123-129)close(86-93)close(172-179)abort(95-106)abort(186-188)abort(291-293)write_frame(137-146)proxy(212-247)clone(264-271)closed(363-369)is_closed(181-183)consume(191-198)weak(249-254)drop(275-280)
js/moq/src/lite/publisher.ts (4)
js/moq/src/track.ts (1)
Track(16-216)js/moq/src/lite/subscriber.ts (1)
broadcast(106-144)js/moq/src/lite/subscribe.ts (1)
SubscribeOk(114-156)js/moq/src/stream.ts (2)
next(377-381)Writer(217-303)
rs/moq/src/lite/publisher.rs (4)
js/moq/src/lite/subscribe.ts (4)
SubscribeOk(114-156)Subscribe(39-112)version(86-103)version(135-147)rs/moq/src/model/track.rs (2)
next_group(386-425)clone(332-339)rs/moq/src/lite/priority.rs (3)
insert(56-58)insert(80-132)next(203-206)js/moq/src/lite/group.ts (1)
Group(4-33)
js/moq/src/broadcast.ts (2)
js/signals/src/index.ts (1)
Signal(28-152)js/moq/src/track.ts (1)
Track(16-216)
rs/hang/src/catalog/root.rs (3)
rs/hang/src/annexb/import.rs (1)
new(24-32)rs/hang/src/cmaf/import.rs (1)
new(61-75)js/hang/src/publish/broadcast.ts (1)
track(120-139)
rs/hang/src/cmaf/import.rs (3)
js/hang/src/watch/broadcast.ts (1)
catalog(141-158)js/hang/src/catalog/video.ts (1)
Video(96-96)js/hang/src/catalog/audio.ts (1)
Audio(54-54)
js/moq/src/ietf/publisher.ts (3)
js/moq/src/track.ts (1)
Track(16-216)js/moq/src/ietf/subscriber.ts (1)
broadcast(117-159)js/moq/src/stream.ts (1)
Writer(217-303)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check
| interface TrackArgs { | ||
| name: string; | ||
| priority?: number; | ||
| expires?: DOMHighResTimeStamp; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Track expiration never actually starts; #expire stays undefined
The new expiration wiring around #expire and deferExpires has a structural bug:
#expireis never initialized.- Both places that call
deferExpiresare gated onif (this.#expire):appendGroup():if (this.#expire) { this.#expire.resolve(); this.#expire = deferExpires(this.expires, this.closed); } const group = new Group({ sequence: this.#max ?? 0 }); this.#expire?.promise.catch((err) => group.close(err));
writeGroup():if (this.#max && group.sequence < this.#max) { this.#expire?.promise.catch((err) => group.close(err)); } else { this.#max = group.sequence; if (this.#expire) { this.#expire.resolve(); this.#expire = deferExpires(this.expires, this.closed); } }
Given #expire starts as undefined and is only ever assigned inside if (this.#expire), it never becomes non‑undefined, so:
deferExpiresis never called.- No timeout is scheduled.
this.#expire?.promise.catch(...)never attaches, so groups are never closed due to expiration.
This effectively disables the new per‑track delivery timeout.
A minimal fix is to initialize the first DeferExpires when groups are first appended/written, and to no‑op when expires is zero (if “no timeout” is the intended semantics), e.g.:
appendGroup(): Group {
if (this.state.closed.peek()) throw new Error("track is closed");
- if (this.#expire) {
- this.#expire.resolve();
- this.#expire = deferExpires(this.expires, this.closed);
- }
+ if (this.expires > 0) {
+ if (!this.#expire) {
+ this.#expire = deferExpires(this.expires, this.closed);
+ } else {
+ this.#expire.resolve();
+ this.#expire = deferExpires(this.expires, this.closed);
+ }
+ }
const group = new Group({ sequence: this.#max ?? 0 });
this.#expire?.promise.catch((err) => group.close(err));
// ...
}and similarly in writeGroup():
writeGroup(group: Group) {
if (this.state.closed.peek()) throw new Error("track is closed");
if (this.#max && group.sequence < this.#max) {
this.#expire?.promise.catch((err) => group.close(err));
} else {
this.#max = group.sequence;
- // Start expiring older groups.
- if (this.#expire) {
- this.#expire.resolve();
- this.#expire = deferExpires(this.expires, this.closed);
- }
+ if (this.expires > 0) {
+ if (!this.#expire) {
+ this.#expire = deferExpires(this.expires, this.closed);
+ } else {
+ this.#expire.resolve();
+ this.#expire = deferExpires(this.expires, this.closed);
+ }
+ }
}
// ...
}This ensures expiration actually engages and lets you tune expires === 0 as “no timeout”.
Also applies to: 18-20, 22-25, 28-32, 49-58, 73-83, 218-237
🤖 Prompt for AI Agents
js/moq/src/track.ts around lines 10 to 14 (and also applies to ranges 18-20,
22-25, 28-32, 49-58, 73-83, 218-237): the private field #expire is never
initialized so deferExpires never runs and groups never get the expiration
handler; initialize #expire the first time a group is appended or written (i.e.,
if this.expires is falsy treat expires===0 as "no timeout" and skip
initializing), otherwise set this.#expire = deferExpires(this.expires,
this.closed) when first needed, call this.#expire.resolve() before replacing it
on subsequent resets, and always attach this.#expire.promise.catch(err =>
group.close(err)) whenever a group is created so the expiration closure will
run; ensure the logic no-ops if expires===0 to preserve "no timeout" semantics.
| let track = Track { | ||
| name: msg.track_name.to_string(), | ||
| priority: msg.subscriber_priority, | ||
| expires: std::time::Duration::default(), // TODO parse delivery timeout parameter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete implementation: expires field defaults without parsing delivery timeout.
The expires field is initialized with std::time::Duration::default() but the TODO comment indicates it should parse the delivery timeout parameter from the subscription message. Since this PR's title explicitly mentions "Add delivery timeout/expires support," this incomplete implementation could result in incorrect expiration behavior.
Please implement the parsing of the delivery timeout parameter from the subscription message, or clarify if this is intentionally deferred to a follow-up PR.
| /// Proxy all chunks and errors from the given consumer. | ||
| /// | ||
| /// This takes ownership of the frame and publishes identical chunks to the other consumer. | ||
| /// Returns an error on an unexpected close, which can happen if the [FrameProducer] is cloned. | ||
| pub async fn proxy(mut self, other: FrameConsumer) -> Result<()> { | ||
| let mut chunks = Some(other.clone()); | ||
| loop { | ||
| tokio::select! { | ||
| biased; | ||
| Some(chunk) = async { Some(chunks.as_mut()?.read_chunk().await) } => match chunk { | ||
| Ok(Some(chunk)) => self.write_chunk(chunk)?, | ||
| Ok(None) => { | ||
| chunks = None; | ||
| self.close()? | ||
| }, | ||
| Err(err) => { | ||
| self.abort(err); | ||
| break | ||
| }, | ||
| }, | ||
| Err(err) = other.closed() => { | ||
| self.abort(err); | ||
| break | ||
| }, | ||
| else => break, | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FrameProducer::proxy never returns an error but its docs still claim it does
The implementation of FrameProducer::proxy always returns Ok(()), even on upstream errors (read_chunk or closed() errors cause abort + break, not return Err). The doc comment still says:
Returns an error on an unexpected close, which can happen if the [FrameProducer] is cloned.
Consider updating the comment (or the behavior) so callers are not misled about when to expect an Err.
- /// Returns an error on an unexpected close, which can happen if the [FrameProducer] is cloned.
+ /// Aborts this frame on any upstream error and always returns `Ok(())`.
+ /// (Unexpected closes are signaled via `Error` to consumers, not via this return value.)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /// Proxy all chunks and errors from the given consumer. | |
| /// | |
| /// This takes ownership of the frame and publishes identical chunks to the other consumer. | |
| /// Returns an error on an unexpected close, which can happen if the [FrameProducer] is cloned. | |
| pub async fn proxy(mut self, other: FrameConsumer) -> Result<()> { | |
| let mut chunks = Some(other.clone()); | |
| loop { | |
| tokio::select! { | |
| biased; | |
| Some(chunk) = async { Some(chunks.as_mut()?.read_chunk().await) } => match chunk { | |
| Ok(Some(chunk)) => self.write_chunk(chunk)?, | |
| Ok(None) => { | |
| chunks = None; | |
| self.close()? | |
| }, | |
| Err(err) => { | |
| self.abort(err); | |
| break | |
| }, | |
| }, | |
| Err(err) = other.closed() => { | |
| self.abort(err); | |
| break | |
| }, | |
| else => break, | |
| } | |
| } | |
| Ok(()) | |
| } | |
| /// Proxy all chunks and errors from the given consumer. | |
| /// | |
| /// This takes ownership of the frame and publishes identical chunks to the other consumer. | |
| /// Aborts this frame on any upstream error and always returns `Ok(())`. | |
| /// (Unexpected closes are signaled via `Error` to consumers, not via this return value.) | |
| pub async fn proxy(mut self, other: FrameConsumer) -> Result<()> { | |
| let mut chunks = Some(other.clone()); | |
| loop { | |
| tokio::select! { | |
| biased; | |
| Some(chunk) = async { Some(chunks.as_mut()?.read_chunk().await) } => match chunk { | |
| Ok(Some(chunk)) => self.write_chunk(chunk)?, | |
| Ok(None) => { | |
| chunks = None; | |
| self.close()? | |
| }, | |
| Err(err) => { | |
| self.abort(err); | |
| break | |
| }, | |
| }, | |
| Err(err) = other.closed() => { | |
| self.abort(err); | |
| break | |
| }, | |
| else => break, | |
| } | |
| } | |
| Ok(()) | |
| } |
🤖 Prompt for AI Agents
In rs/moq/src/model/frame.rs around lines 160 to 189, the doc comment claims
proxy "Returns an error on an unexpected close" but the implementation always
returns Ok(()) (upstream read_chunk/closed() errors call self.abort(...) and
break instead of returning Err). Update the doc comment to accurately describe
the behavior: state that proxy consumes the frame, forwards chunks, aborts the
producer on upstream errors and always returns Ok(()) (errors are handled via
abort, not via a Result Err), or alternatively change the implementation to
propagate and return Err on those upstream errors if you prefer error
propagation; pick one behavior and make the doc and implementation consistent.
| /// Insert an existing group into the track. | ||
| /// | ||
| /// This is used to insert a group that was received from the network. | ||
| /// The group will be closed with [Error::Expired] if it is active too long. | ||
| pub fn insert_group(&mut self, group: GroupProducer) -> Result<()> { | ||
| let mut result = Err(Error::Closed); // We will replace this. | ||
|
|
||
| if let Some(latest) = &state.latest { | ||
| match group.info.cmp(&latest.info) { | ||
| Ordering::Less => return false, | ||
| Ordering::Equal => return false, | ||
| Ordering::Greater => (), | ||
| } | ||
| } | ||
| let producer = group.clone(); | ||
| self.state.send_if_modified(|state| { | ||
| result = state.insert_group(producer, self.info.expires); | ||
| result.is_ok() | ||
| }); | ||
|
|
||
| state.latest = Some(group.clone()); | ||
| true | ||
| }) | ||
| let expires = result?; | ||
| web_async::spawn(self.weak().expire(group.weak(), expires)); | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Create a new group with the given sequence number. | ||
| /// | ||
| /// If the sequence number is not the latest, this method will return None. | ||
| pub fn create_group(&mut self, info: Group) -> Option<GroupProducer> { | ||
| let group = info.produce(); | ||
| self.insert_group(group.consumer).then_some(group.producer) | ||
| /// The group will be closed with [Error::Expired] if it is active too long. | ||
| pub fn create_group(&mut self, info: Group) -> Result<GroupProducer> { | ||
| let mut result = Err(Error::Closed); // We will replace this. | ||
|
|
||
| self.state.send_if_modified(|state| { | ||
| result = state.create_group(info, self.info.expires); | ||
| result.is_ok() | ||
| }); | ||
|
|
||
| let (producer, expires) = result?; | ||
| web_async::spawn(self.weak().expire(producer.weak(), expires)); | ||
| Ok(producer) | ||
| } | ||
|
|
||
| /// Create a new group with the next sequence number. | ||
| pub fn append_group(&mut self) -> GroupProducer { | ||
| let mut producer = None; | ||
| /// | ||
| /// The group will eventually be closed with [Error::Expired] if active too long. | ||
| pub fn append_group(&mut self) -> Result<GroupProducer> { | ||
| let mut result = Err(Error::Closed); // We will replace this. | ||
|
|
||
| self.state.send_if_modified(|state| { | ||
| assert!(state.closed.is_none()); | ||
| result = state.append_group(); | ||
| result.is_ok() | ||
| }); | ||
|
|
||
| let sequence = state.latest.as_ref().map_or(0, |group| group.info.sequence + 1); | ||
| let group = Group { sequence }.produce(); | ||
| state.latest = Some(group.consumer); | ||
| producer = Some(group.producer); | ||
| let producer = result?; | ||
| web_async::spawn(self.weak().expire(producer.weak(), self.info.expires)); | ||
| Ok(producer) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid unwrap() in TrackProducer::write_frame to prevent panics under races/expiry
TrackProducer::write_frame currently does:
let mut group = self.append_group()?;
group.write_frame(frame.into()).unwrap();
group.close().unwrap();Both group.write_frame and group.close return Result and can fail—for example if:
- The track or group was closed/aborted concurrently, or
- The expiry logic races and aborts the group.
Panic here would bubble up as a crash instead of a recoverable Error.
pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> {
- let mut group = self.append_group()?;
- group.write_frame(frame.into()).unwrap();
- group.close().unwrap();
- Ok(())
+ let mut group = self.append_group()?;
+ group.write_frame(frame.into())?;
+ group.close()?;
+ Ok(())
}Also applies to: 223-229
🤖 Prompt for AI Agents
In rs/moq/src/model/track.rs around lines 173-221 (and also update similar code
at 223-229), the examples call group.write_frame(...).unwrap() and
group.close().unwrap(), which can panic if the group or track is closed/expired
concurrently; replace these unwraps with proper Result handling so failures are
returned as Err rather than panicking: propagate the error with the ? operator
(or map it into the function's Error type) after calling write_frame, check and
propagate the result of close (or attempt close and ignore only if safe) so all
fallible calls return a Result to the caller instead of calling unwrap.
Track‑level expiry and weak handles are a good fit; double‑check zero‑duration semantics
The combination of:
TrackProducer::insert_group/create_group/append_groupspawningTrackProducerWeak::expirewith aGroupProducerWeak, andTrackProducerWeak::expirewaiting until the group is no longer latest (or the track is closed) before sleeping forexpiresand aborting/removing the group,
is a sensible implementation of “expire N ms after a newer group appears”.
One behavior to confirm: when Track.info.expires is zero (the default from Track::new), groups will be aborted immediately once:
- There is a newer group for appended groups (via
state.next > sequence + 1), or - The track is closed.
If the intended meaning of expires == Duration::default() is “no expiry”, you may want to treat zero specially (e.g., skip spawning expiry tasks when expires.is_zero()), otherwise the current behavior is “expire as soon as there’s a successor or the track closes”.
Also applies to: 342-368
🤖 Prompt for AI Agents
rs/moq/src/model/track.rs lines 173-221 (also apply same change at 342-368): the
current code always spawns an expiry task using self.info.expires which means a
zero Duration (the default) will cause immediate expiry behavior; to fix this,
treat zero as “no expiry” by checking if self.info.expires.is_zero() and only
call web_async::spawn(self.weak().expire(...)) when expires is non-zero (or
alternatively change the expire routine to return early when given a zero
duration), and apply the same conditional to insert_group, create_group,
append_group and the other similar region so expiry tasks are not spawned for
zero durations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
rs/moq/src/model/track.rs (2)
34-36: Clarify and possibly special‑case zero‑durationexpiressemanticsRight now
Track::newsetsexpirestoDuration::default()(zero), and zero propagates through:
TrackState::insert_group/create_group/append_groupas the base expiry used to compute per‑groupexpires.TrackProducer::insert_group/create_group/append_group, which always spawnTrackProducerWeak::expire(...)using the (possibly zero) expiry.TrackProducerWeak::expire, which waits until the group is no longer latest (or the track is closed) and then doestokio::time::sleep(expires).awaitfollowed bygroup.abort(Error::Expired)and removal fromstate.groups.TrackConsumer::expires, which treatsexpires >= self.info.expiresas a no‑op; when the track was created with the default zero, any positiveexpiresis considered “not stricter” and therefore ignored.Taken together this means:
- With
Track::new(defaultexpires = 0), groups will be expired immediately once they’re no longer the latest group or the track is closed (subject to scheduling), and- You cannot use
TrackConsumer::expires(...)to get a stricter expiry for the common case where the source track was created with the defaultexpires = 0, sincenew_expires >= 0will always short‑circuit and returnself.This behavior is somewhat surprising for a default and likely not what callers expect when they don’t explicitly opt into expiry, especially given tests like
test_track_basic_write_read,test_track_multiple_consumers, andtest_track_close_flushes_pendingthat appear to assume “normal” caching semantics for a default track. It also matches the earlier review concern that zero‑duration expires acts as “expire as soon as there’s a successor/close” rather than “no expiry”.Consider:
- Deciding explicitly whether
expires == Duration::ZEROshould mean “no expiry” vs “immediate expiry once superseded/closed”, and documenting that on the field.- If “no expiry” is intended:
- Short‑circuit
TrackState::insert_groupand the call‑sites so that a zeroTrack.info.expiresdoes not cause immediateError::Expiredfor older inserted groups.- Gate the spawn points in
TrackProducer::{insert_group,create_group,append_group}and/or early‑return inTrackProducerWeak::expirewhenexpires.is_zero()to avoid starting per‑group expiry tasks at all.- Adjust
TrackConsumer::expiresso that a non‑zeroexpiresis considered “stricter than infinite” whenself.info.expires.is_zero(), instead of being treated as a no‑op.If “immediate expiry” is intentional for zero, it’s worth calling that out explicitly in the docs and adding tests that assert the current behavior so it doesn’t regress accidentally.
Also applies to: 43-44, 66-109, 173-221, 342-369, 428-449
223-229: Replaceunwrap()s inTrackProducer::write_framewith proper error propagation
write_framecurrently does:let mut group = self.append_group()?; group.write_frame(frame.into()).unwrap(); group.close().unwrap(); Ok(())Both
group.write_frameandgroup.closecan fail (e.g., if the group/track is closed or expired concurrently). Usingunwrap()will panic the process under those races instead of surfacing a recoverable error to the caller—this is especially problematic in networked, concurrent use.Propagate the errors instead:
pub fn write_frame<B: Into<bytes::Bytes>>(&mut self, frame: B) -> Result<()> { - let mut group = self.append_group()?; - group.write_frame(frame.into()).unwrap(); - group.close().unwrap(); - Ok(()) + let mut group = self.append_group()?; + group.write_frame(frame.into())?; + group.close()?; + Ok(()) }
🧹 Nitpick comments (4)
js/moq/src/track.ts (2)
10-14: Clarifyexpiressemantics and type to avoid misinterpretation
expiresis exposed asDOMHighResTimeStamponTrackand passed straight intosetTimeoutas a delay. That assumes it’s a relative TTL in milliseconds, but the name/type suggest it might instead be an absolute timestamp coming from the protocol.If
expiresis an absolute time, this will behave incorrectly (treating it as a delay). If it is a TTL, the current naming and type make that easy to misread.Consider:
- Documenting clearly (in comments or type alias) whether this is “TTL in ms” vs “absolute timestamp”.
- Encoding that in the type, e.g.
ttlMs: numberor a dedicated alias, and possibly treating0explicitly as either “no timeout” or “expire immediately” so call sites are unambiguous.Also applies to: 18-20, 28-32
213-233: MakedeferExpiresAPI reflect its behavior and guard ondelay
deferExpires’sresolvefunction is actually “start the expiry timer”, not “resolve the promise”:const start = () => { const timeout = setTimeout(() => d.resolve(new Error("expired")), delay); closed.then((err) => { if (err) d.resolve(err); clearTimeout(timeout); }); }; return { promise: d.promise, resolve: start };That’s a bit confusing when read alongside
defer, whereresolvereally does resolve the promise.To improve clarity and avoid accidental misuse:
- Rename the field to something like
start(and the type accordingly), so call sites read asthis.#expire?.start()rather thanresolve().- Optionally no‑op when
delay <= 0if0(or negative) is meant to mean “no timeout”, and only construct/return aDeferExpireswhen expiry is actually enabled.For example:
-type DeferExpires = { - promise: Promise<Error | undefined>; - resolve: () => void; -}; +type DeferExpires = { + promise: Promise<Error | undefined>; + start: () => void; +}; -function deferExpires(delay: DOMHighResTimeStamp, closed: Promise<Error | undefined>): DeferExpires { +function deferExpires(delay: DOMHighResTimeStamp, closed: Promise<Error | undefined>): DeferExpires { const d = defer<Error | undefined>(); - const start = () => { + const start = () => { const timeout = setTimeout(() => d.resolve(new Error("expired")), delay); closed.then((err) => { if (err) d.resolve(err); clearTimeout(timeout); }); }; return { promise: d.promise, - resolve: start, + start, }; }Call sites would then use
#expire?.start()instead ofresolve(), making intent much clearer.rs/moq/src/model/track.rs (2)
74-81: Avoid panicking on duplicate group sequences inTrackState::insert_group
insert_groupcurrently uses:assert!( !self .groups .iter() .any(|(other, _)| other.info.sequence == group.info.sequence), "group already exists" );Given that group sequences can ultimately be influenced by remote peers or higher layers, asserting here turns a protocol violation or bug into a hard process panic. It may be safer to treat duplicates as a normal error path (e.g., return a dedicated
Error::DuplicateGroupor reuse an existing variant) rather than aborting the whole process.
509-849: Test coverage is strong; consider adding explicit zero‑expiresbehaviorsThe new tests cover basic read/write, single‑frame helper, explicit
create_group, abort, multiple consumers, expiration timing, immediate expiration of overly old groups, out‑of‑order groups, close‑with‑pending, explicitinsert_group, proxying, proxy abort, and theexpiresmodifier. That’s a solid matrix around the new APIs and lifecycle logic.Given the subtle semantics around
expires == Duration::ZEROin the main code, it would be helpful to add one or two focused tests that pin down the expected behavior for:
- A default
Track::new("...")(zero expiry) as new groups arrive and the track closes.- Using
TrackConsumer::expires(...)against a track whoseTrack.info.expiresis zero.That would make the intent around the default case explicit and prevent regressions once you decide how zero should behave.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
js/moq/src/track.ts(4 hunks)rs/moq/src/model/track.rs(5 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
rs/moq/src/model/track.rs (2)
rs/moq/src/model/frame.rs (8)
new(96-102)close(63-70)close(130-142)abort(72-79)abort(147-149)proxy(164-189)closed(272-278)consume(152-158)rs/moq/src/model/group.rs (14)
new(123-129)close(86-93)close(172-179)abort(95-106)abort(186-188)abort(291-293)write_frame(137-146)proxy(212-247)clone(264-271)closed(363-369)is_closed(181-183)consume(191-198)weak(249-254)drop(275-280)
js/moq/src/track.ts (3)
js/signals/src/index.ts (2)
Signal(28-152)closed(546-548)js/moq/src/group.ts (1)
Group(9-104)js/moq/src/util/promise.ts (1)
defer(9-17)
🔇 Additional comments (3)
js/moq/src/track.ts (1)
22-25: Review comment contains an incorrect suggested fix; the.start()method referenced doesn't existThe review correctly identifies that the newest group's expiration timer never starts (only previous generations' timers are started when a new generation arrives). However, the suggested fix is unworkable:
The
DeferExpiresinterface only haspromiseandresolveproperties—there is no.start()method. More importantly, theresolveproperty IS already the function that starts the timer (line 228:resolve: start). Callingthis.#expire.resolve()already invokes the timer.The actual issue is that newly created
DeferExpiresinstances never haveresolve()called on them, so their timers never start. A working fix would callresolve()immediately on the newly created#expire, not a non-existentstart()method. For example:this.#expire = deferExpires(this.expires, this.closed); if (this.expires > 0) { this.#expire.resolve(); // Start the timer for this new group }Likely an incorrect or invalid review comment.
rs/moq/src/model/track.rs (2)
268-277: Drop/clone semantics forTrackProducerlook consistent with group/frame patternsThe
refs: Arc<AtomicUsize>scheme with:
Clonedoingfetch_add(1, Relaxed)and sharinginfo,state, andrefs, andDropdoingfetch_sub(1, Relaxed)and callingabort(Error::Dropped)only whenrefs == 1 && !self.is_closed()matches the existing group/frame patterns and ensures the track is aborted exactly once when the last producer is dropped without an explicit
close/abort. This provides a reasonable safety net against leaked producers without interfering with explicit lifecycle management.Also applies to: 322-340
379-381:TrackConsumer::next_group/seenlogic appears sound for out‑of‑order and multi‑consumer useThe combination of:
seen: HashSet<u64>per consumer,- waiting on
state.groups.back()becoming “new to this consumer” or the track closing,- periodically pruning
seenwhen it grows to >4× the number of active groups, and- iterating
state.groupsin insertion order to return the first unseen groupgives you:
- Out‑of‑order delivery (bounded only by arrival order), as required.
- Per‑consumer delivery semantics where clones created before consumption see all groups, and clones created after consumption inherit the parent’s
seenset (as documented).- Reasonable control of
seenmemory growth.The error/close handling (
state.closedshort‑circuit and finalexpect("should be closed")) also looks correct for bothOk(())and error closures.Also applies to: 384-426
Replaces the 2 group limit on the Rust side with N groups. Groups are cached/transmitted based on an expiration value sent on the wire. It's not a perfect implementation, but it's a start.
Summary by CodeRabbit
New Features
Bug Fixes
Chores
Breaking Changes