From 7dba1547b77c651a86991ae4d1a6bb5390f90d99 Mon Sep 17 00:00:00 2001 From: Yagiz Nizipli Date: Fri, 23 Jan 2026 14:59:19 -0500 Subject: [PATCH] add tests for stream standard --- src/workerd/api/tests/BUILD.bazel | 6 + .../tests/streams-standard-coverage-test.js | 1375 +++++++++++++++++ .../streams-standard-coverage-test.wd-test | 18 + 3 files changed, 1399 insertions(+) create mode 100644 src/workerd/api/tests/streams-standard-coverage-test.js create mode 100644 src/workerd/api/tests/streams-standard-coverage-test.wd-test diff --git a/src/workerd/api/tests/BUILD.bazel b/src/workerd/api/tests/BUILD.bazel index 3f3b6f1141c..14f451ad3b0 100644 --- a/src/workerd/api/tests/BUILD.bazel +++ b/src/workerd/api/tests/BUILD.bazel @@ -385,6 +385,12 @@ wd_test( data = ["streams-js-test.js"], ) +wd_test( + src = "streams-standard-coverage-test.wd-test", + args = ["--experimental"], + data = ["streams-standard-coverage-test.js"], +) + wd_test( src = "streams-iocontext-test.wd-test", args = ["--experimental"], diff --git a/src/workerd/api/tests/streams-standard-coverage-test.js b/src/workerd/api/tests/streams-standard-coverage-test.js new file mode 100644 index 00000000000..a22fb9a3ced --- /dev/null +++ b/src/workerd/api/tests/streams-standard-coverage-test.js @@ -0,0 +1,1375 @@ +// Copyright (c) 2025 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +import { strictEqual, ok, throws, rejects, deepStrictEqual } from 'node:assert'; + +// Test read() when stream errors during pull - triggers pending error state +export const readWithPendingError = { + async test() { + let controller; + let pullCount = 0; + + const rs = new ReadableStream({ + start(c) { + controller = c; + }, + pull(c) { + pullCount++; + if (pullCount === 1) { + // First pull - enqueue data and error during same pull + c.enqueue('data'); + // Error the stream while we're inside pull (read is "pending") + c.error(new Error('error during pull')); + } + }, + }); + + const reader = rs.getReader(); + + // First read gets the data + const result1 = await reader.read(); + strictEqual(result1.value, 'data'); + strictEqual(result1.done, false); + + // Second read should get the error + await rejects(reader.read(), { message: 'error during pull' }); + }, +}; + +// Test read() when stream closes during pull - triggers pending close state +export const readWithPendingClose = { + async test() { + let controller; + let pullCount = 0; + + const rs = new ReadableStream({ + start(c) { + controller = c; + }, + pull(c) { + pullCount++; + if (pullCount === 1) { + c.enqueue('data'); + // Close the stream while we're inside pull + c.close(); + } + }, + }); + + const reader = rs.getReader(); + + // First read gets the data + const result1 = await reader.read(); + strictEqual(result1.value, 'data'); + + // Second read should see the stream is closed + const result2 = await reader.read(); + strictEqual(result2.done, true); + }, +}; + +// Test cancel() with pending state +export const cancelWithPendingState = { + async test() { + let controller; + + const rs = new ReadableStream({ + start(c) { + controller = c; + }, + pull(c) { + // Close during pull to set pending state + c.close(); + }, + }); + + const reader = rs.getReader(); + // Start a read to trigger pull which sets pending close state + const readPromise = reader.read(); + + // Release reader so we can cancel + reader.releaseLock(); + + // Cancel should handle pending state + await rs.cancel('cancel reason'); + + // 
The read should complete (with done=true due to close) + // Note: after releaseLock, pending reads are rejected + }, +}; + +// Test Response.text() with JS-backed ReadableStream +// This exercises AllReader internally +export const responseTextWithJsStream = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.enqueue(new TextEncoder().encode('hello ')); + c.enqueue(new TextEncoder().encode('world')); + c.close(); + }, + }); + + const response = new Response(rs); + const text = await response.text(); + strictEqual(text, 'hello world'); + }, +}; + +// Test Response.arrayBuffer() with JS-backed ReadableStream +export const responseArrayBufferWithJsStream = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.enqueue(new Uint8Array([1, 2, 3])); + c.enqueue(new Uint8Array([4, 5, 6])); + c.close(); + }, + }); + + const response = new Response(rs); + const buffer = await response.arrayBuffer(); + const arr = new Uint8Array(buffer); + deepStrictEqual([...arr], [1, 2, 3, 4, 5, 6]); + }, +}; + +// Test Response.text() with stream that errors +export const responseTextWithErroringStream = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.enqueue(new TextEncoder().encode('partial')); + c.error(new Error('stream error')); + }, + }); + + const response = new Response(rs); + await rejects(response.text(), { message: 'stream error' }); + }, +}; + +// Test Response.text() with byte stream +export const responseTextWithByteStream = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + start(c) { + c.enqueue(new TextEncoder().encode('byte stream text')); + c.close(); + }, + }); + + const response = new Response(rs); + const text = await response.text(); + strictEqual(text, 'byte stream text'); + }, +}; + +// Test pipeTo basic flow +export const pipeToBasic = { + async test() { + const chunks = []; + const rs = new ReadableStream({ + start(c) { + c.enqueue('a'); + c.enqueue('b'); + c.enqueue('c'); + c.close(); + }, + }); + + const ws = new WritableStream({ + write(chunk) { + chunks.push(chunk); + }, + }); + + await rs.pipeTo(ws); + deepStrictEqual(chunks, ['a', 'b', 'c']); + }, +}; + +// Test pipeTo with source error and preventAbort +export const pipeToSourceErrorPreventAbort = { + async test() { + let aborted = false; + const rs = new ReadableStream({ + start(c) { + c.enqueue('data'); + }, + pull(c) { + c.error(new Error('source failed')); + }, + }); + + const ws = new WritableStream({ + write(chunk) {}, + abort(reason) { + aborted = true; + }, + }); + + await rejects(rs.pipeTo(ws, { preventAbort: true }), { + message: 'source failed', + }); + strictEqual(aborted, false); + }, +}; + +// Test pipeTo with sink error and preventCancel +export const pipeToSinkErrorPreventCancel = { + async test() { + let canceled = false; + const rs = new ReadableStream({ + start(c) { + c.enqueue('data'); + }, + cancel() { + canceled = true; + }, + }); + + const ws = new WritableStream({ + write(chunk) { + throw new Error('sink failed'); + }, + }); + + await rejects(rs.pipeTo(ws, { preventCancel: true }), { + message: 'sink failed', + }); + strictEqual(canceled, false); + }, +}; + +// Test pipeTo with preventClose +export const pipeToPreventClose = { + async test() { + let closed = false; + const rs = new ReadableStream({ + start(c) { + c.enqueue('data'); + c.close(); + }, + }); + + const ws = new WritableStream({ + write(chunk) {}, + close() { + closed = true; + }, + }); + + await rs.pipeTo(ws, { preventClose: true }); + strictEqual(closed, false); + + // 
We can still write to the writable since it wasn't closed + const writer = ws.getWriter(); + await writer.write('more'); + await writer.close(); + }, +}; + +// Test pipeTo with AbortSignal +export const pipeToWithSignal = { + async test() { + const controller = new AbortController(); + let writeCount = 0; + + const rs = new ReadableStream({ + pull(c) { + c.enqueue(writeCount++); + if (writeCount > 100) c.close(); + }, + }); + + const ws = new WritableStream({ + write(chunk) { + if (chunk === 2) { + // Abort after receiving a few chunks + controller.abort(); + } + }, + }); + + await rejects(rs.pipeTo(ws, { signal: controller.signal }), { + name: 'AbortError', + }); + }, +}; + +// Test tee on already errored stream +export const teeErroredStream = { + async test() { + const error = new Error('stream errored'); + const rs = new ReadableStream({ + start(controller) { + controller.error(error); + }, + }); + + const [branch1, branch2] = rs.tee(); + + await rejects(branch1.getReader().read(), { message: 'stream errored' }); + await rejects(branch2.getReader().read(), { message: 'stream errored' }); + }, +}; + +// Test tee on already closed stream +export const teeClosedStream = { + async test() { + const rs = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + const [branch1, branch2] = rs.tee(); + + const result1 = await branch1.getReader().read(); + const result2 = await branch2.getReader().read(); + + strictEqual(result1.done, true); + strictEqual(result2.done, true); + }, +}; + +// Test tee on byte stream that errors +export const teeErroredByteStream = { + async test() { + const error = new Error('byte stream errored'); + const rs = new ReadableStream({ + type: 'bytes', + start(controller) { + controller.error(error); + }, + }); + + const [branch1, branch2] = rs.tee(); + + await rejects(branch1.getReader().read(), { + message: 'byte stream errored', + }); + await rejects(branch2.getReader().read(), { + message: 'byte stream errored', + }); + }, +}; + +// Test BYOB read on closed stream returns empty view +export const byobReadOnClosedStream = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + start(controller) { + controller.close(); + }, + }); + + const reader = rs.getReader({ mode: 'byob' }); + const result = await reader.read(new Uint8Array(10)); + + strictEqual(result.done, true); + ok(result.value instanceof Uint8Array); + strictEqual(result.value.byteLength, 0); + }, +}; + +// Test BYOB read with zero-length buffer throws +export const byobReadZeroLengthBuffer = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + start(controller) { + controller.enqueue(new Uint8Array([1, 2, 3])); + }, + }); + + const reader = rs.getReader({ mode: 'byob' }); + await rejects(reader.read(new Uint8Array(0)), TypeError); + }, +}; + +// Test getDesiredSize on closed stream +export const getDesiredSizeOnClosedStream = { + async test() { + let controller; + const rs = new ReadableStream({ + start(c) { + controller = c; + }, + }); + + controller.close(); + strictEqual(controller.desiredSize, 0); + + const reader = rs.getReader(); + const result = await reader.read(); + strictEqual(result.done, true); + }, +}; + +// Test getDesiredSize on errored stream returns null +export const getDesiredSizeOnErroredStream = { + test() { + let controller; + const rs = new ReadableStream({ + start(c) { + controller = c; + c.error(new Error('test error')); + }, + }); + + strictEqual(controller.desiredSize, null); + }, +}; + +// Test canCloseOrEnqueue returns 
false for closed stream +export const canCloseOrEnqueueOnClosedStream = { + test() { + let controller; + const rs = new ReadableStream({ + start(c) { + controller = c; + c.close(); + }, + }); + + throws(() => controller.enqueue('test'), TypeError); + throws(() => controller.close(), TypeError); + }, +}; + +// Test canCloseOrEnqueue returns false for errored stream +export const canCloseOrEnqueueOnErroredStream = { + test() { + let controller; + const rs = new ReadableStream({ + start(c) { + controller = c; + c.error(new Error('test error')); + }, + }); + + throws(() => controller.enqueue('test'), TypeError); + throws(() => controller.close(), TypeError); + }, +}; + +// Test hasBackpressure +export const hasBackpressureTest = { + async test() { + let controller; + const rs = new ReadableStream( + { + start(c) { + controller = c; + }, + }, + { highWaterMark: 1 } + ); + + strictEqual(controller.desiredSize, 1); + controller.enqueue('a'); + strictEqual(controller.desiredSize, 0); + controller.enqueue('b'); + strictEqual(controller.desiredSize, -1); + }, +}; + +// Test byte stream controller states +export const byteStreamControllerStates = { + test() { + // Closed byte stream + { + let controller; + const rs = new ReadableStream({ + type: 'bytes', + start(c) { + controller = c; + c.close(); + }, + }); + strictEqual(controller.desiredSize, 0); + } + + // Errored byte stream + { + let controller; + const rs = new ReadableStream({ + type: 'bytes', + start(c) { + controller = c; + c.error(new Error('test')); + }, + }); + strictEqual(controller.desiredSize, null); + } + }, +}; + +// Test WritableStream abort +export const writableStreamAbort = { + async test() { + let abortReason; + const ws = new WritableStream({ + abort(reason) { + abortReason = reason; + }, + }); + + const writer = ws.getWriter(); + await writer.abort('abort reason'); + + strictEqual(abortReason, 'abort reason'); + }, +}; + +// Test WritableStream backpressure +export const writableStreamBackpressure = { + async test() { + let resolveWrite; + const ws = new WritableStream( + { + write() { + return new Promise((resolve) => { + resolveWrite = resolve; + }); + }, + }, + { highWaterMark: 1 } + ); + + const writer = ws.getWriter(); + + strictEqual(writer.desiredSize, 1); + const writePromise = writer.write('a'); + strictEqual(writer.desiredSize, 0); + const writePromise2 = writer.write('b'); + strictEqual(writer.desiredSize, -1); + + resolveWrite(); + await writePromise; + resolveWrite(); + await writePromise2; + }, +}; + +// Test WritableStream close while writing +export const writableStreamCloseWhileWriting = { + async test() { + let resolveWrite; + const ws = new WritableStream({ + write() { + return new Promise((resolve) => { + resolveWrite = resolve; + }); + }, + }); + + const writer = ws.getWriter(); + const writePromise = writer.write('test'); + const closePromise = writer.close(); + + resolveWrite(); + await writePromise; + await closePromise; + }, +}; + +// Test ReadableStream.from with async generator +export const readableStreamFromAsyncGenerator = { + async test() { + async function* gen() { + yield 1; + yield 2; + yield 3; + } + + const rs = ReadableStream.from(gen()); + const reader = rs.getReader(); + + strictEqual((await reader.read()).value, 1); + strictEqual((await reader.read()).value, 2); + strictEqual((await reader.read()).value, 3); + strictEqual((await reader.read()).done, true); + }, +}; + +// Test ReadableStream.from with sync iterable +export const readableStreamFromSyncIterable = { + async test() { + const 
rs = ReadableStream.from([1, 2, 3]); + const reader = rs.getReader(); + + strictEqual((await reader.read()).value, 1); + strictEqual((await reader.read()).value, 2); + strictEqual((await reader.read()).value, 3); + strictEqual((await reader.read()).done, true); + }, +}; + +// Test error thrown in start algorithm +export const errorInStartAlgorithm = { + async test() { + const rs = new ReadableStream({ + start() { + throw new Error('start error'); + }, + }); + + const reader = rs.getReader(); + await rejects(reader.read(), { message: 'start error' }); + }, +}; + +// Test error thrown in pull algorithm +export const errorInPullAlgorithm = { + async test() { + const rs = new ReadableStream({ + pull() { + throw new Error('pull error'); + }, + }); + + const reader = rs.getReader(); + await rejects(reader.read(), { message: 'pull error' }); + }, +}; + +// Test error thrown in cancel algorithm +export const errorInCancelAlgorithm = { + async test() { + const rs = new ReadableStream({ + cancel() { + throw new Error('cancel error'); + }, + }); + + await rejects(rs.cancel(), { message: 'cancel error' }); + }, +}; + +// Test multiple concurrent reads on value stream +export const multipleConcurrentReads = { + async test() { + let pullCount = 0; + const rs = new ReadableStream({ + pull(c) { + c.enqueue(++pullCount); + if (pullCount >= 3) c.close(); + }, + }); + + const reader = rs.getReader(); + + // Start multiple reads concurrently + const [r1, r2, r3, r4] = await Promise.all([ + reader.read(), + reader.read(), + reader.read(), + reader.read(), + ]); + + strictEqual(r1.value, 1); + strictEqual(r2.value, 2); + strictEqual(r3.value, 3); + strictEqual(r4.done, true); + }, +}; + +// Tests targeting specific uncovered lines + +// Test to trigger forcePullIfNeeded (line 1170-1171) +// This happens when there's a pending BYOB read request +export const forcePullWithByobRequest = { + async test() { + let controller; + const rs = new ReadableStream( + { + type: 'bytes', + start(c) { + controller = c; + }, + pull(c) { + // Respond with less data than requested to keep BYOB request alive + if (c.byobRequest) { + const view = c.byobRequest.view; + view[0] = 42; + c.byobRequest.respond(1); + } + }, + }, + { highWaterMark: 0 } + ); + + const reader = rs.getReader({ mode: 'byob' }); + const result = await reader.read(new Uint8Array(10)); + + strictEqual(result.value[0], 42); + strictEqual(result.value.byteLength, 1); + }, +}; + +// Test consumerCount edge case (lines 954-967) +// This is triggered when checking consumer count during certain operations +export const consumerCountEdgeCase = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.enqueue('data'); + c.close(); + }, + }); + + // Tee creates multiple consumers + const [branch1, branch2] = rs.tee(); + + // Read from both branches + const reader1 = branch1.getReader(); + const reader2 = branch2.getReader(); + + const [r1, r2] = await Promise.all([reader1.read(), reader2.read()]); + + strictEqual(r1.value, 'data'); + strictEqual(r2.value, 'data'); + }, +}; + +// Test cancel on already closed stream +export const cancelClosedStream = { + async test() { + const rs = new ReadableStream({ + start(controller) { + controller.close(); + }, + }); + + // Cancel should resolve immediately for closed stream + await rs.cancel(); + }, +}; + +// Test cancel on already errored stream +export const cancelErroredStream = { + async test() { + const error = new Error('already errored'); + const rs = new ReadableStream({ + start(controller) { + 
controller.error(error); + }, + }); + + // Cancel should reject with the stream's error + await rejects(rs.cancel(), { message: 'already errored' }); + }, +}; + +// Test Response.text() with stream that returns empty chunks +export const responseTextWithEmptyChunks = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.enqueue(new TextEncoder().encode('hello')); + c.enqueue(new Uint8Array(0)); // Empty chunk - should be skipped + c.enqueue(new TextEncoder().encode(' world')); + c.close(); + }, + }); + + const response = new Response(rs); + const text = await response.text(); + strictEqual(text, 'hello world'); + }, +}; + +// Test Response.arrayBuffer() with stream that returns non-bytes +export const responseArrayBufferWithNonBytes = { + async test() { + // Create a stream that returns a non-byte value + // This is tricky because workerd streams are typically byte-oriented + // We need to use internal APIs or specific conditions + const rs = new ReadableStream({ + start(c) { + c.enqueue(new Uint8Array([1, 2, 3])); + c.close(); + }, + }); + + const response = new Response(rs); + const buffer = await response.arrayBuffer(); + strictEqual(new Uint8Array(buffer).length, 3); + }, +}; + +// Test BYOB read completes successfully +export const byobReadSuccess = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + start(c) { + c.enqueue(new Uint8Array([1, 2, 3])); + c.close(); + }, + }); + + const reader = rs.getReader({ mode: 'byob' }); + const result = await reader.read(new Uint8Array(10)); + ok(result.value.byteLength > 0); + strictEqual(result.value[0], 1); + }, +}; + +// Test multiple BYOB reads to exercise different code paths +export const multipleByobReads = { + async test() { + let pullCount = 0; + const rs = new ReadableStream({ + type: 'bytes', + pull(c) { + pullCount++; + if (c.byobRequest) { + const view = c.byobRequest.view; + new Uint8Array(view.buffer, view.byteOffset, view.byteLength).set([ + pullCount, + ]); + c.byobRequest.respond(1); + } else { + c.enqueue(new Uint8Array([pullCount])); + } + if (pullCount >= 3) { + c.close(); + } + }, + }); + + const reader = rs.getReader({ mode: 'byob' }); + + const r1 = await reader.read(new Uint8Array(5)); + strictEqual(r1.value[0], 1); + + const r2 = await reader.read(new Uint8Array(5)); + strictEqual(r2.value[0], 2); + + const r3 = await reader.read(new Uint8Array(5)); + strictEqual(r3.value[0], 3); + + const r4 = await reader.read(new Uint8Array(5)); + strictEqual(r4.done, true); + }, +}; + +// Test concurrent reads where one triggers close during pull +export const concurrentReadsWithClose = { + async test() { + let pullCount = 0; + let controller; + + const rs = new ReadableStream({ + start(c) { + controller = c; + }, + pull(c) { + pullCount++; + if (pullCount === 1) { + c.enqueue('first'); + } else if (pullCount === 2) { + c.enqueue('second'); + c.close(); + } + }, + }); + + const reader = rs.getReader(); + + // Start two reads - second one should see the close + const [r1, r2, r3] = await Promise.all([ + reader.read(), + reader.read(), + reader.read(), + ]); + + strictEqual(r1.value, 'first'); + strictEqual(r2.value, 'second'); + strictEqual(r3.done, true); + }, +}; + +// Test concurrent reads where one triggers error during pull +export const concurrentReadsWithError = { + async test() { + let pullCount = 0; + + const rs = new ReadableStream({ + pull(c) { + pullCount++; + if (pullCount === 1) { + c.enqueue('first'); + } else { + c.error(new Error('pull error')); + } + }, + }); + + const reader = 
rs.getReader(); + + const r1 = await reader.read(); + strictEqual(r1.value, 'first'); + + // Next read should get the error + await rejects(reader.read(), { message: 'pull error' }); + }, +}; + +// Test pipeTo where source errors mid-stream +export const pipeToWithMidStreamError = { + async test() { + let pullCount = 0; + + const rs = new ReadableStream({ + pull(c) { + pullCount++; + if (pullCount <= 2) { + c.enqueue(new TextEncoder().encode(`chunk${pullCount}`)); + } else { + c.error(new Error('source error mid-stream')); + } + }, + }); + + const chunks = []; + const ws = new WritableStream({ + write(chunk) { + chunks.push(new TextDecoder().decode(chunk)); + }, + }); + + await rejects(rs.pipeTo(ws), { message: 'source error mid-stream' }); + strictEqual(chunks.length, 2); + }, +}; + +// Test pipeTo where sink errors mid-stream +export const pipeToWithSinkError = { + async test() { + let writeCount = 0; + + const rs = new ReadableStream({ + pull(c) { + c.enqueue(new TextEncoder().encode('data')); + }, + }); + + const ws = new WritableStream({ + write(chunk) { + writeCount++; + if (writeCount > 2) { + throw new Error('sink error mid-stream'); + } + }, + }); + + await rejects(rs.pipeTo(ws), { message: 'sink error mid-stream' }); + ok(writeCount >= 2); + }, +}; + +// Test pipeTo with all prevention options +export const pipeToWithAllPreventOptions = { + async test() { + let sourceCanceled = false; + let sinkAborted = false; + let sinkClosed = false; + + const rs = new ReadableStream({ + start(c) { + c.enqueue(new TextEncoder().encode('data')); + c.close(); + }, + cancel() { + sourceCanceled = true; + }, + }); + + const ws = new WritableStream({ + close() { + sinkClosed = true; + }, + abort() { + sinkAborted = true; + }, + }); + + await rs.pipeTo(ws, { + preventClose: true, + preventAbort: true, + preventCancel: true, + }); + + strictEqual(sourceCanceled, false); + strictEqual(sinkAborted, false); + strictEqual(sinkClosed, false); + }, +}; + +// Test byte stream with autoAllocateChunkSize +export const byteStreamWithAutoAllocate = { + async test() { + let pullCount = 0; + const rs = new ReadableStream({ + type: 'bytes', + autoAllocateChunkSize: 1024, + pull(c) { + pullCount++; + // With autoAllocateChunkSize, byobRequest should be available + if (c.byobRequest) { + const view = c.byobRequest.view; + new Uint8Array(view.buffer, view.byteOffset, 3).set([1, 2, 3]); + c.byobRequest.respond(3); + } else { + c.enqueue(new Uint8Array([1, 2, 3])); + } + if (pullCount >= 2) { + c.close(); + } + }, + }); + + const reader = rs.getReader(); + const r1 = await reader.read(); + ok(r1.value instanceof Uint8Array); + strictEqual(r1.value.length, 3); + + const r2 = await reader.read(); + ok(r2.value instanceof Uint8Array); + }, +}; + +// Test byte stream without autoAllocateChunkSize using default reader +export const byteStreamDefaultReaderNoAutoAllocate = { + async test() { + let pullCount = 0; + const rs = new ReadableStream({ + type: 'bytes', + // No autoAllocateChunkSize - pull must use enqueue + pull(c) { + pullCount++; + c.enqueue(new Uint8Array([pullCount])); + if (pullCount >= 2) { + c.close(); + } + }, + }); + + const reader = rs.getReader(); // Default reader, not BYOB + const r1 = await reader.read(); + strictEqual(r1.value[0], 1); + + const r2 = await reader.read(); + strictEqual(r2.value[0], 2); + }, +}; + +// Test draining read via Response.text with multiple chunks +export const drainingReadMultipleChunks = { + async test() { + let pullCount = 0; + + const rs = new ReadableStream({ + pull(c) { 
+ pullCount++; + if (pullCount <= 3) { + c.enqueue(new TextEncoder().encode(`chunk${pullCount}`)); + } else { + c.close(); + } + }, + }); + + const response = new Response(rs); + const text = await response.text(); + strictEqual(text, 'chunk1chunk2chunk3'); + }, +}; + +// Test BYOB reader with various TypedArray types +export const byobReaderWithVariousTypes = { + async test() { + const makeStream = () => + new ReadableStream({ + type: 'bytes', + start(c) { + c.enqueue(new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7])); + c.close(); + }, + }); + + // Test with Uint8Array + { + const rs = makeStream(); + const reader = rs.getReader({ mode: 'byob' }); + const result = await reader.read(new Uint8Array(4)); + strictEqual(result.value.byteLength, 4); + } + + // Test with Uint16Array + { + const rs = makeStream(); + const reader = rs.getReader({ mode: 'byob' }); + const result = await reader.read(new Uint16Array(2)); + strictEqual(result.value.byteLength, 4); + } + + // Test with DataView + { + const rs = makeStream(); + const reader = rs.getReader({ mode: 'byob' }); + const result = await reader.read(new DataView(new ArrayBuffer(4))); + strictEqual(result.value.byteLength, 4); + } + }, +}; + +// Test releasing reader while read is pending +export const releaseReaderWhileReadPending = { + async test() { + let pullResolver; + + const rs = new ReadableStream({ + pull() { + return new Promise((resolve) => { + pullResolver = resolve; + }); + }, + }); + + const reader = rs.getReader(); + const readPromise = reader.read(); + + // Release the reader while read is pending + reader.releaseLock(); + + // The pending read should be rejected + await rejects(readPromise, TypeError); + + // Resolve the pull to clean up + if (pullResolver) pullResolver(); + }, +}; + +// Test stream that yields many small chunks +export const manySmallChunks = { + async test() { + let count = 0; + const rs = new ReadableStream({ + pull(c) { + count++; + c.enqueue(new Uint8Array([count])); + if (count >= 100) { + c.close(); + } + }, + }); + + const response = new Response(rs); + const buffer = await response.arrayBuffer(); + strictEqual(new Uint8Array(buffer).length, 100); + }, +}; + +// Tests targeting AllReader non-byte value error path + +// Test Response.arrayBuffer() with stream that returns a non-byte value (string) +// This should trigger the "did not return bytes" error in AllReader +export const responseArrayBufferWithStringChunk = { + async test() { + const rs = new ReadableStream({ + start(c) { + // Enqueue a string instead of bytes - this should cause AllReader to error + c.enqueue('this is not bytes'); + c.close(); + }, + }); + + const response = new Response(rs); + await rejects(response.arrayBuffer(), { + message: 'This ReadableStream did not return bytes.', + }); + }, +}; + +// Test Response.text() with stream that returns non-byte value (number) +export const responseTextWithNumberChunk = { + async test() { + const rs = new ReadableStream({ + start(c) { + // Enqueue a number instead of bytes + c.enqueue(12345); + c.close(); + }, + }); + + const response = new Response(rs); + await rejects(response.text(), { + message: 'This ReadableStream did not return bytes.', + }); + }, +}; + +// Test Response.arrayBuffer() with stream that returns an object +export const responseArrayBufferWithObjectChunk = { + async test() { + const rs = new ReadableStream({ + start(c) { + // Enqueue an object instead of bytes + c.enqueue({ foo: 'bar' }); + c.close(); + }, + }); + + const response = new Response(rs); + await 
rejects(response.arrayBuffer(), { + message: 'This ReadableStream did not return bytes.', + }); + }, +}; + +// Tests targeting ByteReadable canceled state + +// Test BYOB read on a stream that was canceled before the read +export const byobReadOnCanceledStream = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + start(c) { + c.enqueue(new Uint8Array([1, 2, 3])); + }, + }); + + // Cancel the stream first + await rs.cancel('test cancel'); + + // Get reader and read - should return done since stream is canceled + const reader = rs.getReader({ mode: 'byob' }); + const result = await reader.read(new Uint8Array(10)); + strictEqual(result.done, true); + }, +}; + +// Test default read on a canceled byte stream +export const defaultReadOnCanceledByteStream = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + start(c) { + c.enqueue(new Uint8Array([1, 2, 3])); + }, + }); + + // Cancel the stream first + await rs.cancel('test cancel'); + + // Get reader and read - should return done since stream is canceled + const reader = rs.getReader(); + const result = await reader.read(); + strictEqual(result.done, true); + }, +}; + +// Test tee on a stream where source closes during read +export const teeWithSourceCloseDuringRead = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.enqueue('data'); + c.close(); + }, + }); + + // Tee the stream + const [branch1, branch2] = rs.tee(); + + const reader1 = branch1.getReader(); + const reader2 = branch2.getReader(); + + // Read from both branches + const [r1, r2] = await Promise.all([reader1.read(), reader2.read()]); + + strictEqual(r1.value, 'data'); + strictEqual(r2.value, 'data'); + }, +}; + +// Test reader closed promise resolution +export const readerClosedPromise = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.close(); + }, + }); + + const reader = rs.getReader(); + await reader.closed; + }, +}; + +// Test reader closed promise rejection on error +export const readerClosedPromiseError = { + async test() { + const rs = new ReadableStream({ + start(c) { + c.error(new Error('stream error')); + }, + }); + + const reader = rs.getReader(); + await rejects(reader.closed, { message: 'stream error' }); + }, +}; + +// Test writer closed promise resolution +export const writerClosedPromise = { + async test() { + const ws = new WritableStream(); + const writer = ws.getWriter(); + await writer.close(); + await writer.closed; + }, +}; + +// Test writer closed promise rejection on error +export const writerClosedPromiseError = { + async test() { + const ws = new WritableStream({ + write() { + throw new Error('write error'); + }, + }); + + const writer = ws.getWriter(); + + // First trigger the write error + await rejects(writer.write('test'), { message: 'write error' }); + + // Then check that closed also rejects + await rejects(writer.closed, { message: 'write error' }); + }, +}; + +// Test BYOB reader closed promise +export const byobReaderClosedPromise = { + async test() { + const rs = new ReadableStream({ + type: 'bytes', + start(c) { + c.close(); + }, + }); + + const reader = rs.getReader({ mode: 'byob' }); + await reader.closed; + }, +}; diff --git a/src/workerd/api/tests/streams-standard-coverage-test.wd-test b/src/workerd/api/tests/streams-standard-coverage-test.wd-test new file mode 100644 index 00000000000..0047e224e07 --- /dev/null +++ b/src/workerd/api/tests/streams-standard-coverage-test.wd-test @@ -0,0 +1,18 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests 
:Workerd.Config = (
+  v8Flags = ["--expose-gc"],
+  services = [
+    ( name = "streams-standard-coverage-test",
+      worker = (
+        modules = [
+          (name = "worker", esModule = embed "streams-standard-coverage-test.js")
+        ],
+        # Assumed compatibility date: workerd requires workers to specify one;
+        # adjust to the repo's current baseline if it differs.
+        compatibilityDate = "2024-01-01",
+        compatibilityFlags = [
+          "nodejs_compat",
+          "streams_enable_constructors",
+        ]
+      )
+    ),
+  ],
+);