From 1ffd06e3715d0b5531b2f7ec3ac47e2a9832f1ec Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Fri, 26 Jun 2026 09:25:26 -0700 Subject: [PATCH] Handle fatal connection attempt errors --- __tests__/negative.test.ts | 40 ++++++++++++++++++++++++++++++++++++++ package-lock.json | 4 ++-- package.json | 2 +- transport/client.ts | 15 ++++++++++++-- 4 files changed, 56 insertions(+), 5 deletions(-) diff --git a/__tests__/negative.test.ts b/__tests__/negative.test.ts index c7d2acd2..5942c118 100644 --- a/__tests__/negative.test.ts +++ b/__tests__/negative.test.ts @@ -107,6 +107,46 @@ describe('should handle incompatabilities', async () => { }); }); + test('fatal connection attempt error should prevent reconnection', async () => { + class FatalConnectionError extends Error { + constructor(message: string) { + super(message); + this.name = 'FatalConnectionError'; + } + } + + const createClientSpy = vi.fn(() => + Promise.reject(new FatalConnectionError('fake fatal connection failure')), + ); + const clientTransport = new WebSocketClientTransport( + createClientSpy, + 'client', + { + isFatalConnectionError: (err) => err instanceof FatalConnectionError, + }, + ); + const serverTransport = new WebSocketServerTransport(wss, 'SERVER'); + const errMock = vi.fn(); + clientTransport.addEventListener('protocolError', errMock); + addPostTestCleanup(async () => { + clientTransport.removeEventListener('protocolError', errMock); + await cleanupTransports([clientTransport, serverTransport]); + }); + + clientTransport.connect(serverTransport.clientId); + await vi.runAllTimersAsync(); + + expect(clientTransport.reconnectOnConnectionDrop).toBe(false); + expect(clientTransport.sessions.size).toBe(0); + expect(createClientSpy).toHaveBeenCalledTimes(1); + expect(errMock).not.toHaveBeenCalled(); + + await testFinishesCleanly({ + clientTransports: [clientTransport], + serverTransport, + }); + }); + test('calling connect consecutively should reuse the same connection', async () => { let connectCalls = 0; const clientTransport = new WebSocketClientTransport( diff --git a/package-lock.json b/package-lock.json index 0274cf1f..9476671f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/river", - "version": "0.217.2", + "version": "0.219.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@replit/river", - "version": "0.217.2", + "version": "0.219.0", "license": "MIT", "dependencies": { "@bufbuild/protobuf": "^2.11.0", diff --git a/package.json b/package.json index f91a8965..747a2584 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@replit/river", "description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!", - "version": "0.218.0", + "version": "0.219.0", "type": "module", "exports": { ".": "./dist/router/index.js", diff --git a/transport/client.ts b/transport/client.ts index 8f61bfda..4f9e440a 100644 --- a/transport/client.ts +++ b/transport/client.ts @@ -238,8 +238,19 @@ export abstract class ClientTransport< } // listeners - protected onConnectingFailed(session: SessionConnecting) { + protected onConnectingFailed( + session: SessionConnecting, + error?: unknown, + ) { const noConnectionSession = super.onConnectingFailed(session); + + if (error instanceof Error && this.options.isFatalConnectionError(error)) { + this.reconnectOnConnectionDrop = false; + this.deleteSession(noConnectionSession, { unhealthy: true }); + + return noConnectionSession; + } + this.tryReconnecting(noConnectionSession.to); return noConnectionSession; @@ -613,7 +624,7 @@ export abstract class ClientTransport< `error connecting to ${connectingSession.to}: ${errStr}`, connectingSession.loggingMetadata, ); - this.onConnectingFailed(connectingSession); + this.onConnectingFailed(connectingSession, error); }, onConnectionTimeout: () => { this.log?.error(