From ef980f00286eaf4e59f3e0e0426087a3fff577a3 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 26 Jun 2026 11:45:18 -0700 Subject: [PATCH] Backport fatal connection attempt handling --- __tests__/negative.test.ts | 40 ++++++++++++++++++++++++++++++++++++++ package-lock.json | 4 ++-- package.json | 2 +- transport/client.ts | 15 ++++++++++++-- 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/__tests__/negative.test.ts b/__tests__/negative.test.ts index 50c3accb..af5a92b0 100644 --- a/__tests__/negative.test.ts +++ b/__tests__/negative.test.ts @@ -107,6 +107,46 @@ describe('should handle incompatabilities', async () => { }); }); + test('fatal connection attempt error should prevent reconnection', async () => { + class FatalConnectionError extends Error { + constructor(message: string) { + super(message); + this.name = 'FatalConnectionError'; + } + } + + const createClientSpy = vi.fn(() => + Promise.reject(new FatalConnectionError('fake fatal connection failure')), + ); + const clientTransport = new WebSocketClientTransport( + createClientSpy, + 'client', + { + isFatalConnectionError: (err) => err instanceof FatalConnectionError, + }, + ); + const serverTransport = new WebSocketServerTransport(wss, 'SERVER'); + const errMock = vi.fn(); + clientTransport.addEventListener('protocolError', errMock); + addPostTestCleanup(async () => { + clientTransport.removeEventListener('protocolError', errMock); + await cleanupTransports([clientTransport, serverTransport]); + }); + + clientTransport.connect(serverTransport.clientId); + await vi.runAllTimersAsync(); + + expect(clientTransport.reconnectOnConnectionDrop).toBe(false); + expect(clientTransport.sessions.size).toBe(0); + expect(createClientSpy).toHaveBeenCalledTimes(1); + expect(errMock).not.toHaveBeenCalled(); + + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + }); + }); + test('calling connect consecutively should reuse the same connection', async () => { let connectCalls = 0; const clientTransport = new WebSocketClientTransport( diff --git a/package-lock.json b/package-lock.json index 239e8208..0822c1e4 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.216.2", + "version": "0.216.3", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.216.2", + "version": "0.216.3", "license": "MIT", "dependencies": { "@bufbuild/protobuf": "^2.11.0", diff --git a/package.json b/package.json index fc4c5676..c9059bbb 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.216.2", + "version": "0.216.3", "type": "module", "exports": { ".": { diff --git a/transport/client.ts b/transport/client.ts index 0d5d876c..f362894f 100644 --- a/transport/client.ts +++ b/transport/client.ts @@ -132,8 +132,19 @@ export abstract class ClientTransport< } // listeners - protected onConnectingFailed(session: SessionConnecting) { + protected onConnectingFailed( + session: SessionConnecting, + error?: unknown, + ) { const noConnectionSession = super.onConnectingFailed(session); + + if (error instanceof Error && this.options.isFatalConnectionError(error)) { + this.reconnectOnConnectionDrop = false; + this.deleteSession(noConnectionSession, { unhealthy: true }); + + return noConnectionSession; + } + this.tryReconnecting(noConnectionSession.to); return noConnectionSession; @@ -494,7 +505,7 @@ export abstract class ClientTransport< `error connecting to ${connectingSession.to}: ${errStr}`, connectingSession.loggingMetadata, ); - this.onConnectingFailed(connectingSession); + this.onConnectingFailed(connectingSession, error); }, onConnectionTimeout: () => { this.log?.error(