From ccee62a508e020180ba920512e53a3726f2fb83a Mon Sep 17 00:00:00 2001 From: MK Date: Thu, 23 Oct 2025 22:46:17 +0800 Subject: [PATCH] feat: support Blob, Response close https://github.com/eggjs/egg/issues/5638 --- packages/koa/src/application.ts | 62 ++-- packages/koa/src/context.ts | 26 +- packages/koa/src/index.ts | 24 +- packages/koa/src/request.ts | 6 +- packages/koa/src/response.ts | 52 ++- packages/koa/src/utils.ts | 17 + .../koa/test/__snapshots__/index.test.ts.snap | 3 + packages/koa/test/application/respond.test.ts | 299 ++++++++++++++++++ packages/koa/test/response/body.test.ts | 6 +- 9 files changed, 438 insertions(+), 57 deletions(-) create mode 100644 packages/koa/src/utils.ts diff --git a/packages/koa/src/application.ts b/packages/koa/src/application.ts index 638af59783..968acd0a6d 100644 --- a/packages/koa/src/application.ts +++ b/packages/koa/src/application.ts @@ -9,15 +9,13 @@ import { isGeneratorFunction } from 'is-type-of'; import onFinished from 'on-finished'; import statuses from 'statuses'; import compose from 'koa-compose'; - import { HttpError } from 'http-errors'; -import { Context } from './context.ts'; -import { Request } from './request.ts'; -import { Response } from './response.ts'; -import type { CustomError, AnyProto } from './types.ts'; -// Re-export for external use -export { Context, Request, Response }; +import { KoaContext } from './context.ts'; +import { KoaRequest } from './request.ts'; +import { KoaResponse } from './response.ts'; +import type { CustomError, AnyProto } from './types.ts'; +import { isStream } from './utils.ts'; const debug = debuglog('egg/koa/application'); @@ -25,7 +23,7 @@ const debug = debuglog('egg/koa/application'); export type ProtoImplClass = new (...args: any[]) => T; export type Next = () => Promise; type _MiddlewareFunc = (ctx: T, next: Next) => Promise | void; -export type MiddlewareFunc = _MiddlewareFunc & { +export type MiddlewareFunc = _MiddlewareFunc & { _name?: string; }; @@ -47,14 +45,14 @@ export class Application extends Emitter { proxyIpHeader: string; maxIpsCount: number; protected _keys?: string[]; - middleware: MiddlewareFunc[]; - ctxStorage: AsyncLocalStorage; + middleware: MiddlewareFunc[]; + ctxStorage: AsyncLocalStorage; silent: boolean; - ContextClass: ProtoImplClass; + ContextClass: ProtoImplClass; context: AnyProto; - RequestClass: ProtoImplClass; + RequestClass: ProtoImplClass; request: AnyProto; - ResponseClass: ProtoImplClass; + ResponseClass: ProtoImplClass; response: AnyProto; /** @@ -90,11 +88,11 @@ export class Application extends Emitter { this.middleware = []; this.ctxStorage = getAsyncLocalStorage(); this.silent = false; - this.ContextClass = class ApplicationContext extends Context {} as ProtoImplClass; + this.ContextClass = class ApplicationContext extends KoaContext {} as ProtoImplClass; this.context = this.ContextClass.prototype; - this.RequestClass = class ApplicationRequest extends Request {} as ProtoImplClass; + this.RequestClass = class ApplicationRequest extends KoaRequest {} as ProtoImplClass; this.request = this.RequestClass.prototype; - this.ResponseClass = class ApplicationResponse extends Response {} as ProtoImplClass; + this.ResponseClass = class ApplicationResponse extends KoaResponse {} as ProtoImplClass; this.response = this.ResponseClass.prototype; // Set up custom inspect this[util.inspect.custom] = this.inspect.bind(this); @@ -156,7 +154,7 @@ export class Application extends Emitter { /** * Use the given middleware `fn`. */ - use(fn: MiddlewareFunc): this { + use(fn: MiddlewareFunc): this { if (typeof fn !== 'function') throw new TypeError('middleware must be a function!'); const name = fn._name || fn.name || '-'; if (isGeneratorFunction(fn)) { @@ -167,7 +165,7 @@ export class Application extends Emitter { ); } debug('use %o #%d', name, this.middleware.length); - this.middleware.push(fn as MiddlewareFunc); + this.middleware.push(fn as MiddlewareFunc); return this; } @@ -195,7 +193,7 @@ export class Application extends Emitter { /** * return current context from async local storage */ - get currentContext(): Context | undefined { + get currentContext(): KoaContext | undefined { return this.ctxStorage.getStore(); } @@ -203,7 +201,7 @@ export class Application extends Emitter { * Handle request in callback. * @private */ - protected async handleRequest(ctx: Context, fnMiddleware: (ctx: Context) => Promise): Promise { + protected async handleRequest(ctx: KoaContext, fnMiddleware: (ctx: KoaContext) => Promise): Promise { this.emit('request', ctx); const res = ctx.res; res.statusCode = 404; @@ -227,7 +225,7 @@ export class Application extends Emitter { * Initialize a new context. * @private */ - createContext(req: IncomingMessage, res: ServerResponse): Context { + createContext(req: IncomingMessage, res: ServerResponse): KoaContext { const context = new this.ContextClass(this, req, res); return context; } @@ -247,14 +245,13 @@ export class Application extends Emitter { if (this.silent) return; const msg = err.stack || err.toString(); - // oxlint-disable-next-line no-console console.error(`\n${msg.replaceAll(/^/gm, ' ')}\n`); } /** * Response helper. */ - protected _respond(ctx: Context): void { + protected _respond(ctx: KoaContext): void { // allow bypassing koa if (ctx.respond === false) return; @@ -311,8 +308,23 @@ export class Application extends Emitter { res.end(body); return; } - if (body instanceof Stream) { - body.pipe(res); + + // try stream + let stream: Stream.Readable | null = null; + if (body instanceof Blob) { + stream = Stream.Readable.from(body.stream()); + } else if (body instanceof ReadableStream) { + stream = Stream.Readable.from(body); + } else if (body instanceof Response) { + stream = Stream.Readable.from(body?.body ?? ''); + } else if (isStream(body)) { + stream = body; + } + + if (stream) { + Stream.pipeline(stream, res, (err) => { + if (err && ctx.app.listenerCount('error')) ctx.onerror(err); + }); return; } diff --git a/packages/koa/src/context.ts b/packages/koa/src/context.ts index 6382be81d0..5c980f79d1 100644 --- a/packages/koa/src/context.ts +++ b/packages/koa/src/context.ts @@ -8,17 +8,17 @@ import Cookies from 'cookies'; import type { Accepts } from 'accepts'; import type { Application } from './application.ts'; -import type { Request, RequestSocket } from './request.ts'; -import type { Response } from './response.ts'; +import type { KoaRequest, RequestSocket } from './request.ts'; +import type { KoaResponse } from './response.ts'; import type { CustomError, AnyProto } from './types.ts'; -export class Context { +export class KoaContext { [key: symbol | string]: unknown; app: Application; req: IncomingMessage; res: ServerResponse; - request: Request & AnyProto; - response: Response & AnyProto; + request: KoaRequest & AnyProto; + response: KoaResponse & AnyProto; originalUrl: string; respond?: boolean; // oxlint-disable-next-line typescript/no-explicit-any @@ -403,35 +403,35 @@ export class Context { * Response delegation. */ - attachment(...args: Parameters): void { + attachment(...args: Parameters): void { return this.response.attachment(...args); } - redirect(...args: Parameters): void { + redirect(...args: Parameters): void { return this.response.redirect(...args); } - remove(...args: Parameters): void { + remove(...args: Parameters): void { return this.response.remove(...args); } - vary(...args: Parameters): void { + vary(...args: Parameters): void { return this.response.vary(...args); } - has(...args: Parameters): boolean { + has(...args: Parameters): boolean { return this.response.has(...args); } - set(...args: Parameters): void { + set(...args: Parameters): void { return this.response.set(...args); } - append(...args: Parameters): void { + append(...args: Parameters): void { return this.response.append(...args); } - flushHeaders(...args: Parameters): void { + flushHeaders(...args: Parameters): void { return this.response.flushHeaders(...args); } diff --git a/packages/koa/src/index.ts b/packages/koa/src/index.ts index 4a34442bc6..1150189f1b 100644 --- a/packages/koa/src/index.ts +++ b/packages/koa/src/index.ts @@ -3,7 +3,25 @@ import { Application } from './application.ts'; export default Application; export * from './application.ts'; -export * from './context.ts'; -export * from './request.ts'; -export * from './response.ts'; +export { + KoaContext, + /** + * @deprecated Use `KoaContext` instead, keep compatibility with koa + */ + KoaContext as Context, +} from './context.ts'; +export { + KoaRequest, + /** + * @deprecated Use `KoaRequest` instead, keep compatibility with koa + */ + KoaRequest as Request, +} from './request.ts'; +export { + KoaResponse, + /** + * @deprecated Use `KoaResponse` instead, keep compatibility with koa + */ + KoaResponse as Response, +} from './response.ts'; export type { CustomError, AnyProto } from './types.ts'; diff --git a/packages/koa/src/request.ts b/packages/koa/src/request.ts index c8caad13d3..918c893184 100644 --- a/packages/koa/src/request.ts +++ b/packages/koa/src/request.ts @@ -12,19 +12,19 @@ import fresh from 'fresh'; import type { Application } from './application.ts'; import type { Context } from './context.ts'; -import type { Response } from './response.ts'; +import type { KoaResponse } from './response.ts'; export interface RequestSocket extends Socket { encrypted: boolean; } -export class Request { +export class KoaRequest { [key: symbol]: unknown; app: Application; req: IncomingMessage; res: ServerResponse; ctx: Context; - response: Response; + response: KoaResponse; originalUrl: string; constructor(app: Application, ctx: Context, req: IncomingMessage, res: ServerResponse) { diff --git a/packages/koa/src/response.ts b/packages/koa/src/response.ts index 977668f26f..7173c16c0a 100644 --- a/packages/koa/src/response.ts +++ b/packages/koa/src/response.ts @@ -15,18 +15,19 @@ import vary from 'vary'; import encodeUrl from 'encodeurl'; import type { Application } from './application.ts'; -import type { Context } from './context.ts'; -import type { Request } from './request.ts'; +import type { KoaContext } from './context.ts'; +import type { KoaRequest } from './request.ts'; +import { isStream } from './utils.ts'; -export class Response { +export class KoaResponse { [key: symbol]: unknown; app: Application; req: IncomingMessage; res: ServerResponse; - ctx: Context; - request: Request; + ctx: KoaContext; + request: KoaRequest; - constructor(app: Application, ctx: Context, req: IncomingMessage, res: ServerResponse) { + constructor(app: Application, ctx: KoaContext, req: IncomingMessage, res: ServerResponse) { this.app = app; this.req = req; this.res = res; @@ -126,6 +127,13 @@ export class Response { this.remove('Content-Type'); this.remove('Content-Length'); this.remove('Transfer-Encoding'); + + const shouldDestroyOriginal = original && isStream(original); + if (shouldDestroyOriginal) { + // Ignore errors during cleanup to prevent unhandled exceptions when destroying the stream + original.once('error', () => {}); + destroy(original); + } return; } @@ -150,11 +158,9 @@ export class Response { } // stream - if (val instanceof Stream) { + if (isStream(val)) { onFinish(this.res, destroy.bind(null, val)); - // oxlint-disable-next-line eqeqeq if (original != val) { - val.once('error', (err) => this.ctx.onerror(err)); // overwriting if (original !== null && original !== undefined) { this.remove('Content-Length'); @@ -167,6 +173,32 @@ export class Response { return; } + // ReadableStream + if (val instanceof ReadableStream) { + if (setType) this.type = 'bin'; + return; + } + + // blob + if (val instanceof Blob) { + if (setType) this.type = 'bin'; + this.length = val.size; + return; + } + + // Response + if (val instanceof Response) { + this.status = val.status; + if (setType) this.type = 'bin'; + const headers = val.headers; + console.log('headers', headers); + for (const key of headers.keys()) { + this.set(key, headers.get(key)!); + } + + return; + } + // json this.remove('Content-Length'); this.type = 'json'; @@ -193,7 +225,7 @@ export class Response { } const body = this.body; - if (!body || body instanceof Stream) { + if (!body || isStream(body)) { return undefined; } if (typeof body === 'string') { diff --git a/packages/koa/src/utils.ts b/packages/koa/src/utils.ts new file mode 100644 index 0000000000..aa4b7a96fa --- /dev/null +++ b/packages/koa/src/utils.ts @@ -0,0 +1,17 @@ +import Stream from 'node:stream'; + +// https://github.com/koajs/koa/blob/master/lib/is-stream.js +export function isStream(stream: any): stream is Stream.Readable { + return ( + stream instanceof Stream || + (stream !== null && + typeof stream === 'object' && + !!stream.readable && + typeof stream.pipe === 'function' && + typeof stream.read === 'function' && + typeof stream.readable === 'boolean' && + typeof stream.readableObjectMode === 'boolean' && + typeof stream.destroy === 'function' && + typeof stream.destroyed === 'boolean') + ); +} diff --git a/packages/koa/test/__snapshots__/index.test.ts.snap b/packages/koa/test/__snapshots__/index.test.ts.snap index 7527117f47..9e03759690 100644 --- a/packages/koa/test/__snapshots__/index.test.ts.snap +++ b/packages/koa/test/__snapshots__/index.test.ts.snap @@ -9,8 +9,11 @@ exports[`should export Koa class 1`] = ` exports[`should export Koa class 2`] = ` [ "default", + "KoaContext", "Context", + "KoaRequest", "Request", + "KoaResponse", "Response", "Application", ] diff --git a/packages/koa/test/application/respond.test.ts b/packages/koa/test/application/respond.test.ts index 99cfc53cb9..7f1c7836f2 100644 --- a/packages/koa/test/application/respond.test.ts +++ b/packages/koa/test/application/respond.test.ts @@ -8,6 +8,7 @@ import statuses from 'statuses'; import Koa from '../../src/index.ts'; import { once } from 'node:events'; +import type { AddressInfo } from 'node:net'; const pkg = JSON.parse(fs.readFileSync('package.json', 'utf8')); @@ -445,6 +446,116 @@ describe('app.respond', () => { }); }); + describe('when .body is a Response', () => { + it('should keep Response headers', async () => { + const app = new Koa(); + + app.use((ctx) => { + ctx.body = new Response(null, { status: 201, statusText: 'OK', headers: { 'Content-Type': 'text/plain' } }); + }); + + const res = await request(app.callback()) + .head('/') + .expect('content-type', 'text/plain') + .expect('content-length', '2'); + assert.equal(res.status, 201); + }); + + it('should default to octet-stream', () => { + const app = new Koa(); + + app.use((ctx) => { + ctx.body = new Response(null, { status: 200, statusText: 'OK' }); + }); + + return request(app.callback()) + .get('/') + .expect(200) + .expect('content-type', 'application/octet-stream') + .expect(Buffer.from([])); + }); + + it('should respond with body content', async () => { + const app = new Koa(); + + app.use((ctx) => { + ctx.body = new Response('Hello World', { status: 200, headers: { 'Content-Type': 'text/plain' } }); + }); + + const res = await request(app.callback()).get('/').expect(200).expect('content-type', 'text/plain'); + + assert.strictEqual(res.text, 'Hello World'); + }); + + it('should handle Response from fetch() with JSON', async () => { + const app = new Koa(); + + app.use(async (ctx) => { + const jsonData = JSON.stringify({ message: 'Hello from fetch', timestamp: Date.now() }); + const response = new Response(jsonData, { + status: 200, + headers: { + 'Content-Type': 'application/json', + 'X-Custom-Header': 'custom-value', + }, + }); + ctx.body = response; + }); + + const res = await request(app.callback()).get('/').expect(200).expect('content-type', 'application/json'); + + const body = JSON.parse(res.text); + assert.strictEqual(body.message, 'Hello from fetch'); + assert(body.timestamp); + }); + + it('should handle Response from fetch() with streaming body', async () => { + const app = new Koa(); + + app.use(async (ctx) => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('Streaming ')); + controller.enqueue(new TextEncoder().encode('response ')); + controller.enqueue(new TextEncoder().encode('from fetch')); + controller.close(); + }, + }); + + const response = new Response(stream, { + status: 200, + headers: { + 'Content-Type': 'text/plain', + }, + }); + ctx.body = response; + }); + + const res = await request(app.callback()).get('/').expect(200).expect('content-type', 'text/plain'); + + assert.strictEqual(res.text, 'Streaming response from fetch'); + }); + + it('should handle Response from fetch() with Blob body', async () => { + const app = new Koa(); + + app.use(async (ctx) => { + const blob = new Blob(['Hello from Blob'], { type: 'text/plain' }); + const response = new Response(blob, { + status: 200, + headers: { + 'Content-Type': 'text/plain', + }, + }); + ctx.body = response; + }); + + const res = await request(app.callback()).get('/').expect(200).expect('content-type', 'text/plain'); + + assert.strictEqual(res.text, 'Hello from Blob'); + }); + }); + describe('when .body is a string', () => { it('should respond', () => { const app = new Koa(); @@ -602,6 +713,194 @@ describe('app.respond', () => { }); }); + describe('when .body is a ReadableStream', () => { + it('should respond', async () => { + const app = new Koa(); + + app.use(async (ctx) => { + ctx.body = new ReadableStream(); + }); + + return request(app.callback()).head('/').expect(200).expect('content-type', 'application/octet-stream'); + }); + + it('should respond hello', async () => { + const app = new Koa(); + + app.use(async (ctx) => { + const blob = new Blob(['hello']); + ctx.body = blob.stream(); + }); + + return request(app.callback()) + .get('/') + .expect(200) + .expect('content-type', 'application/octet-stream') + .expect(Buffer.from('hello')); + }); + + it('should handle ReadableStream with chunks', async () => { + const app = new Koa(); + + app.use(async (ctx) => { + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('Hello ')); + controller.enqueue(new TextEncoder().encode('World')); + controller.close(); + }, + }); + ctx.body = stream; + }); + + return request(app.callback()) + .get('/') + .expect(200) + .expect('content-type', 'application/octet-stream') + .expect(Buffer.from('Hello World')); + }); + + it('should handle ReadableStream with custom headers', async () => { + const app = new Koa(); + + app.use(async (ctx) => { + ctx.type = 'text/plain'; + ctx.body = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('test content')); + controller.close(); + }, + }); + }); + + const res = await request(app.callback()) + .get('/') + .expect(200) + .expect('content-type', 'text/plain; charset=utf-8'); + + assert.strictEqual(res.text, 'test content'); + }); + }); + + describe('when using pipeline for streams', () => { + it('should handle stream errors when error listener exists', async () => { + const app = new Koa(); + const PassThrough = require('stream').PassThrough; + + let errorCaught = false; + app.once('error', (err) => { + assert(err.message === 'stream error'); + errorCaught = true; + }); + + app.use((ctx) => { + const stream = new PassThrough(); + ctx.body = stream; + + setImmediate(() => { + stream.emit('error', new Error('stream error')); + }); + }); + + await request(app.callback()) + .get('/') + .catch(() => {}); + + await new Promise((resolve) => setTimeout(resolve, 50)); + assert(errorCaught, 'Error should have been caught'); + }); + + it('should not crash when stream errors and no error listener exists', async () => { + const app = new Koa(); + const PassThrough = require('stream').PassThrough; + + app.use((ctx) => { + const stream = new PassThrough(); + ctx.body = stream; + + setImmediate(() => { + stream.emit('error', new Error('stream error')); + }); + }); + + await request(app.callback()) + .get('/') + .catch(() => {}); + + await new Promise((resolve) => setTimeout(resolve, 50)); + }); + + it('should handle ReadableStream errors when error listener exists', async () => { + const app = new Koa(); + + let errorCaught = false; + app.once('error', (err) => { + assert(err.message === 'readable stream error'); + errorCaught = true; + }); + + app.use((ctx) => { + const readable = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('data')); + controller.error(new Error('readable stream error')); + }, + }); + ctx.body = readable; + }); + + await request(app.callback()) + .get('/') + .catch(() => {}); + + await new Promise((resolve) => setTimeout(resolve, 50)); + assert(errorCaught, 'Error should have been caught'); + }); + + it('should cleanup streams on client abort', async () => { + const app = new Koa(); + const PassThrough = require('stream').PassThrough; + const http = require('http'); + + let streamDestroyed = false; + + app.use((ctx) => { + const stream = new PassThrough(); + stream.on('close', () => { + streamDestroyed = true; + }); + ctx.body = stream; + + setImmediate(() => { + stream.write('some data'); + }); + }); + + const server = app.listen(); + + await new Promise((resolve) => { + const req = http.request({ + port: (server.address() as AddressInfo).port, + path: '/', + }); + + req.on('response', (res: any) => { + res.on('data', () => { + req.destroy(); + setTimeout(() => { + server.close(); + resolve(void 0); + }, 50); + }); + }); + + req.end(); + }); + + assert(streamDestroyed, 'Stream should be destroyed on client abort'); + }); + }); + describe('when .body is an Object', () => { it('should respond with json', () => { const app = new Koa(); diff --git a/packages/koa/test/response/body.test.ts b/packages/koa/test/response/body.test.ts index f23f22b23d..738e6d5fc6 100644 --- a/packages/koa/test/response/body.test.ts +++ b/packages/koa/test/response/body.test.ts @@ -109,14 +109,14 @@ describe('res.body=', () => { assert.strictEqual('application/octet-stream', res.header['content-type']); }); - it('should add error handler to the stream, but only once', () => { + it('should not add error handler to stream (handled by pipeline)', () => { const res = response(); const body = new Stream.PassThrough(); assert.strictEqual(body.listenerCount('error'), 0); res.body = body; - assert.strictEqual(body.listenerCount('error'), 1); + assert.strictEqual(body.listenerCount('error'), 0); res.body = body; - assert.strictEqual(body.listenerCount('error'), 1); + assert.strictEqual(body.listenerCount('error'), 0); }); });