Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions packages/next/src/server/pipe-readable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ function createWriterFromResponse(
): WritableStream<Uint8Array> {
let started = false

// Create a promise that will resolve once the response has drained. See
// https://nodejs.org/api/stream.html#stream_event_drain
let drained = new DetachedPromise<void>()
// Lazily created on first backpressure event. For typical responses
// (< ~64KB), res.write() never returns false so this is never allocated.
let drained: DetachedPromise<void> | undefined

function onDrain() {
drained.resolve()
drained?.resolve()
}
res.on('drain', onDrain)

// If the finish event fires, it means we shouldn't block and wait for the
// drain event.
// If the response closes, resolve any pending drain wait and stop listening.
res.once('close', () => {
res.off('drain', onDrain)
drained.resolve()
drained?.resolve()
})

// Create a promise that will resolve once the response has finished. See
Expand All @@ -41,6 +40,13 @@ function createWriterFromResponse(
finished.resolve()
})

// Cache the flush check once instead of per-chunk. The `flush` method is
// added by the `compression` middleware and won't appear/disappear mid-request.
const flushFn =
'flush' in res && typeof (res as any).flush === 'function'
? (res as any).flush.bind(res)
: null

// Create a writable stream that will write to the response.
return new WritableStream<Uint8Array>({
write: async (chunk) => {
Expand Down Expand Up @@ -73,6 +79,7 @@ function createWriterFromResponse(
NextNodeServerSpan.startResponse,
{
spanName: 'start response',
hideSpan: true,
},
() => undefined
)
Expand All @@ -81,15 +88,18 @@ function createWriterFromResponse(
try {
const ok = res.write(chunk)

// Added by the `compression` middleware, this is a function that will
// flush the partially-compressed response to the client.
if ('flush' in res && typeof res.flush === 'function') {
res.flush()
// Flush the partially-compressed response to the client.
if (flushFn) {
flushFn()
}

// If the write returns false, it means there's some backpressure, so
// wait until it's streamed before continuing.
if (!ok) {
if (!drained) {
drained = new DetachedPromise<void>()
res.on('drain', onDrain)
}
await drained.promise

// Reset the drained promise so that we can wait for the next drain event.
Expand Down
Loading