From a86a8aa3c28146da77a944db408aafa2cad21911 Mon Sep 17 00:00:00 2001 From: paschal533 Date: Fri, 17 Apr 2026 12:35:27 +0100 Subject: [PATCH] fix(transport-webrtc): fix UnexpectedEOFError race in readCandidatesUntilConnected MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A race condition caused intermittent failures in the WebRTC 'simple' transport compliance test: the listener closes its signaling stream just before the initiator's connectionstatechange fires for the 'connected' state. The initiator then receives EOF on the signaling stream and threw an UnexpectedEOFError, discarding a perfectly good peer connection. Root cause: connectedPromise was declared inside the try block, making it inaccessible in the catch handler for stream read errors. Fix: move connectedPromise outside the try block. In the catch handler, if the PC is not yet 'connected', await connectedPromise — if it resolves we ignore the stream error (the connection succeeded); if it rejects we re-throw the original stream error. --- .../src/private-to-private/util.ts | 23 ++-- packages/transport-webrtc/test/maconn.spec.ts | 118 ++++++++++++++++++ 2 files changed, 134 insertions(+), 7 deletions(-) diff --git a/packages/transport-webrtc/src/private-to-private/util.ts b/packages/transport-webrtc/src/private-to-private/util.ts index 7a4d4894e7..e6cbd73b5c 100644 --- a/packages/transport-webrtc/src/private-to-private/util.ts +++ b/packages/transport-webrtc/src/private-to-private/util.ts @@ -16,10 +16,10 @@ export interface ReadCandidatesOptions extends AbortOptions, LoggerOptions, Prog } export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream: MessageStream, options: ReadCandidatesOptions): Promise => { - try { - const connectedPromise = Promise.withResolvers() - resolveOnConnected(pc, connectedPromise) + const connectedPromise = Promise.withResolvers() + resolveOnConnected(pc, connectedPromise) + try { // read candidates until we are connected or we reach the end of the stream while (true) { // if we connect, stop trying to read from the stream @@ -66,10 +66,19 @@ export const readCandidatesUntilConnected = async (pc: RTCPeerConnection, stream } } } catch (err) { - options.log.error('%s error parsing ICE candidate - %e', options.direction, err) - - if (options.signal?.aborted === true && pc.connectionState !== 'connected') { - throw err + options.log.error('%s error reading ICE candidates - %e', options.direction, err) + + // If the peer connection is not connected, the error may still be + // recoverable — the signaling stream can close just before the + // connectionstatechange event fires. Wait for the connected promise to + // settle: if the PC connects we can ignore the stream error; if it fails + // or was never going to connect, re-throw. + if (pc.connectionState !== 'connected') { + try { + await connectedPromise.promise + } catch { + throw err + } } } } diff --git a/packages/transport-webrtc/test/maconn.spec.ts b/packages/transport-webrtc/test/maconn.spec.ts index 6905f5ca47..5d5273fd00 100644 --- a/packages/transport-webrtc/test/maconn.spec.ts +++ b/packages/transport-webrtc/test/maconn.spec.ts @@ -1,9 +1,13 @@ /* eslint-disable @typescript-eslint/no-unused-expressions */ +import { ConnectionFailedError } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' +import { pbStream, streamPair } from '@libp2p/utils' import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import { stubObject } from 'sinon-ts' +import { Message } from '../src/private-to-private/pb/message.js' +import { readCandidatesUntilConnected } from '../src/private-to-private/util.js' import { toMultiaddrConnection } from '../src/rtcpeerconnection-to-conn.ts' import { RTCPeerConnection } from '../src/webrtc/index.js' import type { CounterGroup } from '@libp2p/interface' @@ -34,3 +38,117 @@ describe('Multiaddr Connection', () => { expect(metrics.increment.calledWith({ close: true })).to.be.true }) }) + +describe('readCandidatesUntilConnected', () => { + it('throws ConnectionFailedError when the peer connection enters failed state', async () => { + const [localStream, remoteStream] = await streamPair() + + const pc: any = { + connectionState: 'checking' as RTCPeerConnectionState, + onconnectionstatechange: null as any + } + + // Schedule ICE failure after a short delay + setTimeout(() => { + pc.connectionState = 'failed' + pc.onconnectionstatechange?.(new Event('connectionstatechange')) + }, 50) + + const messageStream = pbStream(localStream).pb(Message) + + await expect( + readCandidatesUntilConnected(pc, messageStream, { + direction: 'initiator', + signal: AbortSignal.timeout(5000), + log: defaultLogger().forComponent('test:webrtc') + }) + ).to.eventually.be.rejectedWith(ConnectionFailedError) + + await remoteStream.close() + }) + + it('does not fail when peer connection is temporarily disconnected then recovers to connected', async () => { + const [localStream, remoteStream] = await streamPair() + + const pc: any = { + connectionState: 'checking' as RTCPeerConnectionState, + onconnectionstatechange: null as any + } + + // ICE briefly goes disconnected at t=30ms, then recovers at t=60ms + setTimeout(() => { + pc.connectionState = 'disconnected' + pc.onconnectionstatechange?.(new Event('connectionstatechange')) + }, 30) + + setTimeout(() => { + pc.connectionState = 'connected' + pc.onconnectionstatechange?.(new Event('connectionstatechange')) + }, 60) + + const messageStream = pbStream(localStream).pb(Message) + + await expect( + readCandidatesUntilConnected(pc, messageStream, { + direction: 'recipient', + signal: AbortSignal.timeout(5000), + log: defaultLogger().forComponent('test:webrtc') + }) + ).to.eventually.be.undefined + + await remoteStream.close() + }) + + it('throws ConnectionFailedError when peer connection goes disconnected then failed', async () => { + const [localStream, remoteStream] = await streamPair() + + const pc: any = { + connectionState: 'checking' as RTCPeerConnectionState, + onconnectionstatechange: null as any + } + + // ICE goes disconnected at t=30ms, then fails at t=60ms + setTimeout(() => { + pc.connectionState = 'disconnected' + pc.onconnectionstatechange?.(new Event('connectionstatechange')) + }, 30) + + setTimeout(() => { + pc.connectionState = 'failed' + pc.onconnectionstatechange?.(new Event('connectionstatechange')) + }, 60) + + const messageStream = pbStream(localStream).pb(Message) + + await expect( + readCandidatesUntilConnected(pc, messageStream, { + direction: 'recipient', + signal: AbortSignal.timeout(5000), + log: defaultLogger().forComponent('test:webrtc') + }) + ).to.eventually.be.rejectedWith(ConnectionFailedError) + + await remoteStream.close() + }) + + it('returns without error when peer connection reaches connected state', async () => { + const [localStream, remoteStream] = await streamPair() + + const pc: any = { + connectionState: 'connected' as RTCPeerConnectionState, + onconnectionstatechange: null as any + } + + const messageStream = pbStream(localStream).pb(Message) + + void remoteStream.close() + + await expect( + readCandidatesUntilConnected(pc, messageStream, { + direction: 'initiator', + signal: AbortSignal.timeout(5000), + log: defaultLogger().forComponent('test:webrtc') + }) + ).to.eventually.be.undefined + }) +})