From c0b56c8a54a80c7742db13b305eb341fbdaad1d9 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 6 May 2026 04:39:42 +0800 Subject: [PATCH 1/4] Enhance ObjectEndpoint to support chunked transfer encoding in PUT requests Signed-off-by: peterxcli --- .../ozone/s3/endpoint/ObjectEndpoint.java | 10 ++++++++ .../ozone/s3/endpoint/TestObjectPut.java | 24 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index b74a7d52186d..130c59d92026 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -46,6 +46,7 @@ import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes; import com.google.common.collect.ImmutableMap; +import com.google.common.io.FileBackedOutputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -198,12 +199,18 @@ Response handlePutRequest(ObjectRequestContext context, String keyPath, InputStr final long startNanos = context.getStartNanos(); String copyHeader = null; + FileBackedOutputStream spooledBody = null; MultiDigestInputStream multiDigestInputStream = null; try { OzoneVolume volume = context.getVolume(); OzoneBucket bucket = context.getBucket(); final String lengthHeader = getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH); long length = lengthHeader != null ? 
Long.parseLong(lengthHeader) : 0; + if (lengthHeader == null && body != null) { + spooledBody = new FileBackedOutputStream(getIOBufferSize(1)); + length = IOUtils.copyLarge(body, spooledBody); + body = spooledBody.asByteSource().openStream(); + } if (uploadID != null && !uploadID.equals("")) { if (getHeaders().getHeaderString(COPY_SOURCE_HEADER) == null) { @@ -343,6 +350,9 @@ customMetadata, tags, multiDigestInputStream, getHeaders(), if (multiDigestInputStream != null) { multiDigestInputStream.resetDigests(); } + if (spooledBody != null) { + spooledBody.reset(); + } } } diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 15398577b58b..653c8f06ebe8 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -54,6 +54,7 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableMap; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -63,10 +64,12 @@ import java.util.Base64; import java.util.Map; import java.util.stream.Stream; +import javax.ws.rs.HttpMethod; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MultivaluedHashMap; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.xml.bind.DatatypeConverter; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -246,6 +249,21 @@ void testPutObjectWithSignedChunks() throws Exception { assertEquals(15, keyDetails.getDataSize()); } + @Test + void testPutObjectWithChunkedTransferEncoding() throws Exception { + when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT); + 
when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).thenReturn(null); + when(headers.getHeaderString("Transfer-Encoding")).thenReturn("chunked"); + when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)).thenReturn(sha256Hex(CONTENT)); + + try (InputStream body = new ByteArrayInputStream(CONTENT.getBytes(StandardCharsets.UTF_8))) { + assertSucceeds(() -> objectEndpoint.put(BUCKET_NAME, KEY_NAME, body)); + } + + OzoneKeyDetails keyDetails = assertKeyContent(bucket, KEY_NAME, CONTENT); + assertEquals(CONTENT.length(), keyDetails.getDataSize()); + } + @Test public void testPutObjectMessageDigestResetDuringException() { MessageDigest messageDigest = mock(MessageDigest.class); @@ -672,4 +690,10 @@ private Response putObject(String bucketName, String keyName) throws IOException private Response putObject(String content) throws IOException, OS3Exception { return put(objectEndpoint, BUCKET_NAME, KEY_NAME, content); } + + private static String sha256Hex(String content) throws NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance(OzoneConsts.FILE_HASH); + byte[] contentDigest = digest.digest(content.getBytes(StandardCharsets.UTF_8)); + return DatatypeConverter.printHexBinary(contentDigest).toLowerCase(); + } } From befd9264ca46f6cb4b81b95567012fdca94b4207 Mon Sep 17 00:00:00 2001 From: Peter Lee Date: Wed, 6 May 2026 15:45:31 +0800 Subject: [PATCH 2/4] suggestion Co-authored-by: Doroszlai, Attila <6454655+adoroszlai@users.noreply.github.com> --- .../org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 130c59d92026..8753a6f6ac18 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -207,8 +207,8 @@ Response handlePutRequest(ObjectRequestContext context, String keyPath, InputStr final String lengthHeader = getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH); long length = lengthHeader != null ? Long.parseLong(lengthHeader) : 0; if (lengthHeader == null && body != null) { - spooledBody = new FileBackedOutputStream(getIOBufferSize(1)); - length = IOUtils.copyLarge(body, spooledBody); + spooledBody = new FileBackedOutputStream(32); + length = IOUtils.copyLarge(body, spooledBody, new byte[getIOBufferSize(0)]); body = spooledBody.asByteSource().openStream(); } From 2be2da2d6780f6a2940a786a3198db0b663e0995 Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 6 May 2026 17:43:47 +0800 Subject: [PATCH 3/4] add regression test for dir case Signed-off-by: peterxcli --- .../hadoop/ozone/s3/endpoint/TestObjectPut.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index 653c8f06ebe8..f9cd361ceb49 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -511,6 +511,21 @@ void testDirectoryCreation() throws Exception { assertThat(key.isFile()).as("directory").isFalse(); } + @Test + void testDirectoryCreationWithChunkedTransferEncoding() throws Exception { + final String path = "dir/"; + when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT); + when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).thenReturn(null); + when(headers.getHeaderString("Transfer-Encoding")).thenReturn("chunked"); + + try (InputStream body = new ByteArrayInputStream(new byte[0])) { + assertSucceeds(() -> 
objectEndpoint.put(FSO_BUCKET_NAME, path, body)); + } + + OzoneKeyDetails key = fsoBucket.getKey(path); + assertThat(key.isFile()).as("directory").isFalse(); + } + @Test void testDirectoryCreationOverFile() throws Exception { // GIVEN From 4f7c3527a921772473c20f7e93a27a27883e66dc Mon Sep 17 00:00:00 2001 From: peterxcli Date: Wed, 6 May 2026 19:57:03 +0800 Subject: [PATCH 4/4] stop auto length calculation for multi chunk payload put Signed-off-by: peterxcli --- .../ozone/s3/endpoint/ObjectEndpoint.java | 16 ++++++++--- .../ozone/s3/endpoint/TestObjectPut.java | 28 +++++++++++++++++++ 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 8753a6f6ac18..68719f6d8aaf 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -41,6 +41,7 @@ import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_COUNT_HEADER; import static org.apache.hadoop.ozone.s3.util.S3Consts.TAG_DIRECTIVE_HEADER; +import static org.apache.hadoop.ozone.s3.util.S3Utils.hasMultiChunksPayload; import static org.apache.hadoop.ozone.s3.util.S3Utils.stripQuotes; import static org.apache.hadoop.ozone.s3.util.S3Utils.validateSignatureHeader; import static org.apache.hadoop.ozone.s3.util.S3Utils.wrapInQuotes; @@ -205,11 +206,16 @@ Response handlePutRequest(ObjectRequestContext context, String keyPath, InputStr OzoneVolume volume = context.getVolume(); OzoneBucket bucket = context.getBucket(); final String lengthHeader = getHeaders().getHeaderString(HttpHeaders.CONTENT_LENGTH); + final String rawAmzContentSha256Header = getHeaders().getHeaderString(S3Consts.X_AMZ_CONTENT_SHA256); + 
final boolean hasMultiChunksUpload = + rawAmzContentSha256Header != null && hasMultiChunksPayload(rawAmzContentSha256Header); + boolean hasCalculatedLength = false; long length = lengthHeader != null ? Long.parseLong(lengthHeader) : 0; - if (lengthHeader == null && body != null) { - spooledBody = new FileBackedOutputStream(32); - length = IOUtils.copyLarge(body, spooledBody, new byte[getIOBufferSize(0)]); + if (lengthHeader == null && body != null && !hasMultiChunksUpload) { + spooledBody = new FileBackedOutputStream(32); + length = IOUtils.copyLarge(body, spooledBody, new byte[getIOBufferSize(0)]); body = spooledBody.asByteSource().openStream(); + hasCalculatedLength = true; } if (uploadID != null && !uploadID.equals("")) { @@ -252,8 +258,10 @@ Response handlePutRequest(ObjectRequestContext context, String keyPath, InputStr getHeaders().getHeaderString(S3Consts.DECODED_CONTENT_LENGTH_HEADER); boolean hasAmzDecodedLengthZero = amzDecodedLength != null && Long.parseLong(amzDecodedLength) == 0; + boolean hasKnownZeroLength = hasAmzDecodedLengthZero || + (length == 0 && (lengthHeader != null || hasCalculatedLength || body == null)); if (canCreateDirectory && - (length == 0 || hasAmzDecodedLengthZero) && + hasKnownZeroLength && StringUtils.endsWith(keyPath, "/") ) { context.setAction(S3GAction.CREATE_DIRECTORY); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java index f9cd361ceb49..a5ff089a7f72 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java @@ -44,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static 
org.mockito.Mockito.CALLS_REAL_METHODS; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; import static org.mockito.Mockito.mock; @@ -249,6 +250,33 @@ void testPutObjectWithSignedChunks() throws Exception { assertEquals(15, keyDetails.getDataSize()); } + @Test + void testPutObjectWithSignedChunksWithoutContentLengthDoesNotCalculateTransferLength() throws Exception { + String chunkedContent = "0a;chunk-signature=signature\r\n" + + "1234567890\r\n" + + "05;chunk-signature=signature\r\n" + + "abcde\r\n"; + String keyName = "streaming-object/"; + when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT); + when(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)).thenReturn(null); + when(headers.getHeaderString(X_AMZ_CONTENT_SHA256)) + .thenReturn(STREAMING_AWS4_HMAC_SHA256_PAYLOAD); + when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER)) + .thenReturn("15"); + + try (MockedStatic<IOUtils> mocked = mockStatic(IOUtils.class, CALLS_REAL_METHODS); + InputStream body = new ByteArrayInputStream(chunkedContent.getBytes(StandardCharsets.UTF_8))) { + mocked.when(() -> IOUtils.copyLarge(any(InputStream.class), any(OutputStream.class), any(byte[].class))) + .thenThrow(new AssertionError("streaming payload should not be buffered for length calculation")); + + assertSucceeds(() -> objectEndpoint.put(FSO_BUCKET_NAME, keyName, body)); + } + + OzoneKeyDetails keyDetails = assertKeyContent(fsoBucket, keyName, "1234567890abcde"); + assertThat(keyDetails.isFile()).as("object").isTrue(); + assertEquals(15, keyDetails.getDataSize()); + } + @Test void testPutObjectWithChunkedTransferEncoding() throws Exception { when(objectEndpoint.getContext().getMethod()).thenReturn(HttpMethod.PUT);