Skip to content
Draft
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ The `toString` method uses `toHex`.
| `pssSend` | `POST /pss/send/:topic/:target` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1send~1%7Btopic%7D~1%7Btargets%7D/post) | ❌✅✅ |
| `pssSubscribe` _Websocket_ | `GET /pss/subscribe/:topic` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1subscribe~1%7Btopic%7D/get) | ❌❌✅ |
| `pssReceive` | `GET /pss/subscribe/:topic` [🔗](https://docs.ethswarm.org/api/#tag/Postal-Service-for-Swarm/paths/~1pss~1subscribe~1%7Btopic%7D/get) | ❌❌✅ |
| `gsocSubscribe` _WebSocket_ | `GET /gsoc/subscribe/:address` | ❌❌✅ |
| `pubsubConnect` _WebSocket_ | `GET /pubsub/:topicAddress` | ❌❌✅ |
| `listPubsubTopics` | `GET /pubsub/` | ❌❌✅ |
| `getPostageBatches` | `GET /stamps` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1stamps/get) | ❌✅✅ |
| `getGlobalPostageBatches` | `GET /batches` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1batches/get) | ❌✅✅ |
| `getPostageBatch` | `GET /stamps/:batchId` [🔗](https://docs.ethswarm.org/api/#tag/Postage-Stamps/paths/~1stamps~1%7Bbatch_id%7D/get) | ❌✅✅ |
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
],
"scripts": {
"prepublishOnly": "NODE_ENV=production npm run build",
"prepare": "npm run build",
"build": "rimraf dist && npm run build:node && npm run build:types && npm run build:browser",
"build:node": "tsc -p tsconfig.json && tsc -p tsconfig-mjs.json && ./build-fixup && babel --plugins \"babel-plugin-add-import-extension\" --out-dir dist/mjs/ dist/mjs/",
"build:types": "tsc --emitDeclarationOnly --declaration --outDir dist/types",
Expand Down
144 changes: 144 additions & 0 deletions src/bee.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import * as grantee from './modules/grantee'
import * as gsoc from './modules/gsoc'
import * as pinning from './modules/pinning'
import * as pubsub from './modules/pubsub'
import type { PubsubModeParams } from './modules/pubsub'
import * as pss from './modules/pss'
import { rchash } from './modules/rchash'
import * as status from './modules/status'
Expand Down Expand Up @@ -56,6 +58,10 @@
GsocMessageHandler,
GsocSubscription,
Health,
PubsubMessageHandler,
PubsubMode,
PubsubSubscription,
PubsubTopicListResponse,
LastCashoutActionResponse,
LastChequesForPeerResponse,
LastChequesResponse,
Expand Down Expand Up @@ -1155,7 +1161,7 @@
for (let i = 0n; i < 0xffffn; i++) {
const signer = new PrivateKey(Binary.numberToUint256(start + i, 'BE'))
const socAddress = makeSOCAddress(identifier, signer.publicKey().address())
// TODO: test the significance of the hardcoded 256

Check warning on line 1164 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: test the significance of the...'
const actualProximity = 256 - Binary.proximity(socAddress.toUint8Array(), targetOverlay.toUint8Array())

if (actualProximity <= 256 - proximity) {
Expand Down Expand Up @@ -1287,6 +1293,144 @@
return subscription
}

/**
* Connects to a pubsub topic via WebSocket, acting as a publisher (read + write).
*
* The mode enum and its constructor arguments are passed directly
*
* @param mode Pubsub mode enum value (e.g. `PubsubMode.GSOC_EPHEMERAL`)
* @param handler Message handler with `onMessage`, `onError`, `onClose` callbacks
* @param brokerPeer Multiaddress of the broker peer to connect to
* @param modeParams Constructor arguments for the selected mode (topic, optional params)
* @returns A {@link PubsubSubscription} with `send(payload)` and `cancel()` methods
*/
pubsubConnect<M extends PubsubMode>(
mode: M,
handler: PubsubMessageHandler,
brokerPeer: string,
...modeParams: PubsubModeParams[M]
): PubsubSubscription {
const modeInstance = pubsub.createPubsubMode(mode, ...modeParams)

const ws = pubsub.connect(
this.url,
modeInstance.topicAddress,
brokerPeer,
modeInstance.getPublisherHeaders() ?? undefined,
this.requestOptions.headers,
)
// Ensure binary frames are delivered as ArrayBuffer (not Blob) in browser environments.
// prepareWebsocketData handles ArrayBuffer but not Blob.
ws.binaryType = 'arraybuffer'

const PING_INTERVAL_MS = 50_000
let pingTimer: ReturnType<typeof setInterval> | null = null

const startPing = () => {
if (typeof ws.ping === 'function') {
pingTimer = setInterval(() => {
try {
ws.ping()
} catch {
// ignore errors on closed sockets
}
}, PING_INTERVAL_MS)
}
}

const stopPing = () => {
if (pingTimer !== null) {
clearInterval(pingTimer)
pingTimer = null
}
}

let cancelled = false
const cancel = () => {
if (!cancelled) {
cancelled = true
stopPing()

if (ws.terminate) {
ws.terminate()
} else {
ws.close()
}
}
}

let ready = false
const sendQueue: Uint8Array[] = []

const flushQueue = () => {
ready = true

for (const msg of sendQueue) {
ws.send(msg)
}
sendQueue.length = 0
}

const subscription: PubsubSubscription = {
cancel,
send: async (payload: Uint8Array | string): Promise<void> => {
const encoded = await modeInstance.encodeMessage(payload)

if (ready) {
ws.send(encoded)
} else {
sendQueue.push(encoded)
}
},
}

ws.onopen = () => {
if (cancelled) {
ws.close()

return
}

startPing()
flushQueue()

if (handler.onOpen) {
handler.onOpen(subscription)
}
}

ws.onmessage = async event => {
const data = await prepareWebsocketData(event.data)

if (data.length) {
handler.onMessage(modeInstance.decodeMessage(data), subscription)
}
}
ws.onerror = event => {
if (!cancelled) {
handler.onError(new BeeError(event.message), subscription)
}
}
ws.onclose = () => {
stopPing()

if (!cancelled) {
handler.onClose(subscription)
}
}

return subscription
}

/**
* Lists all active pubsub topics this node is participating in.
*
* @param requestOptions Options for making requests, such as timeouts, custom HTTP agents, headers, etc.
*/
async listPubsubTopics(requestOptions?: BeeRequestOptions): Promise<PubsubTopicListResponse> {
return pubsub.listTopics(this.getRequestOptionsForCall(requestOptions))
}

/**
* Creates a feed manifest chunk and returns the reference to it.
*
Expand Down Expand Up @@ -1845,7 +1989,7 @@
gasPrice?: NumberString | string | bigint,
requestOptions?: BeeRequestOptions,
): Promise<TransactionId> {
// TODO: check BZZ in tests

Check warning on line 1992 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: check BZZ in tests'
const amountString =
amount instanceof BZZ ? amount.toPLURString() : asNumberString(amount, { min: 1n, name: 'amount' })

Expand Down Expand Up @@ -2512,7 +2656,7 @@
* @deprecated Use `getPostageBatches` instead
*/
async getAllPostageBatch(requestOptions?: BeeRequestOptions): Promise<PostageBatch[]> {
return stamps.getAllPostageBatches(this.getRequestOptionsForCall(requestOptions)) // TODO: remove in June 2025

Check warning on line 2659 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: remove in June 2025'
}

/**
Expand All @@ -2523,7 +2667,7 @@
* @deprecated Use `getGlobalPostageBatches` instead
*/
async getAllGlobalPostageBatch(requestOptions?: BeeRequestOptions): Promise<GlobalPostageBatch[]> {
return stamps.getGlobalPostageBatches(this.getRequestOptionsForCall(requestOptions)) // TODO: remove in June 2025

Check warning on line 2670 in src/bee.ts

View workflow job for this annotation

GitHub Actions / check (16.x)

Unexpected 'todo' comment: 'TODO: remove in June 2025'
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { Stamper } from './stamper/stamper'
export { MerkleTree } from 'cafe-utility'
export type { Chunk } from './chunk/cac'
export type { SingleOwnerChunk } from './chunk/soc'
export { GsocEphemeralMode, createPubsubMode } from './modules/pubsub'
export type { PubsubModeParams, PubsubModeInstance } from './modules/pubsub'
export { MantarayNode } from './manifest/manifest'
export { SUPPORTED_BEE_VERSION, SUPPORTED_BEE_VERSION_EXACT } from './modules/debug/status'
export * from './types'
Expand Down
184 changes: 184 additions & 0 deletions src/modules/pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import { Binary, Types } from 'cafe-utility'
import WebSocket from 'isomorphic-ws'
import { makeContentAddressedChunk } from '../chunk/cac'
import { makeSOCAddress } from '../chunk/soc'
import type { BeeRequestOptions } from '../types'
import { GsocEphemeralParams, PubsubMode, PubsubTopicListResponse } from '../types'
import { Bytes } from '../utils/bytes'
import { http } from '../utils/http'
import { EthAddress, Identifier, PrivateKey, Signature, Span } from '../utils/typed-bytes'
import { NULL_IDENTIFIER } from '../utils/constants'

const endpoint = 'pubsub'
const ENCODER = new TextEncoder()

const SIG_SIZE = Signature.LENGTH
const SPAN_WS_SIZE = Span.LENGTH

export interface IPubsubMode {
readonly topicAddress: string
getPublisherHeaders(): Record<string, string> | null
encodeMessage(payload: Uint8Array | string): Promise<Uint8Array>
decodeMessage(frame: Uint8Array): Bytes
}

export class GsocEphemeralMode implements IPubsubMode {
readonly topicAddress: string
private readonly socIdentifier: Identifier
private readonly privateKey: PrivateKey | null
private readonly externalAddressHex: string | null
private readonly signFn: ((data: Uint8Array) => Signature | Promise<Signature>) | null

constructor(params: GsocEphemeralParams) {
if (params.socId !== undefined) {
if (typeof params.socId === 'string') {
this.socIdentifier = new Identifier(Binary.keccak256(ENCODER.encode(params.socId)))
} else {
this.socIdentifier = new Identifier(params.socId)
}
} else {
this.socIdentifier = new Identifier(NULL_IDENTIFIER)
}

if ('address' in params && params.address && params.signFn) {
// External signer path — can publish
this.privateKey = null
const signerAddress = new EthAddress(params.address)
this.externalAddressHex = Binary.uint8ArrayToHex(signerAddress.toUint8Array())
this.signFn = params.signFn
this.topicAddress = makeSOCAddress(this.socIdentifier, signerAddress).toHex()
} else if ('topic' in params && params.topic !== undefined) {
// Ephemeral signer path — can publish
if (typeof params.topic === 'string') {
this.privateKey = new PrivateKey(Binary.keccak256(ENCODER.encode(params.topic)))
} else {
this.privateKey = new PrivateKey(params.topic)
}
const signerAddress = this.privateKey.publicKey().address()
this.externalAddressHex = null
this.signFn = null
this.topicAddress = makeSOCAddress(this.socIdentifier, signerAddress).toHex()
} else {
// Subscriber-only path — read-only, topicAddress provided directly
this.privateKey = null
this.externalAddressHex = null
this.signFn = null
this.topicAddress = params.topicAddress
}
}

getPublisherHeaders(): Record<string, string> | null {
if (!this.privateKey && !this.externalAddressHex) {
return null
}

const signerHex = this.privateKey
? Binary.uint8ArrayToHex(this.privateKey.publicKey().address().toUint8Array())
: this.externalAddressHex!

return {
'swarm-pubsub-gsoc-eth-address': signerHex,
'swarm-pubsub-gsoc-topic': this.socIdentifier.toHex(),
}
}

async encodeMessage(payload: Uint8Array | string): Promise<Uint8Array> {
const rawPayload = typeof payload === 'string' ? ENCODER.encode(payload) : payload
const cac = makeContentAddressedChunk(rawPayload)

let sigBytes: Uint8Array

if (this.privateKey) {
const soc = cac.toSingleOwnerChunk(this.socIdentifier, this.privateKey)
sigBytes = soc.signature.toUint8Array()
} else if (this.signFn) {
const sigData = Binary.concatBytes(this.socIdentifier.toUint8Array(), cac.address.toUint8Array())
const sig = await this.signFn(sigData)
sigBytes = sig instanceof Signature ? sig.toUint8Array() : new Signature(sig).toUint8Array()
} else {
throw new Error('Cannot encode messages in subscriber-only mode (no signer available)')
}

return Binary.concatBytes(sigBytes, cac.span.toUint8Array(), cac.payload.toUint8Array())
}

decodeMessage(frame: Uint8Array): Bytes {
return new Bytes(frame.slice(SIG_SIZE + SPAN_WS_SIZE))
}
}

export type PubsubModeParams = {
[PubsubMode.GSOC_EPHEMERAL]: ConstructorParameters<typeof GsocEphemeralMode>
}

export type PubsubModeInstance = {
[PubsubMode.GSOC_EPHEMERAL]: GsocEphemeralMode & IPubsubMode
}

export function createPubsubMode<M extends PubsubMode>(
mode: M,
...params: PubsubModeParams[M]
): PubsubModeInstance[M] & IPubsubMode {
switch (mode) {
case PubsubMode.GSOC_EPHEMERAL: {
const [modeParams] = params as PubsubModeParams[PubsubMode.GSOC_EPHEMERAL]

return new GsocEphemeralMode(modeParams) as unknown as PubsubModeInstance[M] & IPubsubMode
}
default:
throw new Error(`Unknown pubsub mode: ${mode}`)
}
}

export function connect(
url: string,
topicAddress: string,
brokerPeer: string,
modeHeaders?: Record<string, string>,
requestHeaders?: Record<string, string>,
): WebSocket {
const wsUrl = url.replace(/^http/i, 'ws')
const headers: Record<string, string> = {
...requestHeaders,
'swarm-pubsub-peer': brokerPeer,
...modeHeaders,
}

// Browsers cannot set custom headers on WebSocket connections.
// Pass them as query params instead; the server accepts both.
const isBrowser = typeof window !== 'undefined' && typeof window.WebSocket !== 'undefined'

if (isBrowser) {
const params = new URLSearchParams(headers)

return new WebSocket(`${wsUrl}/${endpoint}/${topicAddress}?${params.toString()}`)
}

return new WebSocket(`${wsUrl}/${endpoint}/${topicAddress}`, { headers })
}

export async function listTopics(requestOptions: BeeRequestOptions): Promise<PubsubTopicListResponse> {
const response = await http<unknown>(requestOptions, {
method: 'get',
url: `${endpoint}/`,
responseType: 'json',
})

const body = Types.asObject(response.data, { name: 'response.data' })
const topicsRaw = Types.asArray(body.topics, { name: 'topics' })

return {
topics: topicsRaw.map((item, i) => {
const t = Types.asObject(item, { name: `topics[${i}]` })

return {
topicAddress: Types.asString(t.topicAddress, { name: 'topicAddress' }),
mode: Types.asInteger(t.mode, { name: 'mode' }),
role: Types.asString(t.role, { name: 'role' }) as 'broker' | 'subscriber',
connections: Types.asArray(t.connections, { name: 'connections' }).map((c, j) =>
Types.asString(c, { name: `connections[${j}]` }),
),
}
}),
}
}
Loading
Loading