Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-requestsnapshot-publish-ordering.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/client': patch
---

Fix `requestSnapshot()` so it resolves only after the injected snapshot batch has been delivered to subscribers, including async and reentrant subscriber paths.
34 changes: 27 additions & 7 deletions packages/typescript-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,12 @@ export class ShapeStream<T extends Row<unknown> = Row>
#tickPromiseResolver?: () => void
#tickPromiseRejecter?: (reason?: unknown) => void
#messageChain = Promise.resolve<void[]>([]) // promise chain for incoming messages
// Tracks when subscriber callbacks are actively being delivered from
// #messageChain. requestSnapshot can inject a nested batch from inside a
// subscriber; in that reentrant case #publish uses this as an intentional
// escape hatch to deliver the nested snapshot batch immediately rather than
// queueing it behind the subscriber that is awaiting it.
Comment on lines +612 to +616
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Tracks when subscriber callbacks are actively being delivered from
// #messageChain. requestSnapshot can inject a nested batch from inside a
// subscriber; in that reentrant case #publish uses this as an intentional
// escape hatch to deliver the nested snapshot batch immediately rather than
// queueing it behind the subscriber that is awaiting it.

I think we should try to avoid agent comment-creep - the flag's function is fairly clear with the comment left when used in code further down

#isPublishing = false
#snapshotTracker = new SnapshotTracker()
#pauseLock: PauseLock
#currentFetchUrl?: URL // Current fetch URL for computing shape key
Expand Down Expand Up @@ -1719,11 +1725,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
}

async #publish(messages: Message<T>[]): Promise<void[]> {
// We process messages asynchronously
// but SSE's `onmessage` handler is synchronous.
// We use a promise chain to ensure that the handlers
// execute sequentially in the order the messages were received.
this.#messageChain = this.#messageChain.then(() =>
const deliver = () =>
Promise.all(
Array.from(this.#subscribers.values()).map(async ([callback, __]) => {
try {
Expand All @@ -1735,7 +1737,25 @@ export class ShapeStream<T extends Row<unknown> = Row>
}
})
)
)

// We process messages asynchronously but SSE's `onmessage` handler is
// synchronous. Use a promise chain to ensure handlers execute sequentially
// in the order messages were received. If a subscriber reentrantly requests
// a snapshot, deliver that nested batch immediately instead of appending it
// behind the currently-running subscriber callback, which would deadlock
// when requestSnapshot awaits publication.
if (this.#isPublishing) {
return deliver()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we call deliver() without awaiting i think we can still get "wrong" execution order.
Concrete scenario with two subscribers A and B:

  1. Batch M1 arrives. #isPublishing flips to true, outer deliver() starts.
  2. Promise.all spins up callbackA(M1) and callbackB(M1) concurrently. Both run up to their first await.
  3. A awaits requestSnapshot(). B is still mid-await processing M1 (e.g. an async write, a fetch, awaiting downstream state).
  4. requestSnapshot() calls #onMessages(snapshot)#publish(snapshot) → reentrant branch fires → deliver() invokes both callbackA(snapshot) and callbackB(snapshot) immediately.
  5. B's callback is now re-entered with the snapshot batch before its M1 invocation has returned.

So the per-subscriber invariant "my callback won't be re-entered while a previous invocation is in flight" is broken for any bystander subscriber whose M1 callback is still awaiting something when the reentrant publish fires. The messageChain-level ordering is preserved (next queued batch still waits), but B can observe M1-start → snapshot-start → snapshot-end → M1-end interleaving in its own callback.

In practice this may be fine — if subscribers are effectively stateless across awaits, the window is invisible. But a subscriber that mutates this.something across an await in its callback could see corruption.

}
Comment on lines +1747 to +1749
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a bit odd - we have a message chain promise to serialize message processing, which gets bypassed if you publish messages while a batch of messages is being processed.

Effectively, anyone calling this.#publish either ends up on the message chain, or if the message chain is executing it gets published out of order. In fact, since SSE's onmessage doesn't await the call, I would actually expect for batches of messages from the same sse connection to end up being processed/published out of order.

I'd much prefer if we could explicitly specify an API parameter to skip the strict ordering such that the requestSnapshot call in particular avoids it

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just seeing this, turns out we have the same concern :)


this.#messageChain = this.#messageChain.then(async () => {
this.#isPublishing = true
try {
return await deliver()
} finally {
this.#isPublishing = false
}
})

return this.#messageChain
}
Expand Down Expand Up @@ -1901,7 +1921,7 @@ export class ShapeStream<T extends Row<unknown> = Row>
metadata,
new Set(data.map((message) => message.key))
)
this.#onMessages(dataWithEndBoundary, false)
await this.#onMessages(dataWithEndBoundary, false)

// On cold start the stream's offset is still at "now". Advance it
// to the snapshot's position so no updates are missed in between.
Expand Down
9 changes: 5 additions & 4 deletions packages/typescript-client/test/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1694,10 +1694,11 @@ describe.for(fetchAndSse)(
limit: 100,
})

