diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9f2e47b7..6f1fa84bd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -317,7 +317,7 @@ jobs: with: repository: 'oceanprotocol/ocean.js' path: 'ocean.js' - ref: feature/refactor_signatures + ref: main - name: Build ocean-js working-directory: ${{ github.workspace }}/ocean.js run: | diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 91b5abefb..bbf942f98 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -12,6 +12,7 @@ import { checkGlobalConnectionsRateLimit, checkRequestsRateLimit } from '../../utils/validators.js' +import { lpStream } from '@libp2p/utils' import type { Connection, Stream } from '@libp2p/interface' export class ReadableString extends Readable { @@ -40,21 +41,30 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) - const sendErrorAndClose = async (httpStatus: number, error: string) => { + stream.resume() + const lp = lpStream(stream) + const readWriteSignal = () => AbortSignal.timeout(30_000) + + const sendErrorAndClose = async ( + httpStatus: number, + error: string, + errorDebug?: Record + ) => { try { - // Check if stream is already closed if (stream.status === 'closed' || stream.status === 'closing') { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - - // Resume stream in case it's paused - we need to write - stream.resume() - const status = { httpStatus, error } - stream.send(uint8ArrayFromString(JSON.stringify(status))) + const status = errorDebug + ? { httpStatus, error, errorDebug } + : { httpStatus, error } + await lp.write(uint8ArrayFromString(JSON.stringify(status)), { + signal: readWriteSignal() + }) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + const msg = e instanceof Error ? e.message : e != null ? String(e) : 'Unknown error' + P2P_LOGGER.error(`Error sending error response: ${msg}`) try { stream.abort(e as Error) } catch {} @@ -90,25 +100,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. - stream.resume() - - // v3 streams are AsyncIterable let task: Command try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } + const cmdBytes = await lp.read({ signal: readWriteSignal() }) + const str = uint8ArrayToString(cmdBytes.subarray()) + task = JSON.parse(str) as Command } catch (err) { P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` + `Unable to process P2P command: ${err?.message ?? err}` ) await sendErrorAndClose(400, 'Invalid command') return @@ -133,20 +133,16 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first - stream.send(uint8ArrayFromString(JSON.stringify(response.status))) + // Send status first (length-prefixed) + await lp.write(uint8ArrayFromString(JSON.stringify(response.status)), { + signal: readWriteSignal() + }) - // Stream data chunks without buffering, with backpressure support + // Stream data chunks as length-prefixed messages if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - - // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain - }) - } + await lp.write(bytes, { signal: readWriteSignal() }) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 1336a74ca..378dde3fe 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -5,6 +5,7 @@ import { handleProtocolCommands } from './handlers.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { lpStream } from '@libp2p/utils' import type { Stream } from '@libp2p/interface' import { bootstrap } from '@libp2p/bootstrap' @@ -437,6 +438,9 @@ export class OceanP2P extends EventEmitter { dialTimeout: config.p2pConfig.connectionsDialTimeout, maxConnections: config.p2pConfig.maxConnections, maxPeerAddrsToDial: config.p2pConfig.maxPeerAddrsToDial + }, + connectionMonitor: { + abortConnectionOnPingFailure: false } } if (config.p2pConfig.bootstrapNodes && config.p2pConfig.bootstrapNodes.length > 0) { @@ -778,22 +782,28 @@ export class OceanP2P extends EventEmitter { } try { - // Send message and close write side - stream.send(uint8ArrayFromString(message)) - await stream.close() + const lp = lpStream(stream) + const writeSignal = AbortSignal.timeout(10_000) + const readSignal = AbortSignal.timeout(10_000) - // Read and parse status from first chunk - const iterator = stream[Symbol.asyncIterator]() - const { done, value } = await iterator.next() + await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) - if (done || !value) { - return { status: { httpStatus: 500, error: 'No response from peer' } } - } - - const status = JSON.parse(uint8ArrayToString(value.subarray())) + const statusBytes = await lp.read({ signal: readSignal }) + const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - // Return status and remaining stream - return { status, stream: { [Symbol.asyncIterator]: () => iterator } } + return { + status, + stream: { + [Symbol.asyncIterator]: async function* () { + try { + while (true) { + const chunk = await lp.read() + yield chunk.subarray ? chunk.subarray() : chunk + } + } catch {} + } + } + } } catch (err) { P2P_LOGGER.error(`P2P communication error: ${err.message}`) try {