diff --git a/packages/transport-webtransport/src/index.ts b/packages/transport-webtransport/src/index.ts index 5d9047f8c4..bb8247860b 100644 --- a/packages/transport-webtransport/src/index.ts +++ b/packages/transport-webtransport/src/index.ts @@ -53,10 +53,6 @@ export interface WebTransportCertificate { secret: string } -interface WebTransportSessionCleanup { - (metric: string): void -} - export interface WebTransportInit { certificates?: WebTransportCertificate[] } @@ -129,48 +125,27 @@ class WebTransportTransport implements Transport { const { url, certhashes, remotePeer } = parseMultiaddr(ma) let abortListener: (() => void) | undefined let maConn: MultiaddrConnection | undefined - let cleanUpWTSession: WebTransportSessionCleanup = () => {} - let closed = false + let wt: InstanceType | undefined let ready = false let authenticated = false try { this.metrics?.dialerEvents.increment({ pending: true }) - const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { + wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, { serverCertificateHashes: certhashes.map(certhash => ({ algorithm: 'sha-256', value: certhash.digest })) }) - cleanUpWTSession = (metric: string) => { - if (closed) { - // already closed session - return - } - - try { - this.metrics?.dialerEvents.increment({ [metric]: true }) - wt.close() - } catch (err) { - this.log.error('error closing wt session - %e', err) - } finally { - // This is how we specify the connection is closed and shouldn't be used. - if (maConn != null) { - maConn.timeline.close = Date.now() - } - - closed = true - } - } - // if the dial is aborted before we are ready, close the WebTransport session abortListener = () => { - if (ready) { - cleanUpWTSession('noise_timeout') + this.metrics?.dialerEvents.increment({ [ready ? 'noise_timeout' : 'ready_timeout']: true }) + if (maConn != null) { + maConn.abort(new Error('dial aborted')) } else { - cleanUpWTSession('ready_timeout') + try { wt?.close() } catch (err) { this.log.error('error aborting wt session - %e', err) } } } options.signal.addEventListener('abort', abortListener, { @@ -187,22 +162,16 @@ class WebTransportTransport implements Transport { ready = true this.metrics?.dialerEvents.increment({ ready: true }) - - // this promise resolves/throws when the session is closed - wt.closed.catch((err: Error) => { - this.log.error('error on remote wt session close - %e', err) - }) - .finally(() => { - cleanUpWTSession('remote_close') - }) - this.metrics?.dialerEvents.increment({ open: true }) + // maConn takes ownership of the session: wires wt.closed lifecycle and + // calls wt.close() directly from sendClose/sendReset maConn = toMultiaddrConnection({ remoteAddr: ma, - cleanUpWTSession, + webTransport: wt, direction: 'outbound', - log: this.components.logger.forComponent('libp2p:webtransport:connection') + log: this.components.logger.forComponent('libp2p:webtransport:connection'), + onSessionClose: (reason) => { this.metrics?.dialerEvents.increment({ [reason]: true }) } }) authenticated = await this.authenticateWebTransport({ @@ -227,12 +196,20 @@ class WebTransportTransport implements Transport { } catch (err: any) { this.log.error('caught wt session err - %e', err) - if (authenticated) { - cleanUpWTSession('upgrade_error') - } else if (ready) { - cleanUpWTSession('noise_error') + if (!options.signal.aborted) { + if (authenticated) { + this.metrics?.dialerEvents.increment({ upgrade_error: true }) + } else if (ready) { + this.metrics?.dialerEvents.increment({ noise_error: true }) + } else { + this.metrics?.dialerEvents.increment({ ready_error: true }) + } + } + + if (maConn != null) { + maConn.abort(err) } else { - cleanUpWTSession('ready_error') + try { wt?.close() } catch (closeErr) { this.log.error('error closing wt session - %e', closeErr) } } throw err diff --git a/packages/transport-webtransport/src/muxer.ts b/packages/transport-webtransport/src/muxer.ts index 600d6874fc..b6c77d5367 100644 --- a/packages/transport-webtransport/src/muxer.ts +++ b/packages/transport-webtransport/src/muxer.ts @@ -43,12 +43,26 @@ class WebTransportStreamMuxer extends AbstractStreamMuxer { } }) .catch(err => { - this.log.error('could not create a new stream - %e', err) + this.log.error('incoming stream reader failed - %e', err) + this.abort(err) + this.maConn.abort(err) }) } async onCreateStream (options: CreateStreamOptions): Promise { - const wtStream = await this.webTransport.createBidirectionalStream() + let wtStream: WebTransportBidirectionalStream + + try { + wtStream = await this.webTransport.createBidirectionalStream() + } catch (err: unknown) { + const error = err instanceof Error ? err : new Error(String(err)) + // The WebTransport session is dead (e.g. after laptop sleep/resume) - + // abort all open streams and trigger the full connection-manager cleanup chain + this.abort(error) + this.maConn.abort(error) + throw error + } + options?.signal?.throwIfAborted() return webtransportBiDiStreamToStream( diff --git a/packages/transport-webtransport/src/session-to-conn.ts b/packages/transport-webtransport/src/session-to-conn.ts index d95624d37c..088cd8f292 100644 --- a/packages/transport-webtransport/src/session-to-conn.ts +++ b/packages/transport-webtransport/src/session-to-conn.ts @@ -1,19 +1,37 @@ import { AbstractMultiaddrConnection } from '@libp2p/utils' +import type WebTransport from './webtransport.ts' import type { AbortOptions, MultiaddrConnection } from '@libp2p/interface' import type { AbstractMultiaddrConnectionInit, SendResult } from '@libp2p/utils' import type { Uint8ArrayList } from 'uint8arraylist' export interface WebTransportSessionMultiaddrConnectionInit extends Omit { - cleanUpWTSession(metric: string): void + webTransport: WebTransport + onSessionClose?(reason: string): void } class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection { - private cleanUpWTSession: (metric: string) => void + private readonly webTransport: WebTransport + private sessionClosedByUs = false constructor (init: WebTransportSessionMultiaddrConnectionInit) { super(init) - this.cleanUpWTSession = init.cleanUpWTSession + this.webTransport = init.webTransport + + init.webTransport.closed + .then(() => { + if (!this.sessionClosedByUs) { + init.onSessionClose?.('remote_close') + } + this.onTransportClosed() + }) + .catch((err: Error) => { + this.log.error('error on remote wt session close - %e', err) + if (!this.sessionClosedByUs) { + init.onSessionClose?.('remote_close') + } + this.abort(err) + }) } sendData (data: Uint8ArrayList): SendResult { @@ -24,11 +42,21 @@ class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection } sendReset (): void { - this.cleanUpWTSession('abort') + this.sessionClosedByUs = true + try { + this.webTransport.close() + } catch (err) { + this.log.error('error closing wt session - %e', err) + } } async sendClose (options?: AbortOptions): Promise { - this.cleanUpWTSession('close') + this.sessionClosedByUs = true + try { + this.webTransport.close() + } catch (err) { + this.log.error('error closing wt session - %e', err) + } options?.signal?.throwIfAborted() } @@ -42,7 +70,7 @@ class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection } /** - * Convert a socket into a MultiaddrConnection + * Convert a WebTransport session into a MultiaddrConnection * https://github.com/libp2p/interface-transport#multiaddrconnection */ export const toMultiaddrConnection = (init: WebTransportSessionMultiaddrConnectionInit): MultiaddrConnection => { diff --git a/packages/transport-webtransport/test/muxer.spec.ts b/packages/transport-webtransport/test/muxer.spec.ts new file mode 100644 index 0000000000..d001f734e7 --- /dev/null +++ b/packages/transport-webtransport/test/muxer.spec.ts @@ -0,0 +1,77 @@ +import { defaultLogger } from '@libp2p/logger' +import { expect } from 'aegir/chai' +import Sinon from 'sinon' +import { stubInterface } from 'sinon-ts' +import { webtransportMuxer } from '../src/muxer.js' +import type { MultiaddrConnection } from '@libp2p/interface' + +/** + * Builds a minimal WebTransport-like object suitable for unit tests. + * The real WebTransport API is browser-only, so we construct a plain + * object that satisfies the interface used by WebTransportStreamMuxer. + */ +function makeWt (opts: { + createBidirectionalStream?(): Promise + incomingBidirectionalStreams?: ReadableStream +}): any { + return { + createBidirectionalStream: opts.createBidirectionalStream ?? Sinon.stub().rejects(new Error('not configured')), + incomingBidirectionalStreams: opts.incomingBidirectionalStreams ?? new ReadableStream({ start () {} }), + close: Sinon.stub(), + closed: new Promise(() => {}), + ready: Promise.resolve() + } +} + +describe('WebTransportStreamMuxer', () => { + it('aborts maConn when createBidirectionalStream throws', async () => { + const sessionError = new DOMException( + "Failed to execute 'createBidirectionalStream' on 'WebTransport': No connection.", + 'InvalidStateError' + ) + + const wt = makeWt({ + createBidirectionalStream: Sinon.stub().rejects(sessionError) + }) + + const maConn = stubInterface({ + log: defaultLogger().forComponent('libp2p:webtransport:test') + }) + + const factory = webtransportMuxer(wt) + const muxer = factory.createStreamMuxer(maConn) + + await expect(muxer.createStream()).to.eventually.be.rejectedWith(sessionError.message) + + expect(maConn.abort.calledOnce).to.be.true() + expect(maConn.abort.firstCall.args[0]).to.equal(sessionError) + }) + + it('aborts maConn when the incoming stream reader fails', async () => { + const sessionError = new Error('WebTransport session lost') + + // A ReadableStream that errors immediately when the reader is acquired + const incomingBidirectionalStreams = new ReadableStream({ + start (controller) { + controller.error(sessionError) + } + }) + + const wt = makeWt({ incomingBidirectionalStreams }) + + // Use Promise.withResolvers so we can await the async abort() call + const { promise: abortCalled, resolve: resolveAbort } = Promise.withResolvers() + + const maConn = stubInterface({ + log: defaultLogger().forComponent('libp2p:webtransport:test') + }) + maConn.abort.callsFake((err: Error) => { resolveAbort(err) }) + + webtransportMuxer(wt).createStreamMuxer(maConn) + + // The reader runs in a microtask — wait for the abort to fire + const capturedError = await abortCalled + + expect(capturedError).to.equal(sessionError) + }) +})