diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java index 7d3f8629f0eb..9a015fc1b4fe 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java @@ -57,6 +57,9 @@ public enum OzoneManagerVersion implements ComponentVersion { ATOMIC_CREATE_IF_NOT_EXISTS(12, "OzoneManager version that supports explicit create-if-not-exists key semantics"), + + S3_BYTE_RANGE_GET(13, + "OzoneManager version that supports S3 byte range GET key location filtering"), FUTURE_VERSION(-1, "Used internally in the client when the server side is " + " newer and an unknown server version has arrived to the client."); diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java index c8611043fe44..dfecea0af600 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java @@ -178,6 +178,21 @@ OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName, int partNumber) throws IOException; + /** + * Get OzoneKey in S3 context with content positioned at the requested + * byte range start. + * @param bucketName Name of the Bucket + * @param keyName Key name + * @param partNumber Multipart-upload part number, or zero for the whole key + * @param startOffset inclusive byte range start offset + * @param endOffset inclusive byte range end offset + * @return {@link OzoneKey} + * @throws IOException + */ + OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName, + int partNumber, long startOffset, long endOffset) + throws IOException; + OzoneVolume buildOzoneVolume(OmVolumeArgs volume); /** diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java index 6fe64a263765..65b4a259be5e 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java @@ -1880,6 +1880,36 @@ public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName, return getOzoneKeyDetails(keyInfo); } + @Override + public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName, + int partNumber, long startOffset, long endOffset) throws IOException { + OmKeyInfo keyInfo; + if (omVersion.compareTo(OzoneManagerVersion.S3_BYTE_RANGE_GET) >= 0) { + keyInfo = getS3RangedKeyInfo(bucketName, keyName, partNumber, + startOffset, endOffset); + } else if (partNumber != 0) { + if (omVersion.compareTo(OzoneManagerVersion.S3_PART_AWARE_GET) >= 0) { + keyInfo = getS3PartKeyInfo(bucketName, keyName, partNumber); + } else { + keyInfo = getS3KeyInfo(bucketName, keyName, false); + List filteredKeyLocationInfo = keyInfo + .getLatestVersionLocations().getBlocksLatestVersionOnly().stream() + .filter(omKeyLocationInfo -> omKeyLocationInfo.getPartNumber() == + partNumber) + .collect(Collectors.toList()); + keyInfo.updateLocationInfoList(filteredKeyLocationInfo, true, true); + keyInfo.setDataSize(filteredKeyLocationInfo.stream() + .mapToLong(OmKeyLocationInfo::getLength) + .sum()); + } + keyInfo.setByteRangeStartOffset(startOffset); + } else { + keyInfo = getS3KeyInfo(bucketName, keyName, false); + keyInfo.setByteRangeStartOffset(startOffset); + } + return getOzoneKeyDetails(keyInfo); + } + @Nonnull private OmKeyInfo getS3KeyInfo( String bucketName, String keyName, boolean isHeadOp) throws IOException { @@ -1926,6 +1956,32 @@ private OmKeyInfo getS3PartKeyInfo( return keyInfoWithS3Context.getKeyInfo(); } + @Nonnull + private OmKeyInfo getS3RangedKeyInfo( + String bucketName, String keyName, int partNumber, long startOffset, + long endOffset) throws IOException { + verifyBucketName(bucketName); + Objects.requireNonNull(keyName, "keyName == null"); + + OmKeyArgs.Builder keyArgs = new OmKeyArgs.Builder() + // Volume name is not important, as we call GetKeyInfo with + // assumeS3Context = true, OM will infer the correct s3 volume. + .setVolumeName(OzoneConfigKeys.OZONE_S3_VOLUME_NAME_DEFAULT) + .setBucketName(bucketName) + .setKeyName(keyName) + .setSortDatanodesInPipeline(topologyAwareReadEnabled) + .setLatestVersionLocation(getLatestVersionLocation) + .setForceUpdateContainerCacheFromSCM(false) + .setByteRange(startOffset, endOffset); + if (partNumber != 0) { + keyArgs.setMultipartUploadPartNumber(partNumber); + } + KeyInfoWithVolumeContext keyInfoWithS3Context = + ozoneManagerClient.getKeyInfo(keyArgs.build(), true); + keyInfoWithS3Context.getUserPrincipal().ifPresent(this::updateS3Principal); + return keyInfoWithS3Context.getKeyInfo(); + } + @Override public OmKeyInfo getKeyInfo( String volumeName, String bucketName, String keyName, @@ -2332,7 +2388,7 @@ public OzoneOutputStream createFile(String volumeName, String bucketName, */ private OzoneInputStream getInputStreamWithRetryFunction( OmKeyInfo keyInfo) throws IOException { - return createInputStream(keyInfo, omKeyInfo -> { + OzoneInputStream inputStream = createInputStream(keyInfo, omKeyInfo -> { try { return getKeyInfo(omKeyInfo.getVolumeName(), omKeyInfo.getBucketName(), omKeyInfo.getKeyName(), true); @@ -2341,6 +2397,10 @@ private OzoneInputStream getInputStreamWithRetryFunction( return null; } }); + if (keyInfo.getByteRangeStartOffset() > 0) { + inputStream.seek(keyInfo.getByteRangeStartOffset()); + } + return inputStream; } @Override diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java index 5d2de09c48e5..ddb8a1ebdde3 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java @@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.OzoneAcl; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.audit.Auditable; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ByteRange; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs; import org.apache.hadoop.ozone.security.GDPRSymmetricKey; @@ -62,6 +63,8 @@ public final class OmKeyArgs extends WithMetadata implements Auditable { // been modified. private Long expectedDataGeneration = null; private final String expectedETag; + private final Long byteRangeStart; + private final Long byteRangeEnd; private OmKeyArgs(Builder b) { super(b); @@ -84,6 +87,8 @@ private OmKeyArgs(Builder b) { this.tags = b.tags.build(); this.expectedDataGeneration = b.expectedDataGeneration; this.expectedETag = b.expectedETag; + this.byteRangeStart = b.byteRangeStart; + this.byteRangeEnd = b.byteRangeEnd; } public boolean getIsMultipartKey() { @@ -170,6 +175,18 @@ public String getExpectedETag() { return expectedETag; } + public boolean hasByteRange() { + return byteRangeStart != null && byteRangeEnd != null; + } + + public long getByteRangeStart() { + return byteRangeStart; + } + + public long getByteRangeEnd() { + return byteRangeEnd; + } + @Override public Map toAuditMap() { Map auditMap = new LinkedHashMap<>(); @@ -218,6 +235,11 @@ public KeyArgs toProtobuf() { if (expectedETag != null) { builder.setExpectedETag(expectedETag); } + if (hasByteRange()) { + builder.setByteRange(ByteRange.newBuilder() + .setStart(byteRangeStart) + .setEnd(byteRangeEnd)); + } return builder.build(); } @@ -244,6 +266,8 @@ public static class Builder extends WithMetadata.Builder { private final MapBuilder tags; private Long expectedDataGeneration = null; private String expectedETag; + private Long byteRangeStart; + private Long byteRangeEnd; public Builder() { this(AclListBuilder.empty()); @@ -290,6 +314,8 @@ public Builder(OmKeyArgs obj) { obj.forceUpdateContainerCacheFromSCM; this.expectedDataGeneration = obj.expectedDataGeneration; this.expectedETag = obj.expectedETag; + this.byteRangeStart = obj.byteRangeStart; + this.byteRangeEnd = obj.byteRangeEnd; this.tags = MapBuilder.of(obj.tags); this.acls = AclListBuilder.of(obj.acls); } @@ -365,6 +391,12 @@ public Builder setMultipartUploadPartNumber(int multipartUploadPartNumber) { return this; } + public Builder setByteRange(long start, long end) { + this.byteRangeStart = start; + this.byteRangeEnd = end; + return this; + } + @Override public Builder addMetadata(String key, String value) { super.addMetadata(key, value); diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java index da6c46f9b6c0..ab1b7f33a96f 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java @@ -110,6 +110,7 @@ public final class OmKeyInfo extends WithParentObjectId // This allows a key to be created an committed atomically if the original has not // been modified. private Long expectedDataGeneration = null; + private long byteRangeStartOffset; private OmKeyInfo(Builder b) { super(b); @@ -129,6 +130,7 @@ private OmKeyInfo(Builder b) { this.ownerName = b.ownerName; this.tags = b.tags.build(); this.expectedDataGeneration = b.expectedDataGeneration; + this.byteRangeStartOffset = b.byteRangeStartOffset; } private static Codec newCodec(boolean ignorePipeline) { @@ -189,6 +191,14 @@ public Long getExpectedDataGeneration() { return expectedDataGeneration; } + public long getByteRangeStartOffset() { + return byteRangeStartOffset; + } + + public void setByteRangeStartOffset(long offset) { + this.byteRangeStartOffset = offset; + } + public String getOwnerName() { return ownerName; } @@ -492,6 +502,7 @@ public static class Builder extends WithParentObjectId.Builder { private boolean isFile; private final MapBuilder tags; private Long expectedDataGeneration = null; + private long byteRangeStartOffset; public Builder() { this.acls = AclListBuilder.empty(); @@ -514,6 +525,7 @@ public Builder(OmKeyInfo obj) { this.fileChecksum = obj.fileChecksum; this.isFile = obj.isFile; this.expectedDataGeneration = obj.expectedDataGeneration; + this.byteRangeStartOffset = obj.byteRangeStartOffset; this.tags = MapBuilder.of(obj.tags); obj.keyLocationVersions.forEach(keyLocationVersion -> this.omKeyLocationInfoGroups.add( @@ -685,6 +697,11 @@ public Builder setExpectedDataGeneration(Long existingGeneration) { return this; } + public Builder setByteRangeStartOffset(long offset) { + this.byteRangeStartOffset = offset; + return this; + } + @Override protected void validate() { super.validate(); @@ -804,6 +821,9 @@ private KeyInfo getProtobuf(boolean ignorePipeline, String fullKeyName, if (expectedDataGeneration != null) { kb.setExpectedDataGeneration(expectedDataGeneration); } + if (byteRangeStartOffset > 0) { + kb.setByteRangeStartOffset(byteRangeStartOffset); + } if (ownerName != null) { kb.setOwnerName(ownerName); } @@ -857,6 +877,9 @@ public static Builder builderFromProtobuf(KeyInfo keyInfo) { if (keyInfo.hasExpectedDataGeneration()) { builder.setExpectedDataGeneration(keyInfo.getExpectedDataGeneration()); } + if (keyInfo.hasByteRangeStartOffset()) { + builder.setByteRangeStartOffset(keyInfo.getByteRangeStartOffset()); + } if (keyInfo.hasOwnerName()) { builder.setOwnerName(keyInfo.getOwnerName()); diff --git a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot index 18315fb12fb5..320a9530f255 100644 --- a/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot +++ b/hadoop-ozone/dist/src/main/smoketest/s3/MultipartUpload.robot @@ -137,6 +137,13 @@ Test Multipart Upload Complete ${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 2 /tmp/${PREFIX}-multipartKey1-part2.result Compare files /tmp/part2 /tmp/${PREFIX}-multipartKey1-part2.result + ${result} = Execute AWSS3ApiCli get-object --bucket ${BUCKET} --key ${PREFIX}/multipartKey1 --part-number 2 --range bytes=2-4 /tmp/${PREFIX}-multipartKey1-part2-range.result + Should contain ${result} ContentRange + Should contain ${result} bytes 2-4/1047552 + Should contain ${result} AcceptRanges + Execute dd if=/tmp/part2 of=/tmp/${PREFIX}-multipartKey1-part2-range.expected skip=2 bs=1 count=3 2>/dev/null + Compare files /tmp/${PREFIX}-multipartKey1-part2-range.expected /tmp/${PREFIX}-multipartKey1-part2-range.result + Test Multipart Upload with user defined metadata size larger than 2 KB ${custom_metadata_value} = Generate Random String 3000 ${result} = Initiate MPU ${BUCKET} ${PREFIX}/mpuWithLargeMetadata 255 --metadata="custom-key1=${custom_metadata_value}" @@ -456,4 +463,3 @@ Test Multipart Upload Part with wrong Content-MD5 header # Abort the multipart upload (cleanup) Abort MPU ${BUCKET} ${PREFIX}/mpu/md5test/key2 ${uploadID} - diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java index 42c4de1d503d..b8ac83d736fe 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v1/AbstractS3SDKV1Tests.java @@ -35,6 +35,7 @@ import com.amazonaws.AmazonServiceException.ErrorType; import com.amazonaws.HttpMethod; import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.Headers; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.AccessControlList; import com.amazonaws.services.s3.model.Bucket; @@ -1492,6 +1493,41 @@ public void testGetParticularPart(@TempDir Path tempDir) throws Exception { assertEquals(allPartContentLength, (partOneContentLength + partTwoContentLength + partThreeContentLength)); } + @Test + public void testGetParticularPartWithRange(@TempDir Path tempDir) throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + final int partSize = (int) (5 * MB); + final int rangeStart = 2; + final int rangeEnd = 4; + + s3Client.createBucket(bucketName); + + File multipartUploadFile = Files.createFile(tempDir.resolve("multipartupload.txt")).toFile(); + createFile(multipartUploadFile, (int) (15 * MB)); + multipartUpload(bucketName, keyName, multipartUploadFile, partSize, null, null, null); + + GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, keyName) + .withPartNumber(2) + .withRange(rangeStart, rangeEnd); + S3Object s3Object = s3Client.getObject(getObjectRequest); + + ObjectMetadata objectMetadata = s3Object.getObjectMetadata(); + assertEquals(rangeEnd - rangeStart + 1, objectMetadata.getContentLength()); + assertThat(objectMetadata.getContentRange()) + .containsExactly((long) rangeStart, (long) rangeEnd); + assertEquals(partSize, objectMetadata.getInstanceLength()); + assertEquals("bytes " + rangeStart + "-" + rangeEnd + "/" + partSize, + objectMetadata.getRawMetadataValue(Headers.CONTENT_RANGE)); + + byte[] uploadedBytes = Files.readAllBytes(multipartUploadFile.toPath()); + byte[] expectedBytes = Arrays.copyOfRange(uploadedBytes, partSize + rangeStart, + partSize + rangeEnd + 1); + try (S3ObjectInputStream objectContent = s3Object.getObjectContent()) { + assertThat(IOUtils.toByteArray(objectContent)).containsExactly(expectedBytes); + } + } + @Test public void testGetNotExistedPart(@TempDir Path tempDir) throws Exception { final String bucketName = getBucketName(); diff --git a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java index 3204e3fe5ff7..ac3449d1ebbb 100644 --- a/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java +++ b/hadoop-ozone/integration-test-s3/src/test/java/org/apache/hadoop/ozone/s3/awssdk/v2/AbstractS3SDKV2Tests.java @@ -1026,6 +1026,37 @@ public void testLowLevelMultipartUpload(@TempDir Path tempDir) throws Exception assertEquals(userMetadata, headObjectResponse.metadata()); } + @Test + public void testGetParticularPartWithRange(@TempDir Path tempDir) throws Exception { + final String bucketName = getBucketName(); + final String keyName = getKeyName(); + final int partSize = (int) (5 * MB); + final int rangeStart = 2; + final int rangeEnd = 4; + + s3Client.createBucket(b -> b.bucket(bucketName)); + + File multipartUploadFile = Files.createFile(tempDir.resolve("multipartupload.txt")).toFile(); + createFile(multipartUploadFile, (int) (15 * MB)); + multipartUpload(bucketName, keyName, multipartUploadFile, partSize, + Collections.emptyMap(), Collections.emptyList()); + + ResponseBytes rangedPart = s3Client.getObjectAsBytes(b -> b + .bucket(bucketName) + .key(keyName) + .partNumber(2) + .range("bytes=" + rangeStart + "-" + rangeEnd)); + + GetObjectResponse getObjectResponse = rangedPart.response(); + assertEquals(rangeEnd - rangeStart + 1, getObjectResponse.contentLength()); + assertEquals("bytes " + rangeStart + "-" + rangeEnd + "/" + partSize, getObjectResponse.contentRange()); + + byte[] uploadedBytes = Files.readAllBytes(multipartUploadFile.toPath()); + byte[] expectedBytes = Arrays.copyOfRange(uploadedBytes, partSize + rangeStart, + partSize + rangeEnd + 1); + assertThat(rangedPart.asByteArray()).containsExactly(expectedBytes); + } + @Test public void testResumableDownloadWithEtagMismatch() throws Exception { // Arrange diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index dcdb3828ae6d..7c30b7437a9a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -1532,6 +1532,53 @@ void testGetNotExistedPart() throws IOException { assertEquals(0, locationList.size()); } + @Test + void testGetByteRangeFiltersLocations() throws IOException { + String keyName = RandomStringUtils.secure().nextAlphabetic(5); + + initKeyTableForMultipartTest(keyName, VOLUME_NAME); + + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setKeyName(keyName) + .setByteRange(150, 260) + .build(); + OmKeyInfo omKeyInfo = keyManager.getKeyInfo(keyArgs, RESOLVED_BUCKET, "test"); + + List locationList = omKeyInfo + .getLatestVersionLocations().getLocationList(); + assertEquals(2, locationList.size()); + assertEquals(1, locationList.get(0).getPartNumber()); + assertEquals(2, locationList.get(1).getPartNumber()); + assertEquals(50, omKeyInfo.getByteRangeStartOffset()); + assertEquals(111, omKeyInfo.getDataSize()); + } + + @Test + void testGetParticularPartWithByteRangeFiltersLocations() + throws IOException { + String keyName = RandomStringUtils.secure().nextAlphabetic(5); + + initKeyTableForMultipartTest(keyName, VOLUME_NAME); + + OmKeyArgs keyArgs = new OmKeyArgs.Builder() + .setVolumeName(VOLUME_NAME) + .setBucketName(BUCKET_NAME) + .setKeyName(keyName) + .setMultipartUploadPartNumber(3) + .setByteRange(20, 40) + .build(); + OmKeyInfo omKeyInfo = keyManager.getKeyInfo(keyArgs, RESOLVED_BUCKET, "test"); + + List locationList = omKeyInfo + .getLatestVersionLocations().getLocationList(); + assertEquals(1, locationList.size()); + assertEquals(3, locationList.get(0).getPartNumber()); + assertEquals(20, omKeyInfo.getByteRangeStartOffset()); + assertEquals(21, omKeyInfo.getDataSize()); + } + private OmKeyInfo getMockedOmKeyInfo(OmBucketInfo bucketInfo, long parentId, String key, long objectId) { OmKeyInfo omKeyInfo = mock(OmKeyInfo.class); if (bucketInfo.getBucketLayout().isFileSystemOptimized()) { @@ -1668,6 +1715,7 @@ private void initKeyTableForMultipartTest(String keyName, String volume) throws for (int i = 0; i < 5; i++) { OmKeyLocationInfo locationInfo1 = new OmKeyLocationInfo.Builder() .setBlockID(new BlockID(i, i)) + .setLength(100L) .setPartNumber(i) .build(); locationInfoList.add(locationInfo1); @@ -1681,6 +1729,7 @@ private void initKeyTableForMultipartTest(String keyName, String volume) throws .setKeyName(keyName) .setBucketName(BUCKET_NAME) .setVolumeName(volume) + .setDataSize(500L) .setReplicationConfig(RatisReplicationConfig.getInstance(THREE)) .setOmKeyLocationInfos(locationInfoGroups) .build(); diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto index 22c71cc29852..0f96a821e96e 100644 --- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto +++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto @@ -1053,6 +1053,12 @@ message ListBucketsResponse { repeated BucketInfo bucketInfo = 2; } +message ByteRange { + // Inclusive start and end offsets: [start, end]. + optional uint64 start = 1; + optional uint64 end = 2; +} + message KeyArgs { required string volumeName = 1; required string bucketName = 2; @@ -1101,6 +1107,10 @@ message KeyArgs { // the given ETag for the operation to succeed. This is used for // S3 conditional writes with the If-Match header. optional string expectedETag = 24; + + // Byte range requested by S3 GET. OM uses it to return only the + // key locations needed to satisfy the read. + optional ByteRange byteRange = 25; } message KeyLocation { @@ -1192,6 +1202,10 @@ message KeyInfo { // This allows a key to be created an committed atomically if the original has not // been modified. optional uint64 expectedDataGeneration = 22; + + // Offset within the first returned key location where ranged reads + // should start. + optional uint64 byteRangeStartOffset = 23; } // KeyInfoProtoLight is a lightweight subset of KeyInfo message containing diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 08b6d6abbf18..d4cc83d1aa7f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -647,9 +647,49 @@ private OmKeyInfo readKeyInfo(OmKeyArgs args, BucketLayout bucketLayout) .sum()); } } + if (args.hasByteRange()) { + filterKeyLocationsByByteRange(value, args.getByteRangeStart(), + args.getByteRangeEnd()); + } return value; } + private void filterKeyLocationsByByteRange(OmKeyInfo keyInfo, + long byteRangeStart, long byteRangeEnd) { + OmKeyLocationInfoGroup latestLocationVersion = + keyInfo.getLatestVersionLocations(); + if (latestLocationVersion == null) { + return; + } + + List currentLocations = + latestLocationVersion.getBlocksLatestVersionOnly(); + List filteredLocations = new ArrayList<>(); + long blockStart = 0; + long firstBlockStart = 0; + for (OmKeyLocationInfo locationInfo : currentLocations) { + long blockEnd = blockStart + locationInfo.getLength() - 1; + if (blockStart > byteRangeEnd) { + break; + } + if (blockEnd >= byteRangeStart) { + if (filteredLocations.isEmpty()) { + firstBlockStart = blockStart; + } + filteredLocations.add(locationInfo); + } + blockStart += locationInfo.getLength(); + } + + keyInfo.updateLocationInfoList(filteredLocations, + latestLocationVersion.isMultipartKey(), true); + if (!filteredLocations.isEmpty()) { + keyInfo.setByteRangeStartOffset(byteRangeStart - firstBlockStart); + } + keyInfo.setDataSize(filteredLocations.isEmpty() ? 0 : + byteRangeEnd - byteRangeStart + 1); + } + private OmKeyInfo getOmKeyInfo(String volumeName, String bucketName, String keyName, BucketLayout bucketLayout) throws IOException { String keyBytes = diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java index 4454b68e0784..943e5bf61eb3 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java @@ -100,6 +100,7 @@ import org.apache.hadoop.ozone.om.response.OMClientResponse; import org.apache.hadoop.ozone.om.upgrade.DisallowedUntilLayoutVersion; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ByteRange; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelSnapshotDiffRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CancelSnapshotDiffResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CheckVolumeAccessRequest; @@ -637,6 +638,14 @@ private LookupKeyResponse lookupKey(LookupKeyRequest request, .setSortDatanodesInPipeline(keyArgs.getSortDatanodes()) .setHeadOp(keyArgs.getHeadOp()) .build(); + if (keyArgs.hasByteRange()) { + ByteRange byteRange = keyArgs.getByteRange(); + if (byteRange.hasStart() && byteRange.hasEnd()) { + omKeyArgs = omKeyArgs.toBuilder() + .setByteRange(byteRange.getStart(), byteRange.getEnd()) + .build(); + } + } OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs); resp.setKeyInfo(keyInfo.getProtobuf(keyArgs.getHeadOp(), clientVersion)); @@ -658,6 +667,14 @@ private GetKeyInfoResponse getKeyInfo(GetKeyInfoRequest request, keyArgs.getForceUpdateContainerCacheFromSCM()) .setMultipartUploadPartNumber(keyArgs.getMultipartNumber()) .build(); + if (keyArgs.hasByteRange()) { + ByteRange byteRange = keyArgs.getByteRange(); + if (byteRange.hasStart() && byteRange.hasEnd()) { + omKeyArgs = omKeyArgs.toBuilder() + .setByteRange(byteRange.getStart(), byteRange.getEnd()) + .build(); + } + } KeyInfoWithVolumeContext keyInfo = impl.getKeyInfo(omKeyArgs, request.getAssumeS3Context()); diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index b74a7d52186d..1df54872e7a3 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -382,14 +382,26 @@ Response handleGetRequest(ObjectRequestContext context, String keyPath) context.setAction(S3GAction.GET_KEY); - OzoneKeyDetails keyDetails = (partNumber != 0) ? - getClientProtocol().getS3KeyDetails(bucketName, keyPath, partNumber) : - getClientProtocol().getS3KeyDetails(bucketName, keyPath); + String rangeHeaderVal = getHeaders().getHeaderString(RANGE_HEADER); + OzoneKey keyMetadata; + OzoneKeyDetails keyDetails = null; + if (rangeHeaderVal == null) { + keyDetails = (partNumber != 0) ? + getClientProtocol().getS3KeyDetails(bucketName, keyPath, partNumber) : + getClientProtocol().getS3KeyDetails(bucketName, keyPath); + keyMetadata = keyDetails; + } else if (partNumber != 0) { + keyDetails = + getClientProtocol().getS3KeyDetails(bucketName, keyPath, partNumber); + keyMetadata = keyDetails; + } else { + keyMetadata = getClientProtocol().headS3Object(bucketName, keyPath); + } - isFile(keyPath, keyDetails); + isFile(keyPath, keyMetadata); Response conditionalResponse = S3ConditionalRequest - .evaluateReadPreconditions(getHeaders(), keyPath, keyDetails); + .evaluateReadPreconditions(getHeaders(), keyPath, keyMetadata); if (conditionalResponse != null) { long metadataLatencyNs = getMetrics().updateGetKeyMetadataStats( startNanos); @@ -399,11 +411,10 @@ Response handleGetRequest(ObjectRequestContext context, String keyPath) return conditionalResponse; } - long length = keyDetails.getDataSize(); + long length = keyMetadata.getDataSize(); LOG.debug("Data length of the key {} is {}", keyPath, length); - String rangeHeaderVal = getHeaders().getHeaderString(RANGE_HEADER); RangeHeader rangeHeader = null; LOG.debug("range Header provided value: {}", rangeHeaderVal); @@ -419,9 +430,14 @@ Response handleGetRequest(ObjectRequestContext context, String keyPath) ResponseBuilder responseBuilder; if (rangeHeaderVal == null || rangeHeader.isReadFull()) { + if (keyDetails == null) { + keyDetails = getClientProtocol().getS3KeyDetails(bucketName, keyPath); + } + final OzoneKeyDetails responseKeyDetails = keyDetails; StreamingOutput output = dest -> { - try (OzoneInputStream key = keyDetails.getContent()) { - long readLength = IOUtils.copy(key, dest, getIOBufferSize(keyDetails.getDataSize())); + try (OzoneInputStream key = responseKeyDetails.getContent()) { + long readLength = IOUtils.copy(key, dest, + getIOBufferSize(responseKeyDetails.getDataSize())); getMetrics().incGetKeySuccessLength(readLength); perf.appendSizeBytes(readLength); } @@ -430,16 +446,19 @@ Response handleGetRequest(ObjectRequestContext context, String keyPath) }; responseBuilder = Response.ok(output) - .header(HttpHeaders.CONTENT_LENGTH, keyDetails.getDataSize()); + .header(HttpHeaders.CONTENT_LENGTH, responseKeyDetails.getDataSize()); } else { long startOffset = rangeHeader.getStartOffset(); long endOffset = rangeHeader.getEndOffset(); long copyLength = endOffset - startOffset + 1; + OzoneKeyDetails rangedKeyDetails = getClientProtocol() + .getS3KeyDetails(bucketName, keyPath, partNumber, startOffset, + endOffset); StreamingOutput output = dest -> { - try (OzoneInputStream ozoneInputStream = keyDetails.getContent()) { - ozoneInputStream.seek(startOffset); + try (OzoneInputStream ozoneInputStream = + rangedKeyDetails.getContent()) { long readLength = IOUtils.copyLarge(ozoneInputStream, dest, 0, copyLength, new byte[getIOBufferSize(copyLength)]); getMetrics().incGetKeySuccessLength(readLength); @@ -459,7 +478,7 @@ Response handleGetRequest(ObjectRequestContext context, String keyPath) } responseBuilder.header(ACCEPT_RANGE_HEADER, RANGE_HEADER_SUPPORTED_UNIT); - addEntityTagHeader(responseBuilder, keyDetails); + addEntityTagHeader(responseBuilder, keyMetadata); MultivaluedMap queryParams = getContext().getUriInfo().getQueryParameters(); @@ -475,8 +494,8 @@ Response handleGetRequest(ObjectRequestContext context, String keyPath) } } - addLastModifiedDate(responseBuilder, keyDetails); - addTagCountIfAny(responseBuilder, keyDetails); + addLastModifiedDate(responseBuilder, keyMetadata); + addTagCountIfAny(responseBuilder, keyMetadata); long metadataLatencyNs = getMetrics().updateGetKeyMetadataStats(startNanos); perf.appendMetaLatencyNanos(metadataLatencyNs); diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java index 35956bc1df8a..f2b33d7ac03d 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.client; import jakarta.annotation.Nonnull; +import java.io.EOFException; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -133,6 +134,38 @@ public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName, return objectStoreStub.getS3Volume().getBucket(bucketName).getKey(keyName); } + @Override + public OzoneKeyDetails getS3KeyDetails(String bucketName, String keyName, + int partNumber, long startOffset, long endOffset) throws IOException { + OzoneKeyDetails keyDetails = partNumber == 0 + ? getS3KeyDetails(bucketName, keyName) + : getS3KeyDetails(bucketName, keyName, partNumber); + return new OzoneKeyDetails( + keyDetails.getVolumeName(), keyDetails.getBucketName(), + keyDetails.getName(), keyDetails.getDataSize(), + keyDetails.getCreationTime().toEpochMilli(), + keyDetails.getModificationTime().toEpochMilli(), + keyDetails.getOzoneKeyLocations(), + keyDetails.getReplicationConfig(), keyDetails.getMetadata(), + keyDetails.getFileEncryptionInfo(), () -> { + OzoneInputStream stream = keyDetails.getContent(); + long remaining = startOffset; + while (remaining > 0) { + long skipped = stream.skip(remaining); + if (skipped <= 0) { + if (stream.read() == -1) { + throw new EOFException("Unable to skip to S3 byte range " + + "start offset " + startOffset + " for key " + keyName); + } + skipped = 1; + } + remaining -= skipped; + } + return stream; + }, keyDetails.isFile(), keyDetails.getOwner(), keyDetails.getTags(), + keyDetails.getGeneration()); + } + @Override public OzoneVolume buildOzoneVolume(OmVolumeArgs volume) { return null; diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java index 8f310b74062d..fb1a2830095e 100644 --- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java +++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectGet.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.s3.endpoint; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_FSO_DIRECTORY_CREATION_ENABLED; import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertErrorResponse; import static org.apache.hadoop.ozone.s3.endpoint.EndpointTestUtils.assertSucceeds; @@ -39,6 +40,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.time.Instant; import java.time.ZoneId; @@ -46,6 +48,7 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; +import javax.ws.rs.core.StreamingOutput; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.client.OzoneBucket; @@ -278,6 +281,13 @@ public void getRangeHeader() throws IOException, OS3Exception { String.format("bytes 0-%s/%s", CONTENT.length() - 1, CONTENT.length()), response.getHeaderString("Content-Range")); + when(headers.getHeaderString(RANGE_HEADER)).thenReturn("bytes=1-3"); + response = get(rest, BUCKET_NAME, KEY_NAME); + assertEquals("3", response.getHeaderString("Content-Length")); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + ((StreamingOutput) response.getEntity()).write(output); + assertEquals("123", output.toString(UTF_8.name())); + assertNull(response.getHeaderString(TAG_COUNT_HEADER)); }