diff --git a/packages/synapse-core/src/sp/upload-streaming.ts b/packages/synapse-core/src/sp/upload-streaming.ts index 859d3191..d681d5e0 100644 --- a/packages/synapse-core/src/sp/upload-streaming.ts +++ b/packages/synapse-core/src/sp/upload-streaming.ts @@ -89,7 +89,7 @@ export async function uploadPieceStreaming( ? new Blob([options.data as Uint8Array]).stream() : (options.data as ReadableStream) // ReadableStream types dont match between browsers and Node.js - let size = isUint8Array(options.data) ? options.data.length : options.size + const size = isUint8Array(options.data) ? options.data.length : options.size // Add size tracking and progress reporting let bytesUploaded = 0 @@ -140,10 +140,15 @@ export async function uploadPieceStreaming( // both execute regardless of path. let fetchBody: ReadableStream | Blob let fetchOptions: Record = {} + // PUT /pdp/piece/uploads/{uuid} + const headers: Record = { + 'Content-Type': 'application/octet-stream', + } if (supportsStreamingFetchBody()) { fetchBody = bodyStream fetchOptions = { duplex: 'half' } + // Length on a ReadableStream body is conveyed via chunked encoding. } else { const chunks: Uint8Array[] = [] let totalSize = 0 @@ -155,17 +160,7 @@ export async function uploadPieceStreaming( totalSize += value.length } fetchBody = new Blob(chunks as BlobPart[]) - // Override Content-Length with the actual accumulated size since we now - // know it precisely, even for ReadableStream inputs without a pre-set size - if (size == null) { - size = totalSize - } - } - - // PUT /pdp/piece/uploads/{uuid} - const headers: Record = { - 'Content-Type': 'application/octet-stream', - ...(size == null ? {} : { 'Content-Length': size.toString() }), + headers['Content-Length'] = (size ?? totalSize).toString() } const uploadResponse = await request.put(new URL(`pdp/piece/uploads/${uploadUuid}`, options.serviceURL), { diff --git a/packages/synapse-core/src/utils/streams.ts b/packages/synapse-core/src/utils/streams.ts index e7900903..b61336c8 100644 --- a/packages/synapse-core/src/utils/streams.ts +++ b/packages/synapse-core/src/utils/streams.ts @@ -142,7 +142,8 @@ export function supportsStreamingFetchBody(): boolean { if (_supportsStreamBody !== undefined) return _supportsStreamBody try { let duplexAccessed = false - const hasContentType = new Request('', { + // Absolute URL required: Node throws on `''` (no base URL). + const hasContentType = new Request('http://x', { body: new ReadableStream(), method: 'POST', get duplex() { diff --git a/packages/synapse-core/test/sp.test.ts b/packages/synapse-core/test/sp.test.ts index d30ed028..a080ea1d 100644 --- a/packages/synapse-core/test/sp.test.ts +++ b/packages/synapse-core/test/sp.test.ts @@ -1145,6 +1145,33 @@ InvalidSignature(address expected, address actual) assert.strictEqual(progressCalls[progressCalls.length - 1], testData.length) }) + it('should not set Content-Length on a streaming ReadableStream body', async () => { + // Length on a streaming body is conveyed via chunked encoding. + const pieceCid = Piece.parse(mockPieceCidStr) + const testData = new Uint8Array(SIZE_CONSTANTS.MIN_UPLOAD_SIZE).fill(0x42) + let capturedHeaders: Headers | undefined + + server.use( + postPieceUploadsHandler(mockUuid), + http.put(`https://pdp.example.com/pdp/piece/uploads/${mockUuid}`, async ({ request }) => { + capturedHeaders = request.headers + // Drain the body so the upstream pipeline runs to completion. + await request.arrayBuffer() + return HttpResponse.text('No Content', { status: 204 }) + }), + finalizePieceUploadHandler(mockUuid, mockPieceCidStr) + ) + + await uploadPieceStreaming({ + serviceURL: 'https://pdp.example.com', + data: testData, + pieceCid, + }) + + assert.isDefined(capturedHeaders) + assert.strictEqual(capturedHeaders?.get('content-length'), null) + }) + it('should fail when session creation returns error', async () => { const pieceCid = Piece.parse(mockPieceCidStr) const testData = new Uint8Array(SIZE_CONSTANTS.MIN_UPLOAD_SIZE).fill(0x42)