// Wait until shape reflects the snapshot
await vi.waitFor(() => {
expect(shape.currentRows.length).toBe(data.length)
})
// requestSnapshot must not resolve until subscriber callbacks for the
// injected snapshot batch have completed. Callers such as TanStack DB's
// on-demand loadSubset rely on this to make immediate reads after await
// consistent.
expect(shape.currentRows.length).toBe(data.length)

// Compare keys in stream vs returned snapshot data
const returnedKeys = new Set(data.map((m) => m.key))
Expand Down
180 changes: 180 additions & 0 deletions packages/typescript-client/test/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import {
ShapeStream,
isChangeMessage,
isControlMessage,
Message,
Row,
_resetHttpWarningForTesting,
Expand All @@ -24,6 +25,185 @@ describe(`ShapeStream`, () => {

afterEach(() => aborter.abort())

it(`requestSnapshot waits for snapshot messages to be published to subscribers before resolving`, async () => {
const snapshotRow = {
key: `test-1`,
value: { id: `1` },
headers: {
operation: `insert`,
relation: [`public`, `test`],
},
offset: `0_0`,
}

const fetchMock = vi.fn(() =>
Promise.resolve(
new Response(
JSON.stringify({
metadata: {
snapshot_mark: 1,
xmin: `1`,
xmax: `2`,
xip_list: [],
database_lsn: `0`,
},
data: [snapshotRow],
}),
{
status: 200,
headers: {
'content-type': `application/json`,
'electric-handle': `handle-1`,
'electric-offset': `0_0`,
'electric-schema': `{"id":{"type":"text"}}`,
},
}
)
)
)

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `test` },
log: `changes_only`,
signal: aborter.signal,
fetchClient: fetchMock,
subscribe: false,
})

let releaseSubscriber!: () => void
const subscriberFinished = new Promise<void>((resolve) => {
releaseSubscriber = resolve
})
let snapshotRequestResolved = false
let publishedMessages: Message<Row>[] = []

stream.subscribe(async (messages) => {
if (messages.some(isChangeMessage)) {
publishedMessages = messages
await subscriberFinished
}
})

const snapshotRequest = stream.requestSnapshot({ limit: 1 }).then(() => {
snapshotRequestResolved = true
})

await resolveInMacrotask(undefined)
expect(snapshotRequestResolved).toBe(false)

releaseSubscriber()
await snapshotRequest

expect(publishedMessages.some(isChangeMessage)).toBe(true)
expect(
publishedMessages.some(
(message) =>
isControlMessage(message) &&
message.headers.control === `snapshot-end`
)
).toBe(true)
expect(
publishedMessages.some(
(message) =>
isControlMessage(message) && message.headers.control === `subset-end`
)
).toBe(true)
})

it(`requestSnapshot can be awaited reentrantly from a subscriber`, async () => {
const streamRow = {
key: `stream-1`,
value: { id: `1` },
headers: {
operation: `insert`,
relation: [`public`, `test`],
},
offset: `0_0`,
}
const snapshotRow = {
key: `snapshot-1`,
value: { id: `2` },
headers: {
operation: `insert`,
relation: [`public`, `test`],
},
offset: `0_1`,
}

let requestCount = 0
const fetchMock = vi.fn(() => {
requestCount++

if (requestCount === 1) {
return Promise.resolve(
new Response(
JSON.stringify([
streamRow,
{ headers: { control: `up-to-date` }, offset: `0_0` },
]),
{
status: 200,
headers: {
'content-type': `application/json`,
'electric-handle': `handle-1`,
'electric-offset': `0_0`,
'electric-schema': `{"id":{"type":"text"}}`,
},
}
)
)
}

return Promise.resolve(
new Response(
JSON.stringify({
metadata: {
snapshot_mark: 1,
xmin: `1`,
xmax: `2`,
xip_list: [],
database_lsn: `0`,
},
data: [snapshotRow],
}),
{
status: 200,
headers: {
'content-type': `application/json`,
'electric-handle': `handle-1`,
'electric-offset': `0_1`,
'electric-schema': `{"id":{"type":"text"}}`,
},
}
)
)
})

const stream = new ShapeStream({
url: shapeUrl,
params: { table: `test` },
log: `changes_only`,
signal: aborter.signal,
fetchClient: fetchMock,
subscribe: false,
})

let requestedSnapshot = false
let reentrantSnapshotResolved = false
stream.subscribe(async (messages) => {
if (requestedSnapshot || !messages.some(isChangeMessage)) return

requestedSnapshot = true
await stream.requestSnapshot({ limit: 1 })
reentrantSnapshotResolved = true
})

await vi.waitFor(() => {
expect(reentrantSnapshotResolved).toBe(true)
})
})

it(`should attach specified headers to requests`, async () => {
const eventTarget = new EventTarget()
const requestArgs: Array<RequestInit | undefined> = []
Expand Down
Loading