Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/interceptors/ClientRequest/NodeClientRequest.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,37 @@ it('does not send request body to the original server given mocked response', as
const text = await getIncomingMessageBody(response)
expect(text).toBe('mock created!')
})


it('abort the request when the interceptor is disposed', async () => {
const emitter = new AsyncEventEmitter<HttpRequestEventMap>()
const request = new NodeClientRequest(
normalizeClientRequestArgs('http:', httpServer.http.url('/write'), {
method: 'POST',
}),
{
emitter,
logger,
}
)

emitter.on('request', async ({ request }) => {
await sleep(200)
request.respondWith(new Response('mock created!', { status: 301 }))
})

request.write('one')
request.write('two')
request.end()

const responseReceived = new DeferredPromise<IncomingMessage>()
request.on('response', (response) => {
responseReceived.resolve(response)
})
const response = await responseReceived

expect(response.statusCode).toBe(301)

const text = await getIncomingMessageBody(response)
expect(text).toBe('mock created!')
})
22 changes: 20 additions & 2 deletions src/interceptors/ClientRequest/NodeClientRequest.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { ClientRequest, IncomingMessage } from 'http'
import type { Logger } from '@open-draft/logger'
import { until } from '@open-draft/until'
import { invariant } from 'outvariant'
import type { ClientRequestEmitter } from '.'
import { AbortControllerManager } from '../../utils/AbortControllerManager'
import {
ClientRequestEndCallback,
ClientRequestEndChunk,
Expand Down Expand Up @@ -38,6 +40,7 @@ export class NodeClientRequest extends ClientRequest {
'EAI_AGAIN',
]

private forgetSignal: () => void
private response: IncomingMessage
private emitter: ClientRequestEmitter
private logger: Logger
Expand All @@ -55,7 +58,22 @@ export class NodeClientRequest extends ClientRequest {
[url, requestOptions, callback]: NormalizedClientRequestArgs,
options: NodeClientOptions
) {
super(requestOptions, callback)
const augmentedRequestOptions = { ...requestOptions }

if (!augmentedRequestOptions.signal) {
const abortController = new AbortController()
augmentedRequestOptions.signal = abortController.signal
}

super(augmentedRequestOptions, callback)

const controllerManager = new AbortControllerManager()

const { signal } = augmentedRequestOptions
invariant(signal, "Missing AbortSignal")

controllerManager.registerSignal(signal);
this.forgetSignal = () => controllerManager.forgetSignal(signal)

this.logger = options.logger.extend(
`request ${requestOptions.method} ${url.href}`
Expand Down Expand Up @@ -182,6 +200,7 @@ export class NodeClientRequest extends ClientRequest {
return mockedResponse
}).then((resolverResult) => {
this.logger.info('the listeners promise awaited!')
this.forgetSignal()

/**
* @fixme We are in the "end()" method that still executes in parallel
Expand Down Expand Up @@ -235,7 +254,6 @@ export class NodeClientRequest extends ClientRequest {
})

this.logger.info('request (mock) is completed')

return this
}

Expand Down
1 change: 1 addition & 0 deletions src/interceptors/ClientRequest/http.get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export function get(protocol: Protocol, options: NodeClientOptions) {
`${protocol}:`,
...args
)

const request = new NodeClientRequest(clientRequestArgs, options)

/**
Expand Down
1 change: 1 addition & 0 deletions src/interceptors/ClientRequest/http.request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export function request(protocol: Protocol, options: NodeClientOptions) {
`${protocol}:`,
...args
)

return new NodeClientRequest(clientRequestArgs, options)
}
}
204 changes: 167 additions & 37 deletions src/interceptors/ClientRequest/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,57 +1,187 @@
import { it, expect, beforeAll, afterAll } from 'vitest'
import { describe, it, expect, beforeAll, afterAll, beforeEach, afterEach } from 'vitest'
import http from 'http'
import { HttpServer } from '@open-draft/test-server/http'
import { DeferredPromise } from '@open-draft/deferred-promise'
import { ClientRequestInterceptor } from '.'
import { AbortControllerManager } from '../../utils/AbortControllerManager'

const httpServer = new HttpServer((app) => {
app.get('/', (_req, res) => {
res.status(200).send('/')
describe('ClientRequestInterceptor', () => {
const httpServer = new HttpServer((app) => {
app.get('/', (_req, res) => {
res.status(200).send('/')
})
app.get('/get', (_req, res) => {
res.status(200).send('/get')
})
})
app.get('/get', (_req, res) => {
res.status(200).send('/get')

const interceptor = new ClientRequestInterceptor()

beforeAll(async () => {
await httpServer.listen()
})

afterAll(async () => {
await httpServer.close()
})

beforeEach(() => {
interceptor.apply()
})

afterEach(() => {
interceptor.dispose()
})

it('forbids calling "respondWith" multiple times for the same request', async () => {
const requestUrl = httpServer.http.url('/')

interceptor.on('request', function firstRequestListener({ request }) {
request.respondWith(new Response())
})

const secondRequestEmitted = new DeferredPromise<void>()
interceptor.on('request', function secondRequestListener({ request }) {
expect(() =>
request.respondWith(new Response(null, { status: 301 }))
).toThrow(
`Failed to respond to "GET ${requestUrl}" request: the "request" event has already been responded to.`
)

secondRequestEmitted.resolve()
})

const request = http.get(requestUrl)
await secondRequestEmitted

const responseReceived = new DeferredPromise<http.IncomingMessage>()
request.on('response', (response) => {
responseReceived.resolve(response)
})

const response = await responseReceived
expect(response.statusCode).toBe(200)
expect(response.statusMessage).toBe('')
})

it('add an AbortSignal to the request if missing', async () => {
const requestUrl = httpServer.http.url('/')

const requestEmitted = new DeferredPromise<void>()
interceptor.on('request', function requestListener({ request }) {
expect(request.signal).toBeInstanceOf(AbortSignal)
requestEmitted.resolve()
})

http.get(requestUrl)
await requestEmitted
})
})

const interceptor = new ClientRequestInterceptor()
it('keeps the existing AbortSignal if the request had one', async () => {
const requestUrl = httpServer.http.url('/')
const controller = new AbortController()

/**
* For some reason, controller.signal !== request.signal, some kind of un/wrapping must be happening.
* Because of that, we test that aborting from the user controller aborts the request
*/

beforeAll(async () => {
interceptor.apply()
await httpServer.listen()
})
const requestEmitted = new DeferredPromise<void>()
interceptor.on('request', function requestListener({ request }) {
expect(request.signal).toBeInstanceOf(AbortSignal)
requestEmitted.resolve()
})

afterAll(async () => {
interceptor.dispose()
await httpServer.close()
})
const request = http.get(requestUrl, { signal: controller.signal })
await requestEmitted

it('forbids calling "respondWith" multiple times for the same request', async () => {
const requestUrl = httpServer.http.url('/')
const requestAborted = new DeferredPromise<void>()
request.on('error', (err) => {
expect(err.name).toEqual('AbortError')
requestAborted.resolve()
})

interceptor.on('request', function firstRequestListener({ request }) {
request.respondWith(new Response())
controller.abort()
await requestAborted
})

const secondRequestEmitted = new DeferredPromise<void>()
interceptor.on('request', function secondRequestListener({ request }) {
expect(() =>
request.respondWith(new Response(null, { status: 301 }))
).toThrow(
`Failed to respond to "GET ${requestUrl}" request: the "request" event has already been responded to.`
)
it('abort ongoing requests when disposed', async () => {
const requestUrl = httpServer.http.url('/')

const requestEmitted = new DeferredPromise<void>()
interceptor.on('request', function requestListener() {
requestEmitted.resolve()
})

const controller = new AbortController()
const requestWithoutUserController = http.get(requestUrl)
const requestWithUserController = http.get(requestUrl, { signal: controller.signal })

const requests = [requestWithoutUserController, requestWithUserController]

secondRequestEmitted.resolve()
const requestsAborted = requests.map(request => {
const requestAborted = new DeferredPromise<void>()
request.on('error', (err) => {
expect(err.name).toEqual('AbortError')
requestAborted.resolve()
})

return requestAborted
})

await requestEmitted
interceptor.dispose()
await Promise.all(requestsAborted)
})

const request = http.get(requestUrl)
await secondRequestEmitted
it('abort upcoming requests when disposed', async () => {
const requestUrl = httpServer.http.url('/')

interceptor.on('request', function requestListener() {
expect.fail('the request should never be sent, yet intercepted')
})

const controller = new AbortController()
const requestWithoutUserController = http.request(requestUrl)
const requestWithUserController = http.request(requestUrl, { signal: controller.signal })

const requests = [requestWithoutUserController, requestWithUserController]

const responseReceived = new DeferredPromise<http.IncomingMessage>()
request.on('response', (response) => {
responseReceived.resolve(response)
const requestsAborted = requests.map(request => {
const requestAborted = new DeferredPromise<void>()
request.on('error', (err) => {
expect(err.name).toEqual('AbortError')
requestAborted.resolve()
})

return requestAborted
})

interceptor.dispose()
requests.forEach(request => request.end())

await Promise.all(requestsAborted)
})

const response = await responseReceived
expect(response.statusCode).toBe(200)
expect(response.statusMessage).toBe('')
})
it('signal is forgotten when the request ends', async () => {
const requestUrl = httpServer.http.url('/')

interceptor.on('request', function requestListener({ request }) {
request.respondWith(new Response())
})

const controller = new AbortController()
const request = http.get(requestUrl, { signal: controller.signal })

const responseReceived = new DeferredPromise<http.IncomingMessage>()
request.on('response', (response) => {
responseReceived.resolve(response)
})

await responseReceived

const manager = new AbortControllerManager()
expect(manager.isRegistered(controller)).toBeFalsy()
expect(manager.isReferenced(controller)).toBeFalsy()
})
})
7 changes: 6 additions & 1 deletion src/interceptors/ClientRequest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import http from 'http'
import https from 'https'
import { HttpRequestEventMap } from '../../glossary'
import { Interceptor } from '../../Interceptor'
import { AbortControllerManager } from '../../utils/AbortControllerManager'
import { AsyncEventEmitter } from '../../utils/AsyncEventEmitter'
import { get } from './http.get'
import { request } from './http.request'
Expand All @@ -21,7 +22,6 @@ export class ClientRequestInterceptor extends Interceptor<HttpRequestEventMap> {

constructor() {
super(ClientRequestInterceptor.interceptorSymbol)

this.modules = new Map()
this.modules.set('http', http)
this.modules.set('https', https)
Expand All @@ -30,6 +30,11 @@ export class ClientRequestInterceptor extends Interceptor<HttpRequestEventMap> {
protected setup(): void {
const logger = this.logger.extend('setup')

const controllerManager = new AbortControllerManager()
this.subscriptions.push(() => controllerManager.dispose())

controllerManager.decorate()

for (const [protocol, requestModule] of this.modules) {
const { request: pureRequest, get: pureGet } = requestModule

Expand Down
Loading