From a542295226e9bfd1438d9d938c62b07d66a9aed6 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 16:48:12 +0200 Subject: [PATCH 01/19] timeout --- src/components/P2P/handleProtocolCommands.ts | 25 +++++++++++++------- src/components/P2P/index.ts | 2 +- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 91b5abefb..2801b8fd8 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -42,13 +42,20 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Check if stream is already closed - if (stream.status === 'closed' || stream.status === 'closing') { + if ( + stream.status === 'closed' || + stream.status === 'closing' || + stream.status === 'aborted' + ) { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - - // Resume stream in case it's paused - we need to write + if ( + stream.writeStatus !== 'writable' && + stream.writeStatus !== 'closing' + ) { + return + } stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) @@ -56,7 +63,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } catch (e) { P2P_LOGGER.error(`Error sending error response: ${e.message}`) try { - stream.abort(e as Error) + if (stream.status === 'open' || stream.status === 'closing') { + stream.abort(e as Error) + } } catch {} } } @@ -141,11 +150,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - // Handle backpressure - if send returns false, wait for drain + // Handle backpressure - if send returns false, wait for drain (no timeout for large streams) if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain - }) + await stream.onDrain() } } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 1336a74ca..735f608dc 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -756,7 +756,7 @@ export class OceanP2P extends EventEmitter { let stream: Stream try { const options = { - signal: AbortSignal.timeout(10000), + signal: AbortSignal.timeout(120000), priority: 100, runOnLimitedConnection: true } From 4b2e2f4cca4b477bb37b874db81fd76079adb4eb Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 17:26:10 +0200 Subject: [PATCH 02/19] restart only when paused --- src/components/P2P/handleProtocolCommands.ts | 24 +++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 2801b8fd8..52b3a1b72 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -45,23 +45,31 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect if ( stream.status === 'closed' || stream.status === 'closing' || - stream.status === 'aborted' + stream.status === 'aborted' || + stream.status === 'reset' ) { - P2P_LOGGER.warn('Stream already closed, cannot send error response') + P2P_LOGGER.warn('Stream already closed/reset, cannot send error response') return } - if ( - stream.writeStatus !== 'writable' && - stream.writeStatus !== 'closing' - ) { + if (stream.writeStatus !== 'writable' && stream.writeStatus !== 'closing') { return } - stream.resume() + // Only resume if stream is paused and still readable; resume() throws if stream is closing/closed + if (stream.readStatus === 'paused') { + try { + stream.resume() + } catch (e) { + P2P_LOGGER.warn( + 'Cannot resume stream (already closing/closed): ' + (e as Error).message + ) + return + } + } const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + P2P_LOGGER.error(`Error sending error response: ${(e as Error).message}`) try { if (stream.status === 'open' || stream.status === 'closing') { stream.abort(e as Error) From 0579ca681a4c9a9205e8984d2c37cf2b9d717e12 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 17:59:31 +0200 Subject: [PATCH 03/19] Revert "restart only when paused" This reverts commit 4b2e2f4cca4b477bb37b874db81fd76079adb4eb. --- src/components/P2P/handleProtocolCommands.ts | 39 ++++++-------------- src/components/P2P/index.ts | 2 +- 2 files changed, 13 insertions(+), 28 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 52b3a1b72..91b5abefb 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -42,38 +42,21 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - if ( - stream.status === 'closed' || - stream.status === 'closing' || - stream.status === 'aborted' || - stream.status === 'reset' - ) { - P2P_LOGGER.warn('Stream already closed/reset, cannot send error response') + // Check if stream is already closed + if (stream.status === 'closed' || stream.status === 'closing') { + P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - if (stream.writeStatus !== 'writable' && stream.writeStatus !== 'closing') { - return - } - // Only resume if stream is paused and still readable; resume() throws if stream is closing/closed - if (stream.readStatus === 'paused') { - try { - stream.resume() - } catch (e) { - P2P_LOGGER.warn( - 'Cannot resume stream (already closing/closed): ' + (e as Error).message - ) - return - } - } + + // Resume stream in case it's paused - we need to write + stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${(e as Error).message}`) + P2P_LOGGER.error(`Error sending error response: ${e.message}`) try { - if (stream.status === 'open' || stream.status === 'closing') { - stream.abort(e as Error) - } + stream.abort(e as Error) } catch {} } } @@ -158,9 +141,11 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - // Handle backpressure - if send returns false, wait for drain (no timeout for large streams) + // Handle backpressure - if send returns false, wait for drain if (!stream.send(bytes)) { - await stream.onDrain() + await stream.onDrain({ + signal: AbortSignal.timeout(30000) // 30 second timeout for drain + }) } } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 735f608dc..1336a74ca 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -756,7 +756,7 @@ export class OceanP2P extends EventEmitter { let stream: Stream try { const options = { - signal: AbortSignal.timeout(120000), + signal: AbortSignal.timeout(10000), priority: 100, runOnLimitedConnection: true } From 89ad5e561a128c6d7bd2ae8727e1ac54a3b47778 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 18:00:39 +0200 Subject: [PATCH 04/19] increase timeouts and frames --- src/components/P2P/handleProtocolCommands.ts | 2 +- src/components/P2P/index.ts | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 91b5abefb..ac1dc2fe8 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -144,7 +144,7 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Handle backpressure - if send returns false, wait for drain if (!stream.send(bytes)) { await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain + signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain }) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 1336a74ca..4ffd2ef8a 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -425,7 +425,14 @@ export class OceanP2P extends EventEmitter { datastore: store, privateKey: this.keyManager.getLibp2pPrivateKey(), transports, - streamMuxers: [yamux()], + streamMuxers: [ + yamux({ + maxMessageSize: 5 * 1024 * 1024, + streamOptions: { + maxStreamWindowSize: 5 * 1024 * 1024 + } + }) + ], connectionEncrypters: [ noise(), tls() From 47857a0413481a3f0c1c0f47c7a91e0eb451f848 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Thu, 26 Feb 2026 18:29:50 +0200 Subject: [PATCH 05/19] do not extra pause stream --- src/components/P2P/handleProtocolCommands.ts | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index ac1dc2fe8..e9abd2ca6 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -34,9 +34,6 @@ export class ReadableString extends Readable { export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection - // Pause the stream. We do async operations here before writing. - stream.pause() - P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) @@ -48,8 +45,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume stream in case it's paused - we need to write - stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() @@ -90,9 +85,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. - stream.resume() - // v3 streams are AsyncIterable let task: Command try { From 6a7b9fbdac70f4749c8ea6b95452a9edd54c7861 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:09:47 +0200 Subject: [PATCH 06/19] safe err --- src/components/P2P/handleProtocolCommands.ts | 46 ++++++++++++++++---- 1 file changed, 38 insertions(+), 8 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index e9abd2ca6..5b6ba63c5 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -14,6 +14,24 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' +/** Safe string for logging/sending; never yields "undefined". */ +function safeErrorMessage(err: unknown): string { + if (err == null) return 'Unknown error' + if (typeof (err as Error).message === 'string' && (err as Error).message !== '') { + return (err as Error).message + } + return String(err) +} + +/** True if the error indicates the stream is already closed/reset (no point sending). */ +function isStreamGoneError(err: unknown): boolean { + const msg = safeErrorMessage(err).toLowerCase() + return ( + msg.includes('stream') && + (msg.includes('reset') || msg.includes('closed') || msg.includes('aborted')) + ) +} + export class ReadableString extends Readable { private sent = false @@ -39,9 +57,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Check if stream is already closed - if (stream.status === 'closed' || stream.status === 'closing') { - P2P_LOGGER.warn('Stream already closed, cannot send error response') + // Skip if stream is already closed, closing, aborted, or reset + if (['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { + P2P_LOGGER.warn('Stream already closed or reset, cannot send error response') return } @@ -49,7 +67,13 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + const msg = safeErrorMessage(e) + // Expected when peer closed/reset the stream; avoid noisy error log + if (msg.toLowerCase().includes('closed') || msg.toLowerCase().includes('reset')) { + P2P_LOGGER.warn(`Could not send error response (stream gone): ${msg}`) + } else { + P2P_LOGGER.error(`Error sending error response: ${msg}`) + } try { stream.abort(e as Error) } catch {} @@ -98,11 +122,14 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } } } catch (err) { + const msg = safeErrorMessage(err) P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` + `Unable to process P2P command: ${msg}` ) - await sendErrorAndClose(400, 'Invalid command') + if (!isStreamGoneError(err)) { + await sendErrorAndClose(400, 'Invalid command') + } return } @@ -144,12 +171,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { + const msg = safeErrorMessage(err) P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + err.message, + 'handleProtocolCommands Error: ' + msg, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, err.message) + if (!isStreamGoneError(err)) { + await sendErrorAndClose(500, msg) + } } } From d6ab8be8246fef636180e6102eb35c2c3adf9d6a Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:21:00 +0200 Subject: [PATCH 07/19] read data first --- src/components/P2P/handleProtocolCommands.ts | 57 +++++++++++--------- 1 file changed, 33 insertions(+), 24 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 5b6ba63c5..82fad167e 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -55,6 +55,39 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) + // Read command from stream immediately so we don't leave data unread (avoids + // read-buffer overflow reset) and so the client sees progress before any slow work. + let task: Command | null | undefined + try { + for await (const chunk of stream) { + try { + const str = uint8ArrayToString(chunk.subarray()) + task = JSON.parse(str) as Command + } catch (e) { + task = null + break + } + break + } + } catch (err) { + const msg = safeErrorMessage(err) + P2P_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Unable to process P2P command: ${msg}` + ) + if (!isStreamGoneError(err)) { + // sendErrorAndClose not yet defined; stream may be gone anyway + try { + if (!['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { + const status = { httpStatus: 400, error: msg } + stream.send(uint8ArrayFromString(JSON.stringify(status))) + await stream.close() + } + } catch {} + } + return + } + const sendErrorAndClose = async (httpStatus: number, error: string) => { try { // Skip if stream is already closed, closing, aborted, or reset @@ -109,30 +142,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // v3 streams are AsyncIterable - let task: Command - try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } - } catch (err) { - const msg = safeErrorMessage(err) - P2P_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${msg}` - ) - if (!isStreamGoneError(err)) { - await sendErrorAndClose(400, 'Invalid command') - } - return - } - if (!task) { P2P_LOGGER.error('Invalid or missing task/command data!') await sendErrorAndClose(400, 'Invalid command') From 2bc3d77302dc0359751fad810d31a4c332c42dd2 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:40:40 +0200 Subject: [PATCH 08/19] log err --- src/components/P2P/handleProtocolCommands.ts | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 82fad167e..15f9a0cf6 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -14,16 +14,27 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' -/** Safe string for logging/sending; never yields "undefined". */ +function unwrapError(err: unknown): unknown { + if ( + err != null && + typeof err === 'object' && + 'error' in err && + (err as { error: unknown }).error instanceof Error + ) { + return (err as { error: Error }).error + } + return err +} + function safeErrorMessage(err: unknown): string { - if (err == null) return 'Unknown error' - if (typeof (err as Error).message === 'string' && (err as Error).message !== '') { - return (err as Error).message + const e = unwrapError(err) + if (e == null) return 'Unknown error' + if (e instanceof Error && typeof e.message === 'string' && e.message !== '') { + return e.message } - return String(err) + return String(e) } -/** True if the error indicates the stream is already closed/reset (no point sending). */ function isStreamGoneError(err: unknown): boolean { const msg = safeErrorMessage(err).toLowerCase() return ( @@ -71,10 +82,7 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } } catch (err) { const msg = safeErrorMessage(err) - P2P_LOGGER.log( - LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${msg}` - ) + P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Unable to process P2P command: ${msg}`) if (!isStreamGoneError(err)) { // sendErrorAndClose not yet defined; stream may be gone anyway try { From 6316567f40797bfffff31b601a8a0e081ff08c3f Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 09:54:15 +0200 Subject: [PATCH 09/19] respect backpressure --- src/components/P2P/handleProtocolCommands.ts | 18 +++++++++++------- src/components/P2P/index.ts | 1 + 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 15f9a0cf6..5bff90fc6 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -172,16 +172,20 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Send status first stream.send(uint8ArrayFromString(JSON.stringify(response.status))) - // Stream data chunks without buffering, with backpressure support + const SEND_CHUNK_SIZE = 64 * 1024 if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - - // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain - }) + for (let offset = 0; offset < bytes.length; offset += SEND_CHUNK_SIZE) { + const slice = bytes.subarray( + offset, + Math.min(offset + SEND_CHUNK_SIZE, bytes.length) + ) + if (!stream.send(slice)) { + await stream.onDrain({ + signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain + }) + } } } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 4ffd2ef8a..5f43f9f18 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -429,6 +429,7 @@ export class OceanP2P extends EventEmitter { yamux({ maxMessageSize: 5 * 1024 * 1024, streamOptions: { + initialStreamWindowSize: 5 * 1024 * 1024, maxStreamWindowSize: 5 * 1024 * 1024 } }) From 95bcd8c68d1dbe4a3c4deb1c335317e1d8e29714 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 11:46:08 +0200 Subject: [PATCH 10/19] flush on drain --- src/components/P2P/handleProtocolCommands.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 5bff90fc6..ae3702fc4 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -188,6 +188,8 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } } } + // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) + await stream.onDrain({ signal: AbortSignal.timeout(30000) }) } await stream.close() From 77742b7881f9876e2bbf98aca94e532f71bd2e5d Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 15:10:11 +0200 Subject: [PATCH 11/19] Revert "respect backpressure" This reverts commit 6316567f40797bfffff31b601a8a0e081ff08c3f. --- src/components/P2P/handleProtocolCommands.ts | 127 ++++++------------- src/components/P2P/index.ts | 10 +- 2 files changed, 43 insertions(+), 94 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index ae3702fc4..6fe9ba5de 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -14,35 +14,6 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' -function unwrapError(err: unknown): unknown { - if ( - err != null && - typeof err === 'object' && - 'error' in err && - (err as { error: unknown }).error instanceof Error - ) { - return (err as { error: Error }).error - } - return err -} - -function safeErrorMessage(err: unknown): string { - const e = unwrapError(err) - if (e == null) return 'Unknown error' - if (e instanceof Error && typeof e.message === 'string' && e.message !== '') { - return e.message - } - return String(e) -} - -function isStreamGoneError(err: unknown): boolean { - const msg = safeErrorMessage(err).toLowerCase() - return ( - msg.includes('stream') && - (msg.includes('reset') || msg.includes('closed') || msg.includes('aborted')) - ) -} - export class ReadableString extends Readable { private sent = false @@ -63,58 +34,27 @@ export class ReadableString extends Readable { export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection + // Pause the stream. We do async operations here before writing. + stream.pause() + P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) - // Read command from stream immediately so we don't leave data unread (avoids - // read-buffer overflow reset) and so the client sees progress before any slow work. - let task: Command | null | undefined - try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - task = null - break - } - break - } - } catch (err) { - const msg = safeErrorMessage(err) - P2P_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `Unable to process P2P command: ${msg}`) - if (!isStreamGoneError(err)) { - // sendErrorAndClose not yet defined; stream may be gone anyway - try { - if (!['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { - const status = { httpStatus: 400, error: msg } - stream.send(uint8ArrayFromString(JSON.stringify(status))) - await stream.close() - } - } catch {} - } - return - } - const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Skip if stream is already closed, closing, aborted, or reset - if (['closed', 'closing', 'aborted', 'reset'].includes(stream.status)) { - P2P_LOGGER.warn('Stream already closed or reset, cannot send error response') + // Check if stream is already closed + if (stream.status === 'closed' || stream.status === 'closing') { + P2P_LOGGER.warn('Stream already closed, cannot send error response') return } + // Resume stream in case it's paused - we need to write + stream.resume() const status = { httpStatus, error } stream.send(uint8ArrayFromString(JSON.stringify(status))) await stream.close() } catch (e) { - const msg = safeErrorMessage(e) - // Expected when peer closed/reset the stream; avoid noisy error log - if (msg.toLowerCase().includes('closed') || msg.toLowerCase().includes('reset')) { - P2P_LOGGER.warn(`Could not send error response (stream gone): ${msg}`) - } else { - P2P_LOGGER.error(`Error sending error response: ${msg}`) - } + P2P_LOGGER.error(`Error sending error response: ${e.message}`) try { stream.abort(e as Error) } catch {} @@ -150,6 +90,30 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } + // Resume the stream. We can now write. + stream.resume() + + // v3 streams are AsyncIterable + let task: Command + try { + for await (const chunk of stream) { + try { + const str = uint8ArrayToString(chunk.subarray()) + task = JSON.parse(str) as Command + } catch (e) { + await sendErrorAndClose(400, 'Invalid command') + return + } + } + } catch (err) { + P2P_LOGGER.log( + LOG_LEVELS_STR.LEVEL_ERROR, + `Unable to process P2P command: ${err.message}` + ) + await sendErrorAndClose(400, 'Invalid command') + return + } + if (!task) { P2P_LOGGER.error('Invalid or missing task/command data!') await sendErrorAndClose(400, 'Invalid command') @@ -172,20 +136,16 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect // Send status first stream.send(uint8ArrayFromString(JSON.stringify(response.status))) - const SEND_CHUNK_SIZE = 64 * 1024 + // Stream data chunks without buffering, with backpressure support if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - for (let offset = 0; offset < bytes.length; offset += SEND_CHUNK_SIZE) { - const slice = bytes.subarray( - offset, - Math.min(offset + SEND_CHUNK_SIZE, bytes.length) - ) - if (!stream.send(slice)) { - await stream.onDrain({ - signal: AbortSignal.timeout(5 * 60 * 1000) // 5 minutes timeout for drain - }) - } + + // Handle backpressure - if send returns false, wait for drain + if (!stream.send(bytes)) { + await stream.onDrain({ + signal: AbortSignal.timeout(30000) // 30 second timeout for drain + }) } } // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) @@ -194,15 +154,12 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { - const msg = safeErrorMessage(err) P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + msg, + 'handleProtocolCommands Error: ' + err.message, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - if (!isStreamGoneError(err)) { - await sendErrorAndClose(500, msg) - } + await sendErrorAndClose(500, err.message) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 5f43f9f18..1336a74ca 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -425,15 +425,7 @@ export class OceanP2P extends EventEmitter { datastore: store, privateKey: this.keyManager.getLibp2pPrivateKey(), transports, - streamMuxers: [ - yamux({ - maxMessageSize: 5 * 1024 * 1024, - streamOptions: { - initialStreamWindowSize: 5 * 1024 * 1024, - maxStreamWindowSize: 5 * 1024 * 1024 - } - }) - ], + streamMuxers: [yamux()], connectionEncrypters: [ noise(), tls() From 3559264605ebcfc343325e094de9e0477e005ae0 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 15:22:02 +0200 Subject: [PATCH 12/19] try bytestream --- package-lock.json | 1 + package.json | 1 + src/components/P2P/handleProtocolCommands.ts | 38 ++++++++++---------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/package-lock.json b/package-lock.json index 717aa591e..a1c26e461 100644 --- a/package-lock.json +++ b/package-lock.json @@ -31,6 +31,7 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", + "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/package.json b/package.json index 8dd057858..963a46fc1 100644 --- a/package.json +++ b/package.json @@ -69,6 +69,7 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", + "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 6fe9ba5de..d4cc35142 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -1,6 +1,7 @@ import { Readable } from 'stream' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { byteStream } from '@libp2p/utils' import { P2P_LOGGER } from '../../utils/logging/common.js' import { Command } from '../../@types/commands.js' @@ -14,6 +15,9 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' +const P2P_READ_TIMEOUT_MS = 30_000 +const P2P_DRAIN_TIMEOUT_MS = 30_000 + export class ReadableString extends Readable { private sent = false @@ -90,25 +94,21 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. + // Resume the stream. We can now read/write. stream.resume() - // v3 streams are AsyncIterable + const bytes = byteStream(stream) let task: Command try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } + const data = await bytes.read({ + signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) + }) + const str = uint8ArrayToString(data.subarray()) + task = JSON.parse(str) as Command } catch (err) { P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` + `Unable to process P2P command: ${err?.message ?? err}` ) await sendErrorAndClose(400, 'Invalid command') return @@ -133,23 +133,25 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first - stream.send(uint8ArrayFromString(JSON.stringify(response.status))) + // Send status first (byteStream imperative write) + await bytes.write(uint8ArrayFromString(JSON.stringify(response.status)), { + signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) + }) // Stream data chunks without buffering, with backpressure support if (response.stream) { for await (const chunk of response.stream as Readable) { - const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) + const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { + if (!stream.send(buf)) { await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain + signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) }) } } // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) - await stream.onDrain({ signal: AbortSignal.timeout(30000) }) + await stream.onDrain({ signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) }) } await stream.close() From c4c1a3ba0f5288580ff316a166b9c15454bd7849 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 16:00:24 +0200 Subject: [PATCH 13/19] Revert "try bytestream" This reverts commit 3559264605ebcfc343325e094de9e0477e005ae0. --- package-lock.json | 1 - package.json | 1 - src/components/P2P/handleProtocolCommands.ts | 38 ++++++++++---------- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/package-lock.json b/package-lock.json index a1c26e461..717aa591e 100644 --- a/package-lock.json +++ b/package-lock.json @@ -31,7 +31,6 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", - "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/package.json b/package.json index 963a46fc1..8dd057858 100644 --- a/package.json +++ b/package.json @@ -69,7 +69,6 @@ "@libp2p/tcp": "^11.0.9", "@libp2p/tls": "^3.0.10", "@libp2p/upnp-nat": "^4.0.9", - "@libp2p/utils": "^7.0.9", "@libp2p/websockets": "^10.1.2", "@multiformats/multiaddr": "^12.2.3", "@oceanprotocol/contracts": "^2.6.0", diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index d4cc35142..6fe9ba5de 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -1,7 +1,6 @@ import { Readable } from 'stream' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { byteStream } from '@libp2p/utils' import { P2P_LOGGER } from '../../utils/logging/common.js' import { Command } from '../../@types/commands.js' @@ -15,9 +14,6 @@ import { } from '../../utils/validators.js' import type { Connection, Stream } from '@libp2p/interface' -const P2P_READ_TIMEOUT_MS = 30_000 -const P2P_DRAIN_TIMEOUT_MS = 30_000 - export class ReadableString extends Readable { private sent = false @@ -94,21 +90,25 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now read/write. + // Resume the stream. We can now write. stream.resume() - const bytes = byteStream(stream) + // v3 streams are AsyncIterable let task: Command try { - const data = await bytes.read({ - signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) - }) - const str = uint8ArrayToString(data.subarray()) - task = JSON.parse(str) as Command + for await (const chunk of stream) { + try { + const str = uint8ArrayToString(chunk.subarray()) + task = JSON.parse(str) as Command + } catch (e) { + await sendErrorAndClose(400, 'Invalid command') + return + } + } } catch (err) { P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err?.message ?? err}` + `Unable to process P2P command: ${err.message}` ) await sendErrorAndClose(400, 'Invalid command') return @@ -133,25 +133,23 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first (byteStream imperative write) - await bytes.write(uint8ArrayFromString(JSON.stringify(response.status)), { - signal: AbortSignal.timeout(P2P_READ_TIMEOUT_MS) - }) + // Send status first + stream.send(uint8ArrayFromString(JSON.stringify(response.status))) // Stream data chunks without buffering, with backpressure support if (response.stream) { for await (const chunk of response.stream as Readable) { - const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) + const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) // Handle backpressure - if send returns false, wait for drain - if (!stream.send(buf)) { + if (!stream.send(bytes)) { await stream.onDrain({ - signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) + signal: AbortSignal.timeout(30000) // 30 second timeout for drain }) } } // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) - await stream.onDrain({ signal: AbortSignal.timeout(P2P_DRAIN_TIMEOUT_MS) }) + await stream.onDrain({ signal: AbortSignal.timeout(30000) }) } await stream.close() From 295efd84d8a37f5acf7ebc2a60bbf76f9e52cbcf Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 16:51:33 +0200 Subject: [PATCH 14/19] use lps --- src/components/P2P/handleProtocolCommands.ts | 50 ++++++++------------ src/components/P2P/index.ts | 39 ++++++++++----- 2 files changed, 45 insertions(+), 44 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 6fe9ba5de..c29e25ae3 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -12,6 +12,7 @@ import { checkGlobalConnectionsRateLimit, checkRequestsRateLimit } from '../../utils/validators.js' +import { lpStream } from '@libp2p/utils' import type { Connection, Stream } from '@libp2p/interface' export class ReadableString extends Readable { @@ -40,18 +41,21 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) + // Resume and use length-prefixed messages (libp2p v3 byteStream migration) + stream.resume() + const lp = lpStream(stream) + const readWriteSignal = () => AbortSignal.timeout(30_000) + const sendErrorAndClose = async (httpStatus: number, error: string) => { try { - // Check if stream is already closed if (stream.status === 'closed' || stream.status === 'closing') { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - - // Resume stream in case it's paused - we need to write - stream.resume() const status = { httpStatus, error } - stream.send(uint8ArrayFromString(JSON.stringify(status))) + await lp.write(uint8ArrayFromString(JSON.stringify(status)), { + signal: readWriteSignal() + }) await stream.close() } catch (e) { P2P_LOGGER.error(`Error sending error response: ${e.message}`) @@ -90,25 +94,15 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect return } - // Resume the stream. We can now write. - stream.resume() - - // v3 streams are AsyncIterable let task: Command try { - for await (const chunk of stream) { - try { - const str = uint8ArrayToString(chunk.subarray()) - task = JSON.parse(str) as Command - } catch (e) { - await sendErrorAndClose(400, 'Invalid command') - return - } - } + const cmdBytes = await lp.read({ signal: readWriteSignal() }) + const str = uint8ArrayToString(cmdBytes.subarray()) + task = JSON.parse(str) as Command } catch (err) { P2P_LOGGER.log( LOG_LEVELS_STR.LEVEL_ERROR, - `Unable to process P2P command: ${err.message}` + `Unable to process P2P command: ${err?.message ?? err}` ) await sendErrorAndClose(400, 'Invalid command') return @@ -133,23 +127,17 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect task.caller = remotePeer.toString() const response: P2PCommandResponse = await handler.handle(task) - // Send status first - stream.send(uint8ArrayFromString(JSON.stringify(response.status))) + // Send status first (length-prefixed) + await lp.write(uint8ArrayFromString(JSON.stringify(response.status)), { + signal: readWriteSignal() + }) - // Stream data chunks without buffering, with backpressure support + // Stream data chunks as length-prefixed messages if (response.stream) { for await (const chunk of response.stream as Readable) { const bytes = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) - - // Handle backpressure - if send returns false, wait for drain - if (!stream.send(bytes)) { - await stream.onDrain({ - signal: AbortSignal.timeout(30000) // 30 second timeout for drain - }) - } + await lp.write(bytes, { signal: readWriteSignal() }) } - // Ensure last chunk is flushed before closing (avoid remote close before client reads tail) - await stream.onDrain({ signal: AbortSignal.timeout(30000) }) } await stream.close() diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index 1336a74ca..c1ce3050f 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -5,6 +5,7 @@ import { handleProtocolCommands } from './handlers.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import { lpStream } from '@libp2p/utils' import type { Stream } from '@libp2p/interface' import { bootstrap } from '@libp2p/bootstrap' @@ -778,22 +779,34 @@ export class OceanP2P extends EventEmitter { } try { - // Send message and close write side - stream.send(uint8ArrayFromString(message)) - await stream.close() + const lp = lpStream(stream) + const writeSignal = AbortSignal.timeout(10_000) + const readSignal = AbortSignal.timeout(10_000) - // Read and parse status from first chunk - const iterator = stream[Symbol.asyncIterator]() - const { done, value } = await iterator.next() + await lp.write(uint8ArrayFromString(message), { signal: writeSignal }) - if (done || !value) { - return { status: { httpStatus: 500, error: 'No response from peer' } } - } - - const status = JSON.parse(uint8ArrayToString(value.subarray())) + const statusBytes = await lp.read({ signal: readSignal }) + const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - // Return status and remaining stream - return { status, stream: { [Symbol.asyncIterator]: () => iterator } } + // Return status and remaining stream (length-prefixed messages) + const streamTimeout = 30_000 + return { + status, + stream: { + [Symbol.asyncIterator]: async function* () { + try { + while (true) { + const chunk = await lp.read({ + signal: AbortSignal.timeout(streamTimeout) + }) + yield chunk.subarray ? chunk.subarray() : chunk + } + } catch { + // stream ended or closed + } + } + } + } } catch (err) { P2P_LOGGER.error(`P2P communication error: ${err.message}`) try { From fc1d09e6a56dc272b6d4b41f73170bf62c1b9682 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 17:50:50 +0200 Subject: [PATCH 15/19] err msg log --- src/components/P2P/handleProtocolCommands.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index c29e25ae3..e0e5b3a05 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -58,7 +58,8 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect }) await stream.close() } catch (e) { - P2P_LOGGER.error(`Error sending error response: ${e.message}`) + const msg = e instanceof Error ? e.message : e != null ? String(e) : 'Unknown error' + P2P_LOGGER.error(`Error sending error response: ${msg}`) try { stream.abort(e as Error) } catch {} @@ -142,12 +143,14 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { + const errMessage = + err instanceof Error ? err.message : err != null ? String(err) : 'Unknown error' P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + err.message, + 'handleProtocolCommands Error: ' + errMessage, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, err.message) + await sendErrorAndClose(500, errMessage) } } From 10aa46cfa8baefd0e4ff8df7cb3c9d896b4acabf Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Fri, 27 Feb 2026 17:57:11 +0200 Subject: [PATCH 16/19] serialize error --- src/components/P2P/handleProtocolCommands.ts | 51 ++++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index e0e5b3a05..3a96380f5 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -32,6 +32,35 @@ export class ReadableString extends Readable { } } +/** Serialize any thrown value for debugging (Error, Event, plain object). */ +function serializeErrorForDebug(err: unknown): Record { + try { + if (err instanceof Error) { + return { name: err.name, message: err.message, stack: err.stack } + } + if (err != null && typeof err === 'object') { + const o = err as Record + const out: Record = {} + for (const key of Object.keys(o)) { + try { + const v = o[key] + if (v === null || typeof v !== 'object' || typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { + out[key] = v + } else { + out[key] = String(v) + } + } catch { + out[key] = '[unserializable]' + } + } + return out + } + return { value: err } + } catch { + return { raw: String(err) } + } +} + export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection @@ -46,13 +75,17 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect const lp = lpStream(stream) const readWriteSignal = () => AbortSignal.timeout(30_000) - const sendErrorAndClose = async (httpStatus: number, error: string) => { + const sendErrorAndClose = async ( + httpStatus: number, + error: string, + errorDebug?: Record + ) => { try { if (stream.status === 'closed' || stream.status === 'closing') { P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - const status = { httpStatus, error } + const status = errorDebug ? { httpStatus, error, errorDebug } : { httpStatus, error } await lp.write(uint8ArrayFromString(JSON.stringify(status)), { signal: readWriteSignal() }) @@ -143,14 +176,22 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { - const errMessage = - err instanceof Error ? err.message : err != null ? String(err) : 'Unknown error' + const errMessage = (() => { + if (err instanceof Error) return err.message + if (err != null && typeof err === 'object' && 'type' in err) { + const e = err as { type?: string; message?: string } + return e.message ?? `Event: ${e.type ?? 'unknown'}` + } + return err != null ? String(err) : 'Unknown error' + })() + const errorDebug = serializeErrorForDebug(err) P2P_LOGGER.logMessageWithEmoji( 'handleProtocolCommands Error: ' + errMessage, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, errMessage) + P2P_LOGGER.error('handleProtocolCommands error object (debug): ' + JSON.stringify(errorDebug)) + await sendErrorAndClose(500, errMessage, errorDebug) } } From 1dc500c2e0ffdac827e25cf0c4006de19bacb59b Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Sat, 28 Feb 2026 10:31:17 +0200 Subject: [PATCH 17/19] cleanup --- src/components/P2P/handleProtocolCommands.ts | 38 +++----------------- src/components/P2P/index.ts | 13 +++---- 2 files changed, 9 insertions(+), 42 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 3a96380f5..0bc2cc25a 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -32,35 +32,6 @@ export class ReadableString extends Readable { } } -/** Serialize any thrown value for debugging (Error, Event, plain object). */ -function serializeErrorForDebug(err: unknown): Record { - try { - if (err instanceof Error) { - return { name: err.name, message: err.message, stack: err.stack } - } - if (err != null && typeof err === 'object') { - const o = err as Record - const out: Record = {} - for (const key of Object.keys(o)) { - try { - const v = o[key] - if (v === null || typeof v !== 'object' || typeof v === 'string' || typeof v === 'number' || typeof v === 'boolean') { - out[key] = v - } else { - out[key] = String(v) - } - } catch { - out[key] = '[unserializable]' - } - } - return out - } - return { value: err } - } catch { - return { raw: String(err) } - } -} - export async function handleProtocolCommands(stream: Stream, connection: Connection) { const { remotePeer, remoteAddr } = connection @@ -70,7 +41,6 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.logMessage('Incoming connection from peer ' + remotePeer, true) P2P_LOGGER.logMessage('Using ' + remoteAddr, true) - // Resume and use length-prefixed messages (libp2p v3 byteStream migration) stream.resume() const lp = lpStream(stream) const readWriteSignal = () => AbortSignal.timeout(30_000) @@ -85,7 +55,9 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect P2P_LOGGER.warn('Stream already closed, cannot send error response') return } - const status = errorDebug ? { httpStatus, error, errorDebug } : { httpStatus, error } + const status = errorDebug + ? { httpStatus, error, errorDebug } + : { httpStatus, error } await lp.write(uint8ArrayFromString(JSON.stringify(status)), { signal: readWriteSignal() }) @@ -184,14 +156,12 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect } return err != null ? String(err) : 'Unknown error' })() - const errorDebug = serializeErrorForDebug(err) P2P_LOGGER.logMessageWithEmoji( 'handleProtocolCommands Error: ' + errMessage, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - P2P_LOGGER.error('handleProtocolCommands error object (debug): ' + JSON.stringify(errorDebug)) - await sendErrorAndClose(500, errMessage, errorDebug) + await sendErrorAndClose(500, errMessage) } } diff --git a/src/components/P2P/index.ts b/src/components/P2P/index.ts index c1ce3050f..378dde3fe 100644 --- a/src/components/P2P/index.ts +++ b/src/components/P2P/index.ts @@ -438,6 +438,9 @@ export class OceanP2P extends EventEmitter { dialTimeout: config.p2pConfig.connectionsDialTimeout, maxConnections: config.p2pConfig.maxConnections, maxPeerAddrsToDial: config.p2pConfig.maxPeerAddrsToDial + }, + connectionMonitor: { + abortConnectionOnPingFailure: false } } if (config.p2pConfig.bootstrapNodes && config.p2pConfig.bootstrapNodes.length > 0) { @@ -788,22 +791,16 @@ export class OceanP2P extends EventEmitter { const statusBytes = await lp.read({ signal: readSignal }) const status = JSON.parse(uint8ArrayToString(statusBytes.subarray())) - // Return status and remaining stream (length-prefixed messages) - const streamTimeout = 30_000 return { status, stream: { [Symbol.asyncIterator]: async function* () { try { while (true) { - const chunk = await lp.read({ - signal: AbortSignal.timeout(streamTimeout) - }) + const chunk = await lp.read() yield chunk.subarray ? chunk.subarray() : chunk } - } catch { - // stream ended or closed - } + } catch {} } } } From 883a284ca8fa922151f828e97778ec9c5e54e54a Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Sat, 28 Feb 2026 10:32:29 +0200 Subject: [PATCH 18/19] cleanup --- src/components/P2P/handleProtocolCommands.ts | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/src/components/P2P/handleProtocolCommands.ts b/src/components/P2P/handleProtocolCommands.ts index 0bc2cc25a..bbf942f98 100644 --- a/src/components/P2P/handleProtocolCommands.ts +++ b/src/components/P2P/handleProtocolCommands.ts @@ -148,20 +148,12 @@ export async function handleProtocolCommands(stream: Stream, connection: Connect await stream.close() } catch (err) { - const errMessage = (() => { - if (err instanceof Error) return err.message - if (err != null && typeof err === 'object' && 'type' in err) { - const e = err as { type?: string; message?: string } - return e.message ?? `Event: ${e.type ?? 'unknown'}` - } - return err != null ? String(err) : 'Unknown error' - })() P2P_LOGGER.logMessageWithEmoji( - 'handleProtocolCommands Error: ' + errMessage, + 'handleProtocolCommands Error: ' + err.message, true, GENERIC_EMOJIS.EMOJI_CROSS_MARK, LOG_LEVELS_STR.LEVEL_ERROR ) - await sendErrorAndClose(500, errMessage) + await sendErrorAndClose(500, err.message) } } From d69881b4041c2c0647b0e44f58e25ded84ab7ce5 Mon Sep 17 00:00:00 2001 From: giurgiur99 Date: Sat, 28 Feb 2026 14:44:15 +0200 Subject: [PATCH 19/19] system tests target --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e9f2e47b7..6f1fa84bd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -317,7 +317,7 @@ jobs: with: repository: 'oceanprotocol/ocean.js' path: 'ocean.js' - ref: feature/refactor_signatures + ref: main - name: Build ocean-js working-directory: ${{ github.workspace }}/ocean.js run: |