Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 24 additions & 47 deletions packages/transport-webtransport/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ export interface WebTransportCertificate {
secret: string
}

interface WebTransportSessionCleanup {
(metric: string): void
}

export interface WebTransportInit {
certificates?: WebTransportCertificate[]
}
Expand Down Expand Up @@ -129,48 +125,27 @@ class WebTransportTransport implements Transport<WebTransportDialEvents> {
const { url, certhashes, remotePeer } = parseMultiaddr(ma)
let abortListener: (() => void) | undefined
let maConn: MultiaddrConnection | undefined
let cleanUpWTSession: WebTransportSessionCleanup = () => {}
let closed = false
let wt: InstanceType<typeof WebTransport> | undefined
let ready = false
let authenticated = false

try {
this.metrics?.dialerEvents.increment({ pending: true })

const wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, {
wt = new WebTransport(`${url}/.well-known/libp2p-webtransport?type=noise`, {
serverCertificateHashes: certhashes.map(certhash => ({
algorithm: 'sha-256',
value: certhash.digest
}))
})

cleanUpWTSession = (metric: string) => {
if (closed) {
// already closed session
return
}

try {
this.metrics?.dialerEvents.increment({ [metric]: true })
wt.close()
} catch (err) {
this.log.error('error closing wt session - %e', err)
} finally {
// This is how we specify the connection is closed and shouldn't be used.
if (maConn != null) {
maConn.timeline.close = Date.now()
}

closed = true
}
}

// if the dial is aborted before we are ready, close the WebTransport session
abortListener = () => {
if (ready) {
cleanUpWTSession('noise_timeout')
this.metrics?.dialerEvents.increment({ [ready ? 'noise_timeout' : 'ready_timeout']: true })
if (maConn != null) {
maConn.abort(new Error('dial aborted'))
} else {
cleanUpWTSession('ready_timeout')
try { wt?.close() } catch (err) { this.log.error('error aborting wt session - %e', err) }
}
}
options.signal.addEventListener('abort', abortListener, {
Expand All @@ -187,22 +162,16 @@ class WebTransportTransport implements Transport<WebTransportDialEvents> {

ready = true
this.metrics?.dialerEvents.increment({ ready: true })

// this promise resolves/throws when the session is closed
wt.closed.catch((err: Error) => {
this.log.error('error on remote wt session close - %e', err)
})
.finally(() => {
cleanUpWTSession('remote_close')
})

this.metrics?.dialerEvents.increment({ open: true })

// maConn takes ownership of the session: wires wt.closed lifecycle and
// calls wt.close() directly from sendClose/sendReset
maConn = toMultiaddrConnection({
remoteAddr: ma,
cleanUpWTSession,
webTransport: wt,
direction: 'outbound',
log: this.components.logger.forComponent('libp2p:webtransport:connection')
log: this.components.logger.forComponent('libp2p:webtransport:connection'),
onSessionClose: (reason) => { this.metrics?.dialerEvents.increment({ [reason]: true }) }
})

authenticated = await this.authenticateWebTransport({
Expand All @@ -227,12 +196,20 @@ class WebTransportTransport implements Transport<WebTransportDialEvents> {
} catch (err: any) {
this.log.error('caught wt session err - %e', err)

if (authenticated) {
cleanUpWTSession('upgrade_error')
} else if (ready) {
cleanUpWTSession('noise_error')
if (!options.signal.aborted) {
if (authenticated) {
this.metrics?.dialerEvents.increment({ upgrade_error: true })
} else if (ready) {
this.metrics?.dialerEvents.increment({ noise_error: true })
} else {
this.metrics?.dialerEvents.increment({ ready_error: true })
}
}

if (maConn != null) {
maConn.abort(err)
} else {
cleanUpWTSession('ready_error')
try { wt?.close() } catch (closeErr) { this.log.error('error closing wt session - %e', closeErr) }
}

throw err
Expand Down
18 changes: 16 additions & 2 deletions packages/transport-webtransport/src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,26 @@ class WebTransportStreamMuxer extends AbstractStreamMuxer<WebTransportStream> {
}
})
.catch(err => {
this.log.error('could not create a new stream - %e', err)
this.log.error('incoming stream reader failed - %e', err)
this.abort(err)
this.maConn.abort(err)
})
}

async onCreateStream (options: CreateStreamOptions): Promise<WebTransportStream> {
const wtStream = await this.webTransport.createBidirectionalStream()
let wtStream: WebTransportBidirectionalStream

try {
wtStream = await this.webTransport.createBidirectionalStream()
} catch (err: unknown) {
const error = err instanceof Error ? err : new Error(String(err))
// The WebTransport session is dead (e.g. after laptop sleep/resume) -
// abort all open streams and trigger the full connection-manager cleanup chain
this.abort(error)
this.maConn.abort(error)
throw error
}

options?.signal?.throwIfAborted()

return webtransportBiDiStreamToStream(
Expand Down
40 changes: 34 additions & 6 deletions packages/transport-webtransport/src/session-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
import { AbstractMultiaddrConnection } from '@libp2p/utils'
import type WebTransport from './webtransport.ts'
import type { AbortOptions, MultiaddrConnection } from '@libp2p/interface'
import type { AbstractMultiaddrConnectionInit, SendResult } from '@libp2p/utils'
import type { Uint8ArrayList } from 'uint8arraylist'

export interface WebTransportSessionMultiaddrConnectionInit extends Omit<AbstractMultiaddrConnectionInit, 'name' | 'stream'> {
cleanUpWTSession(metric: string): void
webTransport: WebTransport
onSessionClose?(reason: string): void
}

class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection {
private cleanUpWTSession: (metric: string) => void
private readonly webTransport: WebTransport
private sessionClosedByUs = false

constructor (init: WebTransportSessionMultiaddrConnectionInit) {
super(init)

this.cleanUpWTSession = init.cleanUpWTSession
this.webTransport = init.webTransport

init.webTransport.closed
.then(() => {
if (!this.sessionClosedByUs) {
init.onSessionClose?.('remote_close')
}
this.onTransportClosed()
})
.catch((err: Error) => {
this.log.error('error on remote wt session close - %e', err)
if (!this.sessionClosedByUs) {
init.onSessionClose?.('remote_close')
}
this.abort(err)
})
}

sendData (data: Uint8ArrayList): SendResult {
Expand All @@ -24,11 +42,21 @@ class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection
}

sendReset (): void {
this.cleanUpWTSession('abort')
this.sessionClosedByUs = true
try {
this.webTransport.close()
} catch (err) {
this.log.error('error closing wt session - %e', err)
}
}

async sendClose (options?: AbortOptions): Promise<void> {
this.cleanUpWTSession('close')
this.sessionClosedByUs = true
try {
this.webTransport.close()
} catch (err) {
this.log.error('error closing wt session - %e', err)
}
options?.signal?.throwIfAborted()
}

Expand All @@ -42,7 +70,7 @@ class WebTransportSessionMultiaddrConnection extends AbstractMultiaddrConnection
}

/**
* Convert a socket into a MultiaddrConnection
* Convert a WebTransport session into a MultiaddrConnection
* https://github.com/libp2p/interface-transport#multiaddrconnection
*/
export const toMultiaddrConnection = (init: WebTransportSessionMultiaddrConnectionInit): MultiaddrConnection => {
Expand Down
77 changes: 77 additions & 0 deletions packages/transport-webtransport/test/muxer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { webtransportMuxer } from '../src/muxer.js'
import type { MultiaddrConnection } from '@libp2p/interface'

/**
* Builds a minimal WebTransport-like object suitable for unit tests.
* The real WebTransport API is browser-only, so we construct a plain
* object that satisfies the interface used by WebTransportStreamMuxer.
*/
function makeWt (opts: {
createBidirectionalStream?(): Promise<WebTransportBidirectionalStream>
incomingBidirectionalStreams?: ReadableStream<WebTransportBidirectionalStream>
}): any {
return {
createBidirectionalStream: opts.createBidirectionalStream ?? Sinon.stub().rejects(new Error('not configured')),
incomingBidirectionalStreams: opts.incomingBidirectionalStreams ?? new ReadableStream({ start () {} }),
close: Sinon.stub(),
closed: new Promise<void>(() => {}),
ready: Promise.resolve()
}
}

describe('WebTransportStreamMuxer', () => {
it('aborts maConn when createBidirectionalStream throws', async () => {
const sessionError = new DOMException(
"Failed to execute 'createBidirectionalStream' on 'WebTransport': No connection.",
'InvalidStateError'
)

const wt = makeWt({
createBidirectionalStream: Sinon.stub().rejects(sessionError)
})

const maConn = stubInterface<MultiaddrConnection>({
log: defaultLogger().forComponent('libp2p:webtransport:test')
})

const factory = webtransportMuxer(wt)
const muxer = factory.createStreamMuxer(maConn)

await expect(muxer.createStream()).to.eventually.be.rejectedWith(sessionError.message)

expect(maConn.abort.calledOnce).to.be.true()
expect(maConn.abort.firstCall.args[0]).to.equal(sessionError)
})

it('aborts maConn when the incoming stream reader fails', async () => {
const sessionError = new Error('WebTransport session lost')

// A ReadableStream that errors immediately when the reader is acquired
const incomingBidirectionalStreams = new ReadableStream<WebTransportBidirectionalStream>({
start (controller) {
controller.error(sessionError)
}
})

const wt = makeWt({ incomingBidirectionalStreams })

// Use Promise.withResolvers so we can await the async abort() call
const { promise: abortCalled, resolve: resolveAbort } = Promise.withResolvers<Error>()

const maConn = stubInterface<MultiaddrConnection>({
log: defaultLogger().forComponent('libp2p:webtransport:test')
})
maConn.abort.callsFake((err: Error) => { resolveAbort(err) })

webtransportMuxer(wt).createStreamMuxer(maConn)

// The reader runs in a microtask — wait for the abort to fire
const capturedError = await abortCalled

expect(capturedError).to.equal(sessionError)
})
})
Loading