From 97fbb7608f54c54d44670a96f616a23ef89db6a7 Mon Sep 17 00:00:00 2001 From: paschal533 Date: Tue, 12 May 2026 16:20:34 +0100 Subject: [PATCH 1/2] refactor(transport-webtransport): align session lifecycle with WebRTC pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The WebTransport maConn now directly owns the WebTransport session, mirroring how RTCPeerConnectionMultiaddrConnection owns RTCPeerConnection in the WebRTC transport. Before this change, the session lived in a closure inside dial() via a cleanUpWTSession callback, requiring a closed flag to coordinate five call sites that could all race to call wt.close(). The wt.closed promise handlers were also wired in dial() rather than in the class responsible for the connection lifecycle. Changes: session-to-conn.ts: - Replace cleanUpWTSession callback with webTransport: WebTransport field - Wire wt.closed.then/.catch in the constructor, mirroring WebRTC's onconnectionstatechange handler — clean close calls onTransportClosed(), error calls abort(err) - sendClose/sendReset call wt.close() directly (with try/catch) instead of delegating to a closure - sessionClosedByUs flag prevents incorrect remote_close metric increments when the local side initiates the close index.ts: - Remove WebTransportSessionCleanup type, cleanUpWTSession closure, closed flag, and the standalone wt.closed.then/.catch block - abortListener delegates to maConn.abort() when maConn exists, otherwise closes wt directly - Catch block guards against double metric-counting when the abort signal fired (timeout label already incremented by the listener) muxer.ts: - onCreateStream: catch DOMException from createBidirectionalStream() and call abort on both the muxer and maConn before rethrowing, triggering the full connection-manager cleanup chain - Incoming stream reader: same abort-on-failure pattern for the inbound path Closes #1982 --- packages/transport-webtransport/src/index.ts | 83 +++++++------------ packages/transport-webtransport/src/muxer.ts | 22 ++++- .../src/session-to-conn.ts | 40 +++++++-- .../transport-webtransport/test/muxer.spec.ts | 77 +++++++++++++++++ 4 files changed, 159 insertions(+), 63 deletions(-) create mode 100644 packages/transport-webtransport/test/muxer.spec.ts diff --git a/packages/transport-webtransport/src/index.ts b/packages/transport-webtransport/src/index.ts index 75f05ff5c1..bb8247860b 100644 --- a/packages/transport-webtransport/src/index.ts +++ b/packages/transport-webtransport/src/index.ts @@ -1,7 +1,7 @@ /** * @packageDocumentation * - * A [libp2p transport](https://docs.libp2p.io/concepts/transports/overview/) based on [WebTransport](https://www.w3.org/TR/webtransport/). + * A [libp2p transport](https://libp2p.io/docs/transports-overview/) based on [WebTransport](https://www.w3.org/TR/webtransport/). * * > ⚠️ **Note** * > @@ -31,13 +31,13 @@ import { noise } from '@chainsafe/libp2p-noise' import { InvalidCryptoExchangeError, InvalidParametersError, serviceCapabilities, transportSymbol } from '@libp2p/interface' import { WebTransport as WebTransportMatcher } from '@multiformats/multiaddr-matcher' import { CustomProgressEvent } from 'progress-events' -import createListener from './listener.js' -import { webtransportMuxer } from './muxer.js' +import createListener from './listener.ts' +import { webtransportMuxer } from './muxer.ts' import { toMultiaddrConnection } from './session-to-conn.ts' -import { isSubset } from './utils/is-subset.js' -import { parseMultiaddr } from './utils/parse-multiaddr.js' +import { isSubset } from './utils/is-subset.ts' +import { parseMultiaddr } from './utils/parse-multiaddr.ts' import { WebTransportMessageStream } from './utils/webtransport-message-stream.ts' -import WebTransport from './webtransport.js' +import WebTransport from './webtransport.ts' import type { Upgrader, Transport, CreateListenerOptions, DialTransportOptions, Listener, ComponentLogger, Logger, Connection, MultiaddrConnection, CounterGroup, Metrics, PeerId, OutboundConnectionUpgradeEvents, PrivateKey } from '@libp2p/interface' import type { Multiaddr } from '@multiformats/multiaddr' import type { MultihashDigest } from 'multiformats/hashes/interface' @@ -53,10 +53,6 @@ export interface WebTransportCertificate { secret: string } -interface WebTransportSessionCleanup { - (metric: string): void -} - export interface WebTransportInit { certificates?: WebTransportCertificate[] } @@ -129,48 +125,27 @@ class WebTransportTransport implements Transport { const { url, certhashes, remotePeer } = parseMultiaddr(ma) let abortListener: (() => void) | undefined let maConn: MultiaddrConnection | undefined - let cleanUpWTSession: WebTransportSessionCleanup = () => {} - let closed = false + let wt: InstanceType | undefined let ready = false let authenticated = false try { this.metrics?.dialerEvents.increment({ pending: true }) - const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { + wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { serverCertificateHashes: certhashes.map(certhash => ({ algorithm: 'sha-256', value: certhash.digest })) }) - cleanUpWTSession = (metric: string) => { - if (closed) { - // already closed session - return - } - - try { - this.metrics?.dialerEvents.increment({ [metric]: true }) - wt.close() - } catch (err) { - this.log.error('error closing wt session - %e', err) - } finally { - // This is how we specify the connection is closed and shouldn't be used. - if (maConn != null) { - maConn.timeline.close = Date.now() - } - - closed = true - } - } - // if the dial is aborted before we are ready, close the WebTransport session abortListener = () => { - if (ready) { - cleanUpWTSession('noise_timeout') + this.metrics?.dialerEvents.increment({ [ready ? 'noise_timeout' : 'ready_timeout']: true }) + if (maConn != null) { + maConn.abort(new Error('dial aborted')) } else { - cleanUpWTSession('ready_timeout') + try { wt?.close() } catch (err) { this.log.error('error aborting wt session - %e', err) } } } options.signal.addEventListener('abort', abortListener, { @@ -187,22 +162,16 @@ class WebTransportTransport implements Transport { ready = true this.metrics?.dialerEvents.increment({ ready: true }) - - // this promise resolves/throws when the session is closed - wt.closed.catch((err: Error) => { - this.log.error('error on remote wt session close - %e', err) - }) - .finally(() => { - cleanUpWTSession('remote_close') - }) - this.metrics?.dialerEvents.increment({ open: true }) + // maConn takes ownership of the session: wires wt.closed lifecycle and + // calls wt.close() directly from sendClose/sendReset maConn = toMultiaddrConnection({ remoteAddr: ma, - cleanUpWTSession, + webTransport: wt, direction: 'outbound', - log: this.components.logger.forComponent('libp2p:webtransport:connection') + log: this.components.logger.forComponent('libp2p:webtransport:connection'), + onSessionClose: (reason) => { this.metrics?.dialerEvents.increment({ [reason]: true }) } }) authenticated = await this.authenticateWebTransport({ @@ -227,12 +196,20 @@ class WebTransportTransport implements Transport { } catch (err: any) { this.log.error('caught wt session err - %e', err) - if (authenticated) { - cleanUpWTSession('upgrade_error') - } else if (ready) { - cleanUpWTSession('noise_error') + if (!options.signal.aborted) { + if (authenticated) { + this.metrics?.dialerEvents.increment({ upgrade_error: true }) + } else if (ready) { + this.metrics?.dialerEvents.increment({ noise_error: true }) + } else { + this.metrics?.dialerEvents.increment({ ready_error: true }) + } + } + + if (maConn != null) { + maConn.abort(err) } else { - cleanUpWTSession('ready_error') + try { wt?.close() } catch (closeErr) { this.log.error('error closing wt session - %e', closeErr) } } throw err diff --git a/packages/transport-webtransport/src/muxer.ts b/packages/transport-webtransport/src/muxer.ts index db7eae9d9b..b6c77d5367 100644 --- a/packages/transport-webtransport/src/muxer.ts +++ b/packages/transport-webtransport/src/muxer.ts @@ -1,7 +1,7 @@ import { AbstractStreamMuxer } from '@libp2p/utils' -import { webtransportBiDiStreamToStream } from './stream.js' +import { webtransportBiDiStreamToStream } from './stream.ts' import type { WebTransportStream } from './stream.ts' -import type WebTransport from './webtransport.js' +import type WebTransport from './webtransport.ts' import type { CreateStreamOptions, MultiaddrConnection, StreamMuxer, StreamMuxerFactory } from '@libp2p/interface' const PROTOCOL = '/webtransport' @@ -43,12 +43,26 @@ class WebTransportStreamMuxer extends AbstractStreamMuxer { } }) .catch(err => { - this.log.error('could not create a new stream - %e', err) + this.log.error('incoming stream reader failed - %e', err) + this.abort(err) + this.maConn.abort(err) }) } async onCreateStream (options: CreateStreamOptions): Promise { - const wtStream = await this.webTransport.createBidirectionalStream() + let wtStream: WebTransportBidirectionalStream + + try { + wtStream = await this.webTransport.createBidirectionalStream() + } catch (err: unknown) { + const error = err instanceof Error ? err : new Error(String(err)) + // The WebTransport session is dead (e.g. after laptop sleep/resume) - + // abort all open streams and trigger the full connection-manager cleanup chain + this.abort(error) + this.maConn.abort(error) + throw error + } + options?.signal?.throwIfAborted() return webtransportBiDiStreamToStream( diff --git a/packages/transport-webtransport/src/session-to-conn.ts b/packages/transport-webtransport/src/session-to-conn.ts index d95624d37c..65d83dd1bb 100644 --- a/packages/transport-webtransport/src/session-to-conn.ts +++ b/packages/transport-webtransport/src/session-to-conn.ts @@ -1,19 +1,37 @@ import { AbstractMultiaddrConnection } from '@libp2p/utils' import type { AbortOptions, MultiaddrConnection } from '@libp2p/interface' import type { AbstractMultiaddrConnectionInit, SendResult } from '@libp2p/utils' +import type WebTransport from './webtransport.ts' import type { Uint8ArrayList } from 'uint8arraylist' export interface WebTransportSessionMultiaddrConnectionInit extends Omit { - cleanUpWTSession(metric: string): void + webTransport: WebTransport + onSessionClose?: (reason: string) => void } class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection { - private cleanUpWTSession: (metric: string) => void + private readonly webTransport: WebTransport + private sessionClosedByUs = false constructor (init: WebTransportSessionMultiaddrConnectionInit) { super(init) - this.cleanUpWTSession = init.cleanUpWTSession + this.webTransport = init.webTransport + + init.webTransport.closed + .then(() => { + if (!this.sessionClosedByUs) { + init.onSessionClose?.('remote_close') + } + this.onTransportClosed() + }) + .catch((err: Error) => { + this.log.error('error on remote wt session close - %e', err) + if (!this.sessionClosedByUs) { + init.onSessionClose?.('remote_close') + } + this.abort(err) + }) } sendData (data: Uint8ArrayList): SendResult { @@ -24,11 +42,21 @@ class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection } sendReset (): void { - this.cleanUpWTSession('abort') + this.sessionClosedByUs = true + try { + this.webTransport.close() + } catch (err) { + this.log.error('error closing wt session - %e', err) + } } async sendClose (options?: AbortOptions): Promise { - this.cleanUpWTSession('close') + this.sessionClosedByUs = true + try { + this.webTransport.close() + } catch (err) { + this.log.error('error closing wt session - %e', err) + } options?.signal?.throwIfAborted() } @@ -42,7 +70,7 @@ class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection } /** - * Convert a socket into a MultiaddrConnection + * Convert a WebTransport session into a MultiaddrConnection * https://github.com/libp2p/interface-transport#multiaddrconnection */ export const toMultiaddrConnection = (init: WebTransportSessionMultiaddrConnectionInit): MultiaddrConnection => { diff --git a/packages/transport-webtransport/test/muxer.spec.ts b/packages/transport-webtransport/test/muxer.spec.ts new file mode 100644 index 0000000000..49f6b35b62 --- /dev/null +++ b/packages/transport-webtransport/test/muxer.spec.ts @@ -0,0 +1,77 @@ +import { defaultLogger } from '@libp2p/logger' +import { expect } from 'aegir/chai' +import Sinon from 'sinon' +import { stubInterface } from 'sinon-ts' +import { webtransportMuxer } from '../src/muxer.js' +import type { MultiaddrConnection } from '@libp2p/interface' + +/** + * Builds a minimal WebTransport-like object suitable for unit tests. + * The real WebTransport API is browser-only, so we construct a plain + * object that satisfies the interface used by WebTransportStreamMuxer. + */ +function makeWt (opts: { + createBidirectionalStream?: () => Promise + incomingBidirectionalStreams?: ReadableStream +}): any { + return { + createBidirectionalStream: opts.createBidirectionalStream ?? Sinon.stub().rejects(new Error('not configured')), + incomingBidirectionalStreams: opts.incomingBidirectionalStreams ?? new ReadableStream({ start () {} }), + close: Sinon.stub(), + closed: new Promise(() => {}), + ready: Promise.resolve() + } +} + +describe('WebTransportStreamMuxer', () => { + it('aborts maConn when createBidirectionalStream throws', async () => { + const sessionError = new DOMException( + "Failed to execute 'createBidirectionalStream' on 'WebTransport': No connection.", + 'InvalidStateError' + ) + + const wt = makeWt({ + createBidirectionalStream: Sinon.stub().rejects(sessionError) + }) + + const maConn = stubInterface({ + log: defaultLogger().forComponent('libp2p:webtransport:test') + }) + + const factory = webtransportMuxer(wt) + const muxer = factory.createStreamMuxer(maConn) + + await expect(muxer.createStream()).to.eventually.be.rejectedWith(sessionError.message) + + expect(maConn.abort.calledOnce).to.be.true() + expect(maConn.abort.firstCall.args[0]).to.equal(sessionError) + }) + + it('aborts maConn when the incoming stream reader fails', async () => { + const sessionError = new Error('WebTransport session lost') + + // A ReadableStream that errors immediately when the reader is acquired + const incomingBidirectionalStreams = new ReadableStream({ + start (controller) { + controller.error(sessionError) + } + }) + + const wt = makeWt({ incomingBidirectionalStreams }) + + // Use Promise.withResolvers so we can await the async abort() call + const { promise: abortCalled, resolve: resolveAbort } = Promise.withResolvers() + + const maConn = stubInterface({ + log: defaultLogger().forComponent('libp2p:webtransport:test') + }) + maConn.abort.callsFake((err: Error) => { resolveAbort(err) }) + + webtransportMuxer(wt).createStreamMuxer(maConn) + + // The reader runs in a microtask — wait for the abort to fire + const capturedError = await abortCalled + + expect(capturedError).to.equal(sessionError) + }) +}) From 1a9114cc1fb6e3f20b3ce6f00059b2d8cbd84400 Mon Sep 17 00:00:00 2001 From: paschal533 Date: Tue, 12 May 2026 18:35:56 +0100 Subject: [PATCH 2/2] fix(transport-webtransport): fix lint errors in session-to-conn and muxer.spec - Move WebTransport import before @libp2p/interface to satisfy import/order - Use method shorthand syntax for onSessionClose and createBidirectionalStream to satisfy @typescript-eslint/method-signature-style --- packages/transport-webtransport/src/session-to-conn.ts | 4 ++-- packages/transport-webtransport/test/muxer.spec.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/transport-webtransport/src/session-to-conn.ts b/packages/transport-webtransport/src/session-to-conn.ts index 65d83dd1bb..088cd8f292 100644 --- a/packages/transport-webtransport/src/session-to-conn.ts +++ b/packages/transport-webtransport/src/session-to-conn.ts @@ -1,12 +1,12 @@ import { AbstractMultiaddrConnection } from '@libp2p/utils' +import type WebTransport from './webtransport.ts' import type { AbortOptions, MultiaddrConnection } from '@libp2p/interface' import type { AbstractMultiaddrConnectionInit, SendResult } from '@libp2p/utils' -import type WebTransport from './webtransport.ts' import type { Uint8ArrayList } from 'uint8arraylist' export interface WebTransportSessionMultiaddrConnectionInit extends Omit { webTransport: WebTransport - onSessionClose?: (reason: string) => void + onSessionClose?(reason: string): void } class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection { diff --git a/packages/transport-webtransport/test/muxer.spec.ts b/packages/transport-webtransport/test/muxer.spec.ts index 49f6b35b62..d001f734e7 100644 --- a/packages/transport-webtransport/test/muxer.spec.ts +++ b/packages/transport-webtransport/test/muxer.spec.ts @@ -11,7 +11,7 @@ import type { MultiaddrConnection } from '@libp2p/interface' * object that satisfies the interface used by WebTransportStreamMuxer. */ function makeWt (opts: { - createBidirectionalStream?: () => Promise + createBidirectionalStream?(): Promise incomingBidirectionalStreams?: ReadableStream }): any { return {