Fix requestSnapshot publish ordering#4282
Conversation
commit: |
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #4282 +/- ##
===========================================
+ Coverage 58.64% 70.21% +11.56%
===========================================
Files 218 53 -165
Lines 22112 7007 -15105
Branches 5698 1981 -3717
===========================================
- Hits 12968 4920 -8048
+ Misses 9140 2084 -7056
+ Partials 4 3 -1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
msfstef
left a comment
There was a problem hiding this comment.
I believe that this change breaks sequential processing of SSE messages, and that either way if we want snapshot messages to make it into processing out of band we should make the API explicit about that I think
| // Tracks when subscriber callbacks are actively being delivered from | ||
| // #messageChain. requestSnapshot can inject a nested batch from inside a | ||
| // subscriber; in that reentrant case #publish uses this as an intentional | ||
| // escape hatch to deliver the nested snapshot batch immediately rather than | ||
| // queueing it behind the subscriber that is awaiting it. |
There was a problem hiding this comment.
| // Tracks when subscriber callbacks are actively being delivered from | |
| // #messageChain. requestSnapshot can inject a nested batch from inside a | |
| // subscriber; in that reentrant case #publish uses this as an intentional | |
| // escape hatch to deliver the nested snapshot batch immediately rather than | |
| // queueing it behind the subscriber that is awaiting it. |
I think we should try to avoid agent comment-creep - the flag's function is fairly clear with the comment left when used in code further down
| if (this.#isPublishing) { | ||
| return deliver() | ||
| } |
There was a problem hiding this comment.
This feels a bit odd - we have a message chain promise to serialize message processing, which gets bypassed if you publish messages while a batch of messages is being processed.
Effectively, anyone calling this.#publish either ends up on the message chain, or if the message chain is executing it gets published out of order. In fact, since SSE's onmessage doesn't await the call, I would actually expect for batches of messages from the same sse connection to end up being processed/published out of order.
I'd much prefer if we could explicitly specify an API parameter to skip the strict ordering such that the requestSnapshot call in particular avoids it
There was a problem hiding this comment.
Just seeing this, turns out we have the same concern :)
| // behind the currently-running subscriber callback, which would deadlock | ||
| // when requestSnapshot awaits publication. | ||
| if (this.#isPublishing) { | ||
| return deliver() |
There was a problem hiding this comment.
Since we call deliver() without awaiting i think we can still get "wrong" execution order.
Concrete scenario with two subscribers A and B:
- Batch M1 arrives.
#isPublishingflips to true, outerdeliver()starts. Promise.allspins upcallbackA(M1)andcallbackB(M1)concurrently. Both run up to their first await.- A awaits
requestSnapshot(). B is still mid-await processing M1 (e.g. an async write, a fetch, awaiting downstream state). requestSnapshot()calls#onMessages(snapshot)→#publish(snapshot)→ reentrant branch fires →deliver()invokes bothcallbackA(snapshot)andcallbackB(snapshot)immediately.- B's callback is now re-entered with the snapshot batch before its M1 invocation has returned.
So the per-subscriber invariant "my callback won't be re-entered while a previous invocation is in flight" is broken for any bystander subscriber whose M1 callback is still awaiting something when the reentrant publish fires. The messageChain-level ordering is preserved (next queued batch still waits), but B can observe M1-start → snapshot-start → snapshot-end → M1-end interleaving in its own callback.
In practice this may be fine — if subscribers are effectively stateless across awaits, the window is invisible. But a subscriber that mutates this.something across an await in its callback could see corruption.
Fix
requestSnapshot()so callers can safely read subscribed state immediately after awaiting it.Previously,
requestSnapshot()fetched and injected snapshot data into the stream but did not await the async subscriber publication chain. This meant consumers such as TanStack DB on-demand loading could observe stale state immediately afterawait requestSnapshot().Root Cause
requestSnapshot()called#onMessages(...)without awaiting it. That method publishes through the stream’s serialized subscriber chain, so async subscribers could still be processing the injected snapshot batch afterrequestSnapshot()had already resolved.Adding the await exposed a second edge case: if a subscriber reentrantly called and awaited
requestSnapshot(), the nested publish could be appended behind the currently-running subscriber callback, causing a deadlock.Approach
#onMessages(...)inrequestSnapshot()so the promise resolves only after the injected snapshot batch has been delivered to subscribers.#publish()reentrant-safe by delivering nested publishes immediately when already inside subscriber delivery, avoiding self-deadlock.Shapevisibility, and reentrantrequestSnapshot()calls.Key Invariants
await requestSnapshot()means subscriber callbacks for the injected snapshot batch have completed.snapshot-endandsubset-endare delivered beforerequestSnapshot()resolves.requestSnapshot()calls from subscriber callbacks do not deadlock.Non-goals
Trade-offs
The fix keeps the main publish path serialized while special-casing reentrant delivery to avoid deadlock. Throwing on reentrant
requestSnapshot()would have avoided the deadlock too, but it would introduce a new API restriction. Supporting it is safer and more ergonomic for subscribers.Verification
Results:
Also red/green verified the reentrant regression: the new reentrant test fails with the old chained publish implementation and passes with the reentrant-safe publish fix.
Files changed
packages/typescript-client/src/client.tsrequestSnapshot().#publish()reentrant-safe.packages/typescript-client/test/stream.test.tsrequestSnapshot()regression coverage.packages/typescript-client/test/client.test.tsawait requestSnapshot()are verified..changeset/fix-requestsnapshot-publish-ordering.md@electric-sql/client.