From 6e38a17f01376253045954def9e048c1c3bb80f1 Mon Sep 17 00:00:00 2001 From: Manuel Schiller Date: Wed, 4 Mar 2026 21:00:55 +0100 Subject: [PATCH 1/5] fix streaming --- .../streaming-ssr/tests/fast-serial.spec.ts | 15 + .../streaming-ssr/tests/sync-only.spec.ts | 15 + packages/router-core/src/ssr/ssr-server.ts | 45 ++- .../src/ssr/transformStreamWithRouter.ts | 103 ++++-- .../tests/getNormalizedURL.test.ts | 14 + .../tests/ssr-server-idempotency.test.ts | 82 +++++ packages/router-core/tests/ssr-server.test.ts | 66 ++++ .../tests/transformStreamWithRouter.test.ts | 339 ++++++++++++++++++ 8 files changed, 632 insertions(+), 47 deletions(-) create mode 100644 packages/router-core/tests/ssr-server-idempotency.test.ts create mode 100644 packages/router-core/tests/ssr-server.test.ts create mode 100644 packages/router-core/tests/transformStreamWithRouter.test.ts diff --git a/e2e/react-start/streaming-ssr/tests/fast-serial.spec.ts b/e2e/react-start/streaming-ssr/tests/fast-serial.spec.ts index 339d6408ea7..53e4d875b00 100644 --- a/e2e/react-start/streaming-ssr/tests/fast-serial.spec.ts +++ b/e2e/react-start/streaming-ssr/tests/fast-serial.spec.ts @@ -16,6 +16,21 @@ test.describe('Fast serialization (serialization completes before render)', () = expect(responseHtml).toContain('$_TSR.router') expect(responseHtml).toContain('$_TSR.e()') expect(responseHtml).toContain('$tsr-stream-barrier') + + // Router scripts and end marker must be injected before closing tags. + const bodyCloseIndex = responseHtml.lastIndexOf('') + const htmlCloseIndex = responseHtml.lastIndexOf('') + const barrierIndex = responseHtml.indexOf('$tsr-stream-barrier') + const endMarkerIndex = responseHtml.indexOf('$_TSR.e()') + + expect(bodyCloseIndex).toBeGreaterThan(-1) + expect(htmlCloseIndex).toBeGreaterThan(-1) + expect(barrierIndex).toBeGreaterThan(-1) + expect(endMarkerIndex).toBeGreaterThan(-1) + + expect(barrierIndex).toBeLessThan(bodyCloseIndex) + expect(endMarkerIndex).toBeLessThan(bodyCloseIndex) + expect(endMarkerIndex).toBeLessThan(htmlCloseIndex) }) test('all data is available immediately', async ({ page }) => { diff --git a/e2e/react-start/streaming-ssr/tests/sync-only.spec.ts b/e2e/react-start/streaming-ssr/tests/sync-only.spec.ts index a0060a07ee1..31473e6a99d 100644 --- a/e2e/react-start/streaming-ssr/tests/sync-only.spec.ts +++ b/e2e/react-start/streaming-ssr/tests/sync-only.spec.ts @@ -68,6 +68,21 @@ test('Sync-only route has bootstrap scripts in initial HTML', async ({ // SSR should include the barrier script tag in the HTML (rendered by ) // This is the critical marker transformStreamWithRouter can scan for. expect(responseHtml).toContain('$tsr-stream-barrier') + + // Router scripts and end marker must be injected before closing tags. + const bodyCloseIndex = responseHtml.lastIndexOf('') + const htmlCloseIndex = responseHtml.lastIndexOf('') + const barrierIndex = responseHtml.indexOf('$tsr-stream-barrier') + const endMarkerIndex = responseHtml.indexOf('$_TSR.e()') + + expect(bodyCloseIndex).toBeGreaterThan(-1) + expect(htmlCloseIndex).toBeGreaterThan(-1) + expect(barrierIndex).toBeGreaterThan(-1) + expect(endMarkerIndex).toBeGreaterThan(-1) + + expect(barrierIndex).toBeLessThan(bodyCloseIndex) + expect(endMarkerIndex).toBeLessThan(bodyCloseIndex) + expect(endMarkerIndex).toBeLessThan(htmlCloseIndex) }) test('Navigating to sync-only from home page', async ({ page }) => { diff --git a/packages/router-core/src/ssr/ssr-server.ts b/packages/router-core/src/ssr/ssr-server.ts index 8cea703eeb6..9ebd513ffa1 100644 --- a/packages/router-core/src/ssr/ssr-server.ts +++ b/packages/router-core/src/ssr/ssr-server.ts @@ -91,13 +91,8 @@ class ScriptBuffer { liftBarrier() { if (this._scriptBarrierLifted || this._cleanedUp) return this._scriptBarrierLifted = true - if (this._queue.length > 0 && !this._pendingMicrotask) { - this._pendingMicrotask = true - queueMicrotask(() => { - this._pendingMicrotask = false - this.injectBufferedScripts() - }) - } + this._pendingMicrotask = false + this.injectBufferedScripts() } /** @@ -274,15 +269,24 @@ export function attachRouterServerSsrUtils({ : defaultSerovalPlugins const signalSerializationComplete = () => { + if (_serializationFinished) return _serializationFinished = true + + const listeners = serializationFinishedListeners.slice() + + for (const listener of listeners) { + try { + listener() + } catch (err) { + console.error('Serialization listener error:', err) + } + } try { - serializationFinishedListeners.forEach((l) => l()) router.emit({ type: 'onSerializationFinished' }) } catch (err) { - console.error('Serialization listener error:', err) + console.error('Error emitting onSerializationFinished:', err) } finally { serializationFinishedListeners.length = 0 - renderFinishedListeners.length = 0 } } @@ -320,16 +324,19 @@ export function attachRouterServerSsrUtils({ onSerializationFinished: (listener) => serializationFinishedListeners.push(listener), setRenderFinished: () => { - // Wrap in try-catch to ensure scriptBuffer.liftBarrier() is always called - try { - renderFinishedListeners.forEach((l) => l()) - } catch (err) { - console.error('Error in render finished listener:', err) - } finally { - // Clear listeners after calling them to prevent memory leaks - renderFinishedListeners.length = 0 + const listeners = renderFinishedListeners.slice() + renderFinishedListeners.length = 0 + + for (const listener of listeners) { + try { + listener() + } catch (err) { + console.error('Error in render finished listener:', err) + } } + scriptBuffer.liftBarrier() + scriptBuffer.flush() }, takeBufferedScripts() { const scripts = scriptBuffer.takeAll() @@ -396,7 +403,7 @@ export function getOrigin(request: Request) { // chromium treats search params differently than paths, i.e. "|" is not encoded in search params. export function getNormalizedURL(url: string | URL, base?: string | URL) { // ensure backslashes are encoded correctly in the URL - if (typeof url === 'string') url = url.replace('\\', '%5C') + if (typeof url === 'string') url = url.replace(/\\/g, '%5C') const rawUrl = new URL(url, base) const { path: decodedPathname, handledProtocolRelativeURL } = decodePath( diff --git a/packages/router-core/src/ssr/transformStreamWithRouter.ts b/packages/router-core/src/ssr/transformStreamWithRouter.ts index 99fe436256d..8bac45f7bf7 100644 --- a/packages/router-core/src/ssr/transformStreamWithRouter.ts +++ b/packages/router-core/src/ssr/transformStreamWithRouter.ts @@ -21,7 +21,7 @@ export function transformPipeableStreamWithRouter( // Use string constants for simple indexOf matching const BODY_END_TAG = '' -const HTML_END_TAG = '' +const SCRIPT_END_TAG = '' // Minimum length of a valid closing tag: = 4 characters const MIN_CLOSING_TAG_LENGTH = 4 @@ -122,11 +122,22 @@ export function transformStreamWithRouter( let controller: ReadableStreamDefaultController | undefined let isStreamClosed = false let lifetimeTimeoutHandle: ReturnType | undefined + let appReader: ReadableStreamDefaultReader | undefined + + const cancelActiveReader = (reason?: unknown) => { + const activeReader = appReader + appReader = undefined + void activeReader?.cancel(reason).catch(() => { + // ignore + }) + } const cleanup = () => { if (cleanedUp) return cleanedUp = true + cancelActiveReader() + if (lifetimeTimeoutHandle !== undefined) { clearTimeout(lifetimeTimeoutHandle) lifetimeTimeoutHandle = undefined @@ -170,20 +181,27 @@ export function transformStreamWithRouter( start(c) { controller = c }, - cancel() { + cancel(reason) { isStreamClosed = true + cancelActiveReader(reason) cleanup() }, }) ;(async () => { const reader = appStream.getReader() + appReader = reader try { while (true) { const { done, value } = await reader.read() if (done) break if (cleanedUp || isStreamClosed) return - controller?.enqueue(value as unknown as Uint8Array) + + if (typeof value === 'string') { + controller?.enqueue(textEncoder.encode(value)) + } else { + controller?.enqueue(value as Uint8Array) + } } if (cleanedUp || isStreamClosed) return @@ -198,6 +216,7 @@ export function transformStreamWithRouter( safeError(error) cleanup() } finally { + appReader = undefined reader.releaseLock() } })().catch((error) => { @@ -216,8 +235,17 @@ export function transformStreamWithRouter( let lifetimeTimeoutHandle: ReturnType | undefined let cleanedUp = false - let controller: ReadableStreamDefaultController + let controller: ReadableStreamDefaultController let isStreamClosed = false + let appReader: ReadableStreamDefaultReader | undefined + + const cancelActiveReader = (reason?: unknown) => { + const activeReader = appReader + appReader = undefined + void activeReader?.cancel(reason).catch(() => { + // ignore + }) + } const textDecoder = new TextDecoder() @@ -227,7 +255,7 @@ export function transformStreamWithRouter( // between-chunk text buffer; keep bounded to avoid unbounded memory let leftover = '' - // captured closing tags from onward + // captured closing tags and trailing tail from onward let pendingClosingTags = '' // conservative cap: enough to hold any partial closing tag + a bit @@ -273,6 +301,8 @@ export function transformStreamWithRouter( if (cleanedUp) return cleanedUp = true + cancelActiveReader() + try { stopListeningToInjectedHtml?.() stopListeningToSerializationFinished?.() @@ -294,7 +324,6 @@ export function transformStreamWithRouter( pendingRouterHtml = '' leftover = '' pendingClosingTags = '' - router.serverSsr?.cleanup() } @@ -302,14 +331,16 @@ export function transformStreamWithRouter( start(c) { controller = c }, - cancel() { + cancel(reason) { isStreamClosed = true + cancelActiveReader(reason) cleanup() }, }) - function flushPendingRouterHtml() { + function flushPendingRouterHtml({ force = false }: { force?: boolean } = {}) { if (!pendingRouterHtml) return + if (!force && !streamBarrierLifted) return safeEnqueue(pendingRouterHtml) pendingRouterHtml = '' } @@ -336,9 +367,8 @@ export function transformStreamWithRouter( if (leftover) safeEnqueue(leftover) if (decoderRemainder) safeEnqueue(decoderRemainder) - flushPendingRouterHtml() + flushPendingRouterHtml({ force: true }) if (pendingClosingTags) safeEnqueue(pendingClosingTags) - safeClose() cleanup() } @@ -361,9 +391,7 @@ export function transformStreamWithRouter( const html = router.serverSsr?.takeBufferedHtml() if (!html) return - // If we've already captured (pendingClosingTags), we must keep appending - // so injection stays before the stored closing tags. - if (isAppRendering || leftover || pendingClosingTags) { + if (!streamBarrierLifted || isAppRendering || leftover || pendingClosingTags) { appendRouterHtml(html) } else { safeEnqueue(html) @@ -382,6 +410,7 @@ export function transformStreamWithRouter( // Transform the appStream ;(async () => { const reader = appStream.getReader() + appReader = reader try { while (true) { const { done, value } = await reader.read() @@ -390,21 +419,43 @@ export function transformStreamWithRouter( if (cleanedUp || isStreamClosed) return const text = - value instanceof Uint8Array - ? textDecoder.decode(value, { stream: true }) - : String(value) + typeof value === 'string' + ? value + : ArrayBuffer.isView(value) + ? textDecoder.decode(value as Uint8Array, { stream: true }) + : String(value) // Fast path: most chunks have no pending left-over. const chunkString = leftover ? leftover + text : text - if (!streamBarrierLifted) { - if (chunkString.includes(TSR_SCRIPT_BARRIER_ID)) { - streamBarrierLifted = true - router.serverSsr?.liftScriptBarrier() + const barrierFoundInChunk = + !streamBarrierLifted && chunkString.includes(TSR_SCRIPT_BARRIER_ID) + + if (barrierFoundInChunk) { + const barrierMarkerIndex = chunkString.indexOf(TSR_SCRIPT_BARRIER_ID) + const barrierScriptClosedInChunk = + barrierMarkerIndex !== -1 && + chunkString.indexOf(SCRIPT_END_TAG, barrierMarkerIndex) !== -1 + + streamBarrierLifted = true + router.serverSsr?.liftScriptBarrier() + + const barrierChunkBodyEndIndex = chunkString.indexOf(BODY_END_TAG) + if (barrierChunkBodyEndIndex !== -1) { + safeEnqueue(chunkString.slice(0, barrierChunkBodyEndIndex)) + pendingClosingTags = chunkString.slice(barrierChunkBodyEndIndex) + } else { + safeEnqueue(chunkString) } + + if (barrierScriptClosedInChunk) { + flushPendingRouterHtml() + } + + leftover = '' + continue } - // If we already saw , everything else is part of tail; buffer it. if (pendingClosingTags) { pendingClosingTags += chunkString leftover = '' @@ -412,15 +463,10 @@ export function transformStreamWithRouter( } const bodyEndIndex = chunkString.indexOf(BODY_END_TAG) - const htmlEndIndex = chunkString.indexOf(HTML_END_TAG) - if ( - bodyEndIndex !== -1 && - htmlEndIndex !== -1 && - bodyEndIndex < htmlEndIndex - ) { - pendingClosingTags = chunkString.slice(bodyEndIndex) + if (bodyEndIndex !== -1) { safeEnqueue(chunkString.slice(0, bodyEndIndex)) + pendingClosingTags = chunkString.slice(bodyEndIndex) flushPendingRouterHtml() leftover = '' continue @@ -480,6 +526,7 @@ export function transformStreamWithRouter( safeError(error) cleanup() } finally { + appReader = undefined reader.releaseLock() } })().catch((error) => { diff --git a/packages/router-core/tests/getNormalizedURL.test.ts b/packages/router-core/tests/getNormalizedURL.test.ts index d459c8b4185..3e9ff186a4a 100644 --- a/packages/router-core/tests/getNormalizedURL.test.ts +++ b/packages/router-core/tests/getNormalizedURL.test.ts @@ -125,4 +125,18 @@ describe('getNormalizedURL', () => { expect(normalizedUrl.url.hash).toBe(expectedHash) }, ) + + test('should encode all backslashes in pathname', () => { + const normalized = getNormalizedURL('https://example.com/path\\a\\b\\c') + expect(normalized.url.pathname).toBe('/path%5Ca%5Cb%5Cc') + }) + + test('should encode backslashes in pathname while preserving query and hash', () => { + const normalized = getNormalizedURL( + 'https://example.com/path\\a?x=1&y=2#section', + ) + expect(normalized.url.pathname).toBe('/path%5Ca') + expect(normalized.url.search).toBe('?x=1&y=2') + expect(normalized.url.hash).toBe('#section') + }) }) diff --git a/packages/router-core/tests/ssr-server-idempotency.test.ts b/packages/router-core/tests/ssr-server-idempotency.test.ts new file mode 100644 index 00000000000..c83e5368f66 --- /dev/null +++ b/packages/router-core/tests/ssr-server-idempotency.test.ts @@ -0,0 +1,82 @@ +import { describe, expect, it, vi } from 'vitest' + +vi.mock('seroval', async (importOriginal) => { + const actual = await importOriginal() + + return { + ...actual, + crossSerializeStream: (_value: unknown, options: any) => { + options.onDone() + options.onError?.(new Error('forced duplicate completion')) + }, + getCrossReferenceHeader: () => '', + } +}) + +import { attachRouterServerSsrUtils } from '../src/ssr/ssr-server' + +describe('ssr-server serialization completion idempotency', () => { + it('emits onSerializationFinished only once when serializer signals done then error', async () => { + const emit = vi.fn() + const consoleErrorSpy = vi + .spyOn(console, 'error') + .mockImplementation(() => undefined) + + const router = { + options: { + ssr: {}, + dehydrate: async () => undefined, + }, + state: { + matches: [], + }, + isShell: () => false, + emit, + } as any + + attachRouterServerSsrUtils({ router, manifest: undefined }) + + let listenerCalls = 0 + router.serverSsr.onSerializationFinished(() => { + listenerCalls++ + }) + + await router.serverSsr.dehydrate() + + const serializationEmits = emit.mock.calls.filter( + ([event]) => event?.type === 'onSerializationFinished', + ) + + expect(listenerCalls).toBe(1) + expect(serializationEmits).toHaveLength(1) + expect(router.serverSsr.isSerializationFinished()).toBe(true) + + consoleErrorSpy.mockRestore() + }) + + it('does not clear render-finished listeners when serialization completes first', async () => { + const router = { + options: { + ssr: {}, + dehydrate: async () => undefined, + }, + state: { + matches: [], + }, + isShell: () => false, + emit: vi.fn(), + } as any + + attachRouterServerSsrUtils({ router, manifest: undefined }) + + let called = false + router.serverSsr.onRenderFinished(() => { + called = true + }) + + await router.serverSsr.dehydrate() + router.serverSsr.setRenderFinished() + + expect(called).toBe(true) + }) +}) diff --git a/packages/router-core/tests/ssr-server.test.ts b/packages/router-core/tests/ssr-server.test.ts new file mode 100644 index 00000000000..4f659efe989 --- /dev/null +++ b/packages/router-core/tests/ssr-server.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it, vi } from 'vitest' +import { attachRouterServerSsrUtils } from '../src/ssr/ssr-server' + +function createMinimalRouter() { + return { + options: { + ssr: {}, + dehydrate: async () => undefined, + }, + state: { + matches: [], + }, + isShell: () => false, + emit: () => {}, + } +} + +describe('ssr-server', () => { + it('flushes initial script buffer synchronously on render finish', () => { + const router = createMinimalRouter() as any + attachRouterServerSsrUtils({ router, manifest: undefined }) + + router.serverSsr.setRenderFinished() + const html = router.serverSsr.takeBufferedHtml() + + expect(html).toContain(' { + const router = createMinimalRouter() as any + attachRouterServerSsrUtils({ router, manifest: undefined }) + const consoleErrorSpy = vi + .spyOn(console, 'error') + .mockImplementation(() => undefined) + + let called = false + router.serverSsr.onRenderFinished(() => { + throw new Error('listener failed') + }) + router.serverSsr.onRenderFinished(() => { + called = true + }) + + router.serverSsr.setRenderFinished() + + expect(called).toBe(true) + expect(consoleErrorSpy).toHaveBeenCalled() + consoleErrorSpy.mockRestore() + }) + + it('flushes scripts enqueued during render-finished listeners', () => { + const router = createMinimalRouter() as any + attachRouterServerSsrUtils({ router, manifest: undefined }) + + router.serverSsr.onRenderFinished(() => { + router.serverSsr.injectScript('window.__render_listener_script__=1') + }) + + router.serverSsr.setRenderFinished() + const html = router.serverSsr.takeBufferedHtml() + + expect(html).toContain('window.__render_listener_script__=1') + expect(html).toContain(' void>>() + let bufferedHtml = options?.initialBufferedHtml ?? '' + let serializationFinished = options?.serializationFinished ?? false + let cleanedUp = false + + const router = { + subscribe(event: string, listener: () => void) { + const set = listeners.get(event) ?? new Set<() => void>() + set.add(listener) + listeners.set(event, set) + return () => { + set.delete(listener) + } + }, + emit({ type }: { type: string }) { + listeners.get(type)?.forEach((listener) => listener()) + }, + serverSsr: { + isSerializationFinished() { + return serializationFinished + }, + takeBufferedHtml() { + if (!bufferedHtml) return undefined + const html = bufferedHtml + bufferedHtml = '' + return html + }, + injectHtml(html: string) { + bufferedHtml += html + router.emit({ type: 'onInjectedHtml' }) + }, + setRenderFinished() { + serializationFinished = true + router.emit({ type: 'onSerializationFinished' }) + }, + liftScriptBarrier() {}, + cleanup() { + cleanedUp = true + }, + }, + } + + return { + router: router as any, + wasCleanedUp() { + return cleanedUp + }, + } +} + +async function readAll(stream: ReadableStream) { + const reader = stream.getReader() + const decoder = new TextDecoder() + let result = '' + while (true) { + const { done, value } = await reader.read() + if (done) break + if (typeof value === 'string') { + result += value + } else { + result += decoder.decode(value, { stream: true }) + } + } + result += decoder.decode() + return result +} + +describe('transformStreamWithRouter', () => { + it('propagates cancel to underlying source in fast path', async () => { + const { router, wasCleanedUp } = createRouterMock({ + serializationFinished: true, + }) + const encoder = new TextEncoder() + let canceled = false + + const appStream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('hello')) + }, + cancel() { + canceled = true + }, + }) + + const transformed = transformStreamWithRouter(router, appStream) + const reader = transformed.getReader() + + await reader.read() + await reader.cancel('test cancel') + await Promise.resolve() + + expect(canceled).toBe(true) + expect(wasCleanedUp()).toBe(true) + }) + + it('inserts router html before closing tags when and are split across chunks', async () => { + const { router } = createRouterMock({ + serializationFinished: false, + }) + const encoder = new TextEncoder() + + const appStream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + '
content
', + ), + ) + router.serverSsr.injectHtml('') + controller.enqueue(encoder.encode('')) + controller.close() + }, + }) + + const transformed = transformStreamWithRouter(router, appStream) + const html = await readAll(transformed) + + const injectedIndex = html.indexOf('') + const bodyEndIndex = html.indexOf('') + const htmlEndIndex = html.indexOf('') + + expect(injectedIndex, html).toBeGreaterThan(-1) + expect(bodyEndIndex, html).toBeGreaterThan(-1) + expect(htmlEndIndex, html).toBeGreaterThan(-1) + expect(injectedIndex).toBeLessThan(htmlEndIndex) + expect(injectedIndex).toBeGreaterThanOrEqual(0) + }) + + it('propagates cancel to underlying source in normal path', async () => { + const { router, wasCleanedUp } = createRouterMock({ + serializationFinished: false, + }) + let canceled = false + + const appStream = new ReadableStream({ + pull() { + // Keep stream open + }, + cancel() { + canceled = true + }, + }) + + const transformed = transformStreamWithRouter(router, appStream) + const reader = transformed.getReader() + + await reader.cancel('normal-path-cancel') + await Promise.resolve() + + expect(canceled).toBe(true) + expect(wasCleanedUp()).toBe(true) + }) + + it('encodes string chunks in fast path', async () => { + const { router } = createRouterMock({ + serializationFinished: true, + }) + + const appStream = new ReadableStream({ + start(controller) { + controller.enqueue('hello-string') + controller.close() + }, + }) + + const transformed = transformStreamWithRouter(router, appStream) + const html = await readAll(transformed) + + expect(html).toContain('hello-string') + expect(html).toContain('') + }) + + it('flushes pendingRouterHtml when barrier and land in the same chunk', async () => { + // when the barrier script and appear in the same chunk, + // scripts buffered in pendingRouterHtml (e.g. from initialBufferedHtml) must be + // emitted immediately — not deferred to tryFinish(). They must appear between + // the barrier content and , not after . + const { router } = createRouterMock({ + serializationFinished: false, + initialBufferedHtml: '', + }) + const encoder = new TextEncoder() + + const appStream = new ReadableStream({ + start(controller) { + // Barrier and are in the SAME chunk + controller.enqueue( + encoder.encode( + '
content
' + + '' + + '', + ), + ) + controller.close() + }, + }) + + router.serverSsr.setRenderFinished() + + const transformed = transformStreamWithRouter(router, appStream) + const html = await readAll(transformed) + + const initialScriptIndex = html.indexOf( + '', + ) + const bodyEndIndex = html.indexOf('') + + expect(initialScriptIndex, html).toBeGreaterThan(-1) + expect(bodyEndIndex, html).toBeGreaterThan(-1) + // Scripts must appear BEFORE + expect(initialScriptIndex).toBeLessThan(bodyEndIndex) + }) + + it('flushes pendingRouterHtml when barrier has no in same chunk', async () => { + // when the barrier is found but is in a later chunk, + // scripts buffered before the barrier should be flushed immediately after + // emitting the barrier chunk rather than waiting for tryFinish(). + const { router } = createRouterMock({ + serializationFinished: false, + initialBufferedHtml: '', + }) + const encoder = new TextEncoder() + + const appStream = new ReadableStream({ + start(controller) { + // Barrier chunk — no yet + controller.enqueue( + encoder.encode( + '
content
' + + '', + ), + ) + // arrives in a later chunk + controller.enqueue(encoder.encode('')) + controller.close() + }, + }) + + router.serverSsr.setRenderFinished() + + const transformed = transformStreamWithRouter(router, appStream) + const html = await readAll(transformed) + + const preBarrierScriptIndex = html.indexOf( + '', + ) + const bodyEndIndex = html.indexOf('') + + expect(preBarrierScriptIndex, html).toBeGreaterThan(-1) + expect(bodyEndIndex, html).toBeGreaterThan(-1) + // Scripts must appear BEFORE + expect(preBarrierScriptIndex).toBeLessThan(bodyEndIndex) + }) + + it('does not inject pendingRouterHtml into an open split barrier tag', async () => { + const { router } = createRouterMock({ + serializationFinished: false, + initialBufferedHtml: '', + }) + const encoder = new TextEncoder() + + const appStream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + '
content
')) + controller.close() + }, + }) + + router.serverSsr.setRenderFinished() + + const transformed = transformStreamWithRouter(router, appStream) + const html = await readAll(transformed) + + const barrierMarkerIndex = html.indexOf('$tsr-stream-barrier') + const barrierTagCloseIndex = html.indexOf('>', barrierMarkerIndex) + const splitBarrierScriptIndex = html.indexOf( + '', + ) + const bodyEndIndex = html.indexOf('') + + expect(barrierMarkerIndex, html).toBeGreaterThan(-1) + expect(barrierTagCloseIndex, html).toBeGreaterThan(-1) + expect(splitBarrierScriptIndex, html).toBeGreaterThan(-1) + expect(bodyEndIndex, html).toBeGreaterThan(-1) + + expect(splitBarrierScriptIndex).toBeGreaterThan(barrierTagCloseIndex) + expect(splitBarrierScriptIndex).toBeLessThan(bodyEndIndex) + }) + + it('cancels underlying source on lifetime timeout cleanup', async () => { + vi.useFakeTimers() + + const { router } = createRouterMock({ + serializationFinished: false, + }) + + let canceled = false + + const appStream = new ReadableStream({ + pull() { + // Keep open forever to force lifetime timeout path + }, + cancel() { + canceled = true + }, + }) + + const transformed = transformStreamWithRouter(router, appStream, { + lifetimeMs: 10, + }) + + try { + const reader = transformed.getReader() + const readExpectation = expect(reader.read()).rejects.toThrow( + 'Stream lifetime exceeded', + ) + + await vi.advanceTimersByTimeAsync(11) + + await readExpectation + expect(canceled).toBe(true) + } finally { + vi.useRealTimers() + } + }) +}) From 208ac33bf9419ef0b38513b9391e92d076c2deb7 Mon Sep 17 00:00:00 2001 From: Manuel Schiller Date: Wed, 4 Mar 2026 21:01:07 +0100 Subject: [PATCH 2/5] add transformStreamWithRouter benchmark --- .../tests/transformStreamWithRouter.bench.ts | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 packages/router-core/tests/transformStreamWithRouter.bench.ts diff --git a/packages/router-core/tests/transformStreamWithRouter.bench.ts b/packages/router-core/tests/transformStreamWithRouter.bench.ts new file mode 100644 index 00000000000..d0a372938a0 --- /dev/null +++ b/packages/router-core/tests/transformStreamWithRouter.bench.ts @@ -0,0 +1,153 @@ +import { ReadableStream } from 'node:stream/web' +import { bench, describe } from 'vitest' +import { TSR_SCRIPT_BARRIER_ID } from '../src/ssr/constants' +import { transformStreamWithRouter } from '../src/ssr/transformStreamWithRouter' + +type RouterMockOptions = { + serializationFinished?: boolean + initialBufferedHtml?: string +} + +function createRouterMock(options?: RouterMockOptions) { + const listeners = new Map void>>() + let bufferedHtml = options?.initialBufferedHtml ?? '' + let serializationFinished = options?.serializationFinished ?? false + + const router = { + subscribe(event: string, listener: () => void) { + const set = listeners.get(event) ?? new Set<() => void>() + set.add(listener) + listeners.set(event, set) + return () => { + set.delete(listener) + } + }, + emit(event: string) { + listeners.get(event)?.forEach((listener) => listener()) + }, + serverSsr: { + isSerializationFinished() { + return serializationFinished + }, + takeBufferedHtml() { + if (!bufferedHtml) return undefined + const html = bufferedHtml + bufferedHtml = '' + return html + }, + injectHtml(html: string) { + bufferedHtml += html + router.emit('onInjectedHtml') + }, + setRenderFinished() { + serializationFinished = true + router.emit('onSerializationFinished') + }, + liftScriptBarrier() {}, + cleanup() {}, + }, + } + + return router as any +} + +const encoder = new TextEncoder() + +function createChunkedStream( + chunks: Array, + onEnqueue?: (index: number) => void, +) { + return new ReadableStream({ + start(controller) { + for (let i = 0; i < chunks.length; i++) { + controller.enqueue(chunks[i]!) + onEnqueue?.(i) + } + controller.close() + }, + }) +} + +async function consumeStream(stream: ReadableStream) { + const reader = stream.getReader() + let totalBytes = 0 + while (true) { + const { done, value } = await reader.read() + if (done) break + totalBytes += value.byteLength + } + return totalBytes +} + +function makeHtmlChunk(size: number) { + const body = 'x'.repeat(Math.max(0, size - 64)) + return `
${body}
` +} + +describe('transformStreamWithRouter bench - fast path', () => { + const binaryChunks = Array.from({ length: 120 }, () => + encoder.encode(makeHtmlChunk(4096)), + ) + + const stringChunks = Array.from({ length: 120 }, () => makeHtmlChunk(4096)) + + bench('fast path - uint8 chunks', async () => { + const router = createRouterMock({ serializationFinished: true }) + const appStream = createChunkedStream(binaryChunks) + const transformed = transformStreamWithRouter(router, appStream) + await consumeStream(transformed) + }) + + bench('fast path - string chunks', async () => { + const router = createRouterMock({ serializationFinished: true }) + const appStream = createChunkedStream(stringChunks) + const transformed = transformStreamWithRouter(router, appStream) + await consumeStream(transformed) + }) +}) + +describe('transformStreamWithRouter bench - streaming path', () => { + const headChunk = '
content
' + const barrierChunk = `` + const tailChunk = '
' + + bench('barrier + initial buffered html', async () => { + const router = createRouterMock({ + serializationFinished: false, + initialBufferedHtml: '', + }) + + const appStream = createChunkedStream([ + encoder.encode(headChunk), + encoder.encode(barrierChunk), + encoder.encode(tailChunk), + ]) + + const transformed = transformStreamWithRouter(router, appStream) + await consumeStream(transformed) + }) + + bench('barrier + incremental injected html', async () => { + const router = createRouterMock({ serializationFinished: false }) + + const appStream = createChunkedStream( + [ + encoder.encode(headChunk), + encoder.encode(barrierChunk), + encoder.encode(makeHtmlChunk(2048)), + encoder.encode(makeHtmlChunk(2048)), + encoder.encode(tailChunk), + ], + (index) => { + if (index <= 3) { + router.serverSsr.injectHtml( + ``, + ) + } + }, + ) + + const transformed = transformStreamWithRouter(router, appStream) + await consumeStream(transformed) + }) +}) \ No newline at end of file From ef3fb7b595761348327b89037903f9f3d2b25156 Mon Sep 17 00:00:00 2001 From: Manuel Schiller Date: Wed, 4 Mar 2026 21:01:18 +0100 Subject: [PATCH 3/5] refactor --- packages/router-core/package.json | 5 + .../src/ssr/transformStreamWithRouter.ts | 521 ++++++++++-------- .../tests/transformStreamWithRouter.test.ts | 25 + 3 files changed, 312 insertions(+), 239 deletions(-) diff --git a/packages/router-core/package.json b/packages/router-core/package.json index f61fe1f43ef..b6692fed6f2 100644 --- a/packages/router-core/package.json +++ b/packages/router-core/package.json @@ -31,6 +31,11 @@ "test:build": "publint --strict && attw --ignore-rules no-resolution --pack .", "test:unit": "vitest", "test:unit:dev": "pnpm run test:unit --watch", + "bench:transform-stream": "vitest bench tests/transformStreamWithRouter.bench.ts", + "bench:transform-stream:prof": "pnpm dlx @platformatic/flame run --md-format=detailed ../../node_modules/vitest/vitest.mjs bench tests/transformStreamWithRouter.bench.ts", + "profile:transform-stream": "pnpm build && pnpm run profile:transform-stream:run:prof", + "profile:transform-stream:run": "node ./tests/transformStreamWithRouter.profile.mjs", + "profile:transform-stream:run:prof": "pnpm dlx @platformatic/flame run --delay=none --md-format=detailed ./tests/transformStreamWithRouter.profile.mjs", "build": "vite build" }, "type": "module", diff --git a/packages/router-core/src/ssr/transformStreamWithRouter.ts b/packages/router-core/src/ssr/transformStreamWithRouter.ts index 8bac45f7bf7..2ac6292c5cf 100644 --- a/packages/router-core/src/ssr/transformStreamWithRouter.ts +++ b/packages/router-core/src/ssr/transformStreamWithRouter.ts @@ -1,5 +1,6 @@ import { ReadableStream } from 'node:stream/web' import { Readable } from 'node:stream' +import { isAscii } from 'node:buffer' import { TSR_SCRIPT_BARRIER_ID } from './constants' import type { AnyRouter } from '../router' @@ -30,9 +31,6 @@ const MIN_CLOSING_TAG_LENGTH = 4 const DEFAULT_SERIALIZATION_TIMEOUT_MS = 60000 const DEFAULT_LIFETIME_TIMEOUT_MS = 60000 -// Module-level encoder (stateless, safe to reuse) -const textEncoder = new TextEncoder() - /** * Finds the position just after the last valid HTML closing tag in the string. * @@ -97,157 +95,217 @@ function findLastClosingTagEnd(str: string): number { return -1 } -export function transformStreamWithRouter( - router: AnyRouter, - appStream: ReadableStream, - opts?: { - /** Timeout for serialization to complete after app render finishes (default: 60000ms) */ - timeoutMs?: number - /** Maximum lifetime of the stream transform (default: 60000ms). Safety net for cleanup. */ - lifetimeMs?: number - }, -) { - // Check upfront if serialization already finished synchronously - // This is the fast path for routes with no deferred data - const serializationAlreadyFinished = - router.serverSsr?.isSerializationFinished() ?? false +type StreamPhase = 'streaming' | 'appDone' | 'finished' +type BarrierPhase = 'locked' | 'lifted' +type TailPhase = 'normal' | 'afterBody' + +type StreamLifecycle = { + stream: ReadableStream + setReader: (reader: ReadableStreamDefaultReader | undefined) => void + isClosed: () => boolean + isCleanedUp: () => boolean + safeEnqueue: (chunk: string | Uint8Array) => void + safeClose: () => void + safeError: (error: unknown) => void + cleanup: () => void +} - // Take any HTML that was buffered before we started listening - const initialBufferedHtml = router.serverSsr?.takeBufferedHtml() +function createStreamLifecycle(opts: { + router: AnyRouter + lifetimeMs: number + withLifetimeTimeout?: boolean + onCleanup?: () => void +}): StreamLifecycle { + const withLifetimeTimeout = opts.withLifetimeTimeout ?? true + let cleanedUp = false + let isStreamClosed = false + let controller: ReadableStreamDefaultController | undefined + let appReader: ReadableStreamDefaultReader | undefined + let lifetimeTimeoutHandle: ReturnType | undefined - // True passthrough: if serialization already finished and nothing buffered, - // we can avoid any decoding/scanning while still honoring cleanup + setRenderFinished. - if (serializationAlreadyFinished && !initialBufferedHtml) { - let cleanedUp = false - let controller: ReadableStreamDefaultController | undefined - let isStreamClosed = false - let lifetimeTimeoutHandle: ReturnType | undefined - let appReader: ReadableStreamDefaultReader | undefined - - const cancelActiveReader = (reason?: unknown) => { - const activeReader = appReader - appReader = undefined - void activeReader?.cancel(reason).catch(() => { - // ignore - }) + const cancelActiveReader = (reason?: unknown) => { + const activeReader = appReader + appReader = undefined + void activeReader?.cancel(reason).catch(() => { + // ignore + }) + } + + const safeEnqueue = (chunk: string | Uint8Array) => { + if (isStreamClosed) return + if (typeof chunk === 'string') { + controller?.enqueue(Buffer.from(chunk, 'utf8')) + } else { + controller?.enqueue(chunk) } + } - const cleanup = () => { - if (cleanedUp) return - cleanedUp = true + const safeClose = () => { + if (isStreamClosed) return + isStreamClosed = true + try { + controller?.close() + } catch { + // ignore + } + } - cancelActiveReader() + const safeError = (error: unknown) => { + if (isStreamClosed) return + isStreamClosed = true + try { + controller?.error(error) + } catch { + // ignore + } + } - if (lifetimeTimeoutHandle !== undefined) { - clearTimeout(lifetimeTimeoutHandle) - lifetimeTimeoutHandle = undefined - } + const cleanup = () => { + if (cleanedUp) return + cleanedUp = true + + cancelActiveReader() - router.serverSsr?.cleanup() + if (lifetimeTimeoutHandle !== undefined) { + clearTimeout(lifetimeTimeoutHandle) + lifetimeTimeoutHandle = undefined } - const safeClose = () => { - if (isStreamClosed) return - isStreamClosed = true - try { - controller?.close() - } catch { - // ignore - } + try { + opts.onCleanup?.() + } catch { + // ignore } - const safeError = (error: unknown) => { - if (isStreamClosed) return + opts.router.serverSsr?.cleanup() + } + + const stream = new ReadableStream({ + start(c) { + controller = c + }, + cancel(reason) { isStreamClosed = true - try { - controller?.error(error) - } catch { - // ignore - } - } + cancelActiveReader(reason) + cleanup() + }, + }) - const lifetimeMs = opts?.lifetimeMs ?? DEFAULT_LIFETIME_TIMEOUT_MS + if (withLifetimeTimeout) { lifetimeTimeoutHandle = setTimeout(() => { if (!cleanedUp && !isStreamClosed) { console.warn( - `SSR stream transform exceeded maximum lifetime (${lifetimeMs}ms), forcing cleanup`, + `SSR stream transform exceeded maximum lifetime (${opts.lifetimeMs}ms), forcing cleanup`, ) safeError(new Error('Stream lifetime exceeded')) cleanup() } - }, lifetimeMs) + }, opts.lifetimeMs) + } - const stream = new ReadableStream({ - start(c) { - controller = c - }, - cancel(reason) { - isStreamClosed = true - cancelActiveReader(reason) - cleanup() - }, + return { + stream, + setReader: (reader) => { + appReader = reader + }, + isClosed: () => isStreamClosed, + isCleanedUp: () => cleanedUp, + safeEnqueue, + safeClose, + safeError, + cleanup, + } +} + +export function transformStreamWithRouter( + router: AnyRouter, + appStream: ReadableStream, + opts?: { + /** Timeout for serialization to complete after app render finishes (default: 60000ms) */ + timeoutMs?: number + /** Maximum lifetime of the stream transform (default: 60000ms). Safety net for cleanup. */ + lifetimeMs?: number + }, +) { + // Check upfront if serialization already finished synchronously + // This is the fast path for routes with no deferred data + const serializationAlreadyFinished = + router.serverSsr?.isSerializationFinished() ?? false + + // Take any HTML that was buffered before we started listening + const initialBufferedHtml = router.serverSsr?.takeBufferedHtml() + + const lifetimeMs = opts?.lifetimeMs ?? DEFAULT_LIFETIME_TIMEOUT_MS + + // True passthrough: if serialization already finished and nothing buffered, + // we can avoid any decoding/scanning while still honoring cleanup + setRenderFinished. + if (serializationAlreadyFinished && !initialBufferedHtml) { + const lifecycle = createStreamLifecycle({ + router, + lifetimeMs, + withLifetimeTimeout: true, }) ;(async () => { const reader = appStream.getReader() - appReader = reader + lifecycle.setReader(reader) try { while (true) { const { done, value } = await reader.read() if (done) break - if (cleanedUp || isStreamClosed) return + if (lifecycle.isCleanedUp() || lifecycle.isClosed()) return if (typeof value === 'string') { - controller?.enqueue(textEncoder.encode(value)) + lifecycle.safeEnqueue(value) } else { - controller?.enqueue(value as Uint8Array) + lifecycle.safeEnqueue(value as Uint8Array) } } - if (cleanedUp || isStreamClosed) return + if (lifecycle.isCleanedUp() || lifecycle.isClosed()) return router.serverSsr?.setRenderFinished() - safeClose() - cleanup() + lifecycle.safeClose() + lifecycle.cleanup() } catch (error) { - if (cleanedUp) return + if (lifecycle.isCleanedUp()) return console.error('Error reading appStream:', error) router.serverSsr?.setRenderFinished() - safeError(error) - cleanup() + lifecycle.safeError(error) + lifecycle.cleanup() } finally { - appReader = undefined - reader.releaseLock() + lifecycle.setReader(undefined) } })().catch((error) => { - if (cleanedUp) return + if (lifecycle.isCleanedUp()) return console.error('Error in stream transform:', error) - safeError(error) - cleanup() + lifecycle.safeError(error) + lifecycle.cleanup() }) - return stream + return lifecycle.stream + } + + type TransformState = { + streamPhase: StreamPhase + barrierPhase: BarrierPhase + tailPhase: TailPhase + serializationDone: boolean + } + + const state: TransformState = { + streamPhase: 'streaming', + barrierPhase: 'locked', + tailPhase: 'normal', + serializationDone: serializationAlreadyFinished, } let stopListeningToInjectedHtml: (() => void) | undefined let stopListeningToSerializationFinished: (() => void) | undefined let serializationTimeoutHandle: ReturnType | undefined - let lifetimeTimeoutHandle: ReturnType | undefined - let cleanedUp = false - - let controller: ReadableStreamDefaultController - let isStreamClosed = false - let appReader: ReadableStreamDefaultReader | undefined - - const cancelActiveReader = (reason?: unknown) => { - const activeReader = appReader - appReader = undefined - void activeReader?.cancel(reason).catch(() => { - // ignore - }) - } const textDecoder = new TextDecoder() + let usedBinaryDecoder = false // concat'd router HTML; avoids array joins on each flush let pendingRouterHtml = initialBufferedHtml ?? '' @@ -261,87 +319,35 @@ export function transformStreamWithRouter( // conservative cap: enough to hold any partial closing tag + a bit const MAX_LEFTOVER_CHARS = 2048 - let isAppRendering = true - let streamBarrierLifted = false - let serializationFinished = serializationAlreadyFinished - - function safeEnqueue(chunk: string | Uint8Array) { - if (isStreamClosed) return - if (typeof chunk === 'string') { - controller.enqueue(textEncoder.encode(chunk)) - } else { - controller.enqueue(chunk) - } - } - - function safeClose() { - if (isStreamClosed) return - isStreamClosed = true - try { - controller.close() - } catch { - // ignore - } - } - - function safeError(error: unknown) { - if (isStreamClosed) return - isStreamClosed = true - try { - controller.error(error) - } catch { - // ignore - } - } - - /** - * Cleanup with guards; must be idempotent. - */ - function cleanup() { - if (cleanedUp) return - cleanedUp = true - - cancelActiveReader() - - try { - stopListeningToInjectedHtml?.() - stopListeningToSerializationFinished?.() - } catch { - // ignore - } - stopListeningToInjectedHtml = undefined - stopListeningToSerializationFinished = undefined - - if (serializationTimeoutHandle !== undefined) { - clearTimeout(serializationTimeoutHandle) - serializationTimeoutHandle = undefined - } - if (lifetimeTimeoutHandle !== undefined) { - clearTimeout(lifetimeTimeoutHandle) - lifetimeTimeoutHandle = undefined - } + const lifecycle = createStreamLifecycle({ + router, + lifetimeMs, + withLifetimeTimeout: true, + onCleanup: () => { + try { + stopListeningToInjectedHtml?.() + stopListeningToSerializationFinished?.() + } catch { + // ignore + } + stopListeningToInjectedHtml = undefined + stopListeningToSerializationFinished = undefined - pendingRouterHtml = '' - leftover = '' - pendingClosingTags = '' - router.serverSsr?.cleanup() - } + if (serializationTimeoutHandle !== undefined) { + clearTimeout(serializationTimeoutHandle) + serializationTimeoutHandle = undefined + } - const stream = new ReadableStream({ - start(c) { - controller = c - }, - cancel(reason) { - isStreamClosed = true - cancelActiveReader(reason) - cleanup() + pendingRouterHtml = '' + leftover = '' + pendingClosingTags = '' }, }) - function flushPendingRouterHtml({ force = false }: { force?: boolean } = {}) { + function flushPendingRouterHtml(force = false) { if (!pendingRouterHtml) return - if (!force && !streamBarrierLifted) return - safeEnqueue(pendingRouterHtml) + if (!force && state.barrierPhase === 'locked') return + lifecycle.safeEnqueue(pendingRouterHtml) pendingRouterHtml = '' } @@ -354,8 +360,8 @@ export function transformStreamWithRouter( * Finish only when app done and serialization complete. */ function tryFinish() { - if (isAppRendering || !serializationFinished) return - if (cleanedUp || isStreamClosed) return + if (state.streamPhase === 'streaming' || !state.serializationDone) return + if (lifecycle.isCleanedUp() || lifecycle.isClosed()) return if (serializationTimeoutHandle !== undefined) { clearTimeout(serializationTimeoutHandle) @@ -363,45 +369,39 @@ export function transformStreamWithRouter( } // Flush any remaining bytes in the TextDecoder - const decoderRemainder = textDecoder.decode() - - if (leftover) safeEnqueue(leftover) - if (decoderRemainder) safeEnqueue(decoderRemainder) - flushPendingRouterHtml({ force: true }) - if (pendingClosingTags) safeEnqueue(pendingClosingTags) - safeClose() - cleanup() + const decoderRemainder = usedBinaryDecoder ? textDecoder.decode() : '' + + if (leftover) lifecycle.safeEnqueue(leftover) + if (decoderRemainder) lifecycle.safeEnqueue(decoderRemainder) + flushPendingRouterHtml(true) + if (pendingClosingTags) lifecycle.safeEnqueue(pendingClosingTags) + state.streamPhase = 'finished' + lifecycle.safeClose() + lifecycle.cleanup() } - // Safety net: cleanup even if consumer never reads - const lifetimeMs = opts?.lifetimeMs ?? DEFAULT_LIFETIME_TIMEOUT_MS - lifetimeTimeoutHandle = setTimeout(() => { - if (!cleanedUp && !isStreamClosed) { - console.warn( - `SSR stream transform exceeded maximum lifetime (${lifetimeMs}ms), forcing cleanup`, - ) - safeError(new Error('Stream lifetime exceeded')) - cleanup() - } - }, lifetimeMs) - if (!serializationAlreadyFinished) { stopListeningToInjectedHtml = router.subscribe('onInjectedHtml', () => { - if (cleanedUp || isStreamClosed) return + if (lifecycle.isCleanedUp() || lifecycle.isClosed()) return const html = router.serverSsr?.takeBufferedHtml() if (!html) return - if (!streamBarrierLifted || isAppRendering || leftover || pendingClosingTags) { + if ( + state.barrierPhase === 'locked' || + state.streamPhase === 'streaming' || + leftover || + state.tailPhase === 'afterBody' + ) { appendRouterHtml(html) } else { - safeEnqueue(html) + lifecycle.safeEnqueue(html) } }) stopListeningToSerializationFinished = router.subscribe( 'onSerializationFinished', () => { - serializationFinished = true + state.serializationDone = true tryFinish() }, ) @@ -410,42 +410,72 @@ export function transformStreamWithRouter( // Transform the appStream ;(async () => { const reader = appStream.getReader() - appReader = reader + lifecycle.setReader(reader) try { while (true) { const { done, value } = await reader.read() if (done) break - if (cleanedUp || isStreamClosed) return + if (lifecycle.isCleanedUp() || lifecycle.isClosed()) return - const text = - typeof value === 'string' - ? value - : ArrayBuffer.isView(value) - ? textDecoder.decode(value as Uint8Array, { stream: true }) - : String(value) + const rawUint8Chunk = value instanceof Uint8Array ? value : undefined + + if ( + rawUint8Chunk && + !leftover && + !pendingRouterHtml && + state.tailPhase === 'normal' && + state.barrierPhase === 'lifted' + ) { + if (rawUint8Chunk.indexOf(60) === -1) { + lifecycle.safeEnqueue(rawUint8Chunk) + continue + } + } + + let text: string + if (typeof value === 'string') { + text = value + } else if (ArrayBuffer.isView(value)) { + const view = value as Uint8Array + if (!isAscii(view)) { + usedBinaryDecoder = true + text = textDecoder.decode(view, { stream: true }) + } else { + text = Buffer.from( + view.buffer, + view.byteOffset, + view.byteLength, + ).toString('utf8') + } + } else { + text = String(value) + } // Fast path: most chunks have no pending left-over. const chunkString = leftover ? leftover + text : text - const barrierFoundInChunk = - !streamBarrierLifted && chunkString.includes(TSR_SCRIPT_BARRIER_ID) + const barrierMarkerIndex = + state.barrierPhase === 'locked' + ? chunkString.indexOf(TSR_SCRIPT_BARRIER_ID) + : -1 - if (barrierFoundInChunk) { - const barrierMarkerIndex = chunkString.indexOf(TSR_SCRIPT_BARRIER_ID) + if (barrierMarkerIndex !== -1) { const barrierScriptClosedInChunk = - barrierMarkerIndex !== -1 && chunkString.indexOf(SCRIPT_END_TAG, barrierMarkerIndex) !== -1 - streamBarrierLifted = true + state.barrierPhase = 'lifted' router.serverSsr?.liftScriptBarrier() const barrierChunkBodyEndIndex = chunkString.indexOf(BODY_END_TAG) if (barrierChunkBodyEndIndex !== -1) { - safeEnqueue(chunkString.slice(0, barrierChunkBodyEndIndex)) + lifecycle.safeEnqueue( + chunkString.slice(0, barrierChunkBodyEndIndex), + ) pendingClosingTags = chunkString.slice(barrierChunkBodyEndIndex) + state.tailPhase = 'afterBody' } else { - safeEnqueue(chunkString) + lifecycle.safeEnqueue(chunkString) } if (barrierScriptClosedInChunk) { @@ -456,7 +486,7 @@ export function transformStreamWithRouter( continue } - if (pendingClosingTags) { + if (state.tailPhase === 'afterBody') { pendingClosingTags += chunkString leftover = '' continue @@ -465,24 +495,38 @@ export function transformStreamWithRouter( const bodyEndIndex = chunkString.indexOf(BODY_END_TAG) if (bodyEndIndex !== -1) { - safeEnqueue(chunkString.slice(0, bodyEndIndex)) + lifecycle.safeEnqueue(chunkString.slice(0, bodyEndIndex)) pendingClosingTags = chunkString.slice(bodyEndIndex) + state.tailPhase = 'afterBody' flushPendingRouterHtml() leftover = '' continue } + // Fast non-mutating passthrough in normal path: no pending injected HTML, + // no buffered carry-over, and no structural rewrite needed for this chunk. + if (!pendingRouterHtml && !leftover) { + if (rawUint8Chunk) { + lifecycle.safeEnqueue(rawUint8Chunk) + } else { + lifecycle.safeEnqueue(chunkString) + } + continue + } + const lastClosingTagEnd = findLastClosingTagEnd(chunkString) if (lastClosingTagEnd > 0) { - safeEnqueue(chunkString.slice(0, lastClosingTagEnd)) + lifecycle.safeEnqueue(chunkString.slice(0, lastClosingTagEnd)) flushPendingRouterHtml() leftover = chunkString.slice(lastClosingTagEnd) if (leftover.length > MAX_LEFTOVER_CHARS) { // Ensure bounded memory even if a consumer streams long text sequences // without any closing tags. This may reduce injection granularity but is correct. - safeEnqueue(leftover.slice(0, leftover.length - MAX_LEFTOVER_CHARS)) + lifecycle.safeEnqueue( + leftover.slice(0, leftover.length - MAX_LEFTOVER_CHARS), + ) leftover = leftover.slice(-MAX_LEFTOVER_CHARS) } } else { @@ -491,7 +535,7 @@ export function transformStreamWithRouter( const combined = chunkString if (combined.length > MAX_LEFTOVER_CHARS) { const flushUpto = combined.length - MAX_LEFTOVER_CHARS - safeEnqueue(combined.slice(0, flushUpto)) + lifecycle.safeEnqueue(combined.slice(0, flushUpto)) leftover = combined.slice(flushUpto) } else { leftover = combined @@ -499,42 +543,41 @@ export function transformStreamWithRouter( } } - if (cleanedUp || isStreamClosed) return + if (lifecycle.isCleanedUp() || lifecycle.isClosed()) return - isAppRendering = false + state.streamPhase = 'appDone' router.serverSsr?.setRenderFinished() - if (serializationFinished) { + if (state.serializationDone) { tryFinish() } else { const timeoutMs = opts?.timeoutMs ?? DEFAULT_SERIALIZATION_TIMEOUT_MS serializationTimeoutHandle = setTimeout(() => { - if (!cleanedUp && !isStreamClosed) { + if (!lifecycle.isCleanedUp() && !lifecycle.isClosed()) { console.error('Serialization timeout after app render finished') - safeError( + lifecycle.safeError( new Error('Serialization timeout after app render finished'), ) - cleanup() + lifecycle.cleanup() } }, timeoutMs) } } catch (error) { - if (cleanedUp) return + if (lifecycle.isCleanedUp()) return console.error('Error reading appStream:', error) - isAppRendering = false + state.streamPhase = 'appDone' router.serverSsr?.setRenderFinished() - safeError(error) - cleanup() + lifecycle.safeError(error) + lifecycle.cleanup() } finally { - appReader = undefined - reader.releaseLock() + lifecycle.setReader(undefined) } })().catch((error) => { - if (cleanedUp) return + if (lifecycle.isCleanedUp()) return console.error('Error in stream transform:', error) - safeError(error) - cleanup() + lifecycle.safeError(error) + lifecycle.cleanup() }) - return stream + return lifecycle.stream } diff --git a/packages/router-core/tests/transformStreamWithRouter.test.ts b/packages/router-core/tests/transformStreamWithRouter.test.ts index 75b723ec35f..84bfb0bb9c1 100644 --- a/packages/router-core/tests/transformStreamWithRouter.test.ts +++ b/packages/router-core/tests/transformStreamWithRouter.test.ts @@ -10,6 +10,7 @@ function createRouterMock(options?: { let bufferedHtml = options?.initialBufferedHtml ?? '' let serializationFinished = options?.serializationFinished ?? false let cleanedUp = false + let setRenderFinishedCalls = 0 const router = { subscribe(event: string, listener: () => void) { @@ -38,6 +39,7 @@ function createRouterMock(options?: { router.emit({ type: 'onInjectedHtml' }) }, setRenderFinished() { + setRenderFinishedCalls++ serializationFinished = true router.emit({ type: 'onSerializationFinished' }) }, @@ -53,6 +55,9 @@ function createRouterMock(options?: { wasCleanedUp() { return cleanedUp }, + getSetRenderFinishedCalls() { + return setRenderFinishedCalls + }, } } @@ -178,6 +183,26 @@ describe('transformStreamWithRouter', () => { expect(html).toContain('') }) + it('calls setRenderFinished in fast path when app stream completes', async () => { + const { router, getSetRenderFinishedCalls } = createRouterMock({ + serializationFinished: true, + }) + const encoder = new TextEncoder() + + const appStream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode('done')) + controller.close() + }, + }) + + const transformed = transformStreamWithRouter(router, appStream) + const html = await readAll(transformed) + + expect(html).toContain('done') + expect(getSetRenderFinishedCalls()).toBe(1) + }) + it('flushes pendingRouterHtml when barrier and land in the same chunk', async () => { // when the barrier script and appear in the same chunk, // scripts buffered in pendingRouterHtml (e.g. from initialBufferedHtml) must be From 21d442e6a5c8a6b725de67ba113d5d2c4ce0aacc Mon Sep 17 00:00:00 2001 From: Manuel Schiller Date: Wed, 4 Mar 2026 21:06:25 +0100 Subject: [PATCH 4/5] add benchmark script --- .../transformStreamWithRouter.profile.mjs | 148 ++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 packages/router-core/tests/transformStreamWithRouter.profile.mjs diff --git a/packages/router-core/tests/transformStreamWithRouter.profile.mjs b/packages/router-core/tests/transformStreamWithRouter.profile.mjs new file mode 100644 index 00000000000..7fa63aee48d --- /dev/null +++ b/packages/router-core/tests/transformStreamWithRouter.profile.mjs @@ -0,0 +1,148 @@ +// @ts-nocheck +import { ReadableStream } from 'node:stream/web' +import { transformStreamWithRouter } from '../dist/esm/ssr/transformStreamWithRouter.js' +import { TSR_SCRIPT_BARRIER_ID } from '../dist/esm/ssr/constants.js' + +function createRouterMock(options = {}) { + const listeners = new Map() + let bufferedHtml = options.initialBufferedHtml ?? '' + let serializationFinished = options.serializationFinished ?? false + + const router = { + subscribe(event, listener) { + const set = listeners.get(event) ?? new Set() + set.add(listener) + listeners.set(event, set) + return () => { + set.delete(listener) + } + }, + emit(event) { + listeners.get(event)?.forEach((listener) => listener()) + }, + serverSsr: { + isSerializationFinished() { + return serializationFinished + }, + takeBufferedHtml() { + if (!bufferedHtml) return undefined + const html = bufferedHtml + bufferedHtml = '' + return html + }, + injectHtml(html) { + bufferedHtml += html + router.emit('onInjectedHtml') + }, + setRenderFinished() { + serializationFinished = true + router.emit('onSerializationFinished') + }, + liftScriptBarrier() {}, + cleanup() {}, + }, + } + + return router +} + +const encoder = new TextEncoder() + +function createChunkedStream(chunks, onEnqueue) { + return new ReadableStream({ + start(controller) { + for (let i = 0; i < chunks.length; i++) { + controller.enqueue(chunks[i]) + onEnqueue?.(i) + } + controller.close() + }, + }) +} + +async function consumeStream(stream) { + const reader = stream.getReader() + let totalBytes = 0 + while (true) { + const { done, value } = await reader.read() + if (done) break + totalBytes += value.byteLength + } + return totalBytes +} + +function makeHtmlChunk(size) { + const body = 'x'.repeat(Math.max(0, size - 64)) + return `
${body}
` +} + +async function runFastPathWorkload(iterations = 500) { + const binaryChunks = Array.from({ length: 120 }, () => + encoder.encode(makeHtmlChunk(4096)), + ) + + let total = 0 + for (let i = 0; i < iterations; i++) { + const router = createRouterMock({ serializationFinished: true }) + const appStream = createChunkedStream(binaryChunks) + const transformed = transformStreamWithRouter(router, appStream) + total += await consumeStream(transformed) + } + + return total +} + +async function runStreamingWorkload(iterations = 300) { + const headChunk = '
content
' + const barrierChunk = `` + const tailChunk = '
' + + let total = 0 + for (let i = 0; i < iterations; i++) { + const router = createRouterMock({ serializationFinished: false }) + + const appStream = createChunkedStream( + [ + encoder.encode(headChunk), + encoder.encode(barrierChunk), + encoder.encode(makeHtmlChunk(2048)), + encoder.encode(makeHtmlChunk(2048)), + encoder.encode(tailChunk), + ], + (index) => { + if (index <= 3) { + router.serverSsr.injectHtml( + ``, + ) + } + }, + ) + + const transformed = transformStreamWithRouter(router, appStream) + total += await consumeStream(transformed) + } + + return total +} + +async function main() { + const fast = await runFastPathWorkload() + const streaming = await runStreamingWorkload() + process.stdout.write( + JSON.stringify( + { + mode: 'transformStreamWithRouter.profile', + fast, + streaming, + total: fast + streaming, + }, + null, + 2, + ) + '\n', + ) +} + +main().catch((error) => { + console.error(error) + process.exitCode = 1 +}) From 82faa556fa9916ed15554d6a380f23520ec69452 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 4 Mar 2026 20:07:47 +0000 Subject: [PATCH 5/5] ci: apply automated fixes --- packages/router-core/tests/transformStreamWithRouter.bench.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/router-core/tests/transformStreamWithRouter.bench.ts b/packages/router-core/tests/transformStreamWithRouter.bench.ts index d0a372938a0..8272070b3db 100644 --- a/packages/router-core/tests/transformStreamWithRouter.bench.ts +++ b/packages/router-core/tests/transformStreamWithRouter.bench.ts @@ -150,4 +150,4 @@ describe('transformStreamWithRouter bench - streaming path', () => { const transformed = transformStreamWithRouter(router, appStream) await consumeStream(transformed) }) -}) \ No newline at end of file +})