-
Notifications
You must be signed in to change notification settings - Fork 130
IETF: Joining fetch is gross (JS) #667
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
WalkthroughThis pull request refactors the IETF MOQ protocol implementation to support fetch operations. Changes introduce new message types (FetchHeader, FetchObject, GroupHeader, GroupObject) and restructure encoding and decoding logic. The Subscriber and Publisher classes are updated to handle fetch requests and responses. A new GroupOrder type replaces numeric group ordering. A Location class is added for encoding and decoding object positions. The Reader class gains a peek method for buffer inspection. Minor logging and error message improvements are included. Pre-merge checks and finishing touches❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
js/moq/src/ietf/connection.ts(3 hunks)js/moq/src/ietf/fetch.ts(7 hunks)js/moq/src/ietf/group.ts(6 hunks)js/moq/src/ietf/index.ts(0 hunks)js/moq/src/ietf/location.ts(1 hunks)js/moq/src/ietf/publish.ts(5 hunks)js/moq/src/ietf/publisher.ts(4 hunks)js/moq/src/ietf/subscriber.ts(10 hunks)js/moq/src/stream.ts(1 hunks)rs/moq-clock/src/main.rs(1 hunks)rs/moq/src/error.rs(1 hunks)
💤 Files with no reviewable changes (1)
- js/moq/src/ietf/index.ts
🧰 Additional context used
📓 Path-based instructions (1)
rs/**/src/**/*.rs
📄 CodeRabbit inference engine (CLAUDE.md)
Write Rust tests integrated within source files (unit tests alongside code).
Files:
rs/moq/src/error.rsrs/moq-clock/src/main.rs
🧠 Learnings (1)
📚 Learning: 2025-09-11T20:16:03.134Z
Learnt from: CR
Repo: kixelated/moq PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-09-11T20:16:03.134Z
Learning: Applies to rs/moq-relay/** : In the CDN/relay (moq-relay), do not include application logic, media codecs, or track-specific details; keep it transport-agnostic.
Applied to files:
js/moq/src/ietf/subscriber.ts
🧬 Code graph analysis (8)
js/moq/src/ietf/connection.ts (2)
js/moq/src/ietf/fetch.ts (4)
FetchOk(129-168)FetchError(170-203)FetchCancel(205-230)FetchHeader(232-249)js/moq/src/ietf/group.ts (1)
GroupHeader(44-106)
js/moq/src/ietf/publish.ts (1)
js/moq/src/ietf/group.ts (1)
GroupOrder(3-31)
js/moq/src/ietf/publisher.ts (2)
js/moq/src/ietf/group.ts (2)
GroupHeader(44-106)GroupObject(110-170)js/moq/src/ietf/fetch.ts (5)
Fetch(36-127)FetchType(11-15)FetchType(17-34)FetchError(170-203)FetchCancel(205-230)
js/moq/src/ietf/group.ts (3)
js/moq/src/ietf/fetch.ts (8)
w(51-72)w(144-150)w(183-187)w(214-216)r(82-126)r(160-167)r(197-202)r(226-229)js/moq/src/ietf/publish.ts (4)
w(43-59)w(125-129)r(69-89)r(139-144)js/moq/src/stream.ts (2)
Writer(222-308)Reader(53-219)
js/moq/src/ietf/location.ts (2)
js/moq/src/ietf/fetch.ts (8)
w(51-72)w(144-150)w(183-187)w(214-216)r(82-126)r(160-167)r(197-202)r(226-229)js/moq/src/stream.ts (2)
Writer(222-308)Reader(53-219)
js/moq/src/ietf/fetch.ts (5)
js/moq/src/path.ts (1)
Valid(31-31)js/moq/src/ietf/location.ts (1)
Location(3-22)js/moq/src/ietf/group.ts (1)
GroupOrder(3-31)js/moq/src/stream.ts (2)
Writer(222-308)Reader(53-219)js/moq/src/ietf/parameters.ts (1)
Parameters(8-109)
rs/moq-clock/src/main.rs (1)
rs/moq-native/src/server.rs (2)
url(261-266)url(289-291)
js/moq/src/ietf/subscriber.ts (7)
js/moq/src/track.ts (1)
Track(9-190)js/moq/src/group.ts (1)
Group(9-104)js/moq/src/util/error.ts (1)
error(2-4)js/moq/src/ietf/subscribe.ts (2)
SubscribeOk(71-120)Subscribe(10-69)js/moq/src/ietf/fetch.ts (8)
Fetch(36-127)FetchType(11-15)FetchType(17-34)FetchCancel(205-230)FetchHeader(232-249)FetchObject(251-315)FetchOk(129-168)FetchError(170-203)js/moq/src/ietf/group.ts (3)
GroupOrder(3-31)GroupHeader(44-106)GroupObject(110-170)js/moq/src/stream.ts (2)
Reader(53-219)done(207-210)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Check
🔇 Additional comments (5)
rs/moq-clock/src/main.rs (1)
49-49: LGTM! Improved log readability.Using Display formatting (
%) for the URL makes the log output cleaner and more user-friendly compared to Debug formatting.rs/moq/src/error.rs (1)
59-59: Change is justified by broader WrongSize usage.The verification confirms that
WrongSizeis used for size-related validation beyond frames—specifically in protocol message setup validation (session.rs lines 68, 120, 208) and frame chunk reading (subscriber.rs files). The change from "wrong frame size" to "wrong size" appropriately reflects this broader applicability and does not hinder debugging since the error code remains unchanged at code 14.js/moq/src/stream.ts (1)
135-138: Peek implementation fits the Reader contract
Non-destructive access viapeek()keeps buffer semantics consistent withread()/#slice(), so downstream decoders can safely inspect headers before consuming bytes. Nicely done.js/moq/src/ietf/location.ts (1)
12-21: u62-based location encoding looks solid
Location.encode/decodemirror the surrounding message contract and keep group/object identifiers lossless in bigint space.js/moq/src/ietf/publish.ts (1)
48-85: Nice reuse ofGroupOrderhelpers
Delegating encode/decode toGroupOrderkeeps the wire format in sync with the new enum wrapper and avoids duplicating the switch logic here.
| await Namespace.encode(w, this.fetchType.namespace); | ||
| await w.string(this.fetchType.track); | ||
| this.fetchType.start.encode(w); | ||
| this.fetchType.end.encode(w); | ||
| } else if (this.fetchType.type === FetchType.Relative) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Await the Location encodes in Fetch.#encode.
Line 59 and Line 60 call Location.encode(...) without awaiting, so any write failure becomes an unhandled rejection and the message may finish encoding before those coordinates are on the wire. Please await both calls to keep the frame deterministic and error-safe.
- this.fetchType.start.encode(w);
- this.fetchType.end.encode(w);
+ await this.fetchType.start.encode(w);
+ await this.fetchType.end.encode(w);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| await Namespace.encode(w, this.fetchType.namespace); | |
| await w.string(this.fetchType.track); | |
| this.fetchType.start.encode(w); | |
| this.fetchType.end.encode(w); | |
| } else if (this.fetchType.type === FetchType.Relative) { | |
| await Namespace.encode(w, this.fetchType.namespace); | |
| await w.string(this.fetchType.track); | |
| await this.fetchType.start.encode(w); | |
| await this.fetchType.end.encode(w); | |
| } else if (this.fetchType.type === FetchType.Relative) { |
🤖 Prompt for AI Agents
In js/moq/src/ietf/fetch.ts around lines 57 to 61, the calls to
this.fetchType.start.encode(w) and this.fetchType.end.encode(w) are not awaited
which can cause unhandled rejections and non-deterministic framing; update the
method to await both Location.encode calls (i.e., await
this.fetchType.start.encode(w) and await this.fetchType.end.encode(w)) so write
failures propagate correctly and encoding completes deterministically.
| await w.u62(this.requestId); | ||
| await this.groupOrder.encode(w); | ||
| await w.bool(this.endOfTrack); | ||
| this.endLocation.encode(w); | ||
| await w.u53(0); // no parameters |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Await endLocation.encode inside FetchOk.#encode.
Location.encode is async; Line 148 invokes it without await, which drops any rejection on the floor and allows the outer encode to resolve before the location bytes are flushed. Await it so failures propagate and the frame stays well-ordered.
- this.endLocation.encode(w);
+ await this.endLocation.encode(w);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| await w.u62(this.requestId); | |
| await this.groupOrder.encode(w); | |
| await w.bool(this.endOfTrack); | |
| this.endLocation.encode(w); | |
| await w.u53(0); // no parameters | |
| await w.u62(this.requestId); | |
| await this.groupOrder.encode(w); | |
| await w.bool(this.endOfTrack); | |
| await this.endLocation.encode(w); | |
| await w.u53(0); // no parameters |
🤖 Prompt for AI Agents
In js/moq/src/ietf/fetch.ts around lines 145 to 149, the call to
this.endLocation.encode(w) is missing an await, which allows its async rejection
to be lost and permits the outer encode to finish before location bytes are
flushed; update the code to await this.endLocation.encode(w) so the method waits
for completion and any errors propagate, keeping frame ordering correct.
| if (!flags.hasEnd && status === 3) { | ||
| return new GroupObject(delta); | ||
| } | ||
|
|
||
| throw new Error(`Unsupported object status: ${status}`); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix GROUP_END handling when hasEnd is true
GroupObject.decode only accepts status === 3 (GROUP_END) when flags.hasEnd is false, but Publisher.#runGroup now sets hasEnd: true while still emitting a GROUP_END object. Every stream end therefore throws Unsupported object status: 3, breaking fetch/subscribe flows. Allow GROUP_END regardless of hasEnd (or stop emitting it when hasEnd is true). A minimal fix is:
- if (!flags.hasEnd && status === 3) {
- return new GroupObject(delta);
- }
+ if (status === GROUP_END) {
+ return new GroupObject(delta);
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (!flags.hasEnd && status === 3) { | |
| return new GroupObject(delta); | |
| } | |
| throw new Error(`Unsupported object status: ${status}`); | |
| if (status === 3) { | |
| return new GroupObject(delta); | |
| } | |
| throw new Error(`Unsupported object status: ${status}`); |
🤖 Prompt for AI Agents
In js/moq/src/ietf/group.ts around lines 164 to 168, GroupObject.decode
currently only returns a GROUP_END (status === 3) when flags.hasEnd is false
which causes an error when Publisher.#runGroup emits GROUP_END with hasEnd:
true; change the branch so GROUP_END (status === 3) is accepted regardless of
flags.hasEnd (i.e., treat status === 3 as a GroupObject return unconditionally),
leaving the alternative logic for other statuses unchanged; update or add a unit
test to cover decoding a GROUP_END when flags.hasEnd is true to prevent
regressions.
| this.#subscribes.delete(requestId); | ||
| this.#subscribeCallbacks.delete(requestId); | ||
| this.#fetches.delete(requestId); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete the fetch entry by fetchRequestId.
Line 184 tries to clean up #fetches, but it passes the subscribe request id instead of the fetch request id we stored earlier. When the join fetch fails before an object stream arrives, that entry is left behind, causing leaks and stale state for future FETCH_* control messages. Delete using fetchRequestId.
- this.#fetches.delete(requestId);
+ this.#fetches.delete(fetchRequestId);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| this.#subscribes.delete(requestId); | |
| this.#subscribeCallbacks.delete(requestId); | |
| this.#fetches.delete(requestId); | |
| } | |
| this.#subscribes.delete(requestId); | |
| this.#fetches.delete(fetchRequestId); | |
| } |
🤖 Prompt for AI Agents
In js/moq/src/ietf/subscriber.ts around lines 182 to 184, the cleanup deletes
this.#fetches using the subscribe requestId instead of the fetchRequestId that
was stored for the JOIN_FETCH; change the delete to use fetchRequestId (i.e.
this.#fetches.delete(fetchRequestId)) and ensure the fetchRequestId variable
from the earlier JOIN_FETCH handling is in scope so the correct fetch entry is
removed to prevent leaks/stale state.
| group = new Group(frame.groupId); | ||
| track.writeGroup(group); | ||
| } else if (group.sequence !== frame.groupId) { | ||
| throw new Error(`fetch returned multiple groups: ${group.sequence} !== ${frame.groupId}`); | ||
| } | ||
|
|
||
| if (frame.objectId !== nextObjectId) { | ||
| throw new Error(`fetch returned object ID out of order: ${frame.objectId} !== ${nextObjectId}`); | ||
| } | ||
|
|
||
| if (frame.subgroupId !== 0) { | ||
| throw new Error(`fetch returned subgroup ID: ${frame.subgroupId}`); | ||
| } | ||
|
|
||
| nextObjectId++; | ||
|
|
||
| track.writeFrame(frame.payload); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Route joining fetch payloads into the Group, not Track.writeFrame.
Line 393 currently calls track.writeFrame(...), which manufactures brand-new groups for every fetched object and drops the Group you just created. That breaks the hand-off when handleGroup resumes the same sequence. Push the payload into the existing Group instead (guard for safety so TypeScript is happy).
nextObjectId++;
-
- track.writeFrame(frame.payload);
+ if (!group) {
+ throw new Error("fetch group was not initialized");
+ }
+ group.writeFrame(frame.payload);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| group = new Group(frame.groupId); | |
| track.writeGroup(group); | |
| } else if (group.sequence !== frame.groupId) { | |
| throw new Error(`fetch returned multiple groups: ${group.sequence} !== ${frame.groupId}`); | |
| } | |
| if (frame.objectId !== nextObjectId) { | |
| throw new Error(`fetch returned object ID out of order: ${frame.objectId} !== ${nextObjectId}`); | |
| } | |
| if (frame.subgroupId !== 0) { | |
| throw new Error(`fetch returned subgroup ID: ${frame.subgroupId}`); | |
| } | |
| nextObjectId++; | |
| track.writeFrame(frame.payload); | |
| } | |
| group = new Group(frame.groupId); | |
| track.writeGroup(group); | |
| } else if (group.sequence !== frame.groupId) { | |
| throw new Error(`fetch returned multiple groups: ${group.sequence} !== ${frame.groupId}`); | |
| } | |
| if (frame.objectId !== nextObjectId) { | |
| throw new Error(`fetch returned object ID out of order: ${frame.objectId} !== ${nextObjectId}`); | |
| } | |
| if (frame.subgroupId !== 0) { | |
| throw new Error(`fetch returned subgroup ID: ${frame.subgroupId}`); | |
| } | |
| nextObjectId++; | |
| if (!group) { | |
| throw new Error("fetch group was not initialized"); | |
| } | |
| group.writeFrame(frame.payload); | |
| } |
🤖 Prompt for AI Agents
In js/moq/src/ietf/subscriber.ts around lines 377 to 394, the code incorrectly
calls track.writeFrame(frame.payload) which creates new Group instances and
drops the Group you just created; instead push the payload into the existing
Group. Replace the track.writeFrame call with a guarded call to the current
group (e.g. check that group is defined and of the expected type, then call
group.writeFrame(frame.payload); otherwise throw or assert) so TypeScript is
satisfied and the payload is appended to the existing Group rather than creating
a new one.
| handleFetchOk(msg: FetchOk) { | ||
| const fetch = this.#fetches.get(msg.requestId); | ||
| if (!fetch) { | ||
| throw new Error(`unknown fetch: requestId=${msg.requestId}`); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t tear down the connection when FetchOk arrives after the data stream.
handleFetch immediately removes the fetch from #fetches. When the peer follows up with FETCH_OK, Line 406 now throws, which bubbles out of the control loop and kills the connection. Treat the absence as “already processed” instead of an error.
- const fetch = this.#fetches.get(msg.requestId);
- if (!fetch) {
- throw new Error(`unknown fetch: requestId=${msg.requestId}`);
- }
+ const fetch = this.#fetches.get(msg.requestId);
+ if (!fetch) {
+ return; // stream already completed and cleaned up
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| handleFetchOk(msg: FetchOk) { | |
| const fetch = this.#fetches.get(msg.requestId); | |
| if (!fetch) { | |
| throw new Error(`unknown fetch: requestId=${msg.requestId}`); | |
| } | |
| handleFetchOk(msg: FetchOk) { | |
| const fetch = this.#fetches.get(msg.requestId); | |
| if (!fetch) { | |
| return; // stream already completed and cleaned up | |
| } |
🤖 Prompt for AI Agents
In js/moq/src/ietf/subscriber.ts around lines 404 to 408, handleFetchOk
currently throws when the fetch is not found which can occur if the fetch was
already removed after the data stream; change the behavior to treat a missing
fetch as already processed instead of an error — replace the throw with a no-op
or a debug log and return early so the control loop does not receive an
exception and the connection is not torn down.
I hate it but I thiiiink this is correct.
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
Refactor